skip to content

loguru — Structured Logging

Add structured, colorized logging to Python apps with loguru. Covers sinks, log levels, file rotation, retention, exception catching, and context binding.

20 min read 55 snippets deep dive

loguru — Structured Logging#

What it is#

Loguru is a structured logging library that replaces Python’s standard logging module with a simpler, zero-configuration API. It provides:

  • Colorized output with level labels out of the box
  • File sinks with automatic rotation, retention, and compression
  • logger.catch() decorator for automatic exception catching
  • Context binding with logger.bind()
  • Structured JSON output via serialize=True

Install#

pip install loguru

Output: (none — exits 0 on success)

Quick example#

from loguru import logger

logger.debug("Starting up")
logger.info("Server running on port {port}", port=8080)
logger.warning("Low memory: {mb} MB remaining", mb=128)
logger.error("Connection refused to {host}", host="db.internal")

Output:

2026-04-25 14:30:01.234 | DEBUG    | __main__:<module>:3 - Starting up
2026-04-25 14:30:01.234 | INFO     | __main__:<module>:4 - Server running on port 8080
2026-04-25 14:30:01.234 | WARNING  | __main__:<module>:5 - Low memory: 128 MB remaining
2026-04-25 14:30:01.234 | ERROR    | __main__:<module>:6 - Connection refused to db.internal

[!NOTE] In the terminal, each level is colorized: DEBUG is dim, INFO is white, WARNING is yellow, ERROR is red. The output includes module, function, and line number automatically.

When / why to use it over logging#

Featureloggingloguru
Zero-config setup❌ (handlers, formatters)
Colorized output
f-string-style messages
File rotationManual setuplogger.add("app.log", rotation="10 MB")
Exception contextManuallogger.catch() / logger.exception()
Structured JSONManuallogger.add(..., serialize=True)

Common pitfalls#

[!WARNING] Thread-safety — loguru is thread-safe by default. But if you’re using multiprocessing, you must use enqueue=True on your file sink: logger.add("app.log", enqueue=True) to avoid write conflicts.

[!WARNING] logger.remove() before reconfiguring — loguru starts with a default stderr sink (id 0). If you add your own stderr sink without removing the default, you get duplicate output. Call logger.remove() first.

[!TIP] Use logger.info("value={x}", x=val) (lazy formatting) rather than logger.info(f"value={val}"). Lazy formatting skips string interpolation entirely if the log level is disabled — important for performance in hot paths.

Richer example — file sink with rotation#

import sys
from loguru import logger

# Remove default stderr sink and add custom ones
logger.remove()

# Concise stderr: only INFO and above
logger.add(
    sys.stderr,
    level="INFO",
    format="<green>{time:HH:mm:ss}</green> | <level>{level:<8}</level> | {message}",
    colorize=True,
)

# Verbose file: DEBUG and above, with rotation
logger.add(
    "logs/app.log",
    level="DEBUG",
    rotation="10 MB",       # new file after 10 MB
    retention="7 days",     # delete logs older than 7 days
    compression="zip",      # compress rotated files
    format="{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {name}:{line} | {message}",
)

logger.debug("Debug detail — goes to file only")
logger.info("Server started")
logger.warning("Disk usage above 80%")

Output (stderr only):

14:30:01 | INFO     | Server started
14:30:01 | WARNING  | Disk usage above 80%

Exception catching#

The @logger.catch decorator wraps a function so that any unhandled exception is logged with a full, colorized traceback instead of crashing silently. Apply it to top-level handlers or background tasks where you want errors recorded but execution to continue.

from loguru import logger

@logger.catch
def divide(a: int, b: int) -> float:
    return a / b

result = divide(10, 0)   # won't crash the program — logs the full traceback

Output:

2026-04-25 14:30:01.234 | ERROR    | __main__:divide:4 - An error has been caught in function 'divide', process 'MainProcess' (1234), thread 'MainThread' (5678):
Traceback (most recent call last):
  ...
ZeroDivisionError: division by zero

Context binding#

logger.bind(**kwargs) returns a new logger instance with extra key/value pairs attached to every log record it emits. Use it to tag all log lines within a request, task, or user session with a correlation ID — making it easy to filter logs for a specific context.

from loguru import logger

def handle_request(request_id: str, user_id: int):
    log = logger.bind(request_id=request_id, user_id=user_id)
    log.info("Request received")
    log.info("Processing complete")

