Source code for multipletau_cor_tttr.correlate

# Copyright (c) 2016 Anders Barth
from __future__ import division
import ctypes
import numpy as np
import warnings
import os

# load shared library
_CCF = ctypes.CDLL(os.path.join(os.path.dirname(__file__), 'CCF.so'))


def _CCF_inC(t1, t2, nc, nb, timeaxis):
    """ Wrapper function to communicate between python and C using the
    ctypes library. The returned array yields the correlation of intensity
    fluctuations, decaying to zero.

    Parameters:

    * t1: Numpy array of photon arrival times in channel 1 (integer type)
    * t2: Numpy array of photon arrival times in channel 2 (integer type)
    * nc: Number of time points per logarithmic step
    * nb: Number of logarithmic steps
    * timeaxis: Logarithmic timeaxis as defined by nc and nb

    Return:

    * corr_res: 1d array of correlation result
    """
    global _CCF
    # read out number of photons and max time
    np1 = np.size(t1)
    np2 = np.size(t2)
    maxT = np.max([t1[-1], t2[-1]])

    # convert numpy arrays to python lists so they can be converted to ctypes
    w1 = np.ones(np1).tolist()
    w2 = np.ones(np2).tolist()
    t1 = t1.tolist()
    t2 = t2.tolist()
    timeaxis = timeaxis.tolist()

    # initialize output
    zer = np.zeros(len(timeaxis)).tolist()
    corrl = (ctypes.c_double * len(timeaxis))(*zer)

    # call C function with converted data types
    _CCF.CCF((ctypes.c_int64 * np1)(*t1), (ctypes.c_int64 * np2)(*t2),
             (ctypes.c_double * np1)(*w1), (ctypes.c_double * np2)(*w2),
             ctypes.c_int(nc), ctypes.c_int(nb),
             ctypes.c_int(np1), ctypes.c_int(np2),
             (ctypes.c_int * len(timeaxis))(*timeaxis),
             ctypes.byref(corrl))

    # convert output back to numpy array
    corrl = np.array(corrl)

    # perform normalization
    divisor = np.ones(np.size(timeaxis), dtype='int')
    divisor[(2 * nc + 1):] = 2 ** (np.floor(
        (np.arange(nc, (np.size(divisor) + 1) - (nc + 2))) / nc))
    corr_res = (corrl / divisor / (maxT - timeaxis)
                / (np1 / t1[-1]) / (np2 / t2[-1]) - 1)
    return corr_res
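
# Worked illustration (not part of the original module): the divisor used in
# the normalization above is 1 for the first 2*nc + 1 lag bins and then
# doubles every nc bins, which can be read as the bin coarsening of the
# multiple-tau axis. For a hypothetical short axis with nc = 2 and 9 bins:
#
#     nc = 2
#     divisor = np.ones(9, dtype='int')
#     divisor[(2 * nc + 1):] = 2 ** (np.floor(
#         (np.arange(nc, (np.size(divisor) + 1) - (nc + 2))) / nc))
#     # divisor is now [1, 1, 1, 1, 1, 2, 2, 4, 4]
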
def CCF(t1, t2, nblock=10, nc=10, nb='auto'):
    """ Performs cross-correlation of time-tagged photon data t1 and t2
    using a semi-logarithmic timeaxis with nb logarithmic levels and nc
    equally spaced timebins per level. Error estimation is performed by
    splitting the measurement into nblock time segments of equal length and
    taking the standard error of the mean. The returned array yields the
    correlation of intensity fluctuations, decaying to zero.

    Parameters:

    * t1: Numpy array of photon arrival times in channel 1 (integer type)
    * t2: Numpy array of photon arrival times in channel 2 (integer type)
    * nblock: Number of blocks used for error estimation. (Default: 10)
    * nc: Number of time points per logarithmic level. (Default: 10)
    * nb: Number of logarithmic levels. 'auto' takes the maximum possible
      lagtime to calculate nb.

    Return:

    * mcorr: 1d array of correlation result
    * stdcorr: Standard error of the mean of the correlation result
    * timeaxis: Timeaxis
    """
    # Check inputs and convert if feasible
    if not isinstance(t1, np.ndarray):
        t1 = np.array(t1)
        warnings.warn("Input array 1 is not a numpy array, converting...")
    if not isinstance(t2, np.ndarray):
        t2 = np.array(t2)
        warnings.warn("Input array 2 is not a numpy array, converting...")
    if t1.dtype.kind not in ['i', 'u']:
        t1 = t1.astype(int)
        warnings.warn("Input array 1 is not of integer type, converting...")
    if t2.dtype.kind not in ['i', 'u']:
        t2 = t2.astype(int)
        warnings.warn("Input array 2 is not of integer type, converting...")

    # define blocks
    maxT = np.max([t1[-1], t2[-1]])
    blocks = np.linspace(0, maxT, nblock + 1).astype(int)

    # preprocess timeaxis
    block_time = np.floor(maxT / nblock)
    if nb == 'auto':
        timeaxis_exponent = np.floor(np.log2(block_time / nc)).astype(int)
        nb = timeaxis_exponent.astype(int)
    else:
        timeaxis_exponent = nb
    timeaxis = np.ones(nc * (timeaxis_exponent + 1))
    timeaxis *= 2 ** np.floor((np.arange(np.size(timeaxis))) / nc - 1)
    timeaxis[timeaxis < 1] = 1
    timeaxis = np.concatenate([np.array([1]), timeaxis])
    timeaxis = np.cumsum(timeaxis).astype(int)

    # correlate each block separately
    corr = np.zeros((nblock, np.size(timeaxis)))
    for i in range(nblock):
        corr[i, :] = _CCF_inC(
            t1[(t1 > blocks[i]) & (t1 <= blocks[i + 1])] - blocks[i],
            t2[(t2 > blocks[i]) & (t2 <= blocks[i + 1])] - blocks[i],
            nc, nb, timeaxis)
        # replace -1 occurrences with 0 for time lags that are not realized
        corr[i, (np.size(timeaxis)
                 - np.where(corr[i][::-1] != -1)[0][0]):] = 0

    # remove zeros at end
    valid = np.sum((corr != 0).all(axis=0))
    corr = corr[:, :valid]
    timeaxis = timeaxis[:valid]

    # average and standard deviation
    mcorr = np.mean(corr, axis=0)
    # calculate std on normalized curves!
    corr_norm = np.zeros((nblock, np.size(timeaxis)))
    area = np.sum(corr, axis=1)
    for i in range(np.size(corr, axis=0)):
        corr_norm[i, :] = np.mean(area) * corr[i, :] / area[i]
    stdcorr = np.std(corr_norm, axis=0) / np.sqrt(nblock)

    # first time bin is actually time lag zero, correct for this:
    timeaxis[21:] -= 1
    return mcorr, stdcorr, timeaxis
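
# Usage sketch (not part of the original module), correlating two synthetic,
# uncorrelated photon streams. The arrival times are dummy integer data in
# arbitrary clock units; with real TTTR measurements the macrotimes of the
# two detection channels would be passed instead.
if __name__ == '__main__':
    rng = np.random.RandomState(0)
    # cumulative sums of positive integer inter-photon times give sorted
    # arrival times of strictly integer type, as required by CCF
    t1_demo = np.cumsum(rng.poisson(100, size=50000) + 1).astype(np.int64)
    t2_demo = np.cumsum(rng.poisson(100, size=50000) + 1).astype(np.int64)
    mcorr, stdcorr, timeaxis = CCF(t1_demo, t2_demo, nblock=10, nc=10, nb='auto')
    print(timeaxis[:5])   # first few lag bins of the semi-logarithmic axis
    print(mcorr[:5])      # correlation amplitudes (close to 0 for uncorrelated input)
    print(stdcorr[:5])    # standard error of the mean from the 10 blocks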