Source code for etl_lib.core.Task

import abc
import logging
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime


class TaskReturn:
    """
    Return object for the :func:`~Task.execute` function, transporting result information.
    """

    success: bool
    """Success or failure of the task."""
    summery: dict
    """Dict holding statistics about the performed task, such as rows inserted or updated."""
    error: str
    """Error message."""
    def __init__(self, success: bool = True, summery: dict = None, error: str = None):
        self.success = success
        self.summery = summery if summery else {}
        self.error = error
def __repr__(self): return f"TaskReturn({self.success=}, {self.summery=}, {self.error=})" def __add__(self, other): """ Adding 2 instances of TaskReturn. Args: other: Instance to add. Returns: New TaskReturn instance. `success` is the logical AND of the instances. `summery` is the merged dict. For the values of the same key the values are added. """ if not isinstance(other, TaskReturn): return NotImplemented # Merge the summery dictionaries by summing their values merged_summery = self.summery.copy() for key, value in other.summery.items(): merged_summery[key] = merged_summery.get(key, 0) + value # Combine success values and errors combined_success = self.success and other.success combined_error = None if not (self.error or other.error) \ else f"{self.error or ''} | {other.error or ''}".strip(" |") return TaskReturn( success=combined_success, summery=merged_summery, error=combined_error )
class Task:
    """
    ETL job that can be executed.

    Provides reporting, time tracking and error handling.
    Implementations must provide the :func:`~run_internal` function.
    """
    def __init__(self, context):
        """
        Construct a Task object.

        Args:
            context: :class:`~etl_lib.core.ETLContext.ETLContext` instance. Will be available to subclasses.
        """
        self.context = context
        """:class:`~etl_lib.core.ETLContext.ETLContext` giving access to various resources."""
        self.logger = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}")
        self.uuid = str(uuid.uuid4())
        """Uniquely identifies a Task."""
        self.start_time: datetime | None = None
        """Time when :func:`~execute` was called; `None` before."""
        self.end_time: datetime | None = None
        """Time when :func:`~execute` has finished; `None` before."""
        self.success: bool | None = None
        """`True` if the task has finished successfully, `False` otherwise; `None` before the task has finished."""
        self.depth: int = 0
        """Level or depth of the task in the hierarchy. The root task is at depth 0. Updated by the reporter."""
    def execute(self, **kwargs) -> TaskReturn:
        """
        Executes the task.

        Implementations of this interface should not override this method, but provide the
        task functionality inside :func:`~run_internal`, which will be called from here.

        Will use the :class:`~etl_lib.core.ProgressReporter.ProgressReporter` from
        :attr:`~etl_lib.core.Task.Task.context` to report status updates.

        Args:
            kwargs: will be passed on to :func:`~run_internal`.
        """
        self.context.reporter.started_task(self)
        try:
            result = self.run_internal(**kwargs)
        except Exception as e:
            result = TaskReturn(success=False, summery={}, error=str(e))
        self.context.reporter.finished_task(task=self, result=result)
        return result
    @abc.abstractmethod
    def run_internal(self, **kwargs) -> TaskReturn:
        """
        Place to provide the logic to be performed.

        This base class provides all the housekeeping and reporting, so that
        implementations do not need to care about them.
        Exceptions should not be caught by implementations; they are handled by this base class.

        Args:
            kwargs: arguments passed on from :func:`~execute`.

        Returns:
            An instance of :py:class:`~etl_lib.core.Task.TaskReturn`.
        """
        pass
    def abort_on_fail(self) -> bool:
        """
        Should the pipeline abort when this job fails?

        Returns:
            `True` indicates that no other tasks should be executed if :py:func:`~run_internal` fails.
        """
        return True
    def task_name(self) -> str:
        """
        Option to override the name of this Task. The name is used in reporting only.

        Returns:
            String describing the task. Defaults to the class name.
        """
        return self.__class__.__name__
def __repr__(self): return f"Task({self.task_name()})"
class TaskGroup(Task):
    """
    Base class to allow wrapping of Tasks or TaskGroups to form a hierarchy of jobs.

    Implementations only need to provide the Tasks to execute as a list.
    The summery statistics object returned from the group's execute method is a merged/aggregated one.
    """
    def __init__(self, context, tasks: list[Task], name: str):
        """
        Construct a TaskGroup object.

        Args:
            context: :py:class:`~etl_lib.core.ETLContext.ETLContext` instance.
            tasks: a list of :py:class:`~etl_lib.core.Task.Task` instances. These will be executed
                in the order provided when :py:func:`~run_internal` is called.
            name: short name of the TaskGroup for reporting.
        """
        super().__init__(context)
        self.tasks = tasks
        self.name = name
    def sub_tasks(self) -> list[Task]:
        return self.tasks
    def run_internal(self, **kwargs) -> TaskReturn:
        ret = TaskReturn()
        for task in self.tasks:
            task_ret = task.execute(**kwargs)
            if not task_ret.success and task.abort_on_fail():
                self.logger.warning(
                    f"Task {task.task_name()} failed. Aborting execution of TaskGroup {self.task_name()}."
                )
                return task_ret
            ret = ret + task_ret
        return ret
    def abort_on_fail(self) -> bool:
        for task in self.tasks:
            if task.abort_on_fail():
                return True
        return False
    def task_name(self) -> str:
        return self.name

    def __repr__(self):
        return f"TaskGroup({self.task_name()})"
class ParallelTaskGroup(TaskGroup):
    """
    Task group for parallel execution of jobs.

    This class uses a ThreadPoolExecutor to run the provided tasks'
    :py:func:`~run_internal` functions in parallel.
    Care should be taken that the tasks can operate without blocking or locking each other.
    """
    def __init__(self, context, tasks: list[Task], name: str):
        """
        Construct a ParallelTaskGroup object.

        Args:
            context: :py:class:`~etl_lib.core.ETLContext.ETLContext` instance.
            tasks: a list of :py:class:`~etl_lib.core.Task.Task` instances. These will be executed
                in parallel when :py:func:`~run_internal` is called. The tasks in the list may
                themselves be other TaskGroups.
            name: short name of the TaskGroup.
        """
        super().__init__(context, tasks, name)
    def run_internal(self, **kwargs) -> TaskReturn:
        combined_result = TaskReturn()

        with ThreadPoolExecutor() as executor:
            future_to_task = {executor.submit(task.execute, **kwargs): task for task in self.tasks}

            for future in as_completed(future_to_task):
                task = future_to_task[future]
                try:
                    result = future.result()
                    combined_result += result

                    # If a task fails and it has abort_on_fail set, stop further execution
                    if not result.success and task.abort_on_fail():
                        self.logger.warning(
                            f"Task {task.task_name()} failed. Aborting execution of TaskGroup {self.task_name()}."
                        )
                        # Cancel any pending tasks
                        for f in future_to_task:
                            if not f.done():
                                f.cancel()
                        return combined_result
                except Exception as e:
                    self.logger.error(f"Task {task.task_name()} encountered an error: {str(e)}")
                    error_result = TaskReturn(success=False, summery={}, error=str(e))
                    combined_result += error_result

                    # Handle abort logic for unexpected exceptions
                    if task.abort_on_fail():
                        self.logger.warning(
                            f"Unexpected failure in {task.task_name()}. Aborting execution of TaskGroup {self.task_name()}."
                        )
                        # Cancel any pending tasks
                        for f in future_to_task:
                            if not f.done():
                                f.cancel()
                        return combined_result

        return combined_result
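
# Parallel variant of the sketch above, under the same assumption that `context`
# is a configured ETLContext. Tasks run concurrently in a ThreadPoolExecutor,
# so they must not block or lock each other.
def _demo_parallel_group(context):
    group = ParallelTaskGroup(
        context,
        tasks=[_ExampleRowCountTask(context), _ExampleRowCountTask(context)],
        name="demo-parallel-group",
    )
    # Sub-task results arrive in completion order; the returned TaskReturn is the
    # merged aggregate, just as with the sequential TaskGroup.
    return group.execute()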