Running Analyses

This section documents Explorer's computational capabilities and how to submit, monitor, and collect results from analyses.

Local and Cluster Compute Capabilities

The interactive Explorer interface is hosted locally on an AWS EC2 instance. This local instance can also interface with AWS Batch, a scalable compute cluster. The local AWS EC2 instance works interactively using 2 CPUs and 16 GB of RAM. It is well suited for tasks like organizing data, submitting analyses to AWS Batch, or data visualization. For tasks that require more compute, analyses can be submitted to AWS Batch through a job submission system built into Explorer. AWS Batch can efficiently run thousands of batch jobs in parallel, and Explorer has convenient tools that allow you to monitor analyses (and their underlying jobs) and collect the results. Setup is the same for local and remote analyses, making it easy to create and test an analysis locally before deploying at scale on AWS Batch. Below we discuss these capabilities with examples.

Analysis and AnalysisContext objects

The Analysis object is a cornerstone of the Gencove Explorer SDK. It provides the core functionality for specifying the parameters, executing, and fetching output of a work function.

The work function can be any function specified by the user that accepts one parameter that is typed as AnalysisContext.

Additionally:

  • All imports need to occur within the work function
  • All user functions that are called from that function must be defined within the work function

The work function can be thought of as similar to a self-contained Python script or module.

For example:

from gencove_explorer.analysis import Analysis, AnalysisContext


def work(ac: AnalysisContext):
    import random  # import necessary package
    def print_random():  # define user function
        print(random.random())

    print_random()  # execute user function

Upon execution of the work function, the Analysis object passes an AnalysisContext object to the function. This object provides the function with the ability to easily:

  • Access per-job input via .input
  • Generate outputs via .output, which can be:
    • Files
    • Values
  • Access shared input via .input_shared, which can be used to specify "shared" parameters for the analysis. This may be useful for specifying common parameters when executing a large number of cluster jobs.
  • Introspect execution parameters via .job_definition like:
    • number of CPUs
    • available memory
    • available storage
    • docker image
    • job timeout
  • Read the job index when running “array jobs” via .job_index

Interleaving shell and Python

Gencove Explorer supports IPython magic commands. For example, ! can be used to execute complex shell commands. The ! magic command supports pipes (|), redirection (>), and parameter substitution from Python variables ({}). For example:

from gencove_explorer.analysis import Analysis, AnalysisContext


def work(ac: AnalysisContext):
    where = "shell"
    !echo "Hello World from {where}"
    where = "Python"
    print(f"Hello World from {where}")


an = Analysis(
    function=work
)
an.run()  # or an.run_local()

💡 Because {} (and $) are used to substitute Python variables into shell commands, any literal braces and shell variables in the shell command must be doubled up. For example, !echo "1 {two} 3" | awk '{{print $$2}}'. The following lines show the correct and incorrect forms:

two = 'dos'
! echo "1 {two} 3" | awk '{{print $$2}}'  # desired behavior, prints "dos"
! echo "1 {two} 3" | awk '{print $2}'  # incorrect, prints "{two}"
! echo "1 {two} 3" | awk '{print $$2}'  # incorrect, prints "1 {two} 3"
! localcopy="-> {two} <-"; echo $$localcopy  # desired behavior, prints "-> dos <-"
! localcopy="-> {two} <-"; echo $localcopy  # incorrect, prints "-> {two} <-"
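
The doubling rule can be applied mechanically to a fragment that should reach the shell unchanged, such as an awk program. The sketch below uses a hypothetical helper, escape_shell_literal, which is not part of the Gencove Explorer SDK; it only illustrates the rule described in the note above.

```python
def escape_shell_literal(fragment: str) -> str:
    """Double every brace and dollar sign in a literal shell fragment.

    Hypothetical helper (not part of the SDK): the result can be pasted
    into a `!` command next to un-doubled {python_variable} placeholders.
    """
    return (
        fragment.replace("{", "{{")
        .replace("}", "}}")
        .replace("$", "$$")
    )


awk_program = escape_shell_literal("{print $2}")
print(awk_program)  # {{print $$2}}
```

Only the literal parts of the command should be passed through such a helper; placeholders like {two} that are meant to be substituted stay single.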

Running locally & Array Jobs

run_local

The .run_local() method runs the job on the local EC2 instance. This method can be executed as many times as needed for development and testing throughout the lifetime of the Analysis object. Its output is printed to stdout and stderr, both of which are available in real-time in the notebook cell output.

Array jobs

The .run() method runs an array of jobs on AWS Batch in parallel. When provided a list of input objects, Gencove Explorer automatically creates an “array job” with one job per element of the list. Array size is limited to 10,000. Logs and outputs for an individual job can be accessed by passing its job index to the logs() and get_output() methods.

from gencove_explorer.analysis import Analysis, AnalysisContext


def work(ac: AnalysisContext):
    print(ac.input)


an = Analysis(
    input=["foo", "bar", "baz"],
    function=work
)

an.run()
an.logs(2, live=True)  # logs for the 3rd input "baz", the log index is zero-based
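
When there are more inputs than the 10,000-element array limit allows, one option is to split the list into batches first. The following is a minimal sketch in plain Python; chunk_inputs and MAX_ARRAY_SIZE are hypothetical names introduced for illustration and are not part of the SDK.

```python
from typing import Iterator, List, Sequence

MAX_ARRAY_SIZE = 10_000  # array size limit stated above


def chunk_inputs(inputs: Sequence, size: int = MAX_ARRAY_SIZE) -> Iterator[List]:
    """Yield consecutive slices of `inputs`, each with at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(inputs), size):
        yield list(inputs[start:start + size])


sample_ids = [f"sample_{i}" for i in range(25_000)]  # made-up inputs
batches = list(chunk_inputs(sample_ids))
print([len(batch) for batch in batches])  # [10000, 10000, 5000]
```

Each batch could then be passed as the input of its own Analysis object.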

Because Python lists are translated into array cluster jobs, an obvious question is “how do I submit a list as input to one regular (non-array) job?”.

The solution is to wrap the list into another object like a dictionary: `{"i": ["foo", "bar", "baz"]}`.
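
The difference between the two input shapes can be modelled in a few lines of plain Python. This is only a sketch of the rule described above, not the SDK's implementation; expected_job_count is a hypothetical name.

```python
from typing import Any


def expected_job_count(job_input: Any) -> int:
    """Model of the rule above: a list fans out to one job per element,
    any other object is handed to a single job."""
    if isinstance(job_input, list):
        return len(job_input)
    return 1


items = ["foo", "bar", "baz"]
print(expected_job_count(items))         # 3
print(expected_job_count({"i": items}))  # 1
```

With the dictionary wrapper, the work function would presumably read the list back as ac.input["i"].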

Job input and output

Job input

Input can be provided to the job by specifying the input parameter to the Analysis object. Input can be any Python object, although there are some exceptions (most commonly, objects that are difficult to serialize).

from gencove_explorer.analysis import Analysis, AnalysisContext


def work(ac: AnalysisContext):
    print(ac.input)


an = Analysis(
    input="foo",
    function=work
)
an.run()  # or an.run_local()

Job output

Gencove Explorer work functions can return output by assigning values to attributes of AnalysisContext.output. The values can be any Python objects and are automatically collected at the end of the job, uploaded to object storage, and made available via the Analysis object.

If the assigned output object is an Explorer File or FileFormat and has a .upload() method, the .upload() method will be triggered automatically after the work function completes as a convenience to the user, i.e., no need to execute .upload() manually.

from gencove_explorer.analysis import Analysis, AnalysisContext


def work(ac: AnalysisContext):
    from gencove_explorer.models import File

    ac.output.my_file = f = File()
    !echo 123 > {f.path_local}  # work with files
    ac.output.val = "abc"  # work with values


an = Analysis(
    function=work
)
an.run()
an.wait_done()

local_path = an.get_output().my_file.download().path_local
!cat {local_path}  # prints "123"
val = an.get_output().val
print(val)  # prints "abc"

Logs

The status of the execution can be queried using the .status() method and standard output and error can be fetched using the .logs() method. The .logs() method accepts two parameters:

  • live provides a live tail of logs until the job completes
  • since indicates how long into the past to display logs (default: 10m, i.e., 10 minutes). This parameter needs to be adjusted to a higher value when requesting logs for a job that completed longer than 10 minutes ago.

from gencove_explorer.analysis import Analysis, AnalysisContext


def work(ac: AnalysisContext):
    print("Hello World")


an = Analysis(
    function=work
)
an.run()
an.status()  # outputs {'status': 'SUCCEEDED'} when job has successfully finished
an.logs(live=True)  # live tail of log updates

Persistence with the AnalysisManager object

You can retrieve and “rehydrate” previous Analysis objects via the AnalysisManager. This is useful in situations where you want to revisit a previous analysis, e.g. to retrieve outputs or logs.

Retrieving by ID

When you submit an analysis to the AWS Batch cluster with .run(), an Analysis ID will be returned and printed to the output below the cell. For example:

an.run()

>>> 2023-06-23 15:45:06.929 | INFO | gencove_explorer.analysis:run:844 - Analysis ID: 2023-05-08T000000_example_fn_17eb494f3d5349c88959bace7a6b7873

If the analysis was already submitted, the ID can be retrieved from the .id attribute of the Analysis object:

an.id

>>> 2023-05-08T000000_example_fn_17eb494f3d5349c88959bace7a6b7873

You can retrieve your results later using this Analysis ID with the AnalysisManager object.

from gencove_explorer.analysis_manager import AnalysisManager

mgr = AnalysisManager()
an = mgr.get_analysis(analysis_id="2023-05-08T000000_example_fn_17eb494f3d5349c88959bace7a6b7873")
an.status()  # <-- this Analysis object can now be used as a regular analysis object

Listing previous analyses

The AnalysisManager provides methods to list and search across previously run analyses.

  • Listing all analyses

    from gencove_explorer.analysis_manager import AnalysisManager
    mgr = AnalysisManager()
    print(mgr.list_analyses())
    
  • Listing analyses from relative time points

    print(mgr.list_analyses(since="1h")) # <-- analyses from the last hour
    print(mgr.list_analyses(since="1d")) # <-- analyses from the last day
    print(mgr.list_analyses(since="2w")) # <-- analyses from the last two weeks
    
  • Listing analyses from an absolute date

    print(mgr.list_analyses(date="2023-05-09")) # <-- specific date
    print(mgr.list_analyses(date="2023-04")) # <-- from a single month
    
  • Listing analyses matching a name filter

    print(mgr.list_analyses(name="example_fn")) # <-- case-insensitive search across all analyses for this substring
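
To make the relative time strings concrete, the sketch below converts them into Python timedelta objects. It assumes the units m, h, d, and w, which are the only ones that appear on this page; the SDK's own parser may accept other forms. parse_since is a hypothetical helper, not an SDK function.

```python
import re
from datetime import timedelta

# Units seen on this page; the SDK may support others (assumption).
_UNITS = {"m": "minutes", "h": "hours", "d": "days", "w": "weeks"}


def parse_since(value: str) -> timedelta:
    """Convert a string such as "10m", "1h", "1d" or "2w" into a timedelta."""
    match = re.fullmatch(r"(\d+)([mhdw])", value)
    if match is None:
        raise ValueError(f"unrecognized relative time: {value!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


print(parse_since("2w"))   # 14 days, 0:00:00
print(parse_since("10m"))  # 0:10:00
```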
    

Advanced features

Waiters

Gencove Explorer provides convenient waiter methods that block execution until an AWS Batch cluster job reaches a particular state.

from gencove_explorer.analysis import Analysis

an = Analysis()
an.run()
an.wait_running_or_done()  # blocks execution until job enters "RUNNING" state on AWS Batch
an.wait_done()  # blocks until execution completes on AWS Batch
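
Conceptually, a waiter is a polling loop around a status check. The sketch below shows that idea in plain Python for cases where a custom polling interval or timeout is wanted. It is not the SDK's implementation: wait_for_status is a hypothetical helper, and it assumes a status callable that returns a dictionary shaped like {'status': 'SUCCEEDED'}, with state names following AWS Batch job states.

```python
import time
from typing import Callable, Dict, Iterable


def wait_for_status(
    get_status: Callable[[], Dict[str, str]],
    targets: Iterable[str],
    poll_seconds: float = 5.0,
    timeout_seconds: float = 3600.0,
) -> str:
    """Poll `get_status` until it reports one of `targets`; return that status."""
    wanted = set(targets)
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()["status"]
        if status in wanted:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {status!r} after {timeout_seconds} seconds")
        time.sleep(poll_seconds)


# Stand-in for a real status call: reports a scripted sequence of states.
scripted = iter(["SUBMITTED", "RUNNING", "SUCCEEDED"])
result = wait_for_status(
    lambda: {"status": next(scripted)},
    targets={"SUCCEEDED", "FAILED"},
    poll_seconds=0,
)
print(result)  # SUCCEEDED
```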

Custom job imports

The user can specify custom packages from PyPI or Anaconda repositories to be accessible in the work function using the pip_packages and conda_packages arguments in the Analysis object.

from gencove_explorer.analysis import Analysis, AnalysisContext


def work(ac: AnalysisContext):
    import pendulum
    print(pendulum.now())


an = Analysis(
    function=work,
    pip_packages=["pendulum"]  # installs and makes pendulum package available to work
)
an.run()

Anaconda packages can be specified in either the <package> or <channel>::<package> format.

from gencove_explorer.analysis import Analysis, AnalysisContext


def work(ac: AnalysisContext):
    ! fastqc --version


an = Analysis(
    function=work,
    conda_packages=["bioconda::fastqc"]  # installs the tool fastqc from bioconda channel
)
an.run()
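
The two accepted formats, <package> and <channel>::<package>, can be checked before submitting a job. The sketch below is plain Python and independent of the SDK; CondaSpec and parse_conda_spec are hypothetical names used for illustration.

```python
from typing import NamedTuple, Optional


class CondaSpec(NamedTuple):
    channel: Optional[str]  # None when no channel prefix is given
    package: str


def parse_conda_spec(spec: str) -> CondaSpec:
    """Split "<channel>::<package>" or a bare "<package>" into its parts."""
    if "::" in spec:
        channel, package = spec.split("::", 1)
    else:
        channel, package = None, spec
    if channel == "" or package == "":
        raise ValueError(f"malformed conda spec: {spec!r}")
    return CondaSpec(channel, package)


print(parse_conda_spec("bioconda::fastqc"))  # CondaSpec(channel='bioconda', package='fastqc')
print(parse_conda_spec("samtools"))          # CondaSpec(channel=None, package='samtools')
```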

Collisions between conda and pip packages can occur and may not be resolvable. For jobs requiring complex dependencies, it is recommended to use a custom Docker image instead.

Specifying job resources

CPU, memory, and storage resources can be specified for cluster jobs using the JobDefinition object.

The default configuration is: JobDefinition(cpu=2, memory_mb=4000, storage_gb=20)

from gencove_explorer.analysis import Analysis, AnalysisContext, JobDefinition


def work(ac: AnalysisContext):
    print(ac.input)


job_definition = JobDefinition(cpu=1, memory_mb=4000, storage_gb=50)

an = Analysis(
    input=["foo", "bar", "baz"],
    function=work,
    job_definition=job_definition,
)
an.run()

Accepted values for CPU and Memory

vCPU         Memory
0.25 vCPU    512, 1024, and 2048 MiB
0.5 vCPU     1024-4096 MiB in 1024 MiB increments
1 vCPU       2048-8192 MiB in 1024 MiB increments
2 vCPU       4096-16384 MiB in 1024 MiB increments
4 vCPU       8192-30720 MiB in 1024 MiB increments
8 vCPU       16384-61440 MiB in 4096 MiB increments
16 vCPU      32768-122880 MiB in 8192 MiB increments

To use instances with more than 16 vCPUs, contact customer support.
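
The accepted combinations above can be encoded as a lookup so that a requested memory value is rounded up to the nearest accepted one before it is passed to JobDefinition. This is a sketch under assumptions: ACCEPTED_MEMORY_MIB and smallest_accepted_memory are hypothetical names, and this page does not state whether the SDK performs similar rounding itself or whether memory_mb is interpreted as MiB.

```python
from typing import Dict, List

# Accepted memory values (MiB) per vCPU setting, transcribed from the table above.
ACCEPTED_MEMORY_MIB: Dict[float, List[int]] = {
    0.25: [512, 1024, 2048],
    0.5: list(range(1024, 4096 + 1, 1024)),
    1: list(range(2048, 8192 + 1, 1024)),
    2: list(range(4096, 16384 + 1, 1024)),
    4: list(range(8192, 30720 + 1, 1024)),
    8: list(range(16384, 61440 + 1, 4096)),
    16: list(range(32768, 122880 + 1, 8192)),
}


def smallest_accepted_memory(cpu: float, memory_mb: int) -> int:
    """Return the smallest accepted memory value that is >= memory_mb."""
    if cpu not in ACCEPTED_MEMORY_MIB:
        raise ValueError(f"unsupported vCPU value: {cpu}")
    for value in ACCEPTED_MEMORY_MIB[cpu]:
        if value >= memory_mb:
            return value
    raise ValueError(f"{memory_mb} MiB exceeds the maximum for {cpu} vCPU")


print(smallest_accepted_memory(1, 4000))   # 4096
print(smallest_accepted_memory(8, 20000))  # 20480
```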

Custom Docker images for cluster jobs

Users can specify a Docker image to use for a cluster job via the JobDefinition object. This feature is useful when users require software that is not available in the base cluster image.

For example, to use an SRA Toolkit Docker image available on Docker Hub, specify the image name in the JobDefinition:

job_definition = JobDefinition(
    cpu=1,
    memory_mb=4000,
    image="pegi3s/sratoolkit"
)

Where "pegi3s/sratoolkit" is the name of the image listed on Docker Hub. This will enable access to the software made available in the source image through your Analysis work function, e.g.

# sratoolkit
def work(ac: AnalysisContext):
    print("Printing fasterq-dump help menu:")
    !fasterq-dump --help


job_definition = JobDefinition(
    cpu=1.0,
    memory_mb=2048,
    image="pegi3s/sratoolkit"
)

analysis = Analysis(
    function=work,
    job_definition=job_definition,
)

Building Docker images for cluster jobs

A custom Dockerfile stored locally as ./Dockerfile is needed:

FROM ubuntu:latest

RUN apt-get update -y && apt-get install -y htop

The CustomImage class builds Docker images and pushes them to your organization's private repository.

from gencove_explorer.analysis import Analysis, AnalysisContext, CustomImage, JobDefinition

job_definition = JobDefinition(
    cpu=1.0,
    memory_mb=2048,
    image=CustomImage(dockerfile="./Dockerfile", tag="my-custom-image")
)

def work(ac: AnalysisContext):
    !which htop

analysis = Analysis(
    function=work,
    job_definition=job_definition,
)
analysis.run()

The image is built and pushed when analysis.run() is called.

ATTENTION: images are also stored locally and can fill up the disk unless cleaned up frequently.

Useful cleanup commands

Delete all containers

docker rm -f $(docker ps -a -q)

Delete all images

docker rmi -f $(docker images -q)

Free up space when Docker reports "no space left on device"

docker system prune -a
docker volume prune

Limitations

Not all Docker images are compatible with Explorer. The following limitations apply:

  • Docker image host operating system must be Linux-based
  • Docker image CPU architecture must be x86_64 (AMD64)
  • Docker image must have a supported package manager. The following are supported:
    • apt
    • yum
    • apk
  • Docker image must be available on a public repository (e.g. Docker Hub)
  • Docker image must not have a custom script set as the ENTRYPOINT value. See the Docker documentation on ENTRYPOINT for details.

Shared input

In addition to per-job inputs, the user can specify "shared" input that does not change across jobs. A common use case is specifying shared datasets (e.g., a reference genome) or analysis parameters (e.g., effective population size for imputation).

from gencove_explorer.analysis import Analysis, AnalysisContext, InputShared


def work(ac: AnalysisContext):
    print(ac.input_shared.arbitrary)
    print(ac.input_shared.age_map["human"])


input_shared = InputShared(
    arbitrary="foobar",
    age_map={"human": 20, "sheep": 5}
)

an = Analysis(
    input=["foo", "bar", "baz"],
    function=work,
    input_shared=input_shared,
)
an.run()

Using Datasets

We provide access to a ReferenceGenome dataset that can be used standalone or as a "shared" input.

from gencove_explorer.analysis import Analysis, AnalysisContext, InputShared
from gencove_explorer.datasets import ReferenceGenome


def work(ac: AnalysisContext):
    # this will download genome.fasta.gz and dict, fai, and gzi indexes
    ac.input_shared.reference.download()


input_shared = InputShared(
    reference=ReferenceGenome(species="human", version="g1k_v37"),
)

an = Analysis(
    input=["foo", "bar", "baz"],
    function=work,
    input_shared=input_shared,
)
an.run()

reference = ReferenceGenome(species="human", version="g1k_v38")
# This will download same files but with the addition of amb, ann, bwt, pac, and sa indexes
reference.download(include_bwa_indices=True)

Currently we support human species and versions:

Job dependency

The user can specify dependencies between Analysis objects. Executing run() on one Analysis object triggers run() on all of its dependencies. Jobs can access the outputs of their dependencies using the Analysis object’s name attribute, which defaults to the work function name. See the example below for details.

NB: Dependencies must form a directed acyclic graph.

from gencove_explorer.analysis import Analysis, AnalysisContext


def work1(ac: AnalysisContext):
    import json
    from gencove_explorer.models import File  # imports must occur inside the work function

    print("work1")
    with open("local.json", "w") as fd:
        json.dump({"hello": "world"}, fd)
    ac.output.outfile_work1 = File(path_local="local.json")


def work2(ac: AnalysisContext):
    import json
    from gencove_explorer.models import File

    print("work2")
    # download the file produced by work1 and get its local path
    f = ac.dependency("work1").get_output().outfile_work1.download().path_local

    with open(f, "r") as src:
        data = json.load(src)
    with open("local.json", "w") as fd:
        json.dump(data, fd)
    ac.output.outfile_work2 = File(path_local="local.json")


def work3(ac: AnalysisContext):
    import json
    print("work3")

    with open(ac.dependency("work2").get_output().outfile_work2.download().path_local, "r") as fd:
        print(json.load(fd))


a1 = Analysis(
    input=["Hello world!", "2"],
    function=work1
)
a2 = Analysis(
    input=["Hello world!"],
    function=work2,
    depends_on=[a1]
)
a3 = Analysis(
    input=["Hello world!"],
    function=work3,
    depends_on=[a1, a2]
)

a3.run()
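
The directed acyclic graph requirement can be checked ahead of time with Python's standard graphlib module. The sketch below mirrors the work1, work2, work3 example as a plain mapping from each analysis name to the names it depends on. It is independent of the SDK, and execution_order is a hypothetical helper.

```python
from graphlib import CycleError, TopologicalSorter  # Python 3.9+
from typing import Dict, List, Set


def execution_order(depends_on: Dict[str, Set[str]]) -> List[str]:
    """Return analysis names with every dependency before its dependents."""
    try:
        return list(TopologicalSorter(depends_on).static_order())
    except CycleError as err:
        # err.args[1] holds the list of nodes that form the cycle
        raise ValueError(f"dependencies contain a cycle: {err.args[1]}") from err


graph = {"work1": set(), "work2": {"work1"}, "work3": {"work1", "work2"}}
print(execution_order(graph))  # ['work1', 'work2', 'work3']
```

A mapping such as {"a": {"b"}, "b": {"a"}} raises ValueError, because neither analysis could run first.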

Terminating analysis jobs

Analysis jobs can be easily terminated through the terminate() and terminate_all() methods.

There are a few things to note on the termination functionality:

  • The terminate signal can take up to a minute to propagate to the target job
  • Dependent jobs of the target job will automatically fail

Terminate a non-array job:

from gencove_explorer.analysis import Analysis

an = Analysis(input=[1])
an.run()
an.terminate()

Terminate a specific job in an array job:

from gencove_explorer.analysis import Analysis

an = Analysis(input=[1, 2, 3])
an.run()
an.terminate(2)  # <-- only array index 2 will be terminated

Terminate all array jobs for an analysis:

from gencove_explorer.analysis import Analysis

an = Analysis(input=[1, 2, 3])
an.run()
an.terminate_all()  # <-- all 3 array jobs will be terminated

Anonymous Work Function with lambda

The work function can be specified on the fly using Python lambda expressions.

from gencove_explorer.analysis import Analysis

an = Analysis(
    function=lambda x: print("Hello World")
)
an.run()