Skip to content

wg_utilities.loggers

Useful constants and functions for use in logging in other projects.

FlushableQueueListener

Bases: QueueListener

A QueueListener that can be flushed and stopped.

Source code in wg_utilities/loggers/item_warehouse/flushable_queue_listener.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class FlushableQueueListener(QueueListener):
    """A QueueListener that can be flushed and stopped."""

    queue: Queue[Any]

    def flush_and_stop(self, timeout: float = 300) -> None:
        """Wait for the queue to drain, then stop the listener.

        Polls the queue once per second. If `timeout` is positive and the
        queue still isn't empty once it has elapsed, a warning is logged and
        the listener is stopped anyway.

        Args:
            timeout (float): the maximum time to wait for the queue to empty;
                a non-positive value waits indefinitely
        """
        deadline = time() + timeout

        while not self.queue.empty():
            sleep(1)

            if timeout > 0 and time() > deadline:
                LOGGER.warning(
                    "QueueListener failed to flush after %s seconds",
                    timeout,
                )
                break

        self.stop()

flush_and_stop(timeout=300)

Wait for the queue to empty and stop.

Parameters:

Name Type Description Default
timeout float

the maximum time to wait for the queue to empty

300
Source code in wg_utilities/loggers/item_warehouse/flushable_queue_listener.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def flush_and_stop(self, timeout: float = 300) -> None:
    """Wait for the queue to drain, then stop the listener.

    Args:
        timeout (float): the maximum time to wait for the queue to empty;
            a non-positive value disables the timeout
    """
    started_at = time()

    while not self.queue.empty():
        sleep(1)

        elapsed = time() - started_at
        if 0 < timeout < elapsed:
            LOGGER.warning("QueueListener failed to flush after %s seconds", timeout)
            break

    self.stop()

ListHandler

Bases: Handler

Custom handler to allow retrieval of log records after the fact.

Parameters:

Name Type Description Default
records_list list

allows the user to pass in a pre-defined list to add records to

None
Source code in wg_utilities/loggers/list_handler.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class ListHandler(Handler):
    """Custom handler to allow retrieval of log records after the fact.

    Args:
        records_list (list): allows the user to pass in a pre-defined list to add records to
    """

    def __init__(
        self,
        records_list: list[Any] | None = None,
        *,
        log_ttl: int | None = 86400,
        on_record: Callable[[LogRecord], Any] | None = None,
        on_expiry: Callable[[LogRecord], Any] | None = None,
    ):
        super().__init__()

        # Can't use `or` here as `[]` is False
        self._records_list: list[LogRecord] = (
            records_list if records_list is not None else []
        )

        self.ttl = log_ttl
        self.on_record = on_record
        self.on_expiry = on_expiry

    def emit(self, record: LogRecord) -> None:
        """Add log record to the internal record store.

        Args:
            record (LogRecord): the new log record being "emitted"
        """
        self.expire_records()

        self._records_list.append(record)

        if self.on_record is not None:
            self.on_record(record)

    def expire_records(self) -> None:
        """Remove records older than `self.ttl`, and call `self.on_expiry` on them."""
        if self.ttl is None:
            return

        now = utcnow().timestamp()

        while self._records_list:
            record = self._records_list.pop(0)

            if record.created < (now - self.ttl):
                if self.on_expiry is not None:
                    self.on_expiry(record)
            else:
                self._records_list.insert(0, record)
                break

    @property
    def debug_records(self) -> list[LogRecord]:
        """Debug level records.

        Returns:
            list: a list of log records with the level DEBUG
        """
        self.expire_records()
        return [record for record in self._records_list if record.levelno == DEBUG]

    @property
    def info_records(self) -> list[LogRecord]:
        """Info level records.

        Returns:
            list: a list of log records with the level INFO
        """
        self.expire_records()
        return [record for record in self._records_list if record.levelno == INFO]

    @property
    def warning_records(self) -> list[LogRecord]:
        """Warning level records.

        Returns:
            list: a list of log records with the level WARNING
        """
        self.expire_records()
        return [record for record in self._records_list if record.levelno == WARNING]

    @property
    def error_records(self) -> list[LogRecord]:
        """Error level records.

        Returns:
            list: a list of log records with the level ERROR
        """
        self.expire_records()
        return [record for record in self._records_list if record.levelno == ERROR]

    @property
    def critical_records(self) -> list[LogRecord]:
        """Critical level records.

        Returns:
            list: a list of log records with the level CRITICAL
        """
        self.expire_records()
        return [record for record in self._records_list if record.levelno == CRITICAL]

    @property
    def records(self) -> list[LogRecord]:
        """All records.

        Returns:
            list: a list of log records with the level CRITICAL
        """
        self.expire_records()
        return self._records_list

critical_records: list[LogRecord] property

Critical level records.

Returns:

Name Type Description
list list[LogRecord]

a list of log records with the level CRITICAL

debug_records: list[LogRecord] property

Debug level records.

Returns:

Name Type Description
list list[LogRecord]

a list of log records with the level DEBUG

error_records: list[LogRecord] property

Error level records.

Returns:

Name Type Description
list list[LogRecord]

a list of log records with the level ERROR

info_records: list[LogRecord] property

Info level records.

Returns:

Name Type Description
list list[LogRecord]

a list of log records with the level INFO

records: list[LogRecord] property

All records.

Returns:

Name Type Description
list list[LogRecord]

all unexpired log records, regardless of level

warning_records: list[LogRecord] property

Warning level records.

Returns:

Name Type Description
list list[LogRecord]

a list of log records with the level WARNING

emit(record)

Add log record to the internal record store.

Parameters:

Name Type Description Default
record LogRecord

the new log record being "emitted"

required
Source code in wg_utilities/loggers/list_handler.py
40
41
42
43
44
45
46
47
48
49
50
51
def emit(self, record: LogRecord) -> None:
    """Append a newly-emitted record to the internal record store.

    Expired records are purged first; the `on_record` callback (if set) is
    then invoked with the new record.

    Args:
        record (LogRecord): the new log record being "emitted"
    """
    self.expire_records()
    self._records_list.append(record)

    callback = self.on_record
    if callback is not None:
        callback(record)

expire_records()

Remove records older than self.ttl, and call self.on_expiry on them.

Source code in wg_utilities/loggers/list_handler.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def expire_records(self) -> None:
    """Remove records older than `self.ttl`, and call `self.on_expiry` on them."""
    if self.ttl is None:
        # TTL disabled: records are retained indefinitely.
        return

    cutoff = utcnow().timestamp() - self.ttl

    # Records are held in creation order, so stop at the first one that is
    # young enough to keep.
    while self._records_list:
        oldest = self._records_list.pop(0)

        if oldest.created >= cutoff:
            self._records_list.insert(0, oldest)
            break

        if self.on_expiry is not None:
            self.on_expiry(oldest)

WarehouseHandler

Bases: BaseWarehouseHandler

Custom handler to allow logging directly into an Item Warehouse.

https://github.com/worgarside/addon-item-warehouse-api https://github.com/worgarside/addon-item-warehouse-website

The primary key of the log warehouse is a combination of
  • log_hash (message content)
  • logger (name of the logger)
  • log_host (hostname of the machine the log was generated on)

This means that the same log message from the same host will only be stored once.

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class WarehouseHandler(BaseWarehouseHandler):
    """Custom handler to allow logging directly into an Item Warehouse.

    https://github.com/worgarside/addon-item-warehouse-api
    https://github.com/worgarside/addon-item-warehouse-website

    The primary key of the log warehouse is a combination of:
        - log_hash (message content)
        - logger (name of the logger)
        - log_host (hostname of the machine the log was generated on)

    This means that the same log message from the same host will only be stored once.
    """

    def __init__(
        self,
        *,
        level: int | Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
        warehouse_host: str | None = None,
        warehouse_port: int | None = None,
        initialize_warehouse: bool = False,
    ) -> None:
        """Initialize the handler and Log Warehouse.

        Args:
            level (int | str): the logging level for this handler
            warehouse_host (str): the hostname of the Item Warehouse
            warehouse_port (int): the port of the Item Warehouse
            initialize_warehouse (bool): whether to create/validate the
                Warehouse on construction
        """

        super().__init__(
            level=level,
            warehouse_host=warehouse_host,
            warehouse_port=warehouse_port,
        )

        if initialize_warehouse:
            self.initialize_warehouse()

    def emit(self, record: LogRecord) -> None:
        """Send a log record to the Item Warehouse.

        Args:
            record (LogRecord): the new log record being "emitted"
        """

        log_payload = self.get_log_payload(record)

        self.post_with_backoff(log_payload)

    def initialize_warehouse(self) -> None:
        """Create a new warehouse or validate an existing one.

        Raises:
            ValueError: if an existing warehouse's item schema types don't
                match the expected types
        """
        try:
            schema: WarehouseSchema = self.get_json_response(  # type: ignore[assignment]
                self.WAREHOUSE_ENDPOINT,
                timeout=5,
            )
        except HTTPError as exc:
            if (
                exc.response is not None
                and exc.response.status_code == HTTPStatus.NOT_FOUND
            ):
                # Warehouse doesn't exist yet - create it.
                schema = self.post_json_response(  # type: ignore[assignment]
                    "/warehouses",
                    json=self._WAREHOUSE_SCHEMA,
                    timeout=5,
                )
                LOGGER.info("Created new Warehouse: %r", schema)
            else:
                # Non-404 HTTP errors were previously swallowed silently;
                # log them for parity with the generic handler below.
                LOGGER.exception("Error creating Warehouse")
        except Exception:
            LOGGER.exception("Error creating Warehouse")
        else:
            LOGGER.info(
                "Warehouse %s already exists - created at %s",
                schema.get("name", None),
                schema.get("created_at", None),
            )

            schema_types = {
                k: v["type"] for k, v in schema.get("item_schema", {}).items()
            }

            if schema_types != self._WAREHOUSE_TYPES:
                raise ValueError(
                    "Warehouse types do not match expected types: "
                    + dumps(
                        {
                            k: {"expected": v, "actual": schema_types.get(k)}
                            for k, v in self._WAREHOUSE_TYPES.items()
                            if v != schema_types.get(k)
                        },
                        default=str,
                    ),
                )

    @backoff(
        RequestException,
        logger=LOGGER,
        max_tries=BACKOFF_MAX_TRIES,
        timeout=BACKOFF_TIMEOUT,
    )
    def post_with_backoff(self, log_payload: LogPayload, /) -> None:
        """Post a log payload to the warehouse, with backoff applied."""

        res = post(
            f"{self.base_url}{self.ITEM_ENDPOINT}",
            timeout=60,
            json=log_payload,
        )

        # 409 Conflict: this log (hash/logger/host) is already stored.
        if res.status_code == HTTPStatus.CONFLICT:
            return

        # Other 4xx responses are permanent client errors (bar 429, which is
        # worth retrying), so log and give up rather than raising.
        if (
            HTTPStatus.BAD_REQUEST
            <= res.status_code
            < HTTPStatus.INTERNAL_SERVER_ERROR
            and res.status_code != HTTPStatus.TOO_MANY_REQUESTS
        ):
            LOGGER.error(
                "Permanent error posting log to warehouse (%s %s): %s",
                res.status_code,
                res.reason,
                res.text,
            )
            return

        res.raise_for_status()