handle_request("req-abc123", user_id=42)

Output:

2026-04-25 14:30:01 | INFO | __main__:handle_request:4 - Request received
2026-04-25 14:30:01 | INFO | __main__:handle_request:5 - Processing complete

JSON / structured output#

Passing serialize=True to logger.add() switches that sink to emit one JSON object per log line instead of formatted text. This is the standard way to feed loguru into log aggregators (Datadog, Loki, CloudWatch) that expect structured, machine-parseable logs.

import sys
from loguru import logger

logger.remove()
logger.add(sys.stdout, serialize=True)   # every line is a JSON object

logger.info("User logged in", user_id=42, action="login")

Output:

{"text": "2026-04-25 14:30:01.234 | INFO | __main__:<module>:5 - User logged in", "record": {"elapsed": ..., "exception": null, "extra": {"user_id": 42, "action": "login"}, "file": ..., "function": "<module>", "level": {"icon": "ℹ️", "name": "INFO", "no": 20}, "line": 5, "message": "User logged in", ...}}

Log levels#

Levellogger.X()Default value
TRACElogger.trace()5
DEBUGlogger.debug()10
INFOlogger.info()20
SUCCESSlogger.success()25
WARNINGlogger.warning()30
ERRORlogger.error()40
CRITICALlogger.critical()50

[!TIP] logger.success() is a loguru-specific level (between INFO and WARNING) useful for marking successful completion of significant operations.

loguru vs stdlib logging#

The stdlib logging module is venerable, exhaustively configurable, and notoriously painful to set up. Loguru is the deliberate counterpoint: a single global logger object that works without configuration and exposes 90% of stdlib’s power through a much smaller API. The trade-off:

Concernstdlib loggingloguru
Mental modelHierarchy of named loggers; each picks up handlers from ancestorsOne global logger; handlers added/removed by id
Setuplogger = logging.getLogger(__name__), handlers, formatters, levels — at least 10 linesfrom loguru import logger — zero lines
Formatting%-style strings via Formatterf-string-like inline placeholders + colour markup
Rotationlogging.handlers.RotatingFileHandler + manual size logicrotation="10 MB"
RetentionManual (no built-in cleanup)retention="7 days"
CompressionManualcompression="zip"
Async / multiproc safetyManual QueueHandler setupenqueue=True
Exception tracebacklogger.exception() + manual extra=@logger.catch or logger.opt(exception=True)
JSON outputManual custom formatterserialize=True
Context propagationManual Filter + extra=logger.bind() / logger.contextualize()
EcosystemUniversal — every Python lib uses itLoguru-only; bridge with InterceptHandler

The pragmatic choice:

  • New apps — start with loguru. The API is dramatically smaller and the features (rotation, retention, structured output) work out of the box.
  • Libraries — use stdlib logging exclusively. Library authors must not impose loguru on their consumers; expose a stdlib logger and let the app decide.
  • Existing apps with stdlib already wired — bridge stdlib into loguru with an InterceptHandler (see recipe below) so all log records flow through loguru’s handlers without rewriting every logging.getLogger(__name__) call.

The single global logger#

Loguru exposes one logger imported once and used everywhere. No getLogger(__name__) ritual, no parent/child propagation rules. Every module imports the same object; configuration applied in main.py takes effect across the program. The trade-off is that you cannot route different modules’ logs to different sinks via the logger object itself — instead, configure handlers to filter by record fields (record["name"], record["module"], record["extra"]).

# app.py
from loguru import logger

# Configure at startup — affects every module
logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add("logs/app.log", level="DEBUG", rotation="10 MB")

# Modules import the same logger
import myapp.api
import myapp.workers
# myapp/api.py
from loguru import logger

def handle(request):
    logger.info("Handling {method} {path}", method=request.method, path=request.path)
# myapp/workers.py
from loguru import logger

def process(job):
    logger.debug("Processing job {id}", id=job.id)

A single logger.remove() then logger.add(...) block in main.py reconfigures the entire app. To route worker logs separately, filter inside the handler:

logger.add(
    "logs/workers.log",
    filter=lambda record: record["module"].startswith("myapp.workers"),
)

add and remove — handler lifecycle#

logger.add(sink, **opts) registers a sink (stderr, a file path, a callable, a stream) and returns an integer id. logger.remove(id) removes that specific sink; logger.remove() (no args) removes all sinks including the default. The default stderr sink has id 0 — calling add(sys.stderr, ...) without first removing it produces duplicate output.

