Running Analyses

This section documents Explorer's computational capabilities and how to submit, monitor, and collect results from jobs.

Local and Cluster Compute Capabilities

The interactive Explorer interface is hosted locally on an AWS EC2 instance. This local instance can also interface with AWS Batch, a scalable compute cluster. The local EC2 instance runs interactively with 2 CPUs and 16 GB of RAM, which is well suited to tasks like organizing data, submitting jobs to AWS Batch, and visualizing data. For tasks that require more compute, jobs can be submitted to AWS Batch through a job submission system built into Explorer. AWS Batch can efficiently run thousands of batch jobs in parallel, and Explorer provides convenient tools for monitoring batch jobs and collecting their results. Setup for local jobs and batch jobs is the same, making it easy to create and test an analysis locally before deploying it at scale on AWS Batch. Below we discuss these capabilities with examples.
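
For example, the same Analysis setup can be tested locally and then submitted to AWS Batch (a minimal sketch; run_local() and run() are described in detail below):

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    print(f"processing {ac.input}")

an = Analysis(input="sample1", function=work)
an.run_local() # quick local test on the EC2 instance
an.run() # submit the same analysis to AWS Batch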

Analysis and AnalysisContext objects

The Analysis object is a cornerstone of the Gencove Explorer SDK. It provides the core functionality for specifying the parameters of a work function, executing it, and fetching its output.

The work function can be any function specified by the user that accepts one parameter that is typed as AnalysisContext.

Additionally:

  • All imports need to occur within the work function
  • All user functions that are called from that function must be defined within the work function

The work function can be thought of as similar to a self-contained Python script or module.

For example:

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    import random # import necessary package
    def print_random(): # define user function
        print(random.random())
    print_random() # execute user function

Upon execution of the work function, the Analysis object passes an AnalysisContext object to the function. This object provides the function with the ability to easily:

  • Access per-job input via .input
  • Generate outputs via .output, which can be:
    • Files
    • Values
  • Access global configuration via .global_config, which can be used to specify “global” parameters for the analysis. This may be useful for specifying common parameters when executing a large number of cluster jobs.
  • Introspect execution parameters via .job_definition like:
    • number of CPUs
    • available memory
    • docker image
    • job timeout
  • Access the job index when running “array jobs” via .job_index
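
For example, a work function might introspect several of these attributes (a minimal sketch; .job_definition and .job_index are printed as whole objects here rather than accessed field by field):

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    print(ac.input) # per-job input
    print(ac.job_definition) # execution parameters (CPUs, memory, Docker image, timeout)
    print(ac.job_index) # index of this job within the array job
    ac.output.value(key="echo", value=ac.input) # record a value output

an = Analysis(input=["foo", "bar"], function=work)
an.run() # or an.run_local()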

Interleaving shell and Python

Gencove Explorer supports IPython magic commands. For example, ! can be used to execute complex shell commands. The ! magic command supports pipes (|), redirection (>), and parameter substitution from Python variables ({}). For example:

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    where = "shell"
    !echo "Hello World from {where}"
    where = "Python"
    print(f"Hello World from {where}")

an = Analysis(
    function=work
)
an.run() # or an.run_local()

💡 Accessing Python variables for substitution with {} in shell commands requires any literal braces and shell variables ($) in the shell command to be doubled up ({{ }} and $$). For example, !echo "1 {two} 3" | awk '{{print $$2}}'; see below.

two='dos'
! echo "1 {two} 3" | awk '{{print $$2}}'  # desired behavior, prints "dos"
! echo "1 {two} 3" | awk '{print $2}' # incorrect, prints "{two}"
! echo "1 {two} 3" | awk '{print $$2}' # incorrect, prints "1 {two} 3"
! localcopy="-> {two} <-"; echo $$localcopy  # desired behavior, prints "-> dos <-"
! localcopy="-> {two} <-"; echo $localcopy # incorrect, prints "-> {two} <-"

Running locally & Array Jobs

run_local

The .run_local() method runs the job on the local EC2 instance. This method can be executed as many times as needed for development and testing throughout the lifetime of the Analysis object. Its output is printed to stdout and stderr, both of which are available in real-time in the notebook cell output.
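
For example (a minimal sketch):

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    print("testing locally")

an = Analysis(function=work)
an.run_local() # output appears directly in the notebook cell
an.run_local() # can be re-run as often as needed while iterating on the work function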

Array jobs

The .run() method runs jobs on AWS Batch in parallel. When provided a list of input objects, Gencove Explorer automatically creates an “array job” with one child job per element in the list. Array size is limited to 10,000 elements. Logs and outputs for a specific job can be accessed by providing its job index to the logs() and get_output() methods.

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    print(ac.input)

an = Analysis(
    input=["foo", "bar", "baz"],
    function=work
)

an.run()
an.logs(2, live=True) # logs for the 3rd input "baz", the log index is zero-based

Because Python lists are automatically translated into array jobs, an obvious question arises: how do I submit a list as input to a single regular (non-array) job?

The solution is to wrap the list in another object, such as a dictionary: {"i": ["foo", "bar", "baz"]}.
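
For example (a minimal sketch; the key name "i" is arbitrary):

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    print(ac.input["i"]) # the whole list is available to this single job

an = Analysis(
    input={"i": ["foo", "bar", "baz"]}, # wrapped list -> one regular (non-array) job
    function=work
)
an.run()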

Job input and output

Job input

Input can be provided to the job by specifying the input parameter to the Analysis object. Input can be any Python object, although there are some exceptions (most commonly, objects that are difficult to serialize).

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    print(ac.input)

an = Analysis(
    input="foo",
    function=work
)
an.run() # or an.run_local()

Job output

Gencove Explorer supports two types of outputs for storing data created while running a work function:

  • Files via AnalysisContext.output.file(<key>)
  • Values via AnalysisContext.output.value(<key>)

Files and values assigned using these methods are automatically collected at the end of the job, uploaded to object storage, and made available via the Analysis object.

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    path = ac.output.file("my-file").as_local()
    !echo 123 > {path} # store data in file object
    ac.output.value(key='var', value="abc") # store data in key/value object

an = Analysis(
    function=work
)
an.run()
an.wait_done()

local_path = an.get_output().file("my-file").as_local()
!cat {local_path} # prints "123"
var = an.get_output().value("var")
print(var) # prints "abc"

Logs

The status of the execution can be queried using the .status() method and standard output and error can be fetched using the .logs() method. The .logs() method accepts two parameters:

  • live provides a live tail of logs until the job completes
  • since indicates how far into the past to display logs (default: 10m, i.e., 10 minutes). This parameter needs to be increased when requesting logs for a job that completed more than 10 minutes ago.

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    print("Hello World")

an = Analysis(
    function=work
)
an.run()
an.status() # outputs {'status': 'SUCCEEDED'} when job has successfully finished
an.logs(live=True) # live tail of log updates
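
For a job that finished more than 10 minutes ago, widen the since window (assuming the same duration format as the default, e.g. "1h" for one hour):

an.logs(since="1h") # fetch logs from the last hour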

Run persistence with the JobManager object

You can retrieve and “rehydrate” previous Analysis objects via the JobManager. This is useful in situations where you want to revisit a previous analysis, e.g. to retrieve outputs or logs.

Retrieving by ID

When you submit a job to the AWS Batch cluster with .run(), an Analysis ID will be returned and printed to the output below the cell. For example:

an.run()

>>> 2023-06-23 15:45:06.929 | INFO | gencove_explorer.analysis:run:844 - Analysis ID: 2023-05-08T000000_example_fn_17eb494f3d5349c88959bace7a6b7873

Or, if the job was already submitted, the ID can be retrieved via the .id attribute of the Analysis object:

an.id

>>> 2023-05-08T000000_example_fn_17eb494f3d5349c88959bace7a6b7873

You can retrieve your results later using this Analysis ID with the JobManager object.

from gencove_explorer.job_manager import JobManager
mgr = JobManager()
an = mgr.get_analysis(job_id="2023-05-08T000000_example_fn_17eb494f3d5349c88959bace7a6b7873")
an.status()  # <-- this Analysis object can now be used as a regular analysis object

Listing previous jobs

The JobManager provides methods to list and search across previously run jobs.

  • Listing all jobs

    from gencove_explorer.job_manager import JobManager
    mgr = JobManager()
    print(mgr.list_jobs())
    
  • Listing jobs from relative time points

    print(mgr.list_jobs(since="1h")) # <-- jobs from the last hour
    print(mgr.list_jobs(since="1d")) # <-- jobs from the last day
    print(mgr.list_jobs(since="2w")) # <-- jobs from the last two weeks
    
  • Listing jobs from an absolute date

    print(mgr.list_jobs(date="2023-05-09")) # <-- specific date
    print(mgr.list_jobs(date="2023-04")) # <-- from a single month
    
  • Listing jobs matching a name filter

    print(mgr.list_jobs(name="example_fn")) # <-- case-insensitive search across all analyses for this substring
    

Advanced features

Waiters

Gencove Explorer provides convenient waiter methods for blocking execution until a particular state is reached for an AWS Batch cluster job.

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    print("Hello World")

an = Analysis(function=work)
an.run()
an.wait_running_or_done() # blocks execution until the job enters the "RUNNING" state on AWS Batch
an.wait_done() # blocks until execution completes on AWS Batch

Custom job imports

The user can specify custom packages from PyPI to be accessible in the work function using the pip_packages argument in the Analysis object.

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    import pendulum
    print(pendulum.now())

an = Analysis(
    function=work,
    pip_packages=["pendulum"] # installs and makes pendulum package available to work
)
an.run()

Specifying job resources

CPU and memory resources can be specified for cluster jobs using the JobDefinition object.

The default configuration is: JobDefinition(cpu=2, memory_mb=4000)

from gencove_explorer.analysis import Analysis, AnalysisContext, JobDefinition

def work(ac: AnalysisContext):
    print(ac.input)

job_definition = JobDefinition(cpu=1, memory_mb=4000)

an = Analysis(
    input=["foo", "bar", "baz"],
    function=work,
    job_definition=job_definition,
)
an.run()

Global config

In addition to per-job inputs, the user can specify a global configuration that does not change across jobs. Common use cases include specifying shared datasets (e.g., a reference genome) or analysis parameters (e.g., effective population size for imputation).

from gencove_explorer.analysis import Analysis, AnalysisContext, GlobalConfig

def work(ac: AnalysisContext):
    print(ac.global_config.arbitrary)
    print(ac.global_config.age_map["human"])

global_config = GlobalConfig(
    arbitrary="foobar",
    age_map={"human": 20, "sheep":5})

an = Analysis(
    input=["foo", "bar", "baz"],
    function=work,
    global_config=global_config,
)
an.run()

Job dependency

The user can specify dependencies between Analysis objects. Executing run() on one Analysis object triggers run() on all of its dependencies. Jobs can access the outputs of their dependencies using the Analysis object’s name attribute, which defaults to the work function name. See the examples below for details.

NB: Dependencies must form a directed acyclic graph.

from gencove_explorer.analysis import Analysis, AnalysisContext

def work1(ac: AnalysisContext):
    import json
    print("work1")
    with open("local.json", "w") as fd:
        json.dump({"hello": "world"}, fd)
    ac.output.file(path_local="local.json", key="outfile_work1")

def work2(ac: AnalysisContext):
    import json
    print("work2")
    f = ac.dependency("work1").get_output().file("outfile_work1").as_local()

    with open("local.json", "w") as fd:
        json.dump(f.read_text(), fd)
    ac.output.file(path_local="local.json", key="outfile_work2")

def work3(ac: AnalysisContext):
    import json
    print("work3")

    with open(ac.dependency("work2").get_output().file("outfile_work2").as_local(), "r") as fd:
        print(json.load(fd))

a1 = Analysis(
    input=["Hello world!", "2"],
    function=work1
)
a2 = Analysis(
    input=["Hello world!"],
    function=work2,
    depends_on=[a1]
)
a3 = Analysis(
    input=["Hello world!"],
    function=work3,
    depends_on=[a1, a2]
)

a3.run()

Terminating analysis jobs

Analysis jobs can be easily terminated through the terminate() and terminate_all() methods.

There are a few things to note on the termination functionality:

  • The terminate signal can take up to a minute to propagate to the target job
  • Dependent jobs of the target job will automatically fail

Terminate a non-array job:

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    import time
    time.sleep(600) # placeholder long-running task so there is something to terminate

an = Analysis(input=[1], function=work)
an.run()
an.terminate()

Terminate a specific job in an array job:

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    import time
    time.sleep(600) # placeholder long-running task

an = Analysis(input=[1,2,3], function=work)
an.run()
an.terminate(2)  # <-- only array index 2 will be terminated

Terminate all array jobs for an analysis:

from gencove_explorer.analysis import Analysis, AnalysisContext

def work(ac: AnalysisContext):
    import time
    time.sleep(600) # placeholder long-running task

an = Analysis(input=[1,2,3], function=work)
an.run()
an.terminate_all()  # <-- all 3 array jobs will be terminated

Anonymous Work Function with lambda

The work function can be specified on the fly using Python lambda expressions.

from gencove_explorer.analysis import Analysis

an = Analysis(
    function=lambda x: print("Hello World")
)
an.run()