__init__(*, level='INFO', warehouse_host=None, warehouse_port=None, initialize_warehouse=False)

Initialize the handler and Log Warehouse.

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def __init__(
    self,
    *,
    level: int | Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
    warehouse_host: str | None = None,
    warehouse_port: int | None = None,
    initialize_warehouse: bool = False,
) -> None:
    """Initialize the handler and Log Warehouse.

    Args:
        level (int | str): the logging level for this handler
        warehouse_host (str): the hostname of the Item Warehouse
        warehouse_port (int): the port of the Item Warehouse
        initialize_warehouse (bool): whether to create/validate the Warehouse
            immediately on construction
    """

    super().__init__(
        level=level,
        warehouse_host=warehouse_host,
        warehouse_port=warehouse_port,
    )

    # Optionally create the warehouse (or validate an existing one) up front.
    if initialize_warehouse:
        self.initialize_warehouse()

emit(record)

Add log record to the internal record store.

Parameters:

Name Type Description Default
record LogRecord

the new log record being "emitted"

required
Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
66
67
68
69
70
71
72
73
74
75
def emit(self, record: LogRecord) -> None:
    """Send a log record to the Item Warehouse.

    The record is converted to a payload and POSTed to the warehouse item
    endpoint (with backoff); nothing is stored locally.

    Args:
        record (LogRecord): the new log record being "emitted"
    """

    log_payload = self.get_log_payload(record)

    self.post_with_backoff(log_payload)

initialize_warehouse()

Create a new warehouse or validate an existing one.

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def initialize_warehouse(self) -> None:
    """Create a new warehouse or validate an existing one.

    Raises:
        ValueError: if an existing warehouse's item schema types don't match
            the expected types
    """
    try:
        schema: WarehouseSchema = self.get_json_response(  # type: ignore[assignment]
            self.WAREHOUSE_ENDPOINT,
            timeout=5,
        )
    except HTTPError as exc:
        if (
            exc.response is not None
            and exc.response.status_code == HTTPStatus.NOT_FOUND
        ):
            # Warehouse doesn't exist yet - create it.
            schema = self.post_json_response(  # type: ignore[assignment]
                "/warehouses",
                json=self._WAREHOUSE_SCHEMA,
                timeout=5,
            )
            LOGGER.info("Created new Warehouse: %r", schema)
        else:
            # Non-404 HTTP errors were previously swallowed silently; log
            # them for parity with the generic handler below.
            LOGGER.exception("Error creating Warehouse")
    except Exception:
        LOGGER.exception("Error creating Warehouse")
    else:
        LOGGER.info(
            "Warehouse %s already exists - created at %s",
            schema.get("name", None),
            schema.get("created_at", None),
        )

        schema_types = {
            k: v["type"] for k, v in schema.get("item_schema", {}).items()
        }

        if schema_types != self._WAREHOUSE_TYPES:
            raise ValueError(
                "Warehouse types do not match expected types: "
                + dumps(
                    {
                        k: {"expected": v, "actual": schema_types.get(k)}
                        for k, v in self._WAREHOUSE_TYPES.items()
                        if v != schema_types.get(k)
                    },
                    default=str,
                ),
            )

post_with_backoff(log_payload)

Post a JSON response to the warehouse, with backoff applied.

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
@backoff(
    RequestException,
    logger=LOGGER,
    max_tries=BACKOFF_MAX_TRIES,
    timeout=BACKOFF_TIMEOUT,
)
def post_with_backoff(self, log_payload: LogPayload, /) -> None:
    """Post a log payload to the warehouse, with backoff applied."""

    res = post(
        f"{self.base_url}{self.ITEM_ENDPOINT}",
        timeout=60,
        json=log_payload,
    )

    # 409 Conflict: this log (hash/logger/host) is already stored.
    if res.status_code == HTTPStatus.CONFLICT:
        return

    # Other 4xx responses are permanent client errors (bar 429, which is
    # worth retrying), so log and give up rather than raising.
    if (
        HTTPStatus.BAD_REQUEST <= res.status_code < HTTPStatus.INTERNAL_SERVER_ERROR
        and res.status_code != HTTPStatus.TOO_MANY_REQUESTS
    ):
        LOGGER.error(
            "Permanent error posting log to warehouse (%s %s): %s",
            res.status_code,
            res.reason,
            res.text,
        )
        return

    res.raise_for_status()

add_file_handler(logger, *, logfile_path, level=DEBUG, create_directory=True)

Add a FileHandler to an existing logger.

Parameters:

Name Type Description Default
logger Logger

the logger to add a file handler to

required
logfile_path Path

the path to the logging file

required
level int

the logging level to be used for the FileHandler

DEBUG
create_directory bool

whether to force-create the directory/ies the file is contained within

True

Returns:

Name Type Description
Logger Logger

the logger instance, returned for use in one-liners: logger = add_file_handler(logging.getLogger(__name__))

Source code in wg_utilities/loggers/file_handler.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def add_file_handler(
    logger: Logger,
    *,
    logfile_path: Path,
    level: int = DEBUG,
    create_directory: bool = True,
) -> Logger:
    """Add a FileHandler to an existing logger.

    Args:
        logger (Logger): the logger to add a file handler to
        logfile_path (Path): the path to the logging file
        level (int): the logging level to be used for the FileHandler
        create_directory (bool): whether to force-create the directory/ies the file is contained within

    Returns:
        Logger: the logger instance, returned for use in one-liners:
            `logger = add_file_handler(logging.getLogger(__name__))`
    """
    logger.addHandler(
        create_file_handler(
            logfile_path=logfile_path,
            level=level,
            create_directory=create_directory,
        ),
    )

    return logger

add_list_handler(logger, *, log_list=None, level=DEBUG, log_ttl=86400, on_expiry=None)

Add a ListHandler to an existing logger.

Parameters:

Name Type Description Default
logger Logger

the logger to add a list handler to

required
log_list list

the list for the handler to write logs to

None
level int

the logging level to be used for the ListHandler

DEBUG
log_ttl int

number of seconds to retain a log for

86400
on_expiry Callable

function to call with expired logs

None
Source code in wg_utilities/loggers/list_handler.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def add_list_handler(
    logger: Logger,
    *,
    log_list: list[Any] | None = None,
    level: int = DEBUG,
    log_ttl: int | None = 86400,
    on_expiry: Callable[[LogRecord], Any] | None = None,
) -> ListHandler:
    """Add a ListHandler to an existing logger.

    Args:
        logger (Logger): the logger to add a list handler to
        log_list (list): the list for the handler to write logs to
        level (int): the logging level to be used for the ListHandler
        log_ttl (int): number of seconds to retain a log for
        on_expiry (Callable): function to call with expired logs

    Returns:
        ListHandler: the handler that was added to the logger
    """
    handler = ListHandler(log_list, log_ttl=log_ttl, on_expiry=on_expiry)
    handler.setLevel(level)

    logger.addHandler(handler)

    return handler

add_stream_handler(logger, *, formatter=FORMATTER, level=DEBUG)

Add a StreamHandler to an existing logger.

Parameters:

Name Type Description Default
logger Logger

the logger to add a stream handler to

required
formatter Formatter

the formatter to use in the stream logs

FORMATTER
level int

the logging level to be used for the StreamHandler

DEBUG

Returns:

Name Type Description
Logger Logger

the logger instance, returned for use in one-liners: logger = add_stream_handler(logging.getLogger(__name__))