import sys
from loguru import logger

# Step 1: remove the default stderr sink (id 0)
logger.remove()

# Step 2: add custom sinks; each returns an id
stderr_id = logger.add(sys.stderr, level="INFO", format="<green>{time:HH:mm:ss}</green> | {message}")
file_id = logger.add("logs/app.log", level="DEBUG", rotation="10 MB")
debug_id = logger.add("logs/debug.log", level="TRACE")

# Step 3: remove a specific sink later
logger.remove(debug_id)

# Step 4: full reset
logger.remove()

Sink types#

SinkExampleBehaviour
File path (str/Path)"logs/app.log"Write to file; supports rotation/retention/compression
Streamsys.stderrWrite to file-like object
Callablelambda msg: send_to_slack(msg)Invoked for each formatted record
Coroutineasync def consume(msg): ...Awaited per record (loguru handles the event loop)
logging.Handlera stdlib Handler instanceBridge: loguru records become stdlib LogRecords
# Send errors to Slack
def slack_sink(message):
    record = message.record
    if record["level"].name == "ERROR":
        post_to_slack(message)

logger.add(slack_sink, level="ERROR")

# Async sink for an HTTP API
import httpx

async def http_sink(message):
    async with httpx.AsyncClient() as c:
        await c.post("https://logs.example.com/", content=str(message))

logger.add(http_sink, level="WARNING")

Rotation, retention, and compression#

Loguru’s file sink handles log rotation, old-file cleanup, and compression in one line each. Each argument accepts multiple forms — sizes, durations, schedules, or callables. This is the single biggest ergonomic win over stdlib’s RotatingFileHandler/TimedRotatingFileHandler.

Rotation — when to start a new file#

FormExampleMeaning
Sizerotation="10 MB"Rotate when current file exceeds 10 MB
Timerotation="00:00"Rotate at midnight
Periodrotation="1 week"Rotate every week
Callablerotation=lambda msg, file: file.tell() > 1e7Custom predicate
logger.add("app.log", rotation="100 MB")
logger.add("app.log", rotation="1 day")
logger.add("app.log", rotation="monday at 12:00")
logger.add("app.log", rotation="00:00")          # nightly

Retention — when to delete old files#

FormExampleMeaning
Countretention=10Keep newest 10 rotated files
Durationretention="7 days"Delete files older than 7 days
Callableretention=lambda files: [f for f in files if old(f)]Custom cleanup
logger.add("app.log", rotation="10 MB", retention=5)
logger.add("app.log", rotation="1 day", retention="30 days")

Compression — how to store rotated files#

FormExampleMeaning
Extensioncompression="zip"gzip, bz2, xz, lzma, tar, tar.gz, tar.bz2, tar.xz, zip
Callablecompression=lambda f: subprocess.run(...)Custom
logger.add("app.log", rotation="100 MB", retention="30 days", compression="gz")

Full production-grade config:

logger.add(
    "logs/app.{time:YYYY-MM-DD}.log",
    level="INFO",
    rotation="00:00",         # roll daily at midnight
    retention="30 days",      # keep a month
    compression="gz",         # gzip rotated files
    enqueue=True,             # safe for multiprocessing
    backtrace=True,           # include extended traceback context
    diagnose=False,           # do NOT include variable values in tracebacks for prod
    serialize=False,          # human-readable; switch to True for log aggregators
)

Format strings and colours#

Loguru formats use {field} placeholders against the record dict, plus optional <color> and <level> markup that’s stripped automatically for non-TTY sinks. Available colours: <red>, <green>, <yellow>, <blue>, <magenta>, <cyan>, <white>, plus <dim>, <bold>, <italic>, <underline>. <level> is a special tag that picks up the current record’s level colour.

logger.add(
    sys.stderr,
    format=(
        "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> "
        "| <level>{level: <8}</level> "
        "| <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> "
        "- <level>{message}</level>"
    ),
    colorize=True,
)

Record fields#

FieldMeaning
{time}datetime of the record; supports {time:YYYY-MM-DD HH:mm:ss}
{level}Level name ({level: <8} pads to 8 chars)
{name}Module name (e.g. myapp.api)
{function}Calling function name
{file}File path / name
{line}Line number
{module}Module short name
{message}The formatted message
{extra}Dict of bound extra fields
{exception}Formatted traceback (if any)
{process} / {thread}Process and thread identifiers
{elapsed}Time since program start

