etl_lib.core.ParallelBatchProcessor module
- class ParallelBatchProcessor(context, worker_factory, task=None, predecessor=None, max_workers=4, prefetch=4)[source]
Bases: BatchProcessor
BatchProcessor that runs a worker over the bucket-batches of each ParallelBatchResult in parallel threads, while prefetching the next ParallelBatchResult from its predecessor.
Note
The predecessor must emit ParallelBatchResult instances (waves).
This processor collects the BatchResults from all workers for one wave and merges them into one BatchResults.
The returned BatchResults will not obey the max_batch_size from get_batch() because it represents the full wave.
- Parameters:
context – ETL context.
worker_factory (Callable[[], BatchProcessor]) – A zero-arg callable that returns a new BatchProcessor each time it is called.
task – optional Task for reporting.
predecessor – upstream BatchProcessor that must emit ParallelBatchResult. See SplittingBatchProcessor.
max_workers (int) – number of parallel threads for bucket processing.
prefetch (int) – number of waves to prefetch.
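The worker_factory contract above matters because each wave is processed by several threads at once. A minimal sketch (illustration only, not the etl_lib source; StatefulWorker is a hypothetical stand-in for a BatchProcessor subclass):

```python
# Hypothetical illustration of the worker_factory contract: a zero-arg
# callable that returns a NEW processor on every call, so each thread in a
# wave gets its own instance and no mutable state is shared between threads.

class StatefulWorker:
    def __init__(self):
        self.seen = []  # per-instance buffer; would race if shared

def worker_factory():
    return StatefulWorker()

a, b = worker_factory(), worker_factory()
print(a is b)  # -> False: every call yields an independent worker
```

Passing a factory instead of a single worker instance is what allows ParallelBatchProcessor to hand out isolated workers without knowing how to construct them.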
- Behavior:
For every wave, spins up max_workers threads.
Each thread processes one bucket-batch using a fresh worker from worker_factory().
Collects and merges worker results in a fail-fast manner.
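The behavior above can be sketched with the standard library alone. This is an illustration of the pattern, not the etl_lib implementation: SumWorker and process_wave are hypothetical names, and a "wave" is modeled as a plain list of bucket-batches.

```python
from concurrent.futures import ThreadPoolExecutor

class SumWorker:
    """Hypothetical stand-in for a BatchProcessor produced by worker_factory."""
    def process(self, bucket):
        return sum(bucket)

def process_wave(wave, worker_factory, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # One fresh worker per bucket-batch, so no state is shared across threads.
        futures = [pool.submit(worker_factory().process, bucket) for bucket in wave]
        # future.result() re-raises the first worker exception -> fail-fast merge.
        return [f.result() for f in futures]

wave = [[1, 2], [3, 4], [5]]          # one wave with three bucket-batches
print(process_wave(wave, SumWorker))  # -> [3, 7, 5]
```

Collecting via result() in submission order keeps the merged output aligned with the bucket order while still failing fast on the first worker error.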
- class SingleBatchWrapper(context, batch)[source]
Bases: BatchProcessor
Simple BatchProcessor that returns exactly one batch (the bucket-batch passed in via __init__). Used as predecessor for the per-bucket worker.
- __init__(context, batch)[source]
Constructs a new etl_lib.core.BatchProcessor instance.
- Parameters:
context – etl_lib.core.ETLContext.ETLContext instance. It will be available to subclasses.
task – etl_lib.core.Task.Task this processor is part of. Needed for status reporting only.
predecessor – Source of batches for this processor. Can be None if no predecessor is needed (such as when this processor is the start of the queue).
- get_batch(max_size)[source]
Provides a batch of data to the caller.
The batch itself may be fetched from the provided predecessor or generated from other sources.
- Parameters:
max_size (int) – The maximum size of the batch the caller expects to receive.
- Returns:
A generator that yields batches.
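The SingleBatchWrapper contract above can be sketched in a few lines (illustration only, not the library source; SingleBatchSketch is a hypothetical name): get_batch ignores max_size and yields exactly the one batch handed over at construction.

```python
class SingleBatchSketch:
    def __init__(self, batch):
        self.batch = batch  # the bucket-batch handed over at construction

    def get_batch(self, max_size):
        # A generator that yields exactly one batch, regardless of max_size.
        yield self.batch

wrapper = SingleBatchSketch([10, 20, 30])
print(list(wrapper.get_batch(max_size=2)))  # -> [[10, 20, 30]]
```

Wrapping a single in-memory batch this way lets a per-bucket worker consume it through the same predecessor interface it would use for any other upstream processor.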
- __init__(context, worker_factory, task=None, predecessor=None, max_workers=4, prefetch=4)[source]
Constructs a new etl_lib.core.BatchProcessor instance.
- Parameters:
context – etl_lib.core.ETLContext.ETLContext instance. It will be available to subclasses.
task – etl_lib.core.Task.Task this processor is part of. Needed for status reporting only.
predecessor – Source of batches for this processor. Can be None if no predecessor is needed (such as when this processor is the start of the queue).
worker_factory (Callable[[], BatchProcessor])
max_workers (int)
prefetch (int)
- class ParallelBatchResult(chunk, statistics=<factory>, batch_size=9223372036854775807)[source]
Bases: BatchResults
Represents one wave produced by the splitter.
chunk is a list of bucket-batches. Each sub-list is processed by one worker instance.
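The shape described above can be sketched as a dataclass (a hypothetical sketch, not the library source; ParallelBatchResultSketch is an assumed name, and the default 9223372036854775807 equals sys.maxsize on 64-bit builds):

```python
import sys
from dataclasses import dataclass, field

@dataclass
class ParallelBatchResultSketch:
    chunk: list                      # list of bucket-batches, one per worker
    statistics: dict = field(default_factory=dict)
    batch_size: int = sys.maxsize    # 9223372036854775807 on 64-bit builds

wave = ParallelBatchResultSketch(chunk=[["a", "b"], ["c"]])
print(len(wave.chunk))  # -> 2: this wave occupies two workers
```

The effectively unbounded batch_size default matches the note above: a wave represents the full chunk and does not obey the caller's max_batch_size.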