etl_lib.core.SplittingBatchProcessor module

class SplittingBatchProcessor(context, table_size, id_extractor, task=None, predecessor=None, near_full_ratio=0.85, burst_multiplier=25)[source]

Bases: BatchProcessor

Streaming wave scheduler for mix-and-batch style loading.

Incoming rows are assigned to buckets via an id_extractor(item) -> (row, col) inside a table_size x table_size grid. The processor emits waves; each wave contains bucket-batches that are safe to process concurrently under the configured non-overlap rule.

Non-overlap rules

  • Bi-partite (default): within a wave, no two buckets share a row index and no two buckets share a col index.

  • Mono-partite: within a wave, no node index is used more than once (row/col indices are the same domain). Enable by setting id_extractor.monopartite = True (as done by canonical_integer_id_extractor).
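The two non-overlap rules can be illustrated with a small greedy selection sketch. This is purely illustrative (not the library's scheduler): it picks, from a list of candidate `(row, col)` buckets, a set that may run concurrently under either rule.

```python
# Illustrative sketch, not the library's implementation: greedily select a
# wave of (row, col) buckets that satisfies the configured non-overlap rule.
def select_wave(buckets, monopartite=False):
    """Pick a set of bucket coordinates that are safe to process concurrently."""
    used_rows, used_cols, used_nodes, wave = set(), set(), set(), []
    for row, col in buckets:
        if monopartite:
            # Mono-partite: row and col indices share one node domain;
            # no index may appear twice in the wave.
            if row in used_nodes or col in used_nodes:
                continue
            used_nodes.update((row, col))
        else:
            # Bi-partite: no two buckets share a row index, and no two
            # buckets share a col index.
            if row in used_rows or col in used_cols:
                continue
            used_rows.add(row)
            used_cols.add(col)
        wave.append((row, col))
    return wave
```

Note how the rules differ: under the bi-partite rule, `(0, 1)` and `(1, 2)` can share a wave (distinct rows, distinct cols), but under the mono-partite rule they cannot, because node index `1` appears in both.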

Emission strategy

  • During streaming: emit a wave when at least one bucket is full (>= max_batch_size). The wave is then filled with additional non-overlapping buckets that are near-full to keep parallelism high without producing tiny batches.

  • If a bucket backlog grows beyond a burst threshold, emit a burst wave to bound memory.

  • After source exhaustion: flush leftovers in capped waves (max_batch_size per bucket).

Statistics policy

  • Every emission except the last carries an empty statistics dict ({}).

  • The last emission carries the accumulated upstream statistics (unfiltered).

__init__(context, table_size, id_extractor, task=None, predecessor=None, near_full_ratio=0.85, burst_multiplier=25)[source]

Constructs a new SplittingBatchProcessor instance.

Parameters:
  • context – etl_lib.core.ETLContext.ETLContext instance. It will be available to subclasses.

  • task – etl_lib.core.Task.Task this processor is part of. Needed for status reporting only.

  • predecessor – Source of batches for this processor. Can be None if no predecessor is needed (such as when this processor is the start of the queue).

  • table_size (int) – Dimension of the table_size x table_size bucket grid.

  • id_extractor (Callable[[Any], Tuple[int, int]]) – Maps an incoming item to a (row, col) bucket coordinate.

  • near_full_ratio (float) – Fraction of max_batch_size at which a bucket counts as near-full when filling a wave.

  • burst_multiplier (int) – Controls the burst threshold used to bound memory when a bucket backlog grows.

get_batch(max_batch_size)[source]

Consume upstream batches, bucket incoming rows, and emit waves of non-overlapping buckets.

Return type:

Generator[BatchResults, None, None]

Parameters:

max_batch_size (int)

Streaming behavior:
  • If at least one bucket is full (>= max_batch_size), emit a wave seeded with full buckets and extended with near-full buckets to keep parallelism high.

  • If a bucket exceeds a burst threshold, emit a burst wave seeded by the hottest bucket and extended with near-full buckets.

End-of-stream behavior:
  • Flush leftovers in capped waves (max_batch_size per bucket).

Statistics policy:
  • Every emission except the last carries an empty statistics dict ({}).

  • The last emission carries the accumulated upstream stats (unfiltered).

canonical_int_or_str_id_extractor(table_size=10, start_key='start', end_key='end')[source]

ID extractor for integer IDs and string IDs with canonical folding (row <= col).

Return type:

Callable[[Dict[str, Any]], Tuple[int, int]]

Parameters:
  • table_size (int)

  • start_key (str)

  • end_key (str)

Purpose - This extractor is intended for SplittingBatchProcessor / ParallelBatchProcessor style fan-out where you want a stable mapping from a row dict to a “bucket coordinate” (row, col) on a small grid.

  • The coordinate is used only for bucketing/scheduling; it is not the domain identifier used in Neo4j.

  • While blake2b is heavier than CRC32, CRC32 is much more likely to create collisions (as with Wikipedia page titles).

  • Collisions would impact import performance, as waves would no longer be guaranteed to be non-overlapping.

Design choices - Strings are mapped to a 128-bit unsigned integer using blake2b with digest_size=16. This is not “mathematically collision-free”, but for practical purposes the collision probability is negligible compared to CRC32.

  • Integers are treated as stable IDs and mixed as 64-bit values.

  • Canonical folding enforces row <= col so that (A,B) and (B,A) map to the same bucket. This is useful when the write set is effectively undirected (or when you want symmetric scheduling for pairs).
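The design choices above can be sketched in a few lines. This is a hypothetical reconstruction, not the library's exact code: the byte order and 64-bit masking are assumptions, but the blake2b(digest_size=16) hashing and the row <= col folding follow the description.

```python
import hashlib

# Hedged sketch of the design described above: map a string or integer ID
# onto a table_size x table_size grid with canonical folding.
def to_bucket_index(value, table_size):
    if isinstance(value, int):
        # Integers are treated as stable IDs and mixed as 64-bit values.
        return (value & 0xFFFFFFFFFFFFFFFF) % table_size
    # Strings map to a 128-bit unsigned integer via blake2b(digest_size=16).
    digest = hashlib.blake2b(str(value).encode("utf-8"), digest_size=16).digest()
    return int.from_bytes(digest, "big") % table_size

def bucket(start, end, table_size=10):
    row = to_bucket_index(start, table_size)
    col = to_bucket_index(end, table_size)
    # Canonical folding: (A, B) and (B, A) land in the same bucket (row <= col).
    return (row, col) if row <= col else (col, row)
```

Because of the folding step, `bucket("A", "B")` and `bucket("B", "A")` always return the same coordinate, which gives the symmetric scheduling described above.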

canonical_integer_id_extractor(table_size=10, start_key='start', end_key='end')[source]

ID extractor for integer IDs with canonical folding.

  • Uses Knuth’s multiplicative hashing to scatter sequential integers across the grid.

  • Canonical folding ensures (A,B) and (B,A) land in the same bucket by folding the lower triangle into the upper triangle (row <= col).

The extractor marks itself as mono-partite by setting extractor.monopartite = True.

Return type:

Callable[[Dict[str, Any]], Tuple[int, int]]

Parameters:
  • table_size (int)

  • start_key (str)

  • end_key (str)
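A minimal sketch of what such an extractor could look like, assuming Knuth's classic multiplicative constant 2654435761 (floor of 2^32 / golden ratio) and 32-bit mixing; the library's actual constants and mixing may differ:

```python
# Illustrative sketch of Knuth-style multiplicative hashing with canonical
# folding. The constant and masking are assumptions, not the library's code.
KNUTH = 2654435761  # floor(2^32 / golden ratio)

def make_extractor(table_size=10, start_key="start", end_key="end"):
    def scatter(n):
        # Multiplicative hashing scatters sequential integers across the grid.
        return ((n * KNUTH) & 0xFFFFFFFF) % table_size

    def extractor(row):
        a, b = scatter(row[start_key]), scatter(row[end_key])
        # Fold the lower triangle into the upper one so (A, B) == (B, A).
        return (a, b) if a <= b else (b, a)

    # Mark the extractor as mono-partite, as canonical_integer_id_extractor does.
    extractor.monopartite = True
    return extractor
```

The `monopartite = True` attribute is what switches SplittingBatchProcessor to the mono-partite non-overlap rule described at the top of this page.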

dict_id_extractor(table_size=10, start_key='start', end_key='end')[source]

Build an ID extractor for dict rows. The extractor reads two fields (configurable via start_key and end_key) and returns (row, col) based on the last decimal digit of each.

Parameters:
  • table_size (int) – Informational hint carried on the extractor.

  • start_key (str) – Field name for the start node identifier.

  • end_key (str) – Field name for the end node identifier.

Return type:

Callable[[Dict[str, Any]], Tuple[int, int]]

Returns:

Callable that maps {start_key, end_key} → (row, col).
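A sketch of the described behavior, assuming integer-valued fields (the last decimal digit of each identifier picks the coordinate); the library may handle more input types:

```python
# Minimal sketch of a dict-based last-digit extractor as described above.
def dict_extractor(table_size=10, start_key="start", end_key="end"):
    def extractor(row):
        # Bucket by the last decimal digit of each configured field.
        return (abs(int(row[start_key])) % 10, abs(int(row[end_key])) % 10)

    # table_size is an informational hint carried on the extractor.
    extractor.table_size = table_size
    return extractor
```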

tuple_id_extractor(table_size=10)[source]

Create an ID extractor function for tuple items, using the last decimal digit of each element. The output is a (row, col) tuple within a table_size x table_size grid (default 10x10).

Parameters:

table_size (int) – The dimension of the grid (number of rows/cols). Defaults to 10.

Return type:

Callable[[Tuple[str | int, str | int]], Tuple[int, int]]

Returns:

A callable that maps a tuple (a, b) to a tuple (row, col) using the last digit of a and b.

Notes

This extractor does not validate the returned indices. Range validation is performed by SplittingBatchProcessor.
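The tuple variant can be sketched the same way. This is an illustrative reconstruction: for string elements it assumes the value ends in a decimal digit, mirroring the integer case, and (as noted above) it performs no range validation.

```python
# Hedged sketch of a tuple-based last-digit extractor as described above.
def tuple_extractor(table_size=10):
    def last_digit(v):
        # Integers: last decimal digit; strings: assumed to end in a digit.
        return abs(int(v)) % 10 if isinstance(v, int) else int(str(v)[-1])

    def extractor(item):
        a, b = item
        return (last_digit(a), last_digit(b))  # no range validation here

    return extractor
```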