Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

Module madsci.client.workcell_client

Client for performing workcell actions

Classes

WorkcellClient(workcell_server_url: Optional[Union[str, AnyUrl]] = None, working_directory: str = './', event_client: Optional[EventClient] = None, config: Optional[WorkcellClientConfig] = None)

A client for interacting with the Workcell Manager to perform various actions.

Initialize the WorkcellClient.

Parameters

workcell_server_url : Optional[Union[str, AnyUrl]] The base URL of the Workcell Manager. If not provided, it will be taken from the current MadsciContext. working_directory : str, optional The directory to look for relative paths. Defaults to “./”. event_client : Optional[EventClient], optional Event client for logging. If not provided, a new one will be created. config : Optional[WorkcellClientConfig], optional Client configuration for retry strategies, timeouts, and connection pooling. If not provided, uses default WorkcellClientConfig settings.

Ancestors (in MRO)

  • madsci.client.http.DualModeClientMixin

Class variables

workcell_server_url: Optional[AnyUrl] :

Instance variables

session: httpx.Client
Backward-compatible accessor for the underlying HTTP client.

Methods

add_node(self, node_name: str, node_url: str, node_description: str = 'A Node', permanent: bool = False, timeout: Optional[float] = None) ‑> madsci.common.types.node_types.Node

Add a node to the workcell.

Parameters

node_name : str The name of the node. node_url : str The URL of the node. node_description : str, optional A description of the node, by default “A Node”. permanent : bool, optional If True, add the node permanently, by default False. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

Node The added node details.

