API Reference
This document provides detailed reference for Lab Orchestrator's core classes and methods.
Orchestrator
Main orchestrator class for managing laboratory workflows.
Class: Orchestrator
Located in: laborchestrator/orchestrator_implementation.py
Constructor
Parameters:
- reader (str): Process reader type (default: "PythonLab")
- worker_type (Type[WorkerInterface]): Worker implementation class
Example:
Methods
add_lab_resources_from_file
Load lab configuration from a YAML file.
Parameters:
- lab_env_filename (str): Path to lab configuration YAML file
Returns: bool - True if successful
Example:
add_process
Add a process to the orchestrator.
add_process(
description: Optional[str] = None,
file_path: Optional[str] = None,
name: Optional[str] = None,
process_object: Optional[PLProcess] = None,
delay: int = 0
) -> str
Parameters:
- description (str, optional): Process source code as string
- file_path (str, optional): Path to Python process file
- name (str, optional): Process name
- process_object (PLProcess, optional): Process instance
- delay (int): Start delay in minutes (default: 0)
Returns: str - Process name
Example:
# From file
name = orchestrator.add_process(
file_path="process.py",
name="MyProcess"
)
# From object
from my_processes import IncReadProcess
process = IncReadProcess(priority=5)
name = orchestrator.add_process(
process_object=process,
name="IncubationTest"
)
# From string
code = """
from pythonlab.process import PLProcess
# ...
"""
name = orchestrator.add_process(
description=code,
name="DynamicProcess"
)
start_processes
Start execution of processes.
Parameters:
- process_names (List[str]): List of process names to start
Example:
set_parameter
Set orchestrator parameter.
Parameters:
- param_name (str): Parameter name
- new_value (Any): New parameter value
Example:
inject_db_interface
Inject database interface for experiment tracking.
Parameters:
- db_client (StatusDBInterface): Database client instance
Example:
from laborchestrator.database_integration import StatusDBInterface
db_interface = StatusDBInterface()
orchestrator.inject_db_interface(db_interface)
Properties
processes
Dictionary of all processes.
Example:
schedule_manager
The schedule manager instance.
Example:
ScheduleManager
Handles scheduling logic and device allocation.
Class: ScheduleManager
Located in: laborchestrator/engine/schedule_manager.py
Constructor
Parameters:
- jssp (SchedulingInstance): Scheduling instance
- db_client (StatusDBInterface, optional): Database client
Methods
configure_lab
Configure lab from YAML file.
Parameters:
- yaml_file (str): Path to lab configuration YAML
Returns: bool - True if successful
Example:
compute_schedule
Compute optimal schedule for processes.
Parameters:
- time_limit (float, optional): Scheduling timeout in seconds
Returns: Dict - Schedule information
Example:
Properties
time_limit_short
Quick scheduling timeout in seconds.
Default: 2 seconds
time_limit_long
Full scheduling timeout in seconds.
Default: 5 seconds
lab_config_file
Path to loaded lab configuration file.
Data Structures
ProcessStep
Represents a single operation in a workflow.
Located in: laborchestrator/structures.py
from laborchestrator.structures import ProcessStep
from dataclasses import dataclass
@dataclass
class ProcessStep:
name: str # Step identifier
cont_names: List[str] # Container names
function: str # Operation type
duration: float # Duration in seconds
process_name: str # Parent process name
used_devices: List[UsedDevice] # Device requirements
wait_to_start_costs: float # Priority weighting
data: Dict # Custom metadata
Example:
step = ProcessStep(
name="incubate_step_1",
cont_names=["plate_1"],
function="incubate",
duration=120.0,
process_name="MyProcess",
used_devices=[device],
wait_to_start_costs=0.0,
data={"temperature": 310}
)
MoveStep
Represents a container movement operation.
@dataclass
class MoveStep:
name: str # Step identifier
cont_names: List[str] # Container names
function: str # Always "move"
duration: float # Duration in seconds
source_device: str # Source location
target_device: str # Target location
used_devices: List[UsedDevice] # Mover device
lidded: bool # Lid state
Example:
move_step = MoveStep(
name="move_to_incubator",
cont_names=["plate_1"],
function="move",
duration=30.0,
source_device="Carousel",
target_device="Incubator1",
used_devices=[mover],
lidded=True
)
ContainerInfo
Tracks labware container information.
@dataclass
class ContainerInfo:
name: str # Container identifier
current_device: str # Current location
current_pos: int # Position at location
start_device: UsedDevice # Starting location
filled: bool # Filled/empty status
content: str # Content description
labware_type: str # Type (plate, tube, etc)
barcode: Optional[str] # Barcode number
is_reagent: bool # Reagent flag
Example:
container = ContainerInfo(
name="culture_plate_1",
current_device="Carousel",
current_pos=10,
start_device=storage_device,
filled=True,
content="E. coli culture",
labware_type="96-well plate",
barcode="BC123456",
is_reagent=False
)
UsedDevice
Represents device requirements/assignments.
@dataclass
class UsedDevice:
name: str # Device name
device_type: str # Device type
position: Optional[int] # Position at device
capacity: int # Device capacity
allows_overlap: bool # Concurrent operations
Example:
device = UsedDevice(
name="Incubator1",
device_type="incubator",
position=5,
capacity=32,
allows_overlap=False
)
SchedulingInstance
Complete workflow container (JSSP - Job Shop Scheduling Problem).
@dataclass
class SchedulingInstance:
process_steps: List[ProcessStep] # All process steps
move_steps: List[MoveStep] # All move steps
containers: List[ContainerInfo] # All containers
devices: List[UsedDevice] # All devices
dependencies: Dict # Step dependencies
Example:
instance = SchedulingInstance(
process_steps=[step1, step2],
move_steps=[move1, move2],
containers=[container1, container2],
devices=[device1, device2],
dependencies={"step2": ["step1"]}
)
WorkerInterface
Executes process steps by interfacing with lab devices.
Class: WorkerInterface
Located in: laborchestrator/engine/worker_interface.py
Methods
execute_step
Execute a process step.
Parameters:
- step (ProcessStep): Step to execute
- containers (List[ContainerInfo]): Involved containers
Returns: bool - True if successful
execute_move
Execute a move step.
Parameters:
- move (MoveStep): Move step to execute
- containers (List[ContainerInfo]): Containers to move
Returns: bool - True if successful
WorkerObserver
Monitors execution and updates schedules.
Class: WorkerObserver
Located in: laborchestrator/engine/worker_observer.py
Methods
observe
Monitor process execution.
Continuously monitors running steps and triggers rescheduling on delays or errors.
update_schedule
Trigger schedule update.
Requests a new schedule computation based on current state.
WFGManager
Manages workflow graphs for visualization.
Class: WFGManager
Located in: laborchestrator/engine/wfg_manager.py
Methods
create_graph
Create workflow graph from scheduling instance.
Parameters:
- instance (SchedulingInstance): Scheduling instance
Returns: WorkFlowGraph - NetworkX directed graph
visualize
Generate visualization of workflow graph.
Parameters:
- graph (WorkFlowGraph): Workflow graph
- output_file (str): Output file path
PythonLabReader
Parses PythonLab process files.
Class: PythonLabReader
Located in: laborchestrator/pythonlab_reader.py
Methods
read_process
Read and parse a PythonLab process file.
Parameters:
- file_path (str): Path to process file
Returns: PLProcess - Parsed process object
Example:
try_to_read_process
CLI command to read and validate a process.
Parameters:
- file_path (str): Path to process file
Example:
Enums
OrchestratorState
Orchestrator execution states.
Located in: laborchestrator/orchestrator_interface.py
from laborchestrator.orchestrator_interface import OrchestratorState
from enum import Enum
class OrchestratorState(Enum):
IDLE = "idle"
RUNNING = "running"
PAUSED = "paused"
ERROR = "error"
FINISHED = "finished"
Example:
ProcessState
Process execution states.
from laborchestrator.orchestrator_interface import ProcessState
from enum import Enum
class ProcessState(Enum):
PENDING = "pending"
SCHEDULED = "scheduled"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
Example:
for name, process in orchestrator.processes.items():
if process.state == ProcessState.FAILED:
print(f"Process {name} failed")
StepState
Process step execution states.
from laborchestrator.orchestrator_interface import StepState
from enum import Enum
class StepState(Enum):
PENDING = "pending"
READY = "ready"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
BLOCKED = "blocked"
Database Integration
StatusDBInterface
Abstract interface for database integration.
Located in: laborchestrator/database_integration/status_db_interface.py
Methods
update_process_status
Update process status in database.
Parameters:
- process_name (str): Process name
- state (ProcessState): Current state
- progress (float): Progress percentage (0-100)
update_step_status
Update step status in database.
update_step_status(
step_name: str,
state: StepState,
start_time: Optional[datetime],
end_time: Optional[datetime]
) -> None
Parameters:
- step_name (str): Step name
- state (StepState): Current state
- start_time (datetime, optional): Start timestamp
- end_time (datetime, optional): End timestamp
log_event
Log an event to database.
Parameters:
- event_type (str): Event type
- message (str): Event message
- severity (str): Severity level (info/warning/error)
Type Hints
Common type aliases used in Lab Orchestrator:
from typing import Dict, List, Optional, Tuple, Union
from laborchestrator.structures import ProcessStep, MoveStep, ContainerInfo, UsedDevice
# Workflow graph type (NetworkX directed graph)
WorkFlowGraph = nx.DiGraph
# Schedule type
Schedule = Dict[str, Any]
# Device configuration
DeviceConfig = Dict[str, Union[str, int, bool, float]]
# Lab configuration
LabConfig = Dict[str, Dict[str, DeviceConfig]]
Complete Example
Here's a complete example using the API:
from laborchestrator.orchestrator_implementation import Orchestrator
from laborchestrator.orchestrator_interface import ProcessState
from tests.test_data.inc_read_process import IncReadProcess
# Create orchestrator
orchestrator = Orchestrator()
# Configure lab
success = orchestrator.add_lab_resources_from_file("lab_config.yml")
if not success:
print("Failed to load lab configuration")
exit(1)
# Add processes
process1 = IncReadProcess(priority=5)
name1 = orchestrator.add_process(
process_object=process1,
name="IncubationTest"
)
name2 = orchestrator.add_process(
file_path="another_process.py",
name="SecondProcess",
delay=30 # Start in 30 minutes
)
# Start processes
orchestrator.start_processes([name1, name2])
# Monitor execution
import time
while True:
all_done = True
for name, process in orchestrator.processes.items():
print(f"{name}: {process.state}")
if process.state not in [ProcessState.COMPLETED, ProcessState.FAILED]:
all_done = False
if all_done:
break
time.sleep(10)
print("All processes completed")
See Also
- Configuration - Configure lab resources
- Writing Processes - Create workflows
- SiLA Integration - SiLA server API
- Getting Started - Quick start guide