Source code in wg_utilities/loggers/stream_handler.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def add_stream_handler(
    logger: Logger,
    *,
    formatter: Formatter | None = FORMATTER,
    level: int = DEBUG,
) -> Logger:
    """Add a StreamHandler (writing to stdout) to an existing logger.

    Args:
        logger (Logger): the logger to add a stream handler to
        formatter (Formatter): the formatter to use in the stream logs
        level (int): the logging level to be used for the StreamHandler

    Returns:
        Logger: the logger instance, returned for use in one-liners:
            `logger = add_stream_handler(logging.getLogger(__name__))`
    """

    s_handler = StreamHandler(stdout)
    s_handler.setFormatter(formatter)
    s_handler.setLevel(level)

    logger.addHandler(s_handler)

    return logger

add_warehouse_handler(logger, *, level=DEBUG, warehouse_host=None, warehouse_port=None, initialize_warehouse=False, disable_queue=False)

Add a WarehouseHandler to an existing logger.

Parameters:

Name Type Description Default
logger Logger

the logger to add a warehouse handler to

required
level int

the logging level to be used for the WarehouseHandler

DEBUG
warehouse_host str

the hostname of the Item Warehouse

None
warehouse_port int

the port of the Item Warehouse

None
initialize_warehouse bool

whether to initialize the Warehouse

False
disable_queue bool

whether to disable the queue for the WarehouseHandler

False

Returns:

Name Type Description
WarehouseHandler WarehouseHandler

the WarehouseHandler that was added to the logger

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def add_warehouse_handler(
    logger: Logger,
    *,
    level: int = DEBUG,
    warehouse_host: str | None = None,
    warehouse_port: int | None = None,
    initialize_warehouse: bool = False,
    disable_queue: bool = False,
) -> WarehouseHandler:
    """Add a WarehouseHandler to an existing logger.

    Args:
        logger (Logger): the logger to add a warehouse handler to
        level (int): the logging level to be used for the WarehouseHandler
        warehouse_host (str): the hostname of the Item Warehouse
        warehouse_port (int): the port of the Item Warehouse
        initialize_warehouse (bool): whether to initialize the Warehouse
        disable_queue (bool): whether to disable the queue for the WarehouseHandler

    Returns:
        WarehouseHandler: the WarehouseHandler that was added to the logger
    """

    wh_handler = WarehouseHandler(
        level=level,
        warehouse_host=warehouse_host,
        warehouse_port=warehouse_port,
        initialize_warehouse=initialize_warehouse,
    )

    if disable_queue:
        # Synchronous mode: attach the handler directly, unless an equivalent
        # one (same warehouse URL) is already attached.
        for handler in logger.handlers:
            if isinstance(
                handler,
                WarehouseHandler,
            ) and handler.base_url == WarehouseHandler.get_base_url(
                warehouse_host,
                warehouse_port,
            ):
                LOGGER.warning("WarehouseHandler already exists for %s", handler.base_url)
                return handler

        logger.addHandler(wh_handler)
        return wh_handler

    # Queued mode: records go onto LOG_QUEUE and are forwarded to the
    # warehouse by a background listener thread.
    for handler in logger.handlers:
        if isinstance(handler, _QueueHandler) and handler.warehouse_handler == wh_handler:
            LOGGER.warning(
                "WarehouseHandler already exists for %s",
                handler.warehouse_handler.base_url,
            )
            return handler.warehouse_handler

    listener = FlushableQueueListener(LOG_QUEUE, wh_handler)
    listener.start()

    q_handler = _QueueHandler(LOG_QUEUE, wh_handler)
    q_handler.setLevel(level)

    logger.addHandler(q_handler)

    # Ensure the queue worker is stopped when the program exits. atexit runs
    # callbacks in LIFO order: the sentinel is enqueued first, the listener is
    # then flushed and stopped, and finally the stop is logged.
    atexit.register(LOGGER.info, "Stopped WarehouseHandler")
    atexit.register(listener.flush_and_stop)
    atexit.register(LOG_QUEUE.put, None)  # Processed in reverse order

    return wh_handler

create_file_handler(logfile_path, level=DEBUG, *, create_directory=True)

Create a file handler for use in other loggers.

Parameters:

Name Type Description Default
logfile_path Path

the path to the logging file

required
level int

the logging level to be used for the FileHandler

DEBUG
create_directory bool

whether to force-create the directory/ies the file is contained within

True

Returns:

Name Type Description
FileHandler FileHandler

a log handler with a file as the output

Source code in wg_utilities/loggers/file_handler.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def create_file_handler(
    logfile_path: Path,
    level: int = DEBUG,
    *,
    create_directory: bool = True,
) -> FileHandler:
    """Create a file handler for use in other loggers.

    Args:
        logfile_path (Path): the path to the logging file
        level (int): the logging level to be used for the FileHandler
        create_directory (bool): whether to force-create the directory/ies the file is contained within

    Returns:
        FileHandler: a log handler with a file as the output
    """
    if create_directory:
        force_mkdir(logfile_path, path_is_file=True)

    f_handler = FileHandler(logfile_path)
    f_handler.setFormatter(FORMATTER)
    f_handler.setLevel(level)

    return f_handler

get_streaming_logger(name, *, formatter=FORMATTER, level=DEBUG)

Get a logger with a StreamHandler attached.

Parameters:

Name Type Description Default
name str

the name of the logger to create

required
formatter Formatter

the formatter to use in the stream logs

FORMATTER
level int

the logging level to be used for the StreamHandler

DEBUG

Returns:

Name Type Description
Logger Logger

the logger instance, returned for use in one-liners: logger = get_streaming_logger(__name__)

Source code in wg_utilities/loggers/stream_handler.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def get_streaming_logger(
    name: str,
    *,
    formatter: Formatter | None = FORMATTER,
    level: int = DEBUG,
) -> Logger:
    """Get a logger with a StreamHandler attached.

    Args:
        name (str): the name of the logger to create
        formatter (Formatter): the formatter to use in the stream logs
        level (int): the logging level to be used for both the logger and its
            StreamHandler

    Returns:
        Logger: the logger instance, returned for use in one-liners:
            `logger = get_streaming_logger(__name__)`
    """
    logger = getLogger(name)
    logger.setLevel(level)

    return add_stream_handler(logger, formatter=formatter, level=level)

file_handler

Helper functions for creating and adding FileHandlers to loggers.

add_file_handler(logger, *, logfile_path, level=DEBUG, create_directory=True)

Add a FileHandler to an existing logger.

Parameters:

Name Type Description Default
logger Logger

the logger to add a file handler to

required
logfile_path Path

the path to the logging file

required
level int

the logging level to be used for the FileHandler

DEBUG
create_directory bool

whether to force-create the directory/ies the file is contained within

True

Returns:

Name Type Description
Logger Logger

the logger instance, returned for use in one-liners: logger = add_file_handler(logging.getLogger(__name__))

Source code in wg_utilities/loggers/file_handler.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def add_file_handler(
    logger: Logger,
    *,
    logfile_path: Path,
    level: int = DEBUG,
    create_directory: bool = True,
) -> Logger:
    """Add a FileHandler to an existing logger.

    Args:
        logger (Logger): the logger to add a file handler to
        logfile_path (Path): the path to the logging file
        level (int): the logging level to be used for the FileHandler
        create_directory (bool): whether to force-create the directory/ies the file is contained within

    Returns:
        Logger: the logger instance, returned for use in one-liners:
            `logger = add_file_handler(logging.getLogger(__name__))`
    """
    file_handler = create_file_handler(
        logfile_path=logfile_path,
        level=level,
        create_directory=create_directory,
    )

    logger.addHandler(file_handler)
    return logger

create_file_handler(logfile_path, level=DEBUG, *, create_directory=True)

Create a file handler for use in other loggers.

Parameters:

Name Type Description Default
logfile_path Path

the path to the logging file

required
level int

the logging level to be used for the FileHandler

DEBUG
create_directory bool

whether to force-create the directory/ies the file is contained within

True

Returns:

Name Type Description
FileHandler FileHandler

a log handler with a file as the output

Source code in wg_utilities/loggers/file_handler.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def create_file_handler(
    logfile_path: Path,
    level: int = DEBUG,
    *,
    create_directory: bool = True,
) -> FileHandler:
    """Create a file handler for use in other loggers.

    Args:
        logfile_path (Path): the path to the logging file
        level (int): the logging level to be used for the FileHandler
        create_directory (bool): whether to force-create the directory/ies the file is contained within

    Returns:
        FileHandler: a log handler with a file as the output
    """
    if create_directory:
        force_mkdir(logfile_path, path_is_file=True)

    f_handler = FileHandler(logfile_path)
    f_handler.setFormatter(FORMATTER)
    f_handler.setLevel(level)

    return f_handler

item_warehouse

Logging utilities specific to the Item Warehouse project.

https://github.com/worgarside/addon-item-warehouse-api https://github.com/worgarside/addon-item-warehouse-website

WarehouseHandler

Bases: BaseWarehouseHandler

Custom handler to allow logging directly into an Item Warehouse.

https://github.com/worgarside/addon-item-warehouse-api https://github.com/worgarside/addon-item-warehouse-website

The primary key of the log warehouse is a combination of
  • log_hash (message content)
  • logger (name of the logger)
  • log_host (hostname of the machine the log was generated on)

