etl_lib.task.data_loading.CSVLoad2Neo4jTask module
- class CSVLoad2Neo4jTask(context, model, file, batch_size=5000)
Bases: Task
Loads the specified CSV file into Neo4j.
Uses BatchProcessors to read and validate rows and to write them to Neo4j. Validation uses Pydantic, so a Pydantic model must be provided. Rows that fail validation are written to an error file, whose location is determined as follows:
If the context's environment variables contain an ETL_ERROR_PATH entry, the file is placed in that directory and named after the provided CSV file with .error.json appended.
If ETL_ERROR_PATH is not set, the file will be placed in the same directory as the CSV file.
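The lookup rule above can be sketched as a small function. `error_file_path` is hypothetical. A plain mapping stands in for the context's environment variables, an empty ETL_ERROR_PATH is treated as unset, and the `.error.json` suffix is assumed to be appended to the full CSV file name (so `stops.txt` becomes `stops.txt.error.json`).

```python
from pathlib import Path
from typing import Mapping


def error_file_path(csv_file: Path, env: Mapping[str, str]) -> Path:
    """Hypothetical sketch of the error-file location rule.

    Assumes ".error.json" is appended to the full CSV file name.
    """
    error_name = csv_file.name + ".error.json"
    error_dir = env.get("ETL_ERROR_PATH")
    if error_dir:
        # ETL_ERROR_PATH is set: collect error files in that directory.
        return Path(error_dir) / error_name
    # Otherwise keep the error file next to the CSV file.
    return csv_file.parent / error_name


print(error_file_path(Path("/data/gtfs/stops.txt"), {}))
print(error_file_path(Path("/data/gtfs/stops.txt"), {"ETL_ERROR_PATH": "/tmp/errors"}))
```

Centralising error files under ETL_ERROR_PATH keeps read-only or shared data directories clean; the fallback keeps each error file beside its source CSV.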
Example usage (from the GTFS demo):

    from pathlib import Path
    from typing import Optional

    from pydantic import BaseModel, Field

    from etl_lib.core.ETLContext import ETLContext  # module path assumed
    from etl_lib.task.data_loading.CSVLoad2Neo4jTask import CSVLoad2Neo4jTask


    class LoadStopsTask(CSVLoad2Neo4jTask):
        # Pydantic model used to validate each row of stops.txt;
        # aliases map GTFS column names to field names.
        class Stop(BaseModel):
            id: str = Field(alias="stop_id")
            name: str = Field(alias="stop_name")
            latitude: float = Field(alias="stop_lat")
            longitude: float = Field(alias="stop_lon")
            platform_code: Optional[str] = None
            parent_station: Optional[str] = None
            type: Optional[str] = Field(alias="location_type", default=None)
            timezone: Optional[str] = Field(alias="stop_timezone", default=None)
            code: Optional[str] = Field(alias="stop_code", default=None)

        def __init__(self, context: ETLContext, file: Path):
            super().__init__(context, LoadStopsTask.Stop, file)

        def task_name(self) -> str:
            return f"{self.__class__.__name__}('{self.file}')"

        def _query(self):
            # Each batch of validated rows is passed to Neo4j as $batch.
            return """
                UNWIND $batch AS row
                MERGE (s:Stop {id: row.id})
                SET s.name = row.name,
                    s.location = point({latitude: row.latitude, longitude: row.longitude}),
                    s.platformCode = row.platform_code,
                    s.parentStation = row.parent_station,
                    s.type = row.type,
                    s.timezone = row.timezone,
                    s.code = row.code
                """

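In the example, the Cypher query reads `row.id`, `row.platform_code` and similar keys, so each row in `$batch` appears to carry the model's field names rather than the GTFS column names. The plain-Python sketch below imitates that renaming for the `Stop` model without depending on Pydantic. `to_batch_row` and the alias table are hypothetical and only approximate Pydantic's alias handling (missing required columns raise `ValueError`, missing optional ones become `None`, extra columns are ignored).

```python
from typing import Any

# Hypothetical stand-in for the Stop model: CSV column (alias) -> field name.
STOP_ALIASES = {
    "stop_id": "id",
    "stop_name": "name",
    "stop_lat": "latitude",
    "stop_lon": "longitude",
    "platform_code": "platform_code",
    "parent_station": "parent_station",
    "location_type": "type",
    "stop_timezone": "timezone",
    "stop_code": "code",
}
REQUIRED = {"id", "name", "latitude", "longitude"}
FLOATS = {"latitude", "longitude"}


def to_batch_row(csv_row: dict[str, str]) -> dict[str, Any]:
    """Rename CSV columns to model field names, as the Cypher query expects."""
    out: dict[str, Any] = {}
    for column, field in STOP_ALIASES.items():
        value = csv_row.get(column)
        if value is None:
            if field in REQUIRED:
                raise ValueError(f"missing required column {column!r}")
            out[field] = None  # optional fields default to None
        elif field in FLOATS:
            out[field] = float(value)  # raises ValueError on bad coordinates
        else:
            out[field] = value
    return out


row = {"stop_id": "S1", "stop_name": "Main St", "stop_lat": "52.52",
       "stop_lon": "13.41", "location_type": "1"}
batch_row = to_batch_row(row)
print(batch_row)
```

A list of such dicts is what `UNWIND $batch AS row` would iterate over.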
- Parameters:
context (ETLContext)
model – Pydantic model class used to validate each row.
file (Path)
batch_size (int)
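The read, validate and batch flow described for this class can be sketched without any dependencies, assuming rows arrive as dicts (as produced by `csv.DictReader`) and using a plain validation callable in place of the Pydantic model. `validated_batches` and `validate_stop` are hypothetical; the actual BatchProcessor classes are not documented here.

```python
import csv
import io
from typing import Any, Callable, Iterable, Iterator


def validated_batches(
    rows: Iterable[dict[str, str]],
    validate: Callable[[dict[str, str]], dict[str, Any]],
    errors: list[dict[str, Any]],
    batch_size: int = 5000,
) -> Iterator[list[dict[str, Any]]]:
    """Yield batches of validated rows; append invalid rows to `errors`.

    `validate` stands in for the Pydantic model: it returns the cleaned row
    or raises ValueError (Pydantic's ValidationError is a ValueError subclass).
    """
    batch: list[dict[str, Any]] = []
    for row_number, row in enumerate(rows, start=1):
        try:
            batch.append(validate(row))
        except ValueError as exc:
            # Record the failure and keep going instead of aborting the load.
            errors.append({"row_number": row_number, "row": row, "error": str(exc)})
            continue
        if len(batch) == batch_size:
            yield batch  # in the real task, this batch would be written to Neo4j
            batch = []
    if batch:
        yield batch  # final, possibly smaller batch


def validate_stop(row: dict[str, str]) -> dict[str, Any]:
    # Hypothetical validator: float() raises ValueError on bad coordinates.
    return {"id": row["stop_id"], "latitude": float(row["stop_lat"])}


data = "stop_id,stop_lat\nA,52.5\nB,not-a-number\nC,48.1\n"
errors: list[dict[str, Any]] = []
batches = list(validated_batches(csv.DictReader(io.StringIO(data)),
                                 validate_stop, errors, batch_size=1))
print(batches)
print(errors)
```

Using a generator keeps memory bounded by `batch_size` rather than by file size, and collecting failures instead of raising lets one bad row be reported without stopping the load.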
- __init__(context, model, file, batch_size=5000)
Construct a Task object.
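- Parameters:
context (ETLContext) – ETLContext instance. Will be available to subclasses.
model – Pydantic model class used to validate each row.
file (Path) – the CSV file to load.
batch_size (int) – number of rows per batch.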
- run_internal(**kwargs)
Place to provide the logic to be performed.
The base class provides all housekeeping and reporting, so implementations do not need to deal with them. Implementations should not catch exceptions; the base class handles them.
- Parameters:
kwargs – will be passed to run_internal
- Return type:
TaskReturn
- Returns:
An instance of TaskReturn.
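The split between housekeeping in the base class and task logic in `run_internal` follows the template-method pattern. The sketch below illustrates it with hypothetical `SketchTask` and `SketchTaskReturn` classes; their fields and the timing and error reporting are assumptions, not the actual `Task` or `TaskReturn` API.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class SketchTaskReturn:
    """Hypothetical stand-in for TaskReturn; its fields are assumptions."""
    success: bool = True
    summary: dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None


class SketchTask:
    """Hypothetical base class showing the run()/run_internal() split."""

    def run(self, **kwargs: Any) -> SketchTaskReturn:
        # Housekeeping lives here, so subclasses only implement run_internal.
        started = time.perf_counter()
        try:
            # kwargs are passed through to run_internal unchanged.
            result = self.run_internal(**kwargs)
        except Exception as exc:
            # Subclasses do not catch exceptions; the base class turns them
            # into a failed result.
            result = SketchTaskReturn(success=False, error=f"{type(exc).__name__}: {exc}")
        result.summary["duration_s"] = time.perf_counter() - started
        return result

    def run_internal(self, **kwargs: Any) -> SketchTaskReturn:
        raise NotImplementedError


class CountRowsTask(SketchTask):
    def run_internal(self, rows=(), **kwargs: Any) -> SketchTaskReturn:
        return SketchTaskReturn(summary={"rows": len(rows)})


class FailingTask(SketchTask):
    def run_internal(self, **kwargs: Any) -> SketchTaskReturn:
        raise RuntimeError("bad input")


ok = CountRowsTask().run(rows=[1, 2, 3])
failed = FailingTask().run()
print(ok.success, ok.summary["rows"])
print(failed.success, failed.error)
```

Because error handling and timing live in `run`, every subclass reports failures the same way, which is why implementations should let exceptions propagate.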