Source code for etl_lib.core.BatchProcessor

import abc
import logging
import sys
from dataclasses import dataclass, field
from typing import Generator, List, Any

from etl_lib.core.Task import Task
from etl_lib.core.utils import merge_summery


@dataclass
class BatchResults:
    """
    Return object of the :py:func:`~BatchProcessor.get_batch` method, wrapping a batch of
    data together with meta information.
    """
    chunk: List[Any]
    """The batch of data."""
    statistics: dict = field(default_factory=dict)
    """`dict` of statistic information, such as rows processed, nodes written, ..."""
    batch_size: int = field(default=sys.maxsize)
    """Size of the batch."""
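
# Illustrative sketch (not part of the library): constructing a BatchResults and
# reading its fields. The chunk contents and statistic keys are made up for
# demonstration.
def _example_batch_results() -> None:
    batch = BatchResults(chunk=[{"id": 1}, {"id": 2}], statistics={"rows": 2}, batch_size=1000)
    assert len(batch.chunk) == 2
    assert batch.statistics == {"rows": 2}
    assert batch.batch_size == 1000
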
def append_result(org: BatchResults, stats: dict) -> BatchResults:
    """
    Appends the stats dict to the provided `org`.

    Args:
        org: The original `BatchResults` object.
        stats: dict containing statistics to be added to the `org` object.

    Returns:
        New `BatchResults` object, where the :py:attr:`~BatchResults.statistics` attribute
        is the merged result of the provided parameters. Values in the dicts with the same
        key are added.
    """
    return BatchResults(chunk=org.chunk, statistics=merge_summery(org.statistics, stats),
                        batch_size=org.batch_size)
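
# Illustrative sketch (not part of the library): merging statistics with
# append_result. The expected values assume merge_summery adds values that share
# a key, as documented above; the keys themselves are made up.
def _example_append_result() -> None:
    first = BatchResults(chunk=["a", "b"], statistics={"rows": 2}, batch_size=2)
    merged = append_result(first, {"rows": 3, "nodes": 1})
    assert merged.statistics == {"rows": 5, "nodes": 1}  # 2 + 3 under the shared "rows" key
    assert merged.chunk == first.chunk  # chunk and batch_size are carried over unchanged
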
class BatchProcessor(abc.ABC):
    """
    Allows assembly of a :py:class:`etl_lib.core.Task.Task` out of smaller building blocks.

    This way, functionality such as reading from a CSV file, writing to a database, or
    validation can be implemented and tested independently and re-used.

    BatchProcessors form a linked list, where each processor only knows about its
    predecessor.

    BatchProcessors process data in batches. A batch of data is requested from the provided
    predecessor's :py:func:`~get_batch` and returned in batches to the caller. The use of
    generators ensures that not all data must be loaded at once.
    """
    def __init__(self, context, task: Task = None, predecessor=None):
        """
        Constructs a new :py:class:`etl_lib.core.BatchProcessor` instance.

        Args:
            context: :py:class:`etl_lib.core.ETLContext.ETLContext` instance. It will be
                available to subclasses.
            task: :py:class:`etl_lib.core.Task.Task` this processor is part of. Needed for
                status reporting only.
            predecessor: Source of batches for this processor. Can be `None` if no
                predecessor is needed (such as when this processor is the start of the
                queue).
        """
        self.context = context
        """:py:class:`etl_lib.core.ETLContext.ETLContext` instance, providing access to general facilities."""
        self.predecessor = predecessor
        """Predecessor, used as a source of batches."""
        self.logger = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}")
        self.task = task
        """The :py:class:`etl_lib.core.Task.Task` owning this instance."""
    @abc.abstractmethod
    def get_batch(self, max_batch_size: int) -> Generator[BatchResults, None, None]:
        """
        Provides a batch of data to the caller.

        The batch itself could be pulled and processed from the provided predecessor or
        generated from other sources.

        Args:
            max_batch_size: The max size of the batch the caller expects to receive.

        Returns:
            A generator that yields batches.
        """
        pass
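
# Illustrative sketch (not part of the library): a minimal concrete pipeline.
# _ListSource starts the chain and emits an in-memory list in fixed-size chunks;
# _Counter pulls from its predecessor and annotates each batch with a statistic.
# The class names, the `None` context, and the statistic keys are made up for
# demonstration.
class _ListSource(BatchProcessor):

    def __init__(self, context, data: List[Any]):
        super().__init__(context, task=None, predecessor=None)
        self._data = data

    def get_batch(self, max_batch_size: int) -> Generator[BatchResults, None, None]:
        # Slice the list into chunks of at most max_batch_size elements.
        for start in range(0, len(self._data), max_batch_size):
            chunk = self._data[start:start + max_batch_size]
            yield BatchResults(chunk=chunk, statistics={"rows read": len(chunk)},
                               batch_size=max_batch_size)


class _Counter(BatchProcessor):

    def get_batch(self, max_batch_size: int) -> Generator[BatchResults, None, None]:
        # Pass batches through unchanged, adding a statistic via append_result.
        for batch in self.predecessor.get_batch(max_batch_size):
            yield append_result(batch, {"rows counted": len(batch.chunk)})


def _example_pipeline() -> None:
    source = _ListSource(context=None, data=list(range(10)))
    counter = _Counter(context=None, task=None, predecessor=source)
    for batch in counter.get_batch(max_batch_size=4):
        print(batch.chunk, batch.statistics)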