etl_lib.task.data_loading.CSVLoad2Neo4jTask module

class CSVLoad2Neo4jTask(context, model, file, batch_size=5000)[source]

Bases: Task

Loads the specified CSV file to Neo4j.

Uses BatchProcessors to read, validate, and write to Neo4j. Validation is performed with Pydantic, so a Pydantic model must be provided. Rows that fail validation are written to an error file. The location of the error file is determined as follows:

If the context environment variables hold an entry ETL_ERROR_PATH, the error file is placed in that directory, named after the provided file with .error.json appended.

If ETL_ERROR_PATH is not set, the error file is placed in the same directory as the CSV file.
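The resolution rules above can be sketched with plain Python; resolve_error_path is a hypothetical helper written for illustration, not part of the library:

```python
import os
from pathlib import Path


def resolve_error_path(csv_file: Path) -> Path:
    """Return the error-file path for a CSV load (hypothetical helper).

    Mirrors the documented rules: prefer the ETL_ERROR_PATH directory,
    otherwise fall back to the CSV file's own directory.
    """
    error_name = csv_file.name + ".error.json"
    error_dir = os.environ.get("ETL_ERROR_PATH")
    if error_dir:
        return Path(error_dir) / error_name
    return csv_file.parent / error_name
```

For example, with ETL_ERROR_PATH=/var/log/etl, loading /data/stops.csv would write failing rows to /var/log/etl/stops.csv.error.json; without it, to /data/stops.csv.error.json.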

Example usage (from the GTFS demo):

class LoadStopsTask(CSVLoad2Neo4jTask):
    class Stop(BaseModel):
        id: str = Field(alias="stop_id")
        name: str = Field(alias="stop_name")
        latitude: float = Field(alias="stop_lat")
        longitude: float = Field(alias="stop_lon")
        platform_code: Optional[str] = None
        parent_station: Optional[str] = None
        type: Optional[str] = Field(alias="location_type", default=None)
        timezone: Optional[str] = Field(alias="stop_timezone", default=None)
        code: Optional[str] = Field(alias="stop_code", default=None)

    def __init__(self, context: ETLContext, file: Path):
        super().__init__(context, LoadStopsTask.Stop, file)

    def task_name(self) -> str:
        return f"{self.__class__.__name__}('{self.file}')"

    def _query(self):
        return """
               UNWIND $batch AS row
               MERGE (s:Stop {id: row.id})
                 SET s.name = row.name,
                     s.location = point({latitude: row.latitude, longitude: row.longitude}),
                     s.platformCode = row.platform_code,
                     s.parentStation = row.parent_station,
                     s.type = row.type,
                     s.timezone = row.timezone,
                     s.code = row.code
               """
__init__(context, model, file, batch_size=5000)[source]

Construct a Task object.

Parameters:

context – the ETLContext for this run

model – the Pydantic model used to validate each row

file – path of the CSV file to load

batch_size – number of rows written per batch (default 5000)
run_internal(**kwargs)[source]

Override this method to provide the logic to be performed.

This base class provides all the housekeeping and reporting, so implementations should not need to care about them. Exceptions should not be caught by implementations; they are handled by this base class.

Parameters:

kwargs – arbitrary keyword arguments forwarded to the task implementation

Return type:

TaskReturn

Returns:

An instance of TaskReturn.
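The contract described above (housekeeping and exception handling live in the base class, subclasses only implement run_internal) follows the template-method pattern. A minimal sketch, assuming a simplified interface; the real Task class and its TaskReturn result are richer than this, and the execute name here is an assumption:

```python
class TaskSketch:
    """Minimal illustration of the base-class contract (not the real Task)."""

    def execute(self, **kwargs) -> bool:
        # Housekeeping (timing, reporting) would wrap run_internal here.
        try:
            self.run_internal(**kwargs)
            return True
        except Exception:
            # Exceptions raised by run_internal are handled centrally,
            # so subclasses do not catch them themselves.
            return False

    def run_internal(self, **kwargs) -> None:
        raise NotImplementedError
```

A subclass that raises simply lets the exception propagate; the base class turns it into a failure result.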