etl_lib.core.ETLContext module

class ETLContext(env_vars)[source]

Bases: object

General context information.

Passed to every Task to provide access to environment variables and to functionality deemed general enough that all parts of the ETL pipeline need it.

Parameters:

env_vars (dict)

__init__(env_vars)[source]

Create a new ETLContext.

Parameters:

env_vars (dict) – Environment variables. Stored internally and can be accessed via env().

The created context contains a Neo4jContext and a ProgressReporter. See those classes for the keys they read from the provided env_vars dict.

env(key)[source]

Returns the value of an entry in the env_vars dict.

Parameters:

key (str) – name of the entry to read.

Return type:

Any

Returns:

value of the entry, or None if the key is not in the dict.
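The lookup behaviour described above (a value for a known key, None for an unknown one) can be illustrated with a minimal sketch. EnvLookup is a hypothetical stand-in written for this example, not the library's ETLContext; it only assumes that env() behaves like dict.get.

```python
from typing import Any


class EnvLookup:
    """Hypothetical stand-in that mirrors the documented env() behaviour."""

    def __init__(self, env_vars: dict):
        # Keep a reference to the provided dict, as the docs describe.
        self._env_vars = env_vars

    def env(self, key: str) -> Any:
        # dict.get returns None for a missing key instead of raising KeyError.
        return self._env_vars.get(key)


context = EnvLookup({"NEO4J_DATABASE": "neo4j"})
database = context.env("NEO4J_DATABASE")  # key present in the dict
missing = context.env("NOT_SET")          # key absent from the dict
```

Because a missing key yields None rather than an exception, callers that require a value may want to check for None explicitly.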

class Neo4jContext(env_vars)[source]

Bases: object

Holds the connection to the Neo4j database and provides facilities to execute queries.

Parameters:

env_vars (dict)

__init__(env_vars)[source]

Create a new Neo4j context.

Reads the following env_vars keys:
  • NEO4J_URI

  • NEO4J_USERNAME

  • NEO4J_PASSWORD

  • NEO4J_DATABASE

Parameters:

env_vars (dict)
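As a rough illustration of the env_vars dict this constructor reads, the sketch below checks which of the documented keys are absent before a context is built. The helper missing_neo4j_keys and the example values are hypothetical, and the source does not state whether every key is mandatory, so this only reports what is missing.

```python
from typing import List, Mapping

# Keys listed in the Neo4jContext documentation.
NEO4J_KEYS = ("NEO4J_URI", "NEO4J_USERNAME", "NEO4J_PASSWORD", "NEO4J_DATABASE")


def missing_neo4j_keys(env_vars: Mapping[str, str]) -> List[str]:
    """Hypothetical helper: return the documented keys absent from env_vars."""
    return [key for key in NEO4J_KEYS if key not in env_vars]


# Example values are placeholders, not defaults used by the library.
env_vars = {
    "NEO4J_URI": "neo4j://localhost:7687",
    "NEO4J_USERNAME": "neo4j",
    "NEO4J_PASSWORD": "secret",
}
absent = missing_neo4j_keys(env_vars)
```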

query_database(session, query, **kwargs)[source]

Executes Cypher and returns (records, counters) with retryable write semantics. Accepts either a single query string or a list of queries. Does not work with CALL {} IN TRANSACTIONS queries.

Return type:

QueryResult

Parameters:

session (Session)

session(database=None)[source]

Creates a new Neo4j session in write mode. The caller is responsible for closing the session.

Parameters:

database – name of the database to use for this session. If not provided, the database name provided during construction will be used.

Returns:

newly created Neo4j session.
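Since the caller owns the session, one way to guarantee it is closed is to wrap it in contextlib.closing. The sketch below uses hypothetical stand-in classes (FakeSession, FakeNeo4jContext) so that it runs without a database; it assumes only that the returned session exposes a close() method.

```python
from contextlib import closing


class FakeSession:
    """Hypothetical stand-in for a Neo4j session; only tracks close()."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class FakeNeo4jContext:
    """Hypothetical stand-in exposing session(database=None) as documented."""

    def session(self, database=None):
        return FakeSession()


neo4j_context = FakeNeo4jContext()
with closing(neo4j_context.session()) as session:
    # Queries would be executed here, e.g. via query_database(session, ...).
    pass
# closing() has called session.close() at this point, even if the body raised.
```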

class QueryResult(data, summery)[source]

Bases: NamedTuple

Result of a query against the Neo4j database.

Parameters:
data: List[Any]

Data as returned from the query.

summery: Dict[str, int]

Counters as reported by Neo4j. Contains entries such as nodes_created, nodes_deleted, etc.

class SQLContext(database_url, pool_size=10, max_overflow=20)[source]

Bases: object

Parameters:
  • database_url (str)

  • pool_size (int)

  • max_overflow (int)

__init__(database_url, pool_size=10, max_overflow=20)[source]

Initializes the SQL context with an SQLAlchemy engine.

Parameters:
  • database_url (str) – SQLAlchemy connection URL.

  • pool_size (int) – Number of connections to maintain in the pool.

  • max_overflow (int) – Additional connections allowed beyond pool_size.

append_results(r1, r2)[source]

Appends two QueryResult objects, summing the values for duplicate keys in the summary.

Parameters:
  • r1 (QueryResult) – The first QueryResult object.

  • r2 (QueryResult) – The second QueryResult object to append.

Return type:

QueryResult

Returns:

A new QueryResult object with combined data and summed summary counts.
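The merge described above can be sketched as follows. This is an illustration under assumptions, not the library's implementation: QueryResult is redeclared locally with the documented field names, append_results_sketch is a hypothetical name, and "combined data" is assumed to mean list concatenation.

```python
from collections import Counter
from typing import Any, Dict, List, NamedTuple


class QueryResult(NamedTuple):
    """Local copy of the documented tuple shape (field names as in the docs)."""
    data: List[Any]
    summery: Dict[str, int]


def append_results_sketch(r1: QueryResult, r2: QueryResult) -> QueryResult:
    # Sum counters key by key; keys present in only one result are kept as-is.
    combined = Counter(r1.summery)
    combined.update(r2.summery)
    # Assumption: combining data means concatenating the two lists.
    return QueryResult(r1.data + r2.data, dict(combined))


first = QueryResult([{"id": 1}], {"nodes_created": 1})
second = QueryResult([{"id": 2}], {"nodes_created": 2, "nodes_deleted": 1})
merged = append_results_sketch(first, second)
```

Neither input is modified; the function returns a new tuple, which matches the documented "new QueryResult object" wording.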

gds(neo4j_context)[source]

Creates a new GraphDataScience client.

Parameters:

neo4j_context – Neo4j context containing driver and database name.

Return type:

GraphDataScience

Returns:

gds client.