This means that the same log message from the same host will only be stored once.

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class WarehouseHandler(BaseWarehouseHandler):
    """Custom handler to allow logging directly into an Item Warehouse.

    https://github.com/worgarside/addon-item-warehouse-api
    https://github.com/worgarside/addon-item-warehouse-website

    The primary key of the log warehouse is a combination of:
        - log_hash (message content)
        - logger (name of the logger)
        - log_host (hostname of the machine the log was generated on)

    This means that the same log message from the same host will only be stored once.
    """

    def __init__(
        self,
        *,
        level: int | Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
        warehouse_host: str | None = None,
        warehouse_port: int | None = None,
        initialize_warehouse: bool = False,
    ) -> None:
        """Initialize the handler and Log Warehouse.

        Args:
            level (int | str): the logging level for this handler
            warehouse_host (str | None): hostname of the Item Warehouse
            warehouse_port (int | None): port of the Item Warehouse
            initialize_warehouse (bool): whether to create/validate the
                Warehouse during initialization
        """

        super().__init__(
            level=level,
            warehouse_host=warehouse_host,
            warehouse_port=warehouse_port,
        )

        if initialize_warehouse:
            self.initialize_warehouse()

    def emit(self, record: LogRecord) -> None:
        """Post the log record to the Item Warehouse.

        Args:
            record (LogRecord): the new log record being "emitted"
        """

        self.post_with_backoff(self.get_log_payload(record))

    def initialize_warehouse(self) -> None:
        """Create a new warehouse or validate an existing one.

        Raises:
            ValueError: if an existing warehouse's field types don't match
                the expected schema.
        """
        try:
            schema: WarehouseSchema = self.get_json_response(  # type: ignore[assignment]
                self.WAREHOUSE_ENDPOINT,
                timeout=5,
            )
        except HTTPError as exc:
            if (
                exc.response is not None
                and exc.response.status_code == HTTPStatus.NOT_FOUND
            ):
                # Warehouse doesn't exist yet - create it from the schema.
                schema = self.post_json_response(  # type: ignore[assignment]
                    "/warehouses",
                    json=self._WAREHOUSE_SCHEMA,
                    timeout=5,
                )
                LOGGER.info("Created new Warehouse: %r", schema)
            else:
                # Fix: non-404 HTTP errors were previously swallowed silently.
                LOGGER.exception("Error creating Warehouse")
        except Exception:
            LOGGER.exception("Error creating Warehouse")
        else:
            LOGGER.info(
                "Warehouse %s already exists - created at %s",
                schema.get("name", None),
                schema.get("created_at", None),
            )

            schema_types = {
                k: v["type"] for k, v in schema.get("item_schema", {}).items()
            }

            if schema_types != self._WAREHOUSE_TYPES:
                raise ValueError(
                    "Warehouse types do not match expected types: "
                    + dumps(
                        {
                            k: {"expected": v, "actual": schema_types.get(k)}
                            for k, v in self._WAREHOUSE_TYPES.items()
                            if v != schema_types.get(k)
                        },
                        default=str,
                    ),
                )

    @backoff(
        RequestException,
        logger=LOGGER,
        max_tries=BACKOFF_MAX_TRIES,
        timeout=BACKOFF_TIMEOUT,
    )
    def post_with_backoff(self, log_payload: LogPayload, /) -> None:
        """Post a log payload to the warehouse, with backoff applied."""

        res = post(
            f"{self.base_url}{self.ITEM_ENDPOINT}",
            timeout=60,
            json=log_payload,
        )

        # A duplicate log entry (same primary key) is not an error.
        if res.status_code == HTTPStatus.CONFLICT:
            return

        # Any other 4xx (bar 429) is a permanent client error - retrying
        # won't help, so log it and bail rather than raising into the
        # backoff loop. Numeric range check replaces the old string-prefix
        # test on the status code.
        if (
            HTTPStatus.BAD_REQUEST
            <= res.status_code
            < HTTPStatus.INTERNAL_SERVER_ERROR
            and res.status_code != HTTPStatus.TOO_MANY_REQUESTS
        ):
            LOGGER.error(
                "Permanent error posting log to warehouse (%s %s): %s",
                res.status_code,
                res.reason,
                res.text,
            )
            return

        res.raise_for_status()

__init__(*, level='INFO', warehouse_host=None, warehouse_port=None, initialize_warehouse=False)

Initialize the handler and Log Warehouse.

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def __init__(
    self,
    *,
    level: int | Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
    warehouse_host: str | None = None,
    warehouse_port: int | None = None,
    initialize_warehouse: bool = False,
) -> None:
    """Set up the handler and, if requested, the Log Warehouse itself."""
    # Delegate level/host/port handling to the base warehouse handler.
    super().__init__(level=level, warehouse_host=warehouse_host, warehouse_port=warehouse_port)

    # Optionally create/validate the warehouse up front (network call).
    if initialize_warehouse:
        self.initialize_warehouse()

emit(record)

Add log record to the internal record store.

Parameters:

Name Type Description Default
record LogRecord

the new log record being "emitted"

required
Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
66
67
68
69
70
71
72
73
74
75
def emit(self, record: LogRecord) -> None:
    """Forward a log record to the Item Warehouse.

    Args:
        record (LogRecord): the log record being emitted
    """
    # Convert the record to a payload, then post it (with retry/backoff).
    self.post_with_backoff(self.get_log_payload(record))

initialize_warehouse()

Create a new warehouse or validate an existing one.

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def initialize_warehouse(self) -> None:
    """Create a new warehouse or validate an existing one.

    Raises:
        ValueError: if an existing warehouse's field types don't match the
            expected schema.
    """
    try:
        schema: WarehouseSchema = self.get_json_response(  # type: ignore[assignment]
            self.WAREHOUSE_ENDPOINT,
            timeout=5,
        )
    except HTTPError as exc:
        if (
            exc.response is not None
            and exc.response.status_code == HTTPStatus.NOT_FOUND
        ):
            # Warehouse doesn't exist yet - create it from the schema.
            schema = self.post_json_response(  # type: ignore[assignment]
                "/warehouses",
                json=self._WAREHOUSE_SCHEMA,
                timeout=5,
            )
            LOGGER.info("Created new Warehouse: %r", schema)
        else:
            # Fix: non-404 HTTP errors were previously swallowed silently.
            LOGGER.exception("Error creating Warehouse")
    except Exception:
        LOGGER.exception("Error creating Warehouse")
    else:
        LOGGER.info(
            "Warehouse %s already exists - created at %s",
            schema.get("name", None),
            schema.get("created_at", None),
        )

        schema_types = {
            k: v["type"] for k, v in schema.get("item_schema", {}).items()
        }

        if schema_types != self._WAREHOUSE_TYPES:
            raise ValueError(
                "Warehouse types do not match expected types: "
                + dumps(
                    {
                        k: {"expected": v, "actual": schema_types.get(k)}
                        for k, v in self._WAREHOUSE_TYPES.items()
                        if v != schema_types.get(k)
                    },
                    default=str,
                ),
            )

post_with_backoff(log_payload)

Post a JSON response to the warehouse, with backoff applied.

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
@backoff(
    RequestException,
    logger=LOGGER,
    max_tries=BACKOFF_MAX_TRIES,
    timeout=BACKOFF_TIMEOUT,
)
def post_with_backoff(self, log_payload: LogPayload, /) -> None:
    """Post a log payload to the warehouse, with backoff applied."""

    res = post(
        f"{self.base_url}{self.ITEM_ENDPOINT}",
        timeout=60,
        json=log_payload,
    )

    # A duplicate log entry (same primary key) is not an error.
    if res.status_code == HTTPStatus.CONFLICT:
        return

    # Any other 4xx (bar 429) is a permanent client error - retrying won't
    # help, so log it and bail rather than raising into the backoff loop.
    # Numeric range check replaces the old string-prefix test.
    if (
        HTTPStatus.BAD_REQUEST
        <= res.status_code
        < HTTPStatus.INTERNAL_SERVER_ERROR
        and res.status_code != HTTPStatus.TOO_MANY_REQUESTS
    ):
        LOGGER.error(
            "Permanent error posting log to warehouse (%s %s): %s",
            res.status_code,
            res.reason,
            res.text,
        )
        return

    res.raise_for_status()

add_warehouse_handler(logger, *, level=DEBUG, warehouse_host=None, warehouse_port=None, initialize_warehouse=False, disable_queue=False)

Add a WarehouseHandler to an existing logger.

Parameters:

Name Type Description Default
logger Logger

the logger to add a file handler to

required
level int

the logging level to be used for the WarehouseHandler

DEBUG
warehouse_host str

the hostname of the Item Warehouse

None
warehouse_port int

the port of the Item Warehouse

None
initialize_warehouse bool

whether to initialize the Warehouse

False
disable_queue bool

whether to disable the queue for the WarehouseHandler

False

Returns:

Name Type Description
WarehouseHandler WarehouseHandler

