etl_lib.core.ParallelBatchProcessor module

class ParallelBatchProcessor(context, worker_factory, task=None, predecessor=None, max_workers=4, prefetch=4)[source]

Bases: BatchProcessor

BatchProcessor that runs a worker over the bucket-batches of each ParallelBatchResult in parallel threads, while prefetching the next ParallelBatchResult from its predecessor.

Note

  • The predecessor must emit ParallelBatchResult instances (waves).

  • This processor collects the BatchResults from all workers for one wave and merges them into one BatchResults.

  • The returned BatchResults will not obey the max_batch_size from get_batch() because it represents the full wave.

Parameters:
  • context – ETL context.

  • worker_factory (Callable[[], BatchProcessor]) – A zero-arg callable that returns a new BatchProcessor each time it’s called.

  • task – optional Task for reporting.

  • predecessor – upstream BatchProcessor that must emit ParallelBatchResult instances. See SplittingBatchProcessor.

  • max_workers (int) – number of parallel threads for bucket processing.

  • prefetch (int) – number of waves to prefetch.

Behavior:
  • For every wave, spins up max_workers threads.

  • Each thread processes one bucket-batch using a fresh worker from worker_factory().

  • Collects and merges worker results in a fail-fast manner.
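The fan-out described above can be sketched with the standard library alone. Here `worker_factory`, the bucket-batches, and the flattening merge are simplified stand-ins for the library's own types (a real worker is a BatchProcessor, not a plain callable):

```python
from concurrent.futures import ThreadPoolExecutor

def process_wave(bucket_batches, worker_factory, max_workers=4):
    """Run one worker per bucket-batch in parallel and merge the results.

    Fail-fast: the first exception raised by any worker propagates to the
    caller instead of being swallowed.
    """
    def run(batch):
        worker = worker_factory()  # fresh worker per bucket-batch
        return worker(batch)       # stand-in for the real worker invocation

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves bucket order and re-raises the first error
        results = list(pool.map(run, bucket_batches))
    # merge the per-bucket results into one flat result for the wave
    return [row for res in results for row in res]
```

This is only an illustration of the threading pattern; the actual class additionally merges statistics into a single BatchResults per wave.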

class SingleBatchWrapper(context, batch)[source]

Bases: BatchProcessor

Simple BatchProcessor that returns exactly one batch (the bucket-batch passed in via init). Used as predecessor for the per-bucket worker.

Parameters:

batch (List[Any])

__init__(context, batch)[source]

Constructs a new etl_lib.core.BatchProcessor instance.

Parameters:
  • context – etl_lib.core.ETLContext.ETLContext instance. It will be available to subclasses.

  • task – etl_lib.core.Task.Task this processor is part of. Needed for status reporting only.

  • predecessor – Source of batches for this processor. Can be None if no predecessor is needed (such as when this processor is the start of the queue).

  • batch (List[Any])

get_batch(max_size)[source]

Provides a batch of data to the caller.

The batch itself could be pulled from the provided predecessor and processed, or generated from other sources.

Parameters:
  • max_size (int) – The max size of the batch the caller expects to receive.

Return type:

Generator[BatchResults, None, None]

Returns:

A generator that yields batches.
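Putting the description above together, a minimal stand-in for this wrapper could look as follows (assuming only that get_batch is a generator; the real class derives from BatchProcessor and yields BatchResults):

```python
class SingleBatchWrapper:
    """Yields exactly one batch: the bucket-batch passed at construction."""

    def __init__(self, context, batch):
        self.context = context
        self.batch = batch

    def get_batch(self, max_size):
        # The stored bucket-batch is emitted as-is, regardless of max_size.
        yield self.batch
```

Because it emits a single, pre-sliced bucket-batch, it gives each per-bucket worker a well-formed predecessor without re-batching.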

__init__(context, worker_factory, task=None, predecessor=None, max_workers=4, prefetch=4)[source]

Constructs a new etl_lib.core.BatchProcessor instance.

Parameters:
  • context – ETL context.

  • worker_factory (Callable[[], BatchProcessor]) – A zero-arg callable that returns a new BatchProcessor each time it’s called.

  • task – optional Task for reporting.

  • predecessor – upstream BatchProcessor that must emit ParallelBatchResult instances.

  • max_workers (int) – number of parallel threads for bucket processing.

  • prefetch (int) – number of waves to prefetch.

get_batch(max_batch_size)[source]

Pull waves from the predecessor (prefetching up to prefetch ahead), process each wave’s buckets in parallel, and yield one flattened BatchResults per wave.
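The prefetching half of this behaviour can be sketched with a bounded queue: a background thread pulls from the upstream iterator and stays at most `prefetch` items ahead of the consumer. The helper name `prefetch_iter` is hypothetical, not part of etl_lib:

```python
import queue
import threading

def prefetch_iter(source, prefetch=4):
    """Iterate `source` on a background thread, buffering up to `prefetch` items."""
    _SENTINEL = object()
    buf = queue.Queue(maxsize=prefetch)

    def producer():
        for item in source:
            buf.put(item)      # blocks once `prefetch` items are queued
        buf.put(_SENTINEL)     # signal end of stream

    threading.Thread(target=producer, daemon=True).start()
    while (item := buf.get()) is not _SENTINEL:
        yield item
```

In the real processor each buffered item is a wave (a ParallelBatchResult) whose buckets are then dispatched to the worker pool before one flattened BatchResults is yielded.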

Return type:

Generator[BatchResults, None, None]

Parameters:

max_batch_size (int)

class ParallelBatchResult(chunk, statistics=<factory>, batch_size=9223372036854775807)[source]

Bases: BatchResults

Represents one wave produced by the splitter.

chunk is a list of bucket-batches. Each sub-list is processed by one worker instance.
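For illustration, the shape implied by the signature above can be approximated with a dataclass sketch (this mirrors the documented fields and defaults; the real class derives from BatchResults):

```python
from dataclasses import dataclass, field

@dataclass
class ParallelBatchResult:
    # chunk is a list of bucket-batches; each inner list goes to one worker.
    chunk: list
    statistics: dict = field(default_factory=dict)
    batch_size: int = 2**63 - 1  # 9223372036854775807, the signature's default

# A wave with two buckets: bucket 0 has two items, bucket 1 has one.
wave = ParallelBatchResult(chunk=[["a1", "a2"], ["b1"]])
```

The very large batch_size default explains the note above: a wave deliberately ignores max_batch_size, since it represents a full bucket partition rather than a size-limited slice.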

Parameters: