import os
import json
import re
from .variant import annotate_events, determine_prime, flatten_fusion_translation, flatten_fusion_transcript
from .genomic import usTranscript
from ..constants import PROTOCOL, COLUMNS, PRIME, sort_columns
from ..error import DrawingFitError, NotSpecifiedError
from ..illustrate.constants import DiagramSettings
from ..illustrate.constants import DEFAULTS as ILLUSTRATION_DEFAULTS
from ..illustrate.diagram import draw_sv_summary_diagram
from .constants import DEFAULTS, ACCEPTED_FILTERS
from ..util import log, mkdirp, read_inputs, generate_complete_stamp
import warnings
[docs]def main(
inputs, output,
reference_genome, annotations, template_metadata,
min_domain_mapping_match=DEFAULTS.min_domain_mapping_match,
min_orf_size=DEFAULTS.min_orf_size,
max_orf_cap=DEFAULTS.max_orf_cap,
annotation_filters=DEFAULTS.annotation_filters,
**kwargs
):
"""
Args:
inputs (:class:`List` of :class:`str`): list of input files to read
output (str): path to the output directory
reference_genome (object): see :func:`~mavis.annotate.file_io.load_reference_genome`
annotations(object): see :func:`~mavis.annotate.file_io.load_reference_genes`
template_metadata (object): see :func:`~mavis.annotate.file_io.load_templates`
min_domain_mapping_match (float): min mapping match percent (0-1) to count a domain as mapped
min_orf_size (int): minimum size of an :term:`open reading frame` to keep as a putative translation
max_orf_cap (int): the maximum number of :term:`open reading frame` s to collect for any given event
"""
DRAWINGS_DIRECTORY = os.path.join(output, 'drawings')
TABBED_OUTPUT_FILE = os.path.join(output, 'annotations.tab')
FA_OUTPUT_FILE = os.path.join(output, 'annotations.fusion-cdna.fa')
annotation_filters = [] if not annotation_filters else annotation_filters.split(',')
annotation_filters = [ACCEPTED_FILTERS[a] for a in annotation_filters]
mkdirp(DRAWINGS_DIRECTORY)
# test that the sequence makes sense for a random transcript
bpps = read_inputs(
inputs, in_={COLUMNS.protocol: PROTOCOL},
expand_ns=False, explicit_strand=False
)
log('read {} breakpoint pairs'.format(len(bpps)))
annotations = annotate_events(
bpps,
reference_genome=reference_genome, annotations=annotations,
min_orf_size=min_orf_size, min_domain_mapping_match=min_domain_mapping_match,
max_orf_cap=max_orf_cap,
log=log,
filters=annotation_filters
)
fa_sequence_names = set()
# now try generating the svg
DS = DiagramSettings(**{k: v for k, v in kwargs.items() if k in ILLUSTRATION_DEFAULTS.__dict__})
header_req = {
COLUMNS.break1_strand,
COLUMNS.break2_strand,
COLUMNS.fusion_sequence_fasta_file,
COLUMNS.fusion_splicing_pattern,
COLUMNS.fusion_cdna_coding_start,
COLUMNS.fusion_cdna_coding_end,
COLUMNS.fusion_sequence_fasta_id,
COLUMNS.fusion_mapped_domains,
COLUMNS.exon_first_3prime,
COLUMNS.exon_last_5prime,
COLUMNS.annotation_id,
COLUMNS.annotation_figure,
COLUMNS.annotation_figure_legend,
COLUMNS.cdna_synon,
COLUMNS.protein_synon
}
header = None
log('opening for write:', TABBED_OUTPUT_FILE)
tabbed_fh = open(TABBED_OUTPUT_FILE, 'w')
log('opening for write:', FA_OUTPUT_FILE)
fasta_fh = open(FA_OUTPUT_FILE, 'w')
try:
total = len(annotations)
for i, ann in enumerate(annotations):
row = ann.flatten()
row[COLUMNS.break1_strand] = ann.transcript1.get_strand()
row[COLUMNS.break2_strand] = ann.transcript2.get_strand()
row[COLUMNS.fusion_sequence_fasta_file] = FA_OUTPUT_FILE
if header is None:
header_req.update(row.keys())
header = sort_columns(header_req)
tabbed_fh.write('\t'.join([str(c) for c in header]) + '\n')
log(
'({} of {}) current annotation'.format(i + 1, total),
ann.annotation_id, ann.transcript1, ann.transcript2, ann.event_type)
# get the reference sequences for either transcript
ref_cdna_seq = {}
ref_protein_seq = {}
for ust in [x for x in [ann.transcript1, ann.transcript2] if isinstance(x, usTranscript)]:
name = ust.name
for tr in ust.spliced_transcripts:
ref_cdna_seq.setdefault(tr.get_seq(reference_genome), set()).add(name)
for tx in tr.translations:
ref_protein_seq.setdefault(tx.get_AA_seq(reference_genome), set()).add(name)
# try building the fusion product
rows = []
# add fusion information to the current row
transcripts = [] if not ann.fusion else ann.fusion.transcripts
for t in transcripts:
fusion_fa_id = '{}_{}'.format(ann.annotation_id, t.splicing_pattern.splice_type)
fusion_fa_id = re.sub('\s', '-', fusion_fa_id)
if fusion_fa_id in fa_sequence_names:
raise AssertionError('should not be duplicate fa sequence ids', fusion_fa_id)
seq = ann.fusion.get_cdna_seq(t.splicing_pattern)
fasta_fh.write('> {}\n{}\n'.format(fusion_fa_id, seq))
cdna_synon = ';'.join(sorted(list(ref_cdna_seq.get(seq, set()))))
temp_row = {}
temp_row.update(row)
temp_row.update(flatten_fusion_transcript(t))
temp_row[COLUMNS.fusion_sequence_fasta_id] = fusion_fa_id
temp_row[COLUMNS.cdna_synon] = cdna_synon
if len(t.translations):
# duplicate the row for each translation
for tl in t.translations:
nrow = dict()
nrow.update(row)
nrow.update(temp_row)
aa = tl.get_AA_seq()
protein_synon = ';'.join(sorted(list(ref_protein_seq.get(aa, set()))))
nrow[COLUMNS.protein_synon] = protein_synon
# select the exon
nrow.update(flatten_fusion_translation(tl))
rows.append(nrow)
else:
temp_row.update(row)
rows.append(temp_row)
drawing = None
retry_count = 0
draw_fusion_transcript = True
draw_reference_transcripts = True
initial_width = DS.width
while drawing is None: # continue if drawing error and increase width
try:
canvas, legend = draw_sv_summary_diagram(
DS, ann, reference_genome=reference_genome,
templates=template_metadata,
draw_fusion_transcript=draw_fusion_transcript,
draw_reference_transcripts=draw_reference_transcripts
)
gene_aliases1 = 'NA'
gene_aliases2 = 'NA'
try:
if len(ann.transcript1.gene.aliases) > 0:
gene_aliases1 = '-'.join(ann.transcript1.gene.aliases)
if ann.transcript1.is_best_transcript:
gene_aliases1 = 'b-' + gene_aliases1
except AttributeError:
pass
try:
if len(ann.transcript2.gene.aliases) > 0:
gene_aliases2 = '-'.join(ann.transcript2.gene.aliases)
if ann.transcript2.is_best_transcript:
gene_aliases2 = 'b-' + gene_aliases2
except AttributeError:
pass
try:
if determine_prime(ann.transcript1, ann.break1) == PRIME.THREE:
gene_aliases1, gene_aliases2 = gene_aliases2, gene_aliases1
except NotSpecifiedError:
pass
name = 'mavis_{}-chr{}_chr{}-{}_{}'.format(
ann.annotation_id, ann.break1.chr, ann.break2.chr, gene_aliases1, gene_aliases2
)
drawing = os.path.join(DRAWINGS_DIRECTORY, name + '.svg')
l = os.path.join(DRAWINGS_DIRECTORY, name + '.legend.json')
for r in rows + [row]:
r[COLUMNS.annotation_figure] = drawing
r[COLUMNS.annotation_figure_legend] = l
log('generating svg:', drawing, time_stamp=False)
canvas.saveas(drawing)
log('generating legend:', l, time_stamp=False)
with open(l, 'w') as fh:
json.dump(legend, fh)
break
except DrawingFitError as err:
DS.width += DS.drawing_width_iter_increase
log('extending width by', DS.drawing_width_iter_increase, 'to', DS.width, time_stamp=False)
retry_count += 1
if retry_count > DS.max_drawing_retries:
if draw_fusion_transcript and draw_reference_transcripts:
log('restricting to gene-level only', time_stamp=False)
draw_fusion_transcript = False
draw_reference_transcripts = False
DS.width = initial_width
retry_count = 0
else:
warnings.warn(str(err))
drawing = True
DS.width = initial_width # reset the width
if len(rows) == 0:
rows = [row]
for row in rows:
tabbed_fh.write('\t'.join([str(row.get(k, None)) for k in header]) + '\n')
generate_complete_stamp(output, log)
finally:
log('closing:', TABBED_OUTPUT_FILE)
tabbed_fh.close()
log('closing:', FA_OUTPUT_FILE)
fasta_fh.close()