Data Sinks

Data sinks are implementations of BatchProcessor and serve as the endpoint in a chain of processors, writing data to a final destination. See Batching and Streaming for more details.

Data to write is acquired from the provided predecessor in batches.

CSV

The CSVBatchSink implementation enables writing batches of data to a CSV file.

It uses Python’s built-in csv module and allows customization via additional arguments passed to csv.DictWriter.

Behavior: - If the specified CSV file exists, data will be appended. - It automatically detects and writes headers if the file is new.

Example usage:

csv_sink = CSVBatchSink(context, task, predecessor, Path("output.csv"))

Neo4j / Cypher

The CypherBatchSink implementation writes batch data to a Neo4j database using Cypher queries.

Behavior: - Each batch is written in its own transaction. - Data is passed using the batch parameter, requiring Cypher queries to begin with UNWIND $batch AS row.

Example usage:

cypher_sink = CypherBatchSink(context, task, predecessor, """
    UNWIND $batch AS row
    MERGE (n:Entity {id: row.id})
    SET n += row
""")