Programmatic vs string formatting#

logger.info("user={user}", user=u) is lazy: loguru only interpolates if the record is actually emitted (i.e. the level isn’t filtered out). logger.info(f"user={u}") interpolates eagerly — a meaningful difference in hot loops with disabled debug logging.

# Lazy — preferred
logger.debug("payload={payload}", payload=request.body)

# Eager — runs str(request.body) even if debug is off
logger.debug(f"payload={request.body}")

bind and contextualize — structured logging#

logger.bind(**kwargs) returns a new logger instance with extra fields attached to every record it emits. logger.contextualize(**kwargs) is a context manager that attaches fields temporarily — fields disappear when the block exits. Use bind for long-lived loggers (per-request, per-worker); use contextualize for transient contexts (one operation, one block).

# bind — returns a new logger object
def handle_request(request):
    log = logger.bind(request_id=request.id, user_id=request.user_id)
    log.info("Received")
    log.debug("Processing")
    log.info("Done")

# contextualize — temporary, scope-limited
def transfer_funds(src, dst, amount):
    with logger.contextualize(operation="transfer", amount=amount):
        logger.info("Debiting {src}", src=src)
        logger.info("Crediting {dst}", dst=dst)
    # Outside the block — operation and amount no longer attached
    logger.info("Transfer complete")

contextualize is the right tool for request-scoped fields in async frameworks because it interacts correctly with contextvars — the context propagates across await boundaries.

# FastAPI: attach a request_id to every log in this request's call chain
from uuid import uuid4
from fastapi import Request

async def request_id_middleware(request: Request, call_next):
    rid = request.headers.get("X-Request-Id") or str(uuid4())
    with logger.contextualize(request_id=rid):
        return await call_next(request)

JSON / structured output#

Pass serialize=True to a sink and every record becomes a single JSON object. This is the format log aggregators (Datadog, Loki, Splunk, CloudWatch, Elastic) consume natively. Fields bound via bind/contextualize end up under record.extra in the output.

import sys
from loguru import logger

logger.remove()
logger.add(sys.stdout, serialize=True, level="INFO")

with logger.contextualize(request_id="req-abc123", user_id=42):
    logger.info("User action", action="login")

Output:

{"text": "...", "record": {"elapsed": {...}, "exception": null, "extra": {"request_id": "req-abc123", "user_id": 42, "action": "login"}, "file": {...}, "function": "<module>", "level": {"icon": "ℹ️", "name": "INFO", "no": 20}, "line": 10, "message": "User action", "module": "app", "name": "__main__", "process": {...}, "thread": {...}, "time": {...}}}

For a leaner format, write a custom serializer:

import json

def lean_serializer(record):
    return json.dumps({
        "ts": record["time"].isoformat(),
        "level": record["level"].name,
        "msg": record["message"],
        **record["extra"],
    })

def sink(message):
    record = message.record
    print(lean_serializer(record))

logger.add(sink, level="INFO")

Output:

{"ts": "2026-05-25T14:30:01.234567+00:00", "level": "INFO", "msg": "User action", "request_id": "req-abc123", "user_id": 42, "action": "login"}

Exception catching#

Three escape hatches for capturing exceptions:

@logger.catch — decorator#

Wraps a function so any unhandled exception is logged with a full traceback and (optionally) re-raised or swallowed.

@logger.catch                                # logs + swallows
def background_job(payload):
    process(payload)

@logger.catch(reraise=True)                  # logs + re-raises
def critical_path():
    do_something_risky()

@logger.catch(exclude=(KeyboardInterrupt,))  # don't catch certain types
def long_running():
    while True:
        tick()

@logger.catch(message="Job {extra[job_id]} failed", level="ERROR")
def parametrised_job(job_id):
    logger = loguru_logger.bind(job_id=job_id)
    risky_work()

logger.opt — fine-grained control#

logger.opt() returns a logger configured for the next call only. Use exception=True to add the current exception’s traceback, lazy=True to defer formatting, depth=N to walk up the call stack for {name}/{function}/{line} fields.

try:
    risky()
except ValueError as e:
    logger.opt(exception=True).error("risky failed: {msg}", msg=e)
    # equivalent to logger.exception() in stdlib

# Inside a logging helper, point caller fields one frame up
def my_log(msg):
    logger.opt(depth=1).info(msg)

Backtrace and diagnose#

