Source code for riboraptor.coherence

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
import pickle
import numpy as np
import pandas as pd
import six
from scipy import signal
from .helpers import identify_peaks
from .statistics import coherence_pvalue


def _shift_bit_length(x):
    """Shift bit"""
    return 1 << (x - 1).bit_length()


def _padwithzeros(vector, pad_width, iaxis, kwargs):
    """Pad with zeros"""
    vector[:pad_width[0]] = 0
    vector[-pad_width[1]:] = 0
    return vector


[docs]def naive_periodicity(values, identify_peak=False): '''Calculate periodicity in a naive manner Take ratio of frame1 over avg(frame2+frame3) counts. By default the first value is treated as the first frame as well Parameters ---------- values : Series Metagene profile Returns ------- periodicity : float Periodicity ''' if identify_peak: peak_location = identify_peaks(values) min_index = min(values.index) max_index = max(values.index) # Frame 1 starts at the peak_location # We assume that if values is series it will have continuous # indices values = values[np.arange(peak_location, max_index)] values = pd.Series(values) values = values.fillna(0) frame1_total = 0 frame2_total = 0 frame3_total = 0 values = list(values)[0:len(values) - len(values) % 3] for i in range(0, len(values), 3): frame1_total += values[i] for i in range(1, len(values), 3): frame2_total += values[i] for i in range(2, len(values), 3): frame3_total += values[i] return frame1_total / (frame1_total + frame2_total + frame3_total)
[docs]def coherence_ft(values, nperseg=30, noverlap=15, window='flattop'): """Calculate coherence and an idea ribo-seq signal based on Fourier transform Parameters ---------- values : array like List of values Returns ------- periodicity : float Periodicity score calculated as coherence between input and idea 1-0-0 signal window: str or tuple See scipy.signal.get_window f: array like List of frequencies Cxy: array like List of coherence at the above frequencies """ length = len(values) uniform_signal = [1, 0, 0] * (length // 3) mean_centered_values = values - np.nanmean(values) normalized_values = mean_centered_values / \ np.max(np.abs(mean_centered_values)) mean_centered_values = uniform_signal - np.nanmean(uniform_signal) uniform_signal = mean_centered_values / \ np.max(np.abs(uniform_signal)) f, Cxy = signal.coherence( normalized_values, uniform_signal, window=window, nperseg=nperseg, noverlap=noverlap) periodicity_score = Cxy[np.argwhere(np.isclose(f, 1 / 3.0))[0]][0] return periodicity_score, f, Cxy
[docs]def coherence(original_values, frames=[0]): """Calculate coherence and an idea ribo-seq signal Parameters ---------- values : array like List of values Returns ------- periodicity : float Periodicity score calculated as coherence between input and idea 1-0-0 signal f: array like List of frequencies Cxy: array like List of coherence at the above frequencies """ if not isinstance(original_values, list): original_values = list(original_values) coh, pval, valid = 0.0, 1.0, -1 final_individual_angles = [] final_angle = np.nan final_coherence_raw = np.nan final_unit_vector_real = [] final_unit_vector_imag = [] thetas = [] for frame in frames: unit_vectors_real = [] unit_vectors_imag = [] values = original_values[frame:] normalized_values = [] i = 0 while i + 2 < len(values): if values[i] == values[i + 1] == values[i + 2] == 0: i += 3 continue real = values[i] + values[i + 1] * np.cos( 2 * np.pi / 3) + values[i + 2] * np.cos(4 * np.pi / 3) imag = values[i + 1] * np.sin( 2 * np.pi / 3) + values[i + 2] * np.sin(4 * np.pi / 3) norm = np.sqrt(real**2 + imag**2) if norm == 0: norm = 1 unit_vectors_real += [real/norm] unit_vectors_imag += [imag/norm] normalized_values += [ values[i] / norm, values[i + 1] / norm, values[i + 2] / norm ] i += 3 unit_vectors_real = np.array(unit_vectors_real) unit_vectors_imag = np.array(unit_vectors_imag) length = len(normalized_values) // 3 * 3 if length == 0: return coh, pval, valid, final_coherence_raw, final_angle, final_individual_angles, final_unit_vector_real, final_unit_vector_imag normalized_values = normalized_values[:length] uniform_signal = [1, 0, 0] * (len(normalized_values) // 3) f, Cxy = signal.coherence( normalized_values, uniform_signal, window=[1.0, 1.0, 1.0], nperseg=3, noverlap=0) coherence_raw = np.nanmean(unit_vectors_real)**2 + np.nanmean(unit_vectors_imag)**2 individual_angles = np.arctan2(unit_vectors_imag, unit_vectors_real) combine_angle = np.arctan2(np.sum(unit_vectors_imag), np.sum(unit_vectors_real)) try: periodicity_score = Cxy[np.argwhere(np.isclose(f, 1 / 3.0))[0]][0] periodicity_pval = coherence_pvalue(periodicity_score, length // 3) except: periodicity_score = 0.0 periodicity_pval = 1.0 if coherence_raw > coh: final_individual_angles = individual_angles final_angle = combine_angle final_coherence_raw = coherence_raw final_unit_vector_real = unit_vectors_real final_unit_vector_imag = unit_vectors_imag #coh = periodicity_score coh = coherence_raw pval = periodicity_pval valid = length if valid == -1: valid = length return coh, pval, valid, final_coherence_raw, final_angle, final_individual_angles, final_unit_vector_real, final_unit_vector_imag
[docs]def get_periodicity(values, input_is_stream=False): """Calculate periodicty wrt 1-0-0 signal. Parameters ---------- values : array like List of values Returns ------- periodicity : float Periodicity calculated as cross correlation between input and idea 1-0-0 signal """ from mtspec import mtspec, mt_coherence tbp = 4 kspec = 3 nf = 30 p = 0.90 if input_is_stream: values = list(map(lambda x: float(x.rstrip()), values)) if isinstance(values, six.string_types): try: values = pickle.load(open(values)) except KeyError: pass values = pd.Series(values) values = values[0:max(values.index)] length = len(values) next_pow2_length = _shift_bit_length(length) values = np.lib.pad(values, (0, next_pow2_length - len(values) % next_pow2_length), _padwithzeros) mean_centered_values = values - np.nanmean(values) normalized_values = mean_centered_values / \ np.max(np.abs(mean_centered_values)) uniform_signal = [1, -0.6, -0.4] * (next_pow2_length // 3) uniform_signal = np.lib.pad( uniform_signal, (0, next_pow2_length - len(uniform_signal) % next_pow2_length), _padwithzeros) out = mt_coherence( 1, normalized_values, uniform_signal, tbp, kspec, nf, p, freq=True, phase=True, cohe=True, iadapt=1) spec, freq, jackknife, fstatistics, _ = mtspec( data=values, delta=1., time_bandwidth=4, number_of_tapers=3, statistics=True, rshape=0, fcrit=0.9) p = 99 base_fstat = np.percentile(fstatistics, p) fstat = fstatistics[np.argmin(np.abs(freq - 1 / 3.0))] p_value = 'p < 0.05' if fstat < base_fstat: p_value = 'p > 0.05' return out['cohe'][np.argmin(np.abs(out['freq'] - 1 / 3.0))], p_value