import argparse
from configparser import ConfigParser, ExtendedInterpolation
from copy import copy as _copy
import os
import re
import warnings
import tab
from . import __version__
from .align import SUPPORTED_ALIGNER
from .annotate.constants import DEFAULTS as ANNOTATION_DEFAULTS
from .annotate.file_io import load_annotations
from .bam.cache import BamCache
from .bam.stats import compute_genome_bam_stats, compute_transcriptome_bam_stats
from .cluster.constants import DEFAULTS as CLUSTER_DEFAULTS
from .constants import DISEASE_STATUS, SUBCOMMAND, PROTOCOL, float_fraction
from .illustrate.constants import DEFAULTS as ILLUSTRATION_DEFAULTS
from .pairing.constants import DEFAULTS as PAIRING_DEFAULTS
from .submit import OPTIONS as SUBMIT_OPTIONS
from .submit import SCHEDULER
from .summary.constants import DEFAULTS as SUMMARY_DEFAULTS
from .tools import SUPPORTED_TOOL
from .util import bash_expands, cast, devnull, ENV_VAR_PREFIX, MavisNamespace, WeakMavisNamespace, get_env_variable, log_arguments
from .validate.constants import DEFAULTS as VALIDATION_DEFAULTS
REFERENCE_DEFAULTS = WeakMavisNamespace(
annotations='',
reference_genome='',
template_metadata='',
masking='',
aligner_reference='',
dgv_annotation=''
)
CONVERT_OPTIONS = WeakMavisNamespace()
CONVERT_OPTIONS.add('assume_no_untemplated', True, defn='assume that if not given there is no untemplated sequence between the breakpoints')
[docs]class RangeAppendAction(argparse.Action):
"""
allows an argument to accept a range of arguments
"""
def __init__(self, nmin=1, nmax=None, **kwargs):
kwargs.setdefault('nargs', '+')
kwargs.setdefault('default', [])
argparse.Action.__init__(self, **kwargs)
self.nmin = nmin
self.nmax = nmax
assert nmin is not None
def __call__(self, parser, namespace, values, option_string=None):
if getattr(namespace, self.dest, None) is None:
setattr(namespace, self.dest, [])
items = _copy(getattr(namespace, self.dest))
items.append(values)
if self.nmax is None:
if len(values) < self.nmin:
raise argparse.ArgumentError(
self, 'must have at least {} arguments. Given: {}'.format(self.nmin, values))
elif not self.nmin <= len(values) <= self.nmax:
raise argparse.ArgumentError(
self, 'requires {}-{} arguments. Given: {}'.format(self.nmin, self.nmax, values))
setattr(namespace, self.dest, items)
[docs]def cast_if_not_none(value, cast_type):
"""
cast a value to a given type unless it is None
Example:
>>> cast_if_not_none('1', int)
1
>>> cast_if_not_none(None, int)
None
>>> cast_if_not_none('null', int)
None
>>> cast_if_not_none('', int)
None
>>> cast_if_not_none('none', int)
None
"""
if value is None or str(value).lower() in ['none', 'null', '']:
return None
return cast(value, cast_type)
[docs]class LibraryConfig(MavisNamespace):
"""
holds library specific configuration information
"""
def __init__(
self, library, protocol, disease_status, bam_file=None, inputs=None, read_length=None, median_fragment_size=None,
stdev_fragment_size=None, strand_specific=False, strand_determining_read=2,
**kwargs
):
MavisNamespace.__init__(self)
self.library = library
self.protocol = PROTOCOL.enforce(protocol)
self.bam_file = bam_file
self.read_length = cast_if_not_none(read_length, int)
self.median_fragment_size = cast_if_not_none(median_fragment_size, int)
self.stdev_fragment_size = cast_if_not_none(stdev_fragment_size, int)
self.strand_specific = cast(strand_specific, bool)
self.strand_determining_read = int(strand_determining_read)
self.disease_status = DISEASE_STATUS.enforce(disease_status)
try:
self.inputs = [f for f in re.split(r'[;\s]+', inputs) if f]
except TypeError:
self.inputs = inputs if inputs is not None else []
for attr, value in kwargs.items():
for namespace in [CLUSTER_DEFAULTS, VALIDATION_DEFAULTS, ANNOTATION_DEFAULTS]:
if attr not in namespace:
continue
setattr(self, attr, namespace.type(attr)(value))
break
if 'MAVIS_FETCH_METHOD_INDIVIDUAL' not in os.environ and 'fetch_method_individual' not in kwargs:
if self.protocol == PROTOCOL.TRANS:
self.fetch_method_individual = False
else:
self.fetch_method_individual = True
[docs] def flatten(self):
result = MavisNamespace.flatten(self)
result['inputs'] = '\n'.join(result['inputs'])
return result
[docs] def is_trans(self):
return True if self.protocol == PROTOCOL.TRANS else False
[docs] @staticmethod
def build(
library, protocol, bam_file, inputs,
annotations=None,
log=devnull,
distribution_fraction=0.98,
sample_cap=3000,
sample_bin_size=1000,
sample_size=500,
**kwargs
):
"""
Builds a library config section and gathers the bam stats
"""
PROTOCOL.enforce(protocol)
if protocol == PROTOCOL.TRANS and annotations is None:
raise AttributeError(
'missing required attribute: annotations. Annotations must be given for transcriptomes')
bam = BamCache(bam_file)
if protocol == PROTOCOL.TRANS:
bamstats = compute_transcriptome_bam_stats(
bam,
annotations=annotations,
sample_size=sample_size,
sample_cap=sample_cap,
distribution_fraction=distribution_fraction
)
elif protocol == PROTOCOL.GENOME:
bamstats = compute_genome_bam_stats(
bam,
sample_size=sample_size,
sample_bin_size=sample_bin_size,
sample_cap=sample_cap,
distribution_fraction=distribution_fraction
)
else:
raise ValueError('unrecognized value for protocol', protocol)
log(bamstats)
return LibraryConfig(
library=library, protocol=protocol, bam_file=bam_file, inputs=inputs,
median_fragment_size=bamstats.median_fragment_size,
stdev_fragment_size=bamstats.stdev_fragment_size,
read_length=bamstats.read_length,
strand_determining_read=bamstats.strand_determining_read,
**kwargs
)
[docs] @classmethod
def parse_args(cls, *args):
# '<name>', '(genome|transcriptome)', '<diseased|normal>', '[strand_specific]', '[/path/to/bam/file]'
if len(args) < 4:
return LibraryConfig(args[0], protocol=args[1], disease_status=args[2])
elif len(args) < 5:
return LibraryConfig(args[0], protocol=args[1], disease_status=args[2], strand_specific=args[3])
return LibraryConfig(args[0], protocol=args[1], disease_status=args[2], strand_specific=args[3], bam_file=args[4])
[docs]def write_config(filename, include_defaults=False, libraries=[], conversions={}, log=devnull):
"""
Args:
filename (str): path to the output file
include_defaults (bool): True if default parameters should be written to the config, False otherwise
libraries (list of LibraryConfig): library configuration sections
conversions (dict of list by str): conversion commands by alias name
log (function): function to pass output logging to
"""
config = {}
config['reference'] = REFERENCE_DEFAULTS.flatten()
for filetype, fname in REFERENCE_DEFAULTS.items():
if fname is None:
warnings.warn('filetype {} has not been set. This must be done manually before the configuration file is used'.format(filetype))
if libraries:
for lib in libraries:
config[lib.library] = lib.flatten()
if include_defaults:
config['schedule'] = SUBMIT_OPTIONS.flatten()
config['validate'] = VALIDATION_DEFAULTS.flatten()
config['cluster'] = CLUSTER_DEFAULTS.flatten()
config['annotate'] = ANNOTATION_DEFAULTS.flatten()
config['illustrate'] = ILLUSTRATION_DEFAULTS.flatten()
config['summary'] = SUMMARY_DEFAULTS.flatten()
config['convert'] = CONVERT_OPTIONS.flatten()
for alias, command in conversions.items():
if alias in CONVERT_OPTIONS:
raise UserWarning('error in writing config. Alias for conversion product cannot be a setting', alias, CONVERT_OPTIONS.keys())
config['convert'][alias] = '\n'.join(command)
for sec in config:
for tag, value in config[sec].items():
if '_regex_' in tag:
config[sec][tag] = re.sub(r'\$', '$$', config[sec][tag])
elif isinstance(value, list):
config[sec][tag] = '\n'.join([str(v) for v in value])
else:
config[sec][tag] = str(value)
conf = ConfigParser()
for sec in config:
conf[sec] = {}
for tag, val in config[sec].items():
conf[sec][tag] = val
log('writing:', filename)
with open(filename, 'w') as configfile:
conf.write(configfile)
[docs]def validate_section(section, namespace, use_defaults=False):
"""
given a dictionary of values, returns a new dict with the values casted to their appropriate type or set
to a default if the value was not given
"""
new_namespace = MavisNamespace()
if use_defaults:
for attr, value in namespace.items():
new_namespace.add(attr, value, cast_type=namespace.type(attr))
for attr, value in section.items():
if attr not in namespace:
raise KeyError('tag not recognized', attr)
else:
new_namespace.add(attr, namespace.type(attr)(value), cast_type=namespace.type(attr))
return new_namespace
[docs]class MavisConfig:
def __init__(self, **kwargs):
# section can be named schedule or qsub to support older versions
self.schedule = validate_section(kwargs.pop('schedule', kwargs.pop('qsub', {})), SUBMIT_OPTIONS, True)
# set the global defaults
for sec, defaults in [
('pairing', PAIRING_DEFAULTS),
('summary', SUMMARY_DEFAULTS),
('validate', VALIDATION_DEFAULTS),
('annotate', ANNOTATION_DEFAULTS),
('illustrate', ILLUSTRATION_DEFAULTS),
('cluster', CLUSTER_DEFAULTS),
('reference', REFERENCE_DEFAULTS)
]:
setattr(self, sec, validate_section(kwargs.pop(sec, {}), defaults, True))
SUPPORTED_ALIGNER.enforce(self.validate.aligner)
for attr, fname in self.reference.items():
if not os.path.exists(fname):
raise OSError(attr, 'file at', fname, 'does not exist')
# set the conversion section
self.convert = kwargs.pop('convert', {})
for attr, val in self.convert.items():
if attr in CONVERT_OPTIONS:
self.convert[attr] = CONVERT_OPTIONS.type(attr)(val)
continue
val = [v for v in re.split(r'[;\s]+', val) if v]
if val[0] == 'convert_tool_output':
if len(val) < 3 or val[2] not in SUPPORTED_TOOL.values():
raise UserWarning(
'conversion using the built-in convert_tool_output requires specifying the input file and '
'tool name currently supported tools include:', SUPPORTED_TOOL.values(), 'given', val)
inputs = bash_expands(val[1])
if len(inputs) < 1:
raise OSError('input file(s) do not exist', val[1])
if len(val) == 4:
val[3] = tab.cast_boolean(val[3])
elif len(val) > 4:
raise UserWarning(
'conversion using the built-in convert_tool_output takes at most 3 arguments')
self.convert[attr] = val
self.convert = MavisNamespace(**self.convert)
# now add the library specific sections
self.libraries = {}
for libname, val in kwargs.items(): # all other sections already popped
d = {}
d.update(self.cluster.items())
d.update(self.validate.items())
d.update(self.annotate.items())
d.update(val)
d['library'] = libname
val['library'] = libname
self.libraries[libname] = LibraryConfig(**val)
# now try building the LibraryConfig object
try:
lc = LibraryConfig(**d)
self.libraries[libname] = lc
except TypeError as terr: # missing required argument
try:
lc = LibraryConfig.build(**d)
self.libraries[libname] = lc
except Exception as err:
raise UserWarning('could not build configuration section for library', libname, err, terr)
[docs] def has_transcriptome(self):
return any([l.is_trans() for l in self.libraries.values()])
[docs] @staticmethod
def read(filepath):
"""
reads the configuration settings from the configuration file
Args:
filepath (str): path to the input configuration file
Returns:
class:`list` of :class:`Namespace`: namespace arguments for each library
"""
parser = ConfigParser(interpolation=ExtendedInterpolation())
parser.read(filepath)
config_dict = {}
# get the library sections and add the default settings
for sec in parser.sections():
config_dict.setdefault(sec, {}).update(parser[sec].items())
return MavisConfig(**config_dict)
[docs]def add_semi_optional_argument(argname, success_parser, failure_parser, help_msg='', metavar=None):
"""
for an argument tries to get the argument default from the environment variable
"""
env_name = ENV_VAR_PREFIX + argname.upper()
help_msg += ' The default for this argument is configured by setting the environment variable {}'.format(env_name)
if os.environ.get(env_name, None):
required = required = bool(success_parser.title.startswith('required'))
success_parser.add_argument('--{}'.format(argname), required=required, default=os.environ[env_name], help=help_msg, metavar=metavar)
else:
required = required = bool(failure_parser.title.startswith('required'))
failure_parser.add_argument('--{}'.format(argname), required=required, help=help_msg, metavar=metavar)
[docs]def augment_parser(arguments, parser, semi_opt_parser=None, required=None):
"""
Adds options to the argument parser. Separate function to facilitate the pipeline steps
all having a similar look/feel
"""
if required is None:
required = bool(parser.title.startswith('required'))
for arg in arguments:
if arg == 'help':
parser.add_argument('-h', '--help', action='help', help='show this help message and exit')
elif arg == 'version':
parser.add_argument(
'-v', '--version', action='version', version='%(prog)s version ' + __version__,
help='Outputs the version number')
elif arg == 'annotations':
add_semi_optional_argument(
arg, semi_opt_parser, parser, 'Path to the reference annotations of genes, transcript, exons, domains, etc.', 'FILEPATH')
elif arg == 'reference_genome':
add_semi_optional_argument(arg, semi_opt_parser, parser, 'Path to the human reference genome fasta file.', 'FILEPATH')
elif arg == 'template_metadata':
add_semi_optional_argument(arg, semi_opt_parser, parser, 'File containing the cytoband template information.', 'FILEPATH')
elif arg == 'masking':
add_semi_optional_argument(arg, semi_opt_parser, parser, metavar='FILEPATH')
elif arg == 'aligner_reference':
add_semi_optional_argument(
arg, semi_opt_parser, parser, 'path to the aligner reference file used for aligning the contig sequences.', 'FILEPATH')
elif arg == 'dgv_annotation':
add_semi_optional_argument(
arg, semi_opt_parser, parser, 'Path to the dgv reference processed to look like the cytoband file.', 'FILEPATH')
elif arg == 'config':
parser.add_argument('config', 'path to the config file', metavar='FILEPATH')
elif arg == 'bam_file':
parser.add_argument('--bam_file', help='path to the input bam file', required=required, metavar='FILEPATH')
elif arg == 'read_length':
parser.add_argument(
'--read_length', type=int, help='the length of the reads in the bam file',
required=required, metavar=get_metavar(int))
elif arg == 'stdev_fragment_size':
parser.add_argument(
'--stdev_fragment_size', type=int, help='expected standard deviation in insert sizes',
required=required, metavar=get_metavar(int))
elif arg == 'median_fragment_size':
parser.add_argument(
'--median_fragment_size', type=int, help='median inset size for pairs in the bam file', required=required,
metavar=get_metavar(int))
elif arg == 'library':
parser.add_argument('--library', help='library name', required=required)
elif arg == 'protocol':
parser.add_argument('--protocol', choices=PROTOCOL.values(), help='library protocol', required=required)
elif arg == 'disease_status':
parser.add_argument(
'--disease_status', choices=DISEASE_STATUS.values(), help='library disease status', required=required)
elif arg == 'skip_stage':
parser.add_argument(
'--skip_stage', choices=[SUBCOMMAND.CLUSTER, SUBCOMMAND.VALIDATE], action='append', default=[],
help='Use flag once per stage to skip. Can skip clustering or validation or both')
elif arg == 'strand_specific':
parser.add_argument(
'--strand_specific', type=tab.cast_boolean, metavar=get_metavar(bool),
default=False, help='indicates that the input is strand specific')
else:
value_type = None
help_msg = None
default_value = None
choices = None
if arg == 'aligner':
choices = SUPPORTED_ALIGNER.values()
help_msg = 'aligner to use for aligning contigs'
if arg == 'uninformative_filter':
help_msg = 'If flag is False then the clusters will not be filtered based on lack of annotation'
if arg == 'scheduler':
choices = SCHEDULER.keys()
# get default values
for nspace in [
CLUSTER_DEFAULTS,
VALIDATION_DEFAULTS,
ANNOTATION_DEFAULTS,
ILLUSTRATION_DEFAULTS,
PAIRING_DEFAULTS,
SUMMARY_DEFAULTS,
SUBMIT_OPTIONS,
CONVERT_OPTIONS]:
if arg in nspace:
default_value = nspace[arg]
value_type = type(default_value) if not isinstance(default_value, bool) else tab.cast_boolean
if not help_msg:
help_msg = nspace.define(arg)
break
if help_msg is None:
raise KeyError('invalid argument', arg)
parser.add_argument(
'--{}'.format(arg), choices=choices, metavar=get_metavar(value_type),
help=help_msg, required=required, default=default_value, type=value_type
)
[docs]def generate_config(parser, required, optional, log=devnull):
"""
Args:
parser (argparse.ArgumentParser): the main parser
required: the argparse required arguments group
optional: the argparse optional arguments group
"""
# the config sub program is used for writing pipeline configuration files
required.add_argument('-w', '--write', help='path to the new configuration file', required=True, metavar='FILEPATH')
optional.add_argument(
'--library',
metavar='<name> {genome,transcriptome} {diseased,normal} [strand_specific] [/path/to/bam/file]',
action=RangeAppendAction, help='configuration for libraries to be analyzed by mavis', nmin=3, nmax=5
)
optional.add_argument(
'--input', help='path to an input file or filter for mavis followed by the library names it '
'should be used for', nmin=2, action=RangeAppendAction, metavar='FILEPATH <name> [<name> ...]'
)
optional.add_argument(
'--assign', help='library name followed by path(s) to input file(s) or filter names. This represents the list'
' of inputs that should be used for the library', action=RangeAppendAction, nmin=2,
metavar='<name> FILEPATH [FILEPATH ...]')
optional.add_argument(
'--genome_bins', default=get_env_variable('genome_bins', 100), type=int, metavar=get_metavar(int),
help='number of bins/samples to use in calculating the fragment size stats for genomes')
optional.add_argument(
'--transcriptome_bins', default=get_env_variable('transcriptome_bins', 500), type=int, metavar=get_metavar(int),
help='number of genes to use in calculating the fragment size stats for genomes')
optional.add_argument(
'--distribution_fraction', default=get_env_variable('distribution_fraction', 0.97), type=float_fraction, metavar=get_metavar(float),
help='the proportion of the distribution of calculated fragment sizes to use in determining the stdev')
optional.add_argument(
'--convert', nargs=4, default=[],
metavar=('<alias>', 'FILEPATH', '{{{}}}'.format(','.join(SUPPORTED_TOOL.values())), '<stranded>'),
help='input file conversion for internally supported tools', action='append')
optional.add_argument(
'--external_conversion', metavar=('<alias>', '<"command">'), nargs=2, default=[],
help='alias for use in inputs and full command (quoted)', action='append')
optional.add_argument(
'--no_defaults', default=False, action='store_true', help='do not write current defaults to the config output')
augment_parser(['annotations'], optional, optional)
# add the optional annotations file (only need this is auto generating bam stats for the transcriptome)
augment_parser(['skip_stage'], optional)
args = MavisNamespace(**parser.parse_args().__dict__)
try:
# process the libraries by input argument (--input)
libs = []
inputs_by_lib = {}
for libconf in [LibraryConfig.parse_args(*a) for a in args.library]:
if not libconf.bam_file and SUBCOMMAND.VALIDATE not in args.skip_stage:
raise KeyError('argument --library: bam file must be given if validation is not being skipped')
libs.append(libconf)
inputs_by_lib[libconf.library] = set()
for arg_list in args.input:
inputfile = arg_list[0]
for lib in arg_list[1:]:
if lib not in inputs_by_lib:
raise KeyError(
'argument --input: specified a library that was not configured. Please input all libraries using '
'the --library flag', lib)
inputs_by_lib[lib].add(inputfile)
# process the inputs by library argument (--assign)
for arg_list in args.assign:
lib = arg_list[0]
if lib not in inputs_by_lib:
raise KeyError(
'argument --assign: specified a library that was not configured. Please input all libraries using '
'the --library flag', lib)
inputs_by_lib[lib].update(arg_list[1:])
for libconf in libs:
if not inputs_by_lib[libconf.library]:
raise KeyError('argument --input: no input was given for the library', libconf.library)
libconf.inputs = inputs_by_lib[libconf.library]
except KeyError as err:
parser.error(' '.join(err.args))
log('MAVIS: {}'.format(__version__))
log_arguments(args)
if SUBCOMMAND.VALIDATE not in args.skip_stage:
# load the annotations if we need them
if any([l.is_trans() for l in libs]):
if not args.get('annotations'):
parser.error('argument --annotations: is required to gather bam stats for transcriptome libraries')
log('loading the reference annotations file', args.annotations)
args.annotations_filename = args.annotations
args.annotations = load_annotations(args.annotations)
for i, libconf in enumerate(libs):
log('generating the config section for:', libconf.library)
libs[i] = LibraryConfig.build(
library=libconf.library, protocol=libconf.protocol, bam_file=libconf.bam_file,
inputs=inputs_by_lib[libconf.library], strand_specific=libconf.strand_specific,
disease_status=libconf.disease_status, annotations=args.annotations, log=log,
sample_size=args.genome_bins if libconf.protocol == PROTOCOL.GENOME else args.transcriptome_bins,
distribution_fraction=args.distribution_fraction
)
convert = {}
for alias, command in args.external_conversion:
if alias in convert:
raise KeyError('duplicate alias names are not allowed', alias)
convert[alias] = []
open_option = False
for item in re.split(r'\s+', command):
if convert[alias]:
if open_option:
convert[alias][-1] += ' ' + item
open_option = False
else:
convert[alias].append(item)
if item[0] == '-':
open_option = True
else:
convert[alias].append(item)
for alias, inputfile, toolname, stranded in args.convert:
if alias in convert:
raise KeyError('duplicate alias names are not allowed', alias)
stranded = str(tab.cast_boolean(stranded))
SUPPORTED_TOOL.enforce(toolname)
convert[alias] = ['convert_tool_output', inputfile, toolname, stranded]
write_config(args.write, include_defaults=not args.no_defaults, libraries=libs, conversions=convert, log=log)