Source code for etl_lib.data_sink.CSVBatchSink

import csv
from pathlib import Path
from typing import Generator

from etl_lib.core.ETLContext import ETLContext
from etl_lib.core.BatchProcessor import BatchProcessor, BatchResults, append_result
from etl_lib.core.Task import Task


class CSVBatchSink(BatchProcessor):
    """
    BatchProcessor to write batches of data to a CSV file.

    Each batch received from the predecessor is appended to the file and then
    passed on with a ``rows_written`` entry added via ``append_result``.
    """

    def __init__(self, context: ETLContext, task: Task, predecessor: BatchProcessor, file_path: Path, **kwargs):
        """
        Constructs a new CSVBatchSink.

        Args:
            context: :class:`etl_lib.core.ETLContext.ETLContext` instance.
            task: :class:`etl_lib.core.Task.Task` instance owning this batchProcessor.
            predecessor: BatchProcessor which :func:`~get_batch` function will be called to receive batches to process.
            file_path: Path to the CSV file where data will be written.
                If the file exists, data will be appended. A header row is only
                written when the file is missing or empty.
            **kwargs: Additional arguments passed to `csv.DictWriter` to allow tuning the csv creation.
                A `fieldnames` entry overrides the column names that are otherwise
                taken from the first row of each batch.
        """
        super().__init__(context, task, predecessor)
        self.file_path = file_path
        # Becomes True once this instance has written at least one non-empty batch.
        self.file_initialized = False
        self.csv_kwargs = kwargs

    def get_batch(self, batch_size: int) -> Generator[BatchResults, None, None]:
        """
        Pulls batches from the predecessor, writes each one to the CSV file and yields it.

        Args:
            batch_size: Batch size forwarded unchanged to the predecessor's `get_batch`.

        Yields:
            The value returned by `append_result` for the predecessor's batch and
            ``{"rows_written": <number of rows in the batch's chunk>}``.
        """
        assert self.predecessor is not None
        for batch_result in self.predecessor.get_batch(batch_size):
            self._write_to_csv(batch_result.chunk)
            yield append_result(batch_result, {"rows_written": len(batch_result.chunk)})

    def _needs_header(self) -> bool:
        """
        Tells whether the target file still lacks a header row.

        Returns:
            True if the file does not exist or has a size of zero bytes, False otherwise.
        """
        try:
            return self.file_path.stat().st_size == 0
        except FileNotFoundError:
            return True

    def _write_to_csv(self, data: list[dict]) -> None:
        """
        Writes a batch of data to the CSV file.

        The file is opened in append mode. The header is written only when the
        file is missing or empty, so appending to a file that already holds data
        does not insert a second header row in the middle of it.

        Args:
            data: A list of dictionaries representing rows of data. Unless `fieldnames`
                was supplied through the constructor kwargs, the keys of the first
                row define the columns for this batch. An empty list is a no-op.
        """
        if not data:
            return

        # Defaults first, so a caller-supplied ``fieldnames`` in csv_kwargs wins
        # instead of colliding as a duplicate keyword argument.
        writer_kwargs = {"fieldnames": list(data[0].keys()), **self.csv_kwargs}

        # Decide before opening: mode "a" creates the file if it is missing.
        write_header = self._needs_header()

        with self.file_path.open(mode="a", newline="", encoding="utf-8") as csvfile:
            writer = csv.DictWriter(csvfile, **writer_kwargs)
            if write_header:
                writer.writeheader()
            writer.writerows(data)

        self.file_initialized = True