async_cancel_workflow(self, workflow_id: str, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.Workflow

Cancel a workflow asynchronously.

async_get_active_workflows(self, timeout: Optional[float] = None) ‑> dict[str, madsci.common.types.workflow_types.Workflow]

Get all active workflows asynchronously.

async_get_archived_workflows(self, number: int = 20, timeout: Optional[float] = None) ‑> dict[str, madsci.common.types.workflow_types.Workflow]

Get archived workflows asynchronously.

async_get_node(self, node_name: str, timeout: Optional[float] = None) ‑> madsci.common.types.node_types.Node

Get details of a specific node asynchronously.

async_get_nodes(self, timeout: Optional[float] = None) ‑> dict[str, madsci.common.types.node_types.Node]

Get all nodes in the workcell asynchronously.

async_get_workcell_state(self, timeout: Optional[float] = None) ‑> madsci.common.types.workcell_types.WorkcellState

Get the full state of the workcell asynchronously.

async_get_workflow_definition(self, workflow_definition_id: str, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.WorkflowDefinition

Get the definition of a workflow asynchronously.

async_get_workflow_queue(self, timeout: Optional[float] = None) ‑> list[madsci.common.types.workflow_types.Workflow]

Get the workflow queue asynchronously.

async_pause_workflow(self, workflow_id: str, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.Workflow

Pause a workflow asynchronously.

async_query_workflow(self, workflow_id: str, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.Workflow | None

Check the status of a workflow using its ID asynchronously.

async_resubmit_workflow(self, workflow_id: str, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.Workflow

Resubmit a workflow as a brand new workflow run asynchronously.

Parameters

workflow_id : str The ID of the workflow to resubmit. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

Workflow The new workflow object.

async_resume_workflow(self, workflow_id: str, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.Workflow

Resume a paused workflow asynchronously.

async_retry_workflow(self, workflow_id: str, index: Optional[int] = None, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.Workflow

Retry a workflow from a specific step asynchronously.

Parameters

workflow_id : str The ID of the workflow to retry. index : Optional[int] The step index to retry from. If not provided, retries from the current step. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

Workflow The retried workflow object.

await_workflow(self, workflow_id: str, prompt_on_error: bool = True, raise_on_failed: bool = True, raise_on_cancelled: bool = True, query_frequency: float = 2.0, display_mode: DisplayMode = 'auto') ‑> madsci.common.types.workflow_types.Workflow

Wait for a workflow to complete.

Parameters

workflow_id : str The ID of the workflow to wait for. prompt_on_error : bool, optional If True, prompt the user for action on workflow errors, by default True. raise_on_failed : bool, optional If True, raise an exception if the workflow fails, by default True. raise_on_cancelled : bool, optional If True, raise an exception if the workflow is cancelled, by default True. query_frequency : float, optional How often to query the workflow status in seconds, by default 2.0. display_mode : DisplayMode, optional Display backend: “auto” (detect), “rich”, “jupyter”, or “plain”.

Returns

Workflow The completed workflow object.

cancel_workflow(self, workflow_id: str, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.Workflow

Cancel a workflow.

Parameters

workflow_id : str The ID of the workflow to cancel. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

Workflow The cancelled workflow object.

close(self) ‑> None

Close HTTP clients and embedded logger.

get_active_workflows(self, timeout: Optional[float] = None) ‑> dict[str, madsci.common.types.workflow_types.Workflow]

Get all workflows from the Workcell Manager.

Parameters

timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the long operations timeout from config.

Returns

dict[str, Workflow] A dictionary of workflow IDs and their details.

get_archived_workflows(self, number: int = 20, timeout: Optional[float] = None) ‑> dict[str, madsci.common.types.workflow_types.Workflow]

Get all workflows from the Workcell Manager.

Parameters

number : int Number of archived workflows to retrieve. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the long operations timeout from config.

Returns

dict[str, Workflow] A dictionary of workflow IDs and their details.

get_node(self, node_name: str, timeout: Optional[float] = None) ‑> madsci.common.types.node_types.Node

Get details of a specific node.

Parameters

node_name : str The name of the node. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

Node The node details.

get_nodes(self, timeout: Optional[float] = None) ‑> dict[str, madsci.common.types.node_types.Node]

Get all nodes in the workcell.

Parameters

timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

dict[str, Node] A dictionary of node names and their details.

get_workcell_state(self, timeout: Optional[float] = None) ‑> madsci.common.types.workcell_types.WorkcellState

Get the full state of the workcell.

Parameters

timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

WorkcellState The current state of the workcell.

get_workflow_definition(self, workflow_definition_id: str, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.WorkflowDefinition

Get the definition of a workflow.

Parameters

workflow_definition_id : str The ID of the workflow definition to retrieve. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

WorkflowDefinition The workflow definition object.

get_workflow_queue(self, timeout: Optional[float] = None) ‑> list[madsci.common.types.workflow_types.Workflow]

Get the workflow queue from the workcell.

Parameters

timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

list[Workflow] A list of queued workflows.

make_paths_absolute(self, files: dict[str, PathLike]) ‑> dict[str, pathlib.Path]

Extract file paths from a workflow definition.

Parameters

files : dict[str, PathLike] A dictionary mapping unique file keys to their paths.

Returns

dict[str, Path] A dictionary mapping unique file keys to their paths.

pause_workflow(self, workflow_id: str, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.Workflow

Pause a workflow.

Parameters

workflow_id : str The ID of the workflow to pause. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

Workflow The paused workflow object.

query_workflow(self, workflow_id: str, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.Workflow | None

Check the status of a workflow using its ID.

Parameters

workflow_id : str The ID of the workflow to query. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

Optional[Workflow] The workflow object if found, otherwise None.

resubmit_workflow(self, workflow_id: str, await_completion: bool = True, raise_on_cancelled: bool = True, raise_on_failed: bool = True, prompt_on_error: bool = True, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.Workflow

Resubmit a workflow as a brand new workflow run with the same parameters.

Parameters

workflow_id : str The ID of the workflow to resubmit. await_completion : bool, optional If True, wait for the workflow to complete, by default True. raise_on_cancelled : bool, optional If True, raise an exception if the workflow is cancelled, by default True. raise_on_failed : bool, optional If True, raise an exception if the workflow fails, by default True. prompt_on_error : bool, optional If True, prompt the user for what action to take on workflow errors, by default True. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

Workflow The new workflow object.

resume_workflow(self, workflow_id: str, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.Workflow

Resume a paused workflow.

Parameters

workflow_id : str The ID of the workflow to resume. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

Workflow The resumed workflow object.

retry_workflow(self, workflow_id: str, index: int = 0, await_completion: bool = True, raise_on_cancelled: bool = True, raise_on_failed: bool = True, prompt_on_error: bool = True, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.Workflow

Retry a workflow from a specific step.

Parameters

workflow_id : str The ID of the workflow to retry. index : int, optional The step index to retry from, by default -1 (retry the entire workflow). await_completion : bool, optional If True, wait for the workflow to complete, by default True. raise_on_cancelled : bool, optional If True, raise an exception if the workflow is cancelled, by default True. raise_on_failed : bool, optional If True, raise an exception if the workflow fails, by default True. prompt_on_error : bool, optional If True, prompt the user for what action to take on workflow errors, by default True. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

dict The response from the Workcell Manager.

start_workflow(self, workflow_definition: Union[str, PathLike, WorkflowDefinition], json_inputs: Optional[dict[str, Any]] = None, file_inputs: Optional[dict[str, PathLike]] = None, await_completion: bool = True, prompt_on_error: bool = True, raise_on_failed: bool = True, raise_on_cancelled: bool = True, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.Workflow

Submit a workflow to the Workcell Manager.

Parameters

workflow_definition: Optional[Union[PathLike, WorkflowDefinition]], Either a workflow definition ID, WorkflowDefinition or a path to a YAML file of one. parameters: Optional[dict[str, Any]] = None, Parameters to be inserted into the workflow. validate_only : bool, optional If True, only validate the workflow without submitting, by default False. await_completion : bool, optional If True, wait for the workflow to complete, by default True. prompt_on_error : bool, optional If True, prompt the user for what action to take on workflow errors, by default True. raise_on_failed : bool, optional If True, raise an exception if the workflow fails, by default True. raise_on_cancelled : bool, optional If True, raise an exception if the workflow is cancelled, by default True. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

Workflow The submitted workflow object.

submit_workflow(self, workflow_definition: Union[str, PathLike, WorkflowDefinition], json_inputs: Optional[dict[str, Any]] = None, file_inputs: Optional[dict[str, PathLike]] = None, await_completion: bool = True, prompt_on_error: bool = True, raise_on_failed: bool = True, raise_on_cancelled: bool = True, timeout: Optional[float] = None) ‑> madsci.common.types.workflow_types.Workflow

Submit a workflow to the Workcell Manager.

Parameters

workflow_definition: Optional[Union[PathLike, WorkflowDefinition]], Either a workflow definition ID, WorkflowDefinition or a path to a YAML file of one. parameters: Optional[dict[str, Any]] = None, Parameters to be inserted into the workflow. validate_only : bool, optional If True, only validate the workflow without submitting, by default False. await_completion : bool, optional If True, wait for the workflow to complete, by default True. prompt_on_error : bool, optional If True, prompt the user for what action to take on workflow errors, by default True. raise_on_failed : bool, optional If True, raise an exception if the workflow fails, by default True. raise_on_cancelled : bool, optional If True, raise an exception if the workflow is cancelled, by default True. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

Workflow The submitted workflow object.

submit_workflow_batch(self, workflows: list[str], json_inputs: list[dict[str, Any]] = [], file_inputs: list[dict[str, PathLike]] = []) ‑> list[madsci.common.types.workflow_types.Workflow]

Submit a batch of workflows to run concurrently.

Parameters

workflows : list[str] A list of workflow definitions in YAML format. parameters : list[dict[str, Any]] A list of parameter dictionaries for each workflow.

Returns

list[Workflow] A list of completed workflow objects.

submit_workflow_definition(self, workflow_definition: Union[PathLike, WorkflowDefinition], timeout: Optional[float] = None) ‑> str

Submit a workflow to the Workcell Manager.

Parameters

workflow_definition : Union[PathLike, WorkflowDefinition] The workflow definition to submit. timeout : Optional[float] Timeout in seconds for this request. If not provided, uses the default timeout from config.

Returns

str The ID of the submitted workflow.

submit_workflow_sequence(self, workflows: list[str], json_inputs: list[dict[str, Any]] = [], file_inputs: list[dict[str, PathLike]] = []) ‑> list[madsci.common.types.workflow_types.Workflow]

Submit a sequence of workflows to run in order.

Parameters

workflows : list[str] A list of workflow definitions in YAML format. parameters : list[dict[str, Any]] A list of parameter dictionaries for each workflow.

Returns

list[Workflow] A list of submitted workflow objects.