cluster¶
- class pyrosetta.distributed.cluster.PyRosettaCluster(*, tasks: Any = [{}], nstruct=1, input_packed_pose: Any = None, seeds: Optional[Any] = None, decoy_ids: Optional[Any] = None, client: Optional[Client] = None, clients: Optional[List[Client]] = None, scheduler: str = None, cores=1, processes=1, memory='4g', scratch_dir: Any = None, min_workers=1, max_workers=_Nothing.NOTHING, dashboard_address=':8787', project_name='2025.10.14.21.07.15.688633', simulation_name=_Nothing.NOTHING, output_path='/home/benchmark/rosetta/source/build/PyRosetta/Linux-5.4.0-84-generic-x86_64-with-glibc2.27/clang-6.0.0/python-3.11/minsizerel.serialization.thread/documentation/outputs', output_decoy_types: Any = None, output_scorefile_types: Any = None, scorefile_name='scores.json', simulation_records_in_scorefile=False, decoy_dir_name='decoys', logs_dir_name='logs', logging_level='INFO', logging_address: str = _Nothing.NOTHING, compressed=True, compression: Optional[Union[str, bool]] = True, sha1: Any = '', ignore_errors=False, timeout=0.5, max_delay_time=3.0, filter_results: Any = None, save_all=False, dry_run=False, norm_task_options: Any = None, cooldown_time=0.5, system_info: Any = None, pyrosetta_build: Any = None, security=_Nothing.NOTHING, max_nonce: int = 4096, environment: Any = None, author=None, email=None, license=None, output_init_file=_Nothing.NOTHING)¶
Bases: IO[G], LoggingSupport[G], SchedulerManager[G], SecurityIO[G], TaskBase[G]

PyRosettaCluster is a class for reproducible, high-throughput job distribution of user-defined PyRosetta protocols, efficiently parallelized on the user’s local computer, high-performance computing (HPC) cluster, or elastic cloud computing infrastructure with available compute resources. A minimal usage sketch follows the argument list below.
- Args:
- tasks: A list of dict objects, a callable or called function returning
a list of dict objects, or a callable or called generator yielding a list of dict objects. Each dict object in the list is accessible via kwargs in the user-defined PyRosetta protocols. To initialize PyRosetta with user-defined PyRosetta command line options at the start of each user-defined PyRosetta protocol, extra_options and/or options must be a key of each dict object, where the value is a str, tuple, list, set, or dict of PyRosetta command line options. Default: [{}]
- input_packed_pose: Optional input PackedPose object that is accessible via
the first argument of the first user-defined PyRosetta protocol. Default: None
- seeds: A list of int objects specifying the random number generator seeds
to use for each user-defined PyRosetta protocol. The number of seeds provided must be equal to the number of user-defined input PyRosetta protocols. Seeds are used in the same order that the user-defined PyRosetta protocols are executed. Default: None
- decoy_ids: A list of int objects specifying the decoy numbers to keep after
executing user-defined PyRosetta protocols. User-provided PyRosetta protocols may return a list of Pose and/or PackedPose objects, or yield multiple Pose and/or PackedPose objects. To reproduce a particular decoy generated via the chain of user-provided PyRosetta protocols, the decoy number to keep for each protocol may be specified, where other decoys are discarded. Decoy numbers use zero-based indexing, so 0 is the first decoy generated from a particular PyRosetta protocol. The number of decoy_ids provided must be equal to the number of user-defined input PyRosetta protocols, so that one decoy is saved for each user-defined PyRosetta protocol. Decoy ids are applied in the same order that the user-defined PyRosetta protocols are executed. Default: None
- client: An initialized dask distributed.client.Client object to be used as
the dask client interface to the local or remote compute cluster. If None, then PyRosettaCluster initializes its own dask client based on the PyRosettaCluster(scheduler=…) class attribute. Deprecated by the PyRosettaCluster(clients=…) class attribute, but supported for legacy purposes. Either or both of the client or clients attribute parameters must be None. Default: None
- clients: A list or tuple object of initialized dask distributed.client.Client
objects to be used as the dask client interface(s) to the local or remote compute cluster(s). If None, then PyRosettaCluster initializes its own dask client based on the PyRosettaCluster(scheduler=…) class attribute. Optionally used in combination with the PyRosettaCluster().distribute(clients_indices=…) method. Either or both of the client or clients attribute parameters must be None. See the PyRosettaCluster().distribute() method docstring for usage examples. Default: None
- scheduler: A str of either “sge” or “slurm”, or None. If “sge”, then
PyRosettaCluster schedules jobs using SGECluster with dask-jobqueue. If “slurm”, then PyRosettaCluster schedules jobs using SLURMCluster with dask-jobqueue. If None, then PyRosettaCluster schedules jobs using LocalCluster with dask.distributed. If PyRosettaCluster(client=…) or PyRosettaCluster(clients=…) is provided, then PyRosettaCluster(scheduler=…) is ignored. Default: None
- cores: An int object specifying the total number of cores per job, which
is input to the dask_jobqueue.SLURMCluster(cores=…) argument or the dask_jobqueue.SGECluster(cores=…) argument. Default: 1
- processes: An int object specifying the total number of processes per job,
which is input to the dask_jobqueue.SLURMCluster(processes=…) argument or the dask_jobqueue.SGECluster(processes=…) argument. This cuts the job up into this many processes. Default: 1
- memory: A str object specifying the total amount of memory per job, which
is input to the dask_jobqueue.SLURMCluster(memory=…) argument or the dask_jobqueue.SGECluster(memory=…) argument. Default: “4g”
- scratch_dir: A str object specifying the path to a scratch directory where
dask litter may go. Default: “/temp” if it exists, otherwise the current working directory
- min_workers: An int object specifying the minimum number of workers to
which to adapt during parallelization of user-provided PyRosetta protocols. Default: 1
- max_workers: An int object specifying the maximum number of workers to
which to adapt during parallelization of user-provided PyRosetta protocols. Default: 1000 if the initial number of tasks is <1000, otherwise the initial number of tasks
- dashboard_address: A str object specifying the port over which the dask
dashboard is forwarded. Particularly useful for diagnosing PyRosettaCluster performance in real-time. Default: “:8787”
- nstruct: An int object specifying the number of repeats of the first
user-provided PyRosetta protocol. The user can control the number of repeats of subsequent user-provided PyRosetta protocols via returning multiple clones of the output pose(s) from a user-provided PyRosetta protocol run earlier, or cloning the input pose(s) multiple times in a user-provided PyRosetta protocol run later. Default: 1
- compressed: A bool object specifying whether or not to compress the output
“.pdb”, “.pkl_pose”, “.b64_pose”, and “.init” files with bzip2, resulting in appending “.bz2” to decoy output files and PyRosetta initialization files. Also see the ‘output_decoy_types’ and ‘output_init_file’ keyword arguments. Default: True
- compression: A str object of ‘xz’, ‘zlib’ or ‘bz2’, or a bool or NoneType
object representing the internal compression library for pickled PackedPose objects and user-defined PyRosetta protocol kwargs objects. The default of True uses ‘xz’ for serialization if it’s installed, otherwise uses ‘zlib’ for serialization. Default: True
- system_info: A dict or NoneType object specifying the system information
required to reproduce the simulation. If None is provided, then PyRosettaCluster automatically detects the platform and returns this attribute as a dictionary {‘sys.platform’: sys.platform} (for example, {‘sys.platform’: ‘linux’}). If a dict is provided, then validate that the ‘sys.platform’ key has a value equal to the current sys.platform, and log a warning message if not. Additional system information such as Amazon Machine Image (AMI) identifier and compute fleet instance type identifier may be stored in this dictionary, but is not validated. This information is stored in the simulation records for accounting. Default: None
- pyrosetta_build: A str or NoneType object specifying the PyRosetta build as
output by pyrosetta._build_signature(). If None is provided, then PyRosettaCluster automatically detects the PyRosetta build and sets this attribute as the str. If a non-empty str is provided, then validate that the input PyRosetta build is equal to the active PyRosetta build, and raise an error if not. This ensures that reproduction simulations use an identical PyRosetta build from the original simulation. To bypass PyRosetta build validation with a warning message, an empty string (‘’) may be provided (but does not ensure reproducibility). Default: None
- sha1: A str or NoneType object specifying the git SHA1 hash string of the
particular git commit being simulated. If a non-empty str object is provided, then it is validated to match the SHA1 hash string of the current HEAD, and then it is added to the simulation record for accounting. If an empty string is provided, then ensure that everything in the working directory is committed to the repository. If None is provided, then bypass SHA1 hash string validation and set this attribute to an empty string. Default: “”
- project_name: A str object specifying the project name of this simulation.
This option just adds the user-provided project_name to the scorefile for accounting. Default: datetime.now().strftime(“%Y.%m.%d.%H.%M.%S.%f”) if not specified, or “PyRosettaCluster” if None is provided
- simulation_name: A str object specifying the name of this simulation.
This option just adds the user-provided simulation_name to the scorefile for accounting. Default: project_name if not specified, else “PyRosettaCluster” if None
- environment: A NoneType or str object specifying the active conda environment
YML file string. If a NoneType object is provided, then generate a YML file string for the active conda environment and save it to the full simulation record. If a non-empty str object is provided, then validate it against the active conda environment YML file string and save it to the full simulation record. This ensures that reproduction simulations use an identical conda environment from the original simulation. To bypass conda environment validation with a warning message, an empty string (‘’) may be provided (but does not ensure reproducibility). Default: None
- output_path: A str object specifying the full path of the output directory
(to be created if it doesn’t exist) where the output results will be saved to disk. Default: “./outputs”
- output_init_file: A str object specifying the output “.init” file path that caches
the ‘input_packed_pose’ keyword argument parameter upon PyRosettaCluster instantiation, and not including any output decoys, which is optionally used for exporting PyRosetta initialization files with output decoys by the pyrosetta.distributed.cluster.export_init_file() function after the simulation completes (see the ‘output_decoy_types’ keyword argument). If a NoneType object (or an empty str object (‘’)) is provided, or dry_run=True, then skip writing an output “.init” file upon PyRosettaCluster instantiation. If skipped, it is recommended to run pyrosetta.dump_init_file() before or after the simulation. If compressed=True, then the output file is further compressed by bzip2, and “.bz2” is appended to the filename. Default: output_path/`project_name`_`simulation_name`_pyrosetta.init
- output_decoy_types: An iterable of str objects representing the output decoy
filetypes to save during the simulation. Available options are: “.pdb” for PDB files; “.pkl_pose” for pickled Pose files; “.b64_pose” for base64-encoded pickled Pose files; and “.init” for PyRosetta initialization files, each caching the host node PyRosetta initialization options (and input files, if any), the ‘input_packed_pose’ keyword argument parameter (if any) and an output decoy. Because each “.init” file contains a copy of the PyRosetta initialization input files and input PackedPose object, unless these objects are relatively small in size or there are relatively few expected output decoys, then it is recommended to run pyrosetta.distributed.cluster.export_init_file() on only decoys of interest after the simulation completes without specifying “.init”. If compressed=True, then each decoy output file is further compressed by bzip2, and “.bz2” is appended to the filename. Default: [“.pdb”,]
- output_scorefile_types: An iterable of str objects representing the output scorefile
filetypes to save during the simulation. Available options are: “.json” for a JSON-encoded scorefile, and any filename extensions accepted by pandas.DataFrame().to_pickle(compression=”infer”) (including “.gz”, “.bz2”, and “.xz”) for pickled pandas.DataFrame objects of scorefile data that can later be analyzed using pyrosetta.distributed.cluster.io.secure_read_pickle(compression=”infer”). Note that in order to save pickled pandas.DataFrame objects, please ensure that pyrosetta.secure_unpickle.add_secure_package(“pandas”) has been first run. Default: [“.json”,]
- scorefile_name: A str object specifying the name of the output JSON-formatted
scorefile, which must end in “.json”. The scorefile location is always output_path/scorefile_name. If “.json” is not in the ‘output_scorefile_types’ keyword argument parameter, the JSON-formatted scorefile will not be output, but other scorefile types will get the same filename before the “.json” extension. Default: “scores.json”
- simulation_records_in_scorefile: A bool object specifying whether or not to
write full simulation records to the scorefile. If True, then write full simulation records to the scorefile. This results in some redundant information on each line, allowing downstream reproduction of a decoy from the scorefile, but a larger scorefile. If False, then write curtailed simulation records to the scorefile. This results in minimally redundant information on each line, disallowing downstream reproduction of a decoy from the scorefile, but a smaller scorefile. If False, also write the active conda environment to a YML file in ‘output_path’. Full simulation records are always written to the output ‘.pdb’ or ‘.pdb.bz2’ file(s), which can be used to reproduce any decoy without the scorefile. Default: False
- decoy_dir_name: A str object specifying the directory name where the
output decoys will be saved. The directory location is always output_path/decoy_dir_name. Default: “decoys”
- logs_dir_name: A str object specifying the directory name where the
output log files will be saved. The directory location is always output_path/logs_dir_name. Default: “logs”
- logging_level: A str object specifying the logging level of python tracer
output to write to the log file of either “NOTSET”, “DEBUG”, “INFO”, “WARNING”, “ERROR”, or “CRITICAL”. The output log file is always written to output_path/logs_dir_name/simulation_name.log on disk. Default: “INFO”
- logging_address: A str object specifying the socket endpoint for sending and receiving
log messages across a network, so log messages from user-provided PyRosetta protocols may be written to a single log file on the host node. The str object must take the format ‘host:port’ where ‘host’ is either an IP address, ‘localhost’, or a Domain Name System (DNS)-accessible domain name, and ‘port’ is an integer greater than or equal to 0. If the ‘port’ is ‘0’, then the next free port is selected. Default: ‘localhost:0’ if scheduler=None or either the client or clients
keyword argument parameters specify instances of dask.distributed.LocalCluster, otherwise ‘0.0.0.0:0’
- ignore_errors: A bool object specifying for PyRosettaCluster to ignore errors
raised in the user-provided PyRosetta protocols. This comes in handy when well-defined errors are sparse and sporadic (such as rare Segmentation Faults), and the user would like PyRosettaCluster to run without raising the errors. Default: False
- timeout: A float or int object specifying how many seconds to wait between
PyRosettaCluster checking-in on the running user-provided PyRosetta protocols. If each user-provided PyRosetta protocol is expected to run quickly, then 0.1 seconds seems reasonable. If each user-provided PyRosetta protocol is expected to run slowly, then >1 second seems reasonable. Default: 0.5
- max_delay_time: A float or int object specifying the maximum number of seconds to
sleep before returning the result(s) from each user-provided PyRosetta protocol back to the client. If a dask worker returns the result(s) from a user-provided PyRosetta protocol too quickly, the dask scheduler needs to first register that the task is processing before it completes. In practice, in each user-provided PyRosetta protocol the runtime is subtracted from max_delay_time, and the dask worker sleeps for the remainder of the time, if any, before returning the result(s). It’s recommended to set this option to at least 1 second, but longer times may be used as a safety throttle in cases of overwhelmed dask scheduler processes. Default: 3.0
- filter_results: A bool object specifying whether or not to filter out empty
PackedPose objects between user-provided PyRosetta protocols. When a protocol returns or yields NoneType, PyRosettaCluster converts it to an empty PackedPose object that gets passed to the next protocol. If True, then filter out any empty PackedPose objects where there are no residues in the conformation as given by Pose.empty(), otherwise if False then continue to pass empty PackedPose objects to the next protocol. This is used for filtering out decoys mid-trajectory through user-provided PyRosetta protocols if protocols return or yield any None, empty Pose, or empty PackedPose objects. Default: True
- save_all: A bool object specifying whether or not to save all of the returned
or yielded Pose and PackedPose objects from all user-provided PyRosetta protocols. This option may be used for checkpointing trajectories. To save arbitrary poses to disk, from within any user-provided PyRosetta protocol:
`pose.dump_pdb(os.path.join(kwargs["PyRosettaCluster_output_path"], "checkpoint.pdb"))`
Default: False
- dry_run: A bool object specifying whether or not to save ‘.pdb’ files to
disk. If True, then do not write ‘.pdb’ or ‘.pdb.bz2’ files to disk. Default: False
- security: A bool object or an instance of dask.distributed.Security(), which only has an
effect if client=None and clients=None, and which is passed to ‘dask’ if using scheduler=None or passed to ‘dask-jobqueue’ if using scheduler=”slurm” or scheduler=”sge”. If True is provided, then invoke the ‘cryptography’ package to generate a Security.temporary() object through ‘dask’ or ‘dask-jobqueue’. See https://distributed.dask.org/en/latest/tls.html#distributed.security.Security.temporary for more information. If a dask Security() object is provided, then pass it to dask with scheduler=None, or pass it to ‘dask-jobqueue’ (where ‘shared_temp_directory’ is set to the output_path keyword argument parameter) with scheduler=”slurm” or scheduler=”sge”. If False is provided, then security is disabled regardless of the scheduler keyword argument parameter (which is not recommended for remote clusters unless using a firewall). If None is provided, then True is used by default. In order to generate a dask.distributed.Security() object with OpenSSL, the pyrosetta.distributed.cluster.generate_dask_tls_security() function may also be used (see docstring for more information) instead of the ‘cryptography’ package. Default: False if scheduler=None, otherwise True
- max_nonce: An int object greater than or equal to 1 specifying the maximum number of
nonces to cache per process if dask security is disabled while using remote clusters, which protects against replay attacks. If nonce caching is in use, each process (including the host node process and all dask worker processes) caches nonces upon communication exchange over the network, which can increase memory usage in each process. A rough estimate of additional memory usage is ~0.2 KB per task per user-provided PyRosetta protocol per process. For example, submitting 1000 tasks with 2 user-provided PyRosetta protocols adds ~0.2 KB/task/protocol * 1000 tasks * 2 protocols = ~0.4 MB of memory per process. If memory usage per process permits, it is recommended to set this parameter to at least the number of tasks times the number of protocols submitted, so that every nonce from every communication exchange over the network gets cached. Default: 4096
- cooldown_time: A float or int object specifying how many seconds to sleep after the
simulation is complete to allow loggers to flush. For very slow network filesystems, 2.0 or more seconds may be reasonable. Default: 0.5
- norm_task_options: A bool object specifying whether or not to normalize the task
‘options’ and ‘extra_options’ values after PyRosetta initialization on the remote compute cluster. If True, then this enables more facile simulation reproduction by the use of the ProtocolSettingsMetric SimpleMetric to normalize the PyRosetta initialization options and by relativization of any input files and directory paths to the current working directory from which the task is running. Default: True
- author: An optional str object specifying the author(s) of the simulation that is
written to the full simulation records and the PyRosetta initialization ‘.init’ file. Default: “”
- email: An optional str object specifying the email address(es) of the author(s) of
the simulation that is written to the full simulation records and the PyRosetta initialization ‘.init’ file. Default: “”
- license: An optional str object specifying the license of the output data of the
simulation that is written to the full simulation records and the PyRosetta initialization ‘.init’ file (e.g., “ODC-ODbL”, “CC BY-ND”, “CDLA Permissive-2.0”, etc.). Default: “”
- Returns:
A PyRosettaCluster instance.
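- Example:
A minimal usage sketch. The protocol body, task fields, and option strings below are illustrative assumptions; PyRosettaCluster initializes PyRosetta on each dask worker from the task’s ‘extra_options’/‘options’ and passes each task dict object as kwargs to the user-provided PyRosetta protocol:

from pyrosetta.distributed.cluster import PyRosettaCluster

def my_protocol(packed_pose, **kwargs):
    # Runs on a dask worker. 'packed_pose' is the optional 'input_packed_pose'
    # (None here, since it is not provided below).
    import pyrosetta
    pose = pyrosetta.pose_from_sequence("TEST")  # Illustrative; apply movers here
    return pose  # May return/yield Pose, PackedPose, None, or an iterable of them

def create_tasks():
    # Each yielded dict object becomes the kwargs of one task.
    for i in range(10):
        yield {
            "extra_options": "-ex1 -ex2aro",  # PyRosetta command line options
            "task_id": i,  # Illustrative custom field, accessible via kwargs
        }

PyRosettaCluster(
    tasks=create_tasks,
    nstruct=1,
    output_path="./outputs",
).distribute(protocols=[my_protocol])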
- tasks¶
- nstruct¶
- tasks_size¶
- input_packed_pose¶
- seeds¶
- decoy_ids¶
- client¶
- clients¶
- scheduler¶
- cores¶
- processes¶
- memory¶
- scratch_dir¶
- adapt_threshold¶
- min_workers¶
- max_workers¶
- dashboard_address¶
- project_name¶
- simulation_name¶
- output_path¶
- output_decoy_types¶
- output_scorefile_types¶
- scorefile_name¶
- scorefile_path¶
- simulation_records_in_scorefile¶
- decoy_dir_name¶
- decoy_path¶
- logs_dir_name¶
- logs_path¶
- logging_level¶
- logging_file¶
- logging_address¶
- compressed¶
- compression¶
- sha1¶
- ignore_errors¶
- timeout¶
- max_delay_time¶
- filter_results¶
- save_all¶
- dry_run¶
- norm_task_options¶
- yield_results¶
- cooldown_time¶
- protocols_key¶
- system_info¶
- pyrosetta_build¶
- security¶
- instance_id¶
- max_nonce¶
- environment¶
- author¶
- email¶
- license¶
- output_init_file¶
- environment_file¶
- pyrosetta_init_args¶
- _create_future(client: Client, protocol_name: str, compressed_protocol: bytes, compressed_packed_pose: bytes, compressed_kwargs: bytes, pyrosetta_init_kwargs: Dict[str, Any], extra_args: Dict[str, Any], passkey: bytes, resource: Optional[Dict[Any, Any]]) Future ¶
Scatter data and return submitted ‘user_spawn_thread’ future.
- _run(*args: Any, protocols: Any = None, clients_indices: Any = None, resources: Any = None) Union[NoReturn, Generator[Tuple[PackedPose, Dict[Any, Any]], None, None]] ¶
Run user-provided PyRosetta protocols on a local or remote compute cluster using the user-customized PyRosettaCluster instance. Either arguments or the ‘protocols’ keyword argument is required. If both are provided, then the ‘protocols’ keyword argument gets concatenated after the input arguments.
- Examples:
PyRosettaCluster().distribute(protocol_1)
PyRosettaCluster().distribute(protocols=protocol_1)
PyRosettaCluster().distribute(protocol_1, protocol_2, protocol_3)
PyRosettaCluster().distribute(protocols=(protocol_1, protocol_2, protocol_3))
PyRosettaCluster().distribute(protocol_1, protocol_2, protocols=[protocol_3, protocol_4])

# Run protocol_1 on client_1,
# then protocol_2 on client_2,
# then protocol_3 on client_1,
# then protocol_4 on client_2:
PyRosettaCluster(clients=[client_1, client_2]).distribute(
    protocols=[protocol_1, protocol_2, protocol_3, protocol_4],
    clients_indices=[0, 1, 0, 1],
)

# Run protocol_1 on client_2,
# then protocol_2 on client_3,
# then protocol_3 on client_1:
PyRosettaCluster(clients=[client_1, client_2, client_3]).distribute(
    protocols=[protocol_1, protocol_2, protocol_3],
    clients_indices=[1, 2, 0],
)

# Run protocol_1 on client_1 with dask worker resource constraints "GPU=2",
# then protocol_2 on client_1 with dask worker resource constraints "MEMORY=100e9",
# then protocol_3 on client_1 without dask worker resource constraints:
PyRosettaCluster(client=client_1).distribute(
    protocols=[protocol_1, protocol_2, protocol_3],
    resources=[{"GPU": 2}, {"MEMORY": 100e9}, None],
)

# Run protocol_1 on client_1 with dask worker resource constraints "GPU=2",
# then protocol_2 on client_2 with dask worker resource constraints "MEMORY=100e9":
PyRosettaCluster(clients=[client_1, client_2]).distribute(
    protocols=[protocol_1, protocol_2],
    clients_indices=[0, 1],
    resources=[{"GPU": 2}, {"MEMORY": 100e9}],
)
- Args:
- *args: Optional instances of type types.GeneratorType or types.FunctionType,
in the order of protocols to be executed.
- protocols: An optional iterable of extra callable PyRosetta protocols,
i.e. an iterable of objects of types.GeneratorType and/or types.FunctionType types; or a single instance of type types.GeneratorType or types.FunctionType. Default: None
- clients_indices: An optional list or tuple object of int objects, where each int object represents
a zero-based index corresponding to the initialized dask distributed.client.Client object(s) passed to the PyRosettaCluster(clients=…) class attribute. If not None, then the length of the clients_indices object must equal the number of protocols passed to the PyRosettaCluster().distribute method. Default: None
- resources: An optional list or tuple object of dict objects, where each dict object represents
an abstract, arbitrary resource to constrain which dask workers run the user-defined PyRosetta protocols. If None, then do not impose resource constraints on any protocols. If not None, then the length of the resources object must equal the number of protocols passed to the PyRosettaCluster().distribute method, such that each resource specified indicates the unique resource constraints for the protocol at the corresponding index of the protocols passed to PyRosettaCluster().distribute. Note that this feature is only useful when one passes in one’s own instantiated client(s) with dask workers set up with various resource constraints; unless workers were created with the specified resource tags applied, the protocols will hang indefinitely because the dask scheduler waits for workers that satisfy the resource constraints before scheduling those protocols. See https://distributed.dask.org/en/stable/resources.html for more information. Default: None
- generate(*args: Any, protocols: Any = None, clients_indices: Any = None, resources: Any = None) Union[NoReturn, Generator[Tuple[PackedPose, Dict[Any, Any]], None, None]] ¶
Run user-provided PyRosetta protocols on a local or remote compute cluster using the user-customized PyRosettaCluster instance. Either arguments or the ‘protocols’ keyword argument is required. If both are provided, then the ‘protocols’ keyword argument gets concatenated after the input arguments.
- Examples:
PyRosettaCluster().distribute(protocol_1)
PyRosettaCluster().distribute(protocols=protocol_1)
PyRosettaCluster().distribute(protocol_1, protocol_2, protocol_3)
PyRosettaCluster().distribute(protocols=(protocol_1, protocol_2, protocol_3))
PyRosettaCluster().distribute(protocol_1, protocol_2, protocols=[protocol_3, protocol_4])

# Run protocol_1 on client_1,
# then protocol_2 on client_2,
# then protocol_3 on client_1,
# then protocol_4 on client_2:
PyRosettaCluster(clients=[client_1, client_2]).distribute(
    protocols=[protocol_1, protocol_2, protocol_3, protocol_4],
    clients_indices=[0, 1, 0, 1],
)

# Run protocol_1 on client_2,
# then protocol_2 on client_3,
# then protocol_3 on client_1:
PyRosettaCluster(clients=[client_1, client_2, client_3]).distribute(
    protocols=[protocol_1, protocol_2, protocol_3],
    clients_indices=[1, 2, 0],
)

# Run protocol_1 on client_1 with dask worker resource constraints "GPU=2",
# then protocol_2 on client_1 with dask worker resource constraints "MEMORY=100e9",
# then protocol_3 on client_1 without dask worker resource constraints:
PyRosettaCluster(client=client_1).distribute(
    protocols=[protocol_1, protocol_2, protocol_3],
    resources=[{"GPU": 2}, {"MEMORY": 100e9}, None],
)

# Run protocol_1 on client_1 with dask worker resource constraints "GPU=2",
# then protocol_2 on client_2 with dask worker resource constraints "MEMORY=100e9":
PyRosettaCluster(clients=[client_1, client_2]).distribute(
    protocols=[protocol_1, protocol_2],
    clients_indices=[0, 1],
    resources=[{"GPU": 2}, {"MEMORY": 100e9}],
)
- Args:
- *args: Optional instances of type types.GeneratorType or types.FunctionType,
in the order of protocols to be executed.
- protocols: An optional iterable of extra callable PyRosetta protocols,
i.e. an iterable of objects of types.GeneratorType and/or types.FunctionType types; or a single instance of type types.GeneratorType or types.FunctionType. Default: None
- clients_indices: An optional list or tuple object of int objects, where each int object represents
a zero-based index corresponding to the initialized dask distributed.client.Client object(s) passed to the PyRosettaCluster(clients=…) class attribute. If not None, then the length of the clients_indices object must equal the number of protocols passed to the PyRosettaCluster().distribute method. Default: None
- resources: An optional list or tuple object of dict objects, where each dict object represents
an abstract, arbitrary resource to constrain which dask workers run the user-defined PyRosetta protocols. If None, then do not impose resource constraints on any protocols. If not None, then the length of the resources object must equal the number of protocols passed to the PyRosettaCluster().distribute method, such that each resource specified indicates the unique resource constraints for the protocol at the corresponding index of the protocols passed to PyRosettaCluster().distribute. Note that this feature is only useful when one passes in one’s own instantiated client(s) with dask workers set up with various resource constraints; unless workers were created with the specified resource tags applied, the protocols will hang indefinitely because the dask scheduler waits for workers that satisfy the resource constraints before scheduling those protocols. See https://distributed.dask.org/en/stable/resources.html for more information. Default: None
Extra information:
The PyRosettaCluster.generate method may be used for developing PyRosetta protocols on a local or remote compute cluster and optionally post-processing or visualizing output PackedPose objects in memory. Importantly, subsequent code run on the yielded results is not captured by PyRosettaCluster, and so use of this method does not ensure reproducibility of the simulation. Use the PyRosettaCluster.distribute method for reproducible simulations.
Each yielded result is a tuple object with a PackedPose object as the first element and a dict object as the second element. The PackedPose object represents a returned or yielded PackedPose (or Pose or NoneType) object from the most recently run user-provided PyRosetta protocol. The dict object represents the optionally returned or yielded user-defined PyRosetta protocol kwargs dictionary object from the same most recently run user-provided PyRosetta protocol (see ‘protocols’ argument). If PyRosettaCluster(save_all=True), results are yielded after each user-provided PyRosetta protocol, otherwise results are yielded after the final user-defined PyRosetta protocol. Results are yielded in the order in which they arrive back to the client(s) from the distributed cluster (which may differ from the order that tasks are submitted, due to tasks running asynchronously). If PyRosettaCluster(dry_run=True), results are still yielded but ‘.pdb’ or ‘.pdb.bz2’ files are not saved to disk. See https://docs.dask.org/en/latest/futures.html#distributed.as_completed for more information.
Extra examples:
# Iterate over results in real-time as they are yielded from the cluster:
for packed_pose, kwargs in PyRosettaCluster().generate(protocols):
    ...

# Iterate over submissions to the same client:
client = Client()
for packed_pose, kwargs in PyRosettaCluster(client=client).generate(protocols):
    # Post-process results on host node asynchronously from results generation
    prc = PyRosettaCluster(
        input_packed_pose=packed_pose,
        client=client,
        logs_dir_name=f"logs_{uuid.uuid4().hex}",  # Make sure to write new log files
    )
    for packed_pose, kwargs in prc.generate(other_protocols):
        ...

# Iterate over multiple clients:
client_1 = Client()
client_2 = Client()
for packed_pose, kwargs in PyRosettaCluster(client=client_1).generate(protocols):
    # Post-process results on host node asynchronously from results generation
    prc = PyRosettaCluster(
        input_packed_pose=packed_pose,
        client=client_2,
        logs_dir_name=f"logs_{uuid.uuid4().hex}",  # Make sure to write new log files
    )
    for packed_pose, kwargs in prc.generate(other_protocols):
        ...

# Using multiple dask.distributed.as_completed iterators on the host node creates additional overhead.
# If post-processing on the host node is not required between user-provided PyRosetta protocols,
# the preferred method is to distribute protocols within a single PyRosettaCluster().generate()
# method call using the clients_indices keyword argument:
prc_generate = PyRosettaCluster(clients=[client_1, client_2]).generate(
    protocols=[protocol_1, protocol_2],
    clients_indices=[0, 1],
    resources=[{"GPU": 1}, {"CPU": 1}],
)
for packed_pose, kwargs in prc_generate:
    # Post-process results on host node asynchronously from results generation
    ...
- Yields:
(PackedPose, dict) tuples from the most recently run user-provided PyRosetta protocol if PyRosettaCluster(save_all=True) otherwise from the final user-defined PyRosetta protocol.
- distribute(*args: Any, protocols: Any = None, clients_indices: Any = None, resources: Any = None) Optional[NoReturn] ¶
Run user-provided PyRosetta protocols on a local or remote compute cluster using the user-customized PyRosettaCluster instance. Either arguments or the ‘protocols’ keyword argument is required. If both are provided, then the ‘protocols’ keyword argument gets concatenated after the input arguments.
- Examples:
PyRosettaCluster().distribute(protocol_1)
PyRosettaCluster().distribute(protocols=protocol_1)
PyRosettaCluster().distribute(protocol_1, protocol_2, protocol_3)
PyRosettaCluster().distribute(protocols=(protocol_1, protocol_2, protocol_3))
PyRosettaCluster().distribute(protocol_1, protocol_2, protocols=[protocol_3, protocol_4])

# Run protocol_1 on client_1,
# then protocol_2 on client_2,
# then protocol_3 on client_1,
# then protocol_4 on client_2:
PyRosettaCluster(clients=[client_1, client_2]).distribute(
    protocols=[protocol_1, protocol_2, protocol_3, protocol_4],
    clients_indices=[0, 1, 0, 1],
)

# Run protocol_1 on client_2,
# then protocol_2 on client_3,
# then protocol_3 on client_1:
PyRosettaCluster(clients=[client_1, client_2, client_3]).distribute(
    protocols=[protocol_1, protocol_2, protocol_3],
    clients_indices=[1, 2, 0],
)

# Run protocol_1 on client_1 with dask worker resource constraints "GPU=2",
# then protocol_2 on client_1 with dask worker resource constraints "MEMORY=100e9",
# then protocol_3 on client_1 without dask worker resource constraints:
PyRosettaCluster(client=client_1).distribute(
    protocols=[protocol_1, protocol_2, protocol_3],
    resources=[{"GPU": 2}, {"MEMORY": 100e9}, None],
)

# Run protocol_1 on client_1 with dask worker resource constraints "GPU=2",
# then protocol_2 on client_2 with dask worker resource constraints "MEMORY=100e9":
PyRosettaCluster(clients=[client_1, client_2]).distribute(
    protocols=[protocol_1, protocol_2],
    clients_indices=[0, 1],
    resources=[{"GPU": 2}, {"MEMORY": 100e9}],
)
- Args:
- *args: Optional instances of type types.GeneratorType or types.FunctionType,
in the order of protocols to be executed.
- protocols: An optional iterable of extra callable PyRosetta protocols,
i.e. an iterable of objects of types.GeneratorType and/or types.FunctionType types; or a single instance of type types.GeneratorType or types.FunctionType. Default: None
- clients_indices: An optional list or tuple object of int objects, where each int object represents
a zero-based index corresponding to the initialized dask distributed.client.Client object(s) passed to the PyRosettaCluster(clients=…) class attribute. If not None, then the length of the clients_indices object must equal the number of protocols passed to the PyRosettaCluster().distribute method. Default: None
- resources: An optional list or tuple object of dict objects, where each dict object represents
an abstract, arbitrary resource to constrain which dask workers run the user-defined PyRosetta protocols. If None, then do not impose resource constraints on any protocols. If not None, then the length of the resources object must equal the number of protocols passed to the PyRosettaCluster().distribute method, such that each resource specified indicates the unique resource constraints for the protocol at the corresponding index of the protocols passed to PyRosettaCluster().distribute. Note that this feature is only useful when one passes in one’s own instantiated client(s) with dask workers set up with various resource constraints; unless workers were created with the specified resource tags applied, the protocols will hang indefinitely because the dask scheduler waits for workers that satisfy the resource constraints before scheduling those protocols. See https://distributed.dask.org/en/stable/resources.html for more information. Default: None
- Returns:
None
- DATETIME_FORMAT: str = '%Y-%m-%d %H:%M:%S.%f'¶
- REMARK_FORMAT: str = 'REMARK PyRosettaCluster: '¶
- __init__(*, tasks: Any = [{}], nstruct=1, input_packed_pose: Any = None, seeds: Optional[Any] = None, decoy_ids: Optional[Any] = None, client: Optional[Client] = None, clients: Optional[List[Client]] = None, scheduler: str = None, cores=1, processes=1, memory='4g', scratch_dir: Any = None, min_workers=1, max_workers=_Nothing.NOTHING, dashboard_address=':8787', project_name='2025.10.14.21.07.15.688633', simulation_name=_Nothing.NOTHING, output_path='/home/benchmark/rosetta/source/build/PyRosetta/Linux-5.4.0-84-generic-x86_64-with-glibc2.27/clang-6.0.0/python-3.11/minsizerel.serialization.thread/documentation/outputs', output_decoy_types: Any = None, output_scorefile_types: Any = None, scorefile_name='scores.json', simulation_records_in_scorefile=False, decoy_dir_name='decoys', logs_dir_name='logs', logging_level='INFO', logging_address: str = _Nothing.NOTHING, compressed=True, compression: Optional[Union[str, bool]] = True, sha1: Any = '', ignore_errors=False, timeout=0.5, max_delay_time=3.0, filter_results: Any = None, save_all=False, dry_run=False, norm_task_options: Any = None, cooldown_time=0.5, system_info: Any = None, pyrosetta_build: Any = None, security=_Nothing.NOTHING, max_nonce: int = 4096, environment: Any = None, author=None, email=None, license=None, output_init_file=_Nothing.NOTHING) None ¶
Method generated by attrs for class PyRosettaCluster.
- static _add_pose_comment(packed_pose: PackedPose, pdbfile_data: str) PackedPose ¶
Cache simulation data as a pose comment.
- _clients_dict_has_security() bool ¶
Test whether self.clients_dict has security enabled on all clients, excluding clients with LocalCluster clusters.
- _close_socket_logger_plugins(clients: Dict[int, Client]) None ¶
Purge cached logging socket addresses on all dask workers.
- _dump_init_file(filename: str, input_packed_pose: Optional[PackedPose] = None, output_packed_pose: Optional[PackedPose] = None, verbose: bool = True) None ¶
Dump compressed PyRosetta initialization input files and poses to the input filename.
- static _dump_json(data: Dict[str, Any]) str ¶
Return JSON-serialized data.
- static _filter_scores_dict(scores_dict: Dict[Any, Any]) Dict[Any, Any] ¶
- _format_result(result: Union[Pose, PackedPose]) Tuple[str, Dict[Any, Any], PackedPose] ¶
Given a Pose or PackedPose object, return a tuple containing the pdb string and a scores dictionary.
- _get_clients_index(clients_indices: List[int], protocols: List[Callable[[...], Any]]) int ¶
Return the clients index for the current protocol.
- _get_cluster() ClusterType ¶
Given user input arguments, return the requested cluster instance.
- _get_init_file_json(packed_pose: PackedPose) str ¶
Return a PyRosetta initialization file as a JSON-serialized string.
- _get_instance_and_metadata(kwargs: Dict[Any, Any]) Tuple[Dict[Any, Any], Dict[Any, Any]] ¶
Get the current state of the PyRosettaCluster instance, and split the kwargs into the PyRosettaCluster instance kwargs and ancillary metadata.
- _get_output_dir(decoy_dir: str) str ¶
Get the output directory in which to write files to disk.
- _get_resource(resources: List[Dict[Any, Any]], protocols: List[Callable[[...], Any]]) Optional[Dict[Any, Any]] ¶
Return the resource for the current protocol.
- _get_seed(protocols: Sized) Optional[str] ¶
Get the seed for the input user-provided PyRosetta protocol.
- _get_task_state(protocols: List[Callable[[...], Any]]) Tuple[List[Callable[[...], Any]], Callable[[...], Any], Optional[str]] ¶
Given the current state of protocols, return a tuple of the updated protocols, the current protocol, and the current seed.
- _is_protocol = False¶
- _maybe_teardown(clients: Dict[int, ClientType], cluster: Optional[ClusterType]) None ¶
Teardown client and cluster.
- _parse_results(results: Optional[Union[Iterable[Optional[Union[Pose, PackedPose, bytes]]], Pose, PackedPose]]) Union[List[Tuple[str, Dict[Any, Any]]], NoReturn] ¶
Format output results on distributed worker. Input argument results can be a Pose, PackedPose, or None object, or a list or tuple of Pose and/or PackedPose objects, or an empty list or tuple. Returns a list of tuples, each tuple containing the pdb string and a scores dictionary.
- _process_kwargs(kwargs: Dict[Any, Any]) Dict[Any, Any] ¶
Remove seed specification from ‘extra_options’ or ‘options’, and remove protocols_key from kwargs.
- _register_socket_logger_plugin(clients: Dict[int, Client]) None ¶
Register SocketLoggerPlugin as a dask worker plugin on dask clients.
- _register_task_security_plugin(clients: Dict[int, Client], prk: MaskedBytes) None ¶
Register TaskSecurityPlugin as a dask worker plugin on dask clients.
- _setup_clients_cluster_adaptive() Tuple[Dict[int, ClientType], Optional[ClusterType], Optional[AdaptiveType]] ¶
Given user input arguments, return the requested client, cluster, and adaptive instance.
- _setup_clients_dict() Union[Dict[int, ClientType], NoReturn] ¶
- _setup_initial_kwargs(protocols: List[Callable[[...], Any]], seed: Optional[str], task: Dict[Any, Any]) Tuple[bytes, Dict[str, Any]] ¶
Setup the kwargs for the initial task.
- _setup_kwargs(kwargs: Dict[Any, Any], clients_indices: List[int], resources: Optional[Dict[Any, Any]]) Tuple[bytes, Dict[str, Any], Callable[[...], Any], int, Optional[Dict[Any, Any]]] ¶
Setup the kwargs for the subsequent tasks.
- _setup_protocols_protocol_seed(args: Tuple[Any, ...], protocols: Any, clients_indices: Any, resources: Any) Tuple[List[Callable[[...], Any]], Callable[[...], Any], Optional[str], int, Optional[Dict[Any, Any]]] ¶
Parse, validate, and setup the user-provided PyRosetta protocol(s).
- _setup_pyrosetta_init_kwargs(kwargs: Dict[Any, Any]) Dict[str, Any] ¶
- _setup_seed(kwargs: Dict[Any, Any], seed: Optional[str]) Dict[Any, Any] ¶
Setup the ‘options’ or ‘extra_options’ task kwargs with the -run:jran PyRosetta command line flag.
- _setup_socket_listener(clients: Dict[int, Client]) Tuple[Tuple[str, int], bytes] ¶
Setup logging socket listener.
- _setup_task_security_plugin(clients: Dict[int, Client]) None ¶
Setup task security worker plugin(s).
- _setup_with_nonce() bool ¶
Post-init hook to setup the PyRosettaCluster().with_nonce instance attribute.
- class pyrosetta.distributed.cluster.Serialization(*, instance_id: Optional[str] = None, prk=None, compression: Any = 'xz', with_nonce: bool = False)¶
Bases: Generic[G]

PyRosettaCluster serialization base class.
- classmethod zlib_compress(obj: bytes) bytes ¶
Compress an object with zlib level 9.
- requires_compression() T ¶
Wrapper that tests whether compression is enabled, and skips compression if it is disabled.
- _seal(data: bytes) bytes ¶
Seal data with MessagePack.
- _unseal(obj: bytes) bytes ¶
Unseal data with MessagePack and perform Hash-based Message Authentication Code (HMAC) verification.
- compress_packed_pose(packed_pose: Any) Union[NoReturn, None, bytes] ¶
Compress a PackedPose object with the custom serialization module. If the ‘packed_pose’ argument parameter is None, then just return None.
- Args:
packed_pose: the input PackedPose object to compress. If None, then just return None.
- Returns:
A bytes object representing the compressed PackedPose object, or a NoneType object.
- Raises:
TypeError if the ‘packed_pose’ argument parameter is not of type NoneType or PackedPose.
- decompress_packed_pose(compressed_packed_pose: Any) Union[NoReturn, None, PackedPose] ¶
Decompress a bytes object with the custom serialization module and secure implementation of the pickle module. If the ‘compressed_packed_pose’ argument parameter is None, then just return None.
- Args:
compressed_packed_pose: the input bytes object to decompress. If None, then just return None.
- Returns:
A PackedPose object representing the decompressed bytes object, or a NoneType object.
- Raises:
TypeError if the ‘compressed_packed_pose’ argument parameter is not of type NoneType or bytes.
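- Example:
A minimal round-trip sketch, assuming PyRosetta is initialized, the ‘xz’ backend is available, and the default security settings permit sealing without a pre-shared key:

import pyrosetta
import pyrosetta.distributed.io as io
from pyrosetta.distributed.cluster import Serialization

pyrosetta.init()
serialization = Serialization(compression="xz")

packed_pose = io.pose_from_sequence("TEST")
compressed = serialization.compress_packed_pose(packed_pose)  # A bytes object
restored = serialization.decompress_packed_pose(compressed)   # A PackedPose object
assert serialization.compress_packed_pose(None) is None       # None passes through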
- loads_object(compressed_obj: bytes) Any ¶
Unseal data and run the cloudpickle.loads method.
- dumps_object(obj: Any) bytes ¶
Run the cloudpickle.dumps method and seal data.
- compress_kwargs(kwargs: Any) Union[NoReturn, bytes] ¶
Compress a dict object with the cloudpickle and custom serialization modules.
- Args:
kwargs: the input dict object to compress.
- Returns:
A bytes object representing the compressed dict object.
- Raises:
TypeError if the ‘kwargs’ argument parameter is not of type dict.
- decompress_kwargs(compressed_kwargs: bytes) Union[NoReturn, Dict[Any, Any]] ¶
Decompress a bytes object with the custom serialization and cloudpickle modules.
- Args:
compressed_kwargs: the input bytes object to decompress.
- Returns:
A dict object representing the decompressed bytes object.
- Raises:
TypeError if the ‘compressed_kwargs’ argument parameter is not of type bytes. TypeError if the returned kwargs object is not of type dict.
- compress_object(obj: Any) bytes ¶
Compress an object with the cloudpickle and custom serialization modules.
- Args:
obj: the input object to compress.
- Returns:
A bytes object representing the compressed object.
- decompress_object(compressed_obj: bytes) Any ¶
Decompress a bytes object with the custom serialization and cloudpickle modules.
- Args:
compressed_obj: the input bytes object to decompress.
- Returns:
An object representing the decompressed bytes object.
- Raises:
TypeError if the ‘compressed_obj’ argument parameter is not of type bytes.
- classmethod deepcopy_kwargs(kwargs: Any) Union[NoReturn, Dict[Any, Any]] ¶
Deep copy a kwargs dict object using the cloudpickle module, which makes it possible to serialize Python constructs not supported by the default pickle module from the Python standard library.
- Args:
kwargs: the dict object to be deep copied.
- Returns:
A deep copy of the dict object.
- Raises:
TypeError if the ‘kwargs’ argument parameter is not of type dict.
- __init__(*, instance_id: Optional[str] = None, prk=None, compression: Any = 'xz', with_nonce: bool = False) None ¶
Method generated by attrs for class Serialization.
- _is_protocol = False¶
- pyrosetta.distributed.cluster.export_init_file(output_file: str, output_init_file: Optional[str] = None, compressed: Optional[bool] = None) None ¶
Export a PyRosetta initialization file from a decoy output file. The PyRosettaCluster simulation that produced the decoy output file must have had the ‘output_init_file’ instance attribute enabled, so the ‘init_file’ key value can be detected in the metadata of the decoy output file. This function can be used to prepend the decoy output file to the detected PyRosetta initialization file for more facile simulation reproduction using the reproduce() function.
- Args:
- output_file: A required str object representing the decoy output file. The file must end in
one of: ‘.pdb’, ‘.pdb.bz2’, ‘.pkl_pose’, ‘.pkl_pose.bz2’, ‘.b64_pose’, or ‘.b64_pose.bz2’.
- output_init_file: An optional str object specifying the output PyRosetta initialization file
path ending with ‘.init’. If NoneType is provided, then the PyRosetta initialization file path is derived from the ‘output_file’ argument parameter by replacing the file extension with ‘.init’ (or ‘.init.bz2’ when the ‘compressed’ argument parameter is set to True). Default: None
- compressed: A bool object specifying whether or not to compress the output PyRosetta initialization
file with bzip2, resulting in a ‘.init.bz2’ output PyRosetta initialization file. Default: True
- Returns:
None
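- Example:
A usage sketch; the decoy filename below is an illustrative assumption, and the simulation that produced it must have had the ‘output_init_file’ instance attribute enabled:

from pyrosetta.distributed.cluster import export_init_file

export_init_file(
    "outputs/decoys/my_decoy.pdb.bz2",  # Illustrative decoy output file
    output_init_file=None,  # Derive the '.init.bz2' path from 'output_file'
    compressed=True,
)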
- pyrosetta.distributed.cluster.generate_dask_tls_security(output_dir: str = '.', common_name: str = 'dask_tls_security', days: int = 365, openssl_bin: str = 'openssl', overwrite: bool = False, san_dns: Optional[Iterable[str]] = None, san_ip: Optional[Iterable[str]] = None, cleanup: bool = True) Union[Security, NoReturn] ¶
Create cryptographic certificates and private keys for securing a dask cluster, and return a dask Security object that can be passed directly to the PyRosettaCluster(security=…) keyword argument parameter.
This function uses the openssl command-line tool to generate the following:
A “certificate authority” certificate and key: These act as a trusted “parent” identity used to sign other certificates. Files: ca.pem (certificate), ca.key (private key).
A “leaf” certificate and key: These represent the actual dask processes (scheduler, workers, and client). Files: tls.crt (certificate), tls.key (private key).
By default, the leaf certificate is signed by the certificate authority, meaning that any process configured with this authority will trust the leaf certificate as valid.
All generated files are placed in the output_dir keyword argument parameter, which defaults to the current working directory.
- Keyword Args:
- output_dir: A str object representing the directory where all certificate and
key files will be written. The directory will be created if it does not exist. All generated files (CA certificate, leaf certificate, leaf private key, and optional bookkeeping files) are output to this single directory. Therefore, for a distributed dask setup, this directory must be readable by the scheduler, workers, and client processes, either via a shared filesystem or via copying and mounting (e.g., if using Docker, Apptainer, or other container applications). Default: “.”
- common_name: A str object representing the “Common Name” placed inside the leaf
certificate. This is a human-readable identifier that typically names the system or service to which the certificate belongs. Default: “dask_tls_security”
- days: An int object representing the number of days the certificates will be
valid before expiring. Default: 365
- openssl_bin: A str object representing the path or name of the openssl
executable. If the OpenSSL executable is not in the system “PATH” environment variable, then the full path must be provided. Default: “openssl”
- overwrite: A bool object specifying whether or not to overwrite existing
files in ‘output_dir’ keyword argument parameter. If True is provided, the same filenames will be deleted and replaced with newly generated ones. If False is provided, then existing files are re-used. Default: False
- san_dns: An optional iterable of str objects representing a list of hostnames
(e.g., [“localhost”, “cluster.example.com”]) that should be accepted when verifying the certificate. These are included in an extension field called “Subject Alternative Names”. Default: None
- san_ip: An optional iterable of str objects representing a list of IP addresses
(e.g., [“127.0.0.1”, “111.111.111.1”]) that should be accepted when verifying the certificate. These are also included in the “Subject Alternative Names” field. Default: None
- cleanup: An optional bool object specifying whether or not to delete the index.txt
and serial bookkeeping files used by OpenSSL. Default: True
- Returns:
A dask.distributed.Security() instance configured to require encryption (with require_encryption=True) and configured to use the generated certificates and keys for the scheduler, workers, and client.
- Examples:
Generate a new set of certificates and a configured dask Security object:

security = pyrosetta.distributed.cluster.generate_dask_tls_security(
    output_dir="./dask_certs",
    common_name="my-cluster",
    san_dns=["localhost", "my-host.local"],
    san_ip=["127.0.0.1"],
    cleanup=False,
)

After running this function, the directory ./dask_certs will contain:
- ca.pem: certificate authority certificate (used by dask)
- ca.key: certificate authority private key
- tls.crt: leaf certificate (used by dask)
- tls.key: leaf private key (used by dask)
- index.txt, serial, and ca.cnf: bookkeeping files used by OpenSSL (with cleanup=False)

Then use the configured dask Security object with PyRosettaCluster:

PyRosettaCluster(security=security, ...).distribute(...)
- Additional Notes:
- A “certificate authority” (CA) acts as a trusted “parent” identity that confirms whether a certificate is real. In this function, the user generates their own local CA for the dask cluster.
- A “leaf certificate” is the actual identity used by a running process (i.e., the scheduler, a worker, or a client).
- “Subject Alternative Names” (SANs) are extra hostnames or IP addresses for which the certificate is valid. This enables the user to connect using either a machine name or an IP address without validation errors.
- File permissions are automatically set for private keys using chmod 600 so they are restricted to the owner (read/write only) for basic security.
- This function generates all necessary files in a single directory. For proper TLS validation in a distributed dask setup, the CA certificate must be accessible from all nodes (i.e., the scheduler, workers, and client). Leaf certificates and keys must be accessible by the process using them. For example, all files can be placed in a common directory from which all processes can read, or the directory can be mounted (e.g., if using Docker, Apptainer, or other container applications).
- If cleanup=False and the same directory is used for multiple function calls, then OpenSSL may create additional files in the output directory (e.g., *.pem, index.txt.attr, index.txt.old, and serial.old). These are simply bookkeeping files used internally by OpenSSL and are not required by dask, so they can be safely deleted after the leaf certificate has been issued.
- pyrosetta.distributed.cluster.get_instance_kwargs(input_file: Optional[Union[str, Pose, PackedPose]] = None, scorefile: Optional[str] = None, decoy_name: Optional[str] = None, skip_corrections: Optional[bool] = None) Union[Dict[str, Any], NoReturn] ¶
Given an input file that was written by PyRosettaCluster, or a scorefile and a decoy name that was written by PyRosettaCluster, return the PyRosettaCluster instance kwargs needed to reproduce the decoy using PyRosettaCluster.
- Args:
- input_file: A str object specifying the path to the ‘.pdb’, ‘.pdb.bz2’, ‘.pkl_pose’,
‘.pkl_pose.bz2’, ‘.b64_pose’, ‘.b64_pose.bz2’, ‘.init’, or ‘.init.bz2’ file, or a Pose or PackedPose object, from which to extract PyRosettaCluster instance kwargs. If ‘input_file’ is provided, then ignore the ‘scorefile’ and ‘decoy_name’ keyword argument parameters. Default: None
- scorefile: A str object specifying the path to the JSON-formatted scorefile
(or pickled pandas.DataFrame scorefile) from a PyRosettaCluster simulation from which to extract PyRosettaCluster instance kwargs. If ‘scorefile’ is provided, ‘decoy_name’ must also be provided. In order to use a scorefile, it must contain full simulation records from the original production run; i.e., the attribute ‘simulation_records_in_scorefile’ was set to True. Default: None
- decoy_name: A str object specifying the decoy name for which to extract
PyRosettaCluster instance kwargs. If ‘decoy_name’ is provided, ‘scorefile’ must also be provided. Default: None
- skip_corrections: A bool object specifying whether or not to skip any ScoreFunction
corrections specified in the PyRosettaCluster task initialization options (extracted from the ‘input_file’ or ‘scorefile’ keyword argument parameter). Default: None
- Returns:
A dict object of PyRosettaCluster instance kwargs.
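- Examples:
A minimal sketch of extracting instance kwargs from a decoy file (the file path below is hypothetical):
```
import pyrosetta.distributed.cluster as cluster

# Hypothetical decoy output file written by a previous PyRosettaCluster run
instance_kwargs = cluster.get_instance_kwargs(
    input_file="outputs/decoys/my_decoy_0.pdb.bz2",
)
print(sorted(instance_kwargs))  # The recorded PyRosettaCluster attribute names
# The returned dict may be passed directly to PyRosettaCluster(**instance_kwargs)
```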
- pyrosetta.distributed.cluster.get_protocols(protocols: Optional[Union[List[Union[Callable[[...], Any], str]], Callable[[...], Any], str]] = None, input_file: Optional[Union[str, Pose, PackedPose]] = None, scorefile: Optional[str] = None, decoy_name: Optional[str] = None) Union[List[Union[Callable[[...], Any], str]], NoReturn] ¶
Given an ‘input_file’ that was written by PyRosettaCluster, or a full ‘scorefile’ and a ‘decoy_name’ that was written by PyRosettaCluster: if ‘protocols’ is provided, validate the ‘protocols’ against those in the ‘input_file’ or ‘scorefile’; otherwise, if ‘protocols’ is NoneType, attempt to return the PyRosettaCluster protocols from the current scope matching the protocol names in the ‘input_file’ or ‘scorefile’ keyword argument parameters.
- Args:
- protocols: An iterable of str objects specifying the names of user-provided
PyRosetta protocols to validate or return. Default: None
- input_file: A str object specifying the path to the ‘.pdb’, ‘.pdb.bz2’, ‘.pkl_pose’,
‘.pkl_pose.bz2’, ‘.b64_pose’, ‘.b64_pose.bz2’, ‘.init’, or ‘.init.bz2’ file, or a Pose or PackedPose object, from which to extract PyRosettaCluster instance kwargs. If ‘input_file’ is provided, then ignore the ‘scorefile’ and ‘decoy_name’ keyword argument parameters. Default: None
- scorefile: A str object specifying the path to the JSON-formatted scorefile
(or pickled pandas.DataFrame scorefile) from a PyRosettaCluster simulation from which to extract PyRosettaCluster instance kwargs. If ‘scorefile’ is provided, ‘decoy_name’ must also be provided. In order to use a scorefile, it must contain full simulation records from the original production run; i.e., the attribute ‘simulation_records_in_scorefile’ was set to True. Default: None
- decoy_name: A str object specifying the decoy name for which to extract
PyRosettaCluster instance kwargs. If decoy_name is provided, scorefile must also be provided. Default: None
- Returns:
A list of user-defined PyRosetta protocol names from the ‘input_file’ or ‘scorefile’. If protocols is None, then attempt to return the PyRosettaCluster protocols from the current scope matching the protocol names in the ‘input_file’ or ‘scorefile’.
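- Examples:
A minimal sketch of validating in-scope protocols against a decoy’s simulation record (the protocol body and file path are hypothetical):
```
import pyrosetta.distributed.cluster as cluster

def my_protocol(packed_pose, **kwargs):  # Hypothetical user-defined protocol
    return packed_pose

# Validate the provided protocols against those recorded in the decoy file
protocols = cluster.get_protocols(
    protocols=[my_protocol],
    input_file="outputs/decoys/my_decoy_0.pdb.bz2",  # Hypothetical path
)
```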
- pyrosetta.distributed.cluster.get_protocols_list_of_str(input_file: Optional[Union[str, Pose, PackedPose]] = None, scorefile: Optional[str] = None, decoy_name: Optional[str] = None) Union[List[str], NoReturn] ¶
Get the user-defined PyRosetta protocols as a list object of str objects.
- Args:
- input_file: A str object specifying the path to the ‘.pdb’, ‘.pdb.bz2’, ‘.pkl_pose’,
‘.pkl_pose.bz2’, ‘.b64_pose’, ‘.b64_pose.bz2’, ‘.init’, or ‘.init.bz2’ file, or a Pose or PackedPose object, from which to extract PyRosettaCluster instance kwargs. If ‘input_file’ is provided, then ignore the ‘scorefile’ and ‘decoy_name’ keyword argument parameters. Default: None
- scorefile: A str object specifying the path to the JSON-formatted scorefile
(or pickled pandas.DataFrame scorefile) from a PyRosettaCluster simulation from which to extract PyRosettaCluster instance kwargs. If ‘scorefile’ is provided, ‘decoy_name’ must also be provided. In order to use a scorefile, it must contain full simulation records from the original production run; i.e., the attribute ‘simulation_records_in_scorefile’ was set to True. Default: None
- decoy_name: A str object specifying the decoy name for which to extract
PyRosettaCluster instance kwargs. If decoy_name is provided, scorefile must also be provided. Default: None
- Returns:
A list object of str objects specifying user-defined PyRosetta protocol names.
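- Examples:
A minimal sketch, assuming a hypothetical decoy file path:
```
import pyrosetta.distributed.cluster as cluster

protocol_names = cluster.get_protocols_list_of_str(
    input_file="outputs/decoys/my_decoy_0.pdb.bz2",  # Hypothetical path
)
print(protocol_names)  # e.g., ["my_first_protocol", "my_second_protocol"]
```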
- pyrosetta.distributed.cluster.get_scores_dict(obj: Union[str, Pose, PackedPose]) Union[Dict[str, Dict[str, Any]], NoReturn] ¶
Get the PyRosettaCluster scores dictionary from either a Pose or PackedPose object, or a ‘.pdb’, ‘.pdb.bz2’, ‘.pkl_pose’, ‘.pkl_pose.bz2’, ‘.b64_pose’, ‘.b64_pose.bz2’, ‘.init’, or ‘.init.bz2’ file.
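For example, a minimal sketch, assuming a hypothetical decoy file path:
```
import pyrosetta.distributed.cluster as cluster

scores_dict = cluster.get_scores_dict(
    "outputs/decoys/my_decoy_0.pdb.bz2"  # Hypothetical path
)
print(sorted(scores_dict))  # Top-level keys of the PyRosettaCluster record
```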
- pyrosetta.distributed.cluster.get_yml() str ¶
Use conda env export to return a YML file string with the current conda environment, excluding certain source domains.
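For example, the returned string may be written to disk to archive the current environment (the output filename is hypothetical):
```
import pyrosetta.distributed.cluster as cluster

yml = cluster.get_yml()
with open("environment.yml", "w") as f:  # Hypothetical output filename
    f.write(yml)
```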
- pyrosetta.distributed.cluster.iterate(**kwargs: Any) Union[NoReturn, Generator[Tuple[PackedPose, Dict[Any, Any]], None, None]] ¶
PyRosettaCluster().generate() shim requiring the ‘protocols’ keyword argument, and optionally any PyRosettaCluster keyword arguments or the ‘clients_indices’ keyword argument (when using the PyRosettaCluster(clients=…) keyword argument), or the ‘resources’ keyword argument.
- Args:
- **kwargs: See PyRosettaCluster docstring. The keyword arguments must also include
‘protocols’, an iterable object of function or generator objects specifying an ordered sequence of user-defined PyRosetta protocols to execute for the simulation (see PyRosettaCluster().generate docstring). The keyword arguments may also optionally include ‘clients_indices’ or ‘resources’ (see PyRosettaCluster().generate docstring).
- Yields:
(PackedPose, dict) tuples from the most recently run user-provided PyRosetta protocol if PyRosettaCluster(save_all=True), otherwise from the final user-defined PyRosetta protocol.
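- Examples:
A minimal sketch (the protocol body and keyword argument values are hypothetical):
```
import pyrosetta.distributed.cluster as cluster

def my_protocol(packed_pose, **kwargs):  # Hypothetical user-defined protocol
    return packed_pose

for packed_pose, kwargs in cluster.iterate(
    tasks=[{"extra_options": "-ex1"}],  # Hypothetical PyRosetta options
    protocols=[my_protocol],
    nstruct=2,
):
    print(kwargs)  # The task metadata associated with each yielded decoy
```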
- pyrosetta.distributed.cluster.produce(**kwargs: Any) Optional[NoReturn] ¶
PyRosettaCluster().distribute() shim requiring the ‘protocols’ keyword argument, and optionally any PyRosettaCluster keyword arguments or the ‘clients_indices’ keyword argument (when using the PyRosettaCluster(clients=…) keyword argument), or the ‘resources’ keyword argument.
- Args:
- **kwargs: See PyRosettaCluster docstring. The keyword arguments must also include
‘protocols’, an iterable object of function or generator objects specifying an ordered sequence of user-defined PyRosetta protocols to execute for the simulation (see PyRosettaCluster().distribute docstring). The keyword arguments may also optionally include ‘clients_indices’ or ‘resources’ (see PyRosettaCluster().distribute docstring).
- Returns:
None
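- Examples:
A minimal sketch (the protocol body and keyword argument values are hypothetical):
```
import pyrosetta.distributed.cluster as cluster

def my_protocol(packed_pose, **kwargs):  # Hypothetical user-defined protocol
    return packed_pose

cluster.produce(
    tasks=[{"extra_options": "-ex1"}],  # Hypothetical PyRosetta options
    protocols=[my_protocol],
    nstruct=10,
    output_path="./outputs",  # Hypothetical output directory
)
```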
- pyrosetta.distributed.cluster.recreate_environment(environment_name: Optional[str] = None, input_file: Optional[Union[str, Pose, PackedPose]] = None, scorefile: Optional[str] = None, decoy_name: Optional[str] = None, timeout: Optional[int] = None) Optional[NoReturn] ¶
Given an input file that was written by PyRosettaCluster, or a scorefile and a decoy name that was written by PyRosettaCluster, recreate the conda environment that was used to generate the decoy with a new environment name.
- Args:
- environment_name: A str object specifying the new name of the conda environment
to recreate. Default: ‘PyRosettaCluster_’ + datetime.now().strftime(“%Y.%m.%d.%H.%M.%S.%f”)
- input_file: A str object specifying the path to the ‘.pdb’, ‘.pdb.bz2’, ‘.pkl_pose’,
‘.pkl_pose.bz2’, ‘.b64_pose’, or ‘.b64_pose.bz2’ file, or a Pose or PackedPose object, from which to extract PyRosettaCluster instance kwargs. If ‘input_file’ is provided, then ignore the ‘scorefile’ and ‘decoy_name’ keyword argument parameters. Default: None
- scorefile: A str object specifying the path to the JSON-formatted scorefile
from which to extract PyRosettaCluster instance kwargs. If ‘scorefile’ is provided, ‘decoy_name’ must also be provided. In order to use a scorefile, it must contain full simulation records from the original production run; i.e., the attribute ‘simulation_records_in_scorefile’ was set to True. Default: None
- decoy_name: A str object specifying the decoy name for which to extract
PyRosettaCluster instance kwargs. If ‘decoy_name’ is provided, ‘scorefile’ must also be provided. Default: None
- timeout: An int object specifying the timeout in seconds before exiting the subprocess.
Default: None
- Returns:
None
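- Examples:
A minimal sketch, assuming a hypothetical decoy file path and environment name:
```
import pyrosetta.distributed.cluster as cluster

cluster.recreate_environment(
    environment_name="my_reproduction_env",          # Hypothetical environment name
    input_file="outputs/decoys/my_decoy_0.pdb.bz2",  # Hypothetical path
    timeout=3600,  # Exit the subprocess after one hour
)
```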
- pyrosetta.distributed.cluster.reproduce(input_file: Optional[str] = None, scorefile: Optional[str] = None, decoy_name: Optional[str] = None, protocols: Any = None, client: Optional[Client] = None, clients: Optional[List[Client]] = None, input_packed_pose: Optional[Union[Pose, PackedPose]] = None, instance_kwargs: Optional[Dict[Any, Any]] = None, clients_indices: Optional[List[int]] = None, resources: Optional[Dict[Any, Any]] = None, skip_corrections: bool = False, init_from_file_kwargs: Optional[Dict[str, Any]] = None) Optional[NoReturn] ¶
Given an input file that was written by PyRosettaCluster (or a full scorefile and a decoy name that was written by PyRosettaCluster) and any additional PyRosettaCluster instance kwargs, run the reproduction simulation for the given decoy with a new instance of PyRosettaCluster.
- Args:
- input_file: A str object specifying the path to the ‘.pdb’, ‘.pdb.bz2’,
‘.pkl_pose’, ‘.pkl_pose.bz2’, ‘.b64_pose’, ‘.b64_pose.bz2’, ‘.init’, or ‘.init.bz2’ file from which to extract PyRosettaCluster instance kwargs. If ‘input_file’ is provided, then ignore the ‘scorefile’ and ‘decoy_name’ argument parameters. If a ‘.init’ or ‘.init.bz2’ file is provided and PyRosetta is not yet initialized, this first initializes PyRosetta with the PyRosetta initialization file (see the ‘init_from_file_kwargs’ keyword argument). Note that ‘.pkl_pose’, ‘.pkl_pose.bz2’, ‘.b64_pose’, ‘.b64_pose.bz2’, ‘.init’, and ‘.init.bz2’ files contain pickled Pose objects that are deserialized using PyRosetta’s secure unpickler upon running the reproduce() function, but please still only input these file types if you know and trust their source. Default: None
- scorefile: A str object specifying the path to the JSON-formatted scorefile
(or pickled pandas.DataFrame scorefile) from a PyRosettaCluster simulation from which to extract PyRosettaCluster instance kwargs. If ‘scorefile’ is provided, ‘decoy_name’ must also be provided. In order to use a scorefile, it must contain full simulation records from the original production run; i.e., the attribute ‘simulation_records_in_scorefile’ was set to True. Note that in order to securely load pickled pandas.DataFrame objects, please ensure that pyrosetta.secure_unpickle.add_secure_package(“pandas”) has been run. Default: None
- decoy_name: A str object specifying the decoy name for which to extract
PyRosettaCluster instance kwargs. If decoy_name is provided, scorefile must also be provided. Default: None
- protocols: An optional iterable object of function or generator objects specifying
an ordered sequence of user-defined PyRosetta protocols to execute for the reproduction. This argument only needs to be provided if the user-defined PyRosetta protocols are not defined with the same scope as in the original production run. Default: None
- client: An optional initialized dask distributed.client.Client object to be used as
the dask client interface to the local or remote compute cluster. If None, then PyRosettaCluster initializes its own dask client based on the settings from the original production run. Deprecated by the clients attribute, but supported for legacy purposes. Default: None
- clients: A list or tuple object of initialized dask distributed.client.Client
objects to be used as the dask client interface(s) to the local or remote compute cluster(s). If None, then PyRosettaCluster initializes its own dask client based on the settings from the original production run. Optionally used in combination with the clients_indices attribute. Default: None
- input_packed_pose: An optional input PackedPose object that is accessible via
the first argument of the first user-defined PyRosetta protocol. Default: None
- instance_kwargs: An optional dict object of valid PyRosettaCluster attributes
which will override any PyRosettaCluster attributes that were used to generate the original decoy. Default: None
- clients_indices: An optional list or tuple object of int objects, where each int object represents
a zero-based index corresponding to the initialized dask distributed.client.Client object(s) passed to the PyRosettaCluster(clients=…) class attribute. If not None, then the length of the clients_indices object must equal the number of protocols passed to the PyRosettaCluster().distribute method. Default: None
- resources: An optional list or tuple object of dict objects, where each dict object represents
an abstract, arbitrary resource to constrain which dask workers run the user-defined PyRosetta protocols. If None, then do not impose resource constraints on any protocols. If not None, then the length of the resources object must equal the number of protocols passed to the PyRosettaCluster().distribute method, such that each resource specified indicates the unique resource constraints for the protocol at the corresponding index of the protocols passed to PyRosettaCluster().distribute. Note that this feature is only useful when one passes in their own instantiated client(s) with dask workers set up with various resource constraints. If dask workers were not instantiated with the specified resource tags applied, protocols will hang indefinitely, because the dask scheduler waits for workers that meet the specified resource constraints before scheduling those protocols. See https://distributed.dask.org/en/latest/resources.html for more information. Default: None
- skip_corrections: A bool object specifying whether or not to skip any ScoreFunction corrections specified in
the PyRosettaCluster task ‘options’ or ‘extra_options’ values (extracted from either the ‘input_file’ or ‘scorefile’ keyword argument parameter), which are set in-code upon PyRosetta initialization. If the current PyRosetta build and conda environment are identical to those used for the original simulation, this parameter may be set to True to enable the reproduced decoy output file to be used for successive reproductions. If reproducing from a ‘.init’ file, it is recommended to also set ‘skip_corrections’ of the ‘init_from_file_kwargs’ keyword argument to the same value. Default: False
- init_from_file_kwargs: An optional dict object to override the default pyrosetta.init_from_file() keyword
arguments if the ‘input_file’ keyword argument parameter is a path to a ‘.init’ file, otherwise it is not used. See the pyrosetta.init_from_file docstring for more information. Default:
{
    'output_dir': os.path.join(tempfile.TemporaryDirectory().name, "pyrosetta_init_input_files"),
    'skip_corrections': skip_corrections,  # Defaults to the 'skip_corrections' value from reproduce()
    'relative_paths': True,
    'dry_run': False,
    'max_decompressed_bytes': pow(2, 30),  # 1 GiB
    'database': None,
    'verbose': True,
    'set_logging_handler': 'logging',
    'notebook': None,
    'silent': False,
}
- Returns:
None
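- Examples:
A minimal sketch (the protocol body, file path, and instance kwargs override are hypothetical):
```
import pyrosetta.distributed.cluster as cluster

def my_protocol(packed_pose, **kwargs):  # Hypothetical user-defined protocol
    return packed_pose

cluster.reproduce(
    input_file="outputs/decoys/my_decoy_0.pdb.bz2",  # Hypothetical path
    protocols=[my_protocol],  # Only needed if not defined in the original scope
    instance_kwargs={"output_path": "./reproduce_outputs"},  # Hypothetical override
)
```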
- pyrosetta.distributed.cluster.requires_packed_pose(func: P) Union[PackedPose, None, P] ¶
Use this as a Python decorator of any user-provided PyRosetta protocol that requires a non-empty PackedPose object as its first argument parameter. If the decorated protocol receives an empty PackedPose object or a NoneType object, that object is returned as-is and the decorated protocol is skipped; otherwise, the decorated protocol runs.
If using PyRosettaCluster(filter_results=False) and the preceding protocol returns or yields either None, an empty Pose object, or an empty PackedPose object, then an empty PackedPose object is distributed to the next user-provided PyRosetta protocol, in which case the next protocol and/or any downstream protocols are skipped if they are decorated with this decorator. If using PyRosettaCluster(ignore_errors=True) and an error is raised in the preceding protocol, then a NoneType object is distributed to the next user-provided PyRosetta protocol, in which case the next protocol and/or any downstream protocols are skipped if they are decorated with this decorator.
For example:
@requires_packed_pose
def my_pyrosetta_protocol(packed_pose, **kwargs):
    assert packed_pose.pose.size() > 0
    return packed_pose
- Args:
A user-provided PyRosetta function.
- Returns:
The input packed_pose argument parameter if it is an empty PackedPose object or a NoneType object, otherwise the results from the decorated protocol.
- pyrosetta.distributed.cluster.reserve_scores(func: P) Union[P, NoReturn] ¶
Use this as a Python decorator of any user-provided PyRosetta protocol. If any scoreterms and values present in the input packed_pose are deleted during execution of the decorated user-provided PyRosetta protocol, then those scoreterms and values are appended back into the pose.cache dictionary after execution. If any scoreterms present in the input packed_pose are also present in the returned or yielded output Pose or PackedPose objects, then the original values are not appended back (that is, the outputted scoreterms and values are kept in the pose.cache dictionary). Any new scoreterms and values acquired in the decorated user-provided PyRosetta protocol are never overwritten. This allows users to maintain scoreterms and values acquired in earlier user-defined PyRosetta protocols when executing Rosetta Movers that happen to delete scores from pose objects.
For example:
@reserve_scores
def my_pyrosetta_protocol(packed_pose, **kwargs):
    from pyrosetta import MyMover
    pose = packed_pose.pose
    MyMover().apply(pose)
    return pose
- Args:
A user-provided PyRosetta function.
- Returns:
The output from the user-provided PyRosetta function, reserving the scores.
- pyrosetta.distributed.cluster.run(**kwargs: Any) Optional[NoReturn] ¶
PyRosettaCluster().distribute() shim requiring the ‘protocols’ keyword argument, and optionally any PyRosettaCluster keyword arguments or the ‘clients_indices’ keyword argument (when using the PyRosettaCluster(clients=…) keyword argument), or the ‘resources’ keyword argument.
- Args:
- **kwargs: See PyRosettaCluster docstring. The keyword arguments must also include
‘protocols’, an iterable object of function or generator objects specifying an ordered sequence of user-defined PyRosetta protocols to execute for the simulation (see PyRosettaCluster().distribute docstring). The keyword arguments may also optionally include ‘clients_indices’ or ‘resources’ (see PyRosettaCluster().distribute docstring).
- Returns:
None
- pyrosetta.distributed.cluster.update_scores(packed_pose: PackedPose) PackedPose ¶
Cache scores into the PackedPose object that are not cached in the Pose object and do not have keys with reserved scoretypes, then return the updated PackedPose object.
- Args:
packed_pose: the input PackedPose object in which to update scores.
- Returns:
A new PackedPose object with scores cached in its Pose object if scores could be cached, otherwise the input PackedPose object.
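For example, a minimal usage sketch inside a user-provided protocol (the protocol body is hypothetical):
```
from pyrosetta.distributed.cluster import update_scores

def my_protocol(packed_pose, **kwargs):  # Hypothetical user-defined protocol
    # Synchronize any scores cached only on the PackedPose into its Pose
    return update_scores(packed_pose)
```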