Data Sources
Data sources are implementations of BatchProcessor and serve as the starting point in a chain of processors from source to sink. See Batching and Streaming for more details.
CSV
The CSVBatchSource implementation enables reading CSV files in batches.
It utilizes Python’s built-in csv module. The constructor forwards kwargs to csv.DictReader, allowing adaptation to different CSV formats.
Additionally, it detects compressed files and decompresses them on the fly.
Example usage:
from pathlib import Path

# `context` is the processing context object supplied by the framework;
# delimiter and quotechar are forwarded to csv.DictReader
csv_source = CSVBatchSource(Path("input.csv"), context, delimiter=';', quotechar='"')
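The combination of batching, kwargs forwarding to csv.DictReader, and on-the-fly decompression can be sketched with the standard library alone. This is a simplified illustration of the idea, not the actual CSVBatchSource implementation; the function name and batch_size parameter are made up for the example.

```python
import csv
import gzip
import itertools
from pathlib import Path


def read_csv_batches(path: Path, batch_size: int = 1000, **fmtparams):
    """Yield lists of row dicts from a CSV file, in batches.

    Sketch only: fmtparams are forwarded to csv.DictReader, mirroring
    how the CSVBatchSource constructor forwards its kwargs, and gzip
    files are decompressed transparently.
    """
    # Detect gzip by its magic number rather than by file extension.
    with open(path, "rb") as probe:
        is_gzip = probe.read(2) == b"\x1f\x8b"
    opener = gzip.open if is_gzip else open
    with opener(path, mode="rt", newline="") as handle:
        reader = csv.DictReader(handle, **fmtparams)
        # islice pulls at most batch_size rows per iteration, so the
        # whole file is never held in memory at once.
        while batch := list(itertools.islice(reader, batch_size)):
            yield batch
```

Detecting compression by magic number rather than extension keeps the behavior stable for files named without a .gz suffix.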
The CSVLoad2Neo4jTask provides an implementation that utilizes the CSVBatchSource together with Pydantic to stream data from CSV to Neo4j.
See the gtfs demo in the examples folder.
Neo4j / Cypher
The CypherBatchSource implementation streams query results in batches while keeping the transaction open until all data has been returned.
Each row in the returned batch is a dictionary, as provided by the Neo4j Python Driver.
If an optional record_transformer is supplied in the constructor, this transformer is used to convert the Record into a dictionary, providing greater flexibility in handling different data types.
Example usage:
query = "MATCH (n) RETURN n.id AS id, n.name AS name, n.type AS type"

# `context` and `task` are supplied by the surrounding processing chain
cypher_source = CypherBatchSource(context, task, query)
Relational Databases / SQL
The SQLBatchSource implementation streams query results in batches while keeping the transaction open until all data has been returned.
Each row in the returned batch is a dictionary.
This data source is only enabled if the sqlalchemy module is installed.
The connection URL is expected in the environment variable SQLALCHEMY_URI.
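The pattern of keeping one cursor open while fetching rows in batches can be sketched with the standard library's sqlite3 module. This is an illustration only: the actual SQLBatchSource uses SQLAlchemy, and the function name, batch_size parameter, and the way the sqlite path is derived from SQLALCHEMY_URI here are assumptions made for the example.

```python
import os
import sqlite3
from typing import Iterator


def stream_sql_batches(query: str, batch_size: int = 1000) -> Iterator[list]:
    """Yield batches of row dicts from a SQL query.

    Sketch only: sqlite3 stands in for SQLAlchemy, and the database
    path is read from SQLALCHEMY_URI (e.g. sqlite:///demo.db).
    """
    uri = os.environ["SQLALCHEMY_URI"]
    path = uri.removeprefix("sqlite:///")  # assumption: sqlite URL form
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row  # rows become mapping-like objects
    try:
        cursor = conn.execute(query)
        # The connection stays open until the result set is exhausted,
        # mirroring how the source keeps its transaction open.
        while rows := cursor.fetchmany(batch_size):
            yield [dict(row) for row in rows]
    finally:
        conn.close()
```

Fetching with fetchmany keeps memory usage bounded regardless of result size, which is the point of streaming in batches.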
The SQLLoad2Neo4jTask provides a convenience implementation that only needs two queries: an SQL query to extract the data and a Cypher query to write it to Neo4j.
See the Musikbrainz demo in the examples folder.