The file sink accepts backtrace=True (show frames above the exception) and diagnose=True (show variable values at each frame). Both default to False for performance; turn on in dev, leave off in production (diagnose=True may leak secrets into logs).

logger.add("dev.log", backtrace=True, diagnose=True)        # rich, dev only
logger.add("prod.log", backtrace=True, diagnose=False)      # safe for prod

Async safety and multiprocessing#

Loguru is thread-safe by default — internal locks serialise writes. For multiprocessing or async workloads, pass enqueue=True to the sink. This routes records through a multiprocessing.Queue, which a background thread drains. The cost is a small queue overhead; the benefit is correctness when multiple processes (or async tasks logging from different event loops) write to the same file.

# Multiprocessing-safe file sink
logger.add("app.log", rotation="10 MB", enqueue=True)

# In each child process:
def worker(name):
    logger.bind(worker=name).info("Working")

with multiprocessing.Pool(4) as pool:
    pool.map(worker, ["a", "b", "c", "d"])

[!WARNING] enqueue=True survives the parent process forking but each child must import loguru after the fork. On Windows (which uses spawn rather than fork), loguru reconfigures itself per process — but the file lock is shared via the queue.

Bridging stdlib logging into loguru#

Most Python libraries log via stdlib. To route those records through loguru’s handlers (so they get the same rotation, JSON output, colours, etc.), add an InterceptHandler to the stdlib root logger.

import logging
from loguru import logger

class InterceptHandler(logging.Handler):
    def emit(self, record: logging.LogRecord) -> None:
        # Map stdlib level number to loguru level name
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Walk up the stack to find the caller (skip logging internals)
        frame, depth = logging.currentframe(), 2
        while frame and frame.f_code.co_filename == logging.__file__:
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )

# Install once at app startup
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)

Every logging.getLogger("uvicorn").info(...) and requests.session.send(...) debug log now flows through loguru’s sinks — same format, same rotation, same JSON output.

Framework integration#

FastAPI#

FastAPI uses stdlib logging. Bridge to loguru via InterceptHandler and use contextualize middleware to attach request IDs.

from fastapi import FastAPI, Request
from loguru import logger
from uuid import uuid4

app = FastAPI()

@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    request_id = request.headers.get("X-Request-Id") or str(uuid4())
    with logger.contextualize(request_id=request_id, path=request.url.path):
        logger.info("Request started")
        response = await call_next(request)
        logger.info("Request done", status=response.status_code)
        response.headers["X-Request-Id"] = request_id
        return response

Add the InterceptHandler from the previous section to capture uvicorn.access and uvicorn.error logs through loguru.

Django#

Add the InterceptHandler in settings.py:

# settings.py
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "loguru": {
            "class": "myapp.logging.InterceptHandler",
        },
    },
    "root": {"handlers": ["loguru"], "level": "INFO"},
}

Then in apps.py or wsgi.py:

from loguru import logger
logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add("logs/django.log", rotation="50 MB", retention="30 days", serialize=True)

Celery#

Celery emits via stdlib. Use the same InterceptHandler pattern and add per-task context:

from celery.signals import task_prerun, task_postrun
from loguru import logger

@task_prerun.connect
def task_start(sender, task_id, task, **kwargs):
    logger.bind(task_id=task_id, task_name=task.name).info("Task starting")

@task_postrun.connect
def task_end(sender, task_id, task, state, **kwargs):
    logger.bind(task_id=task_id, state=state).info("Task done")

Log levels#

LevelMethodDefault valueUse for
TRACElogger.trace()5Extreme verbosity, function-entry traces
DEBUGlogger.debug()10Dev troubleshooting
INFOlogger.info()20Normal operational events
SUCCESSlogger.success()25Successful completion of significant operations
WARNINGlogger.warning()30Recoverable issues
ERRORlogger.error()40Errors a user might see
CRITICALlogger.critical()50Service down, data loss

Custom levels#

logger.level("AUDIT", no=33, color="<yellow>", icon="*")
logger.log("AUDIT", "User {u} accessed admin panel", u="alice")

