Source code for tesliper.datawork.geometry

"""Conformers geometry-related functions, primarily an RMSD sieve implementation.

This module provides an implementation of RMSD sieve, allowing for easy mathematical
comparision of conformers' geometry and filtering out similar ones, based on
user-provided "threshold of similarity".
"""
import math
from typing import Iterable, Iterator, Sequence, Union

import numpy as np


[docs]def find_atoms( atoms: Union[Sequence[int], np.ndarray], find: Union[int, Iterable[int], np.ndarray], reverse: bool = False, ) -> np.ndarray: """Get indices of wanted atoms. Parameters ---------- atoms : Sequence of int or numpy.ndarray List of atoms represented by their atomic numbers. find : int, Sequence of int, or numpy.ndarray Element or list of elements, represented by their atomic numbers, which indices should be find in *atoms* array. reverse : bool If ``True``, indices of atoms NOT specified in *find* will be returned. Returns ------- numpy.ndarray Indices of found elements. """ if isinstance(find, Iterable): blade = np.isin(atoms, find) else: blade = np.equal(atoms, find) blade = np.logical_xor(blade, reverse) indices = np.where(blade)[0] # *atoms* is assumed to be one-dimensional return indices
[docs]def select_atoms( values: Union[Sequence, np.ndarray], indices: Union[Sequence[int], np.ndarray], ) -> np.ndarray: """Filter given values to contain values only corresponding to atoms on given indices. Recognizes if given values are a list of values for one or for many conformers, but it must be in shape (A, N) or (C, A, N) respectively. """ # TODO: add support for indexing with indices shaped (1, A) and (C, A). if not np.asanyarray(indices).size: output = np.array([]) else: try: output = np.take(values, indices, 1) except np.AxisError: output = np.take(values, indices, 0) return output
[docs]def take_atoms( values: Union[Sequence, np.ndarray], atoms: Union[Sequence[int], np.ndarray], wanted: Union[int, Iterable[int], np.ndarray], ) -> np.ndarray: """Filters given values, returning those corresponding to atoms specified as *wanted*. Roughly equivalent to: >>> numpy.take(values, numpy.nonzero(numpy.equal(atoms, wanted))[0], 1) but returns empty array, if no atom in *atoms* matches *wanted* atom. If *wanted* is list of elements, numpy.isin is used instead of numpy.equal. Parameters ---------- values: Sequence or numpy.ndarray array of values; it should be one-dimensional list of values or n-dimensional array of shape (conformers, values[, coordinates[, other]]) atoms: Sequence of int or numpy.ndarray list of atoms in molecule, given as atomic numbers; order should be the same as corresponding values for each conformer wanted: int or Iterable of int or numpy.ndarray atomic number of wanted atom, or a list of those Returns ------- numpy.ndarray values trimmed to corresponding to desired atoms only; preserves original dimension information """ indices = find_atoms(atoms, wanted) return select_atoms(values, indices)
[docs]def drop_atoms( values: Union[Sequence, np.ndarray], atoms: Union[Iterable[int], np.ndarray], discarded: Union[int, Iterable[int], np.ndarray], ) -> np.ndarray: """Filters given values, returning those corresponding to atoms not specified as discarded. Roughly equivalent to: >>> numpy.take(values, numpy.nonzero(~numpy.equal(atoms, discarded))[0], 1) If *wanted* is list of elements, numpy.isin is used instead of numpy.equal. Parameters ---------- values: Sequence or numpy.ndarray array of values; it should be one-dimensional list of values or n-dimensional array of shape (conformers, values[, coordinates[, other]]) atoms: Iterable of int or numpy.ndarray list of atoms in molecule, given as atomic numbers; order should be the same as corresponding values for each conformer discarded: int or Iterable of int or numpy.ndarray atomic number of discarded atom, or a list of those Returns ------- numpy.ndarray values trimmed to corresponding to desired atoms only; preserves original dimension information """ indices = find_atoms(atoms, discarded, reverse=True) return select_atoms(values, indices)
[docs]def is_triangular(n: int) -> bool: """Checks if number *n* is triangular. Notes ----- If *n* is the mth triangular number, then n = m*(m+1)/2. Solving for m using the quadratic formula: m = (sqrt(8n+1) - 1) / 2, so *n* is triangular if and only if 8n+1 is a perfect square. Parameters ---------- n: int number to check Returns ------- bool True if number *n* is triangular, else False """ if n < 0: return False check = math.sqrt(8 * n + 1) try: return check == int(check) except OverflowError: # check is float('inf') return False
[docs]def get_triangular_base(n: int) -> int: """Find which mth triangular number *n* is.""" if not is_triangular(n): raise ValueError(f'"n" should be a triangular number. {n} is not triangular.') return int((math.sqrt(8 * n + 1) - 1) / 2)
[docs]def get_triangular(m: int) -> int: """Find *m*\\th triangular number.""" if m < 0: raise ValueError('"m" should be non-negative number.') if not m // 1 == m: raise ValueError(f'"m" should be a whole number, {m} given.') return m * (m + 1) // 2
MoleculeOrList = Union[Sequence[Sequence[float]], Sequence[Sequence[Sequence[float]]]]
[docs]def center(a: MoleculeOrList) -> MoleculeOrList: """Zero-center all given conformers by subtracting their centroids. Accepts single molecule or list of conformers.""" a = np.asanyarray(a) return a - np.expand_dims(a.mean(axis=-2), -2)
[docs]def kabsch_rotate(a: MoleculeOrList, b: MoleculeOrList) -> np.ndarray: """Minimize RMSD of conformers *a* and *b* by rotating molecule *a* onto *b*. Expects given representation of conformers to be zero-centered. Both *a* and *b* may be a single molecule or a set of conformers. Parameters ---------- a : [Sequence of ]Sequence of Sequence of float Set of points representing atoms, that will be rotated to best match reference. b : [Sequence of ]Sequence of Sequence of float Set of points representing atoms of the reference molecule. Returns ------- numpy.ndarray Rotated set of points *a*. Notes ----- Uses Kabsch algorithm, also known as Wahba's problem. See: https://en.wikipedia.org/wiki/Kabsch_algorithm and https://en.wikipedia.org/wiki/Wahba%27s_problem """ # this implementation uses Einstein summation convention, numpy.eisnum() # this approach is probably not the most efficient # but lets us easily perform a matrix multiplication on stacks of matrices a, b = np.asanyarray(a), np.asanyarray(b) # calculate covariance matrix for each stacked set of points # for each of stacked sets of points, equivalent of: # >>> cov = a.T @ b cov = np.einsum("...ji,...jk", a, b) u, s, vh = np.linalg.svd(cov) # singular value decomposition # if determinant is negative, swap to ensure right-handed coordinate system det = np.linalg.det(vh @ u) # works with stacked matrices # don't introduce new dimension if not necessary shape = (det.size, 3, 3) if det.size > 1 else (3, 3) swap = np.zeros(shape) swap[..., np.arange(2), np.arange(2)] = 1 swap[..., -1, -1] = np.sign(det) # calculate optimally rotated set/s of points `a` # for each of stacked sets of points, equivalent of # >>> rotated = a @ u @ swap @ vh # where u @ swap @ vh is rotation matrix return np.einsum("...ij,...jk,...kl,...lm", a, u, swap, vh)
[docs]def calc_rmsd(a: MoleculeOrList, b: MoleculeOrList) -> np.ndarray: """Compute RMSD (round-mean-square deviation) of two conformers (or sets of them). Parameters ---------- a : [Sequence of ]Sequence of Sequence of float Set of points representing atoms or list thereof. b : [Sequence of ]Sequence of Sequence of float Set of points representing atoms or list thereof. Returns ------- float or numpy.ndarray Value of RMSD of two conformers or list of values, if list of conformers given. Notes ----- https://en.wikipedia.org/wiki/Root-mean-square_deviation_of_atomic_positions """ deviation = np.asanyarray(a) - np.asanyarray(b) # get a sum of two last dimensions by using `axis=(-2, -1)` return np.sqrt(np.square(deviation).sum(axis=(-2, -1)) / deviation.shape[-2])
[docs]def fixed_windows(series: Sequence, size: int) -> np.ndarray: """Simple, vectorized implementation of basic sliding window. Produces a list of windows of given *size* from given *series*. Parameters ---------- series : sequence Series of data, of which sliding window view is requested. size : int Number of data points in the window. Must be a positive integer. Returns ------- numpy.ndarray List of indices, corresponding to values in the original array, that form a window Raises ------ ValueError if non-positive integer given as window size TypeError if non-integer value given as window size Notes ----- Implementation inspired by https://towardsdatascience.com/fast-and-robust-sliding-window-vectorization-with-numpy-3ad950ed62f5 """ if not isinstance(size, int): raise TypeError(f"*size* must be a positive integer, but {type(size)} given.") elif size <= 0: raise ValueError(f"*size* must be a positive integer, but {size} given.") series = np.asanyarray(series) # create indices for fancy indexing [[0, 1, ..., size], [1, 2, ..., size + 1], ...] windows = ( np.arange(size)[np.newaxis, ...] + np.arange(series.size - size + 1)[np.newaxis, ...].T ) return windows
[docs]def stretching_windows( values: Sequence[float], size: Union[int, float], keep_hermits: bool = False, hard_bound: bool = False, ) -> Iterator[np.ndarray]: """Implements a sliding window of a variable size, where values in each window are at most *size* bigger than the lowest value in given window. Values yielded are :class:`np.ndarray`\\s of indices of sorted values, that constitute each window. When window reaches a border, that is an end of the *values* array or a gap between values that is larger than given *size*, it is "squeezed", when pressed against the border, producing subsequences of the first view that touches a border. This is usefull, when one wants to form a window for each value in the original array. >>> list(stretching_windows([1, 2, 3, 4, 7, 8], 3)) [[0, 1, 2], [1, 2, 3], [2, 3], [4, 5]] This "soft" right bound may be "hardened" by passing ``hard_bound=True`` as a parameter to a function call. A window will than move immediately to the border's other side. >>> list(stretching_windows([1, 2, 3, 4, 7, 8], 3), hard_bound=True) [[0, 1, 2], [1, 2, 3], [4, 5]] Windows of size 1, called hermits, are by default ignored. >>> arr = [1, 2, 10, 20, 22] >>> list(stretching_windows(arr, 5)) [[0, 1], [3, 4]] If such behavior is not desired, it may be turned off with ``keep_hermits = True``. One must remember that, when a bound is "soft", the last window is always a hermit. >>> list(stretching_windows(arr, 5, keep_hermits=True)) [[0, 1], [1], [2], [3, 4], [4]] >>> list(stretching_windows(arr, 5, keep_hermits=True, hard_bound=True)) [[0, 1], [2], [3, 4]] Parameters ---------- values : Sequence of float List of values, on which sliding window view is requested. size : int or float Maximum difference of smallest and largest values inside each window. keep_hermits : bool If windows of size one should be yielded (True) or omitted (False). False by default. hard_bound : bool How window should behave close to borders. With hard bound (True) it will move to the other side of border as soon, as it is reached. With soft bound (False) it will "squeeze" when pressed against the border, producing subsequences of the first view that includes border value. False by default. Yields ------ np.array of int List of indices, corresponding to sorted values in the original array, that form a window. Raises ------ ValueError If given *size* is not a positive number. """ if size <= 0: raise ValueError("Size of the energy window must be a positive number.") order = np.argsort(values) if not order.size: return ordered = np.asanyarray(values)[order] # side="right" is ie. "or equal to" part of "include lower on equal to value+size" indices = np.searchsorted(ordered, ordered + size, side="right") bounds = np.stack([np.arange(indices.size), indices], axis=1) # shape (N, 2) if hard_bound: # don't allow repeated stop index unique = np.insert(np.diff(indices).astype(bool), 0, True) bounds = bounds[unique] if not keep_hermits: # if difference between start and stop is 1, it is a hermit hermits = np.diff(bounds, axis=1) == 1 bounds = bounds[~hermits.reshape(-1)] for start, stop in bounds: # if necessary make it order[start:] to include last value yield order[start : (stop if stop <= indices.size else None)]
[docs]def pyramid_windows(series: Sequence) -> Iterator[np.ndarray]: """Produces windows of shrinking sizes, from full sequence to last element only. This function yields :class:`numpy.ndarray`\\s with indices that may be used to index an original sequence (assuming original sequence is :class:`numpy.ndarray` as well). The first window yielded represents a whole *series* sequence and each consecutive window is reduced by the first element, leaving only the last element in the final window. This allows for easy setup of efficient calculations in symmetric each-to-each relationship. >>> series = [3, 6, 3, 5, 7] >>> for window in pyramid_windows(series): ... print(window) [0 1 2 3 4] [1 2 3 4] [2 3 4] [3 4] [4] Parameters ---------- series : sequence Sequence of elements, for which windows should be generated. Yields ------ np.ndarray(dtype=int) Windows as np.ndarray of indices. """ length = len(series) for idx in range(length): yield np.arange(start=idx, stop=length, step=1, dtype=int)
[docs]def rmsd_sieve( geometry: Sequence[Sequence[Sequence[float]]], windows: Iterable[Sequence[int]], threshold: float = 1, ) -> np.ndarray: """Compare conformers' geometry to keep only those that differ at least by a given threshold. This function calculates how similar conformers are one to another, using a RMSD measure, that is is a root-mean-square deviation of atomic positions, and signalizes which of the conformers are duplicates, according to a given similarity threshold. Returned array of booleans may be treated as "originality" indicators for each conformer: ``True`` means given conformer has distinct structure, ``False`` means given conformer is similar to some other conformer marked as "original". The measure of conformers' similarity, the *threshold* parameter, is a minimum value of RMSD needed to consider two conformers different. In other words, if two conformers give a RMSD value that is lower then *threshold*, one of them will be marked as similar, producing a ``False`` in the output array. To lower a computational expense, similarity measurement is performed in "chunks", using a sliding window technique. Windows consist of a portion of conformers from the original data, or more precisely, indices of conformers that should be included in the particular window. First item from the window is compared to all the others that are in the same window, and if any of them is similar to the reference item, it is marked as duplicate (not "original"). The process is repeated for each window. The windows itself should be provided by user as *windows* parameter. This provides a flexibility in the process: you may choose to sacrifice accuracy to lower necessary computational time or vice versa. You may also choose a different moving window strategy or reject it alltogether, and calculate one-to-each similarity in the whole set. Iterables of windows accepted by this function may be generated with one of the dedicated moving window funcions: :func:`.stretching_windows`, :func:`.fixed_windows`, or :func:`.pyramid_windows`. Refer to their documentation for more information. Parameters ---------- geometry : sequence of sequence of sequence of float A list of conformers, where each conformer is represented by a sequence of coordinates in 3-dimensional space. It is assumed that order of atoms in each conformers' representation is identical. windows : iterable of sequence of int An iterable of windows, where each window is a list of indices. Comparision of RMSD values will be performed inside each window. threshold : float Minimum RMSD value to consider two compared conformers different. Returns ------- np.ndarray(dtype=bool) Array of booleans for each conformer: ``True`` if conformer's structure is "original" and should be kept, ``False`` if it is a duplicate of other, "original" structure (at least according to *threshold* given), and should be discarded. """ blade = np.ones(len(geometry), dtype=bool) # zero-center all conformers geometry = center(geometry) for window in windows: # don't include values already discarded reduced_window = window[blade[window]] if reduced_window.size <= 1: # only one molecule in the window, we keep it # or no conformers at all continue head, *tail = geometry[reduced_window] # reference, other # find best rotation of mols in window onto first mol tail = kabsch_rotate(tail, head) # calculate RMSD list of mols to first mol rmsd = calc_rmsd(head, tail) # if RMSD <= threshold mark in blade as False, first one is always kept blade[reduced_window[1:]] = rmsd > threshold # return blade with True for each molecule kept return blade