the WarehouseHandler that was added to the logger

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def add_warehouse_handler(
    logger: Logger,
    *,
    level: int = DEBUG,
    warehouse_host: str | None = None,
    warehouse_port: int | None = None,
    initialize_warehouse: bool = False,
    disable_queue: bool = False,
) -> WarehouseHandler:
    """Add a WarehouseHandler to an existing logger.

    Args:
        logger (Logger): the logger to add a file handler to
        level (int): the logging level to be used for the WarehouseHandler
        warehouse_host (str): the hostname of the Item Warehouse
        warehouse_port (int): the port of the Item Warehouse
        initialize_warehouse (bool): whether to initialize the Warehouse
        disable_queue (bool): whether to disable the queue for the WarehouseHandler

    Returns:
        WarehouseHandler: the WarehouseHandler that was added to the logger
    """

    wh_handler = WarehouseHandler(
        level=level,
        warehouse_host=warehouse_host,
        warehouse_port=warehouse_port,
        initialize_warehouse=initialize_warehouse,
    )

    if disable_queue:
        for handler in logger.handlers:
            if isinstance(
                handler,
                WarehouseHandler,
            ) and handler.base_url == WarehouseHandler.get_base_url(
                warehouse_host,
                warehouse_port,
            ):
                LOGGER.warning("WarehouseHandler already exists for %s", handler.base_url)
                return handler

        logger.addHandler(wh_handler)
        return wh_handler

    for handler in logger.handlers:
        if isinstance(handler, _QueueHandler) and handler.warehouse_handler == wh_handler:
            LOGGER.warning(
                "WarehouseHandler already exists for %s",
                handler.warehouse_handler.base_url,
            )
            return handler.warehouse_handler

    listener = FlushableQueueListener(LOG_QUEUE, wh_handler)
    listener.start()

    q_handler = _QueueHandler(LOG_QUEUE, wh_handler)
    q_handler.setLevel(level)

    logger.addHandler(q_handler)

    # Ensure the queue worker is stopped when the program exits
    atexit.register(LOGGER.info, "Stopped WarehouseHandler")
    atexit.register(listener.flush_and_stop)
    atexit.register(LOG_QUEUE.put, None)  # Processed in reverse order

    return wh_handler

base_handler

Custom handler to allow logging directly into an Item Warehouse.

BaseWarehouseHandler

Bases: Handler, JsonApiClient[WarehouseLog | WarehouseLogPage]

Custom handler to allow logging directly into an Item Warehouse.

https://github.com/worgarside/addon-item-warehouse-api https://github.com/worgarside/addon-item-warehouse-website

The primary key of the log warehouse is a combination of
  • log_hash (message content)
  • logger (name of the logger)
  • log_host (hostname of the machine the log was generated on)

This means that the same log message from the same host will only be stored once.

Source code in wg_utilities/loggers/item_warehouse/base_handler.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
class BaseWarehouseHandler(Handler, JsonApiClient[WarehouseLog | WarehouseLogPage]):
    """Custom handler to allow logging directly into an Item Warehouse.

    https://github.com/worgarside/addon-item-warehouse-api
    https://github.com/worgarside/addon-item-warehouse-website

    The primary key of the log warehouse is a combination of:
        - log_hash (message content)
        - logger (name of the logger)
        - log_host (hostname of the machine the log was generated on)

    This means that the same log message from the same host will only be stored once.
    """

    # Hostname of the machine running this process; stored with each log.
    HOST_NAME: Final = gethostname()

    ITEM_NAME: Final = "log"
    WAREHOUSE_NAME: Final = "lumberyard"

    WAREHOUSE_ENDPOINT: Final = f"/warehouses/{WAREHOUSE_NAME}"
    ITEM_ENDPOINT: Final = f"{WAREHOUSE_ENDPOINT}/items"

    # Expected schema of the log warehouse. Posted verbatim when the
    # warehouse is first created, and used to validate an existing one.
    # `primary_key` fields together form the dedupe key described above.
    _WAREHOUSE_SCHEMA: Final[WarehouseSchema] = {
        "name": WAREHOUSE_NAME,
        "item_name": ITEM_NAME,
        "item_schema": {
            "created_at": {
                "nullable": False,
                "type": "double",
            },
            "exception_message": {
                "nullable": True,
                "type": "string",
                "type_kwargs": {"length": 2048},
            },
            "exception_type": {
                "nullable": True,
                "type": "string",
                "type_kwargs": {"length": 64},
            },
            "exception_traceback": {
                "nullable": True,
                "type": "text",
                "type_kwargs": {"length": 16383},
            },
            "file": {
                "nullable": False,
                "type": "string",
                "type_kwargs": {"length": 255},
            },
            "level": {"nullable": False, "type": "integer"},
            "line": {"nullable": False, "type": "integer"},
            "log_hash": {
                "nullable": False,
                "primary_key": True,
                "type": "string",
                "type_kwargs": {"length": 32},
            },
            "log_host": {
                "default": "func:client_ip",
                "nullable": False,
                "primary_key": True,
                "type": "string",
                "type_kwargs": {"length": 45},
            },
            "logger": {
                "nullable": False,
                "primary_key": True,
                "type": "string",
                "type_kwargs": {"length": 255},
            },
            "message": {
                "nullable": False,
                "type": "string",
                "type_kwargs": {"length": 2048},
            },
            "module": {
                "nullable": False,
                "type": "string",
                "type_kwargs": {"length": 255},
            },
            "process": {
                "nullable": False,
                "type": "string",
                "type_kwargs": {"length": 255},
            },
            "thread": {
                "nullable": False,
                "type": "string",
                "type_kwargs": {"length": 255},
            },
        },
    }

    # Flattened field-name -> type-name mapping, used for schema validation.
    _WAREHOUSE_TYPES: Final[dict[str, str]] = {
        k: v["type"] for k, v in _WAREHOUSE_SCHEMA["item_schema"].items()
    }

    def __init__(
        self,
        *,
        level: int | Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
        warehouse_host: str | None = None,
        warehouse_port: int | None = None,
    ) -> None:
        """Initialize the handler."""

        # Both bases are initialized explicitly (not via a super() chain):
        # the Handler half takes the level, the API-client half the URL.
        Handler.__init__(self, level=level)

        JsonApiClient.__init__(
            self,
            base_url=self.get_base_url(warehouse_host, warehouse_port),
        )

    def emit(self, _: LogRecord) -> None:  # noqa: D102
        # Subclasses must provide the actual emit implementation.
        raise NotImplementedError("Don't use the base handler directly.")

    @staticmethod
    def get_base_url(host: str | None, port: int | None) -> str:
        """Get the base URL for the Item Warehouse.

        Falls back to the ITEM_WAREHOUSE_HOST / ITEM_WAREHOUSE_PORT
        environment variables (defaults: http://homeassistant.local, 8002)
        when either argument is None.

        Args:
            host (str): the hostname of the Item Warehouse
            port (int): the port of the Item Warehouse

        Returns:
            str: the base URL for the Item Warehouse
        """

        host = host or str(getenv("ITEM_WAREHOUSE_HOST", "http://homeassistant.local"))
        port = port if port is not None else int(getenv("ITEM_WAREHOUSE_PORT", "8002"))

        # NOTE: a falsy port (0) is deliberately skipped here.
        if port:
            host += f":{port}"

        host += "/v1"

        return host

    @staticmethod
    def get_log_hash(record: LogRecord) -> str:
        """Get a hash of the log message.

        Args:
            record (LogRecord): the log record to hash

        Returns:
            str: the hexdigest of the hash
        """
        # MD5 is used for dedupe only, hence usedforsecurity=False.
        return md5(record.getMessage().encode(), usedforsecurity=False).hexdigest()

    @staticmethod
    def get_log_payload(record: LogRecord) -> LogPayload:
        """Get a log payload from a log record.

        Args:
            record (LogRecord): the log record to convert

        Returns:
            LogPayload: the converted log payload
        """

        log_payload: LogPayload = {
            "created_at": record.created,
            "exception_message": None,
            "exception_type": None,
            "exception_traceback": None,
            "file": record.pathname,
            "level": record.levelno,
            "line": record.lineno,
            "log_hash": BaseWarehouseHandler.get_log_hash(record),
            "log_host": BaseWarehouseHandler.HOST_NAME,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "process": record.processName,
            "thread": record.threadName,
        }

        # Only populated when the record carries exception info.
        if record.exc_info and record.exc_info[0]:
            log_payload["exception_message"] = str(record.exc_info[1])
            log_payload["exception_type"] = record.exc_info[0].__name__
            log_payload["exception_traceback"] = "".join(
                format_exception(record.exc_info[1]),
            )

        return log_payload

    def __eq__(self, other: object) -> bool:
        """Compare two WarehouseHandlers for equality."""

        if not isinstance(other, BaseWarehouseHandler):  # pragma: no cover
            return NotImplemented

        # Equality is value-based: same warehouse URL and same level.
        return self.base_url == other.base_url and self.level == other.level

    def __hash__(self) -> int:  # noqa: D105
        # NOTE(review): identity-based hash alongside a value-based __eq__
        # means equal handlers may hash differently - confirm intentional.
        return super().__hash__()

    def __repr__(self) -> str:  # noqa: D105
        return (
            f"<{self.__class__.__name__}(base_url={self.base_url}, level={self.level})>"
        )
__eq__(other)

Compare two WarehouseHandlers for equality.

Source code in wg_utilities/loggers/item_warehouse/base_handler.py
279
280
281
282
283
284
285
def __eq__(self, other: object) -> bool:
    """Compare two WarehouseHandlers for equality."""

    # Value-based comparison: same warehouse URL and same level.
    if isinstance(other, BaseWarehouseHandler):
        return (self.base_url, self.level) == (other.base_url, other.level)

    return NotImplemented  # pragma: no cover
__init__(*, level='INFO', warehouse_host=None, warehouse_port=None)

Initialize the handler.

Source code in wg_utilities/loggers/item_warehouse/base_handler.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def __init__(
    self,
    *,
    level: int | Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
    warehouse_host: str | None = None,
    warehouse_port: int | None = None,
) -> None:
    """Initialize the handler."""
    # Both bases are initialized explicitly (not via a super() chain):
    # the Handler half takes the level, the API-client half the URL.
    Handler.__init__(self, level=level)
    JsonApiClient.__init__(self, base_url=self.get_base_url(warehouse_host, warehouse_port))
get_base_url(host, port) staticmethod

Get the base URL for the Item Warehouse.