Common pitfalls#

  1. Duplicate output from the default sink — loguru ships with stderr sink id 0. Adding your own stderr without logger.remove() produces double-printed lines. Always remove first.
  2. diagnose=True leaks secrets — variable values are printed at every frame in tracebacks. Fine in dev, dangerous in production logs. Default it to False in prod sinks.
  3. enqueue=True on every sink — turns every log call into a cross-thread message. Use only for shared files in multiprocess workloads; single-process file sinks don’t need it.
  4. Eager f-string formatting in hot pathslogger.debug(f"x={value}") runs str(value) even when debug is off. Use logger.debug("x={x}", x=value) for lazy formatting.
  5. Library code using loguru — libraries must not pin a logging library on their consumers. Use stdlib logging in libraries; let the app bridge to loguru if it wants.
  6. Long-lived bind chainslogger.bind(req=...).bind(user=...).bind(op=...) works but each .bind() creates a new logger object. Combine: logger.bind(req=..., user=..., op=...).
  7. Mixing extra= and bindextra= is the old stdlib argument; loguru’s idiomatic version is bind(). They both work but mixing them in one app makes the format string brittle.
  8. rotation="0 sec" doesn’t disable rotation — it rotates every record. To disable, omit the argument entirely.
  9. serialize=True outputs a giant JSON — the default serializer includes every record field. Define a custom sink (see structured-logging section) for a lean payload.
  10. logger.catch swallows by default — useful for fire-and-forget but dangerous for code that should fail loudly. Pass reraise=True for critical paths.
  11. Caplog (pytest) doesn’t see loguru records — pytest’s caplog reads stdlib logging. To assert on loguru output, add a list sink in a fixture (see recipe below).
  12. Async sinks block on import — declaring an async sink before the event loop exists raises. Add the sink inside asyncio.run() or after loop = asyncio.new_event_loop().

Real-world recipes#

Bootstrap loguru in a new project#

A single configuration block at startup covers 90% of production needs: colourised stderr for humans, rotated JSON files for log aggregators.

# myapp/logging_setup.py
import sys
from loguru import logger

def setup_logging(level: str = "INFO", json_logs: bool = False):
    logger.remove()

    # Human-readable stderr
    logger.add(
        sys.stderr,
        level=level,
        format=(
            "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
            "<level>{level: <8}</level> | "
            "<cyan>{name}:{line}</cyan> - <level>{message}</level>"
        ),
        colorize=True,
        backtrace=True,
        diagnose=False,
    )

    # Rotating file (JSON for log aggregators when json_logs=True)
    logger.add(
        "logs/app.{time:YYYY-MM-DD}.log",
        level="DEBUG",
        rotation="00:00",
        retention="30 days",
        compression="gz",
        enqueue=True,
        backtrace=True,
        diagnose=False,
        serialize=json_logs,
    )
# main.py
from myapp.logging_setup import setup_logging
from loguru import logger

setup_logging(level="INFO", json_logs=False)
logger.info("Service starting on port {port}", port=8080)

Output:

2026-05-25 14:30:01 | INFO     | __main__:5 - Service starting on port 8080

Bridge stdlib loggers into loguru#

The InterceptHandler recipe applied at startup so library logs (uvicorn, requests, sqlalchemy) flow through loguru’s pipeline. See the bridging section above for the handler class.

import logging
from loguru import logger
from myapp.logging_setup import InterceptHandler

# Replace stdlib handlers
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)

# Silence noisy libraries
for noisy in ["urllib3", "asyncio", "watchdog"]:
    logging.getLogger(noisy).setLevel(logging.WARNING)

Per-request context in async code#

Use contextualize so the request id propagates across await boundaries via contextvars. Every log inside the request handler — including ones emitted by deep helpers — carries the id.

from fastapi import FastAPI, Request
from loguru import logger
from uuid import uuid4

app = FastAPI()

@app.middleware("http")
async def attach_id(request: Request, call_next):
    rid = request.headers.get("X-Request-Id") or str(uuid4())
    with logger.contextualize(request_id=rid, path=str(request.url.path)):
        logger.info("Request begin")
        response = await call_next(request)
        logger.bind(status=response.status_code).info("Request end")
        return response

Custom JSON for a log aggregator#

Most log aggregators want a flat JSON record. Replace loguru’s default verbose serializer with a slim one.

import json
import sys
from loguru import logger

def serialize(record):
    return json.dumps({
        "ts": record["time"].isoformat(),
        "level": record["level"].name,
        "logger": record["name"],
        "msg": record["message"],
        "file": f"{record['file'].name}:{record['line']}",
        **record["extra"],
        **({"exc": str(record["exception"])} if record["exception"] else {}),
    })

def sink(message):
    print(serialize(message.record), file=sys.stdout, flush=True)

