Source code for aidapy.aidaxr.vdf

"""AIDA module responsible for the vdf handling.
etienne.behar
"""
import sys
import time
from datetime import timedelta
import matplotlib as mpl
import numpy as np
from matplotlib.dates import date2num
import xarray as xr
from scipy.interpolate import RegularGridInterpolator
import scipy.stats
from scipy.interpolate import interp1d
from scipy.spatial.transform import Rotation as R
try:
    import tricubic
    tricubic_imported = True
except ModuleNotFoundError:
    tricubic_imported = False
import aidapy.tools.vdf_plot as vplt
import aidapy.tools.vdf_utils as vdfu


[docs]@xr.register_dataset_accessor('vdf') class AidaAccessorVDF: """ Xarray accessor responsible for the vdf utilities. """ def __init__(self, xarray_obj): self._obj = xarray_obj self.settings = xarray_obj.attrs['load_settings'] self.R_eci_to_gse = None self.R_gse_to_eci = None self.R_eci_to_gse = None self.R_gse_to_eci = None self.R_eci_to_dbcs = None
[docs] def interpolate(self, start_time, end_time, start_time_sub, end_time_sub, species='electron', frame='instrument', grid_geom='spher', v_max=None, resolution=60, interp_schem='near', verbose=True): """ Main method of the AidaAccessorVDF. Pre-processes the loaded data, initialises the rotation matrices and translations vectors, selects a time sub-interval, interpolates the data, post-process them and save them as new variables of the xarray dataset. Parameters ---------- start_time : datetime.datetime The start time of the global period of interst. end_time : datetime.datetime The end time of the global period of interest. start_time_sub : datetime.datetime The start time of the local period of interest. end_time_sub : datetime.datetime The end time of the local period of interest. species : str The species to be analysed, either 'electron' or 'ion' frame : str The reference frame in which the data will be analysed and visualised. grid_geom : str The geometry of the grid-of-interest, along which VDF values will be interpolated. Either 'cart' for cartesian geometry, 'spher' or spherical, or 'cyl' for cylindrical. v_max : float The maximum extend of the grid-of-interest. If None, taken as the maximum instrument speed coverage. resolution : int Resolution of the grid-of-interest. Advised is between 50 and 200 for reasonable quality and computation time. interp_schem : str Inteprolation scheme used by the method. Either 'near' for nearest-neighbour, 'lin' for tri-linear, or 'cub' for tri-cubic interpolation. The two first are provided by scipy (https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.RegularGridInterpolator.html) and the last by the tricubic package (https://pypi.org/project/tricubic/). Returns ------- xr_mms : xarray dataset The xarray dataset. The interpolation products are added to the dataset at the end of the method. """ vdf0, time_par, speed, theta, phi,\ nb_vdf, B_gse_par = self._preprocess_data(species) if v_max is None: v_max = np.nanmax(speed) self._check_inputs(time_par, start_time, end_time, start_time_sub, end_time_sub, grid_geom) self._init_rotations_matrices(time_par, start_time, end_time) R_b_to_dbcs, R_dbcs_to_b = self._init_R_b_to_dbcs(time_par, B_gse_par) ibulkv_dbcs_par = self._init_ibulkv(species, time_par) if frame == 'B_electron': ebulkv_b = self._init_ebulkv(R_dbcs_to_b) ind_dis_oi, ind_start, ind_stop = self._time_sub_range(nb_vdf, time_par, start_time_sub, end_time_sub) if verbose: self._verbosity(grid_geom, resolution, interp_schem, time_par, ind_dis_oi, ind_start, ind_stop) grid_cart, grid_spher, grid_cyl, dvvv = \ vdfu.init_grid(v_max, resolution, grid_geom) # vdf_interp will contain the interpolated, averaged, values, # along time and the 3 velocity dimensions. vdf_interp = np.zeros((resolution, resolution, resolution)) vdf_scaled = np.zeros((resolution, resolution, resolution)) vdf_normed = np.zeros((resolution, resolution, resolution)) vdf_interp_time = np.zeros((ind_dis_oi.size, resolution, resolution), dtype=np.float32) vdf_scaled_time = np.zeros((ind_dis_oi.size, resolution, resolution), dtype=np.float32) vdf_normed_time = np.zeros((ind_dis_oi.size, resolution, resolution), dtype=np.float32) time_interp = time_par[ind_dis_oi] nb_dis_final = 0 nb_pix_final = np.zeros_like(vdf_interp) # Re-cacluclating moments based on integrated interpolated VDF values. ne = np.zeros(nb_vdf) ve = np.zeros((nb_vdf, 3)) # From here, whatever coordinate system we chose is of no importance, # as each node of the grid-of-interest will be considered separately. tic = time.time() # Loop over the scans, since I couldn't find a way to vectorize # the np.dot() method, or the interpolations. for i, iS in enumerate(ind_dis_oi): if (iS % 10 == 0)*verbose: print('{}/{} (current VDF/nb total VDFs)'.format(iS, nb_vdf), end='\r') if frame != 'B_electron': grid_s = self._transform_grid(grid_cart, R_b_to_dbcs[iS], ibulkv_dbcs_par[:, iS], frame) else: grid_s = self._transform_grid(grid_cart, R_b_to_dbcs[iS], ibulkv_dbcs_par[:, iS], frame, ebulkv_b[:, iS]) d = self._interpolate_spher_mms(vdf0[iS], speed, theta, phi, grid_s, interp_schem) nb_dis_final += 1 nb_pix_final += (~np.isnan(d)) d[np.isnan(d)] = 0. if np.nanmax(d) == 0.: print(iS, 'np.nanmax(d) == 0. Either empty data' ' or grid-of-interest wrongly set.') # Integrated moments. ne[nb_dis_final-1] = np.nansum(dvvv * d) ve[nb_dis_final-1] = np.nansum(grid_cart * d * dvvv, axis=(1, 2, 3))\ / ne[nb_dis_final-1] # Original VDF. vdf_interp += d if grid_geom == 'spher': vdf_interp_time[i] = np.nanmean(d, axis=2) # 0-to-1 scaling ds = d - np.nanmin(d, axis=(1, 2))[:, None, None] ds /= np.nanmax(ds, axis=(1, 2))[:, None, None] vdf_scaled_time[i] = np.nanmean(ds, axis=2) # Normalisation ind_mid = int(resolution/2.) wid = int(5.*resolution/180.) dn = d/np.nanmean(d[:, ind_mid-wid: ind_mid+wid], axis=(1, 2))[:, None, None] vdf_normed_time[i] = np.nanmean(dn, axis=2) elif grid_geom == 'cyl': vdf_interp_time[i] = np.nanmean(d, axis=1) elif grid_geom == 'cart': # This is far from ideal, we drop one dimension for saving memory. # If it is acceptable in spherical to average over the gyro-angle # it is not satisfying in this cartesian case. At least, we drop v_y # and save v_z=v_para. vdf_interp_time[i] = np.nanmean(d, axis=1) if verbose: print('') print('Total runtime: {} s.\n'.format(int(time.time()-tic))) # For now vdf_interp contains the sum all distributions. We need to smartly # average these values, without dividing by zeros. We use a threshold # on the minimum amount of valid vdf values per grid node: nb_pix_min. nb_pix_min = min(len(ind_dis_oi)-1, 100) itk = (nb_pix_final > nb_pix_min) vdf_interp[itk] /= nb_pix_final[itk] vdf_interp[~itk] = 0. # if grid_geom == 'spher': vdf_scaled = vdf_interp - np.nanmin(vdf_interp, axis=(1, 2))[:, None, None] vdf_scaled /= np.nanmax(vdf_scaled, axis=(1, 2))[:, None, None] ind_mid = int(resolution/2.) wid = int(5.*resolution/180.) vdf_normed = vdf_interp.copy() vdf_normed /= np.nanmean(vdf_normed[:, ind_mid-wid: ind_mid+wid], axis=(1, 2))[:, None, None] self._save_xarray_variables(vdf_interp, vdf_scaled, vdf_normed, vdf_interp_time, vdf_scaled_time, vdf_normed_time, grid_cart, grid_spher, grid_cyl, time_interp, grid_geom) return self._obj
def _preprocess_data(self, species): """ Numpyse the loaded data, and pre-format them: time averaging or interpolating data of different time resolution, frame management, etc. """ if self.settings['mode'] == 'low_res': mode_mms_vdf = 'fast' elif self.settings['mode'] == 'high_res': mode_mms_vdf = 'brst' if len(self.settings['probes']) > 1: raise NotImplementedError('VDF interpolations are only implemented\ for one probe at a time for now.') probe_ID = self.settings['probes'][0] if species == 'ion': spec_key = 'i_dist{}'.format(probe_ID) spec_mms_vdf = 'dis' m = 1.660538e-27 # kg e = 1.60217e-19 # C elif species == 'electron': spec_key = 'e_dist{}'.format(probe_ID) spec_mms_vdf = 'des' m = 9.10938356e-31 # kg e = -1.60217e-19 # C else: raise ValueError(' In AidaAccessorVDF: variable \'species\' should \ be either \'ion\' or \'electron\'') vdf0 = self._obj[spec_key].values.copy() # Swap energy and longitude angle dimensions to comply with the # ISO standard for spherical coordinate system: (rho, theta, phi). vdf0 = np.swapaxes(vdf0, 1, 3) time_par = self._obj[spec_key].coords[self._get_time_key(spec_key)].values.copy() key_str = 'mms{}_{}_energy_{}'.format(probe_ID, spec_mms_vdf, mode_mms_vdf) energy = self._obj[spec_key][key_str].values.copy() key_str = 'mms{}_{}_phi_{}'.format(probe_ID, spec_mms_vdf, mode_mms_vdf) phi = self._obj[spec_key][key_str].values.copy() key_str = 'mms{}_{}_theta_{}'.format(probe_ID, spec_mms_vdf, mode_mms_vdf) theta = self._obj[spec_key][key_str].values.copy() # Speed vector, m/s. vel.shape = 32x3000 speed = np.sqrt(2 * energy * np.abs(e) / m) nb_vdf = time_par.size # key_str = 'dc_mag{}'.format(probe_ID) B_gse = self._obj[key_str].values[:, :3].T.copy() time_B = self._obj[key_str].coords[self._get_time_key(key_str)].values B_gse_par = self._time_average(B_gse, time_B, time_par) # Going for SI vdf0 *= 1.e12 # cm^-6 to m^-6 B_gse *= 1e-9 # nT to T B_gse_par *= 1e-9 # nT to T phi *= np.pi / 180 # Degree to radian theta *= np.pi / 180 # Degree to radian return vdf0, time_par, speed, theta, phi, nb_vdf, B_gse_par def _get_time_key(self, variable_key): """ Search for the time key, not knowing it's number. """ for k in self._obj[variable_key].coords: if 'time' in k: key_time = k return key_time def _verbosity(self, grid_geom, resolution, interp_schem, time_par, ind_dis_oi, ind_start, ind_stop): """ Some _verbosity for the main method. """ print('\n') print('.____________________________________________________________') print('| mms_vdf.py, aidapy.') print('|') print('| Product(s):') for p in self.settings['prod']: print('| - {}'.format(p)) print('| Grid geometry: {}'.format(grid_geom)) print('| Resolution: {}'.format(resolution)) print('| Interpolation: {}'.format(interp_schem)) print('| Start time: {}'.format(time_par[ind_dis_oi[0]])) print('| Stop time : {}'.format(time_par[ind_dis_oi[-1]+1])) print('| Ind. start-stop: {}-{}'.format(ind_start, ind_stop)) print('| Nb distributions: {}'.format(len(ind_dis_oi))) print('|____________________________________________________________') print('\n') @staticmethod def _check_inputs(time_par, start_time, end_time, start_time_sub, end_time_sub, grid_geom): """ Various tests on the interpolation inputs (type, value, etc.) """ if grid_geom not in ['cart', 'spher', 'cyl']: raise NotImplementedError('Coordinate system -- {} -- ' 'not implemented.'.format(grid_geom)) if (date2num(time_par[0]) > date2num(start_time_sub)) or \ (date2num(time_par[-1]) < date2num(end_time_sub)): raise ValueError('The chosen time sub_interval falls partially ' 'or entirely outside the time interval covered by' ' the file.') if date2num(start_time_sub) >= date2num(end_time_sub): raise ValueError('The bounds of the time sub-interval are not' ' chrono-logical.') if date2num(start_time+timedelta(seconds=60)) > date2num(end_time): raise ValueError('The time interval must be larger than one ' 'minute.') #if end_time_sub-start_time_sub<30: # raise ValueError('The chose time sub-interval is shorter than ' # 'one FPI measurement.') @staticmethod def _check_interp_scheme(interp_schem): if interp_schem not in ['near', 'lin', 'cub']: raise NotImplementedError('Interpolation schem -- {} -- ' 'not implemented.'.format(interp_schem)) def _init_rotations_matrices(self, time_par, start_time, end_time): """ Returns rotations matrices interpolated at time_par. """ probe_ID = self.settings['probes'][0] key_str = 'sc_att{}'.format(probe_ID) quat_eci_to_gse = self._obj[key_str].values time_quat = self._obj[key_str].coords[self._get_time_key(key_str)].values #################### settings = {'prod': ['sc_att'], 'probes': [probe_ID], 'coords': 'gse', 'mode': 'high_res', 'frame':'dbcs'} from aidapy import load_data xr_mms_tmp = load_data(mission='mms', start_time=start_time, end_time=end_time, **settings) quat_eci_to_dbcs = xr_mms_tmp['sc_att{}'.format(probe_ID)].values # time_quat2 = self._obj['sc_att1'].coords['time1'].values ##################### f = interp1d(mpl.dates.date2num(time_quat), quat_eci_to_gse.T, bounds_error=False, fill_value=np.nan) quat_interp = f(mpl.dates.date2num(time_par)) itk = np.isnan(quat_interp[0]) quat_interp[:, itk] = 1. r = R.from_quat(quat_interp.T) self.R_eci_to_gse = r.as_matrix() self.R_gse_to_eci = np.transpose(self.R_eci_to_gse, axes=(0, 2, 1)) self.R_eci_to_gse[itk] *= np.nan self.R_gse_to_eci[itk] *= np.nan # f = interp1d(mpl.dates.date2num(time_quat), quat_eci_to_dbcs.T, bounds_error=False, fill_value=np.nan) quat_interp = f(mpl.dates.date2num(time_par)) itk = np.isnan(quat_interp[0]) quat_interp[:, itk] = 1. r = R.from_quat(quat_interp.T) self.R_eci_to_dbcs = r.as_matrix() R_dbcs_to_eci = np.transpose(self.R_eci_to_dbcs, axes=(0, 2, 1)) self.R_eci_to_dbcs[itk] *= np.nan R_dbcs_to_eci[itk] *= np.nan def _init_R_b_to_dbcs(self, time_par, B_gse_par): """Returns the main rotation matrix, from the frame-of-interest aligned with the B-field, to the instrument frame, DBCS. Parameters ---------- R_b_to_dbcs rotation matrix from b to dbcs (N, 3, 3) time_par particle timestamps vector, (N,) B_gse_par B-field in GSE at particles's time, (3, N) """ B_eci_par = np.array([np.dot(self.R_gse_to_eci[i], B_gse_par[:, i]) for i in np.arange(time_par.size)]).T B_dbcs_par = np.array([np.dot(self.R_eci_to_dbcs[i], B_eci_par[:, i]) for i in np.arange(time_par.size)]).T R_dbcs_to_b = np.zeros((time_par.size, 3, 3)) R_b_to_dbcs = np.zeros((time_par.size, 3, 3)) for i in np.arange(time_par.size): R_dbcs_to_b[i] = vdfu.R_2vect(B_dbcs_par[:, i], np.array([0, 0, 1])) R_b_to_dbcs[i] = vdfu.R_2vect(np.array([0, 0, 1]), B_dbcs_par[:, i]) return R_b_to_dbcs, R_dbcs_to_b def _init_ibulkv(self, species, time_par): """ Returns the ion bulk velocity at the particles' time in the instrument frame DBCS, i.e. either ions or electrons. In the latter case, the velocity is linearly interpolated at electrons timestamps. Parameters ---------- ibulkv_dbcs_par ion bulk velocity in DBCS, (3, N) species: str species of interest time_par: timestamps of particles, (N,) """ probe_ID = self.settings['probes'][0] ibulk_key = 'i_bulkv{}'.format(probe_ID) ibulkv_gse = self._obj[ibulk_key].values.T time_i = self._obj[ibulk_key].coords[self._get_time_key(ibulk_key)].values if species == 'electron': f = interp1d(mpl.dates.date2num(time_i), ibulkv_gse, bounds_error=False, fill_value=np.nan) ibulkv_gse_par = f(mpl.dates.date2num(time_par)) elif species == 'ion': ibulkv_gse_par = ibulkv_gse ibulkv_eci_par = np.array([np.dot(self.R_gse_to_eci[i], ibulkv_gse_par[:, i]) for i in np.arange(time_par.size)]).T ibulkv_dbcs_par = np.array([np.dot(self.R_eci_to_dbcs[i], ibulkv_eci_par[:, i]) for i in np.arange(time_par.size)]).T # From km/s to m/s: ibulkv_dbcs_par *= 1.e3 return ibulkv_dbcs_par def _init_ebulkv(self, R_dbcs_to_b): """ Returns the electron bulk velocity in the B-field aligned frame. """ probe_ID = self.settings['probes'][0] ebulk_key = 'e_bulkv{}'.format(probe_ID) ebulkv_gse = self._obj[ebulk_key].values.T time_e = self._obj[ebulk_key].coords[self._get_time_key(ebulk_key)].values ebulkv_eci = np.array([np.dot(self.R_gse_to_eci[i], ebulkv_gse[:, i]) for i in np.arange(ebulkv_gse.shape[1])]).T ebulkv_dbcs = np.array([np.dot(self.R_eci_to_dbcs[i], ebulkv_eci[:, i]) for i in np.arange(ebulkv_gse.shape[1])]).T ebulkv_b = np.array([np.dot(R_dbcs_to_b[i], ebulkv_dbcs[:, i]) for i in np.arange(ebulkv_gse.shape[1])]).T # From km/s to m/s: ebulkv_b *= 1.e3 return ebulkv_b @staticmethod def _time_sub_range(nb_vdf, time_par, start_time_sub, end_time_sub): """ Select a sub-time-range and the corresponding indices of interest, ind_dis_oi: "indices of the vdfutions Of Interest" """ ind_dis_oi = np.arange(nb_vdf, dtype=int) ind_start = np.where(date2num(time_par) > date2num(start_time_sub))[0][0] ind_stop = np.where(date2num(time_par) > date2num(end_time_sub))[0][0] ind_dis_oi = ind_dis_oi[ind_start: ind_stop] return ind_dis_oi, ind_start, ind_stop @staticmethod def _time_average(v, time_v, time_oi): """ Returns a degraded-time-resolution version of v, given on the timestamps contained by time_oi. Values are binned and averaged. v.shape = (3,t), time_v.shape = (t,) """ v_av = np.zeros((3, time_oi.size)) dt = time_oi[1]-time_oi[0] time_bins = np.hstack((time_oi-dt*.5, time_oi[-1]+.5*dt)) itk = (~np.isnan(v[0])) stat = scipy.stats.binned_statistic(mpl.dates.date2num(time_v[itk]), v[0, itk], statistic='mean', bins=mpl.dates.date2num(time_bins)) stat.statistic[np.isnan(stat.statistic)] = 0 v_av[0] = stat.statistic stat = scipy.stats.binned_statistic(mpl.dates.date2num(time_v[itk]), v[1, itk], statistic='mean', bins=mpl.dates.date2num(time_bins)) stat.statistic[np.isnan(stat.statistic)] = 0 v_av[1] = stat.statistic stat = scipy.stats.binned_statistic(mpl.dates.date2num(time_v[itk]), v[2, itk], statistic='mean', bins=mpl.dates.date2num(time_bins)) stat.statistic[np.isnan(stat.statistic)] = 0 v_av[2] = stat.statistic return v_av @classmethod def _transform_grid(cls, grid_cart, R_b_to_dbcs, ibulkv_dbcs_par, frame, ebulkv_b=None): """Transforms the grid from the frame of interest to the instrument frame. Returns the grid expressed in the spherical system. Parameters ---------- grid_s the transformed grid, in spherical coordinates, (3, N, N, N) grid_cart cartesian interpolation grid, (3, N, N, N) R_b_to_dbcs rotation matrix from B to DBCS, (3, 3) ibulkv_dbcs_par ion bulk velocity at particles' time, (3,) frame: str frame of interest. ebulkv_dbcs electron bulk velocity at particles' time, optional, (3,) """ # This copy of grid_cart will be rotated, every scan differently. grid_c = grid_cart.copy() # For now grid_c is in the frame of interest, EB, GSE, or any. if frame == 'instrument': pass elif frame == 'B': grid_c = np.dot(R_b_to_dbcs, grid_c.reshape(3, -1)) ## Rotation. grid_c = grid_c.reshape(grid_cart.shape) grid_c += ibulkv_dbcs_par[:, None, None, None] ## Translation. elif frame == 'B_electron': if ebulkv_b is None: raise ValueError('ebulkv_b must be given as input when frame\ B_electron is used.\n') psi = np.arctan2(ebulkv_b[1], ebulkv_b[0]) grid_c = vdfu.Rz(grid_c.reshape(3, -1), psi) ## Rotation. grid_c = np.dot(R_b_to_dbcs, grid_c)#.reshape(3, -1)) ## Rotation. grid_c = grid_c.reshape(grid_cart.shape) grid_c += ibulkv_dbcs_par[:, None, None, None] ## Translation. else: raise ValueError('{}: unknown frame.'.format(frame)) # We now go for the spherical system, the natural instrument # coordinate system. Here, the -1 is reversing velocity vector to # viewing direction, in which the instrument tables are expressed. grid_s = vdfu.cart2spher(-1*grid_c) return grid_s @staticmethod def _interpolate_spher_mms(vdf0, speed, theta, phi, grid_s, interp_schem): """Interpolates particles' VDF, tailored for MMS data.""" vdf_interp_shape = grid_s.shape[1:] # Preparing the interpoplation. phi_period = np.zeros(34) phi_period[1:-1] = phi phi_period[0] = phi[-1] - 2 * np.pi phi_period[-1] = phi[0] + 2 * np.pi theta_period = np.zeros(18) theta_period[1:-1] = theta theta_period[0] = theta[-1] - np.pi theta_period[-1] = theta[0] + np.pi vdf_period = np.zeros((32, 18, 34)) vdf_period[:, 1:-1, 1:-1] = vdf0 vdf_period[:, 1:-1, 0] = vdf0[:, :, -1] vdf_period[:, 1:-1, -1] = vdf0[:, :, 0] vdf_period[:, 0] = vdf_period[:, 1] vdf_period[:, 17] = vdf_period[:, 16] itkR = ~np.isnan(speed) if 0: ## For "partial" moments. itkRtmp = speed < 4e6 vdf_period[itkRtmp] = 0. # INTERPOLATION! if interp_schem in ['near', 'lin']: if interp_schem == 'near': interp_schem_str = 'nearest' elif interp_schem == 'lin': interp_schem_str = 'linear' interp_func = RegularGridInterpolator((speed[itkR], theta_period, phi_period), (vdf_period[itkR]), bounds_error=False, method=interp_schem_str, fill_value=np.nan) d = interp_func(grid_s.reshape(3, -1).T) d = d.T.reshape(vdf_interp_shape) # (res,res,res) elif interp_schem == 'cub': if not tricubic_imported: raise NotImplementedError('The tricubic module was not found. ' 'Try: pip install tricubic') d = np.zeros(vdf_interp_shape).flatten() ip = tricubic.tricubic(list(vdf_period), [vdf_period.shape[0], vdf_period.shape[1], vdf_period.shape[2]]) ds = speed[1:]-speed[:-1] delta_theta = theta[1]-theta[0] delta_phi = phi[1]-phi[0] vMin_theta = 0. vMin_phi = 0. # bi = np.digitize(grid_s[0], speed)-1 grid_s[0] = bi + (grid_s[0]-speed[bi])/ds[bi] grid_s[1] = (grid_s[1]-vMin_theta)/delta_theta + .5 grid_s[2] = (grid_s[2]-vMin_phi)/delta_phi + .5 for j, node in enumerate(grid_s.reshape((3, -1)).T): d[j] = ip.ip(list(node)) d = d.reshape(vdf_interp_shape) # "fill_value". Should also be done for values larger than, # and not only smaller than. d[grid_s[0] < 0] = np.nan return d def _save_xarray_variables(self, vdf_interp, vdf_scaled, vdf_normed, vdf_interp_time, vdf_scaled_time, vdf_normed_time, grid_cart, grid_spher, grid_cyl, time_interp, grid_geom): """Creating and adding variables to the xarray dataset""" try: self._obj = self._obj.drop('vdf_interp_time') self._obj = self._obj.drop('vdf_interp') self._obj = self._obj.drop('grid_interp_cart') self._obj = self._obj.drop('grid_interp_spher') self._obj = self._obj.drop('grid_interp_cyl') self._obj = self._obj.drop('time_interp') self._obj = self._obj.drop('vdf_scaled_time') self._obj = self._obj.drop('vdf_normed_time') self._obj = self._obj.drop('vdf_scaled') self._obj = self._obj.drop('vdf_normed') self._obj = self._obj.drop('grid_geom') except (ValueError, KeyError) as e: pass if grid_geom == 'spher': xrdit = xr.DataArray(vdf_interp_time, dims=['time', 'speed', 'phi']) xrdst = xr.DataArray(vdf_scaled_time, dims=['time', 'speed', 'phi']) xrdnt = xr.DataArray(vdf_normed_time, dims=['time', 'speed', 'phi']) xrdi = xr.DataArray(vdf_interp, dims=['speed', 'theta', 'phi']) xrds = xr.DataArray(vdf_scaled, dims=['speed', 'theta', 'phi']) xrdn = xr.DataArray(vdf_normed, dims=['speed', 'theta', 'phi']) self._obj['vdf_interp_time'] = xrdit self._obj['vdf_scaled_time'] = xrdst self._obj['vdf_normed_time'] = xrdnt self._obj['vdf_interp'] = xrdi self._obj['vdf_scaled'] = xrds self._obj['vdf_normed'] = xrdn elif grid_geom == 'cart': xrdit = xr.DataArray(vdf_interp_time, dims=['time', 'vx', 'vz']) xrdi = xr.DataArray(vdf_interp, dims=['vx', 'vy', 'vz']) self._obj['vdf_interp_time'] = xrdit self._obj['vdf_interp'] = xrdi elif grid_geom == 'cyl': xrdit = xr.DataArray(vdf_interp_time, dims=['time', 'v_perp', 'v_para']) xrdi = xr.DataArray(vdf_interp, dims=['v_perp', 'phi', 'v_para']) self._obj['vdf_interp_time'] = xrdit self._obj['vdf_interp'] = xrdi xr_grid_c = xr.DataArray(grid_cart, dims=['v', 'vx', 'vy', 'vz']) self._obj['grid_interp_cart'] = xr_grid_c xr_grid_s = xr.DataArray(grid_spher, dims=['v', 'speed', 'theta', 'phi']) self._obj['grid_interp_spher'] = xr_grid_s xr_grid_cy = xr.DataArray(grid_cyl, dims=['v', 'v_perp', 'phi', 'v_para']) self._obj['grid_interp_cyl'] = xr_grid_cy xr_time = xr.DataArray(time_interp, dims=['time']) self._obj['time_interp'] = xr_time self._obj['grid_geom'] = grid_geom
[docs] def plot(self, ptype='1d', plt_contourf=False, cmap='RdBu_r'): """Calls to plotting methods from aidapy.tools.vdf_plot Parameters ---------- ptype : str Type of plot, '1d', '2d', '3d', '3d_time' or '3d_gyro' plt_contourf : bool If True, colormeshes are plotted using a filled contour method. Default is False. cmap : str Valid matplotlib colormap string. Default is 'RdBu_r'. """ grid_geom = self._obj['grid_geom'].values raise_error = False if ptype == '1d': if grid_geom == 'spher': vplt.profiles_1d(self._obj['vdf_interp'].values, self._obj['grid_interp_spher'].values) else: raise_error = True elif ptype == '3d_time': if grid_geom == 'spher': vplt.spher_time(self._obj['vdf_interp_time'].values, self._obj['vdf_scaled_time'].values, self._obj['vdf_normed_time'].values, self._obj['grid_interp_spher'].values, self._obj['time_interp'].values, plt_contourf=plt_contourf, cmap=cmap) else: raise_error = True elif ptype == '3d':#to be integrated in vplt.spher! if grid_geom == 'cart': vplt.cart(self._obj['vdf_interp'].values, self._obj['grid_interp_cart'].values, plt_contourf=plt_contourf, cmap=cmap) else: raise_error = True elif ptype == '2d': if grid_geom == 'spher': vplt.spher(self._obj['vdf_interp'].values, self._obj['grid_interp_cart'].values, plt_contourf=plt_contourf, cmap=cmap) else: raise_error = True elif ptype == '3d_gyro': if grid_geom == 'spher': vplt.gyro(self._obj['vdf_interp'].values, self._obj['grid_interp_spher'].values, self._obj['grid_interp_cart'].values, cmap=cmap) else: raise_error = True elif ptype == '2d_gyro': if grid_geom == 'spher': vplt.xy_plane(self._obj['vdf_interp'].values, self._obj['grid_interp_spher'].values, self._obj['grid_interp_cart'].values, cmap=cmap) else: raise_error = True else: raise ValueError('Plot type {} does not exists.'.format(ptype)) if raise_error: raise ValueError('Plot type {} not available for grid ' 'geometry {}'.format(ptype, grid_geom))