Parameters:

Name Type Description Default
host str

the hostname of the Item Warehouse

required
port int

the port of the Item Warehouse

required

Returns:

Name Type Description
str str

the base URL for the Item Warehouse

Source code in wg_utilities/loggers/item_warehouse/base_handler.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
@staticmethod
def get_base_url(host: str | None, port: int | None) -> str:
    """Get the base URL for the Item Warehouse.

    Args:
        host (str): the hostname of the Item Warehouse
        port (int): the port of the Item Warehouse

    Returns:
        str: the base URL for the Item Warehouse
    """

    host = host or str(getenv("ITEM_WAREHOUSE_HOST", "http://homeassistant.local"))
    port = port if port is not None else int(getenv("ITEM_WAREHOUSE_PORT", "8002"))

    if port:
        host += f":{port}"

    host += "/v1"

    return host
get_log_hash(record) staticmethod

Get a hash of the log message.

Parameters:

Name Type Description Default
record LogRecord

the log record to hash

required

Returns:

Name Type Description
str str

the hexdigest of the hash

Source code in wg_utilities/loggers/item_warehouse/base_handler.py
230
231
232
233
234
235
236
237
238
239
240
@staticmethod
def get_log_hash(record: LogRecord) -> str:
    """Hash the formatted log message.

    Args:
        record (LogRecord): the log record to hash

    Returns:
        str: the hexdigest of the MD5 hash of the message
    """
    # MD5 is fine here: the hash is a dedupe key, not a security feature.
    message = record.getMessage()
    return md5(message.encode(), usedforsecurity=False).hexdigest()
get_log_payload(record) staticmethod

Get a log payload from a log record.

Parameters:

Name Type Description Default
record LogRecord

the log record to convert

required

Returns:

Name Type Description
LogPayload LogPayload

the converted log payload

Source code in wg_utilities/loggers/item_warehouse/base_handler.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
@staticmethod
def get_log_payload(record: LogRecord) -> LogPayload:
    """Convert a log record into a warehouse-ready payload.

    Args:
        record (LogRecord): the log record to convert

    Returns:
        LogPayload: the converted log payload
    """

    # Exception fields default to None; populated only when exc_info is set.
    exc_message = exc_type = exc_traceback = None

    if record.exc_info and record.exc_info[0]:
        exc_message = str(record.exc_info[1])
        exc_type = record.exc_info[0].__name__
        exc_traceback = "".join(format_exception(record.exc_info[1]))

    return {
        "created_at": record.created,
        "exception_message": exc_message,
        "exception_type": exc_type,
        "exception_traceback": exc_traceback,
        "file": record.pathname,
        "level": record.levelno,
        "line": record.lineno,
        "log_hash": BaseWarehouseHandler.get_log_hash(record),
        "log_host": BaseWarehouseHandler.HOST_NAME,
        "logger": record.name,
        "message": record.getMessage(),
        "module": record.module,
        "process": record.processName,
        "thread": record.threadName,
    }

LogPayload

Bases: TypedDict

Type for a log payload.

Source code in wg_utilities/loggers/item_warehouse/base_handler.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class LogPayload(TypedDict):
    """Type for a log payload."""

    # Epoch timestamp from LogRecord.created.
    created_at: float
    # Exception details - all None when the record carries no exc_info.
    exception_message: str | None
    exception_type: str | None
    exception_traceback: str | None
    # Path of the file the log call was made from (LogRecord.pathname).
    file: str
    # Numeric logging level (LogRecord.levelno).
    level: int
    line: int
    # MD5 hex digest of the formatted message; part of the primary key.
    log_hash: str
    # Hostname of the emitting machine; part of the primary key.
    log_host: str
    # Logger name; part of the primary key.
    logger: str
    message: str
    module: str
    process: str | None
    thread: str | None

WarehouseLog

Bases: TypedDict

Type for a log record.

Source code in wg_utilities/loggers/item_warehouse/base_handler.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class WarehouseLog(TypedDict):
    """Type for a log record.

    Shape of a stored log item as returned by the Warehouse API. Unlike
    LogPayload, created_at comes back as a string here.
    """

    created_at: str
    file: str
    level: int
    line: int
    log_hash: str
    log_host: str
    logger: str
    message: str
    module: str
    process: str
    thread: str

WarehouseLogPage

Bases: TypedDict

Type for a page of log records.

Source code in wg_utilities/loggers/item_warehouse/base_handler.py
21
22
23
24
25
26
27
class WarehouseLogPage(TypedDict):
    """Type for a page of log records."""

    # Presumably the number of items in this page - confirm against the API.
    count: int
    items: list[WarehouseLog]
    # Offset for the next page; None when there are no further pages.
    next_offset: int | None
    total: int

WarehouseSchema

Bases: TypedDict

Type for a Warehouse schema.

Source code in wg_utilities/loggers/item_warehouse/base_handler.py
63
64
65
66
67
68
69
class WarehouseSchema(TypedDict):
    """Type for a Warehouse schema."""

    # Set by the API on creation; absent from outbound create requests.
    created_at: NotRequired[str]
    item_name: str
    # Maps field names to their definitions (type, nullability, etc.).
    item_schema: dict[str, FieldDefinition]
    name: str

flushable_queue_listener

Simple subclass of logging.handlers.QueueListener that can be flushed and stopped.

FlushableQueueListener

Bases: QueueListener

A QueueListener that can be flushed and stopped.

Source code in wg_utilities/loggers/item_warehouse/flushable_queue_listener.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class FlushableQueueListener(QueueListener):
    """A QueueListener that can wait for its queue to drain before stopping."""

    queue: Queue[Any]

    def flush_and_stop(self, timeout: float = 300) -> None:
        """Wait for the queue to empty, then stop the listener.

        Polls the queue once per second. If a positive `timeout` elapses
        before the queue drains, a warning is logged and the listener is
        stopped anyway.

        Args:
            timeout (float): the maximum time to wait for the queue to empty
        """
        started_waiting = time()

        while not self.queue.empty():
            sleep(1)

            elapsed = time() - started_waiting
            if 0 < timeout < elapsed:
                LOGGER.warning(
                    "QueueListener failed to flush after %s seconds", timeout
                )
                break

        self.stop()
flush_and_stop(timeout=300)

Wait for the queue to empty and stop.

Parameters:

Name Type Description Default
timeout float

the maximum time to wait for the queue to empty

300
Source code in wg_utilities/loggers/item_warehouse/flushable_queue_listener.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def flush_and_stop(self, timeout: float = 300) -> None:
    """Wait for the queue to empty, then stop the listener.

    Polls the queue once per second. If a positive `timeout` elapses before
    the queue drains, a warning is logged and the listener is stopped anyway.

    Args:
        timeout (float): the maximum time to wait for the queue to empty
    """
    started_waiting = time()

    while not self.queue.empty():
        sleep(1)

        elapsed = time() - started_waiting
        if 0 < timeout < elapsed:
            LOGGER.warning(
                "QueueListener failed to flush after %s seconds", timeout
            )
            break

    self.stop()

warehouse_handler

Custom handler to allow logging directly into an Item Warehouse.

WarehouseHandler

Bases: BaseWarehouseHandler

Custom handler to allow logging directly into an Item Warehouse.

https://github.com/worgarside/addon-item-warehouse-api https://github.com/worgarside/addon-item-warehouse-website

The primary key of the log warehouse is a combination of
  • log_hash (message content)
  • logger (name of the logger)
  • log_host (hostname of the machine the log was generated on)

