Welcome to the neo4j-etl-lib documentation

I often found myself implementing ETL pipelines for small to medium-sized projects and repeating the same code. This library is an effort to collect reusable building blocks that simplify the process of assembling pipelines.

When building ETL pipelines, the following criteria should be considered as a bare minimum:

  • Logging (recording task execution, durations, errors, and statistics)

  • Error handling

  • Data validation (currently via Pydantic)

  • Batching and streaming

  • Optionally, tracking performed tasks and providing a way to review past ETL runs
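Batching, for instance, can be as simple as slicing an input stream into fixed-size chunks. A minimal stdlib sketch (the helper name `batched` is illustrative and not part of this library's API):

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` items, streaming through the input."""
    it = iter(items)
    # list(islice(...)) is empty when the input is exhausted, ending the loop.
    while batch := list(islice(it, size)):
        yield batch

# Example: stream 10 records in batches of 4.
for batch in batched(range(10), 4):
    print(batch)
```

Because the helper consumes the input lazily, it works equally well for generators and large files that should not be held in memory at once.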

Note

This project is under active development, meaning that API changes may occur.

Installation

The library can be installed via pip (inside a virtual environment):

pip install neo4j-etl-lib

Overview

The core of the library is the Task class and its implementations. This class represents an individual job to be executed.

Each task receives an instance of ETLContext during construction, providing access to shared functionality such as database interactions.

Subclasses only need to implement the run_internal() method. The base task class handles other concerns, such as time tracking, reporting, and error handling.
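The pattern described above can be sketched roughly as follows. This is a simplified illustration, not the library's actual code; apart from `Task`, `ETLContext`, and `run_internal`, all names and details are assumptions:

```python
import time

class ETLContext:
    """Simplified stand-in for the shared context (e.g. database access)."""
    def __init__(self, **resources):
        self.resources = resources

class Task:
    """Base class: wraps run_internal() with timing and error handling."""
    def __init__(self, context: ETLContext, name: str = None):
        self.context = context
        self.name = name or type(self).__name__
        self.duration = None
        self.error = None

    def execute(self) -> bool:
        start = time.perf_counter()
        try:
            self.run_internal()
            return True
        except Exception as exc:
            # Record the error for reporting instead of crashing the pipeline.
            self.error = exc
            return False
        finally:
            self.duration = time.perf_counter() - start

    def run_internal(self):
        raise NotImplementedError

class HelloTask(Task):
    """A subclass only implements the actual work."""
    def run_internal(self):
        print(f"running {self.name}")

ok = HelloTask(ETLContext()).execute()
```

The point of the pattern is that every concrete task gets timing, error capture, and reporting for free by implementing a single method.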

There are ready-to-use Task implementations for common jobs.

To build a CSV loading task, all you need to do is subclass CSVLoad2Neo4jTask, providing the Cypher query and, optionally, a Pydantic class if data validation is needed.
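The ingredients of such a task can be sketched with the stdlib alone. The Cypher query below uses the common Neo4j pattern of passing a batch of rows as a `$rows` parameter, the GTFS-flavored field names are made up for illustration, and `validate_row` stands in for a Pydantic model so the sketch stays self-contained; the real CSVLoad2Neo4jTask API may differ:

```python
import csv
import io

# Hypothetical Cypher query the task would run for each batch of rows.
CYPHER = """
UNWIND $rows AS row
MERGE (s:Stop {id: row.stop_id})
SET s.name = row.stop_name
"""

def validate_row(row: dict) -> dict:
    """Stand-in for Pydantic validation: require a non-empty stop_id."""
    if not row.get("stop_id"):
        raise ValueError(f"invalid row: {row!r}")
    return row

def read_rows(csv_text: str):
    """Parse CSV text and yield validated rows, streaming."""
    for row in csv.DictReader(io.StringIO(csv_text)):
        yield validate_row(row)

rows = list(read_rows("stop_id,stop_name\n1,Central\n2,Harbour\n"))
```

In the real task, batches of validated rows would then be sent to Neo4j with the query above.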

Tasks can be grouped into TaskGroup or ParallelTaskGroup. These groups help organize the pipeline, for example, by grouping all loading tasks or post-processing tasks. This structure allows executing only specific parts of the pipeline when needed.

Reporting statistics are also aggregated at the TaskGroup level, so grouping helps to get an overview.

A pipeline (aka a job you want to run) is represented by a Task or TaskGroup, which serves as the root of a tree of tasks and task groups. To start the pipeline, simply call execute() on the root task or task group.
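A toy sketch of this tree structure, using heavily simplified stand-ins for Task and TaskGroup (the real classes additionally carry the context, reporting, error handling, and so on):

```python
class Task:
    """Leaf node: pretends to do work and counts the outcome."""
    def __init__(self, name):
        self.name = name
        self.succeeded = 0
        self.failed = 0

    def execute(self):
        self.succeeded += 1  # assume the work succeeded

class TaskGroup:
    """Inner node: runs its children in order and aggregates their counters."""
    def __init__(self, name, children):
        self.name = name
        self.children = children

    def execute(self):
        for child in self.children:
            child.execute()

    @property
    def succeeded(self):
        return sum(c.succeeded for c in self.children)

    @property
    def failed(self):
        return sum(c.failed for c in self.children)

# A pipeline is just the root of the tree:
pipeline = TaskGroup("pipeline", [
    TaskGroup("load", [Task("load_stops"), Task("load_routes")]),
    TaskGroup("post_process", [Task("create_indexes")]),
])
pipeline.execute()
```

Because any subtree exposes the same `execute()` interface, running only the `load` group instead of the whole pipeline is just a matter of picking a different root.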

For a complete example of how to assemble the blocks into a CLI that loads a group of CSV files, see the gtfs example.