Source code for diffusive_distinguishability.ndim_homogeneous_distinguishability

import numpy as np
import math
import scipy.stats
import scipy.special
import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
from concurrent.futures import ProcessPoolExecutor

# Simulation of trajectories and storage of trajectory data


[docs]def simulate_diffusion_df(n_dim, d_const, n_steps, dt, loc_std=0): """Simulate and output a single trajectory of homogeneous diffusion in a specified number of dimensions. :param n_dim: number of spatial dimensions for simulation (1, 2, or 3) :param d_const: diffusion constant (um2/s) :param n_steps: trajectory length (number of steps) :param dt: timestep size (s) :param loc_std: standard deviation for Gaussian localization error (um) :return: trajectory dataframe (position in n_dim dimensions, at each timepoint) """ np.random.seed() # initialize position at origin x0 = np.zeros(n_dim) x = x0 x_obs = [sum(i) for i in zip(x, [loc_std*np.random.randn() for _dim in range(n_dim)])] df = pd.DataFrame() # at each time-step, stochastically select step size in each dimension to find new location to add to trajectory for t in np.linspace(0, (n_steps-1)*dt, n_steps): dx = [np.sqrt(2*d_const*dt)*np.random.randn() for _dim in range(n_dim)] noise = [loc_std*np.random.randn() for _dim in range(n_dim)] x_new = [sum(i) for i in zip(x, dx)] x_obs_new = [sum(i) for i in zip(x_new, noise)] dx_obs = [sum(i) for i in zip(x_obs_new, np.negative(x_obs))] dr = np.linalg.norm(dx) dr_obs = np.linalg.norm(dx_obs) data = {'t_step': t, 'x': x, 'x_obs': x_obs, 'dx': dx, 'dx_obs': dx_obs, 'dr': dr, 'dr_obs': dr_obs} df = df.append(data, ignore_index=True) x = x_new x_obs = x_obs_new return df
[docs]def trajectory_df_from_data(trajectory): """If you are using experimental rather than simulated trajectories: this is an example function for how you might import your own timelapse trajectory and put into the required dataframe format, compatible with this notebook for analysis. This function will likely require edits for individual use, to make it compatible with your input trajectory format. :param trajectory: list or array of spatial positions, where each entry is the position at a single timepoint (may be 1D, 2D or 3D) :return: dataframe containing trajectory, n-dimensional displacement vectors for each timestep, and step size magnitudes for each timestep """ df = pd.DataFrame() for t in range(len(trajectory)-1): dx_obs = trajectory[t+1] - trajectory[t] data = {'t_step': t, 'x_obs': trajectory[t], 'dx_obs': dx_obs, 'dr_obs': np.linalg.norm(dx_obs)} df = df.append(data, ignore_index=True) return df
# Stats utils - posterior generation and comparison
[docs]def estimate_diffusion(n_dim, dt, dr, prior=scipy.stats.distributions.invgamma(0, scale=0)): """Returns the posterior estimate for the diffusion constant given the displacement data and the prior. :param n_dim: number of spatial dimensions for simulation (1, 2, or 3) :param dt: timestep size (s) :param dr: list of normed step sizes from a single trajectory (um) :param prior: inverse gamma prior distribution estimate for the diffusion constant :return: inverse gamma posterior distribution estimate for the diffusion constant """ # get step sizes and calculate alpha and beta from them to characterize a inverse gamma distribution dr = np.array(dr)/np.sqrt(2*n_dim*dt) alpha0, beta0 = invgamma_fullparams(prior) alpha, beta = len(dr), sum(dr**2) return scipy.stats.distributions.invgamma(alpha0+alpha, scale=beta0+beta), alpha0+alpha, beta0+beta
[docs]def generate_posterior(n_dim, d_const, n_steps, dt, loc_std=0): """ Simulate a single trajectory and find the diffusion constant posterior (inverse gamma) distribution. :param n_dim: number of spatial dimensions :param d_const: diffusion constant (um2/s) :param n_steps: trajectory length (number of steps) :param dt: timestep size (s) :param loc_std: standard deviation for Gaussian localization error (um) :return alpha, beta: scale and shape parameters for inverse gamma posterior for a diffusive trajectory """ # get dataframe of (x, dx) for a trajectory of length n_steps in homogeneous diffusion constant d_const df = simulate_diffusion_df(n_dim, d_const, n_steps, dt, loc_std) # estimate posterior diffusion constant distribution using prior/posterior with inverse gamma form prior = scipy.stats.distributions.invgamma(0, scale=0) posterior, alpha, beta = estimate_diffusion(n_dim=n_dim, dt=dt, dr=df['dr_obs'], prior=prior) # return df, posterior, alpha, beta return alpha, beta
[docs]def get_posterior_set(n_dim, d_const, n_steps, dt, n_reps, loc_std=0): """ Repeat analysis generating a posterior diffusion constant distribution per trajectory for multiple trajectories and return (1) full set and (2) median values of distribution fit parameters. :param n_dim: number of spatial dimensions :param d_const: diffusion constant (um2/s) :param n_steps: trajectory length (number of steps) :param dt: timestep size (s) :param n_reps: number of trajectory replicates :param loc_std: standard deviation for Gaussian localization error (um) :return alpha, beta, alpha_std, beta_std, alphas, betas: medians, std deviations and arrays of scale and shape parameters for inverse gamma posteriors for n_reps diffusive trajectories """ with ProcessPoolExecutor() as exe: results = list(exe.map(generate_posterior, *zip(*((n_dim, d_const, n_steps, dt, loc_std) for _n in range(n_reps))))) alphas = [result[0] for result in results] betas = [result[1] for result in results] # calculate median values for alpha and beta from n_reps simulations alpha = np.nanmedian(np.asarray(alphas)) beta = np.nanmedian(np.asarray(betas)) return alpha, beta, alphas, betas
[docs]def invgamma_fullparams(dist): """Return the alpha,beta parameterization of the inverse gamma distribution. :param dist: scipy inverse gamma distribution :return: alpha and beta parameters characterizing this inverse gamma distribution""" return dist.args[0], dist.kwds['scale']
[docs]def invgamma_kldiv(param1, param2): """Compute KL divergence of two inverse gamma distributions (ref: https://arxiv.org/pdf/1605.01019.pdf). :param param1: list containing alpha and beta parameters characterizing inverse gamma distribution 1 :param param2: list containing alpha and beta parameters characterizing inverse gamma distribution 2 :return: KL divergence of two inverse gamma distributions """ # unpack distribution parameters alpha1 = param1[0] beta1 = param1[1] alpha2 = param2[0] beta2 = param2[1] term1 = (alpha1 - alpha2)*scipy.special.digamma(alpha1) term2 = (beta2*alpha1/beta1) term3 = -alpha1 term4a = (alpha2+1)*np.log(beta1) term4b = math.lgamma(alpha2) term4c = -np.log(beta1) term4d = -alpha2*np.log(beta2) term4e = -math.lgamma(alpha1) return term1 + term2 + term3 + term4a + term4b + term4c + term4d + term4e
# Visualization and analyses
[docs]def compare2(n_dim, d_const1, mult, n_steps, dt, n_reps, loc_std=0): """ For one pair of diffusion constants (d_const, d_const*mult) get KL divergence of their posteriors, where the posteriors are generated from an alpha and beta which are the median values from repeating posterior estimation n_reps times. :param n_dim: number of spatial dimensions :param d_const1: diffusion constant (um2/s) :param mult: multiplier to get d_const2 = mult*d_const1 :param n_steps: trajectory length (number of steps) :param dt: timestep size (s) :param n_reps: number of trajectory replicates :param loc_std: standard deviation of localization error (um) """ d_const2 = d_const1*mult # for n_reps trajectories of length n_steps, get median values for posterior parameters and their std alpha1, beta1, alphas1, betas1 = get_posterior_set(n_dim, d_const1, n_steps, dt, n_reps, loc_std) alpha2, beta2, alphas2, betas2 = get_posterior_set(n_dim, d_const2, n_steps, dt, n_reps, loc_std) # plot both posteriors xx = np.linspace(0, 1.5*d_const2, 50) plt.plot(xx, scipy.stats.distributions.invgamma(alpha1, scale=beta1).pdf(xx), label='D1 posterior') plt.plot(xx, scipy.stats.distributions.invgamma(alpha2, scale=beta2).pdf(xx), label='D2 posterior') plt.axvline(x=d_const1, linestyle=':', color='blue') plt.axvline(x=d_const2, linestyle=':', color='orange') plt.legend() plt.xlabel('Diffusion Constant') plt.ylabel('Probability density') # print posterior-pair KL divergence and its inverse kl_div = invgamma_kldiv([alpha1, beta1], [alpha2, beta2]) print('KL div: ' + str(kl_div)) print('Inverse: ' + str(1./kl_div))
[docs]def fill_heatmap_gen(n_dim, d_const, mult_list, n_steps, dt, n_reps, loc_std=0): """ Generate a heatmap of KL divergence values for pairwise comparison of diffusion constant posterior distributions. Compared posteriors are generated by scanning through pairings of [d_const, mult*d_const] where mult takes on the range of values provided by mult_list and trajectory lengths. For each pair of diffusion constants, generate a trajectory of length n_steps and find the associated posterior parameter fit, repeating n_reps times to get median parameter values (alpha, beta). Use these median values of alpha and beta to select one posterior diffusion constant distribution for that diffusion constant. Repeat this process for diffusion constant d_const*mult, then calculate the KL divergence of the posteriors for (d_const, d_const*multiplier) and store in dataframe. Repeat for all pairs of (n_steps, multiplier) to fill the dataframe. The results is a heatmap of how distinguishable two diff constants are, conditional upon their relative values and the length of trajectories used. :param n_dim: number of spatial dimensions :param d_const: diffusion constant (um2/s) :param mult_list: list of multipliers to get set of d_const2 values, where d_const2 = mult*d_const :param n_steps: trajectory length (number of steps) :param dt: timestep size (s) :param n_reps: number of trajectories :param loc_std: standard deviation of localization error (um) :return df: dataframe containing the pairwise KL divergences """ # find input parameter that is a list, indicating that it is the parameter to be swept over params = [d_const, n_steps, dt, loc_std] is_list = [isinstance(item, (list, np.ndarray)) for item in params] param_ind = int(np.where(is_list)[0]) # make sure only only parameter (besides mult_list) has been entered as a list to be the second axis of sweep figure if sum(is_list) != 1: raise ValueError('Only one input other than "mult" may be multivalued.') # set sweep parameter as x axis and create dataframe x_list = params[param_ind] df = pd.DataFrame(columns=x_list, index=mult_list) # loop through d_const pairs and find their posterior fit parameters and pairwise KL divergences for x_ind in range(len(x_list)): for m_ind in range(len(mult_list)): # for individual model run, get single sweep parameter value and select pair of diffusion constants params[param_ind] = x_list[x_ind] d_const1 = params[0] d_const2 = d_const1*mult_list[m_ind] n_steps = params[1] dt = params[2] loc_std = params[3] # calculate posterior fit params and their std's alpha1_med, beta1_med, alphas1, betas1 = get_posterior_set(n_dim, d_const1, n_steps, dt, n_reps, loc_std) alpha2_med, beta2_med, alphas2, betas2 = get_posterior_set(n_dim, d_const2, n_steps, dt, n_reps, loc_std) # store KL divergence of posteriors in dataframe df.iat[m_ind, x_ind] = invgamma_kldiv([alpha1_med, beta1_med], [alpha2_med, beta2_med]) return df[df.columns].astype(float)
# Error visualization analysis
[docs]def show_error_hist(n_dim, p_error): """ Plot figure with 3 subplots, where each subplot is a histogram of the percent errors from all runs in a given number of spatial dimensions. :param n_dim: number of spatial dimensions :param p_error: array of percent error for all runs in each number of spatial dimensions """ fig, axs = plt.subplots(1, len(n_dim), figsize=(18, 6)) for i in range(len(n_dim)): print('Dim = '+str(i+1)+': '+str(np.mean(p_error[i]))+' +- '+str(np.std(p_error[i])/len(p_error[i]))) sns.distplot(p_error[i], bins=30, ax=axs[i]) axs[i].set_xlabel('% estimation error') axs[i].set_ylabel('Count') axs[i].set_title('Dim = '+str(i+1)) plt.subplots_adjust(wspace=0.5) plt.show()
[docs]def get_single_error(dim, d_const, n_steps, dt, n, loc_std): """ Generate single posterior and calculate percent error of posterior mean relative to the true value. :param dim: number of spatial dimensions :param d_const: diffusion constant (um2/s) whose estimator error we want to calculate :param n_steps: trajectory length (number of steps) :param dt: timestep size (s) :param n: trajectory number :param loc_std: standard deviation of localization error (um) :return: percent error for a single posterior mean relative to true value """ alpha, beta = generate_posterior(dim, d_const, n_steps, dt, loc_std) post_mean = scipy.stats.distributions.invgamma(alpha, scale=beta).mean() return 100*((post_mean - d_const)/d_const)
[docs]def get_dim_error(n_dim, d_const, n_steps, dt, n_reps, show_plot, loc_std=0): """ Given a diffusion constant, get the posterior for a trajectory of length n_steps and timestep dt. Repeat n_reps times and report/plot hist of the percent error of the mean posterior values vs true diffusivity values. :param n_dim: number of spatial dimensions :param d_const: diffusion constant (um2/s) whose estimator error we want to calculate :param n_steps: trajectory length (number of steps) :param dt: timestep size (s) :param n_reps: number of trajectory replicates :param show_plot: T/F flag of whether or not to display histograms of estimator errors :param loc_std: standard deviation of localization error (um) :return p_error: array of percent error between mean posterior estimation and true value for each run with each number of dimensions """ p_error = [] for dim in n_dim: with ProcessPoolExecutor() as exe: results = list(exe.map(get_single_error, *zip(*((dim, d_const, n_steps, dt, n, loc_std) for n in range(n_reps))))) p_error.append(results) # uncomment below to save error results as .npy # np.save('std_'+str(loc_std), p_error) # display histogram of percent errors for n_reps runs of simulation with diffusion constant d_const if show_plot: show_error_hist(n_dim, p_error) return p_error
[docs]def error_sensitivity(d_const, n_steps_list, dt, n_reps, loc_std): """ Look at how the mean and median percent error of the posterior mean relative to the true value depend on the trajectory length used to generate posteriors and number of reps we run. :param d_const: diffusion constants (um2/s) :param n_steps_list: list of trajectory lengths to test :param dt: timestep(s) used to generate trajectories (s) :param n_reps: number(s) of reps to run to calculate mean and mediate percent error :param loc_std: standard deviation for Gaussian localization error (um) :return: three dataframes (for 1, 2, and 3 dimensions); each contains the mean percent posterior error relative to true diffusion constant value, for all pairs of trajectory lengths and localization errors includes in these two input lists """ size = 8 plt.subplots(1, 3, figsize=(3*size, size)) data1 = np.zeros((len(n_steps_list), len(loc_std))) data2 = np.zeros((len(n_steps_list), len(loc_std))) data3 = np.zeros((len(n_steps_list), len(loc_std))) for n_steps in n_steps_list: for std in loc_std: p_error = get_dim_error([1, 2, 3], d_const, n_steps, dt, n_reps, False, std) ind_steps, ind_error = np.asarray(n_steps_list).searchsorted(n_steps), np.asarray(loc_std).searchsorted(std) data1[ind_steps, ind_error] = np.mean(p_error[0]) data2[ind_steps, ind_error] = np.mean(p_error[1]) data3[ind_steps, ind_error] = np.mean(p_error[2]) df1 = pd.DataFrame(data=data1, columns=loc_std, index=n_steps_list) df2 = pd.DataFrame(data=data2, columns=loc_std, index=n_steps_list) df3 = pd.DataFrame(data=data3, columns=loc_std, index=n_steps_list) return df1, df2, df3
# Plotting support functions
[docs]def get_ticks(tick_values, n_round, n_ticks): """ Round tick values and keep only some ticks to improve readability. :param tick_values: tick values :param n_round: number of decimal places to round to :param n_ticks: number of ticks to keep :return ticks: list of axis tick values to display """ ticks = tick_values.round(n_round) keep_ticks = ticks[::int(len(ticks)/n_ticks)] ticks = ['' for _t in ticks] ticks[::int(len(ticks)/n_ticks)] = keep_ticks return ticks
[docs]def plot_df_results(df1, df2, n_round, n_ticks, size, title1, title2, x_lab, y_lab): """ Plot two df heatmaps as two subplots of one figure. They share x and y axis labels but have differing titles. :param df1: df to visualize :param df2: second df to visualize (often log of df1) :param n_round: number of axis tick decimal places to round to :param n_ticks: number of axis ticks to keep :param size: figure size :param title1: plot title for left (df1) panel :param title2: plot title for left (df2) panel :param x_lab: x axis label :param y_lab: y axis label """ # set y ticks: round and only display some to improve readability y_ticks = get_ticks(df1.index.values, n_round, n_ticks) fig, axs = plt.subplots(1, 2, figsize=(2*size, size)) sns.heatmap(df1, yticklabels=y_ticks, cbar_kws={'label': title1}, ax=axs[0], cmap='viridis') axs[0].set(xlabel=x_lab, ylabel=y_lab, title=title1) axs[0].invert_yaxis() sns.heatmap(df2, yticklabels=y_ticks, cbar_kws={'label': title2}, ax=axs[1], cmap='viridis') axs[1].set(xlabel=x_lab, ylabel=y_lab, title=title2) axs[1].invert_yaxis()