Framework for creating laboratory instrument nodes that integrate with MADSci workcells via REST APIs.
Features¶
REST API server: Automatic FastAPI server generation with comprehensive OpenAPI documentation
Action system: Declarative action definitions with flexible return types and automatic validation
Advanced return types: Support for Pydantic models, files, JSON data, and datapoint IDs
File handling: Seamless file upload/download with automatic path management
State management: Periodic state polling and reporting
Event integration: Built-in logging to MADSci Event Manager
Resource integration: Access to MADSci Resource and Data Managers
Lifecycle management: Startup, shutdown, and error handling
Configuration: YAML-based node configuration and deployment
Installation¶
See the main README for installation options. This package is available as:
PyPI:
pip install madsci.node_moduleDocker: Included in
ghcr.io/ad-sdl/madsciExample nodes: See example
_lab /example _modules/ including the comprehensive advanced _example _module .py
Quick Start¶
1. Create a Node Class¶
from madsci.node_module import RestNode, action
from madsci.common.types.node_types import RestNodeConfig
from typing import Any
from pathlib import Path
from pydantic import BaseModel, Field
class MyInstrumentConfig(RestNodeConfig):
"""Configuration for your instrument."""
device_port: str = "/dev/ttyUSB0"
timeout: int = 30
class MyInstrumentNode(RestNode):
"""Node for controlling my laboratory instrument."""
config: MyInstrumentConfig = MyInstrumentConfig()
config_model = MyInstrumentConfig
def startup_handler(self) -> None:
"""Initialize device connection."""
# Connect to your instrument
self.device = MyDeviceInterface(port=self.config.device_port)
self.logger.log("Instrument initialized!")
def shutdown_handler(self) -> None:
"""Clean up device connection."""
if hasattr(self, 'device'):
self.device.disconnect()
def state_handler(self) -> dict[str, Any]:
"""Report current instrument state."""
if hasattr(self, 'device'):
self.node_state = {
"temperature": self.device.get_temperature(),
"status": self.device.get_status()
}
@action
def measure_sample(self, sample_id: str, duration: int = 60) -> dict[str, float]:
"""Measure a sample and return results directly as JSON."""
# Your instrument control logic here
result = self.device.measure(sample_id, duration)
return {"temperature": result.temp, "absorbance": result.abs}
@action
def run_protocol(self, protocol_file: Path) -> str:
"""Execute a protocol file and return datapoint ID."""
self.device.load_protocol(protocol_file)
results = self.device.run()
# Upload results and return datapoint ID
return self.create_and_upload_value_datapoint(
value=results,
label=f"protocol_results"
)
if __name__ == "__main__":
node = MyInstrumentNode()
node.start_node() # Starts REST server2. Create Node Definition¶
Create a YAML file (e.g., my_instrument.node.yaml):
node_name: my_instrument_1
node_id: 01JYKZDPANTNRYXF5TQKRJS0F2 # Generate with ulid
node_description: My laboratory instrument for sample analysis
node_type: device
module_name: my_instrument
module_version: 1.0.03. Run Your Node¶
# Run directly
python my_instrument_node.py
# Or with a pre-defined node
python my_instrument_node.py --node_definition my_instrument.node.yaml
# Node will be available at http://localhost:2000/docsCore Concepts¶
Actions¶
Actions are the primary interface for interacting with nodes. They support flexible return types:
from madsci.node_module import action
from pydantic import BaseModel
from pathlib import Path
# Return simple JSON data
@action
def get_temperature(self) -> float:
"""Get current temperature."""
return self.device.get_temperature()
# Return custom Pydantic models
class AnalysisResult(BaseModel):
sample_id: str
concentration: float
ph_level: float
@action
def analyze_sample(self, sample_id: str) -> AnalysisResult:
"""Analyze sample and return structured results."""
result = self.device.analyze(sample_id)
return AnalysisResult(
sample_id=sample_id,
concentration=result.conc,
ph_level=result.ph
)
# Return datapoint IDs for workflow integration
@action
def capture_data(self, location: str) -> str:
"""Capture data and return datapoint ID."""
data = self.device.capture(location)
return self.create_and_upload_value_datapoint(
value=data,
label=f"capture_{location}"
)
# Handle file operations
@action
def process_file(self, input_file: Path) -> Path:
"""Process file and return output file path."""
output_path = self.device.process_file(input_file)
return output_pathAction features:
Flexible return types: JSON data, Pydantic models, file paths, datapoint IDs
Automatic validation: Parameter and return value validation via type hints
File handling: Seamless file uploads/downloads with
PathparametersOpenAPI documentation: Comprehensive auto-generated API documentation
Type safety: Full type checking for complex nested data structures
Configuration¶
Node configuration using Pydantic settings:
from madsci.common.types.node_types import RestNodeConfig
from pydantic import Field
class MyNodeConfig(RestNodeConfig):
# Device-specific settings
device_ip: str = Field(description="Device IP address")
device_port: int = Field(default=502, description="Device port")
# Operational settings
measurement_timeout: int = Field(default=30, description="Timeout in seconds")
auto_calibrate: bool = Field(default=True, description="Enable auto-calibration")
# Advanced settings
retry_attempts: int = Field(default=3, ge=1, description="Number of retry attempts")Lifecycle Handlers¶
Manage node startup, shutdown, and state:
from madsci.node_module import RestNode
from typing import Any
class MyNode(RestNode):
def startup_handler(self) -> None:
"""Called on node initialization."""
# Initialize connections, load calibration, etc.
pass
def shutdown_handler(self) -> None:
"""Called on node shutdown."""
# Clean up resources, close connections, etc.
pass
def state_handler(self) -> dict[str, Any]:
"""Called periodically to update node state."""
self.node_state = {
"connected": self.device.is_connected(),
"ready": self.device.is_ready()
}Integration with MADSci Ecosystem¶
Nodes automatically integrate with other MADSci services:
from madsci.node_module import RestNode, action
class IntegratedNode(RestNode):
@action
def process_sample(self, sample_id: str) -> str:
# Get sample info from Resource Manager
sample = self.resource_client.get_resource(sample_id)
# Process sample
result = self.device.process(sample)
# Store results and return datapoint ID
datapoint_id = self.create_and_upload_value_datapoint(
value=result,
label=f"processing_result_{sample_id}"
)
# Log event
self.logger.log(f"Processed sample {sample_id}")
return datapoint_idReturn Type Options¶
Actions support multiple return patterns depending on your needs:
from madsci.node_module import RestNode, action
from typing import Any
from pathlib import Path
class MyNode(RestNode):
@action
def get_status(self) -> dict[str, Any]:
"""Return status directly as JSON for immediate use."""
return {"temperature": 25.0, "ready": True}
@action
def analyze_sample(self, sample_id: str) -> str:
"""Analyze sample and return datapoint ID for workflow storage."""
analysis_data = {"purity": 95.2, "concentration": 1.25}
return self.create_and_upload_value_datapoint(
value=analysis_data,
label=f"analysis_{sample_id}"
)
@action
def generate_report(self, data: dict) -> Path:
"""Generate report file and return file path."""
report_path = self.create_report(data)
return report_path # File automatically served via REST API
@action
def perform_measurement(self) -> None:
"""Perform action without returning data."""
self.device.calibrate()
# No return value neededChoose the appropriate return type based on how the data will be used in workflows.
Example Nodes¶
See complete working examples in example
liquidhandler.py: Liquid handling robot
platereader.py: Microplate reader
robotarm.py: Robotic arm
advanced
_example _module .py: Comprehensive example showcasing advanced features
Deployment¶
Docker Deployment¶
Basic Dockerfile¶
FROM ghcr.io/ad-sdl/madsci:latest
# Copy node implementation and configuration
COPY my_instrument_node.py /app/
COPY my_instrument.node.yaml /app/
COPY requirements.txt /app/
WORKDIR /app
# Install additional dependencies if needed
RUN pip install -r requirements.txt
# Create directory for data files
RUN mkdir -p /app/data
# Expose the node port
EXPOSE 2000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:2000/health || exit 1
# Run the node
CMD ["python", "my_instrument_node.py"]Multi-stage Docker Build¶
# Build stage
FROM python:3.10-slim as builder
WORKDIR /build
COPY requirements.txt .
RUN pip install --user -r requirements.txt
# Runtime stage
FROM ghcr.io/ad-sdl/madsci:latest
COPY --from=builder /root/.local /root/.local
COPY my_instrument_node.py /app/
COPY my_instrument.node.yaml /app/
WORKDIR /app
EXPOSE 2000
ENV PATH=/root/.local/bin:$PATH
CMD ["python", "my_instrument_node.py"]Docker Compose for Development¶
version: '3.8'
services:
my-instrument:
build: .
ports:
- "2000:2000"
environment:
# Node configuration via environment variables
- NODE_NAME=my_instrument_dev
- DEVICE_PORT=/dev/ttyUSB0
- LOG_LEVEL=DEBUG
- NODE_URL=http://localhost:2000
# MADSci service URLs (for local development)
- EVENT_MANAGER_URL=http://localhost:8001
- DATA_MANAGER_URL=http://localhost:8004
- RESOURCE_MANAGER_URL=http://localhost:8003
volumes:
# Mount device (for hardware access)
- /dev/ttyUSB0:/dev/ttyUSB0
# Mount data directory
- ./data:/app/data
# Mount config for hot-reload during development
- ./my_instrument.node.yaml:/app/my_instrument.node.yaml
depends_on:
- event-manager
- data-manager
- resource-manager
restart: unless-stopped
# MADSci core services for development
event-manager:
image: ghcr.io/ad-sdl/madsci:latest
command: ["python", "-m", "madsci.event_manager"]
ports:
- "8001:8001"
environment:
- LOG_LEVEL=DEBUG
data-manager:
image: ghcr.io/ad-sdl/madsci:latest
command: ["python", "-m", "madsci.data_manager"]
ports:
- "8004:8004"
environment:
- LOG_LEVEL=DEBUG
resource-manager:
image: ghcr.io/ad-sdl/madsci:latest
command: ["python", "-m", "madsci.resource_manager"]
ports:
- "8003:8003"
environment:
- LOG_LEVEL=DEBUGKubernetes Deployment¶
Basic Deployment¶
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-instrument-node
labels:
app: my-instrument-node
spec:
replicas: 1
selector:
matchLabels:
app: my-instrument-node
template:
metadata:
labels:
app: my-instrument-node
spec:
containers:
- name: my-instrument-node
image: my-registry/my-instrument-node:latest
ports:
- containerPort: 2000
env:
- name: NODE_NAME
value: "my_instrument_prod"
- name: LOG_LEVEL
value: "INFO"
- name: EVENT_MANAGER_URL
value: "http://event-manager:8001"
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 2000
initialDelaySeconds: 30
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 2000
initialDelaySeconds: 5
periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
name: my-instrument-node
spec:
selector:
app: my-instrument-node
ports:
- protocol: TCP
port: 80
targetPort: 2000
type: ClusterIPConfigMap for Node Configuration¶
apiVersion: v1
kind: ConfigMap
metadata:
name: my-instrument-config
data:
my_instrument.node.yaml: |
node_name: my_instrument_prod
node_id: 01JYKZDPANTNRYXF5TQKRJS0F2
node_description: Production instrument for sample analysis
node_type: device
module_name: my_instrument
module_version: 1.0.0
actions:
- name: measure_sample
description: Measure a sample
parameters:
sample_id: str
duration: int
returns: dict
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-instrument-node
spec:
template:
spec:
containers:
- name: my-instrument-node
image: my-registry/my-instrument-node:latest
volumeMounts:
- name: config-volume
mountPath: /app/my_instrument.node.yaml
subPath: my_instrument.node.yaml
volumes:
- name: config-volume
configMap:
name: my-instrument-configEnvironment Configuration¶
Environment Variables¶
All node configuration can be controlled via environment variables:
# Core node settings
export NODE_NAME="my_instrument_1"
export NODE_ID="01JYKZDPANTNRYXF5TQKRJS0F2"
export NODE_URL="http://localhost:2000"
export NODE_PORT="2000"
# Device-specific settings (custom configuration)
export DEVICE_PORT="/dev/ttyUSB0"
export DEVICE_TIMEOUT="30"
export DEVICE_BAUDRATE="9600"
# MADSci service endpoints
export EVENT_MANAGER_URL="http://localhost:8001"
export DATA_MANAGER_URL="http://localhost:8004"
export RESOURCE_MANAGER_URL="http://localhost:8003"
export LOCATION_MANAGER_URL="http://localhost:8006"
# Logging configuration
export LOG_LEVEL="INFO"
export LOG_FORMAT="json"
# Security settings (if applicable)
export API_KEY="your-api-key"
export TLS_CERT_PATH="/etc/ssl/certs/node.crt"
export TLS_KEY_PATH="/etc/ssl/private/node.key"
# Start the node
python my_instrument_node.pyConfiguration Files¶
Create environment-specific configuration files:
# config/development.py
from madsci.common.types.node_types import RestNodeConfig
class DevelopmentConfig(RestNodeConfig):
device_port: str = "/dev/mock"
log_level: str = "DEBUG"
event_manager_url: str = "http://localhost:8001"
enable_debug_endpoints: bool = True
# config/production.py
class ProductionConfig(RestNodeConfig):
device_port: str = "/dev/ttyUSB0"
log_level: str = "INFO"
event_manager_url: str = "http://event-manager:8001"
enable_debug_endpoints: bool = False
tls_enabled: bool = True
# my_instrument_node.py
import os
from config.development import DevelopmentConfig
from config.production import ProductionConfig
# Select configuration based on environment
if os.getenv("ENVIRONMENT", "development") == "production":
config_class = ProductionConfig
else:
config_class = DevelopmentConfig
class MyInstrumentNode(RestNode):
config_model = config_classIntegration with Workcells¶
Static Configuration¶
# workcell.yaml
nodes:
my_instrument_1: "http://my-instrument:2000"
backup_instrument: "http://backup-instrument:2000"
workflows:
- name: sample_analysis
steps:
- node: my_instrument_1
action: measure_sample
parameters:
sample_id: "${sample_id}"
duration: 60Dynamic Discovery¶
# workcell_manager.py
from madsci.workcell_manager import WorkcellManager
class MyWorkcell(WorkcellManager):
def discover_nodes(self):
"""Automatically discover available nodes."""
nodes = {}
# Check for nodes on network
for ip in self.get_network_ips():
try:
client = RestNodeClient(f"http://{ip}:2000")
definition = client.get_definition()
nodes[definition.node_name] = f"http://{ip}:2000"
except:
continue
return nodesProduction Deployment Best Practices¶
Security¶
# Secure node configuration
class SecureNodeConfig(RestNodeConfig):
# Enable TLS
tls_enabled: bool = True
tls_cert_path: str = "/etc/ssl/certs/node.crt"
tls_key_path: str = "/etc/ssl/private/node.key"
# API authentication
api_key_required: bool = True
api_key_header: str = "X-API-Key"
# Rate limiting
rate_limit_enabled: bool = True
rate_limit_requests_per_minute: int = 60
class SecureNode(RestNode):
config_model = SecureNodeConfig
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.config.tls_enabled:
# Configure TLS
import ssl
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(
self.config.tls_cert_path,
self.config.tls_key_path
)
if self.config.api_key_required:
# Add API key middleware
from fastapi import Depends, HTTPException, status
def verify_api_key(api_key: str = Header(alias=self.config.api_key_header)):
if api_key != os.getenv("API_KEY"):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key"
)
# Apply to all action endpoints
for route in self.app.routes:
if hasattr(route, 'dependencies'):
route.dependencies.append(Depends(verify_api_key))Monitoring and Observability¶
from madsci.node_module import RestNode
import time
from prometheus_client import Counter, Histogram, generate_latest
# Metrics collection
action_counter = Counter('node_actions_total', 'Total actions executed', ['action_name', 'status'])
action_duration = Histogram('node_action_duration_seconds', 'Action execution time', ['action_name'])
class MonitoredNode(RestNode):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Add metrics endpoint
@self.app.get("/metrics")
async def metrics():
return generate_latest()
def execute_action_with_monitoring(self, action_name: str, func, *args, **kwargs):
"""Wrap action execution with monitoring."""
start_time = time.time()
status = "success"
try:
result = func(*args, **kwargs)
return result
except Exception as e:
status = "error"
raise
finally:
# Record metrics
duration = time.time() - start_time
action_counter.labels(action_name=action_name, status=status).inc()
action_duration.labels(action_name=action_name).observe(duration)Testing Your Node¶
Manual Testing with REST Client¶
from madsci.client.node.rest_node_client import RestNodeClient
client = RestNodeClient("http://localhost:2000")
# Check node status
status = client.get_status()
print(f"Node status: {status}")
# Execute actions
result = client.execute_action("measure_sample", {
"sample_id": "sample_001",
"duration": 120
})
print(f"Measurement result: {result}")
# Test file operations
with open("test_protocol.json", "w") as f:
f.write('{"steps": ["prepare", "measure", "analyze"]}')
file_result = client.execute_action("run_protocol", {
"protocol_file": "test_protocol.json"
})
print(f"Protocol result datapoint ID: {file_result}")Unit Testing with pytest¶
Create a test file test_my_instrument_node.py:
import pytest
from unittest.mock import Mock, patch
from pathlib import Path
from my_instrument_node import MyInstrumentNode, MyInstrumentConfig
@pytest.fixture
def node_config():
"""Create test configuration."""
return MyInstrumentConfig(
device_port="/dev/ttyUSB0",
timeout=10
)
@pytest.fixture
def mock_device():
"""Create mock device interface."""
device = Mock()
device.get_temperature.return_value = 25.0
device.get_status.return_value = "ready"
device.measure.return_value = Mock(temp=24.5, abs=0.85)
return device
@pytest.fixture
def node(node_config, mock_device):
"""Create test node instance."""
with patch('my_instrument_node.MyDeviceInterface', return_value=mock_device):
node = MyInstrumentNode()
node.config = node_config
node.startup_handler()
yield node
def test_startup_handler(node, mock_device):
"""Test node initialization."""
assert hasattr(node, 'device')
assert node.device is mock_device
def test_state_handler(node, mock_device):
"""Test state reporting."""
node.state_handler()
assert "temperature" in node.node_state
assert "status" in node.node_state
assert node.node_state["temperature"] == 25.0
assert node.node_state["status"] == "ready"
def test_measure_sample_action(node, mock_device):
"""Test sample measurement action."""
result = node.measure_sample("sample_001", 60)
# Verify device was called correctly
mock_device.measure.assert_called_once_with("sample_001", 60)
# Verify return values
assert "temperature" in result
assert "absorbance" in result
assert result["temperature"] == 24.5
assert result["absorbance"] == 0.85
def test_run_protocol_action(node, mock_device, tmp_path):
"""Test protocol execution action."""
# Create test protocol file
protocol_file = tmp_path / "test_protocol.json"
protocol_file.write_text('{"steps": ["prepare", "measure"]}')
# Mock device protocol execution
mock_device.load_protocol.return_value = None
mock_device.run.return_value = {"result": "success"}
# Mock datapoint creation
with patch.object(node, 'create_and_upload_value_datapoint', return_value="01ABCD1234"):
result = node.run_protocol(protocol_file)
# Verify device calls
mock_device.load_protocol.assert_called_once_with(protocol_file)
mock_device.run.assert_called_once()
# Verify datapoint creation
node.create_and_upload_value_datapoint.assert_called_once_with(
value={"result": "success"},
label="protocol_results"
)
# Verify return value
assert result == "01ABCD1234"
def test_shutdown_handler(node, mock_device):
"""Test cleanup on shutdown."""
mock_device.disconnect = Mock()
node.shutdown_handler()
mock_device.disconnect.assert_called_once()
@pytest.mark.asyncio
async def test_rest_api_integration(node):
"""Test REST API endpoints."""
from fastapi.testclient import TestClient
# Create test client
client = TestClient(node.app)
# Test health endpoint
response = client.get("/health")
assert response.status_code == 200
# Test node definition
response = client.get("/definition")
assert response.status_code == 200
# Test action execution
response = client.post("/actions/measure_sample", json={
"sample_id": "test_sample",
"duration": 30
})
assert response.status_code == 200
assert "temperature" in response.json()
assert "absorbance" in response.json()Integration Testing with Docker¶
Create a docker-compose.test.yml for integration testing:
version: '3.8'
services:
my-instrument-node:
build: .
ports:
- "2000:2000"
environment:
- DEVICE_PORT=/dev/mock
- LOG_LEVEL=DEBUG
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:2000/health"]
interval: 30s
timeout: 10s
retries: 3
integration-test:
image: python:3.10
depends_on:
- my-instrument-node
volumes:
- ./tests:/tests
command: |
bash -c "
pip install pytest requests
python -m pytest /tests/test_integration.py -v
"Run integration tests:
# Start services and run tests
docker-compose -f docker-compose.test.yml up --build --abort-on-container-exit
# Run specific test
docker-compose -f docker-compose.test.yml run integration-test pytest /tests/test_integration.py::test_node_communication -vPerformance Testing¶
import time
import concurrent.futures
from madsci.client.node.rest_node_client import RestNodeClient
def test_concurrent_actions():
"""Test node performance under concurrent load."""
client = RestNodeClient("http://localhost:2000")
def execute_action(sample_id):
start_time = time.time()
result = client.execute_action("measure_sample", {
"sample_id": f"sample_{sample_id}",
"duration": 5
})
end_time = time.time()
return end_time - start_time, result
# Execute 10 concurrent actions
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(execute_action, i) for i in range(10)]
results = [future.result() for future in futures]
# Analyze performance
execution_times = [result[0] for result in results]
avg_time = sum(execution_times) / len(execution_times)
max_time = max(execution_times)
print(f"Average execution time: {avg_time:.2f}s")
print(f"Maximum execution time: {max_time:.2f}s")
# Assert performance requirements
assert avg_time < 2.0, f"Average execution time too high: {avg_time}s"
assert max_time < 5.0, f"Maximum execution time too high: {max_time}s"Running Tests¶
# Run all tests
pytest tests/ -v
# Run specific test file
pytest tests/test_my_instrument_node.py -v
# Run with coverage
pytest tests/ --cov=my_instrument_node --cov-report=html
# Run integration tests only
pytest tests/ -m integration
# Run performance tests
pytest tests/ -m performance --timeout=60Advanced Features¶
Error Handling¶
Basic Exception Handling¶
from madsci.node_module import RestNode, action
from typing import Dict, Any
import logging
class MyNode(RestNode):
@action
def risky_action(self, param: str) -> Dict[str, float]:
"""Actions can raise exceptions for error handling."""
try:
result = self.device.risky_operation(param)
return {"result": result}
except DeviceError as e:
# Log the error for debugging
self.logger.error(f"Device error in risky_action: {e}")
# Exceptions are automatically converted to HTTP error responses
raise RuntimeError(f"Device operation failed: {e}")
except ValueError as e:
# Handle parameter validation errors
raise ValueError(f"Invalid parameter '{param}': {e}")
except Exception as e:
# Catch-all for unexpected errors
self.logger.error(f"Unexpected error in risky_action: {e}")
raise RuntimeError("An unexpected error occurred during operation")Custom Exception Classes¶
from madsci.node_module import RestNode, action
class InstrumentError(Exception):
"""Base exception for instrument-related errors."""
pass
class CalibrationError(InstrumentError):
"""Raised when calibration fails."""
pass
class CommunicationError(InstrumentError):
"""Raised when device communication fails."""
pass
class MyNode(RestNode):
@action
def calibrate_instrument(self) -> Dict[str, Any]:
"""Calibrate instrument with specific error handling."""
try:
# Attempt calibration
if not self.device.is_connected():
raise CommunicationError("Device not connected")
calibration_result = self.device.calibrate()
if not calibration_result.success:
raise CalibrationError(f"Calibration failed: {calibration_result.error}")
return {
"status": "success",
"calibration_value": calibration_result.value,
"timestamp": calibration_result.timestamp
}
except CommunicationError as e:
self.logger.error(f"Communication error during calibration: {e}")
raise RuntimeError(f"Device communication error: {e}")
except CalibrationError as e:
self.logger.warning(f"Calibration failed: {e}")
raise RuntimeError(f"Calibration error: {e}")
except Exception as e:
self.logger.error(f"Unexpected calibration error: {e}")
raise RuntimeError("Calibration failed due to unexpected error")Validation and Input Checking¶
from madsci.node_module import RestNode, action
from pydantic import validator, Field
from typing import Dict, Any
class MyNode(RestNode):
@action
def set_temperature(self, temperature: float, unit: str = "celsius") -> Dict[str, Any]:
"""Set instrument temperature with validation."""
# Input validation
if unit not in ["celsius", "fahrenheit", "kelvin"]:
raise ValueError(f"Invalid temperature unit: {unit}. Must be celsius, fahrenheit, or kelvin")
# Convert to celsius for internal use
if unit == "fahrenheit":
celsius_temp = (temperature - 32) * 5/9
elif unit == "kelvin":
celsius_temp = temperature - 273.15
else:
celsius_temp = temperature
# Range validation
if celsius_temp < -50 or celsius_temp > 200:
raise ValueError(f"Temperature {celsius_temp}°C out of range (-50°C to 200°C)")
try:
# Attempt to set temperature
self.device.set_temperature(celsius_temp)
# Verify setting was applied
actual_temp = self.device.get_temperature()
if abs(actual_temp - celsius_temp) > 0.5:
raise RuntimeError(f"Temperature setting failed. Set: {celsius_temp}°C, Got: {actual_temp}°C")
return {
"status": "success",
"set_temperature": celsius_temp,
"actual_temperature": actual_temp,
"unit": "celsius"
}
except Exception as e:
self.logger.error(f"Failed to set temperature: {e}")
raise RuntimeError(f"Temperature setting failed: {e}")Timeout and Retry Patterns¶
import time
import asyncio
from functools import wraps
from madsci.node_module import RestNode, action
def with_timeout(timeout_seconds: int):
"""Decorator to add timeout to actions."""
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
start_time = time.time()
try:
result = func(self, *args, **kwargs)
elapsed = time.time() - start_time
if elapsed > timeout_seconds:
raise TimeoutError(f"Action '{func.__name__}' exceeded timeout of {timeout_seconds}s")
return result
except Exception as e:
elapsed = time.time() - start_time
self.logger.error(f"Action '{func.__name__}' failed after {elapsed:.2f}s: {e}")
raise
return wrapper
return decorator
def with_retry(max_attempts: int, delay_seconds: float = 1.0):
"""Decorator to add retry logic to actions."""
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
last_exception = None
for attempt in range(max_attempts):
try:
return func(self, *args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_attempts - 1:
self.logger.warning(f"Action '{func.__name__}' attempt {attempt + 1} failed: {e}. Retrying in {delay_seconds}s...")
time.sleep(delay_seconds)
else:
self.logger.error(f"Action '{func.__name__}' failed after {max_attempts} attempts")
raise last_exception
return wrapper
return decorator
class MyNode(RestNode):
@action
@with_timeout(30)
@with_retry(3, delay_seconds=2.0)
def measure_with_retry(self, sample_id: str) -> Dict[str, Any]:
"""Measure sample with timeout and retry logic."""
try:
# Simulate potentially failing measurement
result = self.device.measure(sample_id)
if result is None:
raise RuntimeError("Measurement returned no data")
return {
"sample_id": sample_id,
"measurement": result,
"timestamp": time.time()
}
except Exception as e:
# Log the specific attempt failure
self.logger.debug(f"Measurement attempt failed for {sample_id}: {e}")
raise # Re-raise for retry logicError Response Customization¶
from madsci.node_module import RestNode
from fastapi import HTTPException
from fastapi.responses import JSONResponse
class MyNode(RestNode):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Add custom exception handler
@self.app.exception_handler(InstrumentError)
async def instrument_error_handler(request, exc):
return JSONResponse(
status_code=500,
content={
"error": "InstrumentError",
"message": str(exc),
"node_id": self.node_id,
"timestamp": time.time()
}
)
@self.app.exception_handler(ValueError)
async def validation_error_handler(request, exc):
return JSONResponse(
status_code=400,
content={
"error": "ValidationError",
"message": str(exc),
"node_id": self.node_id
}
)Health Check with Error Reporting¶
from madsci.node_module import RestNode
from typing import Dict, Any
class MyNode(RestNode):
def get_health_status(self) -> Dict[str, Any]:
"""Override health status to include error information."""
base_health = super().get_health_status()
# Add custom health checks
errors = []
warnings = []
try:
# Check device connection
if not self.device.is_connected():
errors.append("Device not connected")
except Exception as e:
errors.append(f"Device check failed: {e}")
try:
# Check temperature within range
temp = self.device.get_temperature()
if temp > 100:
warnings.append(f"High temperature detected: {temp}°C")
except Exception as e:
warnings.append(f"Temperature check failed: {e}")
# Add error information to health status
base_health.update({
"errors": errors,
"warnings": warnings,
"error_count": len(errors),
"warning_count": len(warnings),
"status": "error" if errors else "warning" if warnings else "healthy"
})
return base_healthFile Handling¶
from madsci.node_module import action
from pathlib import Path
@action
def process_file(self, file_input: Path, output_dir: Path = None) -> Path:
"""Process uploaded file and return output file."""
# file_input is automatically handled as file upload
# Process the file
processed_data = self.device.process_file(file_input)
# Save processed file
output_path = output_dir / "result.csv" if output_dir else Path("result.csv")
processed_data.to_csv(output_path)
# Return file path - file is automatically served
return output_path
@action
def get_multiple_files(self) -> list[Path]:
"""Return multiple files."""
files = self.device.generate_reports()
return files # All files automatically servedWorking examples: See example_lab/ for a complete working laboratory with multiple integrated nodes.
Development Steps¶
In order to ensure a MADSci module is up to standard and will run reliably, developers should:
Document expected behavior for each action exposed by the Node, and then test it on the device to ensure it behaves as documented
Ensure that the module folder contains no extraneous files unused by the code.
Ensure that the code passes automated linting and code quality checks (we use pre-commit and ruff, for instance. See our pre-commit config and ruff config.
Where appropriate, write a Docker image for the module and ensure that it builds and runs.