Processing Tracker
Job lifecycle management with TTL cleanup.
ProcessingTracker
ProcessingTracker(service_name: str, processor_name: str, sql_client: Optional[RDSClient] = None)
Thread-safe singleton tracker that manages the processing_events table in Postgres.
This class acts as the in-memory coordinator for all running jobs within a service/processor context. It records new jobs, updates their status, and retrieves information from the backing Postgres table.
Key features:
- Thread-safe access using an RLock
- TTL-based cleanup for finished/failed jobs
- Python-side timestamps (UTC)
- Full lifecycle tracking: start, finish, retrieve
Attributes:
- _sql_client (Optional[RDSClient]): Lazily initialized RDS client.
- _service_name (Optional[str]): Logical service identifier.
- _processor_name (Optional[str]): Logical processor or component.
- _lock (threading.RLock): Lock ensuring thread-safe updates.
- _running_job_ids (dict[str, ProcessingEvent]): Active in-memory jobs.
- _finished_job_ids (TTLSet): Recently finished job IDs with TTL expiry.
- _failed_job_ids (TTLSet): Recently failed job IDs with TTL expiry.
Initialize a ProcessingTracker instance for a given service/processor.
Args:
- service_name (str): Logical service identifier (e.g. 'ai_pipeline').
- processor_name (str): Logical processor name (e.g. 'embedding_batch').
- sql_client (Optional[RDSClient]): Optional custom RDS connection.
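TTLSet is used for `_finished_job_ids` and `_failed_job_ids` but is not documented on this page. To illustrate what "recently finished job IDs with TTL expiry" means, here is a hypothetical minimal TTLSet: each member stores a `time.monotonic()` deadline, and expired members are purged lazily whenever the set is queried. The class body and its method names are assumptions, not the library's implementation.

```python
import threading
import time


class TTLSet:
    """Hypothetical set whose members expire `ttl_seconds` after being added."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl = ttl_seconds
        self._expiry: dict[str, float] = {}  # member -> monotonic deadline
        self._lock = threading.RLock()

    def _purge(self) -> None:
        # Drop every member whose deadline has passed.
        now = time.monotonic()
        for key in [k for k, deadline in self._expiry.items() if deadline <= now]:
            del self._expiry[key]

    def add(self, item: str) -> None:
        with self._lock:
            # Re-adding an existing member refreshes its deadline.
            self._expiry[item] = time.monotonic() + self._ttl

    def __contains__(self, item: object) -> bool:
        with self._lock:
            self._purge()
            return item in self._expiry

    def __len__(self) -> int:
        with self._lock:
            self._purge()
            return len(self._expiry)
```

Lazy purging avoids a background cleanup thread; the trade-off is that expired IDs are only released on the next lookup. `time.monotonic()` is used instead of wall-clock time so system clock adjustments cannot stretch or shrink a TTL.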
service_name
Return the current service name, falling back to an environment variable.
Raises: ValueError: If no service name is set and the fallback environment variable is missing.
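A minimal sketch of that lookup order, written as a standalone function: an explicitly set name wins, otherwise the environment is consulted, otherwise ValueError is raised. The variable name `SERVICE_NAME` is a hypothetical placeholder; this page does not say which variable the tracker actually reads.

```python
import os
from typing import Optional

# Hypothetical variable name; the real tracker may read a different one.
SERVICE_NAME_ENV = "SERVICE_NAME"


def resolve_service_name(explicit: Optional[str]) -> str:
    """Return the explicit name, else the environment value, else raise."""
    # Empty strings are treated the same as "not set".
    name = explicit or os.environ.get(SERVICE_NAME_ENV)
    if not name:
        raise ValueError(
            f"No service name set and environment variable {SERVICE_NAME_ENV} is missing"
        )
    return name
```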
sql_client
Lazily instantiate and return the RDS client.
Returns: RDSClient: Active RDS client connection.
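Lazy initialization means the connection is created on first access and reused afterwards. The sketch below uses double-checked locking so concurrent first calls build only one client. `FakeRDSClient` is a hypothetical stand-in, since the real `RDSClient` constructor is not documented here.

```python
import threading
from typing import Optional


class FakeRDSClient:
    """Hypothetical stand-in for RDSClient; counts how often it is constructed."""

    instances = 0

    def __init__(self) -> None:
        FakeRDSClient.instances += 1


class LazyClientHolder:
    def __init__(self, sql_client: Optional[FakeRDSClient] = None) -> None:
        self._sql_client = sql_client  # may be injected up front
        self._lock = threading.RLock()

    @property
    def sql_client(self) -> FakeRDSClient:
        # Fast path: skip the lock when a client already exists.
        if self._sql_client is None:
            with self._lock:
                # Re-check under the lock so only one thread creates the client.
                if self._sql_client is None:
                    self._sql_client = FakeRDSClient()
        return self._sql_client
```

Accepting an injected client in the constructor, as the documented `sql_client` argument allows, also makes the tracker easy to test with a fake connection.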
end_event
end_event(processing_id: str, success: bool = True, status_note: Optional[str] = None, output_rows: Optional[int] = None) -> bool
Mark a processing job as complete (success or failure) and persist its status.
Args:
- processing_id (str): UUID of the processing event.
- success (bool, optional): Whether the job succeeded. Defaults to True.
- status_note (Optional[str], optional): Optional completion message.
- output_rows (Optional[int], optional): Optional output row count.
Returns: bool: True if persisted successfully, False otherwise.
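A sketch of the bookkeeping end_event implies: the job leaves the running map, its ID goes into the finished or failed collection, and the result is persisted. The `persist` callable replaces the real database write, plain dicts and sets stand in for ProcessingEvent and TTLSet, and returning False for an unknown ID is an assumption.

```python
import threading
from typing import Callable, Optional

_lock = threading.RLock()  # stands in for the tracker's _lock


def end_event_sketch(
    processing_id: str,
    running: dict[str, dict],
    finished: set[str],
    failed: set[str],
    persist: Callable[[dict], bool],
    success: bool = True,
    status_note: Optional[str] = None,
    output_rows: Optional[int] = None,
) -> bool:
    """Move a job out of `running`, record its outcome, then persist it."""
    with _lock:
        job = running.pop(processing_id, None)
        if job is None:
            return False  # assumption: unknown or already-ended IDs are rejected
        job.update(
            status="success" if success else "failure",
            status_note=status_note,
            output_rows=output_rows,
        )
        (finished if success else failed).add(processing_id)
    # Persist outside the lock so slow database I/O does not block other threads.
    return persist(job)
```

Updating memory before the write keeps the critical section short, but if `persist` fails the in-memory state and the table can diverge; a production version might retry the write or restore the job on failure.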
get_event
Retrieve an event object by ID or reference.
Args:
- id_value (str): The identifier (processing_id or reference).
- id_type (Literal): Type of identifier ('processing_id' or 'reference').
Returns: Optional[ProcessingEvent]: Matching ProcessingEvent if found.
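This page does not show the query get_event runs against Postgres. The in-memory sketch below only illustrates how a single `id_type` argument can select the lookup field, and why it is validated against a fixed set of names first. `Event` is a hypothetical stand-in for ProcessingEvent.

```python
from dataclasses import dataclass
from typing import Literal, Optional

IdType = Literal["processing_id", "reference"]


@dataclass
class Event:
    """Hypothetical minimal stand-in for ProcessingEvent."""

    processing_id: str
    reference: str


def get_event_sketch(events: list[Event], id_value: str, id_type: IdType) -> Optional[Event]:
    """Return the first event whose `id_type` field equals `id_value`, else None."""
    if id_type not in ("processing_id", "reference"):
        # Reject anything else before it could be used as a field or column name.
        raise ValueError(f"Unsupported id_type: {id_type!r}")
    return next((e for e in events if getattr(e, id_type) == id_value), None)
```

Against a SQL table the same whitelist lets the column name be chosen from two fixed values, while `id_value` is passed as a bound query parameter rather than formatted into the SQL string.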
get_processing_id
Retrieve a running job's processing_id from memory by reference.
Args:
- reference (str): External reference tied to the job.
Returns: Optional[str]: Processing ID if active, else None.
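Assuming `_running_job_ids` is keyed by processing ID, as its name suggests, finding a job by reference means scanning the running jobs while holding the lock. A minimal sketch, with plain dicts standing in for ProcessingEvent objects (an assumption):

```python
import threading
from typing import Optional

_lock = threading.RLock()  # stands in for the tracker's _lock


def get_processing_id_sketch(running: dict[str, dict], reference: str) -> Optional[str]:
    """Return the processing_id of the running job with this reference, else None."""
    with _lock:
        for processing_id, job in running.items():
            if job.get("reference") == reference:
                return processing_id
    return None
```

If reference lookups are frequent, a second reference-to-ID dict maintained alongside the running map would make this O(1), at the cost of keeping both structures in sync.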
start_event
Start a new processing event and insert it into Postgres.
Args:
- reference (str): Unique external reference (e.g. run_id or s3_key).
- event (dict): JSON-compatible input payload.
- input_rows (Optional[int]): Optional count of input rows.
Returns: Optional[str]: Processing ID if successfully created, else None.
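The documented contract (insert a row, return its processing ID, or None on failure) can be sketched as follows. The `insert` callable replaces the real RDSClient call, plain dicts stand in for ProcessingEvent, and serializing `event` with `json.dumps` before the insert is an assumption about how the JSONB column is fed.

```python
import json
import threading
import uuid
from datetime import datetime, timezone
from typing import Callable, Optional

_lock = threading.RLock()  # stands in for the tracker's _lock


def start_event_sketch(
    running: dict[str, dict],
    insert: Callable[[dict], None],
    reference: str,
    event: dict,
    input_rows: Optional[int] = None,
) -> Optional[str]:
    """Insert a new in-progress row and register it as running; None on failure."""
    try:
        row = {
            "processing_id": str(uuid.uuid4()),
            "reference": reference,
            # json.dumps raises TypeError for payloads that are not JSON-compatible.
            "event": json.dumps(event),
            "status": "in-progress",
            "input_rows": input_rows,
            "started_at": datetime.now(timezone.utc),  # Python-side UTC timestamp
        }
        insert(row)
    except Exception:  # broad on purpose: the documented contract is "else None"
        return None
    # Only jobs that reached the database are tracked in memory.
    with _lock:
        running[row["processing_id"]] = row
    return row["processing_id"]
```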
ProcessingEvent
ProcessingEvent(processing_id: str = (lambda: str(uuid.uuid4()))(), service_name: str = '', processor_name: str = '', reference: str = '', event: dict = dict(), status: Literal['in-progress', 'success', 'failure'] = 'in-progress', status_note: Optional[str] = None, input_rows: Optional[int] = None, output_rows: Optional[int] = None, started_at: datetime = (lambda: datetime.now(timezone.utc))(), ended_at: Optional[datetime] = None)
Represents a single processing event stored in the datastore.processing_events table.
This model defines the schema and lifecycle of a tracked job, including metadata, timestamps, and row counts. It provides methods for validation, status transitions, and completion tracking.
Attributes:
- processing_id (str): Unique identifier for the processing event.
- service_name (str): Logical service identifier (e.g. 'ai_pipeline').
- processor_name (str): Component or processor performing the work.
- reference (str): External reference (e.g. run_id, s3_key).
- event (dict): Arbitrary input event payload stored as JSONB.
- status (Literal): Current job status ('in-progress', 'success', 'failure').
- status_note (Optional[str]): Optional human-readable note or error message.
- input_rows (Optional[int]): Number of input rows processed.
- output_rows (Optional[int]): Number of output rows produced.
- started_at (datetime): UTC timestamp when the event started.
- ended_at (Optional[datetime]): UTC timestamp when the event finished.
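The rendered signature shows the defaults for processing_id and started_at as immediately called lambdas such as `(lambda: str(uuid.uuid4()))()`. Read literally, that would give every event the same ID and timestamp, so it most likely reflects a per-instance default factory in the source. A trimmed sketch of the schema, assuming a plain `dataclasses.dataclass` (the actual model may use a different base class):

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Literal, Optional

Status = Literal["in-progress", "success", "failure"]


@dataclass
class ProcessingEventSketch:
    # default_factory is called once per instance, not once at class definition.
    processing_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    service_name: str = ""
    processor_name: str = ""
    reference: str = ""
    event: dict = field(default_factory=dict)
    status: Status = "in-progress"
    status_note: Optional[str] = None
    input_rows: Optional[int] = None
    output_rows: Optional[int] = None
    started_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    ended_at: Optional[datetime] = None
```

`dataclasses` rejects a plain `dict()` default as a mutable default, and a `str(uuid.uuid4())` default evaluated once in the class body would be shared by every instance; `default_factory` avoids both.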
check_init
Validate that all required fields are initialized and the status is valid.
Raises: ValueError: If a required field is missing or invalid.
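The exact rules check_init applies are not listed here. Assuming it rejects empty identity fields, a sketch that reports every missing field at once (rather than stopping at the first) could look like this. `EventFields` and the `REQUIRED_FIELDS` tuple are hypothetical, and the status part of the check is left out because check_status covers it.

```python
from dataclasses import dataclass

# Assumption: the fields check_init treats as required; the page does not list them.
REQUIRED_FIELDS = ("processing_id", "service_name", "processor_name", "reference")


@dataclass
class EventFields:
    """Hypothetical minimal stand-in for ProcessingEvent."""

    processing_id: str
    service_name: str = ""
    processor_name: str = ""
    reference: str = ""

    def check_init(self) -> None:
        """Raise ValueError listing every required field that is empty."""
        missing = [name for name in REQUIRED_FIELDS if not getattr(self, name)]
        if missing:
            raise ValueError(f"Missing required field(s): {', '.join(missing)}")
```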
check_status
Validate that the job status is one of the supported literals.
Raises: ValueError: If the status is not in ['in-progress', 'success', 'failure'].
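Deriving the allowed values from the `Literal` type with `typing.get_args` keeps the runtime check and the type annotation in sync, so adding a status in one place updates both. A standalone sketch; the documented method presumably checks the event's own `status` attribute rather than taking an argument.

```python
from typing import Literal, get_args

Status = Literal["in-progress", "success", "failure"]


def check_status(status: str) -> None:
    """Raise ValueError unless status is one of the Status literals."""
    allowed = get_args(Status)  # ('in-progress', 'success', 'failure')
    if status not in allowed:
        raise ValueError(f"Invalid status {status!r}; expected one of {list(allowed)}")
```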
finish_job
finish_job(status: Literal['success', 'failure'], status_note: Optional[str] = None, output_rows: Optional[int] = None)
Mark the job as finished, updating timestamps and optional row counts.
Args:
- status (Literal): Completion status ('success' or 'failure').
- status_note (Optional[str]): Optional status message or error detail.
- output_rows (Optional[int]): Optional number of output rows produced.
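A sketch of the transition finish_job describes: set the terminal status, stamp `ended_at` in UTC, and record the optional note and row count. `Job` is a hypothetical stand-in for ProcessingEvent; rejecting non-terminal statuses and leaving existing values untouched when an optional argument is None are assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Literal, Optional


@dataclass
class Job:
    """Hypothetical minimal stand-in for ProcessingEvent."""

    status: str = "in-progress"
    status_note: Optional[str] = None
    output_rows: Optional[int] = None
    ended_at: Optional[datetime] = None

    def finish_job(
        self,
        status: Literal["success", "failure"],
        status_note: Optional[str] = None,
        output_rows: Optional[int] = None,
    ) -> None:
        if status not in ("success", "failure"):
            raise ValueError(f"finish_job expects 'success' or 'failure', got {status!r}")
        self.status = status
        self.ended_at = datetime.now(timezone.utc)
        # Only overwrite optional fields when a value was supplied.
        if status_note is not None:
            self.status_note = status_note
        if output_rows is not None:
            self.output_rows = output_rows
```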
is_finished
Returns True if the event is in a terminal state (success or failure).
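is_finished reduces to a membership test on the terminal statuses. A sketch written as a method on `Job`, a hypothetical stand-in for ProcessingEvent, together with a typical use: picking out the jobs still in progress.

```python
from dataclasses import dataclass

TERMINAL_STATUSES = frozenset({"success", "failure"})


@dataclass
class Job:
    """Hypothetical minimal stand-in for ProcessingEvent."""

    processing_id: str
    status: str = "in-progress"

    def is_finished(self) -> bool:
        """True once the job has reached a terminal state."""
        return self.status in TERMINAL_STATUSES


jobs = [Job("p1", "success"), Job("p2"), Job("p3", "failure")]
# Collect the IDs of jobs that have not reached a terminal state yet.
pending = [job.processing_id for job in jobs if not job.is_finished()]
```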