logger.remove()
logger.add(sink, level="INFO")
logger.bind(service="api", env="prod").info("Hello {who}", who="world")

Output:

{"ts": "2026-05-25T14:30:01.234567+00:00", "level": "INFO", "logger": "__main__", "msg": "Hello world", "file": "app.py:12", "service": "api", "env": "prod"}

Test loguru output in pytest#

caplog reads stdlib records and won’t see loguru. Capture loguru records via a list sink fixture.

# conftest.py
import pytest
from loguru import logger

@pytest.fixture
def caplog_loguru():
    records = []
    sink_id = logger.add(records.append, level="DEBUG", format="{message}")
    yield records
    logger.remove(sink_id)
# test_logging.py
def test_logs_warning(caplog_loguru):
    from myapp.api import warning_path
    warning_path()
    assert any("retry" in str(r) for r in caplog_loguru)

Per-module log levels#

Filter records by record["name"] to route different modules to different levels.

logger.add(sys.stderr, level="INFO")
logger.add(
    "logs/api.log",
    level="DEBUG",
    filter=lambda r: r["name"].startswith("myapp.api"),
)
logger.add(
    "logs/workers.log",
    level="INFO",
    filter=lambda r: r["name"].startswith("myapp.workers"),
)

Disable logging in tests#

Tests should not pollute output with WARNING/ERROR from negative-path tests. Disable loguru globally in test setup or use a quiet sink.

# conftest.py
import pytest
from loguru import logger

@pytest.fixture(autouse=True)
def silence_loguru():
    logger.remove()
    yield
    # restore for any downstream tests that want it
    logger.add(sys.stderr, level="WARNING")

Catch exceptions in a background task#

Background tasks must not crash silently. Wrap entry points with @logger.catch to log the traceback and (optionally) re-raise.

from loguru import logger
import asyncio

@logger.catch(reraise=False)
async def background_worker(queue: asyncio.Queue):
    while True:
        job = await queue.get()
        await process(job)

asyncio.create_task(background_worker(jobs))

If process raises, the traceback is logged at ERROR level with full context, and the worker continues. Use reraise=True to crash hard instead.

Rotate logs hourly with gzip compression and 7-day retention#

A common production pattern. Hourly rotation keeps individual files small enough to grep; gzip cuts disk by ~10x; 7-day retention is enough for triage without filling the disk.

logger.add(
    "logs/api.{time:YYYY-MM-DD-HH}.log",
    rotation="1 hour",
    retention="7 days",
    compression="gz",
    level="INFO",
    enqueue=True,
    serialize=True,
)

After a week the directory looks like:

logs/
├── api.2026-05-25-14.log         # current hour, plaintext
├── api.2026-05-25-13.log.gz
├── api.2026-05-25-12.log.gz
├── ...
└── api.2026-05-18-15.log.gz       # oldest still retained

Send errors to Slack while logging everything to disk#

A callable sink filtered by level posts to Slack; a separate file sink keeps the full DEBUG record.

import httpx
from loguru import logger

SLACK_WEBHOOK = "https://hooks.slack.com/services/T000/B000/XXX"

def slack_sink(message):
    if message.record["level"].no >= logger.level("ERROR").no:
        httpx.post(SLACK_WEBHOOK, json={"text": str(message)})

logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add("logs/app.log", level="DEBUG", rotation="100 MB", retention="14 days")
logger.add(slack_sink, level="ERROR")

One-shot debug logging in production#

When troubleshooting a specific issue in prod, drop a temporary verbose sink. Use a context manager so it auto-removes.

from contextlib import contextmanager
from loguru import logger

@contextmanager
def verbose_logging(path: str = "logs/debug-trace.log"):
    sink_id = logger.add(path, level="TRACE", backtrace=True, diagnose=True)
    try:
        yield
    finally:
        logger.remove(sink_id)

# In a triage script — re-route everything to a forensic file
with verbose_logging():
    run_problem_workflow()

See also#

  • sections/python/logging — stdlib logging, the alternative; bridged from above.
  • sections/python/pytest — testing tip; loguru records via a list sink fixture instead of caplog.
  • sections/python/fastapi — context-binding middleware pattern.
  • sections/python/rich — alternative for terminal output (tables, prompts, progress) without the logging semantics.
  • sections/python/pdb — pair with loguru’s @logger.catch(reraise=True) for post-mortem debugging on crashes.