This means that the same log message from the same host will only be stored once.

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class WarehouseHandler(BaseWarehouseHandler):
    """Custom handler to allow logging directly into an Item Warehouse.

    https://github.com/worgarside/addon-item-warehouse-api
    https://github.com/worgarside/addon-item-warehouse-website

    The primary key of the log warehouse is a combination of:
        - log_hash (message content)
        - logger (name of the logger)
        - log_host (hostname of the machine the log was generated on)

    This means that the same log message from the same host will only be stored once.
    """

    def __init__(
        self,
        *,
        level: int | Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
        warehouse_host: str | None = None,
        warehouse_port: int | None = None,
        initialize_warehouse: bool = False,
    ) -> None:
        """Initialize the handler and Log Warehouse.

        Args:
            level (int | str): the logging level for this handler
            warehouse_host (str | None): hostname of the Item Warehouse
            warehouse_port (int | None): port of the Item Warehouse
            initialize_warehouse (bool): whether to create/validate the
                Warehouse during initialization
        """

        super().__init__(
            level=level,
            warehouse_host=warehouse_host,
            warehouse_port=warehouse_port,
        )

        if initialize_warehouse:
            self.initialize_warehouse()

    def emit(self, record: LogRecord) -> None:
        """Post the log record to the Item Warehouse.

        Args:
            record (LogRecord): the new log record being "emitted"
        """

        self.post_with_backoff(self.get_log_payload(record))

    def initialize_warehouse(self) -> None:
        """Create a new warehouse or validate an existing one.

        Raises:
            ValueError: if an existing warehouse's field types don't match
                the expected schema.
        """
        try:
            schema: WarehouseSchema = self.get_json_response(  # type: ignore[assignment]
                self.WAREHOUSE_ENDPOINT,
                timeout=5,
            )
        except HTTPError as exc:
            if (
                exc.response is not None
                and exc.response.status_code == HTTPStatus.NOT_FOUND
            ):
                # Warehouse doesn't exist yet - create it from the schema.
                schema = self.post_json_response(  # type: ignore[assignment]
                    "/warehouses",
                    json=self._WAREHOUSE_SCHEMA,
                    timeout=5,
                )
                LOGGER.info("Created new Warehouse: %r", schema)
            else:
                # Fix: non-404 HTTP errors were previously swallowed silently.
                LOGGER.exception("Error creating Warehouse")
        except Exception:
            LOGGER.exception("Error creating Warehouse")
        else:
            LOGGER.info(
                "Warehouse %s already exists - created at %s",
                schema.get("name", None),
                schema.get("created_at", None),
            )

            schema_types = {
                k: v["type"] for k, v in schema.get("item_schema", {}).items()
            }

            if schema_types != self._WAREHOUSE_TYPES:
                raise ValueError(
                    "Warehouse types do not match expected types: "
                    + dumps(
                        {
                            k: {"expected": v, "actual": schema_types.get(k)}
                            for k, v in self._WAREHOUSE_TYPES.items()
                            if v != schema_types.get(k)
                        },
                        default=str,
                    ),
                )

    @backoff(
        RequestException,
        logger=LOGGER,
        max_tries=BACKOFF_MAX_TRIES,
        timeout=BACKOFF_TIMEOUT,
    )
    def post_with_backoff(self, log_payload: LogPayload, /) -> None:
        """Post a log payload to the warehouse, with backoff applied."""

        res = post(
            f"{self.base_url}{self.ITEM_ENDPOINT}",
            timeout=60,
            json=log_payload,
        )

        # A duplicate log entry (same primary key) is not an error.
        if res.status_code == HTTPStatus.CONFLICT:
            return

        # Any other 4xx (bar 429) is a permanent client error - retrying
        # won't help, so log it and bail rather than raising into the
        # backoff loop. Numeric range check replaces the old string-prefix
        # test on the status code.
        if (
            HTTPStatus.BAD_REQUEST
            <= res.status_code
            < HTTPStatus.INTERNAL_SERVER_ERROR
            and res.status_code != HTTPStatus.TOO_MANY_REQUESTS
        ):
            LOGGER.error(
                "Permanent error posting log to warehouse (%s %s): %s",
                res.status_code,
                res.reason,
                res.text,
            )
            return

        res.raise_for_status()
__init__(*, level='INFO', warehouse_host=None, warehouse_port=None, initialize_warehouse=False)

Initialize the handler and Log Warehouse.

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def __init__(
    self,
    *,
    level: int | Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
    warehouse_host: str | None = None,
    warehouse_port: int | None = None,
    initialize_warehouse: bool = False,
) -> None:
    """Initialize the handler and Log Warehouse.

    Args:
        level (int | str): the logging level for this handler
        warehouse_host (str): optional hostname of the Item Warehouse
        warehouse_port (int): optional port of the Item Warehouse
        initialize_warehouse (bool): whether to create/validate the Warehouse
            up front
    """
    super().__init__(
        level=level,
        warehouse_host=warehouse_host,
        warehouse_port=warehouse_port,
    )

    if initialize_warehouse:
        self.initialize_warehouse()
emit(record)

Serialize the log record and post it to the Warehouse.

Parameters:

Name Type Description Default
record LogRecord

the new log record being "emitted"

required
Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
66
67
68
69
70
71
72
73
74
75
def emit(self, record: LogRecord) -> None:
    """Serialize the log record and post it to the Warehouse.

    Args:
        record (LogRecord): the new log record being "emitted"
    """

    log_payload = self.get_log_payload(record)

    self.post_with_backoff(log_payload)
initialize_warehouse()

Create a new warehouse or validate an existing one.

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def initialize_warehouse(self) -> None:
    """Create a new warehouse or validate an existing one.

    Raises:
        ValueError: if the existing Warehouse's item schema types don't match
            the expected `_WAREHOUSE_TYPES`.
    """
    try:
        schema: WarehouseSchema = self.get_json_response(  # type: ignore[assignment]
            self.WAREHOUSE_ENDPOINT,
            timeout=5,
        )
    except HTTPError as exc:
        if (
            exc.response is not None
            and exc.response.status_code == HTTPStatus.NOT_FOUND
        ):
            # Warehouse doesn't exist yet - create it.
            schema = self.post_json_response(  # type: ignore[assignment]
                "/warehouses",
                json=self._WAREHOUSE_SCHEMA,
                timeout=5,
            )
            LOGGER.info("Created new Warehouse: %r", schema)
        else:
            # Previously this branch swallowed the error silently; log it for
            # consistency with the generic handler below.
            LOGGER.exception("Error creating Warehouse")
    except Exception:
        LOGGER.exception("Error creating Warehouse")
    else:
        LOGGER.info(
            "Warehouse %s already exists - created at %s",
            schema.get("name", None),
            schema.get("created_at", None),
        )

        schema_types = {
            k: v["type"] for k, v in schema.get("item_schema", {}).items()
        }

        # The existing Warehouse must match this handler's expected schema,
        # otherwise posted records would be rejected or mis-typed.
        if schema_types != self._WAREHOUSE_TYPES:
            raise ValueError(
                "Warehouse types do not match expected types: "
                + dumps(
                    {
                        k: {"expected": v, "actual": schema_types.get(k)}
                        for k, v in self._WAREHOUSE_TYPES.items()
                        if v != schema_types.get(k)
                    },
                    default=str,
                ),
            )
post_with_backoff(log_payload)

Post a log payload to the warehouse, with backoff applied.

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
@backoff(
    RequestException,
    logger=LOGGER,
    max_tries=BACKOFF_MAX_TRIES,
    timeout=BACKOFF_TIMEOUT,
)
def post_with_backoff(self, log_payload: LogPayload, /) -> None:
    """Post a log payload to the warehouse, with backoff applied.

    Args:
        log_payload (LogPayload): the serialized log record to post

    Raises:
        HTTPError: via `raise_for_status` for non-2xx responses not handled
            below; the `backoff` decorator catches `RequestException`
            subclasses and retries.
    """

    res = post(
        f"{self.base_url}{self.ITEM_ENDPOINT}",
        timeout=60,
        json=log_payload,
    )

    # 409 Conflict: the record is already in the Warehouse; treat as success.
    if res.status_code == HTTPStatus.CONFLICT:
        return

    # Any other 4xx (except 429 Too Many Requests) is a permanent client
    # error - retrying the same payload cannot succeed, so log and drop it.
    # Numeric range check replaces the stringly-typed `startswith("4")`.
    if (
        HTTPStatus.BAD_REQUEST
        <= res.status_code
        < HTTPStatus.INTERNAL_SERVER_ERROR
        and res.status_code != HTTPStatus.TOO_MANY_REQUESTS
    ):
        LOGGER.error(
            "Permanent error posting log to warehouse (%s %s): %s",
            res.status_code,
            res.reason,
            res.text,
        )
        return

    res.raise_for_status()

add_warehouse_handler(logger, *, level=DEBUG, warehouse_host=None, warehouse_port=None, initialize_warehouse=False, disable_queue=False)

Add a WarehouseHandler to an existing logger.

Parameters:

Name Type Description Default
logger Logger

the logger to add a file handler to

required
level int

the logging level to be used for the WarehouseHandler

DEBUG
warehouse_host str

the hostname of the Item Warehouse

None
warehouse_port int

the port of the Item Warehouse

None
initialize_warehouse bool

whether to initialize the Warehouse

False
disable_queue bool

whether to disable the queue for the WarehouseHandler

False

Returns:

Name Type Description
WarehouseHandler WarehouseHandler

the WarehouseHandler that was added to the logger

Source code in wg_utilities/loggers/item_warehouse/warehouse_handler.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def add_warehouse_handler(
    logger: Logger,
    *,
    level: int = DEBUG,
    warehouse_host: str | None = None,
    warehouse_port: int | None = None,
    initialize_warehouse: bool = False,
    disable_queue: bool = False,
) -> WarehouseHandler:
    """Add a WarehouseHandler to an existing logger.

    Args:
        logger (Logger): the logger to add a warehouse handler to
        level (int): the logging level to be used for the WarehouseHandler
        warehouse_host (str): the hostname of the Item Warehouse
        warehouse_port (int): the port of the Item Warehouse
        initialize_warehouse (bool): whether to initialize the Warehouse
        disable_queue (bool): whether to disable the queue for the WarehouseHandler

    Returns:
        WarehouseHandler: the WarehouseHandler that was added to the logger
    """

    # NOTE(review): the handler is constructed (and, when requested, the
    # Warehouse initialized) *before* the duplicate checks below - confirm
    # that eager construction on the duplicate path is intended.
    wh_handler = WarehouseHandler(
        level=level,
        warehouse_host=warehouse_host,
        warehouse_port=warehouse_port,
        initialize_warehouse=initialize_warehouse,
    )

    if disable_queue:
        # Synchronous mode: attach the WarehouseHandler directly, unless one
        # pointing at the same Warehouse URL is already attached.
        for handler in logger.handlers:
            if isinstance(
                handler,
                WarehouseHandler,
            ) and handler.base_url == WarehouseHandler.get_base_url(
                warehouse_host,
                warehouse_port,
            ):
                LOGGER.warning("WarehouseHandler already exists for %s", handler.base_url)
                return handler

        logger.addHandler(wh_handler)
        return wh_handler

    # Queued mode: records are put on LOG_QUEUE by a _QueueHandler and
    # drained in the background by a FlushableQueueListener.
    for handler in logger.handlers:
        if isinstance(handler, _QueueHandler) and handler.warehouse_handler == wh_handler:
            LOGGER.warning(
                "WarehouseHandler already exists for %s",
                handler.warehouse_handler.base_url,
            )
            return handler.warehouse_handler

    listener = FlushableQueueListener(LOG_QUEUE, wh_handler)
    listener.start()

    q_handler = _QueueHandler(LOG_QUEUE, wh_handler)
    q_handler.setLevel(level)

    logger.addHandler(q_handler)

    # Ensure the queue worker is stopped when the program exits.
    # atexit runs callbacks LIFO: the sentinel is enqueued first, then the
    # listener is flushed and stopped, then the shutdown is logged.
    atexit.register(LOGGER.info, "Stopped WarehouseHandler")
    atexit.register(listener.flush_and_stop)
    atexit.register(LOG_QUEUE.put, None)  # Processed in reverse order

    return wh_handler

