Module ai.utils.logging

Logging utilities.

Expand source code
"""Logging utilities."""


from . import field
from ._server import Server
from ._client import Client

# Public API of this package: the `field` sub-module plus the Server and
# Client classes imported above.
__all__ = ["field", "Server", "Client"]

Sub-modules

ai.utils.logging.field

Classes

class Client (host: str, port: int)

Methods

def log(self, field: str, value: Any)
Expand source code
def log(self, field: str, value: Any):
    """Sends a ``(field, value)`` pair through the client's socket.

    The pair is sent with ``send_pyobj`` using the ``zmq.NOBLOCK`` flag, and
    the send is performed while holding the client's lock.

    Args:
        field (str): First element of the sent pair; presumably the name of
            the logging field the value belongs to.
        value (Any): Second element of the sent pair.
    """
    message = (field, value)
    with self._lock:
        self._socket.send_pyobj(message, flags=zmq.NOBLOCK)
class Server (*fields: Base, name: str, port: int = None)

Process handling a summary writer.

Args

fields : logging.field.Base
Logging fields.
name : str
Name of the logger.
port : int, optional
Port on which the server will listen for logging values. If None, then a random port is chosen.

Subclasses

  • ai.rl.a3c.trainer._logger.Logger
  • Logger

Instance variables

var port : int

Port on which the server is running.

Expand source code
@property
def port(self) -> int:
    """Port number currently stored for this server (``self._port``)."""
    current_port = self._port
    return current_port
var started : bool

Returns whether the server was started.

Expand source code
@property
def started(self) -> bool:
    """Whether a server process has been created, i.e. ``_process`` is set."""
    process = self._process
    if process is None:
        return False
    return True

Methods

def start(self) ‑> int

Starts the logging server.

Returns

int
The port on which the server is listening.
Expand source code
def start(self) -> int:
    """Starts the logging server.

    Launches ``run`` in a daemon process and waits up to 15 seconds for it
    to send back, through a one-way pipe, the port it is listening on.

    Returns:
        int: The port on which the server is listening.

    Raises:
        RuntimeError: If the server process does not report its port within
            the timeout, or closes the pipe without reporting one.
    """
    receiver, sender = Pipe(duplex=False)
    try:
        self._process = Process(
            target=run,
            args=(self._port, self._name, sender, self._fields),
            daemon=True,
        )
        self._process.start()
        # The child holds its own handle to the write end once started.
        # Closing the parent's copy lets a crashed child show up as EOF on
        # the read end instead of silently consuming the whole timeout.
        sender.close()
        if not receiver.poll(timeout=15.0):
            raise RuntimeError(f"Failed starting logging server '{self._name}'.")
        try:
            self._port = receiver.recv()
        except EOFError as err:
            # The pipe became readable only because the child closed it.
            raise RuntimeError(
                f"Failed starting logging server '{self._name}'."
            ) from err
    finally:
        # Release both pipe ends on every path; closing an already-closed
        # connection is a no-op.
        sender.close()
        receiver.close()
    return self.port