
Processing Tracker

Job lifecycle management with TTL cleanup.


ProcessingTracker

Python
ProcessingTracker(service_name: str, processor_name: str, sql_client: Optional[RDSClient] = None)

Thread-safe singleton tracker that manages processing_events in Postgres.

This class acts as the in-memory coordinator for all running jobs within a service/processor context. It records new jobs, updates their status, and retrieves information from the backing Postgres table.

Key features:
- Thread-safe access using RLock
- TTL-based cleanup for finished/failed jobs
- Python-side timestamps (UTC)
- Full lifecycle tracking: start, finish, retrieve

Attributes:
- _sql_client (Optional[RDSClient]): Lazily-initialized RDS client.
- _service_name (Optional[str]): Logical service identifier.
- _processor_name (Optional[str]): Logical processor or component.
- _lock (threading.RLock): Lock ensuring thread-safe updates.
- _running_job_ids (dict[str, ProcessingEvent]): Active in-memory jobs.
- _finished_job_ids (TTLSet): Recently finished job IDs with TTL expiry.
- _failed_job_ids (TTLSet): Recently failed job IDs with TTL expiry.
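The TTL-based cleanup mentioned above can be illustrated with a small set whose members expire after a fixed interval. This is only a sketch: the real TTLSet is not documented here, so the constructor arguments (`ttl_seconds`, `clock`) and the methods shown are assumptions made for illustration.

```python
import threading
import time
from typing import Callable, Dict


class TTLSet:
    """Hypothetical stand-in for the TTLSet named above; the real API may differ."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._expiry: Dict[str, float] = {}  # item -> time at which it expires
        self._lock = threading.RLock()

    def add(self, item: str) -> None:
        with self._lock:
            self._purge()
            self._expiry[item] = self._clock() + self._ttl

    def __contains__(self, item: str) -> bool:
        with self._lock:
            self._purge()
            return item in self._expiry

    def __len__(self) -> int:
        with self._lock:
            self._purge()
            return len(self._expiry)

    def _purge(self) -> None:
        # Drop every item whose expiry time has been reached.
        now = self._clock()
        for key in [k for k, t in self._expiry.items() if t <= now]:
            del self._expiry[key]
```

Purging on access keeps the sketch free of background threads; a production version might purge on a timer instead.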

Initialize a ProcessingTracker instance for a given service/processor.

Args:
- service_name (str): Logical service identifier (e.g. 'ai_pipeline').
- processor_name (str): Logical processor name (e.g. 'embedding_batch').
- sql_client (Optional[RDSClient]): Optional custom RDS connection.

processor_name

Python
processor_name: str

Return the processor name.

service_name

Python
service_name: str

Return the current service name, falling back to an environment variable if none was set explicitly.

Raises: ValueError: If no service name is set and the environment variable is missing.
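The fallback behaviour can be sketched as a property that prefers the explicit value and only then consults the environment. The variable name `SERVICE_NAME` and the class `ServiceNameHolder` are hypothetical; the source does not state which environment variable the tracker reads.

```python
import os
from typing import Optional


class ServiceNameHolder:
    """Illustrative only; not the tracker's actual implementation."""

    ENV_VAR = "SERVICE_NAME"  # assumed name, not confirmed by the source

    def __init__(self, service_name: Optional[str] = None) -> None:
        self._service_name = service_name

    @property
    def service_name(self) -> str:
        # Prefer the explicitly configured name, then the environment.
        name = self._service_name or os.environ.get(self.ENV_VAR)
        if not name:
            raise ValueError(
                f"service name is not set and {self.ENV_VAR} is missing from the environment"
            )
        return name
```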

sql_client

Python
sql_client

Lazily instantiate and return the RDS client.

Returns: RDSClient: Active RDS client connection.
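Lazy instantiation of this kind is commonly written as a property that builds the client on first access and reuses it afterwards. The sketch below uses a hypothetical `StubClient` in place of RDSClient, whose constructor is not documented here, and guards creation with an RLock as the class description suggests.

```python
import threading
from typing import Callable, Optional


class StubClient:
    """Hypothetical placeholder for RDSClient; counts how many were created."""

    created = 0

    def __init__(self) -> None:
        StubClient.created += 1


class LazyClientHolder:
    def __init__(
        self,
        sql_client: Optional[StubClient] = None,
        factory: Callable[[], StubClient] = StubClient,
    ) -> None:
        self._sql_client = sql_client  # an injected client is used as-is
        self._factory = factory
        self._lock = threading.RLock()

    @property
    def sql_client(self) -> StubClient:
        # Create the client on first access only; later calls reuse it.
        with self._lock:
            if self._sql_client is None:
                self._sql_client = self._factory()
            return self._sql_client
```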

end_event

Python
end_event(processing_id: str, success: bool = True, status_note: Optional[str] = None, output_rows: Optional[int] = None) -> bool

Mark a processing job as complete (success or failure) and persist its status.

Args:
- processing_id (str): UUID of the processing event.
- success (bool, optional): Whether the job succeeded. Defaults to True.
- status_note (Optional[str], optional): Optional completion message.
- output_rows (Optional[int], optional): Optional output row count.

Returns: bool: True if persisted successfully, False otherwise.

get_event

Python
get_event(id_value: str, id_type: Literal['processing_id', 'reference'] = 'processing_id')

Retrieve an event object by ID or reference.

Args:
- id_value (str): The identifier (processing_id or reference).
- id_type (Literal): Type of identifier ('processing_id' or 'reference').

Returns: Optional[ProcessingEvent]: Matching ProcessingEvent if found.

get_processing_id

Python
get_processing_id(reference: str) -> Optional[str]

Retrieve a running job's processing_id from memory by reference.

Args: reference (str): External reference tied to the job.

Returns: Optional[str]: Processing ID if active, else None.

start_event

Python
start_event(reference: str, event: dict, input_rows: Optional[int] = None) -> Optional[str]

Start a new processing event and insert it into Postgres.

Args:
- reference (str): Unique external reference (e.g. run_id or s3_key).
- event (dict): JSON-compatible input payload.
- input_rows (Optional[int]): Optional count of input rows.

Returns: Optional[str]: Processing ID if successfully created, else None.
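To show how start_event, get_processing_id and end_event fit together, here is a sketch of the lifecycle using an in-memory stand-in. `InMemoryTracker` is hypothetical: it mirrors the documented signatures but keeps records in dictionaries instead of Postgres, and its internal layout is an assumption.

```python
import threading
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional


class InMemoryTracker:
    """Hypothetical stand-in that mirrors the documented method signatures."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._running: Dict[str, dict] = {}   # processing_id -> record
        self._finished: Dict[str, dict] = {}  # processing_id -> record

    def start_event(self, reference: str, event: dict,
                    input_rows: Optional[int] = None) -> Optional[str]:
        with self._lock:
            processing_id = str(uuid.uuid4())
            self._running[processing_id] = {
                "reference": reference,
                "event": event,
                "input_rows": input_rows,
                "status": "in-progress",
                "started_at": datetime.now(timezone.utc),
                "ended_at": None,
            }
            return processing_id

    def get_processing_id(self, reference: str) -> Optional[str]:
        # Only running jobs are searched, matching the documented behaviour.
        with self._lock:
            for processing_id, record in self._running.items():
                if record["reference"] == reference:
                    return processing_id
            return None

    def end_event(self, processing_id: str, success: bool = True,
                  status_note: Optional[str] = None,
                  output_rows: Optional[int] = None) -> bool:
        with self._lock:
            record = self._running.pop(processing_id, None)
            if record is None:
                return False  # unknown or already finished
            record.update(
                status="success" if success else "failure",
                status_note=status_note,
                output_rows=output_rows,
                ended_at=datetime.now(timezone.utc),
            )
            self._finished[processing_id] = record
            return True


tracker = InMemoryTracker()
pid = tracker.start_event("run-42", {"source": "upload"}, input_rows=100)
same_pid = tracker.get_processing_id("run-42")   # found while the job runs
ended = tracker.end_event(pid, success=False, status_note="timeout")
after = tracker.get_processing_id("run-42")      # None once the job has ended
```

With the real tracker the same call sequence would apply, with persistence failures surfacing as a None or False return value per the documentation above.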

ProcessingEvent

Python
ProcessingEvent(processing_id: str = (lambda: str(uuid.uuid4()))(), service_name: str = '', processor_name: str = '', reference: str = '', event: dict = dict(), status: Literal['in-progress', 'success', 'failure'] = 'in-progress', status_note: Optional[str] = None, input_rows: Optional[int] = None, output_rows: Optional[int] = None, started_at: datetime = (lambda: datetime.now(timezone.utc))(), ended_at: Optional[datetime] = None)

Represents a single processing event stored in the datastore.processing_events table.

This model defines the schema and lifecycle of a tracked job, including metadata, timestamps, and row counts. It provides methods for validation, status transitions, and completion tracking.

Attributes:
- processing_id (str): Unique identifier for the processing event.
- service_name (str): Logical service identifier (e.g. 'ai_pipeline').
- processor_name (str): Component or processor performing the work.
- reference (str): External reference (e.g. run_id, s3_key, etc.).
- event (dict): Arbitrary input event payload stored as JSONB.
- status (Literal): Current job status ('in-progress', 'success', 'failure').
- status_note (Optional[str]): Optional human-readable note or error message.
- input_rows (Optional[int]): Number of input rows processed.
- output_rows (Optional[int]): Number of output rows produced.
- started_at (datetime): UTC timestamp when the event started.
- ended_at (Optional[datetime]): UTC timestamp when the event finished.
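The lambda-style defaults in the rendered signature suggest, though the source does not confirm, that per-instance defaults are produced by factories. A minimal sketch of that pattern with a standard dataclass follows; `EventSketch` is a hypothetical name and is not the library's model.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Literal, Optional


@dataclass
class EventSketch:
    # default_factory runs once per instance, so every event gets a fresh
    # UUID, its own payload dict, and its own UTC start timestamp.
    processing_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    service_name: str = ""
    processor_name: str = ""
    reference: str = ""
    event: dict = field(default_factory=dict)
    status: Literal["in-progress", "success", "failure"] = "in-progress"
    status_note: Optional[str] = None
    input_rows: Optional[int] = None
    output_rows: Optional[int] = None
    started_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    ended_at: Optional[datetime] = None


first = EventSketch(reference="run-1")
second = EventSketch(reference="run-2")
```

Here `first` and `second` receive distinct `processing_id` values and separate `event` dictionaries, which a shared mutable default would not guarantee.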

check_init

Python
check_init()

Validate that all required fields are initialized and the status is valid.

Raises: ValueError: If a required field is missing or invalid.

check_status

Python
check_status()

Validate that the job status is one of the supported literals.

Raises: ValueError: If the status is not in ['in-progress', 'success', 'failure'].
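The two validation methods can be pictured as plain checks that raise ValueError. The sketch below works on a dictionary rather than the real model, and the list of required fields is an assumption; the source does not say which fields check_init treats as required.

```python
from typing import Tuple

VALID_STATUSES: Tuple[str, ...] = ("in-progress", "success", "failure")

# Assumed set of required fields, chosen for illustration only.
REQUIRED_FIELDS: Tuple[str, ...] = (
    "processing_id", "service_name", "processor_name", "reference",
)


def check_status(status: str) -> None:
    """Raise ValueError unless status is one of the supported literals."""
    if status not in VALID_STATUSES:
        raise ValueError(f"invalid status {status!r}; expected one of {list(VALID_STATUSES)}")


def check_init(record: dict) -> None:
    """Raise ValueError if a required field is empty or the status is invalid."""
    missing = [name for name in REQUIRED_FIELDS if not record.get(name)]
    if missing:
        raise ValueError(f"missing required fields: {missing}")
    check_status(record.get("status", ""))
```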

finish_job

Python
finish_job(status: Literal['success', 'failure'], status_note: Optional[str] = None, output_rows: Optional[int] = None)

Mark the job as finished, updating timestamps and optional row counts.

Args:
- status (Literal): Completion status ('success' or 'failure').
- status_note (Optional[str]): Optional status message or error detail.
- output_rows (Optional[int]): Optional number of output rows produced.

has_failed

Python
has_failed() -> bool

Returns True if the event has failed.

is_finished

Python
is_finished() -> bool

Returns True if the event is in a terminal state (success or failure).
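The relationship between finish_job, is_finished and has_failed can be sketched with a reduced model that carries only the fields the transition touches. `JobState` is hypothetical, and rejecting a non-terminal status in finish_job is an assumption about how the real method behaves.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Literal, Optional

TERMINAL_STATUSES = ("success", "failure")


@dataclass
class JobState:
    status: str = "in-progress"
    status_note: Optional[str] = None
    output_rows: Optional[int] = None
    ended_at: Optional[datetime] = None

    def finish_job(self, status: Literal["success", "failure"],
                   status_note: Optional[str] = None,
                   output_rows: Optional[int] = None) -> None:
        if status not in TERMINAL_STATUSES:
            raise ValueError(f"finish_job expects one of {TERMINAL_STATUSES}, got {status!r}")
        self.status = status
        self.status_note = status_note
        self.ended_at = datetime.now(timezone.utc)  # Python-side UTC timestamp
        if output_rows is not None:
            self.output_rows = output_rows

    def is_finished(self) -> bool:
        return self.status in TERMINAL_STATUSES

    def has_failed(self) -> bool:
        return self.status == "failure"


job = JobState()
running_before = job.is_finished()   # False while in progress
job.finish_job("failure", status_note="upstream timeout", output_rows=0)
```

After the call, both `job.is_finished()` and `job.has_failed()` hold, and `ended_at` carries a timezone-aware UTC timestamp.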