Source code for etl_lib.core.InstrumentationWriter

import csv
import threading
from pathlib import Path
from typing import Any


class InstrumentationWriter:
    """
    Common interface for instrumentation event sinks.

    Subclasses choose the storage target and format; this base class only
    defines the contract and does not persist anything itself.
    """

    # Class-level flag; the base class reports itself as disabled.
    enabled: bool = False

    def write(self, event: dict[str, Any]) -> None:
        """
        Persist a single instrumentation event.

        Args:
            event: Event payload to persist.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError()
class NoopInstrumentationWriter(InstrumentationWriter):
    """
    Writer used when instrumentation is switched off.

    Every event handed to it is discarded.
    """

    enabled = False

    def write(self, event: dict[str, Any]) -> None:
        """
        Discard the given instrumentation event.

        Args:
            event: Event payload; it is not inspected or stored.
        """
        return None
class CsvInstrumentationWriter(InstrumentationWriter):
    """
    Writes instrumentation events to CSV.

    Rows are appended to the configured file. Calls to `write` on one
    instance are serialised by a per-instance lock.
    """

    enabled = True

    def __init__(self, path: Path, sample_every: int = 1):
        """
        Creates a new CSV instrumentation writer.

        Args:
            path: Target CSV path. A plain string is accepted and converted
                to a `Path`.
            sample_every: Writes every Nth event. Values < 1 are treated as 1.
        """
        # Coerce so that callers passing a str do not fail on Path-only methods.
        self.path = Path(path)
        self.sample_every = max(1, int(sample_every))
        self._lock = threading.Lock()
        self._event_counter = 0
        # Snapshot of the file state at construction time. Kept for code that
        # inspects it; write() no longer relies on it (see the note there).
        self._header_written = self.path.exists() and self.path.stat().st_size > 0
        # Column order of the CSV file; event keys outside this list are dropped.
        self._field_names = [
            "ts",
            "run_id",
            "event_type",
            "task_uuid",
            "task_name",
            "rows",
            "success",
            "error",
            "batch_size",
            "wave_size",
            "buckets",
            "max_workers",
            "prefetch",
            "dt_ms",
            "buffered_before",
            "bucket_min",
            "bucket_p50",
            "bucket_max",
            "table_size",
            "emitted_rows",
            "queue_depth",
        ]

    def write(self, event: dict[str, Any]) -> None:
        """
        Appends one event row to the CSV file.

        Only every `sample_every`-th call actually writes; the others just
        advance the internal counter. Missing parent directories are created.
        Keys of `event` that are not known columns are ignored.

        Args:
            event: Event payload to persist.
        """
        with self._lock:
            self._event_counter += 1
            if self._event_counter % self.sample_every != 0:
                return

            self.path.parent.mkdir(parents=True, exist_ok=True)
            with self.path.open("a", newline="", encoding="utf-8") as file:
                writer = csv.DictWriter(
                    file,
                    fieldnames=self._field_names,
                    extrasaction="ignore",
                )
                # Decide about the header from the file as it is *now*, not
                # from the snapshot taken in __init__: the stream is opened in
                # append mode, so offset 0 means the file is empty. This avoids
                # a duplicate header when another writer already started the
                # file, and restores the header if the file was removed or
                # truncated after this writer was constructed.
                if file.tell() == 0:
                    writer.writeheader()
                self._header_written = True
                writer.writerow(event)
def create_instrumentation_writer(env_vars: dict) -> InstrumentationWriter:
    """
    Build the instrumentation writer selected by environment values.

    `ETL_LIB_INSTRUMENT` picks the mode (compared case-insensitively):

    - `none` (default, also used for any value other than `csv`)
    - `csv`

    In `csv` mode, `ETL_LIB_INSTRUMENT_CSV_PATH` sets the target file
    (default `etl_instrumentation.csv`) and `ETL_LIB_INSTRUMENT_SAMPLE`
    sets the sampling interval (default 1).

    Args:
        env_vars: Environment dictionary.

    Returns:
        A configured instrumentation writer.
    """
    selected = (env_vars.get("ETL_LIB_INSTRUMENT") or "none").lower()

    # Anything that is not explicitly "csv" disables instrumentation.
    if selected != "csv":
        return NoopInstrumentationWriter()

    csv_target = Path(
        env_vars.get("ETL_LIB_INSTRUMENT_CSV_PATH") or "etl_instrumentation.csv"
    )
    stride = int(env_vars.get("ETL_LIB_INSTRUMENT_SAMPLE") or 1)
    return CsvInstrumentationWriter(path=csv_target, sample_every=stride)