Module ai.utils.logging
Logging utilities.
Expand source code
"""Logging utilities."""
from . import field
from ._server import Server
from ._client import Client
__all__ = ["field", "Server", "Client"]
Sub-modules
ai.utils.logging.field
Classes
class Client (host: str, port: int)
-
Methods
def log(self, field: str, value: Any)
-
Expand source code
def log(self, field: str, value: Any):
    """Send one logging value to the server.

    The field name and its value are packed into a ``(field, value)`` tuple
    and handed to the client's ZeroMQ socket as a Python object. The send
    uses ``zmq.NOBLOCK``, so it does not wait for the message to be queued.

    Args:
        field: Name of the logging field the value belongs to.
        value: Value to log; it is sent through ``send_pyobj``.
    """
    payload = (field, value)
    # The socket is only touched while holding the client's lock.
    with self._lock:
        self._socket.send_pyobj(payload, flags=zmq.NOBLOCK)
class Server (*fields: Base, name: str, port: int = None)
-
Process handling a summary writer.
Args
fields
:logging.field.Base
- Logging fields.
name
:str
- Name of the logger.
port
:int
, optional - Port on which the server will listen for logging values. If None, then a random port is chosen.
Subclasses
- ai.rl.a3c.trainer._logger.Logger
- Logger
Instance variables
var port : int
-
Port on which the server is running.
Expand source code
@property
def port(self) -> int:
    """Port on which the server is running.

    This is a read-only view of ``self._port``, which ``start`` overwrites
    with the port reported back by the server process.
    """
    current_port = self._port
    return current_port
var started : bool
-
Returns whether the server was started.
Expand source code
@property
def started(self) -> bool:
    """Tell whether a server process has been recorded on this instance.

    Returns:
        bool: ``False`` while ``self._process`` is ``None``, ``True``
        otherwise.
    """
    if self._process is None:
        return False
    return True
Methods
def start(self) ‑> int
-
Starts the logging server.
Returns
int
- The port on which the server started listening.
Expand source code
def start(self) -> int:
    """Start the logging server in a daemon child process.

    The child runs ``run`` with the configured port, name and fields, plus
    the write end of a one-way pipe. The parent waits on the read end for
    the child to report the port it is listening on.

    Unlike a bare "spawn and wait", a failed start is rolled back: the child
    is terminated, both pipe ends are closed, and ``self._process`` is left
    untouched so that ``started`` does not report a server that never came
    up.

    Returns:
        int: The port on which the server started listening.

    Raises:
        RuntimeError: If the server process did not report its port within
            roughly 15 seconds, or exited before reporting it.
    """
    receiver, sender = Pipe(duplex=False)
    process = Process(
        target=run,
        args=(self._port, self._name, sender, self._fields),
        daemon=True,
    )
    try:
        process.start()
        # Wait up to ~15 s for the port, in 1 s slices so that a child which
        # has already died is noticed without sitting out the full timeout.
        for _ in range(15):
            if receiver.poll(timeout=1.0) or not process.is_alive():
                break
        if not receiver.poll(timeout=0):
            raise RuntimeError(f"Failed starting logging server '{self._name}'.")
        reported_port = receiver.recv()
    except BaseException:
        # Do not leave a half-started server process behind.
        if process.is_alive():
            process.terminate()
        raise
    finally:
        # The pipe is only needed for the handshake; the child keeps its own
        # copy of the write end, so closing ours here is safe.
        receiver.close()
        sender.close()

    # Publish state only once the server is known to be up.
    self._process = process
    self._port = reported_port
    return self.port