"""GenomicIndexer module"""
import numpy as np
from pybedtools import Interval
from sklearn.utils import check_random_state
from janggu.utils import _get_genomic_reader
[docs]class GenomicIndexer(object): # pylint: disable=too-many-instance-attributes
"""GenomicIndexer maps a set of integer indices to respective
genomic intervals.
The genomic intervals can be directly used to obtain data from a genomic
array.
"""
_stepsize = None
_binsize = None
_flank = None
zero_padding = None
collapse = None
_randomidx = None
chrs = None
starts = None
strand = None
ends = None
_random_state = None
@property
def randomidx(self):
"""randomidx property"""
if self.random_state is not None and self._randomidx is None:
self._randomidx = check_random_state(self.random_state).permutation(len(self))
return self._randomidx
@property
def random_state(self):
"""random_state property"""
return self._random_state
@random_state.setter
def random_state(self, value):
self._randomidx = None
self._random_state = value
[docs] @classmethod
def create_from_file(cls, regions, # pylint: disable=too-many-locals
binsize, stepsize, flank=0,
zero_padding=True, collapse=False,
random_state=None):
"""Creates a GenomicIndexer object.
This method constructs a GenomicIndexer from
a given BED or GFF file.
Parameters
----------
regions : str or list(Interval)
Path to a BED or GFF file.
binsize : int or None
Binsize in base pairs. If None, the binsize is obtained from
the interval lengths in the bed file, which requires intervals
to be of equal length.
stepsize : int or None
Stepsize in base pairs. If stepsize is None,
stepsize is set to equal to binsize.
flank : int
flank size in bp to be attached to
both ends of a region. Default: 0.
zero_padding : boolean
zero_padding indicate if variable sequence
lengths are used in conjunction with zero-padding.
If zero_padding is True, a binsize must be specified.
Default: True.
collapse : boolean
collapse indicates that the genomic interval will be represented by a
scalar summary value. For example, the gene expression value in TPM.
In this case, zero_padding does not have an effect. Intervals
may be of fixed or variable lengths.
Default: False.
random_state : None or int
random_state for shuffling intervals. Default: None
"""
regions_ = _get_genomic_reader(regions)
if binsize is None and not collapse:
binsize_ = None
# binsize will be inferred from bed file
# the maximum interval length will be used
for reg in regions_:
if binsize_ is None:
binsize_ = reg.length
if binsize_ < reg.length:
binsize_ = reg.length
binsize = binsize_
if stepsize is None:
if binsize is None:
stepsize = 1
else:
stepsize = binsize
gind = cls(binsize, stepsize, flank,
zero_padding=zero_padding,
collapse=collapse, random_state=random_state)
for reg in regions_:
gind.add_interval(
str(reg.chrom),
int(reg.start), int(reg.end), str(reg.strand))
return gind
@classmethod
def create_from_genomesize(cls, gsize):
"""Creates a GenomicIndexer object.
This method constructs a GenomicIndexer from
a given BED or GFF file.
Parameters
----------
gsize : dict
Dictionary with keys and values representing chromosome names
and lengths, respectively.
"""
gind = cls(None, None, 0, zero_padding=False, collapse=False)
for chrom in gsize:
gind.add_interval(chrom, 0, gsize[chrom], '.')
return gind
@classmethod
def create_from_region(cls, chrom, start, end, strand,
binsize, stepsize, flank=0,
zero_padding=True):
"""Creates a GenomicIndexer object.
This method constructs a GenomicIndexer from
a given genomic interval.
Parameters
----------
chrom : str
Chromosome name.
start : int
Interval start
end : int
Interval end
binsize : int or None
Binsize in base pairs. If None, the binsize is obtained from
the interval lengths in the bed file, which requires intervals
to be of equal length.
stepsize : int or None
Stepsize in base pairs. If stepsize is None,
stepsize is set to equal to binsize.
flank : int
flank size in bp to be attached to
both ends of a region. Default: 0.
zero_padding : boolean
zero_padding indicate if variable sequence
lengths are used in conjunction with zero-padding.
If zero_padding is True, a binsize must be specified.
Default: True.
"""
if binsize is None:
binsize = end - start
if stepsize is None:
stepsize = binsize
gind = cls(binsize, stepsize, flank, zero_padding=zero_padding)
gind.add_interval(chrom, start, end, strand)
return gind
def add_interval(self, chrom, start, end, strand):
"""Adds an interal to a GenomicIndexer object.
This method adds another interal to a GenomicIndexer.
Parameters
----------
chrom : str
Chromosome name.
start : int
Interval start
end : int
Interval end
"""
binsize = self.binsize
stepsize = self.stepsize
zero_padding = self.zero_padding
if binsize is None:
binsize = end - start
if stepsize is None:
stepsize = binsize
if stepsize <= binsize:
val = (end - start - binsize + stepsize)
else:
val = (end - start)
reglen = val // stepsize
chrs = [chrom] * reglen
starts = [x for x in range(start, start+(stepsize*reglen), stepsize)]
ends = [x + binsize for x in starts]
strands = [strand] * reglen
# if there is a variable length fragment at the end,
# we record the remaining fragment length
if zero_padding and val % stepsize > 0:
chrs += [chrom]
starts += [start+(stepsize*reglen)]
ends += [end]
strands += [strand]
self.chrs += chrs
self.starts += starts
self.strand += strands
self.ends += ends
return self
def add_gindexer(self, othergindexer):
"""Adds intervals from another GenomicIndexer object.
Parameters
----------
othergindexer : GenomicIndexer
GenomicIndexer object.
"""
# add_interval ensures that the intervals from the other indexer
# are adapted according to the binsize and stepsize used in self
for region in othergindexer:
self.add_interval(region.chrom, region.start, region.end, region.strand)
return self
def __init__(self, binsize, stepsize, flank=0, zero_padding=True,
collapse=False, random_state=None):
self.binsize = binsize
self.stepsize = stepsize
self.flank = flank
self.chrs = []
self.starts = []
self.strand = []
self.ends = []
self.zero_padding = zero_padding
self.collapse = collapse
self.random_state = random_state
def __len__(self):
return len(self.chrs)
def __repr__(self): # pragma: no cover
return "GenomicIndexer(<regions>, " \
+ "binsize={}, stepsize={}, flank={})".format(self.binsize,
self.stepsize,
self.flank)
def __getitem__(self, index_):
if isinstance(index_, int):
if self.randomidx is not None:
index = self.randomidx[index_]
else:
index = index_
start = self.starts[index]
end = self.ends[index]
if end == start:
end += 1
return Interval(self.chrs[index], max(0, start - self.flank),
end + self.flank,
strand=self.strand[index])
raise IndexError('Cannot interpret index: {}'.format(type(index_)))
@property
def binsize(self):
"""binsize of the intervals"""
return self._binsize
@binsize.setter
def binsize(self, value):
if value is not None and value <= 0:
raise ValueError('binsize>0 required')
self._binsize = value
@property
def stepsize(self):
"""stepsize (step size)"""
return self._stepsize
@stepsize.setter
def stepsize(self, value):
if value is not None and value <= 0:
raise ValueError('stepsize>0 required')
self._stepsize = value
@property
def flank(self):
"""Flanking bins"""
return self._flank
@flank.setter
def flank(self, value):
if not isinstance(value, int) or value < 0:
raise ValueError('flank>=0 required')
self._flank = value
def tostr(self):
"""Returns representing the region."""
return ['{}:{}-{}'.format(iv.chrom, iv.start, iv.end) for iv in self]
def idx_by_region(self, include=None, exclude=None, start=None, end=None):
"""idx_by_region filters for chromosome and region ids.
It takes a list of chromosome ids which should be
included or excluded, the start and the end of
a required interval as integers and returns a new GenomicIndexer
associated with the compatible intervals after filtering.
Parameters
----------
include : list(str) or None
List of chromosome names to be included. Default: None means
all chromosomes are included.
exclude : list(str)
List of chromosome names to be excluded. Default: None.
start : int or None
The start of the required interval.
end : int or None
The end of the required interval.
Returns
-------
idxs
Containing a list of filtered regions indexes.
"""
if isinstance(include, str):
include = [include]
if isinstance(exclude, str):
exclude = [exclude]
if not include:
idxs = set(range(len(self)))
else:
idxs = set()
for inc in include:
idxs.update(np.where(np.asarray(self.chrs) == inc)[0])
if exclude:
for exc in exclude:
idxs = idxs.difference(
np.where(np.asarray(self.chrs) == exc)[0])
if start is not None:
regionmatch = ((np.array(self.ends) + self.flank) > start)
indexmatch = np.where(regionmatch)[0]
idxs = idxs.intersection(indexmatch)
if end is not None:
regionmatch = ((np.array(self.starts) - self.flank) < end)
indexmatch = np.where(regionmatch)[0]
idxs = idxs.intersection(indexmatch)
idxs = list(idxs)
idxs.sort()
return idxs
def filter_by_region(self, include=None, exclude=None, start=None, end=None):
"""filter_by_region filters for chromosome and region ids.
It takes a list of chromosome ids which should be
included or excluded, the start and the end of
a required interval as integers and returns a new GenomicIndexer
associated with the compatible intervals after filtering.
Parameters
----------
include : list(str) or str
List of chromosome names to be included. Default: [] means
all chromosomes are included.
exclude : list(str) or str
List of chromosome names to be excluded. Default: [].
start : int
The start of the required interval.
end : int
The end of the required interval.
Returns
-------
GenomicIndexer
Containing the filtered regions.
"""
idxs = self.idx_by_region(include=include, exclude=exclude,
start=start, end=end)
new_gindexer = GenomicIndexer(self.binsize,
self.stepsize,
self.flank,
zero_padding=self.zero_padding,
collapse=self.collapse,
random_state=self.random_state)
new_gindexer.chrs = [self.chrs[i] for i in idxs]
new_gindexer.starts = [self.starts[i] for i in idxs]
new_gindexer.strand = [self.strand[i] for i in idxs]
new_gindexer.ends = [self.ends[i] for i in idxs]
return new_gindexer
def export_to_bed(self, filename):
"""Export the intervals to bed format.
Parameters
----------
filename : str
Bed file name
"""
with open(filename, 'w') as handle:
for i in range(len(self)):
handle.write('{chrom}\t{start}\t{end}\t{name}\t{score}\t{strand}\n'.format(
chrom=self.chrs[i],
start=max(0, self.starts[i] - self.flank),
end=self.ends[i] + self.flank,
name='-',
score='-',
strand=self.strand[i]))
def check_gindexer_compatibility(gindexer, resolution, store_whole_genome):
"""Sanity check for gindexer.
This function tests if the gindexer is compatible with
other properties of the dataset, including the resolution and
the store_whole_genome argument
A ValueError is thrown if the gindexer is not valid.
"""
if resolution is not None and resolution > 1 and store_whole_genome:
# check resolution compatible
if gindexer is not None and (gindexer.binsize % resolution) > 0:
raise ValueError(
'binsize must be an integer-multipe of resolution. '
'Got binsize={} and resolution={}'.format(gindexer.binsize, resolution))
for iv_ in gindexer or []:
if (iv_.start % resolution) > 0:
raise ValueError(
'Please ensure that all interval starts line up '
'with the resolution-sized bins. '
'This is necessary to prevent rounding issues. '
'Interval ({}:{}-{}) not compatible with resolution {}. '.format(
iv_.chrom, iv_.start, iv_.end, resolution) +\
'Consider using '
'"janggu-trim <input_roi> <trun_output> -divisible_by {resolution}"'
.format(resolution=resolution))
if not store_whole_genome and gindexer is None:
raise ValueError('Either specify roi or store_whole_genome=True')