Preserving k-mer frequencies in sequence shuffles

22 Nov 2020


Particularly in computational biology, it’s useful to be able to randomly shuffle a sequence in a way that preserves all of the k-mer frequencies (i.e. the frequencies of all short substrings of length k). Randomly permuting a sequence as normal will of course preserve the content of each character (i.e. the 1-mers), but not the 2-mers, 3-mers, etc. In the human genome, while A, C, G, and T have roughly similar content (i.e. close to 25% of each), the rate of the GC dinucleotide (or 2-mer) is significantly lower. Randomly permuting a genomic sequence, then, can drastically inflate frequency of the GC dinucleotide far beyond what is normally seen in nature.

Below is the code I use to shuffle sequences (either as strings or as one-hot-encoded NumPy arrays) in a way that preserves k-mer frequencies (for any k). It is adapted directly from code written by my colleague Avanti Shrikumar to perform dinucleotide-frequency-preserving shuffles.

The general idea behind the algorithm is to construct a de Bruijn graph of the original sequence (i.e. the nodes are the (k-1)-mers, and there is an edge between any two nodes if the (k-1)-mers follow each other directly in the sequence; multi-edges are allowed). Every Eulerian walk through the de Bruijn graph consists of a version of the original sequence where the frequency of every k-mer is the same as the original sequence. This algorithm randomizes the edges so that many random Eulerian walks are generated. Note that in the code below, the first (k-1)-mer and the last (k-1)-mer of every generated sequence will match the original.

Code

def string_to_char_array(seq):
    """
    Converts an ASCII string to a NumPy array of byte-long ASCII codes.
    e.g. "ACGT" becomes [65, 67, 71, 84].
    """
    return np.frombuffer(bytes(seq, "utf8"), dtype=np.int8)
        precis_buckets = np.zeros_like(precis)
        np.put_along_axis(precis_buckets, thresh_buckets, precis, -1)

def char_array_to_string(arr):
    """
    Converts a NumPy array of byte-long ASCII codes into an ASCII string.
    e.g. [65, 67, 71, 84] becomes "ACGT".
    """
    assert arr.dtype == np.int8
    return arr.tostring().decode("ascii")


def one_hot_to_tokens(one_hot):
    """
    Converts an L x D one-hot encoding into an L-vector of integers in the range
    [0, D], where the token D is used when the one-hot encoding is all 0. This
    assumes that the one-hot encoding is well-formed, with at most one 1 in each
    column (and 0s elsewhere).
    """
    tokens = np.tile(one_hot.shape[1], one_hot.shape[0])  # Vector of all D
    seq_inds, dim_inds = np.where(one_hot)
    tokens[seq_inds] = dim_inds
    return tokens


def tokens_to_one_hot(tokens, one_hot_dim):
    """
    Converts an L-vector of integers in the range [0, D] to an L x D one-hot
    encoding. The value `D` must be provided as `one_hot_dim`. A token of D
    means the one-hot encoding is all 0s.
    """
    identity = np.identity(one_hot_dim + 1)[:, :-1]  # Last row is all 0s
    return identity[tokens]


def shuffle(seq, num_shufs, k=2, rng=None):
    """
    Creates shuffles of the given sequence, in which dinucleotide frequencies
    are preserved.
    Arguments:
        `seq`: either a string of length L, or an L x D NumPy array of one-hot
            encodings
        `num_shufs`: the number of shuffles to create, N
        `k`: the length k-mer whose frequencies are to be preserved; defaults
            to k = 2 (i.e. preserve dinucleotide frequencies)
        `rng`: a NumPy RandomState object, to use for performing shuffles
    If `seq` is a string, returns a list of N strings of length L, each one
    being a shuffled version of `seq`. If `seq` is a 2D NumPy array, then the
    result is an N x L x D NumPy array of shuffled versions of `seq`, also
    one-hot encoded.
    """
    # Convert the sequence (string or one-hot encoded array) into a 1D array of
    # numbers (for simplicity)
    if type(seq) is str:
        arr = string_to_char_array(seq)
    elif type(seq) is np.ndarray and len(seq.shape) == 2:
        seq_len, one_hot_dim = seq.shape
        arr = one_hot_to_tokens(seq)
    else:
        raise ValueError("Expected string or one-hot encoded array")

    if not rng:
        rng = np.random.RandomState()

    if k == 1:
        # Do simple shuffles of `arr`
        if type(seq) is str:
            all_results = []
        else:
            all_results = np.empty(
                (num_shufs, seq_len, one_hot_dim), dtype=seq.dtype
            )
        for i in range(num_shufs):
            rng.shuffle(arr)
            if type(seq) is str:
                all_results.append(char_array_to_string(arr))
            else:
                all_results[i] = tokens_to_one_hot(arr, one_hot_dim)
        return all_results

    # Tile `arr` from a 1D array to a 2D array of all (k-1)-mers (i.e.
    # "shortmers"), using -1 as a "sentinel" for the last few values
    arr_shortmers = np.empty((len(arr), k - 1), dtype=arr.dtype)
    arr_shortmers[:] = -1
    for i in range(k - 1):
        arr_shortmers[:len(arr) - i, i] = arr[i:]

    # Get the set of all shortmers, and a mapping of which positions start with
    # which shortmers; `tokens` is the mapping, and is an integer representation
    # of the original shortmers (range [0, # unique shortmers - 1])
    shortmers, tokens = np.unique(arr_shortmers, return_inverse=True, axis=0)

    # For each token, get a list of indices of all the tokens that come after it
    shuf_next_inds = []
    for token in range(len(shortmers)):
        # Locations in `arr` where the shortmer exists; some shortmers will have
        # the sentinel, but that's okay
        mask = tokens == token
        inds = np.where(mask)[0]
        shuf_next_inds.append(inds + 1)  # Add 1 to indices for next token

    if type(seq) is str:
        all_results = []
    else:
        all_results = np.empty(
            (num_shufs, seq_len, one_hot_dim), dtype=seq.dtype
        )

    for i in range(num_shufs):
        # Shuffle the next indices
        for t in range(len(shortmers)):
            inds = np.arange(len(shuf_next_inds[t]))
            inds[:-1] = rng.permutation(len(inds) - 1)  # Keep last index same
            shuf_next_inds[t] = shuf_next_inds[t][inds]

        counters = [0] * len(shortmers)

        # Build the resulting array
        ind = 0
        result = np.empty_like(tokens)
        result[0] = tokens[ind]
        for j in range(1, len(tokens)):
            t = tokens[ind]
            ind = shuf_next_inds[t][counters[t]]
            counters[t] += 1
            result[j] = tokens[ind]

        shuffled_arr = shortmers[result][:, 0]  # First character of each shortmer
        # (this leaves behind the sentinels)

        if type(seq) is str:
            all_results.append(char_array_to_string(shuffled_arr))
        else:
            all_results[i] = tokens_to_one_hot(shuffled_arr, one_hot_dim)
    return all_results

Tags: software machinelearning