Skip to content

logging

logging

Logging configuration for the flow system.

FlowFormatter

FlowFormatter(include_process_thread: bool = False)

Bases: Formatter

Custom formatter for flow system logs.

Format examples (the level name is left-justified to 8 characters inside its brackets):

    2024-01-24 15:30:45.123 [INFO    ] [FlowName:abc123] Started execution - {"context": "additional data"}
    2024-01-24 15:30:46.234 [ERROR   ] [FlowName:abc123] Execution failed - {"error": "details", "traceback": "..."}

Source code in src/flow/core/logging.py
def __init__(self, include_process_thread: bool = False) -> None:
    """Create the formatter.

    Args:
        include_process_thread: Stored on the instance. When true,
            ``format`` inserts a ``[P:<process>|T:<thread>] `` segment
            into every rendered line.
    """
    # The base class is initialised with its defaults; no format string is
    # passed because ``format`` builds the whole line itself.
    super().__init__()
    self.include_process_thread = include_process_thread

format

format(record: LogRecord) -> str

Format the log record.

Source code in src/flow/core/logging.py
def format(self, record: logging.LogRecord) -> str:
    """Render *record* as a flow-system log line.

    Layout::

        <timestamp> [<LEVEL>] [P:<pid>|T:<tid>] [<flow>:<id>] <message> - <context>

    * ``<timestamp>`` is ``record.created`` as local time with millisecond
      precision.
    * ``<LEVEL>`` is the level name left-justified to 8 characters.
    * The ``[P:..|T:..]`` segment appears only when
      ``self.include_process_thread`` is true.
    * ``<flow>`` / ``<id>`` come from the optional record attributes
      ``flow_name`` / ``process_id`` and default to ``System`` / ``N/A``.
    * `` - <context>`` appears only when the record has a ``flow_context``
      attribute; it is JSON-encoded, falling back to ``str()`` if encoding
      fails.

    Exception information (``record.exc_info``) is appended on following
    lines under an ``Exception:`` header, and stack information
    (``record.stack_info``, e.g. from ``logger.info(..., stack_info=True)``)
    is appended after that.

    Args:
        record: The log record to render.

    Returns:
        The formatted log text.
    """
    # Flow identity is attached by callers via ``extra``; fall back to
    # placeholders for records that did not originate inside a flow.
    flow_name = getattr(record, 'flow_name', 'System')
    process_id = getattr(record, 'process_id', 'N/A')
    flow_info = f"[{flow_name}:{process_id}]"

    # ``%f`` yields microseconds; dropping the last three digits leaves
    # milliseconds.
    timestamp = datetime.fromtimestamp(record.created).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]

    # Pad so that the columns after the level line up across levels.
    level = record.levelname.ljust(8)

    # Process and thread info if requested
    proc_thread = ""
    if self.include_process_thread:
        proc_thread = f"[P:{record.process}|T:{record.thread}] "

    msg = record.getMessage()

    # Contextual data is best-effort: a value that cannot be JSON-encoded
    # must never prevent the log line from being written.
    extra = ""
    if hasattr(record, 'flow_context'):
        try:
            extra = f" - {json.dumps(record.flow_context, default=str)}"
        except Exception:
            extra = f" - {str(record.flow_context)}"

    log_message = f"{timestamp} [{level}] {proc_thread}{flow_info} {msg}{extra}"

    # Add exception information if present
    if record.exc_info:
        exc_text = self.formatException(record.exc_info)
        log_message = f"{log_message}\nException:\n{exc_text}"

    # Overriding format() bypasses the base class's stack handling, so the
    # stack requested with ``stack_info=True`` has to be appended here.
    if record.stack_info:
        log_message = f"{log_message}\n{self.formatStack(record.stack_info)}"

    return log_message

FlowLogger

FlowLogger(name: str)

Enhanced logger for flow system with context management.

