core

The PyRosettaCluster Python class facilitates scalable and reproducible job distribution of user-defined PyRosetta protocols efficiently parallelized on the user’s local workstation, high-performance computing (HPC) cluster, or elastic cloud computing infrastructure with available compute resources.

Warning: This class uses the pickle module to deserialize pickled Pose objects. Using the pickle module is not secure, so please only run with input files you trust. Learn more about the pickle module and its security in the Python documentation.

Args:
tasks: dict[str, Any] | Iterable[dict[str, Any]] | Callable[…, Iterable[dict[str, Any]]] | None

A list object of JSON-serializable dict objects, a callable returning an iterable of JSON-serializable dict objects, or a generator that yields JSON-serializable dict objects. During simulation execution, each task dictionary is automatically unpacked and collected by the variadic keyword parameter of each user-defined PyRosetta protocol; as a result, items may be accessed dynamically during PyRosetta protocol execution via the dictionary of keyword arguments, enabling dynamic control flow in PyRosetta protocols. To initialize PyRosetta with user-specified Rosetta command-line options at the start of each user-defined PyRosetta protocol, either or both of the “extra_options” and “options” keys may be defined in each task dictionary, where each value is either a str, list, or dict object representing the Rosetta command-line options.

Default: [{}]
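
For illustration, a minimal plain-Python sketch (no PyRosetta required) of the accepted forms of the tasks keyword argument value, and of how a task dictionary is unpacked into a protocol's variadic keyword parameter. The protocol name, option strings, and the "scaffold" key are hypothetical:

```python
import json

# Three equivalent ways to supply tasks: a list of dicts, a callable
# returning an iterable of dicts, or a generator yielding dicts.
tasks_list = [
    {"options": "-ex1", "extra_options": "-out:level 300", "scaffold": "A"},
    {"options": "-ex1 -ex2", "scaffold": "B"},
]

def tasks_callable():
    return iter(tasks_list)

def tasks_generator():
    for task in tasks_list:
        yield task

# Every task dictionary must be JSON-serializable.
for task in tasks_list:
    json.dumps(task)

# Each task dictionary is unpacked into the protocol's variadic keyword
# parameter, so items are accessible by key at protocol runtime.
def my_protocol(packed_pose=None, **kwargs):
    return kwargs["scaffold"]

assert my_protocol(None, **tasks_list[0]) == "A"
```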

nstruct: int

An int object specifying the number of repeats of the first user-defined PyRosetta protocol. The user can control the number of repeats of downstream PyRosetta protocols by returning multiple clones of any output decoys from any upstream PyRosetta protocols, or by cloning the input decoy multiple times inside a downstream PyRosetta protocol.

Default: 1

input_packed_pose: Pose | PackedPose | None

An input PackedPose object that is accessible via the first positional-or-keyword parameter of the first user-defined PyRosetta protocol.

Default: None

seeds: list[int] | None

A list of int objects specifying the PyRosetta pseudorandom number generator (RNG) seeds to use for each user-defined PyRosetta protocol. The length of the keyword argument value provided must be equal to the number of input user-defined PyRosetta protocols. Seeds are used in the same order that the user-defined PyRosetta protocols are executed.

Default: None

decoy_ids: list[int] | None

A list of int objects specifying the decoy identification numbers to keep after executing the user-defined PyRosetta protocols. User-defined PyRosetta protocols may return an iterable of Pose and/or PackedPose objects, or yield Pose and/or PackedPose objects. To reproduce a particular decoy produced via the chain of user-provided PyRosetta protocols, the decoy number to keep for each protocol may be specified, where other decoys are discarded. Decoy numbers use zero-based indexing, so 0 is the first decoy generated from a particular user-defined PyRosetta protocol. The length of the keyword argument value provided must be equal to the number of input user-defined PyRosetta protocols, so that one decoy is kept for each user-defined PyRosetta protocol. Decoy identification numbers are applied in the same order that the user-defined PyRosetta protocols are executed.

Default: None
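
A sketch of the length constraint shared by the seeds and decoy_ids keyword arguments (one entry per user-defined PyRosetta protocol, applied in execution order); the protocol names and decoy values below are placeholders:

```python
protocols = ["protocol_1", "protocol_2", "protocol_3"]  # hypothetical protocols
seeds = [111111111, 222222222, 333333333]  # one RNG seed per protocol
decoy_ids = [0, 2, 0]                      # zero-based decoy to keep per protocol

# Both keyword argument values must match the number of protocols.
assert len(seeds) == len(protocols)
assert len(decoy_ids) == len(protocols)

# decoy_ids uses zero-based indexing: keep the third decoy of protocol_2,
# discarding the others.
decoys_from_protocol_2 = ["decoy_a", "decoy_b", "decoy_c"]
kept = decoys_from_protocol_2[decoy_ids[1]]
assert kept == "decoy_c"
```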

client: distributed.Client | None

An initialized Dask distributed.Client object to be used as the Dask client interface to the local or remote Dask cluster. If None, then PyRosettaCluster initializes its own Dask client based on the scheduler keyword argument value. Deprecated by the clients keyword argument, but supported for legacy purposes. Either or both of the client or clients keyword argument values must be None.

Default: None

clients: list[distributed.Client] | tuple[distributed.Client, …] | None

A list or tuple object of initialized Dask distributed.Client objects to be used as the Dask client interface(s) to the local or remote Dask cluster(s). If None, then PyRosettaCluster initializes its own Dask client based on the scheduler keyword argument value. Optionally used in combination with the PyRosettaCluster.distribute(clients_indices=…) keyword argument. Either or both of the client or clients keyword argument values must be None. See the PyRosettaCluster.distribute method docstring for usage examples.

Default: None

scheduler: str | None

A str object of either “sge” or “slurm”, or None. If “sge”, then PyRosettaCluster schedules jobs using a SGECluster instance from the dask-jobqueue package. If “slurm”, then PyRosettaCluster schedules jobs using a SLURMCluster instance from the dask-jobqueue package. If None, then PyRosettaCluster schedules jobs using a distributed.LocalCluster instance. If client or clients keyword argument values are not None, then this keyword argument is ignored.

Default: None

cores: int | None

An int object specifying the total number of cores per job, which is passed to the dask_jobqueue.SLURMCluster(cores=…) or the dask_jobqueue.SGECluster(cores=…) keyword argument depending on the scheduler keyword argument value.

Default: 1

processes: int | None

An int object specifying the total number of processes per job, which is passed to the dask_jobqueue.SLURMCluster(processes=…) or the dask_jobqueue.SGECluster(processes=…) keyword argument depending on the scheduler keyword argument value. This feature determines how many Python processes each Dask worker job will run.

Default: 1

memory: str | None

A str object specifying the total amount of memory per job, which is passed to the dask_jobqueue.SLURMCluster(memory=…) or the dask_jobqueue.SGECluster(memory=…) keyword argument depending on the scheduler keyword argument value.

Default: “4g”

scratch_dir: str | None

A str object specifying the absolute filesystem path to a scratch directory where temporary files may be written.

Default: “/temp” if it exists, otherwise the current working directory.

min_workers: int | None

An int object specifying the minimum number of workers to which to adapt during parallelization of user-defined PyRosetta protocols.

Default: 1

max_workers: int | None

An int object specifying the maximum number of workers to which to adapt during parallelization of user-defined PyRosetta protocols.

Default: 1000 if the number of user-defined task dictionaries passed to the tasks keyword argument value is <1000, otherwise the number of user-defined task dictionaries.
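The default described above amounts to taking the larger of 1000 and the number of task dictionaries; a one-line sketch:

```python
def default_max_workers(num_tasks):
    # 1000 if there are fewer than 1000 task dictionaries,
    # otherwise one worker per task dictionary.
    return num_tasks if num_tasks >= 1000 else 1000

assert default_max_workers(250) == 1000
assert default_max_workers(5000) == 5000
```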

dashboard_address: str | None

A str object specifying the port over which the Dask dashboard is forwarded. Particularly useful for diagnosing PyRosettaCluster performance in real-time.

Default: “:8787”

project_name: str | None

A str object specifying the project name for this simulation. This keyword argument value is just added to the full simulation record for accounting purposes.

Default: datetime.now().strftime(“%Y.%m.%d.%H.%M.%S.%f”) if not specified, else “PyRosettaCluster” if None.

simulation_name: str | None

A str object specifying the particular name of this simulation. This keyword argument value is just added to the full simulation record for accounting purposes.

Default: project_name if not specified, else “PyRosettaCluster” if None

output_path: str | None

A str object specifying the absolute path of the output directory where the results will be written to disk. The directory will be created if it does not exist.

Default: “./outputs”

output_decoy_types: Iterable[str] | None

An iterable of str objects representing the output decoy filetypes to save during the simulation. Available options are: “.pdb” for PDB files; “.pkl_pose” for pickled Pose files; “.b64_pose” for Base64-encoded pickled Pose files; and “.init” for PyRosetta initialization files, each caching: the Rosetta command-line options (and PyRosetta initialization input files, if any) initialized on the head node, the input_packed_pose keyword argument value (if any), and an output decoy. Because each PyRosetta initialization file contains a copy of the PyRosetta initialization input files and input PackedPose object (if any), unless these objects are relatively small in size or relatively few output decoys are expected, it is recommended to omit “.init” from this iterable and instead run the pyrosetta.distributed.cluster.export_init_file function on only the output decoys of interest after the simulation completes. If the compressed keyword argument value is set to True, then each output decoy file is further compressed by the bzip2 library, and “.bz2” is automatically appended to the filename.

Default: [“.pdb”,]

output_scorefile_types: Iterable[str] | None

An iterable of str objects representing the output scorefile filetypes to save during the simulation. Available options are: “.json” for a JSON Lines (JSONL)-formatted scorefile, and any filename extensions accepted by the pandas.DataFrame.to_pickle(compression=”infer”) method (including “.gz”, “.bz2”, and “.xz”) for pickled pandas.DataFrame objects of scorefile data that may be analyzed using pyrosetta.distributed.cluster.io.secure_read_pickle(compression=”infer”). Note that in order to write pickled pandas.DataFrame objects to disk, please ensure that pyrosetta.secure_unpickle.add_secure_package(“pandas”) has first been run. If using pandas version >=3.0.0, PyArrow-backed datatypes may be enabled by default; in this case, please ensure that pyrosetta.secure_unpickle.add_secure_package(“pyarrow”) has also first been run.

See https://pandas.pydata.org/pdeps/0010-required-pyarrow-dependency.html and https://pandas.pydata.org/pdeps/0014-string-dtype.html for more information.

Default: [“.json”,]

scorefile_name: str | None

A str object specifying the name of the output JSON Lines (JSONL)-formatted scorefile, which must end in “.json”. The scorefile location is always `output_path` "/" `scorefile_name`. If “.json” is not in the output_scorefile_types keyword argument value, the JSONL-formatted scorefile will not be output, but other scorefile types (if any) will still get the same filename stem (i.e., before the “.json” extension).

Default: “scores.json”
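
Because the scorefile is JSON Lines (JSONL)-formatted, i.e., one JSON object per line, it can be parsed with the standard library alone; the score records below are fabricated for illustration:

```python
import json
import os
import tempfile

output_path = tempfile.mkdtemp()
scorefile_name = "scores.json"
# The scorefile location is always output_path "/" scorefile_name:
scorefile = os.path.join(output_path, scorefile_name)

# Write two hypothetical curtailed score records, one JSON object per line.
records = [{"total_score": -310.5}, {"total_score": -298.2}]
with open(scorefile, "w") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")

# Read the JSONL-formatted scorefile back, line by line.
with open(scorefile) as f:
    scores = [json.loads(line) for line in f]

assert scores == records
```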

simulation_records_in_scorefile: bool | None

A bool object specifying whether or not to write full simulation records to the output scorefile(s). If True, then write full simulation records to the output scorefile(s). This results in some redundant information in each entry, allowing downstream reproduction of a decoy of interest from the scorefile, but a larger scorefile storage footprint. If False, then write curtailed simulation records to the scorefile(s) containing only the Pose.cache dictionary data. This results in minimally redundant information in each entry, disallowing downstream reproduction of a decoy of interest from the output scorefile(s), but a smaller scorefile storage footprint. If False, also write the active Conda/Mamba/uv/Pixi environment configuration to a separate output file in the output_path keyword argument value for accounting purposes. Full simulation records are always written to the output decoy files, which may still be used to reproduce any decoy without the scorefile.

Default: False

decoy_dir_name: str | None

A str object specifying the directory name where the output decoy files will be saved. The directory location is always determined by: `output_path` "/" `decoy_dir_name`.

Default: “decoys”

logs_dir_name: str | None

A str object specifying the directory name where the output log files will be saved. The directory location is always determined by: `output_path` "/" `logs_dir_name`.

Default: “logs”

logging_level: str | None

A str object specifying the logging level of Python logging output to write to the log file. The available options are either: “NOTSET”, “DEBUG”, “INFO”, “WARNING”, “ERROR”, or “CRITICAL”. The output log file location is always determined by: `output_path` "/" `logs_dir_name` "/" `simulation_name`.

Default: “INFO”

logging_address: str

A str object specifying the socket endpoint for sending and receiving log messages across a network, so log messages from user-defined PyRosetta protocols may be written to a single log file on the head node. The str object must take the format of a socket address (i.e., “<host address>:<port number>”), where the <host address> is either an IP address, “localhost”, or a Domain Name System (DNS)-accessible domain name, and the <port number> is an integer greater than or equal to 0. If the port number is “0”, then the next free port number is selected.

Default: “localhost:0” if the scheduler keyword argument value is None or either the client or clients keyword argument values specify instances of distributed.LocalCluster, else “0.0.0.0:0”.
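The socket-address format and the port-0 behavior can be sketched with the standard library (binding to port 0 asks the operating system for the next free port):

```python
import socket

logging_address = "localhost:0"
host, port_str = logging_address.rsplit(":", 1)
port = int(port_str)
assert port >= 0

# Port 0 means "pick the next free port": the OS assigns one on bind.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.bind((host, port))
    assigned_port = sock.getsockname()[1]

assert assigned_port > 0  # a concrete free port was selected
```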

compressed: bool | None

A bool object specifying whether or not to compress the output decoy files and output PyRosetta initialization files using the bzip2 library, resulting in the appending of “.bz2” to output decoy files and PyRosetta initialization files. Also see the output_decoy_types and output_init_file keyword arguments.

Default: True

compression: str | bool | None

A str object of either “xz”, “zlib” or “bz2”, or a bool or None object representing the internal compression library for pickled Pose objects and user-defined task dictionaries. The default of True uses “xz” for compression if it is installed, otherwise resorts to “zlib” for compression.

Default: True
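
A sketch of the fallback described above, using the standard-library lzma (“xz”) and zlib modules on pickled bytes; the payload here is a stand-in for a pickled Pose object, not PyRosetta's actual internals:

```python
import pickle

payload = pickle.dumps({"pose": "stand-in for a pickled Pose object"})

# Prefer "xz" if it is installed, otherwise fall back to "zlib".
try:
    import lzma as compressor  # "xz"
    method = "xz"
except ImportError:
    import zlib as compressor  # "zlib"
    method = "zlib"

compressed = compressor.compress(payload)

# Round-trip: decompression recovers the original pickled bytes.
assert compressor.decompress(compressed) == payload
```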

sha1: str | None

A str or None object specifying the Git commit SHA-1 hash string of the local Git repository defining the simulation. If a non-empty str object is provided, then it is validated to match the Git commit SHA-1 hash string of the most recent commit in the local Git repository checked out in the current working directory, and then it is added to the simulation record for accounting. If an empty string is provided, then ensure that everything in the current working directory is committed to the local Git repository. If None is provided, then bypass SHA-1 hash string validation and set this value to an empty string.

Default: “”

ignore_errors: bool | None

A bool object specifying whether or not to ignore raised Python exceptions and thrown Rosetta segmentation faults in the user-defined PyRosetta protocols. This comes in handy when well-defined errors are sparse and sporadic (such as rare Rosetta segmentation faults), and the user would like PyRosettaCluster to continue running without otherwise raising a WorkerError exception.

Default: False

timeout: float | int | None

A float or int object specifying how many seconds PyRosettaCluster waits between check-ins on the running user-defined PyRosetta protocols. If each PyRosetta protocol is expected to run quickly, then 0.1 seconds seems reasonable. If each PyRosetta protocol is expected to run slowly, then >1 second seems reasonable.

Default: 0.5

max_delay_time: float | int | None

A float or int object specifying the maximum number of seconds to sleep before returning the result(s) from each user-defined PyRosetta protocol back to the Dask client on the head node. If a Dask worker returns the result(s) from a PyRosetta protocol too quickly, the Dask scheduler needs to first register that the task is processing before it completes. In practice, in each PyRosetta protocol the runtime is subtracted from the max_delay_time keyword argument value, and the Dask worker sleeps for the remainder of the time (if any) before returning the result(s). It is recommended to set this option to at least 1 second, but longer times may be used as a safety throttle in cases of overwhelmed Dask scheduler processes. Because spawning a billiard subprocess for PyRosetta protocol execution may take ~3–5 seconds already before the PyRosetta protocol executes, this feature usually does not have an effect with the default value.

Default: 3.0
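
The throttling arithmetic described above amounts to sleeping for whatever remains of max_delay_time after the protocol's runtime; a minimal sketch:

```python
def remaining_delay(max_delay_time, protocol_runtime):
    # The protocol runtime is subtracted from max_delay_time; the Dask
    # worker sleeps for the remainder (if any) before returning results.
    return max(0.0, max_delay_time - protocol_runtime)

assert remaining_delay(3.0, 0.5) == 2.5   # fast protocol: sleep 2.5 s more
assert remaining_delay(3.0, 7.5) == 0.0   # slow protocol: no extra sleep
```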

filter_results: bool | None

A bool object specifying whether or not to filter out empty PackedPose objects between user-defined PyRosetta protocols. When a PyRosetta protocol returns or yields None, PyRosettaCluster converts it to an empty PackedPose object that gets bound to the first positional-or-keyword parameter of the next PyRosetta protocol. If True, then filter out any empty PackedPose objects where there are no residues in the conformation as given by PackedPose.empty(). If False, then continue to pass the empty PackedPose objects to the next PyRosetta protocol. This is used for filtering out decoys mid-trajectory in-between PyRosetta protocols if PyRosetta protocols return or yield any None, empty Pose, or empty PackedPose objects.

Default: True

save_all: bool | None

A bool object specifying whether or not to save all of the returned non-empty PackedPose objects from all user-defined PyRosetta protocols. This option may be used to checkpoint decoy trajectories after each PyRosetta protocol.

Default: False

dry_run: bool | None

A bool object specifying whether or not to save output decoy files to disk. If True, then do not write output decoy files to disk. This feature may be useful for debugging.

Default: False

norm_task_options: bool | None

A bool object specifying whether or not to normalize the task ‘options’ and ‘extra_options’ values after PyRosetta initialization on the remote compute cluster. If True, then this enables more facile simulation reproduction by the use of the ProtocolSettingsMetric SimpleMetric to normalize the PyRosetta initialization options and by relativization of any input files and directory paths to the current working directory from which the task is running.

Default: True

max_task_replicas: int | None

An int or None object specifying the replication factor of tasks on Dask workers within the network (only via Dask’s best effort). If an int object, the value must be greater than or equal to 0. If None, then attempt to replicate all tasks on each Dask worker. Tasks are automatically deleted from each Dask worker upon task completion. Task replication improves resilience of the simulation when compute resources executing tasks are preempted midway through a user-defined PyRosetta protocol (e.g., due to using cloud spot instances or cluster backfill queues), so scattered data can be recovered. If a Dask worker is preempted during task execution, then the number of task retries is controlled by the Dask configuration parameter distributed.scheduler.allowed-failures, which may be manually configured prior to the simulation. Dask worker memory limits may also need to be increased to achieve the desired replication factor (see memory keyword argument). Using task replicas requires that either Dask’s ReduceReplicas policy is disabled or that Dask’s entire Active Memory Manager (AMM) is disabled, since replicated tasks consume additional memory per Dask worker. Task size in memory is dominated by the input PackedPose object; a rough estimate of additional memory usage is ~1 MB/task for a 500 residue protein. Task retries are only appropriate when PyRosetta protocols are side effect-free upon preemption, wherein tasks can be restarted without producing inconsistent external states if preempted midway through a PyRosetta protocol.

See https://distributed.dask.org/en/stable/api.html#distributed.Client.replicate and https://docs.dask.org/en/stable/configuration.html for more information.

Default: 0

task_registry: str | None

A None object or str object of either “disk” or “memory”. If “disk” is provided, then write the task registry to disk. If “memory” is provided, then keep the task registry in memory on the head node process. Maintaining a task registry improves the resilience of the simulation when compute resources executing tasks are preempted midway through a user-defined PyRosetta protocol (e.g., due to using cloud spot instances or cluster backfill queues); if scattered data cannot be recovered (see max_task_replicas keyword argument), then the task will be automatically resubmitted using the task input arguments cached in the task registry. If “memory” is provided, then task input arguments consume memory on the head node process, which is appropriate with fewer tasks (e.g., debugging pipelines). If “disk” is provided, then task input arguments consume disk space (in the scratch_dir keyword argument value), which is appropriate for production simulations. Task size is dominated by the input PackedPose object; a rough estimate of additional disk or memory usage is ~1 MB/task for a 500 residue protein. Completed tasks are automatically deleted from the task registry upon task completion. If None is provided, then the task registry is not created, which is appropriate for non-preemptible compute resources. Task resubmissions are only appropriate when user-provided PyRosetta protocols are side effect-free upon preemption, wherein tasks can be restarted without producing inconsistent external states if preempted midway through a PyRosetta protocol.

Default: None

cooldown_time: float | int | None

A float or int object specifying how many seconds to sleep after the simulation is complete to allow loggers to flush. For very slow network filesystems, 2 or more seconds may be reasonable.

Default: 0.5

system_info: dict[Any, Any] | None

A dict or None object specifying the system information and/or extra simulation information required to reproduce the simulation. If None is provided, then PyRosettaCluster automatically detects the platform and sets this value as the dictionary {"sys.platform": `sys.platform`} (e.g., {“sys.platform”: “linux”}). If a dict object is provided, then validate that the “sys.platform” key has a value equal to the current sys.platform result, and log a warning message if not. System information such as Amazon Machine Image (AMI) identifier and compute fleet instance type identifier may be stored in this dictionary, but it is not automatically validated upon reproduction simulations. This extra simulation information is stored in the full simulation records for accounting.

Default: None
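
The default value and the validation step can be sketched as follows; the AMI identifier key is a hypothetical example of extra accounting information:

```python
import sys

# Default: PyRosettaCluster detects the platform automatically.
default_system_info = {"sys.platform": sys.platform}

# A user-provided dict is validated against the current platform; extra
# keys (e.g., an AMI identifier) are stored but not validated.
system_info = {"sys.platform": sys.platform, "ami_id": "ami-0123456789abcdef0"}

def platform_matches(info):
    # Mirrors the documented check: compare against the current sys.platform.
    return info.get("sys.platform") == sys.platform

assert platform_matches(default_system_info)
assert platform_matches(system_info)
```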

pyrosetta_build: str | None

A str or None object specifying the PyRosetta build signature as output by pyrosetta._build_signature(). If None is provided, then PyRosettaCluster automatically detects the PyRosetta build signature and sets this keyword argument value. If a non-empty str object is provided, then validate that the input PyRosetta build signature is equal to the active PyRosetta build signature, and raise an exception if not. This validation process ensures that reproduction simulations use an identical PyRosetta build signature from the original simulation. To bypass PyRosetta build signature validation with a warning message, an empty string (‘’) may be provided but does not assure reproducibility.

Default: None

security: bool | distributed.Security | None

A bool object or instance of distributed.Security, only having an effect if both client=None and clients=None, that is passed to Dask if using scheduler=None or passed to Dask-Jobqueue if using scheduler=”slurm” or scheduler=”sge”. If True is provided, then invoke the cryptography package to generate a distributed.Security.temporary object through Dask or Dask-Jobqueue. If a Dask distributed.Security object is provided, then pass it to Dask with scheduler=None, or pass it to Dask-Jobqueue with scheduler=”slurm” or scheduler=”sge” (where the shared_temp_directory keyword argument value of SLURMCluster or SGECluster is set to the output_path keyword argument value of PyRosettaCluster). If False is provided, then Dask TLS security is disabled regardless of the scheduler keyword argument value, which is not recommended for remote Dask clusters unless they are behind a trusted private network segment (i.e., a firewall). If None is provided, then True is used by default. In order to generate a distributed.Security object with the OpenSSL command-line interface, the pyrosetta.distributed.cluster.generate_dask_tls_security function may also be used (see docstring for more information) instead of the cryptography package.

See https://distributed.dask.org/en/latest/tls.html#distributed.security.Security.temporary for more information.

Default: False if scheduler=None, else True

max_nonce: int

An int object greater than or equal to 1 specifying the maximum number of nonces to cache per process if Dask TLS security is disabled while using remote Dask clusters, which protects against replay attacks. If nonce caching is in use, each process (including the head node process and all Dask worker processes) caches nonces upon communication exchange over the network, which can increase memory usage in each process. A rough estimate of additional memory usage is ~0.2 KB per task per user-defined PyRosetta protocol per process. For example, submitting 1000 tasks with 2 PyRosetta protocols adds (~0.2 KB/task/protocol × 1000 tasks × 2 protocols) = ~0.4 MB of additional memory per process. If memory usage per process permits, it is recommended to set this value to at least the number of tasks times the number of protocols submitted, so that every nonce from every communication exchange over the network gets cached.

Default: 4096

environment: str | None

A None or str object specifying either the active Conda/Mamba environment YML file string, active uv project uv.lock file string, or active Pixi project pixi.lock file string. If None is provided, then generate an environment file string for the active Conda/Mamba/uv/Pixi environment and save it to the full simulation record. If a non-empty str object is provided, then validate it to match the active Conda/Mamba/uv/Pixi environment YML/lock file string and save it to the full simulation record. This ensures that reproduction simulations use an identical Conda/Mamba/uv/Pixi environment configuration to the original simulation. To bypass Conda/Mamba/uv/Pixi environment validation with a warning message, an empty string (‘’) may be provided, but does not assure reproducibility.

Default: None

author: str | None

A str object specifying the author(s) of the simulation that is written to the full simulation records and the output PyRosetta initialization file(s).

Default: “”

email: str | None

A str object specifying the email address(es) of the author(s) of the simulation that is written to the full simulation records and the output PyRosetta initialization file(s).

Default: “”

license: str | None

A str object specifying the license of the output data of the simulation that is written to the full simulation records and the output PyRosetta initialization file(s) (e.g., “ODC-ODbL”, “CC BY-ND”, “CDLA Permissive-2.0”, etc.).

Default: “”

output_init_file: str | None

A str object specifying the absolute path to the output PyRosetta initialization file that caches the input_packed_pose keyword argument value upon PyRosettaCluster instantiation. The file does not include any output decoys, and is optionally used for exporting PyRosetta initialization files with output decoys by the pyrosetta.distributed.cluster.export_init_file function after the simulation completes (see the output_decoy_types keyword argument). If None (or an empty str object (“”)) is provided, or dry_run keyword argument value is set to True, then skip writing an output “.init” file upon PyRosettaCluster instantiation. If skipped, it is recommended to run the pyrosetta.dump_init_file function before or after the simulation. If the compressed keyword argument value is set to True, then the output file is further compressed by the bzip2 library, and “.bz2” is automatically appended to the filename.

Default: `output_path` "/" `project_name` "_" `simulation_name` "_pyrosetta.init"

Returns:

A PyRosettaCluster instance.

class pyrosetta.distributed.cluster.core.PyRosettaCluster(*, tasks: Any = [{}], nstruct=1, input_packed_pose: Any = None, seeds: Optional[Any] = None, decoy_ids: Optional[Any] = None, client: Optional[Client] = None, clients: Optional[Union[List[Client], Tuple[Client, ...]]] = None, scheduler: Optional[str] = None, cores=1, processes=1, memory='4g', scratch_dir: Any = None, min_workers=1, max_workers=_Nothing.NOTHING, dashboard_address=':8787', project_name='2026.04.12.04.33.27.118564', simulation_name=_Nothing.NOTHING, output_path='./outputs', output_decoy_types: Any = None, output_scorefile_types: Any = None, scorefile_name='scores.json', simulation_records_in_scorefile=False, decoy_dir_name='decoys', logs_dir_name='logs', logging_level='INFO', logging_address: str = _Nothing.NOTHING, compressed=True, compression: Optional[Union[str, bool]] = True, sha1: Any = '', ignore_errors=False, timeout=0.5, max_delay_time=3.0, filter_results: Any = None, save_all=False, dry_run=False, norm_task_options: Any = None, max_task_replicas: Optional[int] = 0, task_registry: Optional[str] = None, cooldown_time=0.5, system_info: Any = None, pyrosetta_build: Any = None, security=_Nothing.NOTHING, max_nonce: int = 4096, environment: Any = None, author=None, email=None, license=None, output_init_file=_Nothing.NOTHING)

Bases: IO, LoggingSupport, SchedulerManager, SecurityIO, TaskBase

The PyRosettaCluster Python class facilitates scalable and reproducible job distribution of user-defined PyRosetta protocols efficiently parallelized on the user’s local workstation, high-performance computing (HPC) cluster, or elastic cloud computing infrastructure with available compute resources.

Warning: This class uses the pickle module to deserialize pickled Pose objects. Using the pickle module is not secure, so please only run with input files you trust. Learn more about the pickle module and its security here.

Args:
tasks: dict[str, Any] | Iterable[dict[str, Any]] | Callable[…, Iterable[dict[str, Any]]] | None

A list object of JSON-serializable dict objects, a callable returning an iterable of JSON-serializable dict objects, or a generator that yields JSON-serializable dict objects. During simulation execution, each task dictionary is automatically unpacked and collected by the variadic keyword parameter of each user-defined PyRosetta protocol; as a result, items may be accessed dynamically during PyRosetta protocol execution via the dictionary of keyword arguments, enabling dynamic control flow in PyRosetta protocols. In order to initialize PyRosetta with user-specified Rosetta command-line options at the start of each user-defined PyRosetta protocol, either or both of the “extra_options” and/or “options” keys may be defined in each task dictionary, where each value is either a str, list, or dict object representing the Rosetta command-line options.

Default: [{}]

nstruct: int

An int object specifying the number of repeats of the first user-defined PyRosetta protocol. The user can control the number of repeats of downstream PyRosetta protocols via returning multiple clones of any output decoys from any upstream PyRosetta protocols, or by cloning the input decoy multiple times inside a downstream PyRosetta protocol.

Default: 1

input_packed_pose: Pose | PackedPose | None

An input PackedPose object that is accessible via the first positional-or-keyword parameter of the first user-defined PyRosetta protocol.

Default: None

seeds: list[int] | None

A list of int objects specifying the PyRosetta pseudorandom number generator (RNG) seeds to use for each user-defined PyRosetta protocol. The length of the keyword argument value provided must be equal to the number of input user-defined PyRosetta protocols. Seeds are used in the same order that the user-defined PyRosetta protocols are executed.

Default: None

decoy_ids: list[int] | None

A list of int objects specifying the decoy identification numbers to keep after executing the user-defined PyRosetta protocols. User-defined PyRosetta protocols may return an iterable of Pose and/or PackedPose objects, or yield Pose and/or PackedPose objects. To reproduce a particular decoy produced via the chain of user-provided PyRosetta protocols, the decoy number to keep for each protocol may be specified, where other decoys are discarded. Decoy numbers use zero-based indexing, so 0 is the first decoy generated from a particular user-defined PyRosetta protocol. The length of the keyword argument value provided must be equal to the number of input user-defined PyRosetta protocols, so that one decoy is kept for each user-defined PyRosetta protocol. Decoy identification numbers are applied in the same order that the user-defined PyRosetta protocols are executed.

Default: None
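A sketch of the reproduction keyword arguments for a two-protocol simulation (the seed and decoy-id values are illustrative; in practice they are recovered from a prior simulation record):

```python
reproduce_kwargs = dict(
    seeds=[-111, 222],  # one RNG seed per protocol, applied in execution order
    decoy_ids=[0, 2],   # keep the 1st decoy of protocol 1, the 3rd of protocol 2
)
# PyRosettaCluster(tasks=[task], **reproduce_kwargs).distribute(protocol_1, protocol_2)
```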

client: distributed.Client | None

An initialized Dask distributed.Client object to be used as the Dask client interface to the local or remote Dask cluster. If None, then PyRosettaCluster initializes its own Dask client based on the scheduler keyword argument value. Deprecated in favor of the clients keyword argument, but supported for legacy purposes. Either or both of the client or clients keyword argument values must be None.

Default: None

clients: list[distributed.Client] | tuple[distributed.Client, …] | None

A list or tuple object of initialized Dask distributed.Client objects to be used as the Dask client interface(s) to the local or remote Dask cluster(s). If None, then PyRosettaCluster initializes its own Dask client based on the scheduler keyword argument value. Optionally used in combination with the PyRosettaCluster.distribute(clients_indices=…) keyword argument. Either or both of the client or clients keyword argument values must be None. See the PyRosettaCluster.distribute method docstring for usage examples.

Default: None

scheduler: str | None

A str object of either “sge” or “slurm”, or None. If “sge”, then PyRosettaCluster schedules jobs using a SGECluster instance from the dask-jobqueue package. If “slurm”, then PyRosettaCluster schedules jobs using a SLURMCluster instance from the dask-jobqueue package. If None, then PyRosettaCluster schedules jobs using a distributed.LocalCluster instance. If client or clients keyword argument values are not None, then this keyword argument is ignored.

Default: None

cores: int | None

An int object specifying the total number of cores per job, which is passed to the dask_jobqueue.SLURMCluster(cores=…) or the dask_jobqueue.SGECluster(cores=…) keyword argument depending on the scheduler keyword argument value.

Default: 1

processes: int | None

An int object specifying the total number of processes per job, which is passed to the dask_jobqueue.SLURMCluster(processes=…) or the dask_jobqueue.SGECluster(processes=…) keyword argument depending on the scheduler keyword argument value. This feature determines how many Python processes each Dask worker job will run.

Default: 1

memory: str | None

A str object specifying the total amount of memory per job, which is input to the dask_jobqueue.SLURMCluster(memory=…) or the dask_jobqueue.SGECluster(memory=…) keyword argument depending on the scheduler keyword argument value.

Default: “4g”
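The scheduler, cores, processes, and memory keyword arguments compose as in this sketch of a SLURM-backed configuration (the values are illustrative):

```python
# Illustrative SLURM configuration: PyRosettaCluster forwards cores, processes,
# and memory to dask_jobqueue.SLURMCluster when scheduler="slurm".
cluster_kwargs = dict(
    scheduler="slurm",
    cores=1,      # total cores per job
    processes=1,  # Python processes per Dask worker job
    memory="4g",  # total memory per job
)
# PyRosettaCluster(**cluster_kwargs).distribute(protocols=[my_protocol])
```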

scratch_dir: str | None

A str object specifying the absolute filesystem path to a scratch directory where temporary files may be written.

Default: “/temp” if it exists, otherwise the current working directory.

min_workers: int | None

An int object specifying the minimum number of workers to which to adapt during parallelization of user-defined PyRosetta protocols.

Default: 1

max_workers: int | None

An int object specifying the maximum number of workers to which to adapt during parallelization of user-defined PyRosetta protocols.

Default: 1000 if the number of user-defined task dictionaries passed to the tasks keyword argument value is <1000, otherwise the number of user-defined task dictionaries.

dashboard_address: str | None

A str object specifying the port over which the Dask dashboard is forwarded. Particularly useful for diagnosing PyRosettaCluster performance in real-time.

Default: “:8787”

project_name: str | None

A str object specifying the project name for this simulation. This keyword argument value is just added to the full simulation record for accounting purposes.

Default: datetime.now().strftime(“%Y.%m.%d.%H.%M.%S.%f”) if the keyword argument is not provided, or “PyRosettaCluster” if None is explicitly provided.

simulation_name: str | None

A str object specifying the particular name of this simulation. This keyword argument value is just added to the full simulation record for accounting purposes.

Default: project_name if the keyword argument is not provided, or “PyRosettaCluster” if None is explicitly provided.

output_path: str | None

A str object specifying the absolute path of the output directory where the results will be written to disk. The directory will be created if it does not exist.

Default: “./outputs”

output_decoy_types: Iterable[str] | None

An iterable of str objects representing the output decoy filetypes to save during the simulation. Available options are: “.pdb” for PDB files; “.pkl_pose” for pickled Pose files; “.b64_pose” for Base64-encoded pickled Pose files; and “.init” for PyRosetta initialization files, each caching: the Rosetta command-line options (and PyRosetta initialization input files, if any) initialized on the head node, the input_packed_pose keyword argument value (if any), and an output decoy. Because each PyRosetta initialization file contains a copy of the PyRosetta initialization input files and input PackedPose object (if any), unless these objects are relatively small in size or relatively few output decoys are expected, it is recommended to run the pyrosetta.distributed.cluster.export_init_file function on only output decoys of interest after the simulation completes, without specifying “.init” in this iterable. If the compressed keyword argument value is set to True, then each output decoy file is further compressed by the bzip2 library, and “.bz2” is automatically appended to the filename.

Default: [“.pdb”,]

output_scorefile_types: Iterable[str] | None

An iterable of str objects representing the output scorefile filetypes to save during the simulation. Available options are: “.json” for a JSON Lines (JSONL)-formatted scorefile, and any filename extensions accepted by the pandas.DataFrame.to_pickle(compression=”infer”) method (including “.gz”, “.bz2”, and “.xz”) for pickled pandas.DataFrame objects of scorefile data that may be analyzed using pyrosetta.distributed.cluster.io.secure_read_pickle(compression=”infer”). Note that in order to write pickled pandas.DataFrame objects to disk, please ensure that pyrosetta.secure_unpickle.add_secure_package(“pandas”) has first been run. If using pandas version >=3.0.0, PyArrow-backed datatypes may be enabled by default; in this case, please ensure that pyrosetta.secure_unpickle.add_secure_package(“pyarrow”) has also first been run.

See https://pandas.pydata.org/pdeps/0010-required-pyarrow-dependency.html and https://pandas.pydata.org/pdeps/0014-string-dtype.html for more information.

Default: [“.json”,]
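For example, to write a bzip2-compressed pickled DataFrame scorefile alongside the JSONL scorefile and read it back afterward, the flow might look like the sketch below (the path is illustrative; the read-back lines require PyRosetta and are shown commented out):

```python
scorefile_kwargs = dict(output_scorefile_types=[".json", ".bz2"])
# PyRosettaCluster(**scorefile_kwargs).distribute(my_protocol)

# After the simulation completes (requires PyRosetta):
# import pyrosetta
# pyrosetta.secure_unpickle.add_secure_package("pandas")
# from pyrosetta.distributed.cluster.io import secure_read_pickle
# df = secure_read_pickle("./outputs/scores.bz2", compression="infer")
```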

scorefile_name: str | None

A str object specifying the name of the output JSON Lines (JSONL)-formatted scorefile, which must end in “.json”. The scorefile location is always `output_path` "/" `scorefile_name`. If “.json” is not in the output_scorefile_types keyword argument value, the JSONL-formatted scorefile will not be output, but other scorefile types (if any) will still get the same filename stem (i.e., before the “.json” extension).

Default: “scores.json”

simulation_records_in_scorefile: bool | None

A bool object specifying whether or not to write full simulation records to the output scorefile(s). If True, then write full simulation records to the output scorefile(s). This results in some redundant information in each entry, allowing downstream reproduction of a decoy of interest from the scorefile, but a larger scorefile storage footprint. If False, then write curtailed simulation records to the scorefile(s) containing only the Pose.cache dictionary data. This results in minimally redundant information in each entry, disallowing downstream reproduction of a decoy of interest from the output scorefile(s), but a smaller scorefile storage footprint. If False, also write the active Conda/Mamba/uv/Pixi environment configuration to a separate output file in the output_path keyword argument value for accounting purposes. Full simulation records are always written to the output decoy files, which may still be used to reproduce any decoy without the scorefile.

Default: False

decoy_dir_name: str | None

A str object specifying the directory name where the output decoy files will be saved. The directory location is always determined by: `output_path` "/" `decoy_dir_name`.

Default: “decoys”

logs_dir_name: str | None

A str object specifying the directory name where the output log files will be saved. The directory location is always determined by: `output_path` "/" `logs_dir_name`.

Default: “logs”

logging_level: str | None

A str object specifying the logging level of Python logging output to write to the log file. The available options are either: “NOTSET”, “DEBUG”, “INFO”, “WARNING”, “ERROR”, or “CRITICAL”. The output log file location is always determined by: `output_path` "/" `logs_dir_name` "/" `simulation_name`.

Default: “INFO”

logging_address: str

A str object specifying the socket endpoint for sending and receiving log messages across a network, so log messages from user-defined PyRosetta protocols may be written to a single log file on the head node. The str object must take the format of a socket address (i.e., “<host address>:<port number>”) where the <host address> is either an IP address, “localhost”, or Domain Name System (DNS)-accessible domain name, and the <port number> is an integer greater than or equal to 0. If the port number is “0”, then the next free port number is selected.

Default: “localhost:0” if the scheduler keyword argument value is None or either the client or clients keyword argument values specify instances of distributed.LocalCluster, else “0.0.0.0:0”.
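A quick sketch of the expected address format, where splitting on the last “:” tolerates DNS names as well as IP addresses:

```python
# "<host address>:<port number>"; a port of "0" requests the next free port.
logging_address = "localhost:0"
host, port = logging_address.rsplit(":", 1)
```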

compressed: bool | None

A bool object specifying whether or not to compress the output decoy files and output PyRosetta initialization files using the bzip2 library, resulting in the appending of “.bz2” to output decoy files and PyRosetta initialization files. Also see the output_decoy_types and output_init_file keyword arguments.

Default: True

compression: str | bool | None

A str object of either “xz”, “zlib” or “bz2”, or a bool or None object representing the internal compression library for pickled Pose objects and user-defined task dictionaries. The default of True uses “xz” for compression if it is installed, otherwise resorts to “zlib” for compression.

Default: True
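The documented fallback for compression=True can be sketched in plain Python (this mirrors the described behavior, not the actual implementation):

```python
# Prefer "xz" when the lzma module is available, else fall back to "zlib".
try:
    import lzma  # noqa: F401
    compression = "xz"
except ImportError:
    compression = "zlib"
```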

sha1: str | None

A str or None object specifying the Git commit SHA-1 hash string of the local Git repository defining the simulation. If a non-empty str object is provided, then it is validated to match the Git commit SHA-1 hash string of the most recent commit in the local Git repository checked out in the current working directory, and then it is added to the simulation record for accounting. If an empty string is provided, then ensure that everything in the current working directory is committed to the local Git repository. If None is provided, then bypass SHA-1 hash string validation and set this value to an empty string.

Default: “”

ignore_errors: bool | None

A bool object specifying whether or not to ignore raised Python exceptions and thrown Rosetta segmentation faults in the user-defined PyRosetta protocols. This comes in handy when well-defined errors are sparse and sporadic (such as rare Rosetta segmentation faults), and the user would like PyRosettaCluster to continue running without otherwise raising a WorkerError exception.

Default: False

timeout: float | int | None

A float or int object specifying how many seconds PyRosettaCluster waits between check-ins on the running user-defined PyRosetta protocols. If each PyRosetta protocol is expected to run quickly, then 0.1 seconds seems reasonable. If each PyRosetta protocol is expected to run slowly, then >1 second seems reasonable.

Default: 0.5

max_delay_time: float | int | None

A float or int object specifying the maximum number of seconds to sleep before returning the result(s) from each user-defined PyRosetta protocol back to the Dask client on the head node. If a Dask worker returns the result(s) from a PyRosetta protocol too quickly, the Dask scheduler needs to first register that the task is processing before it completes. In practice, in each PyRosetta protocol the runtime is subtracted from the max_delay_time keyword argument value, and the Dask worker sleeps for the remainder of the time (if any) before returning the result(s). It is recommended to set this option to at least 1 second, but longer times may be used as a safety throttle in cases of overwhelmed Dask scheduler processes. Because spawning a billiard subprocess for PyRosetta protocol execution may take ~3–5 seconds already before the PyRosetta protocol executes, this feature usually does not have an effect with the default value.

Default: 3.0

filter_results: bool | None

A bool object specifying whether or not to filter out empty PackedPose objects between user-defined PyRosetta protocols. When a PyRosetta protocol returns or yields None, PyRosettaCluster converts it to an empty PackedPose object that gets bound to the first positional-or-keyword parameter of the next PyRosetta protocol. If True, then filter out any empty PackedPose objects where there are no residues in the conformation as given by PackedPose.empty(). If False, then continue to pass the empty PackedPose objects to the next PyRosetta protocol. This is used for filtering out decoys mid-trajectory in-between PyRosetta protocols if PyRosetta protocols return or yield any None, empty Pose, or empty PackedPose objects.

Default: True

save_all: bool | None

A bool object specifying whether or not to save all of the returned non-empty PackedPose objects from all user-defined PyRosetta protocols. This option may be used to checkpoint decoy trajectories after each PyRosetta protocol.

Default: False

dry_run: bool | None

A bool object specifying whether or not to save output decoy files to disk. If True, then do not write output decoy files to disk. This feature may be useful for debugging.

Default: False

norm_task_options: bool | None

A bool object specifying whether or not to normalize the task ‘options’ and ‘extra_options’ values after PyRosetta initialization on the remote compute cluster. If True, then this enables more facile simulation reproduction by the use of the ProtocolSettingsMetric SimpleMetric to normalize the PyRosetta initialization options and by relativization of any input files and directory paths to the current working directory from which the task is running.

Default: True

max_task_replicas: int | None

An int or None object specifying the replication factor of tasks on Dask workers within the network (only via Dask’s best effort). If an int object, the value must be greater than or equal to 0. If None, then attempt to replicate all tasks on each Dask worker. Tasks are automatically deleted from each Dask worker upon task completion. Task replication improves resilience of the simulation when compute resources executing tasks are preempted midway through a user-defined PyRosetta protocol (e.g., due to using cloud spot instances or cluster backfill queues), so scattered data can be recovered. If a Dask worker is preempted during task execution, then the number of task retries is controlled by the Dask configuration parameter distributed.scheduler.allowed-failures, which may be manually configured prior to the simulation. Dask worker memory limits may also need to be increased to achieve the desired replication factor (see memory keyword argument). Using task replicas requires that either Dask’s ReduceReplicas policy is disabled or that Dask’s entire Active Memory Manager (AMM) is disabled, since replicated tasks consume additional memory per Dask worker. Task size in memory is dominated by the input PackedPose object; a rough estimate of additional memory usage is ~1 MB/task for a 500 residue protein. Task retries are only appropriate when PyRosetta protocols are side effect-free upon preemption, wherein tasks can be restarted without producing inconsistent external states if preempted midway through a PyRosetta protocol.

See https://distributed.dask.org/en/stable/api.html#distributed.Client.replicate and https://docs.dask.org/en/stable/configuration.html for more information.

Default: 0
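A sketch of Dask configuration that pairs with task replication (the keys below are standard Dask settings, but confirm them against your installed Dask version before relying on them):

```python
# Build the configuration as a plain dict; apply it with dask.config.set(...)
# before constructing PyRosettaCluster.
dask_config = {
    "distributed.scheduler.allowed-failures": 10,  # retries after worker preemption
    # Disable the Active Memory Manager so its ReduceReplicas policy does not
    # drop replicated task data:
    "distributed.scheduler.active-memory-manager.start": False,
}
# import dask
# dask.config.set(dask_config)
# PyRosettaCluster(max_task_replicas=2, ...).distribute(...)
```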

task_registry: str | None

A None object or str object of either “disk” or “memory”. If “disk” is provided, then write the task registry to disk. If “memory” is provided, then keep the task registry in memory on the head node process. Maintaining a task registry improves the resilience of the simulation when compute resources executing tasks are preempted midway through a user-defined PyRosetta protocol (e.g., due to using cloud spot instances or cluster backfill queues); if scattered data cannot be recovered (see max_task_replicas keyword argument), then the task will be automatically resubmitted using the task input arguments cached in the task registry. If “memory” is provided, then task input arguments consume memory on the head node process, which is appropriate with fewer tasks (e.g., debugging pipelines). If “disk” is provided, then task input arguments consume disk space (in the scratch_dir keyword argument value), which is appropriate for production simulations. Task size is dominated by the input PackedPose object; a rough estimate of additional disk or memory usage is ~1 MB/task for a 500 residue protein. Completed tasks are automatically deleted from the task registry upon task completion. If None is provided, then the task registry is not created, which is appropriate for non-preemptible compute resources. Task resubmissions are only appropriate when user-provided PyRosetta protocols are side effect-free upon preemption, wherein tasks can be restarted without producing inconsistent external states if preempted midway through a PyRosetta protocol.

Default: None

cooldown_time: float | int | None

A float or int object specifying how many seconds to sleep after the simulation is complete to allow loggers to flush. For very slow network filesystems, 2 or more seconds may be reasonable.

Default: 0.5

system_info: dict[Any, Any] | None

A dict or None object specifying the system information and/or extra simulation information required to reproduce the simulation. If None is provided, then PyRosettaCluster automatically detects the platform and sets this value as the dictionary {"sys.platform": `sys.platform`} (e.g., {“sys.platform”: “linux”}). If a dict object is provided, then validate that the “sys.platform” key has a value equal to the current sys.platform result, and log a warning message if not. System information such as Amazon Machine Image (AMI) identifier and compute fleet instance type identifier may be stored in this dictionary, but it is not automatically validated upon reproduction simulations. This extra simulation information is stored in the full simulation records for accounting.

Default: None

pyrosetta_build: str | None

A str or None object specifying the PyRosetta build signature as output by pyrosetta._build_signature(). If None is provided, then PyRosettaCluster automatically detects the PyRosetta build signature and sets this keyword argument value. If a non-empty str object is provided, then validate that the input PyRosetta build signature is equal to the active PyRosetta build signature, and raise an exception if not. This validation process ensures that reproduction simulations use an identical PyRosetta build signature from the original simulation. To bypass PyRosetta build signature validation with a warning message, an empty string (‘’) may be provided but does not assure reproducibility.

Default: None

security: bool | distributed.Security | None

A bool object or instance of distributed.Security, only having an effect if both client=None and clients=None, that is passed to Dask if using scheduler=None or passed to Dask-Jobqueue if using scheduler=”slurm” or scheduler=”sge”. If True is provided, then invoke the cryptography package to generate a distributed.Security.temporary object through Dask or Dask-Jobqueue. If a Dask distributed.Security object is provided, then pass it to Dask with scheduler=None, or pass it to Dask-Jobqueue with scheduler=”slurm” or scheduler=”sge” (where the shared_temp_directory keyword argument value of SLURMCluster or SGECluster is set to the output_path keyword argument value of PyRosettaCluster). If False is provided, then Dask TLS security is disabled regardless of the scheduler keyword argument value, which is not recommended for remote Dask clusters unless they run within a trusted private network segment (i.e., behind a firewall). If None is provided, then True is used by default. In order to generate a distributed.Security object with the OpenSSL command-line interface, the pyrosetta.distributed.cluster.generate_dask_tls_security function may also be used (see docstring for more information) instead of the cryptography package.

See https://distributed.dask.org/en/latest/tls.html#distributed.security.Security.temporary for more information.

Default: False if scheduler=None, else True

max_nonce: int

An int object greater than or equal to 1 specifying the maximum number of nonces to cache per process if Dask TLS security is disabled while using remote Dask clusters, which protects against replay attacks. If nonce caching is in use, each process (including the head node process and all Dask worker processes) caches nonces upon communication exchange over the network, which can increase memory usage in each process. A rough estimate of additional memory usage is ~0.2 KB per task per user-defined PyRosetta protocol per process. For example, submitting 1000 tasks with 2 PyRosetta protocols adds (~0.2 KB/task/protocol × 1000 tasks × 2 protocols) = ~0.4 MB of additional memory per process. If memory usage per process permits, it is recommended to set this value to at least the number of tasks times the number of protocols submitted, so that every nonce from every communication exchange over the network gets cached.

Default: 4096
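The memory estimate above works out as follows (illustrative arithmetic using the documented ~0.2 KB per task per protocol per process):

```python
tasks, protocols = 1000, 2
extra_memory_kb = 0.2 * tasks * protocols    # 400 KB, i.e. ~0.4 MB per process
recommended_max_nonce = tasks * protocols    # 2000: cache every nonce
```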

environment: str | None

A None or str object specifying either the active Conda/Mamba environment YML file string, active uv project uv.lock file string, or active Pixi project pixi.lock file string. If None is provided, then generate an environment file string for the active Conda/Mamba/uv/Pixi environment and save it to the full simulation record. If a non-empty str object is provided, then validate it to match the active Conda/Mamba/uv/Pixi environment YML/lock file string and save it to the full simulation record. This ensures that reproduction simulations use an identical Conda/Mamba/uv/Pixi environment configuration to the original simulation. To bypass Conda/Mamba/uv/Pixi environment validation with a warning message, an empty string (‘’) may be provided, but does not assure reproducibility.

Default: None

author: str | None

A str object specifying the author(s) of the simulation that is written to the full simulation records and the output PyRosetta initialization file(s).

Default: “”

email: str | None

A str object specifying the email address(es) of the author(s) of the simulation that is written to the full simulation records and the output PyRosetta initialization file(s).

Default: “”

license: str | None

A str object specifying the license of the output data of the simulation that is written to the full simulation records and the output PyRosetta initialization file(s) (e.g., “ODC-ODbL”, “CC BY-ND”, “CDLA Permissive-2.0”, etc.).

Default: “”

output_init_file: str | None

A str object specifying the absolute path to the output PyRosetta initialization file that caches the input_packed_pose keyword argument value upon PyRosettaCluster instantiation. The file does not include any output decoys, and is optionally used for exporting PyRosetta initialization files with output decoys by the pyrosetta.distributed.cluster.export_init_file function after the simulation completes (see the output_decoy_types keyword argument). If None (or an empty str object (“”)) is provided, or dry_run keyword argument value is set to True, then skip writing an output “.init” file upon PyRosettaCluster instantiation. If skipped, it is recommended to run the pyrosetta.dump_init_file function before or after the simulation. If the compressed keyword argument value is set to True, then the output file is further compressed by the bzip2 library, and “.bz2” is automatically appended to the filename.

Default: `output_path` "/" `project_name` "_" `simulation_name` "_pyrosetta.init"
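The default location composes as in this sketch (the project and simulation names are illustrative):

```python
output_path, project_name, simulation_name = "./outputs", "my_project", "my_sim"
default_init_file = f"{output_path}/{project_name}_{simulation_name}_pyrosetta.init"
# With compressed=True, the written file additionally gains a ".bz2" suffix.
```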

Returns:

A PyRosettaCluster instance.

tasks: List[Dict[str, Any]]
nstruct: int
tasks_size: int
input_packed_pose: Optional[PackedPose]
seeds: Optional[List[str]]
decoy_ids: Optional[List[int]]
client: Optional[Client]
clients: Optional[Union[List[Client], Tuple[Client, ...]]]
scheduler: Optional[str]
cores: int
processes: int
memory: str
scratch_dir: str
adapt_threshold: int
min_workers: int
max_workers: int
dashboard_address: str
project_name: str
simulation_name: str
output_path: str
output_decoy_types: List[str]
output_scorefile_types: List[str]
scorefile_name: str
scorefile_path: str
simulation_records_in_scorefile: bool
decoy_dir_name: str
decoy_path: str
logs_dir_name: str
logs_path: str
logging_level: str
logging_file: str
logging_address: str
compressed: bool
compression: Optional[Union[str, bool]]
sha1: str
ignore_errors: bool
timeout: Union[float, int]
max_delay_time: Union[float, int]
filter_results: bool
save_all: bool
dry_run: bool
norm_task_options: bool
max_task_replicas: Optional[int]
task_registry: Optional[str]
yield_results: bool
cooldown_time: Union[float, int]
protocols_key: str
system_info: Dict[Any, Any]
pyrosetta_build: str
security: Union[bool, Security]
instance_id: str
max_nonce: int
environment: str
author: str
email: str
license: str
output_init_file: str
environment_manager: str
environment_file: str
task_registry_dir: Optional[str]
pyrosetta_init_args: List[str]
_get_submit_kwargs(resources: Optional[Dict[str, Union[float, int]]] = None, priority: Optional[int] = None, retries: Optional[int] = None) → Dict[str, Any]

Set up Client.submit keyword arguments.

_create_future(client: Client, clients_index: int, protocol_name: str, compressed_protocol: bytes, compressed_packed_pose: bytes, compressed_kwargs: bytes, pyrosetta_init_kwargs: Dict[str, Any], extra_args: ExtraArgs, passkey: bytes, resource: Optional[Dict[str, Union[float, int]]], priority: Optional[int], retry: Optional[int]) → Future

Scatter data and return submitted user_spawn_thread future.

_recreate_future(client: Client, clients_index: int, user_args: UserArgs, submit_kwargs: Dict[str, Any]) → Future

Re-scatter data and return submitted ‘user_spawn_thread’ future.

_run(*args: PyRosettaProtocol, protocols: Optional[Union[PyRosettaProtocol, List[PyRosettaProtocol], Tuple[PyRosettaProtocol, ...]]] = None, clients_indices: Optional[Union[List[int], Tuple[int, ...]]] = None, resources: Optional[Union[List[Optional[Dict[str, Union[float, int]]]], Tuple[Optional[Dict[str, Union[float, int]]], ...]]] = None, priorities: Optional[Union[List[int], Tuple[int, ...]]] = None, retries: Optional[Union[int, List[int], Tuple[int, ...]]] = None) → Generator[Tuple[Optional[PackedPose], Dict[str, Any]], None, None]

where PyRosettaProtocol is Callable[..., Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]]

Execute user-defined PyRosetta protocols on a local or remote compute cluster using the user-customized PyRosettaCluster instance. Either positional arguments or the protocols keyword argument specifying one or more user-defined PyRosetta protocols is required. If both are passed, then the protocols keyword argument value gets concatenated after the positional arguments.

Warning: This method uses the cloudpickle and pickle modules to serialize and deserialize Pose objects, arbitrary Python types in Pose.cache dictionaries, pandas.DataFrame objects (if configured), user-defined task dictionaries, user-defined PyRosetta protocols, and other user-provided data. Using the cloudpickle and pickle modules is not secure, so please only run this method with input data you fully understand and trust. Learn more about the cloudpickle and pickle modules and their security here and here.

Examples:

Basic usage:

>>> PyRosettaCluster().distribute(protocol_1)
>>> PyRosettaCluster().distribute(protocols=protocol_1)
>>> PyRosettaCluster().distribute(protocol_1, protocol_2, protocol_3)
>>> PyRosettaCluster().distribute(protocols=(protocol_1, protocol_2, protocol_3))
>>> PyRosettaCluster().distribute(protocol_1, protocol_2, protocols=[protocol_3, protocol_4])

Run with two Dask clients:

>>> # Run `protocol_1` on `client_1`,
>>> # then `protocol_2` on `client_2`,
>>> # then `protocol_3` on `client_1`,
>>> # then `protocol_4` on `client_2`:
>>> PyRosettaCluster(clients=[client_1, client_2]).distribute(
...     protocols=[protocol_1, protocol_2, protocol_3, protocol_4],
...     clients_indices=[0, 1, 0, 1],
... )

Run with multiple Dask clients:

>>> # Run `protocol_1` on `client_2`,
>>> # then `protocol_2` on `client_3`,
>>> # then `protocol_3` on `client_1`:
>>> PyRosettaCluster(clients=[client_1, client_2, client_3]).distribute(
...     protocols=[protocol_1, protocol_2, protocol_3],
...     clients_indices=[1, 2, 0],
... )

Run with one Dask client and compute resource constraints:

>>> # Run `protocol_1` on `client_1` with Dask worker resource constraints "GPU=2",
>>> # then `protocol_2` on `client_1` with Dask worker resource constraints "MEMORY=100e9",
>>> # then `protocol_3` on `client_1` without Dask worker resource constraints:
>>> PyRosettaCluster(client=client_1).distribute(
...     protocols=[protocol_1, protocol_2, protocol_3],
...     resources=[{"GPU": 2}, {"MEMORY": 100e9}, None],
... )

Run with two Dask clients and compute resource constraints:

>>> # Run `protocol_1` on `client_1` with Dask worker resource constraints "GPU=2",
>>> # then `protocol_2` on `client_2` with Dask worker resource constraints "MEMORY=100e9":
>>> PyRosettaCluster(clients=[client_1, client_2]).distribute(
...     protocols=[protocol_1, protocol_2],
...     clients_indices=[0, 1],
...     resources=[{"GPU": 2}, {"MEMORY": 100e9}],
... )

Run with task priorities:

>>> # Run protocols with depth-first task execution:
>>> PyRosettaCluster().distribute(
...     protocols=[protocol_1, protocol_2, protocol_3, protocol_4],
...     priorities=[0, 10, 20, 30],
... )

Run with task retries:

>>> # Run protocols with up to three retries per failed task during `protocol_3` and `protocol_4`:
>>> PyRosettaCluster(ignore_errors=False).distribute(
...     protocols=[protocol_1, protocol_2, protocol_3, protocol_4],
...     retries=[0, 0, 3, 3],
... )
Args:
*args: PyRosettaProtocol

Optional callables of type types.GeneratorType or types.FunctionType representing user-defined PyRosetta protocols in the order to be executed.

protocols: PyRosettaProtocol | list[PyRosettaProtocol] | tuple[PyRosettaProtocol, …] | None

An ordered iterable of extra callable user-defined PyRosetta protocols; i.e., an ordered iterable of objects of types.GeneratorType and/or types.FunctionType types, or a single callable of type types.GeneratorType or types.FunctionType.

Default: None

clients_indices: list[int] | tuple[int, …] | None

A list or tuple object of int objects, where each int object represents a zero-based index corresponding to the initialized Dask distributed.Client object(s) passed to the PyRosettaCluster(clients=…) keyword argument value. If not None, then the length of the clients_indices object must equal the number of protocols passed to the PyRosettaCluster.distribute method.

Default: None
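
As a minimal illustration of this indexing scheme (the counts below are placeholders, not part of the API), one simple way to spread protocols round-robin across the available Dask clients is to compute the indices from the protocol count:

```python
# Hypothetical setup: 4 protocols spread round-robin over 2 Dask clients.
num_clients = 2
num_protocols = 4

# Each protocol i runs on client i modulo the number of clients, so the
# resulting list has one zero-based client index per protocol.
clients_indices = [i % num_clients for i in range(num_protocols)]
print(clients_indices)  # [0, 1, 0, 1]
```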

resources: list[dict[str, float | int] | None] | tuple[dict[str, float | int] | None, …] | None

A list or tuple object of dict objects (or None), where each dict object specifies abstract, arbitrary resources constraining which Dask workers may execute the corresponding user-defined PyRosetta protocol. If None, then no resource constraints are imposed on any PyRosetta protocol. If not None, then the length of the resources object must equal the number of PyRosetta protocols passed to the PyRosettaCluster.distribute method, and each entry specifies the resource constraints for the protocol at the corresponding index. Note that this feature is only useful when one passes in their own instantiated Dask client(s) whose workers were set up with the relevant resource tags. If no Dask workers were instantiated to satisfy the specified resource constraints, the affected PyRosetta protocols will hang indefinitely by design, because the Dask scheduler waits for workers matching those constraints before it schedules the tasks.

See https://distributed.dask.org/en/stable/resources.html for more information.

Default: None
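
The alignment rules above can be checked up front. The following pre-flight helper is purely illustrative (it is not part of the PyRosettaCluster API): it verifies that a `resources` value matches the protocol count and that every constraint maps a resource tag to a numeric amount:

```python
# Illustrative pre-flight check (not part of the PyRosettaCluster API):
# verify that a `resources` value is aligned with the protocols and that
# every constraint maps a resource tag to a numeric amount.
def validate_resources(resources, n_protocols):
    if resources is None:
        return  # No constraints imposed on any protocol.
    if len(resources) != n_protocols:
        raise ValueError("`resources` length must equal the number of protocols.")
    for entry in resources:
        if entry is None:
            continue  # This protocol runs without constraints.
        for tag, amount in entry.items():
            if not isinstance(amount, (int, float)):
                raise TypeError(f"Resource {tag!r} must map to a numeric amount.")

# Mirrors the three-protocol example above: GPU, MEMORY, and no constraint.
validate_resources([{"GPU": 2}, {"MEMORY": 100e9}, None], n_protocols=3)
```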

priorities: list[int] | tuple[int, …] | None

A list or tuple object of int objects, where each int object sets the Dask scheduler priority for the corresponding user-defined PyRosetta protocol (i.e., indexed the same as the clients_indices keyword argument value). If None, then no explicit priorities are set. If not None, then the length of this value must equal the number of PyRosetta protocols passed to the PyRosettaCluster.distribute method, and each int value sets the Dask scheduler priority for the tasks applied to that PyRosetta protocol.

Breadth-first task execution (default):

When all user-defined PyRosetta protocols have an identical priority (e.g., [0] * len(protocols) or None), then all tasks enter the Dask scheduler’s queue with equal priority, and Dask schedules them in roughly first-in, first-out order. When Dask worker resources are saturated, all tasks submitted to upstream PyRosetta protocols therefore run to completion before tasks for downstream PyRosetta protocols are scheduled, producing breadth-first task execution across PyRosetta protocols.

Depth-first task execution:

To allow tasks to run through all user-defined PyRosetta protocols before all tasks applied to upstream PyRosetta protocols complete, assign increasing priorities to downstream protocols (e.g., list(range(0, len(protocols) * 10, 10))). Once a task completes an upstream PyRosetta protocol, it is applied to the next downstream PyRosetta protocol with a higher priority than tasks still queued for upstream PyRosetta protocols, so tasks may run through all user-defined PyRosetta protocols to completion as Dask worker resources become available. This produces a depth-first task execution behavior across PyRosetta protocols when Dask worker resources are saturated.

See https://distributed.dask.org/en/stable/priority.html for more information.

Default: None
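
The two priority schemes described above reduce to two short expressions (a sketch; the protocol count is a placeholder):

```python
# Building the two priority schemes described above (illustrative only).
n_protocols = 4

# Breadth-first: equal priority for every protocol (equivalent to None).
breadth_first = [0] * n_protocols

# Depth-first: strictly increasing priority for downstream protocols.
depth_first = list(range(0, n_protocols * 10, 10))
print(depth_first)  # [0, 10, 20, 30]
```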

retries: list[int] | tuple[int, …] | int | None

A list or tuple of int objects, where each int object (≥0) sets the number of allowed automatic retries of each failed task applied to the corresponding user-defined PyRosetta protocol (i.e., indexed the same as the clients_indices keyword argument value). If an int object (≥0) is provided, then that number of allowed automatic retries applies to all PyRosetta protocols. If None is provided, then no explicit retries are allowed. If not None and not an int object, then the length of this value must equal the number of PyRosetta protocols passed to the PyRosettaCluster.distribute method, and each int value sets the number of automatic retries the Dask scheduler allows for the tasks applied to that PyRosetta protocol. Allowing retries of failed tasks may be useful if the PyRosetta protocol raises a standard Python exception, or Rosetta throws a segmentation fault in the billiard subprocess, while the Dask worker remains alive and PyRosettaCluster(ignore_errors=False) is configured. If PyRosettaCluster(ignore_errors=True) is configured, then protocols failing due to standard Python exceptions or Rosetta segmentation faults are still considered successes, and this keyword argument has no effect since these PyRosetta protocol errors are ignored. Note that if a compute resource executing a PyRosetta protocol is preempted, then the Dask worker process does not remain alive and the Dask scheduler registers the failed task as incomplete or cancelled. In this case, the number of allowed task retries is controlled by the Dask configuration parameter distributed.scheduler.allowed-failures; please use the max_task_replicas and task_registry keyword arguments of PyRosettaCluster for further configuration of task retries after compute resource preemption.

See https://distributed.dask.org/en/latest/scheduling-state.html#task-state for more information.

Default: None
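
The normalization rules above (int broadcast to all protocols, None meaning zero retries, otherwise one entry per protocol) can be sketched as a small helper; this is illustrative, not the actual implementation:

```python
# Illustrative normalization of a `retries` value into a per-protocol list,
# mirroring the rules described above (not the actual implementation).
def normalize_retries(retries, n_protocols):
    if retries is None:
        return [0] * n_protocols  # No explicit retries allowed.
    if isinstance(retries, int):
        return [retries] * n_protocols  # Same retry count for every protocol.
    if len(retries) != n_protocols:
        raise ValueError("`retries` length must equal the number of protocols.")
    return list(retries)

print(normalize_retries(3, 2))             # [3, 3]
print(normalize_retries([0, 0, 3, 3], 4))  # [0, 0, 3, 3]
```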

generate(*args: Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]], protocols: Optional[Union[Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]], List[Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]]], Tuple[Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]], ...]]] = None, clients_indices: Optional[Union[List[int], Tuple[int, ...]]] = None, resources: Optional[Union[List[Optional[Dict[str, Union[float, int]]]], Tuple[Optional[Dict[str, Union[float, int]]], ...]]] = None, priorities: Optional[Union[List[int], Tuple[int, ...]]] = None, retries: Optional[Union[int, List[int], Tuple[int, ...]]] = None) → Generator[Tuple[Optional[PackedPose], Dict[str, Any]], None, None]

Execute user-defined PyRosetta protocols on a local or remote compute cluster using the user-customized PyRosettaCluster instance. One or more user-defined PyRosetta protocols must be specified, either as positional arguments or via the protocols keyword argument. If both are passed, then the protocols keyword argument value is concatenated after the positional arguments.

Warning: This method uses the cloudpickle and pickle modules to serialize and deserialize Pose objects, arbitrary Python types in Pose.cache dictionaries, pandas.DataFrame objects (if configured), user-defined task dictionaries, user-defined PyRosetta protocols, and other user-provided data. Using the cloudpickle and pickle modules is not secure, so please only run this method with input data you fully understand and trust. Learn more about the cloudpickle and pickle modules and their security in their respective documentation.

Examples:

Basic usage:

>>> PyRosettaCluster().distribute(protocol_1)
>>> PyRosettaCluster().distribute(protocols=protocol_1)
>>> PyRosettaCluster().distribute(protocol_1, protocol_2, protocol_3)
>>> PyRosettaCluster().distribute(protocols=(protocol_1, protocol_2, protocol_3))
>>> PyRosettaCluster().distribute(protocol_1, protocol_2, protocols=[protocol_3, protocol_4])

Run with two Dask clients:

>>> # Run `protocol_1` on `client_1`,
>>> # then `protocol_2` on `client_2`,
>>> # then `protocol_3` on `client_1`,
>>> # then `protocol_4` on `client_2`:
>>> PyRosettaCluster(clients=[client_1, client_2]).distribute(
...     protocols=[protocol_1, protocol_2, protocol_3, protocol_4],
...     clients_indices=[0, 1, 0, 1],
... )

Run with multiple Dask clients:

>>> # Run `protocol_1` on `client_2`,
>>> # then `protocol_2` on `client_3`,
>>> # then `protocol_3` on `client_1`:
>>> PyRosettaCluster(clients=[client_1, client_2, client_3]).distribute(
...     protocols=[protocol_1, protocol_2, protocol_3],
...     clients_indices=[1, 2, 0],
... )

Run with one Dask client and compute resource constraints:

>>> # Run `protocol_1` on `client_1` with Dask worker resource constraints "GPU=2",
>>> # then `protocol_2` on `client_1` with Dask worker resource constraints "MEMORY=100e9",
>>> # then `protocol_3` on `client_1` without Dask worker resource constraints:
>>> PyRosettaCluster(client=client_1).distribute(
...     protocols=[protocol_1, protocol_2, protocol_3],
...     resources=[{"GPU": 2}, {"MEMORY": 100e9}, None],
... )

Run with two Dask clients and compute resource constraints:

>>> # Run `protocol_1` on `client_1` with Dask worker resource constraints "GPU=2",
>>> # then `protocol_2` on `client_2` with Dask worker resource constraints "MEMORY=100e9":
>>> PyRosettaCluster(clients=[client_1, client_2]).distribute(
...     protocols=[protocol_1, protocol_2],
...     clients_indices=[0, 1],
...     resources=[{"GPU": 2}, {"MEMORY": 100e9}],
... )

Run with task priorities:

>>> # Run protocols with depth-first task execution:
>>> PyRosettaCluster().distribute(
...     protocols=[protocol_1, protocol_2, protocol_3, protocol_4],
...     priorities=[0, 10, 20, 30],
... )

Run with task retries:

>>> # Run protocols with up to three retries per failed task during `protocol_3` and `protocol_4`:
>>> PyRosettaCluster(ignore_errors=False).distribute(
...     protocols=[protocol_1, protocol_2, protocol_3, protocol_4],
...     retries=[0, 0, 3, 3],
... )

Args:
*args: PyRosettaProtocol

Optional callables of type types.GeneratorType or types.FunctionType representing user-defined PyRosetta protocols in the order to be executed.

protocols: PyRosettaProtocol | list[PyRosettaProtocol] | tuple[PyRosettaProtocol, …] | None

An ordered iterable of additional user-defined PyRosetta protocol callables (i.e., objects of types.GeneratorType and/or types.FunctionType), or a single such callable.

Default: None

clients_indices: list[int] | tuple[int, …] | None

A list or tuple object of int objects, where each int object represents a zero-based index corresponding to the initialized Dask distributed.Client object(s) passed to the PyRosettaCluster(clients=…) keyword argument value. If not None, then the length of the clients_indices object must equal the number of protocols passed to the PyRosettaCluster.distribute method.

Default: None

resources: list[dict[str, float | int] | None] | tuple[dict[str, float | int] | None, …] | None

A list or tuple object of dict objects (or None), where each dict object specifies abstract, arbitrary resources constraining which Dask workers may execute the corresponding user-defined PyRosetta protocol. If None, then no resource constraints are imposed on any PyRosetta protocol. If not None, then the length of the resources object must equal the number of PyRosetta protocols passed to the PyRosettaCluster.distribute method, and each entry specifies the resource constraints for the protocol at the corresponding index. Note that this feature is only useful when one passes in their own instantiated Dask client(s) whose workers were set up with the relevant resource tags. If no Dask workers were instantiated to satisfy the specified resource constraints, the affected PyRosetta protocols will hang indefinitely by design, because the Dask scheduler waits for workers matching those constraints before it schedules the tasks.

See https://distributed.dask.org/en/stable/resources.html for more information.

Default: None

priorities: list[int] | tuple[int, …] | None

A list or tuple object of int objects, where each int object sets the Dask scheduler priority for the corresponding user-defined PyRosetta protocol (i.e., indexed the same as the clients_indices keyword argument value). If None, then no explicit priorities are set. If not None, then the length of this value must equal the number of PyRosetta protocols passed to the PyRosettaCluster.distribute method, and each int value sets the Dask scheduler priority for the tasks applied to that PyRosetta protocol.

Breadth-first task execution (default):

When all user-defined PyRosetta protocols have an identical priority (e.g., [0] * len(protocols) or None), then all tasks enter the Dask scheduler’s queue with equal priority, and Dask schedules them in roughly first-in, first-out order. When Dask worker resources are saturated, all tasks submitted to upstream PyRosetta protocols therefore run to completion before tasks for downstream PyRosetta protocols are scheduled, producing breadth-first task execution across PyRosetta protocols.

Depth-first task execution:

To allow tasks to run through all user-defined PyRosetta protocols before all tasks applied to upstream PyRosetta protocols complete, assign increasing priorities to downstream protocols (e.g., list(range(0, len(protocols) * 10, 10))). Once a task completes an upstream PyRosetta protocol, it is applied to the next downstream PyRosetta protocol with a higher priority than tasks still queued for upstream PyRosetta protocols, so tasks may run through all user-defined PyRosetta protocols to completion as Dask worker resources become available. This produces a depth-first task execution behavior across PyRosetta protocols when Dask worker resources are saturated.

See https://distributed.dask.org/en/stable/priority.html for more information.

Default: None

retries: list[int] | tuple[int, …] | int | None

A list or tuple of int objects, where each int object (≥0) sets the number of allowed automatic retries of each failed task applied to the corresponding user-defined PyRosetta protocol (i.e., indexed the same as the clients_indices keyword argument value). If an int object (≥0) is provided, then that number of allowed automatic retries applies to all PyRosetta protocols. If None is provided, then no explicit retries are allowed. If not None and not an int object, then the length of this value must equal the number of PyRosetta protocols passed to the PyRosettaCluster.distribute method, and each int value sets the number of automatic retries the Dask scheduler allows for the tasks applied to that PyRosetta protocol. Allowing retries of failed tasks may be useful if the PyRosetta protocol raises a standard Python exception, or Rosetta throws a segmentation fault in the billiard subprocess, while the Dask worker remains alive and PyRosettaCluster(ignore_errors=False) is configured. If PyRosettaCluster(ignore_errors=True) is configured, then protocols failing due to standard Python exceptions or Rosetta segmentation faults are still considered successes, and this keyword argument has no effect since these PyRosetta protocol errors are ignored. Note that if a compute resource executing a PyRosetta protocol is preempted, then the Dask worker process does not remain alive and the Dask scheduler registers the failed task as incomplete or cancelled. In this case, the number of allowed task retries is controlled by the Dask configuration parameter distributed.scheduler.allowed-failures; please use the max_task_replicas and task_registry keyword arguments of PyRosettaCluster for further configuration of task retries after compute resource preemption.

See https://distributed.dask.org/en/latest/scheduling-state.html#task-state for more information.

Default: None
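
The retry semantics described above amount to re-running a failed task until it succeeds or the allowed count is exhausted. A minimal sketch of that behavior, independent of Dask and of the actual PyRosettaCluster implementation:

```python
# Minimal sketch of retry semantics: re-run a failing task until it
# succeeds or the allowed retry count is exhausted (independent of Dask).
def run_with_retries(task, retries):
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts
        except Exception:
            if attempts > retries:
                raise  # Retries exhausted; surface the failure.

# A toy task that fails twice before succeeding on the third attempt.
state = {"calls": 0}
def flaky():
    state["calls"] += 1
    if state["calls"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result, attempts = run_with_retries(flaky, retries=3)
print(result, attempts)  # ok 3
```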

Extra information:

The PyRosettaCluster.generate method may be used for developing PyRosetta protocols on a local or remote compute cluster and optionally post-processing or visualizing output PackedPose objects in memory. Importantly, subsequent code run on the yielded results is not captured by PyRosettaCluster, and so use of this method does not ensure reproducibility of the simulation. Use the PyRosettaCluster.distribute method for reproducible simulations.

Each yielded result is a tuple object with a PackedPose object as the first element and a dict object as the second element. The PackedPose object represents a returned or yielded PackedPose (or Pose or NoneType) object from the most recently executed user-defined PyRosetta protocol. The dict object represents the optionally returned or yielded dictionary of keyword arguments from the same most recently executed PyRosetta protocol (see the protocols keyword argument). If PyRosettaCluster(save_all=True) is used, tuples are yielded after each PyRosetta protocol, otherwise tuples are yielded after the final PyRosetta protocol. Tuples are yielded in the order in which they arrive back to the Dask client(s) from the distributed cluster (which may differ from the order that tasks are submitted, due to tasks running asynchronously). If PyRosettaCluster(dry_run=True) is used, then tuples are still yielded but output decoy files are not written to disk.

See https://docs.dask.org/en/latest/futures.html#distributed.as_completed for more information.

Extra examples:

Iterate over results in real-time as they are yielded from the cluster:

>>> for packed_pose, kwargs in PyRosettaCluster().generate(protocols):
...     ...

Iterate over submissions to the same Dask client:

>>> client = Client()
>>> for packed_pose, kwargs in PyRosettaCluster(client=client).generate(protocols):
...     # Post-process results on head node asynchronously from results generation
...     prc = PyRosettaCluster(
...         input_packed_pose=packed_pose,
...         client=client,
...         logs_dir_name=f"logs_{uuid.uuid4().hex}", # Make sure to write new log files
...     )
...     for packed_pose, kwargs in prc.generate(other_protocols):
...         ...

Iterate over two PyRosettaCluster instances, each managing one Dask client, creating additional overhead:

>>> client_1 = Client()
>>> client_2 = Client()
>>> for packed_pose, kwargs in PyRosettaCluster(client=client_1).generate(protocols):
...     # Post-process results on head node asynchronously from results generation
...     prc = PyRosettaCluster(
...         input_packed_pose=packed_pose,
...         client=client_2,
...         logs_dir_name=f"logs_{uuid.uuid4().hex}", # Make sure to write new log files
...     )
...     for packed_pose, kwargs in prc.generate(other_protocols):
...         ...

Iterate over one PyRosettaCluster instance managing two Dask clients, reducing overhead:

>>> # Using multiple `distributed.as_completed` iterators on the head node creates additional
>>> # overhead. If post-processing on the head node is not required between user-defined PyRosetta
>>> # protocols, the preferred method is to distribute PyRosetta protocols within a single
>>> # `PyRosettaCluster.generate` method call using the `clients_indices` keyword argument:
>>> prc_generate = PyRosettaCluster(clients=[client_1, client_2]).generate(
...     protocols=[protocol_1, protocol_2],
...     clients_indices=[0, 1],
...     resources=[{"GPU": 1}, {"CPU": 1}],
... )
>>> for packed_pose, kwargs in prc_generate:
...     # Post-process results on head node asynchronously from results generation
...     ...

Yields:

(`PackedPose`, `dict`) tuples from the most recently executed user-defined PyRosetta protocol if PyRosettaCluster(save_all=True) is used, otherwise from the final PyRosetta protocol.

distribute(*args: Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]], protocols: Optional[Union[Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]], List[Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]]], Tuple[Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]], ...]]] = None, clients_indices: Optional[Union[List[int], Tuple[int, ...]]] = None, resources: Optional[Union[List[Optional[Dict[str, Union[float, int]]]], Tuple[Optional[Dict[str, Union[float, int]]], ...]]] = None, priorities: Optional[Union[List[int], Tuple[int, ...]]] = None, retries: Optional[Union[int, List[int], Tuple[int, ...]]] = None) → None

Execute user-defined PyRosetta protocols on a local or remote compute cluster using the user-customized PyRosettaCluster instance. One or more user-defined PyRosetta protocols must be specified, either as positional arguments or via the protocols keyword argument. If both are passed, then the protocols keyword argument value is concatenated after the positional arguments.

Warning: This method uses the cloudpickle and pickle modules to serialize and deserialize Pose objects, arbitrary Python types in Pose.cache dictionaries, pandas.DataFrame objects (if configured), user-defined task dictionaries, user-defined PyRosetta protocols, and other user-provided data. Using the cloudpickle and pickle modules is not secure, so please only run this method with input data you fully understand and trust. Learn more about the cloudpickle and pickle modules and their security in their respective documentation.

Examples:

Basic usage:

>>> PyRosettaCluster().distribute(protocol_1)
>>> PyRosettaCluster().distribute(protocols=protocol_1)
>>> PyRosettaCluster().distribute(protocol_1, protocol_2, protocol_3)
>>> PyRosettaCluster().distribute(protocols=(protocol_1, protocol_2, protocol_3))
>>> PyRosettaCluster().distribute(protocol_1, protocol_2, protocols=[protocol_3, protocol_4])

Run with two Dask clients:

>>> # Run `protocol_1` on `client_1`,
>>> # then `protocol_2` on `client_2`,
>>> # then `protocol_3` on `client_1`,
>>> # then `protocol_4` on `client_2`:
>>> PyRosettaCluster(clients=[client_1, client_2]).distribute(
...     protocols=[protocol_1, protocol_2, protocol_3, protocol_4],
...     clients_indices=[0, 1, 0, 1],
... )

Run with multiple Dask clients:

>>> # Run `protocol_1` on `client_2`,
>>> # then `protocol_2` on `client_3`,
>>> # then `protocol_3` on `client_1`:
>>> PyRosettaCluster(clients=[client_1, client_2, client_3]).distribute(
...     protocols=[protocol_1, protocol_2, protocol_3],
...     clients_indices=[1, 2, 0],
... )

Run with one Dask client and compute resource constraints:

>>> # Run `protocol_1` on `client_1` with Dask worker resource constraints "GPU=2",
>>> # then `protocol_2` on `client_1` with Dask worker resource constraints "MEMORY=100e9",
>>> # then `protocol_3` on `client_1` without Dask worker resource constraints:
>>> PyRosettaCluster(client=client_1).distribute(
...     protocols=[protocol_1, protocol_2, protocol_3],
...     resources=[{"GPU": 2}, {"MEMORY": 100e9}, None],
... )

Run with two Dask clients and compute resource constraints:

>>> # Run `protocol_1` on `client_1` with Dask worker resource constraints "GPU=2",
>>> # then `protocol_2` on `client_2` with Dask worker resource constraints "MEMORY=100e9":
>>> PyRosettaCluster(clients=[client_1, client_2]).distribute(
...     protocols=[protocol_1, protocol_2],
...     clients_indices=[0, 1],
...     resources=[{"GPU": 2}, {"MEMORY": 100e9}],
... )

Run with task priorities:

>>> # Run protocols with depth-first task execution:
>>> PyRosettaCluster().distribute(
...     protocols=[protocol_1, protocol_2, protocol_3, protocol_4],
...     priorities=[0, 10, 20, 30],
... )

Run with task retries:

>>> # Run protocols with up to three retries per failed task during `protocol_3` and `protocol_4`:
>>> PyRosettaCluster(ignore_errors=False).distribute(
...     protocols=[protocol_1, protocol_2, protocol_3, protocol_4],
...     retries=[0, 0, 3, 3],
... )

Args:
*args: PyRosettaProtocol

Optional callables of type types.GeneratorType or types.FunctionType representing user-defined PyRosetta protocols in the order to be executed.

protocols: PyRosettaProtocol | list[PyRosettaProtocol] | tuple[PyRosettaProtocol, …] | None

An ordered iterable of additional user-defined PyRosetta protocol callables (i.e., objects of types.GeneratorType and/or types.FunctionType), or a single such callable.

Default: None

clients_indices: list[int] | tuple[int, …] | None

A list or tuple object of int objects, where each int object represents a zero-based index corresponding to the initialized Dask distributed.Client object(s) passed to the PyRosettaCluster(clients=…) keyword argument value. If not None, then the length of the clients_indices object must equal the number of protocols passed to the PyRosettaCluster.distribute method.

Default: None

resources: list[dict[str, float | int] | None] | tuple[dict[str, float | int] | None, …] | None

A list or tuple object of dict objects (or None), where each dict object specifies abstract, arbitrary resources constraining which Dask workers may execute the corresponding user-defined PyRosetta protocol. If None, then no resource constraints are imposed on any PyRosetta protocol. If not None, then the length of the resources object must equal the number of PyRosetta protocols passed to the PyRosettaCluster.distribute method, and each entry specifies the resource constraints for the protocol at the corresponding index. Note that this feature is only useful when one passes in their own instantiated Dask client(s) whose workers were set up with the relevant resource tags. If no Dask workers were instantiated to satisfy the specified resource constraints, the affected PyRosetta protocols will hang indefinitely by design, because the Dask scheduler waits for workers matching those constraints before it schedules the tasks.

See https://distributed.dask.org/en/stable/resources.html for more information.

Default: None

priorities: list[int] | tuple[int, …] | None

A list or tuple object of int objects, where each int object sets the Dask scheduler priority for the corresponding user-defined PyRosetta protocol (i.e., indexed the same as the clients_indices keyword argument value). If None, then no explicit priorities are set. If not None, then the length of this value must equal the number of PyRosetta protocols passed to the PyRosettaCluster.distribute method, and each int value sets the Dask scheduler priority for the tasks applied to that PyRosetta protocol.

Breadth-first task execution (default):

When all user-defined PyRosetta protocols have an identical priority (e.g., [0] * len(protocols) or None), then all tasks enter the Dask scheduler’s queue with equal priority, and Dask schedules them in roughly first-in, first-out order. When Dask worker resources are saturated, all tasks submitted to upstream PyRosetta protocols therefore run to completion before tasks for downstream PyRosetta protocols are scheduled, producing breadth-first task execution across PyRosetta protocols.

Depth-first task execution:

To allow tasks to run through all user-defined PyRosetta protocols before all tasks applied to upstream PyRosetta protocols complete, assign increasing priorities to downstream protocols (e.g., list(range(0, len(protocols) * 10, 10))). Once a task completes an upstream PyRosetta protocol, it is applied to the next downstream PyRosetta protocol with a higher priority than tasks still queued for upstream PyRosetta protocols, so tasks may run through all user-defined PyRosetta protocols to completion as Dask worker resources become available. This produces a depth-first task execution behavior across PyRosetta protocols when Dask worker resources are saturated.

See https://distributed.dask.org/en/stable/priority.html for more information.

Default: None

retries: list[int] | tuple[int, …] | int | None

A list or tuple of int objects, where each int object (≥0) sets the number of allowed automatic retries of each failed task applied to the corresponding user-defined PyRosetta protocol (i.e., indexed the same as the clients_indices keyword argument value). If an int object (≥0) is provided, then that number of allowed automatic retries applies to all PyRosetta protocols. If None is provided, then no explicit retries are allowed. If not None and not an int object, then the length of this value must equal the number of PyRosetta protocols passed to the PyRosettaCluster.distribute method, and each int value sets the number of automatic retries the Dask scheduler allows for the tasks applied to that PyRosetta protocol. Allowing retries of failed tasks may be useful if the PyRosetta protocol raises a standard Python exception, or Rosetta throws a segmentation fault in the billiard subprocess, while the Dask worker remains alive and PyRosettaCluster(ignore_errors=False) is configured. If PyRosettaCluster(ignore_errors=True) is configured, then protocols failing due to standard Python exceptions or Rosetta segmentation faults are still considered successes, and this keyword argument has no effect since these PyRosetta protocol errors are ignored. Note that if a compute resource executing a PyRosetta protocol is preempted, then the Dask worker process does not remain alive and the Dask scheduler registers the failed task as incomplete or cancelled. In this case, the number of allowed task retries is controlled by the Dask configuration parameter distributed.scheduler.allowed-failures; please use the max_task_replicas and task_registry keyword arguments of PyRosettaCluster for further configuration of task retries after compute resource preemption.

See https://distributed.dask.org/en/latest/scheduling-state.html#task-state for more information.

Default: None

Returns:

None

DATETIME_FORMAT: str = '%Y-%m-%d %H:%M:%S.%f'
REMARK_FORMAT: str = 'REMARK PyRosettaCluster: '
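
These two constants can be exercised directly: a timestamp written with DATETIME_FORMAT round-trips through datetime.strptime, and remark lines carry the REMARK_FORMAT prefix (the JSON payload below is a made-up placeholder, not the actual record format):

```python
from datetime import datetime

# The class constants as documented above.
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
REMARK_FORMAT = "REMARK PyRosettaCluster: "

# Timestamps written with DATETIME_FORMAT round-trip through strptime.
moment = datetime(2024, 1, 2, 3, 4, 5, 678901)
stamp = moment.strftime(DATETIME_FORMAT)
assert datetime.strptime(stamp, DATETIME_FORMAT) == moment

# Remark lines are prefixed with REMARK_FORMAT, so they can be recovered
# from a PDB string by matching that prefix (placeholder payload).
line = REMARK_FORMAT + '{"example": 1}'
assert line.startswith("REMARK PyRosettaCluster: ")
```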
__init__(*, tasks: Any = [{}], nstruct=1, input_packed_pose: Any = None, seeds: Optional[Any] = None, decoy_ids: Optional[Any] = None, client: Optional[Client] = None, clients: Optional[Union[List[Client], Tuple[Client, ...]]] = None, scheduler: Optional[str] = None, cores=1, processes=1, memory='4g', scratch_dir: Any = None, min_workers=1, max_workers=_Nothing.NOTHING, dashboard_address=':8787', project_name='2026.04.12.04.33.27.118564', simulation_name=_Nothing.NOTHING, output_path='/home/benchmark/rosetta/source/build/PyRosetta/Linux-5.4.0-84-generic-x86_64-with-glibc2.27/clang-6.0.0/python-3.11/minsizerel.serialization.thread/documentation/outputs', output_decoy_types: Any = None, output_scorefile_types: Any = None, scorefile_name='scores.json', simulation_records_in_scorefile=False, decoy_dir_name='decoys', logs_dir_name='logs', logging_level='INFO', logging_address: str = _Nothing.NOTHING, compressed=True, compression: Optional[Union[str, bool]] = True, sha1: Any = '', ignore_errors=False, timeout=0.5, max_delay_time=3.0, filter_results: Any = None, save_all=False, dry_run=False, norm_task_options: Any = None, max_task_replicas: Optional[int] = 0, task_registry: Optional[str] = None, cooldown_time=0.5, system_info: Any = None, pyrosetta_build: Any = None, security=_Nothing.NOTHING, max_nonce: int = 4096, environment: Any = None, author=None, email=None, license=None, output_init_file=_Nothing.NOTHING) None

Method generated by attrs for class PyRosettaCluster.

static _add_pose_comment(packed_pose: PackedPose, pdbfile_data: str) PackedPose

Cache simulation data as a Pose comment.

Warning: This method uses the pickle module to deserialize pickled Pose objects. Using the pickle module is not secure, so please only run with input files you trust. Learn more about the pickle module and its security here.

_cache_toml() None

Cache the Pixi/uv TOML file string and TOML file format.

_clients_dict_has_security() bool

Test if the clients_dict attribute has security enabled on all Dask clients, excluding Dask clients with LocalCluster clusters.

_close_logger() None

Close the logger for the head node process.

_close_socket_listener(clients: Dict[int, Client]) None

Close logging socket listener.

_close_socket_logger_plugins(clients: Dict[int, Client]) None

Purge cached logging socket addresses on all Dask workers.

_cooldown() None

Sleep based on the cooldown_time instance attribute.

_dump_init_file(filename: str, input_packed_pose: Optional[PackedPose] = None, output_packed_pose: Optional[PackedPose] = None, verbose: bool = True) None

Dump compressed PyRosetta initialization input files and Pose or PackedPose objects to the input filename.

Warning: This method uses the pickle module to deserialize pickled Pose objects. Using the pickle module is not secure, so please only run with input files you trust. Learn more about the pickle module and its security here.

static _dump_json(data: Dict[str, Any]) str

Return JSON-serialized data.

static _filter_scores_dict(scores_dict: Dict[str, Any]) Dict[str, Any]

Filter for JSON-serializable scoring data.
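One way such a filter can work is to keep only the entries that round-trip through json.dumps. The function below is a sketch of that idea, not PyRosettaCluster's actual implementation:

```python
import json
from typing import Any, Dict

def filter_json_serializable(scores: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the key/value pairs whose values can be JSON-serialized.
    A sketch of a scores filter; not PyRosettaCluster's internal code."""
    out: Dict[str, Any] = {}
    for key, value in scores.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            continue  # drop non-serializable values (e.g., arbitrary objects)
        out[key] = value
    return out

scores = {"total_score": -123.4, "tag": "decoy_1", "pose_obj": object()}
# The `pose_obj` entry is dropped; the numeric and string scores survive.
assert filter_json_serializable(scores) == {"total_score": -123.4, "tag": "decoy_1"}
```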

_format_result(result: Union[Pose, PackedPose]) Tuple[PackedPose, str, Dict[str, Any], Dict[str, Any]]

Given a Pose or PackedPose object, return a tuple object containing the Pose or PackedPose object, and its PDB string, Pose.cache dictionary, and JSON-serializable Pose.cache dictionary.

Warning: This method uses the pickle module to deserialize pickled Pose objects and arbitrary Python types in Pose.cache dictionary. Using the pickle module is not secure, so please only run with input files you trust. Learn more about the pickle module and its security here.

_get_clients_index(clients_indices: Optional[Union[List[int], Tuple[int, ...]]], protocols: Sized) int

Return the clients index for the current PyRosetta protocol.

_get_cluster() Union[LocalCluster, SGECluster, SLURMCluster]

Given user input argument values, return the requested Dask cluster instance.

_get_init_file_json(packed_pose: PackedPose) str

Return a PyRosetta initialization file as a JSON-serialized string.

Warning: This method uses the pickle module to deserialize pickled Pose objects. Using the pickle module is not secure, so please only run with input files you trust. Learn more about the pickle module and its security here.

_get_instance_and_metadata(kwargs: Dict[str, Any]) Tuple[Dict[str, Any], Dict[str, Any]]

Get the current state of the PyRosettaCluster instance, and split the input keyword arguments into the PyRosettaCluster instance attributes and ancillary metadata.

_get_output_dir(decoy_dir: str) str

Get the output directory in which to write files to disk.

_get_priority(priorities: Optional[Union[List[int], Tuple[int, ...]]], protocols: Sized) Optional[int]

Return the priority for the current PyRosetta protocol.

_get_resource(resources: Optional[Union[List[Optional[Dict[str, Union[float, int]]]], Tuple[Optional[Dict[str, Union[float, int]]], ...]]], protocols: Sized) Optional[Dict[str, Union[float, int]]]

Return the resource for the current PyRosetta protocol.

_get_retry(retries: Optional[Union[int, List[int], Tuple[int, ...]]], protocols: Sized) Optional[int]

Return the number of task retries for the current PyRosetta protocol.

_get_seed(protocols: Sized) Optional[str]

Get the PyRosetta RNG seed for the input user-defined PyRosetta protocol.

_get_task_state(protocols: List[Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]]]) Tuple[List[Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]]], Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]], Optional[str]]

Given the current state of the PyRosetta protocols, return a tuple object of the updated state of the PyRosetta protocols, the current PyRosetta protocol, and the current PyRosetta RNG seed.

_maybe_adapt(adaptive: Optional[Adaptive]) None

Adjust the maximum number of Dask workers.

_maybe_teardown(clients: Dict[int, Client], cluster: Optional[Union[LocalCluster, SGECluster, SLURMCluster]]) None

Teardown the Dask client and cluster.

_parse_priorities(priorities: Any) Any

Parse the priorities keyword argument value of the PyRosettaCluster.distribute method.

_parse_resources(resources: Any) Any

Parse the resources keyword argument value of the PyRosettaCluster.distribute method.

_parse_results(results: Optional[Union[bytes, Pose, PackedPose, Iterable[Union[bytes, Pose, PackedPose]]]]) List[Tuple[str, Dict[str, Any]]]

Format output results from a Dask worker.

Warning: This method uses the pickle module to deserialize pickled Pose objects and arbitrary Python types in Pose.cache dictionary. Using the pickle module is not secure, so please only run with input files you trust. Learn more about the pickle module and its security here.

Args:
results: Pose | PackedPose | bytes | Iterable[Pose | PackedPose | bytes] | None

A Pose, PackedPose, bytes, or None object, or an iterable of Pose, PackedPose, or bytes objects.

Returns:

A list object of tuple objects, where each tuple object contains a PDB string, Pose.cache dictionary, and JSON-serializable Pose.cache dictionary.

_parse_retries(retries: Any) Any

Parse the retries keyword argument value of the PyRosettaCluster.distribute method.

_process_kwargs(kwargs: Dict[str, Any]) Dict[str, Any]

Parse a returned task dictionary.

_register_socket_logger_plugin(clients: Dict[int, Client]) None

Register SocketLoggerPlugin as a Dask worker plugin on Dask clients.

_register_task_security_plugin(clients: Dict[int, Client], prk: MaskedBytes) None

Register TaskSecurityPlugin as a Dask worker plugin on Dask clients.

_save_results(results: Optional[bytes], kwargs: Dict[str, Any]) None

Write output results to disk.

Warning: This method uses the pickle module to deserialize pickled Pose objects and arbitrary Python types in Pose.cache dictionary. Using the pickle module is not secure, so please only run with input files you trust. Learn more about the pickle module and its security here.

_setup_clients_cluster_adaptive() Tuple[Dict[int, Client], Optional[Union[LocalCluster, SGECluster, SLURMCluster]], Optional[Adaptive]]

Given user input arguments, return the requested Dask client, cluster, and adaptive instance.

_setup_clients_dict() Dict[int, Client]

Setup Dask clients dictionary for PyRosettaCluster.

_setup_initial_kwargs(protocols: List[Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]]], seed: Optional[str], task: Dict[str, Any]) Tuple[bytes, Dict[str, Any]]

Setup the keyword arguments for the initial user-defined task dictionary.

_setup_kwargs(kwargs: Dict[str, Any], clients_indices: Optional[Union[List[int], Tuple[int, ...]]], resources: Optional[Union[List[Optional[Dict[str, Union[float, int]]]], Tuple[Optional[Dict[str, Union[float, int]]], ...]]], priorities: Optional[Union[List[int], Tuple[int, ...]]], retries: Optional[Union[int, List[int], Tuple[int, ...]]]) Tuple[bytes, Dict[str, Any], Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]], int, Optional[Dict[str, Union[float, int]]], Optional[int], Optional[int]]

Setup the keyword arguments for the subsequent user-defined task dictionary.

_setup_logger() None

Open the logger for the head node process.

_setup_protocols_protocol_seed(args: Tuple[Any, ...], protocols: Any, clients_indices: Any, resources: Any, priorities: Any, retries: Any) Tuple[List[Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]]], Callable[[...], Union[Pose, PackedPose, Dict[str, Any], None, List[Optional[Union[Pose, PackedPose, Dict[str, Any]]]], Tuple[Optional[Union[Pose, PackedPose, Dict[str, Any]]], ...], Generator[Optional[Union[Pose, PackedPose, Dict[str, Any]]], None, None]]], Optional[str], int, Optional[Dict[str, Union[float, int]]], Optional[int], Optional[int]]

Parse, validate, and setup the user-defined PyRosetta protocol(s).

_setup_pyrosetta_init_kwargs(kwargs: Dict[str, Any]) Dict[str, Any]

Setup the pyrosetta.init function keyword arguments.

_setup_seed(kwargs: Dict[str, Any], seed: Optional[str]) Dict[str, Any]

Update the value of the “options” or “extra_options” key of the user-defined task dictionary with the -run:jran Rosetta command-line option.
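The idea of injecting the seed into the task's command-line options can be illustrated with a plain-Python sketch. The inject_seed helper below is hypothetical and only handles a str-valued "extra_options" key; the real method also covers the "options" key and other value types.

```python
from typing import Any, Dict

def inject_seed(task: Dict[str, Any], seed: str) -> Dict[str, Any]:
    """Append the -run:jran seed flag to a task's 'extra_options' string.
    Hypothetical sketch of the seed-injection idea, not the actual method."""
    task = dict(task)  # avoid mutating the caller's task dictionary
    extra = task.get("extra_options", "")
    task["extra_options"] = f"{extra} -run:jran {seed}".strip()
    return task

task = inject_seed({"extra_options": "-ex1 -ex2"}, "1111111")
assert task["extra_options"] == "-ex1 -ex2 -run:jran 1111111"
```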

_setup_socket_listener(clients: Dict[int, Client]) Tuple[Tuple[str, int], bytes]

Setup logging socket listener.

_setup_task_security_plugin(clients: Dict[int, Client]) None

Setup task security worker plugin(s).

_setup_with_nonce() bool

Post-init hook to setup the PyRosettaCluster.with_nonce instance attribute.

_write_environment_file(filename: str) None

Write the Conda/Mamba YML or uv/Pixi lock file string to the input filename. If Pixi/uv is used as the environment manager, also write the TOML file string to a separate filename.

_write_init_file() None

Maybe dump a PyRosetta initialization file.

Warning: This method uses the pickle module to deserialize pickled Pose objects. Using the pickle module is not secure, so please only run with input files you trust. Learn more about the pickle module and its security here.