Source code for janggu.utils

"""Utilities for janggu """

import json
import os
from collections import defaultdict
from collections import OrderedDict
from copy import deepcopy

import numpy as np
import pandas as pd
from Bio import SeqIO
from Bio.Alphabet import IUPAC
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from pybedtools import BedTool
from pybedtools import Interval

try:
    import matplotlib.pyplot as plt
except ImportError:  # pragma: no cover
    plt = None

try:
    import pyBigWig
except ImportError:  # pragma: no cover
    pyBigWig = None

try:
    import seaborn as sns
except ImportError:  # pragma: no cover
    sns = None

try:
    from sklearn.manifold import TSNE  # pylint: disable=import-error
except ImportError:  # pragma: no cover
    TSNE = None


try:
    from urllib import urlcleanup, urlretrieve
except ImportError:
    try:
        from urllib.request import urlcleanup, urlretrieve
    except ImportError as exception:  # pragma: no cover
        urlretrieve = None
        urlcleanup = None


def _get_output_root_directory():
    """Function returns the output root directory."""
    if "JANGGU_OUTPUT" not in os.environ:  # pragma: no cover

        os.environ['JANGGU_OUTPUT'] = os.path.join(os.path.expanduser("~"),
                                                   'janggu_results')
        print('environment variable JANGGU_OUTPUT not set.'
              ' Will use JANGGU_OUTPUT={}'.format(os.environ['JANGGU_OUTPUT']))
    return os.environ['JANGGU_OUTPUT']


def _get_output_data_location(datatags):
    """Function returns the output location for the dataset.

    Parameters
    ------------
    datatags : list(str)
        Tags describing the dataset. E.g. ['testdna'].
    """
    outputdir = _get_output_root_directory()

    args = (outputdir, 'datasets')
    if datatags is not None:
        args += tuple(datatags)

    return os.path.join(*args)


def sequences_from_fasta(fasta, string='dna'):
    """Obtains nucleotide or peptide sequences from a fasta file.

    Parameters
    -----------
    fasta : str
        Fasta-filename
    string : str
        Either dna or protein.

    Returns
    -------
    list(Bio.SeqRecord)
        List of sequences.
    """

    if string == 'dna':
        alpha = IUPAC.unambiguous_dna
    else:
        alpha = IUPAC.protein

    # use a context manager so the file handle is closed after parsing
    with open(fasta) as file_:
        seqs = list(SeqIO.parse(file_, "fasta", alpha))

    return seqs


def _to_list(objs):
    """Makes a list of objs"""
    if objs is None:
        return []
    elif not isinstance(objs, list):
        return [objs]
    else:
        return objs

def _check_valid_files(list_of_files):
    """Checks that the list is non-empty and that all files exist."""
    if len(list_of_files) == 0:
        raise ValueError("Empty list of files.")
    for f in list_of_files:
        if not os.path.exists(f):
            raise ValueError("File {} does not exist.".format(f))
    return list_of_files

NOLETTER = -100000000
LETTERMAP = {k: i for i, k in enumerate(sorted(IUPAC.unambiguous_dna.letters))}
NNUC = len(IUPAC.unambiguous_dna.letters)

# mapping of nucleotides to integers
NMAP = defaultdict(lambda: NOLETTER)
NMAP.update(LETTERMAP)

# mapping of amino acids to integers
LETTERMAP = {k: i for i, k in enumerate(sorted(IUPAC.protein.letters))}
PMAP = defaultdict(lambda: NOLETTER)
PMAP.update(LETTERMAP)
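
# For instance, since the unambiguous DNA letters are sorted alphabetically,
# NMAP maps 'A' -> 0, 'C' -> 1, 'G' -> 2, 'T' -> 3, while any other
# character, e.g. NMAP['N'], yields the sentinel NOLETTER.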


def seq2ind(seq):
    """Transforms a biological sequence into an int array.

    Each nucleotide or amino acid maps to an integer between
    zero and len(alphabet) - 1.

    Any other characters (e.g. 'N') are represented by a negative value
    to avoid confusion with valid nucleotides.

    Parameters
    ----------
    seq : str, Bio.SeqRecord or Bio.Seq.Seq
        Sequence represented as string, SeqRecord or Seq.

    Returns
    -------
    list(int)
        Integer array representation of the biological sequence.
    """

    if isinstance(seq, SeqRecord):
        seq = seq.seq
    if isinstance(seq, (str, Seq)):
        # plain strings carry no alphabet attribute; assume DNA in that case
        alphabet = getattr(seq, 'alphabet', IUPAC.unambiguous_dna)
        if isinstance(alphabet, type(IUPAC.unambiguous_dna)):
            return [NMAP[x.upper()] for x in seq]
        # otherwise a protein alphabet is assumed
        return [PMAP[x.upper()] for x in seq]
    raise TypeError('seq2ind: Format is not supported')
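
# A minimal usage sketch: valid nucleotides map to their alphabet index,
# while ambiguous letters map to the negative sentinel.
#
#     >>> seq2ind('ACGTN')
#     [0, 1, 2, 3, -100000000]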


def sequence_padding(seqs, length):
    """This function truncates or pads the sequences
    to achieve fixed length sequences.

    Padding of the sequence is achieved by appending '.'

    Parameters
    ----------
    seqs : list(str), list(Bio.SeqRecord) or list(Bio.Seq.Seq)
        List of sequences.
    length : int
        Target sequence length.

    Returns
    -------
    list
        List of sequences, each truncated or padded to the given length.
    """
    seqs_ = deepcopy(seqs)
    for idx, seq in enumerate(seqs_):
        if len(seq) < length:
            seqs_[idx] += '.' * (length - len(seq))
        else:
            seqs_[idx] = seq[:length]
    return seqs_
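
# A minimal usage sketch for plain strings: shorter sequences are padded
# with '.', longer ones are truncated.
#
#     >>> sequence_padding(['ACG', 'ACGTACGT'], 5)
#     ['ACG..', 'ACGTA']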


def as_onehot(iseq, order, alphabetsize):
    """Converts a index sequence into one-hot representation.

    This method is used to transform a biological sequence
    for a given batch, represented by integer indices,
    into a one-hot representation.

    Parameters
    ----------
    iseq: numpy.array
        Array that holds the indices for a given batch.
        The dimensions of the array correspond to
        `(batch_size, sequence_length + 2*flank - order + 1)`.
    order: int
        Order of the sequence representation. Used for higher-order
        motif modelling.
    alphabetsize : int
        Size of the alphabet.

    Returns
    -------
    numpy.array
        One-hot representation of the batch. The dimensions
        of the array are given by
        `(batch_size, sequence_length, 1, pow(alphabetsize, order))`
    """

    onehot = np.zeros((len(iseq),
                       iseq.shape[1], 1,
                       pow(alphabetsize, order)), dtype='int8')
    for nuc in np.arange(pow(alphabetsize, order)):
        onehot[:, :, 0, nuc][iseq == nuc] = 1

    return onehot
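
# A minimal usage sketch for order=1 and a 4-letter alphabet. The row for
# an unknown index (NOLETTER) stays all-zero.
#
#     >>> as_onehot(np.array([[0, 3, NOLETTER]]), 1, 4)[0, :, 0, :]
#     array([[1, 0, 0, 0],
#            [0, 0, 0, 1],
#            [0, 0, 0, 0]], dtype=int8)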


def _complement_index(idx, order):
    rev_idx = np.arange(NNUC)[::-1]
    irc = 0
    for iord in range(order):
        nuc = idx % NNUC
        idx = idx // NNUC
        irc += rev_idx[nuc] * pow(NNUC, order - iord - 1)

    return irc


def complement_permmatrix(order):
    """This function returns a permutation matrix for computing
    the complementary DNA strand one-hot representation for a given order.

    Parameters
    ----------
    order : int
        Order of the one-hot representation

    Returns
    -------
    np.array
        Permutation matrix
    """
    perm = np.zeros((pow(NNUC, order), pow(NNUC, order)))
    for idx in range(pow(NNUC, order)):
        jdx = _complement_index(idx, order)
        perm[jdx, idx] = 1
    return perm
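
# A minimal usage sketch: for order=1 the matrix is the anti-diagonal
# permutation, swapping the A<->T and C<->G channels.
#
#     >>> complement_permmatrix(1)
#     array([[0., 0., 0., 1.],
#            [0., 0., 1., 0.],
#            [0., 1., 0., 0.],
#            [1., 0., 0., 0.]])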


def get_genome_size(refgenome='hg19', outputdir='./', skip_random=True):
    """Get genome size.

    This function loads the genome size for a specified reference genome
    into a dict. The reference genome sizes are obtained from
    UCSC genome browser.

    Parameters
    ----------
    refgenome : str
        Reference genome name. Default: 'hg19'.
    outputdir : str
        Directory in which the downloaded *.chrom.sizes file will be stored.
    skip_random : bool
        Skip random or unplaced contigs (names containing '_').
        Default: True.

    Returns
    -------
    dict()
        Dictionary with chromosome names as keys and their respective lengths
        as values.
    """
    if urlcleanup is None:  # pragma: no cover
        raise Exception('\n\nurllib not available. '
                        'Cannot download the chromosome sizes.\n\n')

    outputfile = os.path.join(outputdir, '{}.chrom.sizes'.format(refgenome))
    if not os.path.exists(outputfile):  # pragma: no cover
        # not part of unit tests, because this requires internet connection
        urlpath = 'http://hgdownload.cse.ucsc.edu/goldenPath/{ref1}/bigZips/{ref2}.chrom.sizes'\
            .format(ref1=refgenome, ref2=refgenome)

        print("Downloading {}".format(urlpath))
        urlcleanup()
        urlretrieve(urlpath, outputfile)

    content = pd.read_csv(outputfile, sep='\t', names=['chr', 'length'],
                          index_col='chr')
    if skip_random:
        fltr = [len(name.split('_')) <= 1 for name in content.index]
        content = content[fltr]
    return content.to_dict()['length']
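
# A usage sketch (the first call requires an internet connection; afterwards
# the downloaded *.chrom.sizes file is reused from `outputdir`):
#
#     >>> gsize = get_genome_size('hg19', outputdir='/tmp')
#     >>> gsize['chr1']
#     249250621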


def _get_genomic_reader(inputregions):
    """Converts the input regions into a BedTool object.

    Accepts a BedTool, a pandas DataFrame with 'chrom', 'start' and 'end'
    columns, a BED/GFF filename, or a list of Intervals.
    """
    if isinstance(inputregions, BedTool):
        # already in the desired format
        return inputregions

    if isinstance(inputregions, pd.DataFrame):
        if np.any([c not in inputregions.columns \
                   for c in ['chrom', 'start', 'end']]):
            raise ValueError("The dataframe must contain the columns"
                             "'chrom', 'start' and 'end'")
        if 'strand' not in inputregions.columns:
            inputregions['strand'] = '.'

        # transform the pandas dataframe to a list of intervals
        inputregions = [Interval(str(row['chrom']),
                                 int(row['start']),
                                 int(row['end']),
                                 strand=str(row['strand'])) for \
                                 _, row in inputregions.iterrows()]

    # at this point the inputregions are either a filename pointing to a
    # bedfile or a list of intervals. Both of which are understood by BedTool.
    regions_ = BedTool(inputregions)

    return regions_


def _iv_to_str(chrom, start, end):
    """Converts a genomic interval into a string representation 'chr:start-end'."""
    return '{}:{}-{}'.format(chrom, start, end)


def _str_to_iv(givstr, template_extension=0):
    """Converts a string representation 'chr:start-end' into genomic coordinates."""
    sub = givstr.split(':')
    if len(sub) == 1:
        return (sub[0], )

    chr_ = sub[0]

    second_split = sub[1].split('-')
    # if the interval contains negative values, they will be zero-padded later
    if second_split[0] == '':
        # start is a negative value
        second_split = second_split[1:]
        second_split[0] = '-' + second_split[0]
    if len(second_split) < 2 or second_split[1] == '':
        # a missing or negative end coordinate does not make sense
        raise ValueError('Invalid genomic location string: {}'.format(givstr))

    start = int(second_split[0])
    end = int(second_split[1])

    return (chr_, start - template_extension, end + template_extension)
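
# A minimal sketch of the two helpers above:
#
#     >>> _iv_to_str('chr1', 100, 200)
#     'chr1:100-200'
#     >>> _str_to_iv('chr1:100-200', template_extension=10)
#     ('chr1', 90, 210)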


def get_genome_size_from_regions(regions):
    """Get genome size.

    This function loads the genome size for a specified reference genome
    into a dict. The reference genome sizes are obtained from
    UCSC genome browser.

    Parameters
    ----------
    regions : str or GenomicIndexer
        Either a path pointing to a BED or GFF file containing genomic regions
        or a GenomicIndexer object.

    Returns
    -------
    dict()
        Dictionary with chromosome names as keys and their respective lengths
        as values.
    """

    regions_ = regions
    if isinstance(regions, str):
        regions_ = _get_genomic_reader(regions)

    gsize = OrderedDict()
    for interval in regions_:
        if interval.chrom not in gsize:
            gsize[interval.chrom] = interval.end
        elif gsize[interval.chrom] < interval.end:
            gsize[interval.chrom] = interval.end
    return gsize
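
# A minimal usage sketch with an in-memory region list:
#
#     >>> regions = BedTool([Interval('chr1', 0, 150), Interval('chr1', 100, 300)])
#     >>> get_genome_size_from_regions(regions)
#     OrderedDict([('chr1', 300)])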


class ExportJson(object):
    """Method that dumps the results in a json file.

    Parameters
    ----------
    filesuffix : str
        Target file ending. Default: 'json'.
    annot : None or dict
        Annotation data. If encoded as dict, the key indicates the name,
        while the values hold a list of annotation labels. Default: None.
    row_names : None or list
        List of row names. Default: None.
    """

    def __init__(self, filesuffix='json', annot=None, row_names=None):
        self.filesuffix = filesuffix
        self.annot = annot
        self.row_names = row_names

    def __call__(self, output_dir, name, results):
        filesuffix = self.filesuffix
        annot = self.annot
        row_names = self.row_names
        filename = os.path.join(output_dir, name + '.' + filesuffix)

        content = {}  # needed for py27
        with open(filename, 'w') as jsonfile:
            try:
                content.update({'-'.join(key): results[key]['value'].tolist()
                                for key in results})
            except AttributeError:
                content.update({'-'.join(key): results[key]['value']
                                for key in results})
            if annot is not None:
                content.update({'annot': annot})
            if row_names is not None:
                content.update({'row_names': row_names})
            json.dump(content, jsonfile)
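
# A minimal usage sketch (hypothetical inputs, not part of the module):
# the `results` mapping follows the convention used throughout this module,
# i.e. tuple keys and a dict holding a 'value' entry.
#
#     >>> exporter = ExportJson()
#     >>> exporter('/tmp', 'eval', {('model', 'auc'): {'value': 0.87}})
#     # writes /tmp/eval.json containing {"model-auc": 0.87}
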
class ExportTsv(object):
    """Method that dumps the results as tsv file.

    This class can be used to export general table summaries.

    Parameters
    ----------
    filesuffix : str
        File ending. Default: 'tsv'.
    annot : None or dict
        Annotation data. If encoded as dict, the key indicates the name,
        while the values hold a list of annotation labels.
        For example, this can be used to store the true output labels.
        Default: None.
    row_names : None or list
        List of row names. For example, chromosomal loci. Default: None.
    """

    def __init__(self, filesuffix='tsv', annot=None, row_names=None):
        self.filesuffix = filesuffix
        self.annot = annot
        self.row_names = row_names

    def __call__(self, output_dir, name, results):
        filesuffix = self.filesuffix
        annot = self.annot
        row_names = self.row_names
        filename = os.path.join(output_dir, name + '.' + filesuffix)
        try:
            # check if the result is iterable
            iter(results[list(results.keys())[0]]['value'])
            _rs = {'-'.join(k): results[k]['value'] for k in results}
        except TypeError:
            # if the result is not iterable, wrap it up as list
            _rs = {'-'.join(k): [results[k]['value']] for k in results}
        _df = pd.DataFrame.from_dict(_rs)
        for _an in annot or []:
            _df['annot.' + _an] = annot[_an]
        if row_names is not None:
            _df['row_names'] = row_names
        _df.to_csv(filename, sep='\t', index=False)
class ExportScorePlot(object):
    """Exporting score plot.

    This class can be used for producing an AUC or PRC plot.

    Parameters
    ----------
    figsize : tuple(int, int)
        Used to specify the figure size for matplotlib.
    xlabel : str or None
        xlabel used for the plot.
    ylabel : str or None
        ylabel used for the plot.
    fform : str or None
        Output file format. E.g. 'png', 'eps', etc. Default: 'png'.
    """

    def __init__(self, figsize=None, xlabel=None, ylabel=None, fform=None):
        self.figsize = figsize
        self.xlabel = xlabel
        self.ylabel = ylabel
        self.fform = fform
        if self.fform is None:
            self.fform = 'png'

    def __call__(self, output_dir, name, results):
        if plt is None:  # pragma: no cover
            raise Exception('matplotlib not available. '
                            'Please install matplotlib.')

        if self.figsize is not None:
            fig = plt.figure(figsize=self.figsize)
        else:
            fig = plt.figure()
        ax_ = fig.add_axes([0.1, 0.1, .55, .5])
        ax_.set_title(name)
        for keys in results:
            if isinstance(keys, str):
                k = (keys,)
            else:
                k = keys
            # avg might be returned using a custom function
            x_score, y_score, auxstr = results[keys]['value']
            label = "{}".format('-'.join(k))
            if isinstance(auxstr, str):
                label += ' ' + auxstr
            ax_.plot(x_score, y_score, label=label)

        lgd = ax_.legend(bbox_to_anchor=(1.05, 1),
                         loc=2, prop={'size': 10}, ncol=1)
        if self.xlabel is not None:
            ax_.set_xlabel(self.xlabel, size=14)
        if self.ylabel is not None:
            ax_.set_ylabel(self.ylabel, size=14)

        filename = os.path.join(output_dir, name + '.' + self.fform)
        fig.savefig(filename, format=self.fform,
                    dpi=1000,
                    bbox_extra_artists=(lgd,), bbox_inches="tight")
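
# A minimal usage sketch, assuming each result value is an
# (x_scores, y_scores, auxiliary_label) triple as unpacked above:
#
#     >>> plotter = ExportScorePlot(xlabel='FPR', ylabel='TPR')
#     >>> res = {('model',): {'value': ([0., .5, 1.], [0., .8, 1.], 'auc=0.9')}}
#     >>> plotter('/tmp', 'roc', res)  # writes /tmp/roc.png
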
class ExportBigwig(object):
    """Export predictions to bigwig.

    This class exports the predictions to bigwig format, which allows
    you to inspect the predictions in a genome browser. Importantly,
    gindexer must contain non-overlapping windows!

    Parameters
    ----------
    gindexer : GenomicIndexer
        GenomicIndexer that links the prediction for a certain region
        to its associated genomic coordinates.
    """

    def __init__(self, gindexer):
        self.gindexer = gindexer

    def __call__(self, output_dir, name, results):
        if pyBigWig is None:  # pragma: no cover
            raise Exception('pyBigWig not available. '
                            '`export_bigwig` requires pyBigWig to be installed.')

        genomesize = OrderedDict()

        # extract genome size from gindexer
        # check also if sorted and non-overlapping
        for region in self.gindexer:
            if region.chrom not in genomesize:
                genomesize[region.chrom] = region.end
            if genomesize[region.chrom] < region.end:
                genomesize[region.chrom] = region.end

        bw_header = [(str(chrom), genomesize[chrom])
                     for chrom in genomesize]

        # the last dimension holds the conditions. Each condition
        # needs to be stored in a separate file
        for keys in results:
            if isinstance(keys, str):
                k = (keys,)
            else:
                k = keys
            bw_file = pyBigWig.open(os.path.join(
                output_dir,
                '{prefix}.{key}.bigwig'.format(
                    prefix=name, key='.'.join(k))), 'w')
            bw_file.addHeader(bw_header)
            pred = results[keys]['value']

            # compute the ratio between binsize and stepsize
            bsss = max(np.ceil(float(self.gindexer.binsize) /
                               float(self.gindexer.stepsize)), 1.)
            ppi = int(np.rint(len(pred) / (len(self.gindexer) - 1. + bsss)))

            # case 1) stepsize >= binsize
            # then bsss = 1; ppi = len(pred)/len(gindexer)
            #
            # case 2) stepsize < binsize
            # then bsss > 1; ppi = len(pred)/(len(gindexer) - 1 + bsss)

            # note: `region` still refers to the last window of the
            # gindexer here; all windows are assumed to be of equal length
            resolution = int(region.length / bsss) // ppi

            for idx, region in enumerate(self.gindexer):
                val = [float(p) for p in pred[(idx*ppi):((idx+1)*ppi)]]

                bw_file.addEntries(str(region.chrom),
                                   int(region.start),
                                   values=val,
                                   span=int(resolution),
                                   step=int(resolution))
            bw_file.close()
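
# A worked sketch of the bigwig resolution arithmetic above, under the
# assumption of 10 windows with binsize=200 and stepsize=100:
# bsss = ceil(200/100) = 2, and with len(pred) = 22 predictions
# ppi = rint(22 / (10 - 1 + 2)) = 2 values per interval, so that
# resolution = int(200 / 2) // 2 = 50 bp per bigwig entry.
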
class ExportBed(object):
    """Export predictions to bed.

    This class exports the predictions to bed format, which allows
    you to inspect the predictions in a genome browser.

    Parameters
    ----------
    gindexer : GenomicIndexer
        GenomicIndexer that links the prediction for a certain region
        to its associated genomic coordinates.
    resolution : int
        Resolution (in base pairs) at which the results are written out.
    """

    def __init__(self, gindexer, resolution):
        self.gindexer = gindexer
        self.resolution = resolution

    def __call__(self, output_dir, name, results):
        gindexer = self.gindexer
        resolution = self.resolution

        # the last dimension holds the conditions. Each condition
        # needs to be stored in a separate file
        for keys in results:
            if isinstance(keys, str):
                k = (keys,)
            else:
                k = keys
            bed_content = pd.DataFrame(columns=['chr', 'start', 'end',
                                                'name', 'score'])
            for ridx, region in enumerate(gindexer):
                pred = results[keys]['value']

                nsplit = (region.end - region.start) // resolution
                starts = list(range(region.start, region.end, resolution))
                ends = list(range(region.start + resolution,
                                  region.end + resolution, resolution))
                cont = {'chr': [region.chrom] * nsplit,
                        'start': starts,
                        'end': ends,
                        'name': ['.'] * nsplit,
                        'score': pred[ridx*nsplit:((ridx+1)*nsplit)]}
                bed_entry = pd.DataFrame(cont)
                bed_content = bed_content.append(bed_entry,
                                                 ignore_index=True,
                                                 sort=False)
            bed_content.to_csv(os.path.join(
                output_dir,
                '{prefix}.{key}.bed'.format(prefix=name, key='.'.join(k))),
                sep='\t', header=False, index=False,
                columns=['chr', 'start', 'end', 'name', 'score'])
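
# A minimal sketch of the bed splitting above: a 300 bp region with
# resolution=100 yields nsplit = 3 rows covering [0, 100), [100, 200)
# and [200, 300), each annotated with one score from `pred`.
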
class ExportClustermap(object):
    """Create a clustermap of the feature activities.

    This method utilizes `seaborn.clustermap
    <https://seaborn.pydata.org/generated/seaborn.clustermap.html>`_
    to illustrate feature activities of the neural net.

    Parameters
    ----------
    fform : str or None
        Output file format. E.g. 'png', 'eps', etc. Default: 'png'.
    figsize : tuple(int, int) or None
        Used to specify the figure size for matplotlib.
    annot : None or dict
        Row annotation is used to create a row color annotation track
        for the figure.
    method : str
        See `seaborn.clustermap`_. Default: 'ward'.
    metric : str
        See `seaborn.clustermap`_. Default: 'euclidean'.
    z_score : int or None
        Whether to transform rows or columns to z-scores.
        See `seaborn.clustermap`_.
    standard_scale :
        See `seaborn.clustermap`_.
    [row/col]_cluster : boolean
        See `seaborn.clustermap`_.
    [row/col]_linkage :
        See `seaborn.clustermap`_.
    [row/col]_colors :
        See `seaborn.clustermap`_.
    mask :
        See `seaborn.clustermap`_.
    """

    def __init__(self, fform=None, figsize=None,  # pylint: disable=too-many-locals
                 annot=None, method='ward', metric='euclidean',
                 z_score=None, standard_scale=None,
                 row_cluster=True, col_cluster=True,
                 row_linkage=None, col_linkage=None,
                 row_colors=None, col_colors=None, mask=None):
        self.fform = fform
        self.figsize = figsize
        self.annot = annot
        self.method = method
        self.metric = metric
        self.z_score = z_score
        self.standard_scale = standard_scale
        self.row_cluster = row_cluster
        self.col_cluster = col_cluster
        self.row_linkage = row_linkage
        self.col_linkage = col_linkage
        self.row_colors = row_colors
        self.col_colors = col_colors
        self.mask = mask

    def __call__(self, output_dir, name, results):
        if sns is None:  # pragma: no cover
            raise Exception('seaborn not available. Please install seaborn.')
        annot = self.annot
        if annot is not None:
            # derive the row colors from the first annotation track
            firstkey = list(annot.keys())[0]
            pal = sns.color_palette('hls', len(set(annot[firstkey])))
            lut = dict(zip(set(annot[firstkey]), pal))
            self.row_colors = [lut[k] for k in annot[firstkey]]

        _rs = {k: results[k]['value'] for k in results}
        data = pd.DataFrame.from_dict(_rs)

        fform = self.fform if self.fform is not None else 'png'

        sns.clustermap(data,
                       method=self.method,
                       metric=self.metric,
                       z_score=self.z_score,
                       standard_scale=self.standard_scale,
                       row_cluster=self.row_cluster,
                       col_cluster=self.col_cluster,
                       row_linkage=self.row_linkage,
                       col_linkage=self.col_linkage,
                       col_colors=self.col_colors,
                       row_colors=self.row_colors,
                       mask=self.mask,
                       figsize=self.figsize).savefig(
                           os.path.join(output_dir, name + '.' + fform),
                           format=fform, dpi=700)


class ExportTsne(object):
    """Create a plot of the 2D t-SNE embedding of the feature activities.

    Parameters
    ----------
    fform : str or None
        Output file format. E.g. 'png', 'eps', etc. Default: 'png'.
    figsize : tuple(int, int) or None
        Used to specify the figure size for matplotlib.
    cmap : None or colormap
        Optional argument for matplotlib.pyplot.scatter.
        Only used if annot is absent. Otherwise, marker colors are
        derived from the annotation.
    colors : None
        Optional argument for matplotlib.pyplot.scatter.
        Only used if annot is absent. Otherwise, marker colors are
        derived from the annotation.
    norm :
        Optional argument for matplotlib.pyplot.scatter.
    alpha : None or float
        Opacity used for scatter plot markers.
    annot : None or dict
        If annotation data is available, the color of the markers is
        automatically derived from the annotation.
    """

    def __init__(self, figsize=None, cmap=None, colors=None, norm=None,
                 alpha=None, fform=None, annot=None):
        self.figsize = figsize
        self.cmap = cmap
        self.colors = colors
        self.norm = norm
        self.alpha = alpha
        self.fform = fform
        self.annot = annot

    def __call__(self, output_dir, name, results):
        figsize = self.figsize
        annot = self.annot
        if TSNE is None:  # pragma: no cover
            raise Exception('scikit-learn not available. '
                            'Please install scikit-learn '
                            'to be able to use export_tsne.')
        if plt is None:  # pragma: no cover
            raise Exception('matplotlib not available. '
                            'Please install matplotlib '
                            'to be able to use export_tsne.')

        _rs = {k: results[k]['value'] for k in results}
        data = pd.DataFrame.from_dict(_rs)

        tsne = TSNE()
        embedding = tsne.fit_transform(data.values)

        if self.figsize is not None:
            fig = plt.figure(figsize=figsize)
        else:
            fig = plt.figure()

        if self.annot is not None:
            # one scatter call per annotation label, colored via a palette
            firstkey = list(annot.keys())[0]
            pal = sns.color_palette('hls', len(set(annot[firstkey])))
            lut = dict(zip(set(annot[firstkey]), pal))
            for label in lut:
                plt.scatter(x=embedding[np.asarray(annot[firstkey]) == label, 0],
                            y=embedding[np.asarray(annot[firstkey]) == label, 1],
                            c=lut[label],
                            label=label,
                            norm=self.norm,
                            alpha=self.alpha)
            plt.legend()
        else:
            plt.scatter(x=embedding[:, 0],
                        y=embedding[:, 1],
                        c=self.colors,
                        cmap=self.cmap,
                        norm=self.norm,
                        alpha=self.alpha)
        plt.axis('off')

        fform = self.fform if self.fform is not None else 'png'

        fig.savefig(os.path.join(output_dir, name + '.' + fform),
                    format=fform, dpi=700)


def trim_bed(inputbed, outputbed, divby):
    """Trims starts and ends of intervals.

    This function takes a BED file and produces a BED file with starts
    and ends being trimmed such that they are divisible by divby.

    Parameters
    ----------
    inputbed : str
        Input BED file.
    outputbed : str
        Trimmed output BED file.
    divby : int
        Factor to trim the starts and ends with.
    """
    with open(outputbed, 'w') as bed:
        regions = _get_genomic_reader(inputbed)

        for region in regions:
            start = int(np.ceil(region.start / divby)) * divby
            end = (region.end // divby) * divby

            bed.write('{chrom}\t{start}\t{end}\t{name}\t{score}\t{strand}\n'
                      .format(chrom=region.chrom,
                              start=start,
                              end=end,
                              name=region.name,
                              score=region.score
                              if region.score is not None else 0,
                              strand=region.strand))
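
# A minimal sketch of the trimming arithmetic, assuming divby=100 and true
# division: an interval [150, 480) becomes [200, 400), since
# ceil(150 / 100) * 100 == 200 and (480 // 100) * 100 == 400.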