etl_lib.core.ParallelBatchProcessor module

class ParallelBatchProcessor(context, worker_factory, task=None, predecessor=None, max_workers=4, prefetch=4)[source]

Bases: BatchProcessor

BatchProcessor that runs worker threads over partitions of batches.

Receives a special BatchResults subclass (ParallelBatchResult) from the predecessor. All chunks in a received ParallelBatchResult can be processed in parallel; see etl_lib.core.SplittingBatchProcessor on how to produce them. The next ParallelBatchResults are prefetched from the predecessor, and the actual processing of the batches is deferred to the configured worker.

Note

  • The predecessor must emit ParallelBatchResult instances.

Parameters:
  • context – ETL context.

  • worker_factory (Callable[[], BatchProcessor]) – A zero-argument callable that returns a new BatchProcessor each time it is called. This processor is responsible for the actual processing of the batches.

  • task – optional Task for reporting.

  • predecessor – upstream BatchProcessor that must emit ParallelBatchResult.

  • max_workers (int) – number of parallel threads for partitions.

  • prefetch (int) – number of ParallelBatchResults to prefetch from the predecessor.

Behavior:
  • For every ParallelBatchResult, spins up max_workers threads.

  • Each thread runs its own worker obtained from worker_factory(), with its partition wrapped in a SingleBatchWrapper as predecessor.

  • Collects and merges their BatchResults in a fail-fast manner: on the first exception, logs the error, cancels the remaining threads, and re-raises the exception.
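The fail-fast behavior described above can be sketched with the standard library's ThreadPoolExecutor. This is a minimal illustration of the pattern, not the library's actual implementation; the function name process_partitions and the worker_fn callable are hypothetical stand-ins for the worker produced by worker_factory():

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_partitions(partitions, worker_fn, max_workers=4):
    """Illustrative fail-fast parallel processing of partitions.

    worker_fn is called once per partition; on the first exception,
    not-yet-started work is cancelled and the exception is re-raised.
    """
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(worker_fn, part) for part in partitions]
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception:
                # Cancel threads that have not started yet, then re-raise.
                for other in futures:
                    other.cancel()
                raise
    return results
```

Note that results arrive in completion order, not partition order, so callers that need ordering must merge accordingly.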

class SingleBatchWrapper(context, batch)[source]

Bases: BatchProcessor

Simple BatchProcessor that returns the batch it receives via __init__. Used as the predecessor for the worker.

Parameters:

batch (List[Any])
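The idea behind SingleBatchWrapper can be sketched as follows. This is an assumption-laden simplification (the class name SingleBatchWrapperSketch is hypothetical, and the real class inherits from BatchProcessor and yields BatchResults rather than raw lists):

```python
class SingleBatchWrapperSketch:
    """Sketch of a processor that yields the single batch it was given
    at construction time, so a worker can use it as its predecessor."""

    def __init__(self, context, batch):
        self.context = context
        self.batch = batch

    def get_batch(self, max_batch_size):
        # Yield the stored batch exactly once, regardless of max_batch_size.
        yield self.batch
```

This lets each worker thread treat its assigned partition as if it came from an ordinary upstream processor.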

__init__(context, batch)[source]

Constructs a new etl_lib.core.BatchProcessor instance.

Parameters:
  • context – etl_lib.core.ETLContext.ETLContext instance. It will be available to subclasses.

  • task – etl_lib.core.Task.Task this processor is part of. Needed for status reporting only.

  • predecessor – Source of batches for this processor. Can be None if no predecessor is needed (such as when this processor is the start of the queue).

  • batch (List[Any])

get_batch(max_batch_size)[source]

Provides a batch of data to the caller.

The batch itself could be pulled from the provided predecessor and processed, or generated from other sources.

Parameters:

max_batch_size (int) – The maximum size of the batch the caller expects to receive.

Return type:

Generator[BatchResults, None, None]

Returns:

A generator that yields batches.

__init__(context, worker_factory, task=None, predecessor=None, max_workers=4, prefetch=4)[source]

Constructs a new etl_lib.core.BatchProcessor instance.

Parameters:
  • context – ETL context.

  • worker_factory (Callable[[], BatchProcessor]) – A zero-argument callable that returns a new BatchProcessor each time it is called.

  • task – optional Task for reporting.

  • predecessor – upstream BatchProcessor that must emit ParallelBatchResult.

  • max_workers (int) – number of parallel threads for partitions.

  • prefetch (int) – number of ParallelBatchResults to prefetch from the predecessor.

get_batch(max_batch_size)[source]

Pulls ParallelBatchResult batches from the predecessor, prefetching up to prefetch ahead, processes each batch’s partitions in parallel threads, and yields a flattened BatchResults. The predecessor can run ahead while the current batch is processed.

Return type:

Generator[BatchResults, None, None]

Parameters:

max_batch_size (int)
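The prefetching described above can be sketched with a bounded queue fed by a background thread. This is a minimal illustration of the pattern under stated assumptions (the function prefetched and the sentinel object are hypothetical; the real implementation operates on ParallelBatchResult instances and integrates with the worker threads):

```python
import queue
import threading

_SENTINEL = object()  # marks the end of the source

def prefetched(source, prefetch=4):
    """Illustrative prefetching generator: a background thread pulls
    items from `source` into a bounded queue so the producer can run
    ahead while the consumer processes the current item."""
    buffer = queue.Queue(maxsize=prefetch)

    def producer():
        for item in source:
            buffer.put(item)  # blocks once `prefetch` items are buffered
        buffer.put(_SENTINEL)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buffer.get()
        if item is _SENTINEL:
            return
        yield item
```

The bounded queue is what caps prefetching: once prefetch items are buffered, the producer blocks until the consumer catches up.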

class ParallelBatchResult(chunk, statistics=<factory>, batch_size=9223372036854775807)[source]

Bases: BatchResults

Represents a batch split into parallelizable partitions.

chunk is a list of lists; each sub-list is a partition.

Parameters: