etl_lib.core.SplittingBatchProcessor module

class SplittingBatchProcessor(context, table_size, id_extractor, task=None, predecessor=None)

Bases: BatchProcessor

BatchProcessor that splits incoming BatchResults into non-overlapping partitions based on the row/column indices extracted by id_extractor, and emits full or remaining batches using the mix-and-batch algorithm from https://neo4j.com/blog/developer/mix-and-batch-relationship-load/. Each emitted batch is a list of per-cell lists (an array of arrays), so callers can process each partition (another name for a cell) in parallel.
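The grid behind the split can be modeled in a few lines. This is a minimal sketch, not etl_lib code: schedule_group and cells_of_group are hypothetical names, and the sketch assumes a diagonal layout in which group g holds the cells (r, (r + g) mod table_size). Under that assumption no two cells in a group share a row (start bucket) or a column (end bucket), which is what makes the per-cell lists safe to process in parallel.

```python
# Hypothetical helpers illustrating a mix-and-batch grid; not part of etl_lib.
# Assumes the diagonal layout col = (row + group) % table_size.


def schedule_group(row: int, col: int, table_size: int) -> int:
    """Return the diagonal (schedule group) that cell (row, col) belongs to."""
    return (col - row) % table_size


def cells_of_group(group: int, table_size: int) -> list[tuple[int, int]]:
    """List the cells on one diagonal; no two of them share a row or column."""
    return [(row, (row + group) % table_size) for row in range(table_size)]


if __name__ == "__main__":
    n = 10
    for g in range(n):
        cells = cells_of_group(g, n)
        # Distinct rows and columns mean the cells touch disjoint node buckets.
        assert len({r for r, _ in cells}) == n
        assert len({c for _, c in cells}) == n
        assert all(schedule_group(r, c, n) == g for r, c in cells)
    print(cells_of_group(1, 4))  # [(0, 1), (1, 2), (2, 3), (3, 0)]
```

Taken together, the table_size diagonals cover every cell of the grid exactly once.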

A batch for a schedule group is emitted when all cells in that group hold at least max_batch_size items. In addition, when any cell reaches 3x the configured max_batch_size, its group is emitted early, so that an uneven distribution across cells does not overflow the buffer. Leftovers are flushed after the source is exhausted. Emitted batches never exceed the configured max_batch_size per cell.
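The emission rule can be sketched as two small helpers. Both are hypothetical: should_emit, take_group and the dict-of-deques buffer layout are assumptions for illustration, not the processor's internals. The burst check reads "reaches 3x" as `>=`.

```python
from collections import deque

BURST_FACTOR = 3  # the documented "3x max_batch_size" burst threshold


def should_emit(buffers, group_cells, max_batch_size):
    """True if every cell is full or any cell has reached the burst threshold.

    buffers maps (row, col) -> deque of pending items (assumed layout).
    """
    sizes = [len(buffers[cell]) for cell in group_cells]
    full = all(size >= max_batch_size for size in sizes)
    burst = any(size >= BURST_FACTOR * max_batch_size for size in sizes)
    return full or burst


def take_group(buffers, group_cells, max_batch_size):
    """Pop at most max_batch_size items per cell, one inner list per cell."""
    batch = []
    for cell in group_cells:
        queue = buffers[cell]
        count = min(max_batch_size, len(queue))
        batch.append([queue.popleft() for _ in range(count)])
    return batch


if __name__ == "__main__":
    buffers = {(0, 0): deque(range(7)), (1, 1): deque([10])}
    cells = [(0, 0), (1, 1)]
    print(should_emit(buffers, cells, 2))  # True: 7 >= 3 * 2 triggers a burst
    print(take_group(buffers, cells, 2))   # [[0, 1], [10]]
```

Because take_group caps every cell at max_batch_size, a bursting cell keeps its surplus for later emissions instead of producing an oversized batch.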

__init__(context, table_size, id_extractor, task=None, predecessor=None)

Constructs a new etl_lib.core.BatchProcessor instance.

get_batch(max_batch__size)
Return type:

Generator[BatchResults, None, None]

Parameters:

max_batch__size (int)

Consume upstream batches, split data across cells, and emit diagonal partitions:
  • During streaming: emit a full partition once all of its cells hold at least max_batch__size items.

  • Also during streaming: if any cell in a partition builds up beyond a ‘burst’ threshold (3 * max_batch__size), emit that partition early, taking up to max_batch__size items from each cell.

  • After source exhaustion: flush leftovers in waves capped at max_batch__size per cell.
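Putting the streaming and flush phases together, the loop can be modeled as a plain generator over lists. This is a hedged sketch under several assumptions: split_and_batch is a hypothetical name, upstream chunks are plain sequences rather than BatchResults, statistics are ignored, the diagonal layout col = (row + g) mod table_size is assumed, the burst test uses `>=` as in the class description, and max_batch_size must be at least 1.

```python
from collections import defaultdict, deque
from typing import Any, Callable, Iterable, Iterator, Sequence


def split_and_batch(
    upstream: Iterable[Sequence[Any]],
    id_extractor: Callable[[Any], tuple[int, int]],
    table_size: int,
    max_batch_size: int,
) -> Iterator[list[list[Any]]]:
    """Hypothetical model of get_batch over plain lists (no BatchResults)."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    buffers: dict[tuple[int, int], deque] = defaultdict(deque)  # cell -> items
    # Assumed diagonal layout: group g holds cells (r, (r + g) % table_size).
    groups = [
        [(r, (r + g) % table_size) for r in range(table_size)]
        for g in range(table_size)
    ]

    def take(cells):
        # Pop at most max_batch_size items per cell; one inner list per cell.
        return [
            [buffers[c].popleft() for _ in range(min(max_batch_size, len(buffers[c])))]
            for c in cells
        ]

    def ready(cells):
        sizes = [len(buffers[c]) for c in cells]
        full = all(s >= max_batch_size for s in sizes)
        burst = any(s >= 3 * max_batch_size for s in sizes)
        return full or burst

    for chunk in upstream:
        for item in chunk:
            buffers[id_extractor(item)].append(item)
        # Streaming phase: keep emitting while some group is full or bursting.
        emitted = True
        while emitted:
            emitted = False
            for cells in groups:
                if ready(cells):
                    yield take(cells)
                    emitted = True

    # Flush phase: drain each group in waves capped at max_batch_size per cell.
    for cells in groups:
        while any(buffers[c] for c in cells):
            yield take(cells)
```

The inner `while` loop re-checks every group after each emission, because a single upstream chunk can fill a group several times over.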

Statistics policy:
  • Every emission except the last carries empty statistics ({}).

  • The last emission carries the accumulated upstream stats (unfiltered).
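This statistics policy needs one item of lookahead, because an emission can only be marked as the last once no further emission follows. The following is a minimal sketch: with_stats_policy and merge_stats are hypothetical, summing counters per key is an assumed way of accumulating upstream stats, and emitting nothing for an empty stream is also an assumption.

```python
from typing import Callable, Dict, Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def merge_stats(acc: Dict[str, int], new: Dict[str, int]) -> Dict[str, int]:
    """Accumulate upstream counters by summing values per key (assumption)."""
    for key, value in new.items():
        acc[key] = acc.get(key, 0) + value
    return acc


def with_stats_policy(
    batches: Iterable[T], get_totals: Callable[[], Dict[str, int]]
) -> Iterator[Tuple[T, Dict[str, int]]]:
    """Yield (batch, stats): {} for every emission except the last one."""
    iterator = iter(batches)
    try:
        pending = next(iterator)
    except StopIteration:
        return  # assumed: an empty stream emits nothing
    for batch in iterator:
        yield pending, {}  # another emission follows, so this is not the last
        pending = batch
    # Only now is it known that `pending` is the last emission.
    yield pending, get_totals()


if __name__ == "__main__":
    totals: Dict[str, int] = {}
    merge_stats(totals, {"rows": 3})
    merge_stats(totals, {"rows": 2, "errors": 1})
    print(list(with_stats_policy(["b1", "b2", "b3"], lambda: totals)))
    # [('b1', {}), ('b2', {}), ('b3', {'rows': 5, 'errors': 1})]
```

get_totals is called only after the batch iterator is exhausted, so a generator that consumes upstream lazily has finished accumulating its stats by then.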

dict_id_extractor(table_size=10, start_key='start', end_key='end')

Build an ID extractor for dict rows. The extractor reads two fields (configurable via start_key and end_key) and returns (row, col) based on the last decimal digit of each. Range validation remains the responsibility of the SplittingBatchProcessor.

Parameters:
  • table_size (int) – Informational hint carried on the extractor; callers can use it for sanity checks.

  • start_key (str) – Field name for the start node identifier.

  • end_key (str) – Field name for the end node identifier.

Returns:

A callable that maps a mapping containing the start_key and end_key fields to a (row, col) tuple.

Return type:

Callable[[Mapping[str, Any]], tuple[int, int]]
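The documented dict extractor behavior can be approximated as follows. This is a sketch, not the library's code: make_dict_id_extractor is a hypothetical name, and taking the last character of str(value) as the "last decimal digit" is an assumption that only works for identifiers ending in a digit. With the library itself, the documented dict_id_extractor(start_key=..., end_key=...) would be passed as the id_extractor argument of SplittingBatchProcessor.

```python
from typing import Any, Callable, Mapping


def make_dict_id_extractor(
    start_key: str = "start", end_key: str = "end"
) -> Callable[[Mapping[str, Any]], tuple[int, int]]:
    """Hypothetical re-creation of the documented behavior, not etl_lib code."""

    def last_digit(value: Any) -> int:
        # Assumes the identifier's text ends in a decimal digit, e.g. 1234 or
        # "node-57"; int() raises ValueError otherwise.
        return int(str(value)[-1])

    def extract(row: Mapping[str, Any]) -> tuple[int, int]:
        # No range check here: per the docs, the processor validates ranges.
        return last_digit(row[start_key]), last_digit(row[end_key])

    return extract


if __name__ == "__main__":
    extract = make_dict_id_extractor(start_key="src", end_key="dst")
    print(extract({"src": 1234, "dst": "node-57"}))  # (4, 7)
```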

tuple_id_extractor(table_size=10)

Create an ID extractor function for tuple items, using the last decimal digit of each element. The output is a (row, col) tuple within a table_size x table_size grid (default 10x10).

Parameters:

table_size (int) – The dimension of the grid (number of rows/cols). Defaults to 10.

Returns:

A callable that maps a tuple (a, b) to a tuple (row, col) using the last digits of a and b.

Return type:

Callable[[Tuple[str | int, str | int]], Tuple[int, int]]
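A comparable sketch for tuple items, assuming (a, b) pairs whose decimal representations end in a digit. make_tuple_id_extractor is hypothetical. Rejecting cells outside the table_size grid with ValueError is a choice made only in this sketch; the documentation does not say whether out-of-range digits are rejected or wrapped.

```python
from typing import Callable, Tuple, Union

Key = Union[str, int]


def make_tuple_id_extractor(
    table_size: int = 10,
) -> Callable[[Tuple[Key, Key]], Tuple[int, int]]:
    """Hypothetical sketch: map (a, b) to (last digit of a, last digit of b)."""

    def extract(item: Tuple[Key, Key]) -> Tuple[int, int]:
        a, b = item
        # Assumes both identifiers end in a decimal digit (e.g. 1234, "n-57").
        row, col = int(str(a)[-1]), int(str(b)[-1])
        # Sketch choice: reject cells that fall outside the grid.
        if row >= table_size or col >= table_size:
            raise ValueError(f"cell {(row, col)} is outside the {table_size}x{table_size} grid")
        return row, col

    return extract


if __name__ == "__main__":
    print(make_tuple_id_extractor()((1234, "n-57")))  # (4, 7)
    print(make_tuple_id_extractor(5)((12, 34)))       # (2, 4)
```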