Source code for pyddem.spstats_tools

"""
pyddem.spstats_tools provides tools to derive spatial statistics for elevation change data.
"""
from __future__ import print_function
import os
import numpy as np
import math as m
import skgstat as skg
from scipy import integrate
import multiprocessing as mp
import pandas as pd
from osgeo import ogr
from pyddem.vector_tools import coord_trans, latlon_to_UTM

def neff_sphsum_circular(area,crange1,psill1,crange2,psill2):
    """
    Effective number of samples derived from the analytical integration of a sum of 2 spherical variogram functions
    over a circular area: circular approximation of Rolstad et al. (2009)

    :param area: Area
    :param crange1: Correlation range of first variogram
    :param psill1: Partial sill of first variogram
    :param crange2: Correlation range of second variogram
    :param psill2: Partial sill of second variogram

    :returns: Number of effective samples
    """

    #short range variogram
    c1 = psill1 # partial sill
    a1 = crange1  # short correlation range

    #long range variogram
    c1_2 = psill2
    a1_2 = crange2 # long correlation range

    h_equiv = np.sqrt(area / np.pi)

    #hypothesis of a circular shape to integrate variogram model
    if h_equiv > a1_2:
        std_err = np.sqrt(c1 * a1 ** 2 / (5 * h_equiv ** 2) + c1_2 * a1_2 ** 2 / (5 * h_equiv ** 2))
    elif (h_equiv < a1_2) and (h_equiv > a1):
        std_err = np.sqrt(c1 * a1 ** 2 / (5 * h_equiv ** 2) + c1_2 * (1-h_equiv / a1_2+1 / 5 * (h_equiv / a1_2) ** 3))
    else:
        std_err = np.sqrt(c1 * (1-h_equiv / a1+1 / 5 * (h_equiv / a1) ** 3) + c1_2 * (1-h_equiv / a1_2+1 / 5 * (h_equiv / a1_2) ** 3))

    return (psill1 + psill2)/std_err**2
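
# A minimal usage sketch with hypothetical values: effective number of samples over a 100 km2 circular-equivalent
# area, for the sum of a short-range (200 m) and a long-range (5 km) spherical variogram.
# n_eff = neff_sphsum_circular(area=1e8, crange1=200, psill1=1.5, crange2=5000, psill2=0.5)
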

def neff_circ(area,list_vgm):
    """
    Effective number of samples numerically integrated for a sum of variogram functions over an area:
    circular approximation of Rolstad et al. (2009)

    :param area: Area
    :param list_vgm: List of variogram functions to sum, as tuples of (correlation range, model name, partial sill)

    :returns: Number of effective samples
    """
    psill_tot = 0
    for vario in list_vgm:
        psill_tot += vario[2]

    def hcov_sum(h):
        fn = 0
        for vario in list_vgm:
            crange, model, psill = vario
            fn += h*(cov(h,crange,model=model,psill=psill))
        return fn

    h_equiv = np.sqrt(area / np.pi)

    full_int = integrate_fun(hcov_sum,0,h_equiv)[0]
    std_err = np.sqrt(2*np.pi*full_int / area)

    return psill_tot/std_err**2
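
# A minimal usage sketch with hypothetical values: same circular approximation, numerically integrated for an
# arbitrary sum of variogram models passed as (range, model, partial sill) tuples.
# n_eff = neff_circ(area=1e8, list_vgm=[(200, 'Sph', 1.5), (5000, 'Exp', 0.5)])
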
def neff_rect(area,width,crange1,psill1,model1='Sph',crange2=None,psill2=None,model2=None):
    """
    Effective number of samples numerically integrated for a sum of 2 variogram functions over a rectangular area:
    rectangular approximation of Hugonnet et al. (TBD)

    :param area: Area
    :param width: Width of rectangular area
    :param crange1: Correlation range of first variogram
    :param psill1: Partial sill of first variogram
    :param model1: Model of first variogram
    :param crange2: Correlation range of second variogram
    :param psill2: Partial sill of second variogram
    :param model2: Model of second variogram

    :returns: Number of effective samples
    """
    def hcov_sum(h,crange1=crange1,psill1=psill1,model1=model1,crange2=crange2,psill2=psill2,model2=model2):

        if crange2 is None or psill2 is None or model2 is None:
            return h*(cov(h,crange1,model=model1,psill=psill1))
        else:
            return h*(cov(h,crange1,model=model1,psill=psill1)+cov(h,crange2,model=model2,psill=psill2))

    width = min(width,area/width)

    full_int = integrate_fun(hcov_sum,0,width/2)[0]
    bin_int = np.linspace(width/2,area/width,100)
    for i in range(len(bin_int)-1):
        low = bin_int[i]
        upp = bin_int[i+1]
        mid = bin_int[i] + (bin_int[i+1]-bin_int[i])/2
        piec_int = integrate_fun(hcov_sum, low, upp)[0]
        full_int += piec_int * 2/np.pi*np.arctan(width/(2*mid))

    std_err = np.sqrt(2*np.pi*full_int / area)

    if crange2 is None or psill2 is None or model2 is None:
        return psill1 / std_err ** 2
    else:
        return (psill1 + psill2) / std_err ** 2
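
# A minimal usage sketch with hypothetical values: rectangular approximation for a 2 km x 50 km area
# with a single 500 m spherical variogram.
# n_eff = neff_rect(area=1e8, width=2000, crange1=500, psill1=1.0, model1='Sph')
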
def integrate_fun(fun,low_limit,up_limit):

    return integrate.quad(fun,low_limit,up_limit)
def cov(h,crange,model='Sph',psill=1.,kappa=1/2,nugget=0):
    """
    Compute covariance based on variogram function

    :param h: Spatial lag
    :param crange: Correlation range
    :param model: Model
    :param psill: Partial sill
    :param kappa: Smoothing parameter for Exp Class
    :param nugget: Nugget

    :returns: Covariance function
    """
    return (nugget + psill) - vgm(h,crange,model=model,psill=psill,kappa=kappa)
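
# A minimal sketch with hypothetical values: the covariance is the sill minus the variogram, so at zero lag it equals
# nugget + psill and it decays to zero beyond the correlation range of a spherical model.
# c_0 = cov(0, crange=500, model='Sph', psill=1.0)      # = 1.0
# c_far = cov(1000, crange=500, model='Sph', psill=1.0)  # = 0.0 beyond the range
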
def vgm(h,crange,model='Sph',psill=1.,kappa=1/2,nugget=0):
    """
    Compute variogram model function (Spherical, Exponential, Gaussian or Exponential Class)

    :param h: Spatial lag
    :param crange: Correlation range
    :param model: Model
    :param psill: Partial sill
    :param kappa: Smoothing parameter for Exp Class
    :param nugget: Nugget

    :returns: Variogram function
    """
    c0 = nugget  # nugget
    c1 = psill  # partial sill
    a1 = crange  # correlation range
    s = kappa  # smoothness parameter for Matern class

    if model == 'Sph':  # spherical model
        if h < a1:
            vgm = c0 + c1 * (3 / 2 * h / a1-1 / 2 * (h / a1) ** 3)
        else:
            vgm = c0 + c1
    elif model == 'Exp':  # exponential model
        vgm = c0 + c1 * (1-np.exp(-h / a1))
    elif model == 'Gau':  # gaussian model
        vgm = c0 + c1 * (1-np.exp(- (h / a1) ** 2))
    elif model == 'Exc':  # stable exponential model
        vgm = c0 + c1 * (1-np.exp(-(h/ a1)**s))

    return vgm
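
# A minimal sketch with hypothetical values: a spherical variogram rises towards its sill, which is reached at the
# correlation range.
# vgm(250, crange=500, model='Sph', psill=1.0)  # 0.6875, i.e. 3/2*0.5 - 1/2*0.5**3
# vgm(500, crange=500, model='Sph', psill=1.0)  # 1.0, the sill is reached
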
def std_err_finite(std, Neff, neff):

    return std * np.sqrt(1 / Neff * (Neff - neff) / Neff)


def std_err(std, Neff):

    return std * np.sqrt(1 / Neff)


def distance_latlon(tup1,tup2):

    # approximate radius of earth in meters
    R = 6373000

    lat1 = m.radians(abs(tup1[1]))
    lon1 = m.radians(abs(tup1[0]))
    lat2 = m.radians(abs(tup2[1]))
    lon2 = m.radians(abs(tup2[0]))

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = m.sin(dlat / 2)**2 + m.cos(lat1) * m.cos(lat2) * m.sin(dlon / 2)**2
    c = 2 * m.atan2(m.sqrt(a), m.sqrt(1 - a))

    distance = R * c

    return distance


def kernel_sph(xi,x0,a1):

    if np.abs(xi - x0) > a1:
        return 0
    else:
        return 1 - 3 / 2 * np.abs(xi-x0) / a1 + 1 / 2 * (np.abs(xi-x0) / a1) ** 3


def part_covar_sum(argsin):

    list_tuple_errs, corr_ranges, list_area_tot, list_lat, list_lon, i_range = argsin

    n = len(list_tuple_errs)
    part_var_err = 0
    for i in i_range:
        for j in range(n):
            d = distance_latlon((list_lon[i], list_lat[i]), (list_lon[j], list_lat[j]))
            for k in range(len(corr_ranges)):
                part_var_err += kernel_sph(0, d, corr_ranges[k]) * list_tuple_errs[i][k] * list_tuple_errs[j][k] * \
                                list_area_tot[i] * list_area_tot[j]

    return part_var_err
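
# A minimal sketch with hypothetical coordinates, passed as (lon, lat) tuples: haversine distance in meters between
# two points one degree of latitude apart, roughly 111 km.
# d = distance_latlon((6.0, 45.0), (6.0, 46.0))
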
def double_sum_covar(list_tuple_errs, corr_ranges, list_area_tot, list_lat, list_lon, nproc=1):
    """
    Double sum of covariances for propagating multi-range correlated errors for disconnected spatial ensembles

    :param list_tuple_errs: List of tuples of correlated errors by range, by ensemble
    :param corr_ranges: List of correlation ranges
    :param list_area_tot: List of areas of ensembles
    :param list_lat: Center latitude of ensembles
    :param list_lon: Center longitude of ensembles
    :param nproc: Number of cores to use for multiprocessing [1]

    :returns: Standard error propagated through the double sum of covariances
    """
    n = len(list_tuple_errs)

    if nproc == 1:
        print('Deriving double covariance sum with 1 core...')
        var_err = 0
        for i in range(n):
            for j in range(n):
                d = distance_latlon((list_lon[i], list_lat[i]), (list_lon[j], list_lat[j]))
                for k in range(len(corr_ranges)):
                    var_err += kernel_sph(0, d, corr_ranges[k]) * list_tuple_errs[i][k] * list_tuple_errs[j][k] * \
                               list_area_tot[i] * list_area_tot[j]
    else:
        print('Deriving double covariance sum with '+str(nproc)+' cores...')
        pack_size = int(np.ceil(n/nproc))
        # pack latitude and longitude in the order expected by part_covar_sum
        argsin = [(list_tuple_errs, corr_ranges, list_area_tot, list_lat, list_lon, np.arange(i, min(i+pack_size, n)))
                  for k, i in enumerate(np.arange(0, n, pack_size))]
        pool = mp.Pool(nproc, maxtasksperchild=1)
        outputs = pool.map(part_covar_sum, argsin, chunksize=1)
        pool.close()
        pool.join()

        var_err = np.sum(np.array(outputs))

    area_tot = 0
    for j in range(len(list_area_tot)):
        area_tot += list_area_tot[j]

    var_err /= np.nansum(area_tot) ** 2

    return np.sqrt(var_err)
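
# A minimal usage sketch with hypothetical values: propagate errors correlated over 500 m and 10 km ranges for two
# ensembles of 1 and 2 km2, returning the combined standard error.
# err = double_sum_covar(list_tuple_errs=[(0.10, 0.05), (0.08, 0.04)], corr_ranges=[500, 10000],
#                        list_area_tot=[1e6, 2e6], list_lat=[46.0, 46.1], list_lon=[7.0, 7.1], nproc=1)
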
def point_lonlat_trans(epsg_out,list_tup):

    trans = coord_trans(False,4326,False,epsg_out)

    list_tup_out = []
    for tup in list_tup:
        point = ogr.Geometry(ogr.wkbPoint)
        point.AddPoint(tup[0],tup[1])
        point.Transform(trans)
        coord_out = point.GetPoint()[0:2]
        list_tup_out.append(coord_out)

    return list_tup_out


def get_spatial_corr(argsin):

    coords, vals, i, cutoffs, nlags, nmax = argsin

    if len(coords) > nmax:
        subset = np.random.choice(len(coords), nmax, replace=False)
        coords = coords[subset]
        vals = vals[subset]

    print('Drawing variograms for pack '+str(i+1)+' with '+str(len(coords))+' points.')

    arr_shape = (len(cutoffs), nlags)
    exps, bins, counts = (np.zeros(arr_shape)*np.nan for _ in range(3))
    for i in range(len(cutoffs)):
        try:
            # commented "ignoring maxlag" in skgstat/binning.py
            V = skg.Variogram(coordinates=coords, values=vals, n_lags=nlags, maxlag=cutoffs[i], normalize=False,
                              model='exponential')
        except Exception:
            return np.zeros(arr_shape)*np.nan, np.zeros(arr_shape)*np.nan, np.zeros(arr_shape)*np.nan

        count = np.zeros(nlags)
        tmp_count = np.fromiter((g.size for g in V.lag_classes()), dtype=int)
        count[0:len(tmp_count)] = tmp_count

        exps[i, :] = V.experimental
        bins[i, :] = V.bins
        counts[i, :] = count

    return exps, bins, counts
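
# A minimal usage sketch with hypothetical, synthetic data: sample experimental variograms for one pack of 500
# projected points with two successive cutoffs, 50 lags each.
# coords = np.random.uniform(0, 50000, size=(500, 2))
# vals = np.random.normal(0, 1, size=500)
# exps, bins, counts = get_spatial_corr((coords, vals, 0, [10000, 50000], 50, 10000))
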
def get_tinterpcorr(df,outfile,cutoffs=[10000,100000,1000000],nlags=100,nproc=1,nmax=10000):
    """
    Sample empirical spatial variograms with time lags to observation

    :param df: DataFrame of differences between ICESat and GP data aggregated for all regions
    :param outfile: Filename of csv for outputs
    :param cutoffs: Maximum successive ranges for sampling variogram
    :param nlags: Number of lags to sample up to cutoff
    :param nproc: Number of cores to use for multiprocessing [1]
    :param nmax: Maximum number of observations to use for pairwise sampling (drawn randomly)

    :returns:
    """
    # df is a subset dataframe of standardized points of interest, with an attribute .reg for regions;
    # points within a region must be close enough for UTM zone coordinates to remain relevant
    list_reg = list(set(list(df.reg.values)))

    df_out = pd.DataFrame()
    for k in range(len(list_reg)):

        print('Working on region: '+str(list_reg[k]))

        df_reg = df[df.reg == list_reg[k]]

        # this works for ICESat campaigns, might have to put into close groups of similar dates for IceBridge
        list_dates = list(set(list(df_reg.t.values)))

        list_ns = []
        list_dt = []
        list_camp = []
        list_vals = []
        list_coords = []
        bin_dt = [0, 5, 30, 60, 90, 120, 150, 200, 260, 460, 620, 820, 980, 1180, 1500, 2000, 2500]
        for i in range(len(list_dates)):

            print('Pack of dates number '+str(i+1)+' out of '+str(len(list_dates))+': '+str(list_dates[i]))

            for j in range(len(bin_dt)-1):

                print('Day spacing number '+str(j+1)+' out of '+str(len(bin_dt)-1)+': '+str(bin_dt[j])+' to '+str(bin_dt[j+1]))

                ind = np.logical_and.reduce((df_reg.t == list_dates[i], np.abs(df_reg.dt) >= bin_dt[j],
                                             np.abs(df_reg.dt) < bin_dt[j+1]))
                df_tmp = df_reg[ind]

                print('Found '+str(len(df_tmp))+' observations')

                vals = df_tmp.dh.values
                if len(vals) > 10:
                    lat = df_tmp.lat
                    lon = df_tmp.lon
                    list_tup = list(zip(lon,lat))
                    med_lat = np.median(lat)
                    med_lon = np.median(lon)

                    print('Median latitude is: '+str(med_lat))
                    print('Median longitude is: '+str(med_lon))
                    print('Transforming coordinates...')

                    epsg, _ = latlon_to_UTM(med_lat,med_lon)
                    list_tup_out = point_lonlat_trans(int(epsg),list_tup)

                    print('Estimating spatial correlation...')

                    list_coords.append(np.array(list_tup_out))
                    list_vals.append(vals)
                    list_dt.append(bin_dt[j]+0.5*(bin_dt[j+1]-bin_dt[j]))
                    list_ns.append(len(df_tmp))
                    list_camp.append(list_dates[i])

        if len(list_coords) > 0:
            if nproc == 1:
                print('Processing with 1 core...')
                list_arr_exps, list_arr_bins, list_arr_counts = ([] for _ in range(3))
                for i in range(len(list_coords)):
                    exps, bins, counts = get_spatial_corr((list_coords[i],list_vals[i],i,cutoffs,nlags,nmax))
                    list_arr_exps.append(exps)
                    list_arr_bins.append(bins)
                    list_arr_counts.append(counts)
            else:
                print('Processing with '+str(nproc)+' cores...')
                arglist = [(list_coords[i],list_vals[i],i,cutoffs,nlags,nmax) for i in range(len(list_coords))]
                pool = mp.Pool(nproc, maxtasksperchild=1)
                outputs = pool.map(get_spatial_corr, arglist, chunksize=1)
                pool.close()
                pool.join()

                print('Finished processing, compiling results...')

                zipped = list(zip(*outputs))
                list_arr_exps = zipped[0]
                list_arr_bins = zipped[1]
                list_arr_counts = zipped[2]

            for l in range(len(cutoffs)):
                for c in range(len(list_camp)):
                    df_var = pd.DataFrame()
                    df_var = df_var.assign(reg=[list_reg[k]]*nlags, nb_dt=[list_dt[c]]*nlags,
                                           bins=list_arr_bins[c][l,:], exp=list_arr_exps[c][l,:],
                                           count=list_arr_counts[c][l,:], cutoff=cutoffs[l], t=list_camp[c])
                    df_out = df_out.append(df_var)

    df_out.to_csv(outfile)
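
# A minimal usage sketch with a hypothetical DataFrame: df is expected to carry at least the columns reg, t, dt, dh,
# lat and lon used above; the sampled variograms are written to the csv given as outfile.
# get_tinterpcorr(df, outfile='tinterp_corr.csv', cutoffs=[10000, 100000], nlags=50, nproc=4)
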
def aggregate_tinterpcorr(infile,cutoffs=[10000,100000,1000000]):
    """
    Weighted aggregation of empirical spatial variograms between different regions, with varying time lags and
    sampling ranges, accounting for the number of pairwise samples drawn

    :param infile: Filename of sampled variograms
    :param cutoffs: Maximum successive ranges for sampling variogram

    :returns:
    """
    outfile_reg = os.path.join(os.path.dirname(infile), os.path.splitext(os.path.basename(infile))[0]+'_agg_reg.csv')
    outfile_all = os.path.join(os.path.dirname(infile), os.path.splitext(os.path.basename(infile))[0]+'_agg_all.csv')

    df = pd.read_csv(infile)

    bin_dt = [0, 5, 30, 60, 90, 120, 150, 200, 260, 460, 620, 820, 980, 1180, 1500, 2000, 2500]
    list_nb_dt = sorted(list(set(list(df.nb_dt))))

    # first, aggregate campaigns by region, by cutoff and by time lag
    list_reg = sorted(list(set(list(df.reg))))
    df_agg_reg = pd.DataFrame()
    for reg in list_reg:
        df_reg = df[df.reg == reg]
        for cutoff in cutoffs:
            df_cutoff = df_reg[df_reg.cutoff == cutoff]
            for nb_dt in list_nb_dt:
                df_nb_dt = df_cutoff[df_cutoff.nb_dt == nb_dt]
                df_nb_dt['exp_count_prod'] = df_nb_dt.exp.values * df_nb_dt['count'].values
                df_agg = df_nb_dt.groupby('bins',as_index=False)['exp_count_prod','count'].sum()
                df_agg['exp'] = df_agg['exp_count_prod'].values / df_agg['count']
                df_agg['nb_dt'] = nb_dt
                df_agg['cutoff'] = cutoff
                df_agg['reg'] = reg

                df_agg_reg = df_agg_reg.append(df_agg)

    df_agg_reg.to_csv(outfile_reg)

    # then, aggregate regions by cutoff and by time lag
    df_agg_all = pd.DataFrame()
    for cutoff in cutoffs:
        df_cutoff = df_agg_reg[df_agg_reg.cutoff == cutoff]
        for nb_dt in list_nb_dt:
            df_nb_dt = df_cutoff[df_cutoff.nb_dt == nb_dt]
            df_nb_dt['exp_count_prod'] = df_nb_dt.exp.values * df_nb_dt['count'].values
            df_agg_2 = df_nb_dt.groupby('bins',as_index=False)['exp_count_prod','count'].sum()
            df_agg_2['exp'] = df_agg_2['exp_count_prod'].values / df_agg_2['count']
            df_agg_2['nb_dt'] = nb_dt
            df_agg_2['cutoff'] = cutoff

            df_agg_all = df_agg_all.append(df_agg_2)

    df_agg_all.to_csv(outfile_all)
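
# A minimal usage sketch: aggregate the variograms sampled above; outputs are written next to the input file with
# '_agg_reg.csv' and '_agg_all.csv' suffixes.
# aggregate_tinterpcorr('tinterp_corr.csv', cutoffs=[10000, 100000])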