Running Analyses¶
This section documents Explorer's computational capabilities and how to submit, monitor, and collect results from analyses.
Local and Cluster Compute Capabilities¶
The interactive Explorer interface is hosted locally on an AWS EC2 instance. This local instance also has the ability to interface with AWS Batch, a scalable compute cluster. The local AWS EC2 instance works interactively using 4 CPUs and 16 GB of RAM. This instance is well suited for tasks like organizing data, submitting analyses to AWS Batch, or data visualization. For tasks that require more compute, analyses can be submitted to AWS Batch through a job submission system built into Explorer. AWS Batch can efficiently run thousands of batch jobs in parallel, and Explorer has convenient tools that allow you to monitor analyses (and their underlying jobs) and collect the results. Setup for both local and remote analyses is the same, making it easy to create and test an analysis locally before deploying at scale on AWS Batch. Below we discuss these capabilities with examples.
Analysis and AnalysisContext objects¶
The `Analysis` object is a cornerstone of the Gencove Explorer SDK. It provides the core functionality for specifying the parameters, executing, and fetching the output of a work function. The work function can be any function specified by the user that accepts one parameter typed as `AnalysisContext`.
Additionally:
- All imports need to occur within the work function
- All user functions that are called from that function must be defined within the work function
The work function can be thought of as similar to a self-contained Python script or module.
For example:
```python
from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    import random  # import necessary package

    def print_random():  # define user function
        print(random.random())

    print_random()  # execute user function
```
Upon execution of the work function, the `Analysis` object passes an `AnalysisContext` object to the function. This object provides the function with the ability to easily:

- Access per-job input via `.input`
- Generate outputs via `.output`, which can be:
  - Files
  - Values
- Access shared input via `.input_shared`, which can be used to specify "shared" parameters for the analysis. This may be useful for specifying common parameters when executing a large number of cluster jobs.
- Introspect execution parameters via `.job_definition`, such as:
  - number of CPUs
  - available memory
  - available storage
  - Docker image
  - job timeout
- Access the job index when running "array jobs" via `.job_index`
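For example, a minimal sketch illustrating these attributes inside a work function (illustrative only; what `.input_shared` and `.job_definition` print depends on how the analysis was configured):

```python
from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    print(ac.input)           # per-job input
    print(ac.input_shared)    # parameters shared across all jobs
    print(ac.job_definition)  # execution parameters (CPUs, memory, storage, image, timeout)
    print(ac.job_index)       # index of this job when running an array job
    ac.output.greeting = "hello"  # register a value as output

an = Analysis(input=["foo", "bar"], function=work)
an.run()  # or an.run_local()
```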
Interleaving shell and Python¶
Gencove Explorer supports IPython magic commands. For example, `!` can be used to execute complex shell commands. The `!` magic command supports pipes (`|`), redirection (`>`), and parameter substitution from Python variables (`{}`). For example:
```python
from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    where = "shell"
    !echo "Hello World from {where}"
    where = "Python"
    print(f"Hello World from {where}")

an = Analysis(
    function=work
)
an.run()  # or an.run_local()
```
💡 Accessing variables for substitution with `{}` (and `$`) in shell commands requires that literal braces and shell variables in the shell command be doubled up. For example, `!echo "1 {two} 3" | awk '{{print $$2}}'`; see below.
```python
two = 'dos'
! echo "1 {two} 3" | awk '{{print $$2}}'  # desired behavior, prints "dos"
! echo "1 {two} 3" | awk '{print $2}'     # incorrect, prints "{two}"
! echo "1 {two} 3" | awk '{print $$2}'    # incorrect, prints "1 {two} 3"
! localcopy="-> {two} <-"; echo $$localcopy  # desired behavior, prints "-> dos <-"
! localcopy="-> {two} <-"; echo $localcopy   # incorrect, prints "-> {two} <-"
```
Running locally & Array Jobs¶
run_local¶
The `.run_local()` method runs the job on the local EC2 instance. This method can be executed as many times as needed for development and testing throughout the lifetime of the `Analysis` object. Its output is printed to stdout and stderr, both of which are available in real time in the notebook cell output.
By default, local runs are executed as a subprocess without restrictions in the current context. It's possible to specify `.run_local(container=True)` to run in container mode, which uses Docker to containerize the application and simulates an environment as close as possible to the standard Batch mode (`.run()`).

💡 The resource information (CPUs and memory) provided in `job_definition` is only accurate when executing `run_local(container=True)`. When running locally in non-container mode, the CPU and memory limits are ignored.
💡 Container mode might not work as expected when running on instances with fewer than 4 CPUs.
Array jobs¶
The `.run()` method runs an array of jobs on AWS Batch in parallel. When provided a list of input objects, Gencove Explorer automatically creates an "array job" with one job per element in the list. Array size is limited to 10,000. Logs and outputs can be accessed by providing the required job index to the `logs()` and `get_output()` methods.
```python
from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    print(ac.input)

an = Analysis(
    input=["foo", "bar", "baz"],
    function=work
)
an.run()
an.logs(2, live=True)  # logs for the 3rd input "baz"; the job index is zero-based
```
Because Python lists are translated to array cluster jobs, an obvious question arises: how do I submit a list as input to one regular (non-array) job? The solution is to convert the `list` to a `tuple`:
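For example, a minimal sketch in which the whole collection arrives as the input of a single job:

```python
from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    print(ac.input)  # the entire tuple is the input of this single job

an = Analysis(
    input=("foo", "bar", "baz"),  # tuple instead of list -> one non-array job
    function=work
)
an.run()
```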
Job input and output¶
Job input¶
Input can be provided to the job by specifying the `input` parameter of the `Analysis` object. Input can be any Python object, although there are some exceptions (most commonly, objects that are difficult to serialize).
```python
from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    print(ac.input)

an = Analysis(
    input="foo",
    function=work
)
an.run()  # or an.run_local()
```
Job output¶
Gencove Explorer work functions can return output by assigning values to attributes of `AnalysisContext.output`. The values can be any Python objects and are automatically collected at the end of the job, uploaded to object storage, and made available via the `Analysis` object.

If the assigned output object is an Explorer `File` or `FileFormat` and has an `.upload()` method, the `.upload()` method will be triggered automatically after the work function completes as a convenience to the user, i.e., there is no need to execute `.upload()` manually.
```python
from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    from gencove_explorer.file import File
    ac.output.my_file = f = File()
    !echo 123 > {f.local}  # work with files
    ac.output.val = "abc"  # work with values

an = Analysis(
    function=work
)
an.run()
an.wait_done()

local_path = an.get_output().my_file.download().local
!cat {local_path}  # prints "123"

val = an.get_output().val
print(val)  # prints "abc"
```
Logs¶
The status of the execution can be queried using the `.status()` method, and standard output and error can be fetched using the `.logs()` method. The `.logs()` method accepts three parameters:

- `live` provides a live tail of logs until the job completes
- `since` indicates how long into the past to display logs (default: `10m`, i.e., 10 minutes). This parameter needs to be adjusted to a higher value when requesting logs for a job that completed longer than 10 minutes ago.
- `export` writes logs to a file (denoted by a string or `File`) instead of printing to stdout.
```python
from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    print("Hello World")

an = Analysis(
    function=work
)
an.run()
an.status()  # outputs {'status': 'SUCCEEDED'} when the job has successfully finished
an.logs(live=True)  # live tail of log updates
```
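The `since` and `export` parameters are passed the same way; for example (the `"60m"` duration string and passing a plain path string to `export` are assumptions based on the `10m` default and the string-or-`File` note above):

```python
an.logs(since="60m")            # show logs from the past 60 minutes
an.logs(export="work_job.log")  # write logs to a local file instead of stdout
```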
Persistence with the AnalysisManager object¶
You can retrieve and "rehydrate" previous `Analysis` objects via the `AnalysisManager`. This is useful in situations where you want to revisit a previous analysis, e.g. to retrieve outputs or logs.
Retrieving by ID¶
When you submit an analysis to the AWS Batch cluster with `.run()`, an Analysis ID will be returned and printed to the output below the cell. For example:

```
an.run()
>>> 2023-06-23 15:45:06.929 | INFO | gencove_explorer.analysis:run:844 - Analysis ID: 2023-05-08T000000_example_fn_17eb494f3d5349c88959bace7a6b7873
```
Alternatively, if the analysis was already submitted, the ID can be retrieved from the `.id` attribute of the `Analysis` object:
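```python
print(an.id)  # e.g. "2023-05-08T000000_example_fn_17eb494f3d5349c88959bace7a6b7873"
```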
You can retrieve your results later using this Analysis ID with the `AnalysisManager` object.
```python
from gencove_explorer.analysis_manager import AnalysisManager

mgr = AnalysisManager()
an = mgr.get_analysis(analysis_id="2023-05-08T000000_example_fn_17eb494f3d5349c88959bace7a6b7873")
an.status()  # <-- this Analysis object can now be used as a regular analysis object
```
Listing previous analyses¶
The `AnalysisManager` provides methods to list and search across previously run analyses (see the sketch after this list):

- Listing all analyses
- Listing analyses from relative time points
- Listing analyses from an absolute date
- Listing analyses matching a name filter
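A minimal sketch of what these queries might look like. Note that the method name `list_analyses` and the parameters `since` and `name_filter` are assumptions for illustration only, not confirmed SDK signatures; consult the SDK reference for the exact API:

```python
from gencove_explorer.analysis_manager import AnalysisManager

mgr = AnalysisManager()

# Hypothetical calls -- method and parameter names are assumptions
mgr.list_analyses()                          # all analyses
mgr.list_analyses(since="7d")                # relative time point
mgr.list_analyses(since="2023-05-08")        # absolute date
mgr.list_analyses(name_filter="example_fn")  # name filter
```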
Deleting analyses¶
In some cases you may want to delete analyses that are no longer needed. This can be done by calling the `delete()` method on an `Analysis` object.

Calling `delete()` cancels all jobs associated with the analysis and deletes all of their respective outputs. The method also deletes the analysis from EOS storage, so it cannot be retrieved afterward via `AnalysisManager`.

Note that only outputs defined within the `analysis_context.output` container will be deleted; any other outputs must be deleted manually.

By default, calling `delete()` will prompt for confirmation before proceeding. This can be bypassed by passing `force=True` as an argument.
The following example demonstrates submitting an analysis that sets a file as an output, waiting for it to complete, and then deleting it:
```python
from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    """Work function that creates a file and registers it as an output"""
    from gencove_explorer.file import File
    from pathlib import Path

    # Create example file
    Path("./example.txt").touch()
    example_file = File(local=Path("./example.txt"))

    # Register file as output
    ac.output.example_file = example_file

an = Analysis(
    function=work
)

# Start analysis
an.run()

# Wait for analysis to complete
an.wait_done()

# Verify output exists
assert an.get_output().example_file.remote.exists()

# Delete analysis
an.delete(force=True)

# Verify output has been removed
assert not an.get_output().example_file.remote.exists()
```
Advanced features¶
Waiters¶
Gencove Explorer provides convenient waiter methods for blocking execution until a particular state of an AWS Batch cluster job is reached.
```python
from gencove_explorer.analysis import Analysis

an = Analysis()
an.run()
an.wait_running_or_done()  # blocks execution until the job enters the "RUNNING" state on AWS Batch
an.wait_done()  # blocks until execution completes on AWS Batch
```
Custom job imports¶
The user can specify custom packages from PyPI or Anaconda repositories to be accessible in the work function using the `pip_packages` and `conda_packages` arguments of the `Analysis` object.
```python
from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    import pendulum
    print(pendulum.now())

an = Analysis(
    function=work,
    pip_packages=["pendulum"]  # installs and makes the pendulum package available to work
)
an.run()
```
Anaconda packages can be specified in either the `<package>` or `<channel>::<package>` format.
```python
from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    !fastqc --version

an = Analysis(
    function=work,
    conda_packages=["bioconda::fastqc"]  # installs the fastqc tool from the bioconda channel
)
an.run()
```
Collisions between conda and pip packages can occur and may not be resolvable. For jobs requiring complex dependencies, it is recommended to use a custom Docker image instead.
Specifying job resources¶
CPU, memory, and storage resources can be specified for cluster jobs using the `JobDefinition` object. The default configuration is `JobDefinition(cpu=2, memory_mb=4096, storage_gb=20)`.
```python
from gencove_explorer.analysis import Analysis, AnalysisContext, JobDefinition

def work(ac: AnalysisContext):
    print(ac.input)

job_definition = JobDefinition(cpu=2, memory_mb=4096, storage_gb=50)

an = Analysis(
    input=["foo", "bar", "baz"],
    function=work,
    job_definition=job_definition,
)
an.run()
```
Accepted values for CPU and Memory¶
| vCPU | Memory |
| --- | --- |
| 0.25 vCPU | 512, 1024, and 2048 MiB |
| 0.5 vCPU | 1024-4096 MiB in 1024 MiB increments |
| 1 vCPU | 2048-8192 MiB in 1024 MiB increments |
| 2 vCPU | 4096-16384 MiB in 1024 MiB increments |
| 4 vCPU | 8192-30720 MiB in 1024 MiB increments |
| 8 vCPU | 16384-61440 MiB in 4096 MiB increments |
| 16 vCPU | 32768-122880 MiB in 8192 MiB increments |
To use instances with more than 16 vCPUs, contact customer support.
Custom Docker images for cluster jobs¶
Users can specify a Docker image to use for a cluster job via the `JobDefinition` object. This feature is useful when users require software that is not available in the base cluster image. For example, to use an SRA Toolkit Docker image available on Docker Hub, specify the image name in the `JobDefinition`, where `"pegi3s/sratoolkit"` is the name of the image listed on Docker Hub. This will enable access to the software made available in the source image through your `Analysis` work function, e.g.:
```python
from gencove_explorer.analysis import Analysis, AnalysisContext, JobDefinition

# sratoolkit
def work(ac: AnalysisContext):
    print("Printing fasterq-dump help menu:")
    !fasterq-dump --help

job_definition = JobDefinition(
    cpu=1.0,
    memory_mb=2048,
    image="pegi3s/sratoolkit"
)

analysis = Analysis(
    function=work,
    job_definition=job_definition,
)
```
NOTE: Docker Hub and other public registries impose rate limiting. We recommend building custom images when running a large number of jobs.
Building Docker images for cluster jobs¶
A custom Dockerfile stored locally as `./Dockerfile` is needed:
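For example, a minimal Dockerfile sketch; the base image and the choice of `htop` are assumptions here, chosen to match the `!which htop` check in the example below:

```dockerfile
FROM ubuntu:22.04
RUN apt-get update && apt-get install -y htop
```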
There's a handy class for building and pushing Docker images to the Organization's private repository.
```python
from gencove_explorer.analysis import Analysis, AnalysisContext, CustomImage, JobDefinition

job_definition = JobDefinition(
    cpu=1.0,
    memory_mb=2048,
    image=CustomImage(dockerfile="./Dockerfile", tag="my-custom-image")
)

def work(ac: AnalysisContext):
    !which htop

analysis = Analysis(
    function=work,
    job_definition=job_definition,
)
analysis.run()
```
The image is built and pushed when `analysis.run()` is called.

ATTENTION: images are also stored locally and can fill up the disk unless cleaned up frequently.
Useful cleanup commands¶
- Delete all containers
- Delete all images
- Docker saying "no space left on device"
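For example, the standard Docker CLI commands for these tasks (these are general Docker commands, not Explorer-specific; use with care, as they remove all local containers and images):

```bash
# Delete all containers (running and stopped)
docker rm -f $(docker ps -aq)

# Delete all images
docker rmi -f $(docker images -q)

# When Docker reports "no space left on device", reclaim disk space
docker system prune -a
```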
Limitations¶
Note that not all Docker images are compatible with Explorer. Please note the following limitations:

- The Docker image host operating system must be Linux-based.
- The Docker image CPU architecture must be x86_64 (AMD64).
- The Docker image must have a supported package manager. The following are supported:
  - `apt`
  - `yum`
  - `apk`
- The Docker image must be either:
  - Available on a public repository (e.g. Docker Hub).
  - Published as a custom image to the Organization's repository.
- The Docker image must not have a custom script set as the `ENTRYPOINT` value. See the Docker documentation on `ENTRYPOINT`.
Shared input¶
In addition to per-job inputs, the user can specify "shared" input that does not change across jobs. The common use case is specifying shared datasets (e.g., a reference genome) or analysis parameters (e.g., effective population size for imputation).
```python
from gencove_explorer.analysis import Analysis, AnalysisContext, InputShared

def work(ac: AnalysisContext):
    print(ac.input_shared.arbitrary)
    print(ac.input_shared.age_map["human"])

input_shared = InputShared(
    arbitrary="foobar",
    age_map={"human": 20, "sheep": 5}
)

an = Analysis(
    input=["foo", "bar", "baz"],
    function=work,
    input_shared=input_shared,
)
an.run()
```
Using Datasets¶
We provide access to a `ReferenceGenome` dataset that can be used standalone or as a "shared" input.
```python
from gencove_explorer.analysis import Analysis, AnalysisContext, InputShared
from gencove_explorer.datasets import ReferenceGenome

def work(ac: AnalysisContext):
    # this will download genome.fasta.gz and the dict, fai, and gzi indexes
    ac.input_shared.reference.download()

input_shared = InputShared(
    reference=ReferenceGenome(species="human", version="g1k_v37"),
)

an = Analysis(
    input=["foo", "bar", "baz"],
    function=work,
    input_shared=input_shared,
)
an.run()

reference = ReferenceGenome(species="human", version="g1k_v38")
# This will download the same files but with the addition of the amb, ann, bwt, pac, and sa indexes
reference.download(include_bwa_indices=True)
```
Currently we support the `human` species with the following versions:

- `g1k_v37` (mirrored on S3 from ftp://ftp-trace.ncbi.nih.gov/1000genomes/ftp/technical/reference/human_g1k_v37.fasta.gz)
- `g1k_v38` (mirrored on S3 from ftp://ftp.1000genomes.ebi.ac.uk/vol1/ftp/technical/reference/GRCh38_reference_genome/GRCh38_full_analysis_set_plus_decoy_hla.fa)
Job dependency¶
The user can specify dependencies between `Analysis` objects. Executing `run()` on one `Analysis` object triggers `run()` on all of its dependencies. Jobs can access the outputs of their dependencies using the `Analysis` object's `name` attribute, which defaults to the work function name. See the examples below for details.

NB: Dependencies must result in a directed acyclic graph.
```python
from gencove_explorer.analysis import Analysis, AnalysisContext

def work1(ac: AnalysisContext):
    import json
    from gencove_explorer.file import File
    print("work1")
    with open("local.json", "w") as fd:
        json.dump({"hello": "world"}, fd)
    ac.output.outfile_work1 = File(local="local.json")

def work2(ac: AnalysisContext):
    import json
    from gencove_explorer.file import File
    print("work2")
    f = ac.dependency("work1").get_output().outfile_work1.download().local
    with open("local.json", "w") as fd:
        json.dump(f.read_text(), fd)
    ac.output.outfile_work2 = File(local="local.json")

def work3(ac: AnalysisContext):
    import json
    print("work3")
    with open(ac.dependency("work2").get_output().outfile_work2.download().local, "r") as fd:
        print(json.load(fd))

a1 = Analysis(
    input=["Hello world!", "2"],
    function=work1
)
a2 = Analysis(
    input=["Hello world!"],
    function=work2,
    depends_on=[a1]
)
a3 = Analysis(
    input=["Hello world!"],
    function=work3,
    depends_on=[a1, a2]
)
a3.run()
```
Terminating analysis jobs¶
Analysis jobs can be easily terminated through the `terminate()` and `terminate_all()` methods. There are a few things to note on the termination functionality:

- The terminate signal can take up to a minute to propagate to the target job
- Dependent jobs of the target job will automatically fail
Terminate a non-array job:
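A minimal sketch, assuming that calling `terminate()` without a job index targets the analysis's single job (this non-array call form is an assumption inferred from the array examples below):

```python
from gencove_explorer.analysis import Analysis

an = Analysis(input="foo")
an.run()
an.terminate()  # terminates the single (non-array) job
```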
Terminate a specific job in an array job:
```python
from gencove_explorer.analysis import Analysis

an = Analysis(input=[1, 2, 3])
an.run()
an.terminate(2)  # <-- only array index 2 will be terminated
```
Terminate all array jobs for an analysis:
```python
from gencove_explorer.analysis import Analysis

an = Analysis(input=[1, 2, 3])
an.run()
an.terminate_all()  # <-- all 3 array jobs will be terminated
```
Anonymous Work Function with lambda¶
The work function can be specified on the fly using Python lambda expressions.
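For example, a minimal sketch in which the lambda receives the `AnalysisContext` just like a named work function:

```python
from gencove_explorer.analysis import Analysis

an = Analysis(
    input=["foo", "bar"],
    function=lambda ac: print(ac.input)
)
an.run()  # or an.run_local()
```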