etl_lib.core.SplittingBatchProcessor module
- class SplittingBatchProcessor(context, table_size, id_extractor, task=None, predecessor=None, near_full_ratio=0.85, burst_multiplier=25)[source]
Bases: BatchProcessor
Streaming wave scheduler for mix-and-batch style loading.
Incoming rows are assigned to buckets via an id_extractor(item) -> (row, col) inside a table_size x table_size grid. The processor emits waves; each wave contains bucket-batches that are safe to process concurrently under the configured non-overlap rule.
Non-overlap rules
Bi-partite (default): within a wave, no two buckets share a row index and no two buckets share a col index.
Mono-partite: within a wave, no node index is used more than once (row/col indices are the same domain). Enable by setting id_extractor.monopartite = True (as done by canonical_integer_id_extractor).
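The two non-overlap rules can be sketched as small predicate functions. This is an illustrative sketch, not the library's implementation; the function names are made up for the example:

```python
from typing import Iterable, Tuple

def is_bipartite_wave(buckets: Iterable[Tuple[int, int]]) -> bool:
    """Bi-partite rule: no two buckets share a row index or a col index."""
    rows, cols = set(), set()
    for r, c in buckets:
        if r in rows or c in cols:
            return False
        rows.add(r)
        cols.add(c)
    return True

def is_monopartite_wave(buckets: Iterable[Tuple[int, int]]) -> bool:
    """Mono-partite rule: no node index is used more than once, because
    row and col indices come from the same domain."""
    seen = set()
    for r, c in buckets:
        if r in seen or c in seen:
            return False
        seen.update((r, c))
    return True
```

Note that the mono-partite rule is strictly stronger: `[(0, 1), (1, 2)]` is a valid bi-partite wave (distinct rows, distinct cols) but not a valid mono-partite one, since node 1 appears twice.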
Emission strategy
During streaming: emit a wave when at least one bucket is full (>= max_batch_size). The wave is then filled with additional non-overlapping buckets that are near-full to keep parallelism high without producing tiny batches.
If a bucket backlog grows beyond a burst threshold, emit a burst wave to bound memory.
After source exhaustion: flush leftovers in capped waves (max_batch_size per bucket).
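The streaming emission step can be pictured as a greedy selection over bucket backlogs. The sketch below assumes the bi-partite rule and is only an illustration of the described strategy (seed with full buckets, extend with near-full ones); it is not the library's code:

```python
def plan_wave(bucket_sizes, max_batch_size, near_full_ratio=0.85):
    """Greedy sketch: seed a wave with full buckets, then extend it with
    near-full buckets that do not collide on row or col."""
    near_full = near_full_ratio * max_batch_size
    rows, cols, wave = set(), set(), []

    def try_add(coord):
        r, c = coord
        if r not in rows and c not in cols:
            rows.add(r)
            cols.add(c)
            wave.append(coord)

    # Visit buckets largest-backlog first: full ones seed the wave,
    # near-full ones extend it to keep parallelism high.
    ordered = sorted(bucket_sizes, key=bucket_sizes.get, reverse=True)
    for coord in ordered:
        if bucket_sizes[coord] >= max_batch_size:
            try_add(coord)
    for coord in ordered:
        if near_full <= bucket_sizes[coord] < max_batch_size:
            try_add(coord)
    return wave
```

For example, with `max_batch_size=100`, bucket (0, 0) at 100 rows seeds the wave; (0, 1) at 95 rows is near-full but shares row 0, so (1, 1) at 90 rows is picked instead, while a bucket at 10 rows is left to accumulate.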
Statistics policy
Every emission except the last carries empty statistics ({}).
The last emission carries the accumulated upstream statistics (unfiltered).
- __init__(context, table_size, id_extractor, task=None, predecessor=None, near_full_ratio=0.85, burst_multiplier=25)[source]
Constructs a new etl_lib.core.BatchProcessor instance.
- Parameters:
context – etl_lib.core.ETLContext.ETLContext instance. It will be available to subclasses.
task – etl_lib.core.Task.Task this processor is part of. Needed for status reporting only.
predecessor – Source of batches for this processor. Can be None if no predecessor is needed (such as when this processor is the start of the queue).
table_size (int)
near_full_ratio (float)
burst_multiplier (int)
- get_batch(max_batch_size)[source]
Consume upstream batches, bucket incoming rows, and emit waves of non-overlapping buckets.
- Return type:
- Parameters:
max_batch_size (int)
- Streaming behavior:
If at least one bucket is full (>= max_batch_size), emit a wave seeded with full buckets and extended with near-full buckets to keep parallelism high.
If a bucket exceeds a burst threshold, emit a burst wave (seeded by the hottest bucket) and extended with near-full buckets.
- End-of-stream behavior:
Flush leftovers in capped waves (max_batch_size per bucket).
- Statistics policy:
Every emission except the last carries empty statistics ({}).
The last emission carries the accumulated upstream stats (unfiltered).
- canonical_int_or_str_id_extractor(table_size=10, start_key='start', end_key='end')[source]
ID extractor for integer IDs and string IDs with canonical folding (row <= col).
- Return type:
- Parameters:
Purpose - This extractor is intended for SplittingBatchProcessor / ParallelBatchProcessor style fan-out where you want a stable mapping from a row dict to a “bucket coordinate” (row, col) on a small grid. The coordinate is used only for bucketing/scheduling; it is not the domain identifier used in Neo4j.
While blake2b is heavier than CRC32, CRC32 is far more likely to produce collisions (as seen with Wikipedia page titles). Collisions would hurt import performance, since non-overlapping waves would no longer be guaranteed.
Design choices - Strings are mapped to a 128-bit unsigned integer using blake2b with digest_size=16. This is not mathematically collision-free, but for practical purposes the collision probability is negligible compared to CRC32.
Integers are treated as stable IDs and mixed as 64-bit values.
Canonical folding enforces row <= col so that (A, B) and (B, A) map to the same bucket. This is useful when the write set is effectively undirected (or when you want symmetric scheduling for pairs).
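The hashing and folding scheme described above can be sketched as follows. This is an illustrative reimplementation under the stated design choices (blake2b with digest_size=16 for strings, canonical row <= col folding); the function names are hypothetical, not the library's API:

```python
import hashlib

def _to_int(v):
    """Integers pass through; strings hash via blake2b(digest_size=16)
    to a 128-bit unsigned integer."""
    if isinstance(v, int):
        return v
    return int.from_bytes(
        hashlib.blake2b(str(v).encode("utf-8"), digest_size=16).digest(),
        "big",
    )

def canonical_coord(a, b, table_size=10):
    """Map a pair of int-or-str IDs to a folded (row, col) bucket."""
    row, col = _to_int(a) % table_size, _to_int(b) % table_size
    # Canonical folding: (A, B) and (B, A) land in the same bucket.
    return (row, col) if row <= col else (col, row)
```

Because the digest is reduced modulo table_size, only grid-cell collisions matter for scheduling; the 128-bit digest makes accidental full-hash collisions practically impossible.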
- canonical_integer_id_extractor(table_size=10, start_key='start', end_key='end')[source]
ID extractor for integer IDs with canonical folding.
Uses Knuth’s multiplicative hashing to scatter sequential integers across the grid.
Canonical folding ensures (A,B) and (B,A) land in the same bucket by folding the lower triangle into the upper triangle (row <= col).
The extractor marks itself as mono-partite by setting extractor.monopartite = True.
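A minimal sketch of Knuth-style multiplicative hashing with canonical folding, assuming a 64-bit golden-ratio multiplier (a common choice for this technique; the actual constant used by the library is not documented here):

```python
KNUTH_64 = 0x9E3779B97F4A7C15  # ~2^64 / golden ratio

def knuth_bucket(n: int, table_size: int = 10) -> int:
    # Multiply, wrap to 64 bits, then reduce to the grid size. This
    # scatters sequential integers instead of clustering them.
    return ((n * KNUTH_64) & 0xFFFFFFFFFFFFFFFF) % table_size

def integer_coord(a: int, b: int, table_size: int = 10):
    # Canonical folding: fold the lower triangle into the upper one.
    row, col = knuth_bucket(a, table_size), knuth_bucket(b, table_size)
    return (row, col) if row <= col else (col, row)
```

The real extractor additionally sets `extractor.monopartite = True` so that SplittingBatchProcessor applies the mono-partite non-overlap rule.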
- dict_id_extractor(table_size=10, start_key='start', end_key='end')[source]
Build an ID extractor for dict rows. The extractor reads two fields (configurable via start_key and end_key) and returns (row, col) based on the last decimal digit of each.
- Parameters:
- Return type:
- Returns:
Callable that maps {start_key, end_key} → (row, col).
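A sketch of the described behaviour, assuming table_size=10 so that reduction modulo table_size coincides with taking the last decimal digit (the factory name here is illustrative):

```python
def make_dict_extractor(table_size=10, start_key="start", end_key="end"):
    """Build an extractor that buckets a dict row by the last decimal
    digit of its two configurable ID fields."""
    def extract(row):
        return (
            abs(int(row[start_key])) % table_size,
            abs(int(row[end_key])) % table_size,
        )
    return extract
```

Usage: `make_dict_extractor()({"start": 123, "end": 47})` yields `(3, 7)`. Note there is no canonical folding here, so (A, B) and (B, A) may land in different buckets.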
- tuple_id_extractor(table_size=10)[source]
Create an ID extractor function for tuple items, using the last decimal digit of each element. The output is a (row, col) tuple within a table_size x table_size grid (default 10x10).
- Parameters:
table_size (int) – The dimension of the grid (number of rows/cols). Defaults to 10.
- Return type:
- Returns:
A callable that maps a tuple (a, b) to a tuple (row, col) using the last digit of a and b.
Notes
This extractor does not validate the returned indices. Range validation is performed by
SplittingBatchProcessor.
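The tuple variant can be sketched the same way as the dict extractor, again assuming table_size=10 so the modulo matches the last decimal digit (illustrative name, not the library's API):

```python
def make_tuple_extractor(table_size=10):
    """Build an extractor mapping a (a, b) tuple to (row, col) using the
    last decimal digit of each element."""
    def extract(item):
        a, b = item
        return abs(int(a)) % table_size, abs(int(b)) % table_size
    return extract
```

Consistent with the note above, the sketch performs no range validation; out-of-range indices would be caught by SplittingBatchProcessor.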