"""Array dataset"""
import copy
import numpy as np
from janggu.data.data import Dataset
class Wrapper(Dataset):
def __init__(self, data, *args):
super(Wrapper, self).__init__(data.name)
self.data = copy.copy(data)
self.args = args
def __repr__(self):
return "{}({})".format(self.__class__.__name__, self.data)
def __len__(self):
return len(self.data)
@property
def conditions(self):
"""conditions"""
return self.data.conditions if hasattr(self.data, "conditions") else None
@property
def shape(self):
"""shape"""
return self.data.shape
@property
def ndim(self):
return len(self.shape)
def __copy__(self):
ar = tuple([copy.copy(e) for e in (self.data,)+self.args])
obj = self.__class__(*ar)
return obj
@property
def gindexer(self): # pragma: no cover
"""gindexer"""
if hasattr(self.data, 'gindexer'):
return self.data.gindexer
raise ValueError('No gindexer available.')
@gindexer.setter
def gindexer(self, gindexer): # pragma: no cover
if hasattr(self.data, 'gindexer'):
self.data.gindexer = gindexer
return
raise ValueError('No gindexer available.')
@property
def garray(self): # pragma: no cover
"""gindexer"""
if hasattr(self.data, 'garray'):
return self.data.garray
raise ValueError('No garray available.')
[docs]class Array(Dataset):
"""Array class.
This datastructure wraps arbitrary numpy.arrays for a
deep learning application with Janggu.
The main difference to an ordinary numpy.array is that
Array has a name attribute.
Parameters
-----------
name : str
Name of the dataset
array : :class:`numpy.array`
Numpy array.
conditions : list(str) or None
Conditions or label names of the dataset.
"""
def __init__(self, name, array, conditions=None):
if conditions is not None:
assert array.shape[-1] == len(conditions), \
"array shape and condition number does not agree: {} != {}" \
.format(array.shape[-1], len(conditions))
self.data = copy.copy(array)
self.conditions = conditions
Dataset.__init__(self, '{}'.format(name))
def __repr__(self): # pragma: no cover
return 'Array("{}", <np.array>)'.format(self.name)
def __len__(self):
return len(self.data)
def __getitem__(self, idxs):
data = self.data[idxs]
return data
@property
def shape(self):
"""Shape of the dataset"""
if len(self.data.shape) == 1:
return self.data.shape + (1,)
return self.data.shape
@property
def ndim(self): # pragma: no cover
"ndim"
return len(self.shape)
def __copy__(self):
obj = type(self)(self.name, self.data, self.conditions)
obj.__dict__.update(self.__dict__)
return obj
[docs]class ReduceDim(Wrapper):
"""ReduceDim class.
This class wraps an 4D coverage object and reduces
the middle two dimensions by applying the aggregate function.
Therefore, it transforms the 4D object into a table-like 2D representation
Example
-------
.. code-block:: python
# given some dataset, e.g. a Cover object
# originally, the cover object is a 4D-object.
cover.shape
cover = ReduceDim(cover, aggregator='mean')
cover.shape
# Afterwards, the cover object is 2D, where the second and
# third dimension have been averaged out.
Parameters
-----------
array : Dataset
Dataset
aggregator : str or callable
Aggregator used for reducing the intermediate dimensions.
Available aggregators are 'sum', 'mean', 'max' for performing
summation, averaging or obtaining the maximum value.
It is also possible to supply a callable directly that performs the
operation.
Default: 'sum'
axis : None or tuple(ints)
Dimensions over which to perform aggregation. Default: None
aggregates with :code:`axis=(1, 2)`
"""
def __init__(self, array, aggregator=None, axis=None):
super(ReduceDim, self).__init__(array, *(aggregator, axis))
if aggregator is None:
aggregator = 'sum'
def _get_aggregator(name):
if callable(name):
return name
if name == 'sum':
return np.sum
elif name == 'mean':
return np.mean
elif name == 'max':
return np.max
raise ValueError('ReduceDim aggregator="{}" not known.'.format(name) +
'Must be "sum", "mean" or "max" or a callable.')
self.aggregator = _get_aggregator(aggregator)
self.axis = axis if axis is not None else (1, 2)
#Dataset.__init__(self, array.name)
def __getitem__(self, idxs):
if isinstance(idxs, int):
idxs = slice(idxs, idxs + 1)
data = self.aggregator(self.data[idxs], axis=self.axis)
return data
@property
def shape(self):
"""Shape of the dataset"""
shape = (self.data.shape[0],)
for idx in range(1, self.data.ndim):
if idx in self.axis:
continue
shape += (self.data.shape[idx],)
return shape
[docs]class SqueezeDim(Wrapper):
"""SqueezeDim class.
This class wraps an 4D coverage object and reduces
the middle two dimensions by applying the aggregate function.
Therefore, it transforms the 4D object into a table-like 2D representation
Parameters
-----------
array : Dataset
Dataset
axis : None or tuple(ints)
Dimensions over which to perform aggregation. Default: None
aggregates with :code:`axis=(1, 2)`
"""
def __init__(self, array, axis=None):
super(SqueezeDim, self).__init__(array, *(axis,))
self.axis = axis
def __getitem__(self, idxs):
if isinstance(idxs, int):
idxs = slice(idxs, idxs + 1)
data = np.squeeze(self.data[idxs], axis=self.axis)
return data
@property
def shape(self):
"""Shape of the dataset"""
shape = tuple()
for idx in range(self.data.ndim):
if self.data.shape[idx] == 1:
if self.axis is None or idx in self.axis:
continue
shape += (self.data.shape[idx],)
return shape
[docs]class Transpose(Wrapper):
"""Transpose class.
This class can be used to shuffle the dimensions.
For example, if the channel is expected to be at a specific location.
Parameters
-----------
array : Dataset
Dataset
axis : tuple(ints)
Order to the dimensions.
"""
def __init__(self, array, axis):
super(Transpose, self).__init__(array, *(axis,))
self.axis = axis
def __getitem__(self, idxs):
if isinstance(idxs, int):
idxs = slice(idxs, idxs + 1)
data = np.transpose(self.data[idxs], self.axis)
return data
@property
def shape(self):
"""Shape of the dataset"""
shape = tuple()
for idx in self.axis:
shape += (self.data.shape[idx],)
return shape
[docs]class NanToNumConverter(Wrapper):
"""NanToNumConverter class.
This wrapper dataset converts NAN's in the dataset to
zeros.
Example
-------
.. code-block:: python
# given some dataset, e.g. a Cover object
cover
cover = NanToNumConverter(cover)
# now all remaining NaNs will be converted to zeros.
Parameters
-----------
array : Dataset
Dataset
"""
def __init__(self, array):
super(NanToNumConverter, self).__init__(array, *())
def __getitem__(self, idxs):
if isinstance(idxs, int):
idxs = slice(idxs, idxs + 1)
data = np.nan_to_num(self.data[idxs])
return data
# Wrappers for data augmentation
[docs]class RandomSignalScale(Wrapper):
"""RandomSignalScale class.
This wrapper performs
performs random uniform scaling of the original input.
For example, this can be used to randomly change the peak or signal
heights during training.
Parameters
-----------
array : Dataset
Dataset object
deviance : float
The signal is rescaled using (1 + uniform(-deviance, deviance)) x original signal.
"""
def __init__(self, array, deviance):
super(RandomSignalScale, self).__init__(array, *(deviance,))
self.deviance = deviance
def __getitem__(self, idxs):
if isinstance(idxs, int):
idxs = slice(idxs, idxs + 1)
data = self.data[idxs]
scales = np.random.rand(data.shape[0], data.shape[-1])
scales = 1 - (scales - self.deviance) / (2*self.deviance)
data = data * scales[:, None, None, :]
return data
[docs]class RandomOrientation(Wrapper):
"""RandomOrientation class.
This wrapper randomly inverts the directionality of
the signal tracks.
For example a signal track is randomely presented in 5' to 3' and 3' to 5'
orientation. Furthermore, if the dataset is stranded, the strand is switched
as well.
Parameters
-----------
array : Dataset
Dataset object must be 4D.
"""
def __init__(self, array):
super(RandomOrientation, self).__init__(array, *())
def __getitem__(self, idxs):
if isinstance(idxs, int):
data = self.data[slice(idxs, idxs+1)]
else:
data = self.data[idxs]
for i, _ in enumerate(data):
if np.random.randint(2, size=1):
data[i] = data[i, ::-1, ::-1, :]
return data
[docs]class RandomShift(Wrapper):
"""Randomshift class.
This wrapper randomly shifts the input sequence by a random number of
up to 'shift' bases in either direction. Meant for use with BioSeq.
This form of data-augmentation has been shown to reduce overfitting
in a number of settings.
The sequence is zero-padded in order to remain the same length.
When 'batchwise' is set to True it will shift all the sequences retrieved
by a single call to __getitem__ by the same amount (useful for
computationally efficient batching).
Parameters
-----------
array : Dataset
Dataset object must be 4D.
"""
def __init__(self, array, shift, batchwise=False):
super(RandomShift, self).__init__(array, *(shift, batchwise))
self.batchwise = batchwise
self.shift = shift
def __getitem__(self, idxs):
if isinstance(idxs, int):
data = self.data[slice(idxs, idxs+1)]
else:
data = self.data[idxs]
if self.batchwise:
rshift = np.random.randint(-self.shift, self.shift)
if rshift < 0:
data[:, :rshift, :, :] = data[:, -rshift:, :, :]
data[:, rshift:, :, :] = 0.
elif rshift > 0:
data[:, rshift:, :, :] = data[:, :-rshift, :, :]
data[:, :rshift, :, :] = 0.
else:
for i, _ in enumerate(data):
rshift = np.random.randint(-self.shift, self.shift)
if rshift < 0:
data[i, :rshift, :, :] = data[i, -rshift:, :, :]
data[i, rshift:, :, :] = 0.
elif rshift > 0:
data[i, rshift:, :, :] = data[i, :-rshift, :, :]
data[i, :rshift, :, :] = 0.
return data