Writing a workflow - Tutorial
The explicit aim of cgat-core is to allow users to quickly and easily build their own computational pipelines, speeding up their analysis workflows.
Installation of cgat-core
Before you begin writing a pipeline you will need to install the cgat-core code (see the Installation section for instructions).
Tutorial start
Setting up the pipeline
1. First navigate to a directory where you want to start building your code:
mkdir test && cd test && mkdir configuration && touch configuration/pipeline.yml && touch pipeline_test.py && touch ModuleTest.py
This will create a directory called test in the current directory with the following layout:
|-- configuration
|   `-- pipeline.yml
|-- pipeline_test.py
`-- ModuleTest.py
The layout has the following components:
- pipeline_test.py
This is the file that will contain all of the ruffus workflows. The file must be named in the format pipeline_<name>.py.
- configuration/
Directory containing the pipeline.yml configuration file.
- ModuleTest.py
This file will contain functions that will be imported into the main ruffus workflow file, pipeline_test.py
2. View the source code within pipeline_test.py
This is where the ruffus tasks will be written. The code begins with a doc string detailing the pipeline functionality. You should use this section to document your pipeline.
'''This pipeline is a test and this is where the documentation goes '''
The pipeline then needs a few utility functions to help with executing the pipeline.
Import statements: You will need to import ruffus and the cgatcore utilities.
from ruffus import *
import cgatcore.experiment as E
from cgatcore import pipeline as P
Importing ruffus allows ruffus decorators to be used within our pipeline.
experiment from cgatcore is a module that contains utility functions for argument parsing, logging and record keeping within scripts.
pipeline from cgatcore provides utility functions for interfacing CGAT ruffus pipelines with an HPC cluster, uploading data to a database, parameterisation and more.
You’ll also need python modules:
import os
import sys
import sqlite3
Config parser: This code helps with parsing the pipeline.yml file:
# load options from the config file
PARAMS = P.get_parameters(
    ["%s/pipeline.yml" % os.path.splitext(__file__)[0],
     "../pipeline.yml",
     "pipeline.yml"])
Pipeline configuration: We will add configurable variables to our pipeline.yml file so that we can modify the output of our pipeline. With pipeline.yml open, copy and paste the following into the file.
database:
    name: "csvdb"
When you come to run the pipeline, the configuration values (in this case "csvdb") can be accessed within the pipeline via PARAMS["database_name"].
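The "section_parameter" key convention can be illustrated with a small standalone sketch. The flatten() helper below is hypothetical, written only to show how a nested YAML section maps onto a flat PARAMS key; it is not part of cgatcore.

```python
# Minimal illustration of the "section_parameter" key convention.
# flatten() is a hypothetical helper, not part of cgatcore.

def flatten(config, parent=""):
    """Flatten nested dicts into 'section_parameter' style keys."""
    flat = {}
    for key, value in config.items():
        name = f"{parent}_{key}" if parent else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat


# the pipeline.yml fragment above, as a parsed dictionary
config = {"database": {"name": "csvdb"}}
params = flatten(config)
print(params["database_name"])  # csvdb
```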
Database connection: This code helps with connecting to a sqlite database:
def connect():
    '''utility function to connect to database.

    Use this method to connect to the pipeline database.
    Additional databases can be attached here as well.

    Returns an sqlite3 database handle.
    '''
    dbh = sqlite3.connect(PARAMS["database_name"])
    return dbh
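Since the returned handle is a standard sqlite3 connection, it can be queried directly. Here is a self-contained sketch using an in-memory database and a made-up table, rather than the real csvdb:

```python
import sqlite3

# stand-in for the pipeline configuration; the real value comes from pipeline.yml
PARAMS = {"database_name": ":memory:"}

def connect():
    '''utility function to connect to the pipeline database.'''
    dbh = sqlite3.connect(PARAMS["database_name"])
    return dbh

dbh = connect()
cursor = dbh.cursor()
# hypothetical table, standing in for data loaded by the pipeline
cursor.execute("CREATE TABLE word_counts (word TEXT, freq INTEGER)")
cursor.execute("INSERT INTO word_counts VALUES ('pipeline', 3)")
rows = cursor.execute("SELECT word, freq FROM word_counts").fetchall()
print(rows)  # [('pipeline', 3)]
dbh.close()
```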
Commandline parser: This bit of code allows the pipeline to parse command line arguments.
def main(argv=None):
    if argv is None:
        argv = sys.argv
    P.main(argv)


if __name__ == "__main__":
    sys.exit(P.main(sys.argv))
Running test pipeline
You now have the bare bones layout of the pipeline; next you need code for it to execute. Below you will find example code that you can copy and paste into your pipeline_test.py file. The code includes two ruffus @transform tasks that parse pipeline.yml. The first function, countWords, contains a statement that counts the number of words in the file; the statement is run using the P.run() function. The second @transform function, loadWordCounts, takes as input the output of countWords and loads the word counts into an sqlite database using P.load(). The third function, full(), is a dummy task written to run the whole pipeline. It has an @follows decorator that takes the loadWordCounts function; this completes the pipeline chain, so the whole workflow can be run by calling the task name full.
The following code should be pasted just before the Commandline parser arguments and after the database connection code.
# ---------------------------------------------------
# Specific pipeline tasks
@transform("pipeline.yml",
           regex("(.*)\.(.*)"),
           r"\1.counts")
def countWords(infile, outfile):
    '''count the number of words in the pipeline configuration files.'''

    # the command line statement we want to execute
    statement = '''awk 'BEGIN { printf("word\\tfreq\\n"); }
    {for (i = 1; i <= NF; i++) freq[$i]++}
    END { for (word in freq) printf "%%s\\t%%d\\n", word, freq[word] }'
    < %(infile)s > %(outfile)s'''

    # execute command in variable statement.
    #
    # The command will be sent to the cluster. The statement will be
    # interpolated with any options that are defined in the
    # configuration files or variables that are declared in the calling
    # function. For example, %(infile)s will be substituted with the
    # contents of the variable "infile".
    P.run(statement)


@transform(countWords,
           suffix(".counts"),
           "_counts.load")
def loadWordCounts(infile, outfile):
    '''load results of word counting into database.'''
    P.load(infile, outfile, "--add-index=word")


# ---------------------------------------------------
# Generic pipeline tasks
@follows(loadWordCounts)
def full():
    pass
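The awk statement in countWords is simply a word-frequency count. The same logic in plain Python is shown below, only to clarify what the .counts output file contains; this sketch is not part of the pipeline, and the input string is made up:

```python
from collections import Counter

def count_words(text):
    """Return tab-separated word/frequency lines, mirroring the awk output."""
    freq = Counter(text.split())
    lines = ["word\tfreq"]
    for word, count in freq.items():
        lines.append(f"{word}\t{count}")
    return "\n".join(lines)

# hypothetical input text standing in for the pipeline.yml contents
print(count_words("database name csvdb name"))
```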
To run the pipeline, navigate to the working directory and then run:

python /location/to/code/pipeline_test.py config

This will place a copy of pipeline.yml in the folder. You can inspect the tasks that will be run with:

python /location/to/code/pipeline_test.py show full -v 5

Then execute the pipeline with:

python /location/to/code/pipeline_test.py make full -v 5 --local

The pipeline will then execute and count the words in the yml file.
Modifying the test pipeline to build your own workflows
The next step is to modify the basic code in the pipeline to fit your particular NGS workflow needs. For example, say we wanted to convert a sam file into a bam file then perform flag stats on that output bam file. The code and layout that we just wrote can be easily modified to perform this. We would remove all of the code from the specific pipeline tasks and write our own.
The pipeline will have two steps: 1. Identify all sam files and convert to a bam file. 2. Take the output of step 1 and then perform flagstats on that bam file.
The first step involves writing a function to identify all sam files in a data.dir/ directory. This function accepts a sam file and uses samtools view to convert it to a bam file; therefore, an @transform decorator is required. The second function takes the output of the first, performs samtools flagstat, and outputs the results as a flat .txt file. Again, an @transform decorator is required to track the inputs and outputs.
This would be written as follows:
@transform("data.dir/*.sam",
           regex("data.dir/(\S+).sam"),
           r"\1.bam")
def bamConvert(infile, outfile):
    'convert a sam file into a bam file using samtools view'

    statement = '''samtools view -bT /ifs/mirror/genomes/plain/hg19.fasta
    %(infile)s > %(outfile)s'''

    P.run(statement)


@transform(bamConvert,
           suffix(".bam"),
           "_flagstats.txt")
def bamFlagstats(infile, outfile):
    'perform flagstats on a bam file'

    statement = '''samtools flagstat %(infile)s > %(outfile)s'''

    P.run(statement)
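To see how the regex and r"\1.bam" pair in the bamConvert decorator maps input names to output names, here is a standalone sketch using Python's re module (the file name is made up):

```python
import re

# the same pattern and replacement used in the bamConvert decorator
pattern = r"data.dir/(\S+).sam"
replacement = r"\1.bam"

infile = "data.dir/sample1.sam"   # hypothetical input file
outfile = re.sub(pattern, replacement, infile)
print(outfile)  # sample1.bam
```

The capture group (\S+) grabs the sample name, so the bam file lands in the working directory rather than in data.dir/.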
To run the pipeline:
python /path/to/file/pipeline_test.py make full -v5
The bam files and flagstats outputs should then be generated.
Parameterising the code using the .yml file
Having written the basic function of our pipeline, as a philosophy we try to avoid any hard-coded parameters.
This means that variables can be easily modified by the user without having to modify any code.
Looking at the code above, the hard coded link to the hg19.fasta file can be added as a customisable parameter. This could allow the user to specify any fasta file depending on the genome build used to map and generate the bam file.
In order to do this, the pipeline.yml file needs to be modified. This can be done in the following way:
Configuration values are accessible via the PARAMS variable. The PARAMS variable is a dictionary mapping configuration parameters to values. Keys are in the format section_parameter. For example, the key genome_fasta will provide the configuration value of:
genome:
    fasta: /ifs/mirror/genomes/plain/hg19.fasta
In pipeline.yml, add the above code to the file. In the pipeline_test.py code, the value can then be accessed via PARAMS["genome_fasta"].
Therefore, the code we wrote before for converting sam files can be modified to:
@transform("data.dir/*.sam",
           regex("data.dir/(\S+).sam"),
           r"\1.bam")
def bamConvert(infile, outfile):
    'convert a sam file into a bam file using samtools view'

    genome_fasta = PARAMS["genome_fasta"]

    statement = '''samtools view -bT %(genome_fasta)s
    %(infile)s > %(outfile)s'''

    P.run(statement)


@transform(bamConvert,
           suffix(".bam"),
           "_flagstats.txt")
def bamFlagstats(infile, outfile):
    'perform flagstats on a bam file'

    statement = '''samtools flagstat %(infile)s > %(outfile)s'''

    P.run(statement)
Running the code again should generate the same output. However, if you had bam files that came from a different genome build, the parameter in the yml file can be modified easily, the output files deleted, and the pipeline run again using the new configuration values.