ovo.core.scheduler.nextflow_scheduler

Module Contents

Classes

API

class ovo.core.scheduler.nextflow_scheduler.NextflowScheduler(name: str, workdir: str, reference_files_dir: str, allow_submit: bool = True, submission_args: dict = None)

Bases: ovo.core.scheduler.base_scheduler.Scheduler, ovo.core.scheduler.simple_queue_mixin.SimpleQueueMixin

submit(pipeline_name: str, params: dict = None, submission_args: dict = None) str

Submits a Nextflow workflow asynchronously and returns the scheduler job ID.

Parameters:
  • pipeline_name – Workflow name and revision (e.g. ovo.rfdiffusion-end-to-end, or a GitHub URL with @version)

  • params – Dictionary of parameters to pass to the workflow.

  • submission_args – Submission arguments for the scheduler, overrides values in self.submission_args

Returns:

Scheduler job ID
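
The override behaviour described for submission_args can be pictured as a dictionary merge in which per-call values win over the scheduler-level defaults. The following is only a minimal sketch under that assumption: merge_submission_args is a hypothetical helper, the "profile" key is illustrative, and a shallow merge is assumed because the source does not say whether nested values are merged.

```python
def merge_submission_args(defaults, overrides):
    """Return a new dict in which per-call overrides win over the defaults.

    Hypothetical helper: a shallow merge is an assumption, not the
    documented behaviour of NextflowScheduler.
    """
    merged = dict(defaults or {})   # copy so the defaults stay untouched
    merged.update(overrides or {})  # per-call values take precedence
    return merged


# "profile" is an illustrative key; "sync" appears in the run() description.
scheduler_defaults = {"profile": "conda", "sync": False}
effective = merge_submission_args(scheduler_defaults, {"sync": True})
print(effective)
```

Copying the defaults before updating keeps the scheduler-level dictionary unchanged between calls, which matters when the same scheduler object submits many jobs.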

_run_subprocess(command: list[str], job_id: str, sync: bool = False)

queue_run_task(job_id: str, task: Any)

Execute a single task from the queue synchronously - executed in the worker loop

run(pipeline_name: str, output_dir: str = None, params: dict = None, submission_args: dict = None, link=False) str

Run the workflow, wait for it to finish, and return the job ID.

Raise RuntimeError if the workflow fails (return code != 0).

Shorthand for submit(submission_args=dict(sync=True))

Parameters:
  • pipeline_name – Workflow name and revision (e.g. ovo.rfdiffusion-end-to-end, or a GitHub URL with @version)

  • output_dir – Optional output directory to save results to (will be symlinked from workdir/execdir/job_id/output)

  • params – Dictionary of parameters to pass to the workflow.

  • submission_args – Submission arguments for the scheduler, overrides values in self.submission_args

  • link – If True, symlink the output directory to output_dir instead of copying (only if output_dir is set)

Returns:

Scheduler job ID

_create_run_command(pipeline_name: str, params: dict = None, sync: bool = False, submission_args: dict = None) list[str]

Prepare a Nextflow run command for job submission.

Parameters:
  • pipeline_name – Workflow name and revision (e.g. ovo.rfdiffusion-end-to-end, or a GitHub URL with @version)

  • params – Dictionary of parameters to pass to the workflow.

  • sync – If true, run the workflow synchronously.

  • submission_args – Submission arguments for the scheduler, overrides values in self.submission_args

Returns:

Command as a list of strings
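
To illustrate what "command as a list of strings" might look like, here is a rough sketch that splits an optional @version suffix off the pipeline name and builds a nextflow run invocation. It is not the actual implementation: split_pipeline_name and build_run_command are hypothetical helpers, the example URL and parameter are made up, and the real method presumably also handles configuration files, the working directory, local pipeline resolution and submission_args, none of which are shown. The sketch relies only on two Nextflow CLI conventions: -r selects a revision, and pipeline parameters are passed as --name value.

```python
def split_pipeline_name(pipeline_name):
    """Split 'name@revision' into (name, revision); revision is None if absent."""
    name, sep, revision = pipeline_name.rpartition("@")
    if not sep:
        # No '@' present: the whole string is the workflow name.
        return pipeline_name, None
    return name, revision


def build_run_command(pipeline_name, params=None):
    """Build a 'nextflow run' command as a list of strings (illustrative only)."""
    name, revision = split_pipeline_name(pipeline_name)
    command = ["nextflow", "run", name]
    if revision:
        command += ["-r", revision]  # Nextflow's revision option
    for key, value in (params or {}).items():
        command += [f"--{key}", str(value)]  # pipeline parameters use a double dash
    return command


# Hypothetical URL and parameter, for illustration only.
print(build_run_command("https://github.com/example/pipeline@v1.0", {"num_designs": 10}))
```

Returning a list rather than a single string lets the command be passed to a subprocess call without shell quoting concerns.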

_get_module_path(module_name) str

Get absolute path to the module_name directory

_get_default_config_path()

Get path to nextflow_default.config file

_get_pipeline_dir(pipeline_name: str) str

Get path to workflow directory (with main.nf, nextflow.config, etc)

get_trace_table(job_id: str) pandas.DataFrame | None

get_status_label(job_id: str) str

Get human-readable job status label. Should NOT be used to determine job status.

get_result(job_id: str) bool | None

Get job result: True if successful, False if failed, None if still running.
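
Because get_result distinguishes three states (True, False, None), a caller waiting on a submitted job can poll until the value is no longer None. The sketch below shows one way to do this. wait_for_result and FakeScheduler are hypothetical names introduced for illustration; the stand-in scheduler only mimics the documented return values so that the example runs without Nextflow.

```python
import time


def wait_for_result(scheduler, job_id, poll_interval=5.0, timeout=None):
    """Poll scheduler.get_result(job_id) until it returns True or False.

    Raises TimeoutError if `timeout` seconds pass while the job is still running.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        result = scheduler.get_result(job_id)
        if result is not None:  # True = success, False = failure
            return result
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} still running after {timeout} s")
        time.sleep(poll_interval)


class FakeScheduler:
    """Stand-in that replays a fixed sequence of get_result() values."""

    def __init__(self, results):
        self._results = iter(results)

    def get_result(self, job_id):
        return next(self._results)


# Two "still running" polls followed by success.
print(wait_for_result(FakeScheduler([None, None, True]), "job-1", poll_interval=0.01))
```

Testing with `is not None` rather than truthiness is the important detail: a failed job returns False, which must end the loop just as a successful one does.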

get_log(job_id: str) str | None

Get job execution log

cancel(job_id)

Cancel job execution

get_output_dir(job_id: str)

Get job output path

_get_exec_dir(job_id: str)

_get_pid(job_id) int

get_job_start_time(job_id: str) datetime.datetime | None

Get job start time

get_job_stop_time(job_id: str) datetime.datetime | None

Get job end time

_get_shared_module_paths()

_get_shared_modules_string() str

get_pipeline_names() list[str]

get_param_schema(pipeline_name: str) dict | None

Get workflow schema JSON dict (JSON Schema standard) or None if not available.
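
One common use of a JSON Schema dict is to read the default value of each parameter before building a params dictionary. The sketch below assumes a layout that the source does not confirm: parameters listed under "properties", either at the top level or inside groups under "definitions" or "$defs". collect_defaults is a hypothetical helper and the example schema is made up.

```python
def collect_defaults(schema):
    """Collect {parameter: default} from a JSON Schema dict.

    Looks at top-level "properties" and at parameter groups under
    "definitions" / "$defs" (an assumed layout). Returns {} for None,
    since the schema may be unavailable.
    """
    defaults = {}
    if not schema:
        return defaults
    groups = [schema]
    for key in ("definitions", "$defs"):
        groups.extend(schema.get(key, {}).values())
    for group in groups:
        for name, spec in group.get("properties", {}).items():
            if "default" in spec:
                defaults[name] = spec["default"]
    return defaults


# Made-up schema for illustration; real workflow schemas may differ.
example_schema = {
    "properties": {"outdir": {"type": "string", "default": "results"}},
    "definitions": {
        "design": {
            "properties": {
                "num_designs": {"type": "integer", "default": 10},
                "input_pdb": {"type": "string"},
            }
        }
    },
}
print(collect_defaults(example_schema))
```

Parameters without a "default" entry are left out of the result, so the caller can tell which values still have to be supplied explicitly.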

get_failed_message(job_id)