etl_lib.core.SplittingBatchProcessor module
- class SplittingBatchProcessor(context, table_size, id_extractor, task=None, predecessor=None)[source]
Bases: BatchProcessor

BatchProcessor that splits incoming BatchResults into non-overlapping partitions based on (row, col) indices extracted by the id_extractor, and emits full or remaining batches using the mix-and-batch algorithm from https://neo4j.com/blog/developer/mix-and-batch-relationship-load/. Each emitted batch is a list of per-cell lists (an array of arrays), so callers can process each partition (another name for a cell) in parallel.
A batch for a schedule group is emitted when all cells in that group hold at least batch_size items. In addition, when a single cell reaches 3 × the configured max_batch_size, its group is emitted early to avoid overflowing the buffer when the distribution across cells is uneven. Leftovers are flushed after source exhaustion. Emitted batches never exceed the configured max_batch_size.
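For orientation, here is a minimal sketch of the diagonal scheduling idea from the mix-and-batch post: in an n × n grid of cells, schedule group g holds the cells (r, (r + g) % n), so no two cells in a group share a row or a column and can therefore be processed in parallel. The helper below is illustrative only, not part of the library API:

```python
def diagonal_groups(table_size: int) -> list[list[tuple[int, int]]]:
    """Illustrative only: build the schedule groups used by mix-and-batch.

    Group g holds the cells (row, (row + g) % table_size) for every row,
    so within one group no two cells share a row or a column.
    """
    return [
        [(row, (row + g) % table_size) for row in range(table_size)]
        for g in range(table_size)
    ]

# With table_size=3 the groups are the three diagonals:
# group 0: (0,0) (1,1) (2,2)
# group 1: (0,1) (1,2) (2,0)
# group 2: (0,2) (1,0) (2,1)
```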
- __init__(context, table_size, id_extractor, task=None, predecessor=None)[source]
Constructs a new etl_lib.core.BatchProcessor instance.
- Parameters:
context – etl_lib.core.ETLContext.ETLContext instance. It will be available to subclasses.
task – etl_lib.core.Task.Task this processor is part of. Needed for status reporting only.
predecessor – Source of batches for this processor. Can be None if no predecessor is needed (such as when this processor is the start of the queue).
table_size (int) – Dimension of the partition grid; cells are indexed (row, col) with 0 ≤ row, col < table_size.
id_extractor – Callable that maps an incoming item to its (row, col) cell.
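A construction sketch; `ctx`, `task`, and `source` are hypothetical stand-ins for the ETLContext, Task, and upstream BatchProcessor your pipeline already provides:

```python
from etl_lib.core.SplittingBatchProcessor import (
    SplittingBatchProcessor,
    dict_id_extractor,
)

# ctx, task, and source are assumed to exist in your pipeline already.
splitter = SplittingBatchProcessor(
    context=ctx,
    table_size=10,
    id_extractor=dict_id_extractor(table_size=10),
    task=task,
    predecessor=source,
)
```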
- get_batch(max_batch__size)[source]
- Parameters:
max_batch__size (int)
- Return type:
- Consume upstream batches, split data across cells, and emit diagonal partitions (a sketch follows the statistics policy below):
During streaming: emit a full partition when all of its cells hold >= max_batch__size items.
Also during streaming: if any cell in a partition builds up beyond a 'burst' threshold (3 × max_batch__size), emit that partition early, taking up to max_batch__size items from each cell.
After source exhaustion: flush leftovers in waves capped at max_batch__size per cell.
- Statistics policy:
Every emission except the last carries empty statistics ({}).
The last emission carries the accumulated upstream statistics (unfiltered).
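The rules above amount to a per-group readiness check. The following is a minimal sketch under assumed names (buffers keyed by cell, schedule groups as lists of cells), not the library's actual implementation:

```python
BURST_FACTOR = 3  # from the docstring: emit early at 3 x max_batch_size

def emit_ready_groups(buffers, groups, max_batch_size):
    """Illustrative only. Yield one batch (a list of per-cell lists)
    for every schedule group that is ready under the rules above.

    buffers: dict mapping (row, col) -> list of buffered items
    groups:  schedule groups, each a list of (row, col) cells
    """
    for group in groups:
        full = all(len(buffers[cell]) >= max_batch_size for cell in group)
        burst = any(len(buffers[cell]) >= BURST_FACTOR * max_batch_size
                    for cell in group)
        if full or burst:
            # Take at most max_batch_size items per cell, so an emitted
            # batch never exceeds the configured maximum.
            batch = [buffers[cell][:max_batch_size] for cell in group]
            for cell in group:
                del buffers[cell][:max_batch_size]
            yield batch
```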
- dict_id_extractor(table_size=10, start_key='start', end_key='end')[source]
Build an ID extractor for dict rows. The extractor reads two fields (configurable via start_key and end_key) and returns (row, col) based on the last decimal digit of each. Range validation remains the responsibility of the SplittingBatchProcessor.
- Parameters:
table_size (int) – defaults to 10
start_key (str) – name of the first field to read; defaults to 'start'
end_key (str) – name of the second field to read; defaults to 'end'
- Returns:
A function mapping a dict (read via start_key and end_key) to a (row, col) tuple.
- Return type:
Callable
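A usage sketch, assuming that for table_size=10 "last decimal digit" behaves like value % 10; the row values here are made up:

```python
extractor = dict_id_extractor(table_size=10)

# Last decimal digits: 1234 -> 4 and 5678 -> 8.
row, col = extractor({"start": 1234, "end": 5678})
assert (row, col) == (4, 8)

# Custom field names via start_key / end_key:
extractor = dict_id_extractor(table_size=10, start_key="src", end_key="dst")
assert extractor({"src": 21, "dst": 307}) == (1, 7)
```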