Source code for cooler.api

import os

import h5py
import numpy as np
import pandas as pd
import simplejson as json
from pandas.api.types import is_integer_dtype

from .core import (
    CSRReader,
    DirectRangeQuery2D,
    FillLowerRangeQuery2D,
    RangeSelector1D,
    RangeSelector2D,
    get,
    region_to_extent,
    region_to_offset,
)
from .fileops import list_coolers
from .util import closing_hdf5, open_hdf5, parse_cooler_uri, parse_region

__all__ = ["Cooler", "annotate"]


# The 4DN data portal and hic2cool store these weight vectors in divisive form
_4DN_DIVISIVE_WEIGHTS = {"KR", "VC", "VC_SQRT"}


[docs]class Cooler: """ A convenient interface to a cooler data collection. Parameters ---------- store : str, :py:class:`h5py.File` or :py:class:`h5py.Group` Path to a cooler file, URI string, or open handle to the root HDF5 group of a cooler data collection. root : str, optional [deprecated] HDF5 Group path to root of cooler group if ``store`` is a file. This option is deprecated. Instead, use a URI string of the form :file:`<file_path>::<group_path>`. kwargs : optional Options to be passed to :py:class:`h5py.File()` upon every access. By default, the file is opened with the default driver and mode='r'. Notes ----- If ``store`` is a file path, the file will be opened temporarily in when performing operations. This allows :py:class:`Cooler` objects to be serialized for multiprocess and distributed computations. Metadata is accessible as a dictionary through the :py:attr:`info` property. Table selectors, created using :py:meth:`chroms`, :py:meth:`bins`, and :py:meth:`pixels`, perform range queries over table rows, returning :py:class:`pd.DataFrame` and :py:class:`pd.Series`. A matrix selector, created using :py:meth:`matrix`, performs 2D matrix range queries, returning :py:class:`numpy.ndarray` or :py:class:`scipy.sparse.coo_matrix`. """ def __init__(self, store, root=None, **kwargs): if isinstance(store, str): if root is None: self.filename, self.root = parse_cooler_uri(store) elif h5py.is_hdf5(store): with open_hdf5(store, **kwargs) as h5: self.filename = h5.file.filename self.root = root else: raise ValueError("Not a valid path to a Cooler file") self.uri = self.filename + "::" + self.root self.store = self.filename self.open_kws = kwargs else: # Assume an open HDF5 handle, ignore open_kws self.filename = store.file.filename self.root = store.name self.uri = self.filename + "::" + self.root self.store = store.file self.open_kws = {} self._refresh() def _refresh(self): try: with open_hdf5(self.store, **self.open_kws) as h5: grp = h5[self.root] _ct = chroms(grp) _ct["name"] = _ct["name"].astype(object) self._chromsizes = _ct.set_index("name")["length"] self._chromids = dict(zip(_ct["name"], range(len(_ct)))) self._info = info(grp) mode = self._info.get("storage-mode", "symmetric-upper") self._is_symm_upper = mode == "symmetric-upper" except KeyError: err_msg = f"No cooler found at: {self.store}." listing = list_coolers(self.store) if len(listing): err_msg += ( f" Coolers found in {listing}. " + "Use '::' to specify a group path" ) raise KeyError(err_msg) from None def _load_dset(self, path): with open_hdf5(self.store, **self.open_kws) as h5: grp = h5[self.root] return grp[path][:] def _load_attrs(self, path): with open_hdf5(self.store, **self.open_kws) as h5: grp = h5[self.root] return dict(grp[path].attrs)
[docs] def open(self, mode="r", **kwargs): """ Open the HDF5 group containing the Cooler with :py:mod:`h5py` Functions as a context manager. Any ``open_kws`` passed during construction are ignored. Parameters ---------- mode : str, optional [default: 'r'] * ``'r'`` (readonly) * ``'r+'`` or ``'a'`` (read/write) Notes ----- For other parameters, see :py:class:`h5py.File`. """ grp = h5py.File(self.filename, mode, **kwargs)[self.root] return closing_hdf5(grp)
@property def storage_mode(self): """Indicates whether ordinary sparse matrix encoding is used (``"square"``) or whether a symmetric matrix is encoded by storing only the upper triangular elements (``"symmetric-upper"``). """ return self._info.get("storage-mode", "symmetric-upper") @property def binsize(self): """ Resolution in base pairs if uniform else None """ return self._info["bin-size"] @property def chromsizes(self): """ Ordered mapping of reference sequences to their lengths in bp """ return self._chromsizes @property def chromnames(self): """ List of reference sequence names """ return list(self._chromsizes.index)
[docs] def offset(self, region): """ Bin ID containing the left end of a genomic region Parameters ---------- region : str or tuple Genomic range Returns ------- int Examples -------- >>> c.offset('chr3') # doctest: +SKIP 1311 """ with open_hdf5(self.store, **self.open_kws) as h5: grp = h5[self.root] return region_to_offset( grp, self._chromids, parse_region(region, self._chromsizes), self.binsize )
[docs] def extent(self, region): """ Bin IDs containing the left and right ends of a genomic region Parameters ---------- region : str or tuple Genomic range Returns ------- 2-tuple of ints Examples -------- >>> c.extent('chr3') # doctest: +SKIP (1311, 2131) """ with open_hdf5(self.store, **self.open_kws) as h5: grp = h5[self.root] return region_to_extent( grp, self._chromids, parse_region(region, self._chromsizes), self.binsize )
@property def info(self): """ File information and metadata Returns ------- dict """ with open_hdf5(self.store, **self.open_kws) as h5: grp = h5[self.root] return info(grp) @property def shape(self): return (self._info["nbins"],) * 2
[docs] def chroms(self, **kwargs): """ Chromosome table selector Returns ------- Table selector """ def _slice(fields, lo, hi): with open_hdf5(self.store, **self.open_kws) as h5: grp = h5[self.root] return chroms(grp, lo, hi, fields, **kwargs) return RangeSelector1D(None, _slice, None, self._info["nchroms"])
[docs] def bins(self, **kwargs): """ Bin table selector Returns ------- Table selector """ def _slice(fields, lo, hi): with open_hdf5(self.store, **self.open_kws) as h5: grp = h5[self.root] return bins(grp, lo, hi, fields, **kwargs) def _fetch(region): with open_hdf5(self.store, **self.open_kws) as h5: grp = h5[self.root] return region_to_extent( grp, self._chromids, parse_region(region, self._chromsizes), self.binsize, ) return RangeSelector1D(None, _slice, _fetch, self._info["nbins"])
[docs] def pixels(self, join=False, **kwargs): """ Pixel table selector Parameters ---------- join : bool, optional Whether to expand bin ID columns into chrom, start, and end columns. Default is ``False``. Returns ------- Table selector """ def _slice(fields, lo, hi): with open_hdf5(self.store, **self.open_kws) as h5: grp = h5[self.root] return pixels(grp, lo, hi, fields, join, **kwargs) def _fetch(region): with open_hdf5(self.store, **self.open_kws) as h5: grp = h5[self.root] i0, i1 = region_to_extent( grp, self._chromids, parse_region(region, self._chromsizes), self.binsize, ) lo = grp["indexes"]["bin1_offset"][i0] hi = grp["indexes"]["bin1_offset"][i1] return lo, hi return RangeSelector1D(None, _slice, _fetch, self._info["nnz"])
[docs] def matrix( self, field=None, balance=True, sparse=False, as_pixels=False, join=False, ignore_index=True, divisive_weights=None, chunksize=10000000, ): """ Contact matrix selector Parameters ---------- field : str, optional Which column of the pixel table to fill the matrix with. By default, the 'count' column is used. balance : bool, optional Whether to apply pre-calculated matrix balancing weights to the selection. Default is True and uses a column named 'weight'. Alternatively, pass the name of the bin table column containing the desired balancing weights. Set to False to return untransformed counts. sparse: bool, optional Return a scipy.sparse.coo_matrix instead of a dense 2D numpy array. as_pixels: bool, optional Return a DataFrame of the corresponding rows from the pixel table instead of a rectangular sparse matrix. False by default. join : bool, optional If requesting pixels, specifies whether to expand the bin ID columns into (chrom, start, end). Has no effect when requesting a rectangular matrix. Default is True. ignore_index : bool, optional If requesting pixels, don't populate the index column with the pixel IDs to improve performance. Default is True. divisive_weights : bool, optional Force balancing weights to be interpreted as divisive (True) or multiplicative (False). Weights are always assumed to be multiplicative by default unless named KR, VC or SQRT_VC, in which case they are assumed to be divisive by default. Returns ------- Matrix selector Notes ----- If ``as_pixels=True``, only data explicitly stored in the pixel table will be returned: if the cooler's storage mode is symmetric-upper, lower triangular elements will not be generated. If ``as_pixels=False``, those missing non-zero elements will automatically be filled in. """ if balance in _4DN_DIVISIVE_WEIGHTS and divisive_weights is None: divisive_weights = True def _slice(field, i0, i1, j0, j1): with open_hdf5(self.store, **self.open_kws) as h5: grp = h5[self.root] return matrix( grp, i0, i1, j0, j1, field, balance, sparse, as_pixels, join, ignore_index, divisive_weights, chunksize, self._is_symm_upper, ) def _fetch(region, region2=None): with open_hdf5(self.store, **self.open_kws) as h5: grp = h5[self.root] if region2 is None: region2 = region region1 = parse_region(region, self._chromsizes) region2 = parse_region(region2, self._chromsizes) i0, i1 = region_to_extent( grp, self._chromids, region1, self.binsize ) j0, j1 = region_to_extent( grp, self._chromids, region2, self.binsize ) return i0, i1, j0, j1 return RangeSelector2D(field, _slice, _fetch, (self._info["nbins"],) * 2)
def __repr__(self): if isinstance(self.store, str): filename = os.path.basename(self.store) container = f"{filename}::{self.root}" else: container = repr(self.store) return f'<Cooler "{container}">'
def info(h5): """ File and user metadata dict. Parameters ---------- h5 : :py:class:`h5py.File` or :py:class:`h5py.Group` Open handle to cooler file. Returns ------- dict """ d = {} for k, v in h5.attrs.items(): if isinstance(v, str): try: v = json.loads(v) except ValueError: pass d[k] = v return d def chroms(h5, lo=0, hi=None, fields=None, **kwargs): """ Table describing the chromosomes/scaffolds/contigs used. They appear in the same order they occur in the heatmap. Parameters ---------- h5 : :py:class:`h5py.File` or :py:class:`h5py.Group` Open handle to cooler file. lo, hi : int, optional Range of rows to select from the table. fields : sequence of str, optional Subset of columns to select from table. Returns ------- :py:class:`DataFrame` """ if fields is None: fields = ( pd.Index(["name", "length"]) .append(pd.Index(h5["chroms"].keys())) .drop_duplicates() ) return get(h5["chroms"], lo, hi, fields, **kwargs) def bins(h5, lo=0, hi=None, fields=None, **kwargs): """ Table describing the genomic bins that make up the axes of the heatmap. Parameters ---------- h5 : :py:class:`h5py.File` or :py:class:`h5py.Group` Open handle to cooler file. lo, hi : int, optional Range of rows to select from the table. fields : sequence of str, optional Subset of columns to select from table. Returns ------- :py:class:`DataFrame` """ if fields is None: fields = ( pd.Index(["chrom", "start", "end"]) .append(pd.Index(h5["bins"].keys())) .drop_duplicates() ) # If convert_enum is not explicitly set to False, chrom IDs will get # converted to categorical chromosome names, provided the ENUM header # exists in bins/chrom. Otherwise, they will return as integers. out = get(h5["bins"], lo, hi, fields, **kwargs) # Handle the case where the ENUM header doesn't exist but we want to # convert integer chrom IDs to categorical chromosome names. if "chrom" in fields: convert_enum = kwargs.get("convert_enum", True) if isinstance(fields, str): chrom_col = out else: chrom_col = out["chrom"] if is_integer_dtype(chrom_col.dtype) and convert_enum: chromnames = chroms(h5, fields="name") chrom_col = pd.Categorical.from_codes(chrom_col, chromnames, ordered=True) if isinstance(fields, str): out = pd.Series(chrom_col, out.index) else: out["chrom"] = chrom_col return out def pixels(h5, lo=0, hi=None, fields=None, join=True, **kwargs): """ Table describing the nonzero upper triangular pixels of the Hi-C contact heatmap. Parameters ---------- h5 : :py:class:`h5py.File` or :py:class:`h5py.Group` Open handle to cooler file. lo, hi : int, optional Range of rows to select from the table. fields : sequence of str, optional Subset of columns to select from table. join : bool, optional Whether or not to expand bin ID columns to their full bin description (chrom, start, end). Default is True. Returns ------- :py:class:`DataFrame` """ if fields is None: fields = ( pd.Index(["bin1_id", "bin2_id"]) .append(pd.Index(h5["pixels"].keys())) .drop_duplicates() ) df = get(h5["pixels"], lo, hi, fields, **kwargs) if join: bins = get(h5["bins"], 0, None, ["chrom", "start", "end"], **kwargs) df = annotate(df, bins, replace=True) return df
[docs]def annotate(pixels, bins, replace=False): """ Add bin annotations to a data frame of pixels. This is done by performing a relational "join" against the bin IDs of a table that describes properties of the genomic bins. New columns will be appended on the left of the output data frame. .. versionchanged:: 0.8.0 The default value of ``replace`` changed to False. Parameters ---------- pixels : :py:class:`DataFrame` A data frame containing columns named ``bin1_id`` and/or ``bin2_id``. If columns ``bin1_id`` and ``bin2_id`` are both present in ``pixels``, the adjoined columns will be suffixed with '1' and '2' accordingly. bins : :py:class:`DataFrame` or DataFrame selector Data structure that contains a full description of the genomic bins of the contact matrix, where the index corresponds to bin IDs. replace : bool, optional Remove the original ``bin1_id`` and ``bin2_id`` columns from the output. Default is False. Returns ------- :py:class:`DataFrame` """ columns = pixels.columns ncols = len(columns) is_selector = isinstance(bins, RangeSelector1D) if "bin1_id" in columns: if len(bins) > len(pixels): bin1 = pixels["bin1_id"] lo = bin1.min() hi = bin1.max() lo = 0 if np.isnan(lo) else lo hi = 0 if np.isnan(hi) else hi if is_selector: right = bins[lo:hi + bin1.dtype.type(1)] # slicing works like iloc else: right = bins.loc[lo:hi] elif is_selector: right = bins[:] else: right = bins pixels = pixels.merge(right, how="left", left_on="bin1_id", right_index=True) if "bin2_id" in columns: if len(bins) > len(pixels): bin2 = pixels["bin2_id"] lo = bin2.min() hi = bin2.max() lo = 0 if np.isnan(lo) else lo hi = 0 if np.isnan(hi) else hi if is_selector: right = bins[lo:hi + bin2.dtype.type(1)] # slicing works like iloc else: right = bins.loc[lo:hi] elif is_selector: right = bins[:] else: right = bins pixels = pixels.merge( right, how="left", left_on="bin2_id", right_index=True, suffixes=("1", "2") ) # rearrange columns pixels = pixels[list(pixels.columns[ncols:]) + list(pixels.columns[:ncols])] # drop bin IDs if replace: cols_to_drop = [col for col in ("bin1_id", "bin2_id") if col in columns] pixels = pixels.drop(cols_to_drop, axis=1) return pixels
def matrix( h5, i0, i1, j0, j1, field=None, balance=True, sparse=False, as_pixels=False, join=True, ignore_index=True, divisive_weights=False, chunksize=10000000, fill_lower=True, ): """ Two-dimensional range query on the Hi-C contact heatmap. Depending on the options, returns either a 2D NumPy array, a rectangular sparse ``coo_matrix``, or a data frame of pixels. Parameters ---------- h5 : :py:class:`h5py.File` or :py:class:`h5py.Group` Open handle to cooler file. i0, i1 : int, optional Bin range along the 0th (row) axis of the heatap. j0, j1 : int, optional Bin range along the 1st (col) axis of the heatap. field : str, optional Which column of the pixel table to fill the matrix with. By default, the 'count' column is used. balance : bool, optional Whether to apply pre-calculated matrix balancing weights to the selection. Default is True and uses a column named 'weight'. Alternatively, pass the name of the bin table column containing the desired balancing weights. Set to False to return untransformed counts. sparse: bool, optional Return a scipy.sparse.coo_matrix instead of a dense 2D numpy array. as_pixels: bool, optional Return a DataFrame of the corresponding rows from the pixel table instead of a rectangular sparse matrix. False by default. join : bool, optional If requesting pixels, specifies whether to expand the bin ID columns into (chrom, start, end). Has no effect when requesting a rectangular matrix. Default is True. ignore_index : bool, optional If requesting pixels, don't populate the index column with the pixel IDs to improve performance. Default is True. Returns ------- ndarray, coo_matrix or DataFrame Notes ----- If ``as_pixels=True``, only data explicitly stored in the pixel table will be returned: if the cooler's storage mode is symmetric-upper, lower triangular elements will not be generated. If ``as_pixels=False``, those missing non-zero elements will automatically be filled in. """ if field is None: field = "count" if isinstance(balance, str): name = balance elif balance: name = "weight" if balance and name not in h5["bins"]: raise ValueError( f"No column 'bins/{name}'" + "found. Use ``cooler.balance_cooler`` to " + "calculate balancing weights or set balance=False." ) reader = CSRReader(h5['pixels'], h5['indexes/bin1_offset'][:]) if as_pixels: # The historical behavior for as_pixels is to return only explicitly stored # pixels so we ignore the ``fill_lower`` parameter in this case. engine = DirectRangeQuery2D( reader, field, (i0, i1, j0, j1), chunksize, return_index=not ignore_index ) df = engine.to_frame() if balance: weights = Cooler(h5).bins()[[name]] df2 = annotate(df, weights, replace=False) if divisive_weights: df2[name + "1"] = 1 / df2[name + "1"] df2[name + "2"] = 1 / df2[name + "2"] df["balanced"] = df2[name + "1"] * df2[name + "2"] * df2[field] if join: bins = Cooler(h5).bins()[["chrom", "start", "end"]] df = annotate(df, bins, replace=True) return df elif sparse: if fill_lower: engine = FillLowerRangeQuery2D(reader, field, (i0, i1, j0, j1), chunksize) else: engine = DirectRangeQuery2D(reader, field, (i0, i1, j0, j1), chunksize) mat = engine.to_sparse_matrix() if balance: weights = h5["bins"][name] bias1 = weights[i0:i1] bias2 = bias1 if (i0, i1) == (j0, j1) else weights[j0:j1] if divisive_weights: bias1 = 1 / bias1 bias2 = 1 / bias2 mat.data = bias1[mat.row] * bias2[mat.col] * mat.data return mat else: if fill_lower: engine = FillLowerRangeQuery2D(reader, field, (i0, i1, j0, j1), chunksize) else: engine = DirectRangeQuery2D(reader, field, (i0, i1, j0, j1), chunksize) arr = engine.to_array() if balance: weights = h5["bins"][name] bias1 = weights[i0:i1] bias2 = bias1 if (i0, i1) == (j0, j1) else weights[j0:j1] if divisive_weights: bias1 = 1 / bias1 bias2 = 1 / bias2 arr = arr * np.outer(bias1, bias2) return arr