list_handler

Helper class to allow retrieval of log records after the fact.

ListHandler

Bases: Handler

Custom handler to allow retrieval of log records after the fact.

Parameters:

Name Type Description Default
records_list list

allows the user to pass in a pre-defined list to add records to

None
Source code in wg_utilities/loggers/list_handler.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class ListHandler(Handler):
    """Custom handler to allow retrieval of log records after the fact.

    Args:
        records_list (list): allows the user to pass in a pre-defined list to add records to
    """

    def __init__(
        self,
        records_list: list[Any] | None = None,
        *,
        log_ttl: int | None = 86400,
        on_record: Callable[[LogRecord], Any] | None = None,
        on_expiry: Callable[[LogRecord], Any] | None = None,
    ):
        super().__init__()

        # Can't use `or` here as `[]` is False
        self._records_list: list[LogRecord] = (
            records_list if records_list is not None else []
        )

        self.ttl = log_ttl
        self.on_record = on_record
        self.on_expiry = on_expiry

    def emit(self, record: LogRecord) -> None:
        """Add log record to the internal record store.

        Args:
            record (LogRecord): the new log record being "emitted"
        """
        self.expire_records()

        self._records_list.append(record)

        if self.on_record is not None:
            self.on_record(record)

    def expire_records(self) -> None:
        """Remove records older than `self.ttl`, and call `self.on_expiry` on them."""
        if self.ttl is None:
            return

        now = utcnow().timestamp()

        while self._records_list:
            record = self._records_list.pop(0)

            if record.created < (now - self.ttl):
                if self.on_expiry is not None:
                    self.on_expiry(record)
            else:
                self._records_list.insert(0, record)
                break

    @property
    def debug_records(self) -> list[LogRecord]:
        """Debug level records.

        Returns:
            list: a list of log records with the level DEBUG
        """
        self.expire_records()
        return [record for record in self._records_list if record.levelno == DEBUG]

    @property
    def info_records(self) -> list[LogRecord]:
        """Info level records.

        Returns:
            list: a list of log records with the level INFO
        """
        self.expire_records()
        return [record for record in self._records_list if record.levelno == INFO]

    @property
    def warning_records(self) -> list[LogRecord]:
        """Warning level records.

        Returns:
            list: a list of log records with the level WARNING
        """
        self.expire_records()
        return [record for record in self._records_list if record.levelno == WARNING]

    @property
    def error_records(self) -> list[LogRecord]:
        """Error level records.

        Returns:
            list: a list of log records with the level ERROR
        """
        self.expire_records()
        return [record for record in self._records_list if record.levelno == ERROR]

    @property
    def critical_records(self) -> list[LogRecord]:
        """Critical level records.

        Returns:
            list: a list of log records with the level CRITICAL
        """
        self.expire_records()
        return [record for record in self._records_list if record.levelno == CRITICAL]

    @property
    def records(self) -> list[LogRecord]:
        """All records.

        Returns:
            list: a list of log records with the level CRITICAL
        """
        self.expire_records()
        return self._records_list

critical_records: list[LogRecord] property

Critical level records.

Returns:

Name Type Description
list list[LogRecord]

a list of log records with the level CRITICAL

debug_records: list[LogRecord] property

Debug level records.

Returns:

Name Type Description
list list[LogRecord]

a list of log records with the level DEBUG

error_records: list[LogRecord] property

Error level records.

Returns:

Name Type Description
list list[LogRecord]

a list of log records with the level ERROR

info_records: list[LogRecord] property

Info level records.

Returns:

Name Type Description
list list[LogRecord]

a list of log records with the level INFO

records: list[LogRecord] property

All records.

Returns:

Name Type Description
list list[LogRecord]

all non-expired log records, regardless of level

warning_records: list[LogRecord] property

Warning level records.

Returns:

Name Type Description
list list[LogRecord]

a list of log records with the level WARNING

emit(record)

Add log record to the internal record store.

Parameters:

Name Type Description Default
record LogRecord

the new log record being "emitted"

required
Source code in wg_utilities/loggers/list_handler.py
40
41
42
43
44
45
46
47
48
49
50
51
def emit(self, record: LogRecord) -> None:
    """Store the emitted record in the internal record list.

    Args:
        record (LogRecord): the new log record being "emitted"
    """
    # Drop any stale records before accepting the new one.
    self.expire_records()

    self._records_list.append(record)

    if (callback := self.on_record) is not None:
        callback(record)

expire_records()

Remove records older than self.ttl, and call self.on_expiry on them.

Source code in wg_utilities/loggers/list_handler.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def expire_records(self) -> None:
    """Remove records older than `self.ttl`, and call `self.on_expiry` on them."""
    if self.ttl is None:
        return

    cutoff = utcnow().timestamp() - self.ttl

    # Records are stored in arrival order, so only the head of the list can
    # be expired; stop at the first record that is still fresh.
    while self._records_list and self._records_list[0].created < cutoff:
        expired = self._records_list.pop(0)

        if self.on_expiry is not None:
            self.on_expiry(expired)

add_list_handler(logger, *, log_list=None, level=DEBUG, log_ttl=86400, on_expiry=None)

Add a ListHandler to an existing logger.

Parameters:

Name Type Description Default
logger Logger

the logger to add a file handler to

required
log_list list

the list for the handler to write logs to

None
level int

the logging level to be used for the ListHandler

DEBUG
log_ttl int

number of seconds to retain a log for

86400
on_expiry Callable

function to call with expired logs

None
Source code in wg_utilities/loggers/list_handler.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def add_list_handler(
    logger: Logger,
    *,
    log_list: list[Any] | None = None,
    level: int = DEBUG,
    log_ttl: int | None = 86400,
    on_expiry: Callable[[LogRecord], Any] | None = None,
) -> ListHandler:
    """Attach a ListHandler to an existing logger.

    Args:
        logger (Logger): the logger to add the handler to
        log_list (list): the list for the handler to write logs to
        level (int): the logging level to be used for the ListHandler
        log_ttl (int): number of seconds to retain a log for
        on_expiry (Callable): function to call with expired logs

    Returns:
        ListHandler: the handler that was attached to `logger`
    """

    handler = ListHandler(log_list, log_ttl=log_ttl, on_expiry=on_expiry)
    handler.setLevel(level)
    logger.addHandler(handler)

    return handler

stream_handler

Helper function to add a StreamHandler to a logger.

add_stream_handler(logger, *, formatter=FORMATTER, level=DEBUG)

Add a StreamHandler to an existing logger.

Parameters:

Name Type Description Default
logger Logger

the logger to add a file handler to

required
formatter Formatter

the formatter to use in the stream logs

FORMATTER
level int

the logging level to be used for the StreamHandler

DEBUG

Returns:

Name Type Description
Logger Logger

the logger instance, returned for use in one-liners: logger = add_stream_handler(logging.getLogger(__name__))

Source code in wg_utilities/loggers/stream_handler.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def add_stream_handler(
    logger: Logger,
    *,
    formatter: Formatter | None = FORMATTER,
    level: int = DEBUG,
) -> Logger:
    """Add a StreamHandler (writing to stdout) to an existing logger.

    Args:
        logger (Logger): the logger to add a stream handler to
        formatter (Formatter): the formatter to use in the stream logs
        level (int): the logging level to be used for the StreamHandler

    Returns:
        Logger: the logger instance, returned for use in one-liners:
            `logger = add_stream_handler(logging.getLogger(__name__))`
    """

    s_handler = StreamHandler(stdout)
    s_handler.setFormatter(formatter)
    s_handler.setLevel(level)

    logger.addHandler(s_handler)

    return logger

get_streaming_logger(name, *, formatter=FORMATTER, level=DEBUG)

Get a logger with a StreamHandler attached.

Parameters:

Name Type Description Default
name str

the name of the logger to create

required
formatter Formatter

the formatter to use in the stream logs

FORMATTER
level int

the logging level to be used for the StreamHandler

DEBUG

Returns:

Name Type Description
Logger Logger

the logger instance, returned for use in one-liners: logger = get_streaming_logger(__name__)

Source code in wg_utilities/loggers/stream_handler.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def get_streaming_logger(
    name: str,
    *,
    formatter: Formatter | None = FORMATTER,
    level: int = DEBUG,
) -> Logger:
    """Get a logger with a StreamHandler attached.

    Args:
        name (str): the name of the logger to create
        formatter (Formatter): the formatter to use in the stream logs
        level (int): the logging level to be used for the StreamHandler

    Returns:
        Logger: the logger instance, returned for use in one-liners:
            `logger = get_streaming_logger(__name__)`
    """
    logger = getLogger(name)
    logger.setLevel(level)

    return add_stream_handler(logger, formatter=formatter, level=level)