Source code in src/flow/core/logging.py
def __init__(self, name: str):
    """Wrap the named standard-library logger and start with an empty context.

    Args:
        name: Logger name passed to ``logging.getLogger``.
    """
    self.logger = logging.getLogger(name)
    # Key/value context; ``flow_context`` temporarily extends this mapping.
    self.context: Dict[str, Any] = {}
    # ``flow_context`` keeps this lock held for the whole ``with`` body, so
    # it must be re-entrant: with a plain ``Lock`` a nested ``flow_context``
    # on the same thread would block forever on its own outer acquisition.
    self._context_lock = threading.RLock()

flow_context

flow_context(**kwargs)

Context manager for adding flow-specific context to logs.

Source code in src/flow/core/logging.py
@contextlib.contextmanager
def flow_context(self, **kwargs):
    """Temporarily merge ``kwargs`` into ``self.context``.

    On entry a copy of the current context is taken and ``kwargs`` are
    merged into the live mapping. On exit, whether normal or via an
    exception, ``self.context`` is rebound to that copy.

    ``self._context_lock`` is held from entry until exit, i.e. for the
    entire body of the ``with`` statement.

    Args:
        **kwargs: Context entries to add or override for the duration of
            the block.
    """
    guard = self._context_lock
    guard.acquire()
    try:
        snapshot = self.context.copy()
        self.context.update(kwargs)
        try:
            yield
        finally:
            # Rebind rather than mutate: the pre-entry snapshot becomes the
            # context again.
            self.context = snapshot
    finally:
        guard.release()

setup_logging

setup_logging(
    log_file: Optional[str] = None,
    level: int = INFO,
    max_bytes: int = 10485760,
    backup_count: int = 5,
    include_process_thread: bool = False,
) -> None

Set up logging configuration for the flow system.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `log_file` | `Optional[str]` | Optional path to log file. If None, logs to console only | `None` |
| `level` | `int` | Minimum logging level | `INFO` |
| `max_bytes` | `int` | Maximum size of each log file | `10485760` |
| `backup_count` | `int` | Number of backup log files to keep | `5` |
| `include_process_thread` | `bool` | Whether to include process and thread IDs in logs | `False` |

Source code in src/flow/core/logging.py
def setup_logging(
    log_file: Optional[str] = None,
    level: int = LoggingLevel.INFO,
    max_bytes: int = 10_485_760,  # 10MB
    backup_count: int = 5,
    include_process_thread: bool = False
) -> None:
    """Set up logging configuration for the flow system.

    Configures the ``'flow'`` logger with a console handler writing to
    ``sys.stdout`` and, when ``log_file`` is given, a rotating file handler.
    Both use a ``FlowFormatter``.

    The function may be called more than once: handlers installed by an
    earlier call are removed and closed first, so repeated calls reconfigure
    logging instead of duplicating every log line. Handlers attached to the
    ``'flow'`` logger by other code are left untouched.

    Args:
        log_file: Optional path to log file. If None, logs to console only.
            Missing parent directories are created.
        level: Minimum logging level
        max_bytes: Maximum size of each log file
        backup_count: Number of backup log files to keep
        include_process_thread: Whether to include process and thread IDs in logs
    """
    root_logger = logging.getLogger('flow')
    root_logger.setLevel(level)

    # Drop only the handlers a previous call installed (identified by the
    # marker attribute set below). Iterate over a copy because removeHandler
    # mutates the list; close() releases a previously opened log file.
    for stale in list(root_logger.handlers):
        if getattr(stale, '_installed_by_setup_logging', False):
            root_logger.removeHandler(stale)
            stale.close()

    # One formatter instance is shared by every handler installed here.
    formatter = FlowFormatter(include_process_thread=include_process_thread)

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    console_handler._installed_by_setup_logging = True
    root_logger.addHandler(console_handler)

    # File handler if requested
    if log_file:
        log_path = Path(log_file)
        # The handler opens the file on construction, so its directory must
        # already exist.
        log_path.parent.mkdir(parents=True, exist_ok=True)

        file_handler = logging.handlers.RotatingFileHandler(
            log_file,
            maxBytes=max_bytes,
            backupCount=backup_count
        )
        file_handler.setFormatter(formatter)
        file_handler._installed_by_setup_logging = True
        root_logger.addHandler(file_handler)