Data Sinks
Data sinks are implementations of BatchProcessor that serve as the final stage in a chain of processors, writing data to its destination. They acquire the data to write from the provided predecessor in batches. See Batching and Streaming for more details.
CSV
The CSVBatchSink implementation enables writing batches of data to a CSV file.
It uses Python’s built-in csv module and supports customization via additional keyword arguments forwarded to csv.DictWriter.
Behavior:
- If the specified CSV file already exists, new rows are appended.
- If the file is new, headers are detected and written automatically.
Example usage:
csv_sink = CSVBatchSink(context, task, predecessor, Path("output.csv"))
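The append-and-header behavior can be sketched with Python's built-in csv module alone. This is a minimal illustration of the mechanism, not the CSVBatchSink source; the write_batch helper is a hypothetical name introduced here:

```python
import csv
from pathlib import Path

def write_batch(path: Path, batch: list[dict]) -> None:
    """Append a batch of rows, writing a header only when the file is new."""
    if not batch:
        return
    is_new = not path.exists()
    with path.open("a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(batch[0].keys()))
        if is_new:
            writer.writeheader()  # header only on the first write
        writer.writerows(batch)

write_batch(Path("output.csv"), [{"id": 1, "name": "a"}])
write_batch(Path("output.csv"), [{"id": 2, "name": "b"}])  # appends, no second header
```

Because header detection keys off file existence, repeated runs against the same path accumulate rows under a single header line.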
Neo4j / Cypher
The CypherBatchSink implementation writes batch data to a Neo4j database using Cypher queries.
Behavior:
- Each batch is written in its own transaction.
- Data is passed via the batch parameter, so Cypher queries must begin with UNWIND $batch AS row.
Example usage:
cypher_sink = CypherBatchSink(context, task, predecessor, """
UNWIND $batch AS row
MERGE (n:Entity {id: row.id})
SET n += row
""")
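The batch-per-transaction contract above can be illustrated with a small sketch. FakeSession and write_batches are hypothetical stand-ins, not the CypherBatchSink implementation or the neo4j driver API; the point is that every batch gets its own transaction and travels in the $batch parameter:

```python
QUERY = """
UNWIND $batch AS row
MERGE (n:Entity {id: row.id})
SET n += row
"""

class FakeSession:
    """Hypothetical stand-in for a database session: records one
    transaction per execute_write call."""
    def __init__(self):
        self.transactions = []

    def execute_write(self, fn):
        calls = []          # the "tx" is just a call recorder here
        fn(calls)
        self.transactions.append(calls)

def write_batches(session, query, batches):
    # One transaction per batch; rows travel in the $batch parameter.
    for batch in batches:
        def work(tx, q=query, b=batch):
            tx.append((q, {"batch": b}))
        session.execute_write(work)

session = FakeSession()
write_batches(session, QUERY, [[{"id": 1}], [{"id": 2}, {"id": 3}]])
```

Keeping one transaction per batch bounds memory on the database side and lets a failed batch roll back without affecting batches already committed.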