Pipestat Python API

Pipestat is a Python package for standardized reporting of pipeline statistics. It formalizes a way for pipelines to communicate with the downstream tools that analyze their results, so that pipeline results can easily become input for downstream analyses.

This tutorial is targeted toward pipeline developers, and shows how to use pipestat to manage pipeline results. This tutorial assumes you're writing your pipeline in Python; if not, there's another tutorial that accomplishes the same thing for any pipeline using the command-line interface.

Introduction

To make your Python pipeline pipestat-compatible, you first need to initialize pipestat with a few important pieces of configuration:

  1. pipestat schema: a path to a JSON-schema file that defines results reported by this pipeline
  2. namespace: defines a unique group name for reported results, typically a pipeline name
  3. record identifier: a unique name for a particular run of the pipeline, typically corresponding to a sample name
  4. backend: where the results should be stored: either a path to a YAML-formatted file, or a pipestat config file with PostgreSQL database login credentials
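For reference, a minimal pipestat schema might look like the following. This is a sketch using two of the result types that appear later in this tutorial; your pipeline's schema will define its own results:

```yaml
number_of_things:
  type: integer
  description: "Number of things"
output_file:
  type: file
  description: "Path to the output file"
```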

Back-end types

Two types of back-ends are currently supported:

  1. a file (pass a file path to the constructor)
    The changes reported using the report method of PipestatManager will be securely written to the file. Currently only YAML format is supported.

  2. a PostgreSQL database (pass a path to the pipestat config file to the constructor). This option lets you back PipestatManager with a fully fledged database.

Initializing a pipestat session

Start by importing the pipestat package in Python.

import pipestat
from jsonschema import ValidationError

After importing the package, we need to create a PipestatManager object. The object constructor requires a few pieces of information. We'll use a file as the back-end by passing a file path string to the constructor. Let's create a temporary file first:

from tempfile import mkstemp

_, temp_file = mkstemp(suffix=".yaml")
print(temp_file)
/var/folders/h8/8npwnh2s4rb8lr6hsy2ydrsh0000gp/T/tmpjupp4wcz.yaml

Now we can create a PipestatManager object that uses this file as the back-end:

psm = pipestat.PipestatManager(
    namespace="test",
    record_identifier="sample1",
    results_file_path=temp_file,
    schema_path="../tests/data/sample_output_schema.yaml",
)

You can also put these settings into a config file and just pass that to the config argument, instead of specifying each argument separately. The results will be reported to a "test" namespace.

psm.namespace
'test'

By default, a PipestatManager instance is bound to the record it was initialized with. However, reporting or removing results for a different record can be enforced in the respective methods with the record_identifier argument.

psm.record_identifier
'sample1'

Since we've used a newly created file, nothing has been reported yet:

psm.data
YacAttMap: {}

Reporting results

To report a result, use the report method. It requires two pieces of information:

  1. record identifier -- record to report the result for, for example a unique name of the sample (optional if provided at PipestatManager initialization stage)
  2. values -- a Python dict of resultID-value pairs to report. The top level keys must correspond to the results identifiers defined in the schema

Available results defined in schemas

To learn about the results that the current PipestatManager instance supports, check out the schema property:

psm.schema
{'number_of_things': {'type': 'integer', 'description': 'Number of things'},
 'percentage_of_things': {'type': 'number',
  'description': 'Percentage of things'},
 'name_of_something': {'type': 'string', 'description': 'Name of something'},
 'swtich_value': {'type': 'boolean', 'description': 'Is the switch on of off'},
 'collection_of_things': {'type': 'array',
  'description': 'This store collection of values'},
 'output_object': {'type': 'object', 'description': 'Object output'},
 'output_file': {'type': 'file',
  'description': 'This a path to the output file'},
 'output_image': {'type': 'image',
  'description': 'This a path to the output image'},
 'md5sum': {'type': 'string',
  'description': 'MD5SUM of an object',
  'highlight': True}}

To learn about the actual required attributes of the reported results, such as file or image (see the output_file and output_image results), select the result identifier from the result_schemas property:

psm.result_schemas["output_file"]
{'type': 'object',
 'description': 'This a path to the output file',
 'properties': {'path': {'type': 'string'}, 'title': {'type': 'string'}},
 'required': ['path', 'title']}

Results composition enforcement

As you can see, to report an output_file result, you need to provide an object with path and title string attributes. If you fail to do so, PipestatManager will issue an informative validation error:

try:
    psm.report(values={"output_file": {"path": "/home/user/path.csv"}})
except ValidationError as e:
    print(e)
'title' is a required property

Failed validating 'required' in schema:
    {'description': 'This a path to the output file',
     'properties': {'path': {'type': 'string'},
                    'title': {'type': 'string'}},
     'required': ['path', 'title'],
     'type': 'object'}

On instance:
    {'path': '/home/user/path.csv'}

Let's report a correct object this time:

psm.report(
    values={
        "output_file": {
            "path": "/home/user/path.csv",
            "title": "CSV file with some data",
        }
    }
)
Reported records for 'sample1' in 'test' namespace:
 - output_file: {'path': '/home/user/path.csv', 'title': 'CSV file with some data'}

True

Inspect the object's data to verify that the result has been successfully reported:

psm.data
test:
  sample1:
    output_file:
      path: /home/user/path.csv
      title: CSV file with some data

No results duplication is allowed, unless you force overwrite:

psm.report(
    values={
        "output_file": {
            "path": "/home/user/path_new.csv",
            "title": "new CSV file with some data",
        }
    }
)
These results exist for 'sample1': output_file

False
psm.report(
    values={
        "output_file": {
            "path": "/home/user/path_new.csv",
            "title": "new CSV file with some data",
        }
    },
    force_overwrite=True,
)
psm.data
These results exist for 'sample1': output_file
Overwriting existing results: output_file
Reported records for 'sample1' in 'test' namespace:
 - output_file: {'path': '/home/user/path_new.csv', 'title': 'new CSV file with some data'}

test:
  sample1:
    output_file:
      path: /home/user/path_new.csv
      title: new CSV file with some data

Most importantly, because the object is backed by a file, the reported results persist -- another PipestatManager object reads the results when created:

psm1 = pipestat.PipestatManager(
    namespace="test",
    record_identifier="sample1",
    results_file_path=temp_file,
    schema_path="../tests/data/sample_output_schema.yaml",
)
psm1.data
test:
  sample1:
    output_file:
      path: /home/user/path_new.csv
      title: new CSV file with some data

That's because the contents are stored in the file we specified at object creation:

!echo $temp_file
!cat $temp_file
/var/folders/h8/8npwnh2s4rb8lr6hsy2ydrsh0000gp/T/tmpjupp4wcz.yaml
test:
  sample1:
    output_file:
      path: /home/user/path_new.csv
      title: new CSV file with some data

Note that two processes can securely report to a single file and a single namespace, since pipestat uses locks and race-free writes to control multi-user conflicts and prevent data loss.
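The lock-file pattern behind such race-free writes can be sketched with the standard library alone. This is an illustration of the concept only, not pipestat's actual locking code:

```python
import os
import tempfile

def locked_append(target, text, lock_suffix=".lock"):
    """Append text to target, guarding the write with an exclusive lock file."""
    lock = target + lock_suffix
    # O_CREAT | O_EXCL fails if the lock file already exists, so only one
    # process at a time can hold the lock.
    fd = os.open(lock, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    try:
        with open(target, "a") as f:
            f.write(text)
    finally:
        os.close(fd)
        os.remove(lock)

_, path = tempfile.mkstemp(suffix=".yaml")
locked_append(path, "test: {}\n")
print(open(path).read())  # → test: {}
```

A second process attempting to acquire the same lock would get a FileExistsError and could retry until the lock is released.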

Results type enforcement

By default PipestatManager raises an exception if a non-compatible result value is reported.

This behavior can be changed by setting strict_type to False in the PipestatManager.report method. In that case PipestatManager tries to cast the reported result values to the Python classes required by the schema. For example, if a result defined as integer is reported and a str value is passed, the eventual value will be an int:

psm.result_schemas["number_of_things"]
{'type': 'integer', 'description': 'Number of things'}
psm.report(values={"number_of_things": "10"}, strict_type=False)
Reported records for 'sample1' in 'test' namespace:
 - number_of_things: 10

True

The method will attempt to cast the value to a proper Python class and store the converted object. In case of a failure, an error will be raised:

try:
    psm.report(
        record_identifier="sample2", values={"number_of_things": []}, strict_type=False
    )
except TypeError as e:
    print(e)
int() argument must be a string, a bytes-like object or a number, not 'list'

Note that in this case we tried to report a result for a different record (sample2), which had to be specified with the record_identifier argument.

psm.data
test:
  sample1:
    output_file:
      path: /home/user/path_new.csv
      title: new CSV file with some data
    number_of_things: 10
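Conceptually, this casting can be pictured as a lookup from schema types to Python classes. The sketch below is hypothetical and not pipestat's actual implementation:

```python
# Hypothetical mapping from schema result types to Python classes.
CASTERS = {"integer": int, "number": float, "string": str, "boolean": bool}

def cast_value(value, schema_type):
    """Cast a reported value to the Python class the schema type implies."""
    caster = CASTERS.get(schema_type)
    return caster(value) if caster else value

print(cast_value("10", "integer"))  # → 10
try:
    cast_value([], "integer")  # int([]) is undefined, so this raises
except TypeError as e:
    print(e)
```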

Retrieving results

Naturally, the reported results can be retrieved. Let's explore all the options the PipestatManager.retrieve method provides:

To retrieve a specific result for a record, provide the identifiers:

psm.retrieve(record_identifier="sample1", result_identifier="number_of_things")
'10'

To retrieve all the results for a record, skip the result_identifier argument:

psm.retrieve(record_identifier="sample1")
{'output_file': {'path': '/home/user/path_new.csv',
  'title': 'new CSV file with some data'},
 'number_of_things': '10'}
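With a file back-end, retrieval amounts to a nested lookup in the namespace -> record -> result mapping shown earlier in psm.data. A minimal stdlib sketch of that idea (not pipestat's implementation):

```python
# A plain dict mirroring the namespace -> record -> result layout of the
# YAML file back-end.
data = {
    "test": {
        "sample1": {
            "output_file": {"path": "/home/user/path_new.csv",
                            "title": "new CSV file with some data"},
            "number_of_things": "10",
        }
    }
}

def retrieve(namespace, record_identifier, result_identifier=None):
    """Return one result, or the whole record if no result is named."""
    record = data[namespace][record_identifier]
    return record if result_identifier is None else record[result_identifier]

print(retrieve("test", "sample1", "number_of_things"))  # → 10
```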

Removing results

The PipestatManager object also supports results removal. Call the remove method and provide the record_identifier and result_identifier arguments to do so:

psm.remove(result_identifier="number_of_things")
Removed result 'number_of_things' for record 'sample1' from 'test' namespace

True

To remove the entire record, skip the result_identifier argument:

psm.remove()
Removing 'sample1' record

True

Verify that the appropriate entry was deleted from the results:

psm.data
test: OrderedDict()

Highlighting results

In order to highlight results, we need to add an extra property (highlight: true) in the pipestat results schema, under the result identifier that we wish to highlight.

from tempfile import mkstemp

_, temp_file_highlight = mkstemp(suffix=".yaml")
print(temp_file_highlight)
/var/folders/h8/8npwnh2s4rb8lr6hsy2ydrsh0000gp/T/tmpshtnle33.yaml

psm_highlight = pipestat.PipestatManager(
    namespace="test_highlight",
    record_identifier="sample1",
    results_file_path=temp_file_highlight,
    schema_path="../tests/data/sample_output_schema_highlight.yaml",
)

For example, the log result is highlighted in this case:

psm_highlight.schema
{'number_of_things': {'type': 'integer', 'description': 'Number of things'},
 'percentage_of_things': {'type': 'number',
  'description': 'Percentage of things'},
 'name_of_something': {'type': 'string', 'description': 'Name of something'},
 'swtich_value': {'type': 'boolean', 'description': 'Is the switch on of off'},
 'collection_of_things': {'type': 'array',
  'description': 'This store collection of values'},
 'output_object': {'type': 'object', 'description': 'Object output'},
 'output_file': {'type': 'file',
  'description': 'This a path to the output file'},
 'output_image': {'type': 'image',
  'highlight': False,
  'description': 'This a path to the output image'},
 'log': {'type': 'file',
  'highlight': True,
  'description': 'The log file of the pipeline run'},
 'profile': {'type': 'file',
  'highlight': True,
  'description': 'The profile of the pipeline run'},
 'commands': {'type': 'file',
  'highlight': True,
  'description': 'The file with shell commands executed by this pipeline'},
 'version': {'type': 'string',
  'highlight': True,
  'description': 'Pipeline version'}}

The highlighting feature can be used by pipestat clients to present the highlighted results in a special way.

psm_highlight.highlighted_results
['log', 'profile', 'commands', 'version']
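Conceptually, highlighted_results is just the schema filtered on the highlight property. A minimal stdlib sketch of the idea (not pipestat's internals):

```python
# A trimmed-down schema dict with the highlight property on some results.
schema = {
    "log": {"type": "file", "highlight": True},
    "output_image": {"type": "image", "highlight": False},
    "version": {"type": "string", "highlight": True},
}

# Keep only the result identifiers whose highlight property is truthy.
highlighted = [name for name, spec in schema.items() if spec.get("highlight")]
print(highlighted)  # → ['log', 'version']
```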

Pipeline status management

Pipestat provides a pipeline status management system, which can be used to set and read pipeline status. To maintain status information between sessions, it uses flag files or an additional database table, depending on whether the PipestatManager object is backed by a YAML file or a PostgreSQL database.

To set the pipeline status, use the set_status method:

psm.set_status(record_identifier="sample1", status_identifier="running")

To get the pipeline status, use the get_status method:

psm.get_status(record_identifier="sample1")
'running'
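With a file back-end, this persistence can be pictured as flag files in a directory, where the file name encodes the record and its status. A minimal stdlib sketch of the concept (the flag naming here is an assumption, not pipestat's actual convention):

```python
import glob
import os
import tempfile

flags_dir = tempfile.mkdtemp()

def set_status(record, status):
    """Write a '<record>_<status>.flag' file, replacing any previous flag."""
    for old in glob.glob(os.path.join(flags_dir, f"{record}_*.flag")):
        os.remove(old)
    open(os.path.join(flags_dir, f"{record}_{status}.flag"), "w").close()

def get_status(record):
    """Recover the status from the record's flag file name, if one exists."""
    flags = glob.glob(os.path.join(flags_dir, f"{record}_*.flag"))
    if flags:
        return os.path.basename(flags[0]).split("_", 1)[1][:-len(".flag")]
    return None

set_status("sample1", "running")
print(get_status("sample1"))  # → running
```

Because the status lives on disk rather than in the object, a new session can read it back, which is exactly what the flag mechanism is for.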

Allowable statuses and related metadata are defined in the status schema, which can be accessed via PipestatManager.status_schema property.

psm.status_schema
{'running': {'description': 'the pipeline is running',
  'color': [30, 144, 255]},
 'completed': {'description': 'the pipeline has completed',
  'color': [50, 205, 50]},
 'failed': {'description': 'the pipeline has failed', 'color': [220, 20, 60]},
 'waiting': {'description': 'the pipeline is waiting',
  'color': [240, 230, 140]},
 'partial': {'description': 'the pipeline stopped before completion point',
  'color': [169, 169, 169]}}

The pipestat Python package ships with a default status schema, so we did not have to provide one when constructing the PipestatManager object. Similarly, the directory containing the flags is an optional configuration option.

Please refer to the Python API documentation (the __init__ method) to see how to use a custom status schema and flags directory.

Initializing PipestatManager without results schema

Starting with pipestat 0.0.3, it is possible to initialize a PipestatManager object without specifying the results schema file. This feature comes in handy if PipestatManager is created with the sole intent of monitoring pipeline status.

Here's an example:

_, temp_file_no_schema = mkstemp(suffix=".yaml")
print(temp_file_no_schema)
/var/folders/h8/8npwnh2s4rb8lr6hsy2ydrsh0000gp/T/tmpauamyheb.yaml

psm_no_schema = pipestat.PipestatManager(
    namespace="test_no_schema", results_file_path=temp_file_no_schema
)

As you can see, the object was initialized successfully. A schema is still required to report and retrieve results, though: requiring results to be predefined means pipestat clients can rely on the schema to gather all the possible results metadata, which is a big advantage.

try:
    psm_no_schema.report(record_identifier="sample1", values={"key": "val"})
except pipestat.SchemaNotFoundError as e:
    print(e)
Results schema not found. The schema is required to report results. It needs to be supplied to the object constructor or via 'PIPESTAT_RESULTS_SCHEMA' environment variable.

As mentioned above, the pipeline status management capabilities are supported even with no results schema defined:

psm_no_schema.set_status(status_identifier="running", record_identifier="sample1")
psm_no_schema.get_status(record_identifier="sample1")
'running'