Advanced Usage
This guide covers advanced features of Platform Status DB including experiments, process tracking, duration estimation, and integration with laboratory orchestration systems.
Experiments and Processes
Understanding the Process-Experiment Model
Platform Status DB uses a hierarchical model for tracking laboratory workflows:
- Process: A reusable workflow definition (template)
- Experiment: A specific execution instance of a process
- ProcessStep: Individual operations within an experiment
Process (Definition)
└── Experiment 1 (Instance)
├── ProcessStep 1
├── ProcessStep 2
└── ProcessStep 3
└── Experiment 2 (Instance)
├── ProcessStep 1
└── ProcessStep 2
Creating a Process
A process represents a reusable workflow that can be executed multiple times:
from platform_status_db.larastatus.status_db_implementation import StatusDBImplementation
db = StatusDBImplementation()
# Define process source code or description
process_code = """
def cell_culture_assay(plate):
# Step 1: Add reagent
liquid_handler.dispense(plate, reagent='MTT', volume=50)
# Step 2: Incubate
incubator.incubate(plate, temperature=37, duration=4*3600)
# Step 3: Read absorbance
reader.read_absorbance(plate, wavelength=570)
"""
# Store process in database
process_uuid = db.add_process_to_db(
name="Cell Viability Assay",
src=process_code
)
print(f"Process created with UUID: {process_uuid}")
Listing Available Processes
processes = db.get_available_processes()
print("Available Processes:")
for name, uuid in processes:
print(f" {name}: {uuid}")
Retrieving a Process
process_uuid = "123e4567-e89b-12d3-a456-426614174000"
process_code = db.get_process(process_uuid)
print(process_code)
Creating an Experiment
An experiment is a specific execution instance of a process:
# Create experiment based on a process
experiment_uuid = db.create_experiment(process_uuid)
print(f"Experiment created with UUID: {experiment_uuid}")
# Now you can execute process steps and associate them with this experiment
Process Step Tracking
Recording Basic Process Steps
Track individual operations within an experiment:
from laborchestrator import structures
from datetime import datetime
db = StatusDBImplementation()
# Get container
container = db.get_cont_info_by_barcode("PLATE001")
# Create process step
step = structures.ProcessStep(
name="Read Absorbance",
main_device=structures.DeviceInfo(name="PlateReader"),
data={
"fct": "absorbance",
"method": "endpoint",
"wavelength": 450,
"reads": 3
}
)
# Record start time
step.start = datetime.now()
# ... perform actual operation on physical device ...
# Record finish time and status
step.finish = datetime.now()
step.status = "completed"
# Save to database
db.safe_step_to_db(
step=step,
container_info=container,
experiment_uuid=experiment_uuid
)
Recording Movement Steps
Movement steps are tracked with additional details:
from laborchestrator import structures
from datetime import datetime
# Create move step
move_step = structures.MoveStep(
name="Move to Reader",
main_device=structures.DeviceInfo(name="RoboticArm"),
origin_device=structures.DeviceInfo(name="Hamilton_STAR"),
target_device=structures.DeviceInfo(name="PlateReader"),
origin_pos=0,
destination_pos=0,
data={"fct": "move"}
)
# Record execution
move_step.start = datetime.now()
# Perform movement in database
db.moved_container(
source_device="Hamilton_STAR",
source_pos=0,
target_device="PlateReader",
target_pos=0,
barcode=container.barcode
)
move_step.finish = datetime.now()
move_step.status = "completed"
# Save to database
db.safe_step_to_db(
step=move_step,
container_info=container,
experiment_uuid=experiment_uuid
)
Complete Experiment Example
from platform_status_db.larastatus.status_db_implementation import StatusDBImplementation
from laborchestrator import structures
from datetime import datetime
import time
db = StatusDBImplementation()
# 1. Define and store process
process_code = """
Cell culture screening workflow:
1. Prepare plates with cells
2. Add compounds
3. Incubate
4. Read absorbance
"""
process_uuid = db.add_process_to_db(
name="Cell Screening",
src=process_code
)
# 2. Create experiment instance
experiment_uuid = db.create_experiment(process_uuid)
print(f"Running experiment: {experiment_uuid}")
# 3. Add container to platform
container_info = structures.ContainerInfo(
name="ScreenPlate_01",
current_device="Hamilton_STAR",
current_pos=0,
barcode="SCREEN_001",
lidded=True,
filled=True
)
db.add_container(container_info)
# 4. Execute and log process steps
# Step 1: Add compounds
step1 = structures.ProcessStep(
name="Add Compounds",
main_device=structures.DeviceInfo(name="Hamilton_STAR"),
data={"fct": "dispense", "volume": 50, "reagent": "compound_library"}
)
step1.start = datetime.now()
time.sleep(2) # Simulate operation
step1.finish = datetime.now()
step1.status = "completed"
db.safe_step_to_db(step1, container_info, experiment_uuid)
# Step 2: Move to incubator
move_step = structures.MoveStep(
name="Move to Incubator",
main_device=structures.DeviceInfo(name="RoboticArm"),
origin_device=structures.DeviceInfo(name="Hamilton_STAR"),
target_device=structures.DeviceInfo(name="Incubator"),
origin_pos=0,
destination_pos=5,
data={"fct": "move"}
)
move_step.start = datetime.now()
db.moved_container("Hamilton_STAR", 0, "Incubator", 5, "SCREEN_001")
container_info = db.get_cont_info_by_barcode("SCREEN_001")
move_step.finish = datetime.now()
move_step.status = "completed"
db.safe_step_to_db(move_step, container_info, experiment_uuid)
# Step 3: Incubate
step3 = structures.ProcessStep(
name="Incubate",
main_device=structures.DeviceInfo(name="Incubator"),
data={"fct": "incubate", "temperature": 37, "duration": 14400}
)
step3.start = datetime.now()
time.sleep(2) # Simulate operation
step3.finish = datetime.now()
step3.status = "completed"
db.safe_step_to_db(step3, container_info, experiment_uuid)
# Step 4: Move to reader
move_step2 = structures.MoveStep(
name="Move to Reader",
main_device=structures.DeviceInfo(name="RoboticArm"),
origin_device=structures.DeviceInfo(name="Incubator"),
target_device=structures.DeviceInfo(name="PlateReader"),
origin_pos=5,
destination_pos=0,
data={"fct": "move"}
)
move_step2.start = datetime.now()
db.moved_container("Incubator", 5, "PlateReader", 0, "SCREEN_001")
container_info = db.get_cont_info_by_barcode("SCREEN_001")
move_step2.finish = datetime.now()
move_step2.status = "completed"
db.safe_step_to_db(move_step2, container_info, experiment_uuid)
# Step 5: Read absorbance
step5 = structures.ProcessStep(
name="Read Absorbance",
main_device=structures.DeviceInfo(name="PlateReader"),
data={"fct": "absorbance", "wavelength": 570}
)
step5.start = datetime.now()
time.sleep(2) # Simulate operation
step5.finish = datetime.now()
step5.status = "completed"
db.safe_step_to_db(step5, container_info, experiment_uuid)
print(f"Experiment {experiment_uuid} completed")
Duration Estimation
Platform Status DB can estimate the duration of future operations based on historical data.
Estimating Single Step Duration
from platform_status_db.larastatus.status_db_implementation import StatusDBImplementation
from laborchestrator import structures
db = StatusDBImplementation()
# Define a step to estimate
step = structures.ProcessStep(
name="Read Absorbance",
main_device=structures.DeviceInfo(name="PlateReader"),
data={"fct": "absorbance", "method": "endpoint_570nm"}
)
# Get duration estimate
estimated_duration = db.get_estimated_duration(step, confidence=0.95)
if estimated_duration:
print(f"Estimated duration: {estimated_duration:.2f} seconds")
print(f" : {estimated_duration/60:.2f} minutes")
else:
print("No historical data available for this step")
Estimating Multiple Steps
from laborchestrator import structures
db = StatusDBImplementation()
# Define workflow steps
steps = [
structures.ProcessStep(
name="Dispense",
main_device=structures.DeviceInfo(name="Hamilton_STAR"),
data={"fct": "dispense", "volume": 50}
),
structures.MoveStep(
name="Move to Reader",
main_device=structures.DeviceInfo(name="RoboticArm"),
origin_device=structures.DeviceInfo(name="Hamilton_STAR"),
target_device=structures.DeviceInfo(name="PlateReader"),
origin_pos=0,
destination_pos=0,
data={"fct": "move"}
),
structures.ProcessStep(
name="Read",
main_device=structures.DeviceInfo(name="PlateReader"),
data={"fct": "absorbance", "wavelength": 450}
),
]
# Get estimates for all steps
durations = db.get_estimated_durations(steps, confidence=0.95)
total_estimated = 0
for i, (step, duration) in enumerate(zip(steps, durations)):
if duration:
print(f"Step {i+1} ({step.name}): {duration:.2f}s")
total_estimated += duration
else:
print(f"Step {i+1} ({step.name}): No estimate")
print(f"\nTotal estimated time: {total_estimated:.2f}s ({total_estimated/60:.2f} min)")
How Duration Estimation Works
The duration estimator analyzes historical ProcessStep records and matches based on:
- Function name (
data['fct']) - Device types (for movement steps)
- Method parameters (for protocol steps)
- Arbitrary parameters (fallback matching)
The estimator uses specialized "historians" for different step types: - MoveHistorian: Analyzes movement patterns between device pairs - ProtocolHistorian: Analyzes protocol executions with similar parameters - GeneralHistorian: Fallback for arbitrary step types
Example: A movement from Hamilton_STAR to PlateReader will match historical movements between the same device pair and use the maximum observed duration.
Querying Historical Data
Query Process Steps
from platform_status_db.job_logs.models import ProcessStep, MoveStep
from django.db.models import Avg, Min, Max, Count
# Get all process steps
all_steps = ProcessStep.objects.all()
print(f"Total process steps logged: {all_steps.count()}")
# Get steps for a specific device
device_steps = ProcessStep.objects.filter(
executing_device__lara_name="PlateReader"
)
print(f"PlateReader operations: {device_steps.count()}")
# Calculate statistics
stats = device_steps.aggregate(
avg_duration=Avg('finish') - Avg('start'),
min_duration=Min('finish') - Min('start'),
max_duration=Max('finish') - Max('start'),
total=Count('id')
)
print(f"Statistics: {stats}")
# Get movement steps only
move_steps = MoveStep.objects.all()
for move in move_steps[:10]: # First 10 movements
duration = move.get_duration()
print(f"{move.origin} -> {move.destination}: {duration:.2f}s")
Query Experiments
from platform_status_db.job_logs.models import Experiment, ProcessStep
# Get all experiments
experiments = Experiment.objects.all()
for exp in experiments:
steps = ProcessStep.objects.filter(experiment=exp)
print(f"\nExperiment {exp.experiment_uuid}")
print(f" Process: {exp.process.name}")
print(f" Steps: {steps.count()}")
# Calculate total duration
if steps.exists():
first_step = steps.order_by('start').first()
last_step = steps.order_by('-finish').first()
if first_step and last_step:
total_duration = (last_step.finish - first_step.start).total_seconds()
print(f" Duration: {total_duration:.2f}s ({total_duration/60:.2f} min)")
Analyze Container History
from platform_status_db.job_logs.models import Container, ProcessStep
barcode = "SCREEN_001"
# Get container
container = Container.objects.get(barcode=barcode, removed=False)
# Get all steps involving this container
steps = ProcessStep.objects.filter(container=container).order_by('start')
print(f"History for container {barcode}:")
print(f" Starting position: {container.starting_pos}")
print(f" Current position: {container.current_pos}")
print(f"\nProcess steps:")
for step in steps:
duration = step.get_duration()
print(f" [{step.start.strftime('%H:%M:%S')}] {step.process_name} "
f"on {step.executing_device.lara_name} ({duration:.2f}s)")
Integration Patterns
Integration with Laborchestrator
Platform Status DB is designed to work seamlessly with laborchestrator:
from laborchestrator.orchestrator import Orchestrator
from platform_status_db.larastatus.status_db_implementation import StatusDBImplementation
# Create orchestrator with status DB
db = StatusDBImplementation()
orchestrator = Orchestrator(
config_file="lab_config.yaml",
status_db=db
)
# The orchestrator will automatically:
# - Track container movements
# - Log process steps
# - Update container states
# - Estimate durations
Custom Workflow Integration
from platform_status_db.larastatus.status_db_implementation import StatusDBImplementation
from laborchestrator import structures
from datetime import datetime
class WorkflowManager:
def __init__(self, db: StatusDBImplementation):
self.db = db
def run_assay_workflow(self, plate_barcode: str, assay_config: dict):
# Create experiment
process_uuid = self.db.add_process_to_db(
name=assay_config['name'],
src=str(assay_config)
)
experiment_uuid = self.db.create_experiment(process_uuid)
# Get container
container = self.db.get_cont_info_by_barcode(plate_barcode)
# Execute workflow steps
for step_config in assay_config['steps']:
step = self._create_step(step_config, container)
# Estimate duration
estimated = self.db.get_estimated_duration(step)
if estimated:
print(f"Estimated: {estimated:.2f}s")
# Execute step
step.start = datetime.now()
self._execute_step(step, container)
step.finish = datetime.now()
step.status = "completed"
# Log to database
self.db.safe_step_to_db(step, container, experiment_uuid)
# Update container info if needed
if isinstance(step, structures.MoveStep):
container = self.db.get_cont_info_by_barcode(plate_barcode)
return experiment_uuid
def _create_step(self, config: dict, container) -> structures.ProcessStep:
# Create step based on configuration
pass
def _execute_step(self, step: structures.ProcessStep, container):
# Execute on physical devices
pass
# Usage
db = StatusDBImplementation()
workflow_mgr = WorkflowManager(db)
assay_config = {
'name': 'Enzyme Assay',
'steps': [
{'type': 'dispense', 'device': 'Hamilton_STAR', 'volume': 50},
{'type': 'move', 'from': 'Hamilton_STAR', 'to': 'Incubator'},
{'type': 'incubate', 'device': 'Incubator', 'temp': 37, 'time': 3600},
{'type': 'move', 'from': 'Incubator', 'to': 'PlateReader'},
{'type': 'read', 'device': 'PlateReader', 'wavelength': 450},
]
}
experiment_uuid = workflow_mgr.run_assay_workflow("PLATE001", assay_config)
REST API Wrapper Example
Create a REST API around Platform Status DB:
from flask import Flask, jsonify, request
from platform_status_db.larastatus.status_db_implementation import StatusDBImplementation
from laborchestrator import structures
app = Flask(__name__)
db = StatusDBImplementation()
@app.route('/api/containers/<barcode>', methods=['GET'])
def get_container(barcode):
container = db.get_cont_info_by_barcode(barcode)
if container:
return jsonify({
'barcode': container.barcode,
'device': container.current_device,
'position': container.current_pos,
'lidded': container.lidded
})
return jsonify({'error': 'Container not found'}), 404
@app.route('/api/containers/<barcode>/move', methods=['POST'])
def move_container(barcode):
data = request.json
db.moved_container(
source_device=data['source_device'],
source_pos=data['source_pos'],
target_device=data['target_device'],
target_pos=data['target_pos'],
barcode=barcode
)
return jsonify({'status': 'success'})
@app.route('/api/devices/<device>/positions', methods=['GET'])
def get_positions(device):
positions = db.get_all_positions(device)
status = []
for pos in positions:
is_empty = db.position_empty(device, pos)
container = None if is_empty else db.get_container_at_position(device, pos)
status.append({
'position': pos,
'empty': is_empty,
'container': container.barcode if container else None
})
return jsonify(status)
if __name__ == '__main__':
app.run(debug=True, port=5000)
Performance Optimization
Bulk Operations
When adding many containers, use batch operations:
from django.db import transaction
from platform_status_db.job_logs.models import Container, Position
# Use transaction for atomic operations
with transaction.atomic():
for i in range(100):
position = Position.objects.get(device__lara_name="Storage", slot_number=i)
Container.objects.create(
current_pos=position,
starting_pos=position,
barcode=f"BATCH_{i:03d}",
lidded=True,
labware_uuid="00000000-0000-0000-0000-000000000000",
removed=False
)
Caching Container Locations
For frequently accessed containers:
from functools import lru_cache
from platform_status_db.larastatus.status_db_implementation import StatusDBImplementation
class CachedStatusDB:
def __init__(self):
self.db = StatusDBImplementation()
@lru_cache(maxsize=1000)
def get_container_location(self, barcode: str):
container = self.db.get_cont_info_by_barcode(barcode)
return (container.current_device, container.current_pos) if container else None
def moved_container(self, *args, **kwargs):
# Clear cache on movement
self.get_container_location.cache_clear()
return self.db.moved_container(*args, **kwargs)
Database Connection Pooling
For production deployments with high concurrency, configure connection pooling in settings.py:
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'platform_status',
'CONN_MAX_AGE': 600, # Persistent connections
'OPTIONS': {
'connect_timeout': 10,
}
}
}
Best Practices
Process Step Logging
- Always set both
startandfinishtimes - Use meaningful step names
- Include relevant parameters in
datadict - Set appropriate
statusvalues - Log steps immediately after completion
Experiment Organization
- Create processes for reusable workflows
- Create new experiment for each run
- Associate all steps with experiment UUID
- Use descriptive process names
- Store process source/description for reproducibility
Duration Estimation
- Log sufficient historical data (at least 10 samples per operation type)
- Use consistent parameter structures for better matching
- Handle
Noneestimates gracefully - Add buffer time to estimates for scheduling
- Update estimates periodically as more data accumulates
Error Handling
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
try:
db.safe_step_to_db(step, container, experiment_uuid)
except Exception as e:
logging.error(f"Failed to log step: {e}")
# Continue workflow or retry
Next Steps
- API Reference: Complete method documentation
- Managing Devices: Device setup and configuration
- Managing Containers: Container tracking basics