etl_lib.core.ParallelBatchProcessor module
- class ParallelBatchProcessor(context, worker_factory, task=None, predecessor=None, max_workers=4, prefetch=4)[source]
Bases: BatchProcessor
BatchProcessor that runs worker threads over partitions of batches.
Receives a special BatchResult (ParallelBatchResult) from the predecessor. All chunks in a ParallelBatchResult it receives can be processed in parallel. See etl_lib.core.SplittingBatchProcessor on how to produce them. Prefetches the next ParallelBatchResults from its predecessor. The actual processing of the batches is deferred to the configured worker.
Note: The predecessor must emit ParallelBatchResult instances.
- Parameters:
context – ETL context.
worker_factory (Callable[[], BatchProcessor]) – A zero-arg callable that returns a new BatchProcessor each time it is called. This processor is responsible for the processing of the batches.
task – optional Task for reporting.
predecessor – upstream BatchProcessor that must emit ParallelBatchResult.
max_workers (int) – number of parallel threads for partitions.
prefetch (int) – number of ParallelBatchResults to prefetch from the predecessor.
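To illustrate the worker_factory contract, here is a minimal sketch. The Worker class and its process method are hypothetical stand-ins, not part of etl_lib; the point is that the factory must hand out a fresh instance per call so parallel threads never share mutable state.

```python
class Worker:
    """Hypothetical worker stand-in; etl_lib's real BatchProcessor API may differ."""

    def __init__(self):
        self.seen = []  # per-worker state; must not be shared across threads

    def process(self, batch):
        self.seen.append(batch)
        return [item.upper() for item in batch]


def worker_factory():
    # Zero-arg callable: each call returns a FRESH worker instance.
    return Worker()


w1, w2 = worker_factory(), worker_factory()
assert w1 is not w2  # each thread gets its own instance
```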
- Behavior:
For every ParallelBatchResult, spins up max_workers threads.
Each thread calls its own worker from worker_factory(), with its partition wrapped by SingleBatchWrapper.
Collects and merges their BatchResults in a fail-fast manner: on first exception, logs the error, cancels remaining threads, and raises an exception.
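The fan-out and fail-fast merge described above can be sketched with the standard library. This is illustrative only; function and parameter names here are not etl_lib APIs, and the real implementation wraps each partition in a SingleBatchWrapper before handing it to the worker.

```python
from concurrent.futures import ThreadPoolExecutor


def process_parallel(partitions, worker_factory, max_workers=4):
    """Run one worker per partition and merge the results in order.

    Fail-fast: the first exception cancels futures that have not yet
    started and propagates to the caller.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(worker_factory().process, p) for p in partitions]
        try:
            return [f.result() for f in futures]  # raises on first failure
        except Exception:
            for f in futures:
                f.cancel()  # cancel work not yet started
            raise
```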
- class SingleBatchWrapper(context, batch)[source]
Bases: BatchProcessor
Simple BatchProcessor that returns the batch it receives via __init__. Used as the predecessor for each worker.
- __init__(context, batch)[source]
Constructs a new etl_lib.core.BatchProcessor instance.
- Parameters:
context – etl_lib.core.ETLContext.ETLContext instance. It will be available to subclasses.
batch – the batch that get_batch will yield.
task – etl_lib.core.Task.Task this processor is part of. Needed for status reporting only.
predecessor – Source of batches for this processor. Can be None if no predecessor is needed (such as when this processor is the start of the queue).
- get_batch(max_batch_size)[source]
Provides a batch of data to the caller.
The batch itself could be pulled from the provided predecessor or generated from other sources.
- Parameters:
max_batch_size (int) – The max size of the batch the caller expects to receive.
- Return type:
- Returns:
A generator that yields batches.
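A minimal sketch of what such a wrapper can look like. This is a simplified stand-in (the real SingleBatchWrapper inherits from etl_lib's BatchProcessor and participates in the usual processor chain):

```python
class SingleBatchWrapperSketch:
    """Simplified stand-in: holds one batch and yields it exactly once."""

    def __init__(self, context, batch):
        self.context = context
        self.batch = batch

    def get_batch(self, max_batch_size):
        # Ignores max_batch_size: the single pre-partitioned batch is
        # handed to the worker as-is.
        yield self.batch
```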
- __init__(context, worker_factory, task=None, predecessor=None, max_workers=4, prefetch=4)[source]
Constructs a new etl_lib.core.BatchProcessor instance.
- Parameters:
context – etl_lib.core.ETLContext.ETLContext instance. It will be available to subclasses.
task – etl_lib.core.Task.Task this processor is part of. Needed for status reporting only.
predecessor – Source of batches for this processor. Can be None if no predecessor is needed (such as when this processor is the start of the queue).
worker_factory (Callable[[], BatchProcessor])
max_workers (int)
prefetch (int)
- get_batch(max_batch_size)[source]
Pulls ParallelBatchResult batches from the predecessor, prefetching up to prefetch ahead, processes each batch's partitions in parallel threads, and yields the merged, flattened BatchResults. The predecessor can run ahead while the current batch is processed.
- Return type:
- Parameters:
max_batch_size (int)
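The effect of the prefetch parameter can be sketched with a bounded queue filled by a background thread. Names here are illustrative, not etl_lib APIs; the real processor buffers ParallelBatchResults rather than arbitrary items.

```python
import queue
import threading

_SENTINEL = object()


def prefetched(source, prefetch=4):
    """Yield items from `source`, letting it run up to `prefetch` items
    ahead on a background thread (a sketch of the prefetch behaviour)."""
    q = queue.Queue(maxsize=prefetch)

    def fill():
        for item in source:
            q.put(item)  # blocks once `prefetch` items are buffered
        q.put(_SENTINEL)

    threading.Thread(target=fill, daemon=True).start()
    while (item := q.get()) is not _SENTINEL:
        yield item
```

This lets the predecessor produce the next ParallelBatchResults while the current one is still being processed, at the cost of holding up to prefetch results in memory.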