Python xarray module: Dataset() example source code
The following 50 code examples, extracted from open-source Python projects, illustrate how to use xarray.Dataset().
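Before the examples, here is a minimal, self-contained sketch of constructing a Dataset directly; the variable and coordinate names are purely illustrative:

import numpy as np
import xarray as xr

# one data variable over ('time', 'x'), two coordinates, and a dataset attribute
ds = xr.Dataset(
    data_vars={'temperature': (('time', 'x'), np.zeros((3, 4)))},
    coords={'time': [0, 1, 2], 'x': np.arange(4)},
    attrs={'description': 'illustrative example'})
assert ds['temperature'].dims == ('time', 'x')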
def test_snapshot_vars(self, model):
ds = xr.Dataset()
ds['clock'] = ('clock', [0, 2, 4, 6, 8],
{self._clock_key: 1, self._master_clock_key: 1})
ds['snap_clock'] = ('snap_clock', [0, 4, 8], {self._clock_key: 1})
# snapshot clock with no snapshot variable (attribute) set
ds['snap_clock2'] = ('snap_clock2', [0, 8], {self._clock_key: 1})
ds.xsimlab._set_snapshot_vars(model, None, grid='x')
ds.xsimlab._set_snapshot_vars(model, 'clock', quantity='quantity')
ds.xsimlab._set_snapshot_vars(model, 'snap_clock',
other_process=('other_effect', 'x2'))
expected = {None: set([('grid', 'x')]),
'clock': set([('quantity', 'quantity')]),
'snap_clock': set([('other_process', 'other_effect'),
('other_process', 'x2')])}
actual = {k: set(v) for k, v in ds.xsimlab.snapshot_vars.items()}
assert actual == expected
def input_dataset():
clock_key = SimlabAccessor._clock_key
mclock_key = SimlabAccessor._master_clock_key
svars_key = SimlabAccessor._snapshot_vars_key
ds = xr.Dataset()
ds['clock'] = ('clock', [0, 2, 4, 6, 8],
{clock_key: np.uint8(True), mclock_key: np.uint8(True)})
ds['out'] = ('out', [0, 4, 8], {clock_key: np.uint8(True)})
ds['grid__x_size'] = ((), 10, {'description': 'grid size'})
ds['quantity__quantity'] = ('x', np.zeros(10),
{'description': 'a quantity'})
ds['some_process__some_param'] = ((), 1, {'description': 'some parameter'})
ds['other_process__other_param'] = ('clock', [1, 2, 3, 4, 5],
{'description': 'other parameter'})
ds['clock'].attrs[svars_key] = 'quantity__quantity'
ds['out'].attrs[svars_key] = ('other_process__other_effect,'
'some_process__some_effect')
ds.attrs[svars_key] = 'grid__x'
return ds
def master_clock_dim(self):
"""Dimension used as master clock for model runs. Returns None
if no dimension is set as master clock.
See Also
--------
:meth:`Dataset.xsimlab.update_clocks`
"""
if self._master_clock_dim is not None:
return self._master_clock_dim
else:
for c in self._obj.coords.values():
if c.attrs.get(self._master_clock_key, False):
dim = c.dims[0]
self._master_clock_dim = dim
return dim
return None
def can_decode(cls, ds, var):
"""
Class method to determine whether the object can be decoded by this
decoder class.
Parameters
----------
ds: xarray.Dataset
The dataset that contains the given `var`
var: xarray.Variable or xarray.DataArray
The array to decode
Returns
-------
bool
True if the decoder can decode the given array `var`. Otherwise
False
Notes
-----
The default implementation returns True for any argument. Subclass this
method to be specific on what type of data your decoder can decode
"""
return True
def decode_ds(cls, *args, **kwargs):
"""
Static method to decode coordinates and time information
This method interprets absolute time information (stored with units
``'day as %Y%m%d.%f'``) and coordinates
Parameters
----------
%(CFDecoder._decode_ds.parameters)s
Returns
-------
xarray.Dataset
The decoded dataset"""
for decoder_cls in cls._registry + [CFDecoder]:
ds = decoder_cls._decode_ds(ds, **kwargs)
return ds
def init_accessor(self, base=None, idims=None, decoder=None,
*args, **kwargs):
"""
Initialize the accessor instance
This method initializes the accessor
Parameters
----------
base: xr.Dataset
The base dataset for the data
idims: dict
A mapping from dimension name to indices. If not provided, it is
calculated when the :attr:`idims` attribute is accessed
decoder: CFDecoder
The decoder of this object
%(InteractiveBase.parameters)s
"""
if base is not None:
self.base = base
self.idims = idims
if decoder is not None:
self.decoder = decoder
super(InteractiveArray, self).__init__(*args, **kwargs)
def test_from_dataset_11_list(self):
"""Test the creation of a list of InteractiveLists"""
variables, coords = self._from_dataset_test_variables
ds = xr.Dataset(variables, coords)
# Create two lists, each containing two arrays of variables v1 and v2.
# In the first list, the xdim dimensions are 0 and 1.
# In the second, the xdim dimensions are both 2
l = self.list_class.from_dataset(
ds, name=[['v1', 'v2']], xdim=[[0, 1], 2], prefer_list=True)
self.assertEqual(len(l), 2)
self.assertIsInstance(l[0], psyd.InteractiveList)
self.assertIsInstance(l[1], psyd.InteractiveList)
self.assertEqual(len(l[0]), 2)
self.assertEqual(len(l[1]), 2)
self.assertEqual(l[0][0].xdim, 0)
self.assertEqual(l[0][1].xdim, 1)
self.assertEqual(l[1][0].xdim, 2)
self.assertEqual(l[1][1].xdim, 2)
def test_to_dataframe(self):
variables, coords = self._from_dataset_test_variables
variables['v1'][:] = np.arange(variables['v1'].size).reshape(
variables['v1'].shape)
ds = xr.Dataset(variables, coords)
l = psyd.InteractiveList.from_dataset(ds, name='v1', t=[0, 1])
l.extend(psyd.InteractiveList.from_dataset(ds, t=2,
x=slice(1, 3)),
new_name=True)
self.assertEqual(len(l), 3)
self.assertTrue(all(arr.ndim == 1 for arr in l), msg=l)
df = l.to_dataframe()
self.assertEqual(df.shape, (ds.xdim.size, 3))
self.assertEqual(df.index.values.tolist(), ds.xdim.values.tolist())
self.assertEqual(df[l[0].psy.arr_name].values.tolist(),
ds.v1[0].values.tolist())
self.assertEqual(df[l[1].psy.arr_name].values.tolist(),
ds.v1[1].values.tolist())
self.assertEqual(df[l[2].psy.arr_name].notnull().sum(), 2)
self.assertEqual(
df[l[2].psy.arr_name].values[
df[l[2].psy.arr_name].notnull().values].tolist(),
ds.v1[2, 1:3].values.tolist())
def to_xarray(self):
"""Convert to xarray.Dataset
Returns
-------
xarray.Dataset
"""
import xarray as xr
data_vars = {
"frequencies": xr.DataArray(self.frequencies, dims="bin"),
"errors2": xr.DataArray(self.errors2,
"bins": xr.DataArray(self.bins, dims=("bin", "x01"))
}
coords = {}
attrs = {
"underflow": self.underflow,
"overflow": self.overflow,
"inner_missed": self.inner_missed,
"keep_missed": self.keep_missed
}
attrs.update(self._meta_data)
# TODO: Add stats
return xr.Dataset(data_vars, coords, attrs)
def from_xarray(cls, arr):
"""Convert form xarray.Dataset
Parameters
----------
arr: xarray.Dataset
The data in xarray representation
"""
kwargs = {'frequencies': arr["frequencies"],
'binning': arr["bins"],
'errors2': arr["errors2"],
'overflow': arr.attrs["overflow"],
'underflow': arr.attrs["underflow"],
'keep_missed': arr.attrs["keep_missed"]}
# TODO: Add stats
return cls(**kwargs)
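Taken together, to_xarray and from_xarray form a serialization round trip. A minimal sketch, assuming `h` is an instance of the histogram class these two methods belong to and exposes the `frequencies` and `bins` attributes used above:

import numpy as np

ds = h.to_xarray()            # histogram -> xarray.Dataset
h2 = type(h).from_xarray(ds)  # xarray.Dataset -> histogram
assert np.array_equal(h2.frequencies, h.frequencies)
assert np.array_equal(h2.bins, h.bins)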
def second_layer_input_matrix(X, models):
'''Build a second layer model input matrix by taking the
metadata from X given to the first layer models and forming
a new matrix from the 1-D predictions of the first layer models
'''
preds = predict_many(dict(X=X), to_raster=False,
ensemble=models)
example = preds[0].flat
input_matrix = np.empty((example.shape[0], len(preds)))
for j, pred in enumerate(preds):
input_matrix[:, j] = pred.flat.values[:, 0]
attrs = X.attrs.copy()
attrs['old_dims'] = [X[SOIL_MOISTURE].dims] * len(preds)
attrs['canvas'] = X[SOIL_MOISTURE].canvas
tags = [tag for tag, _ in models]
arr = xr.DataArray(input_matrix,
coords=[('space', example.space),
('band', tags)],
dims=('space', 'band'),
attrs=attrs)
return xr.Dataset(dict(flat=arr), attrs=attrs)
def _as_numpy_arrs(self, X, y=None, **kw):
'''Convert X, y for a scikit-learn method to numpy.ndarrays
'''
if isinstance(X, np.ndarray):
return X, y, None
if isinstance(X, xr.Dataset):
X = MLDataset(X)
if hasattr(X, 'has_features'):
if X.has_features(raise_err=False):
pass
else:
X = X.to_features()
row_idx = get_row_index(X)
if hasattr(X, 'to_array') and not isinstance(X, np.ndarray):
X, y = X.to_array(y=y)
# TODO: what about row_idx now?
# TODO: if y is not a numpy array, then the above lines are needed for y
return X, y, row_idx
def test_ea_search_sklearn_elm_steps(label, do_predict):
'''Test that EaSearchCV can work with numpy, dask.array,
pandas.DataFrame, xarray.Dataset, xarray_filters.MLDataset
'''
from scipy.stats import lognorm
est, make_data, sel, kw = args[label]
parameters = {'kernel': ['linear', 'rbf'],
'C': lognorm(4),}
if isinstance(est, (sk_Pipeline, Pipeline)):
parameters = {'est__{}'.format(k): v
for k, v in parameters.items()}
ea = EaSearchCV(est, parameters,
n_iter=4,
ngen=2,
model_selection=sel,
model_selection_kwargs=kw)
X, y = make_data()
ea.fit(X, y)
if do_predict:
pred = ea.predict(X)
assert isinstance(pred, type(y))
def import_from_netcdf(network, path, skip_time=False):
"""
Import network data from netCDF file or xarray Dataset at `path`.
Parameters
----------
path : string|xr.Dataset
Path to netCDF dataset or instance of xarray Dataset
skip_time : bool, default False
Skip reading in time dependent attributes
"""
assert has_xarray, "xarray must be installed for netCDF support."
basename = os.path.basename(path) if isinstance(path, string_types) else None
with ImporterNetCDF(path=path) as importer:
_import_from_importer(network, importer, basename=basename,
skip_time=skip_time)
def __init__(self,
instance: int,
data: xr.Dataset,
mutable: bool = False):
"""
Create a new _Instance view representing the specified instance of the specified xarray data set.
Parameters
----------
instance: int
The index of the instance in the specified xarray data set
data: xarray.Dataset
The xarray data set containing the instance
mutable: bool, optional
If True, attributes of this instance may be modified. If False (default), any attempt to modify the instance
will result in an AttributeError
"""
self._instance = instance
self._data = data
self._mutable = mutable
def __init__(self,
data: xr.Dataset,
mutable: bool = False):
"""
Create and initialize a new DataSet with the specified parameters.
There should be no reason to invoke this constructor directly. Instead, the utility methods for loading a data
set from a file, or for creating an empty data set should be used.
Parameters
----------
data: xarray.Dataset
The xarray data set storing the actual data
mutable: bool
True, if modifications to the data set should be allowed, False otherwise
"""
super().__init__()
self._data = data
self._mutable = mutable
def load_netcdf_meta(datafile):
'''
Loads metadata for NetCDF
Parameters:
:datafile: str: Path on disk to NetCDF file
Returns:
:meta: Dictionary of metadata
'''
ras = nc.Dataset(datafile)
attrs = _get_nc_attrs(ras)
sds = _get_subdatasets(ras)
meta = {'meta': attrs,
'layer_meta': sds,
'name': datafile,
'variables': list(ras.variables.keys()),
}
return meta_strings_to_dict(meta)
def read_met_data(params: dict, domain: xr.Dataset) -> xr.Dataset:
"""
Read input meteorological forcings for MetSim.
This method supports ascii,binary,netcdf,and
xarray input pointers. The input source is derived
from the key 'forcing' in the params dictionary.
The format of the data is derived from 'in_format'
key in the parameter dictionary.
"""
process_funcs = {
"netcdf": process_nc,
"binary": process_vic,
"ascii": process_vic,
"data": process_nc
}
return process_funcs[params['forcing_fmt']](params, domain)
def read_netcdf(data_handle, domain=None, iter_dims=['lat', 'lon'],
start=None, stop=None, calendar='standard',
var_dict=None) -> xr.Dataset:
"""Read in a NetCDF file"""
ds = xr.open_dataset(data_handle)
if var_dict is not None:
ds.rename(var_dict, inplace=True)
if start is not None and stop is not None:
ds = ds.sel(time=slice(start, stop))
dates = ds.indexes['time']
ds['day_of_year'] = xr.Variable(('time', ), dates.dayofyear)
if domain is not None:
ds = ds.sel(**{d: domain[d] for d in iter_dims})
out = ds.load()
ds.close()
return out
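A hypothetical call to the read_netcdf helper above; the file name, variable mapping, and date range are made up for illustration:

ds = read_netcdf('forcings.nc',
                 var_dict={'Prec': 'prec', 'Tmax': 't_max'},
                 start='1950-01-01', stop='1950-12-31')
# the helper adds a day-of-year variable along the time dimension
print(ds['day_of_year'])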
def read_data(data_handle, domain=None, iter_dims=['lat', 'lon'],
start=None, stop=None, calendar='standard',
var_dict=None) -> xr.Dataset:
"""Read data directly from an xarray dataset"""
varlist = list(data_handle.keys())
if var_dict is not None:
data_handle.rename(var_dict, inplace=True)
varlist = list(var_dict.values())
if start is not None and stop is not None:
data_handle = data_handle[varlist].sel(time=slice(start, stop))
dates = data_handle.indexes['time']
data_handle['day_of_year'] = xr.Variable(('time', ), dates.dayofyear)
if domain is not None:
data_handle = data_handle.sel(**{d: domain[d] for d in iter_dims})
out = data_handle.load()
data_handle.close()
return out
def test_from_features_dropped_rows(X):
features = X.to_features()
data1 = features.from_features()
# Assert that we get the original Dataset back after X.to_features().from_features()
assert np.array_equal(data1.coords.to_index().values, X.coords.to_index().values)
assert np.allclose(data1.to_xy_arrays()[0], X.to_xy_arrays()[0])
# Drop some rows
features['features'].values[:2, :] = np.nan
zerod_vals_copy = features['features'].values[:] # copy NaN positions for testing later on
features = features.dropna(features['features'].dims[0])
# Convert back to original dataset, padding NaN values into the proper locations if necessary
data2 = features.from_features()
# Assert that the coords are correct, and NaNs are in the right places
if np.nan in data2.to_xy_arrays()[0]:
assert np.array_equal(data2.coords.to_index().values, data1.coords.to_index().values)
assert np.allclose(data2.to_xy_arrays()[0], zerod_vals_copy, equal_nan=True)
def create_master(self, var, data=None, **kwargs):
""" Convenience function to create a master dataset for a
given experiment.
Parameters
----------
var : Var or str
A Var object containing the information about the variable
being processed or a string indicating its name for inference
when creating the master dataset
data : dict (optional, unless var is a string)
Dictionary of dictionaries/dataset containing the variable data
to be collected into a master dataset
Returns
-------
A Dataset with all the data, collapsed onto additional dimensions
for each case in the Experiment.
"""
return create_master(self, var, data, **kwargs)
def _make_dataset(varname, seed=None, **var_kws):
rs = np.random.RandomState(seed)
_dims = {'time': 10, 'x': 5, 'y': 5}
_dim_keys = ('time', 'x', 'y')
ds = xr.Dataset()
ds['time'] = ('time', pd.date_range('2000-01-01', periods=_dims['time']))
ds['x'] = np.linspace(0, 1, _dims['x'])
ds['y'] = np.linspace(0, 1, _dims['y'])
data = rs.normal(size=tuple(_dims[d] for d in _dim_keys))
ds[varname] = (_dim_keys, data)
ds.coords['numbers'] = ('time',
np.array(range(_dims['time']), dtype='int64'))
return ds
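A quick sanity check of the fixture above; the variable name passed in is arbitrary:

ds = _make_dataset('air_temperature', seed=0)
assert ds['air_temperature'].dims == ('time', 'x', 'y')
assert ds['time'].size == 10 and ds.coords['numbers'].size == 10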
def test_new_geometric_median():
from datacube_stats.statistics import NewGeomedianStatistic
arr = np.random.random((5, 100, 100))
dataarray = xr.DataArray(arr, dims=('time', 'y', 'x'), coords={'time': list(range(5))})
dataset = xr.Dataset(data_vars={'band1': dataarray, 'band2': dataarray})
new_geomedian_stat = NewGeomedianStatistic()
result = new_geomedian_stat.compute(dataset)
assert isinstance(result, xr.Dataset)
assert result.band1.dims == result.band2.dims == ('y', 'x')
# The two bands had the same inputs, so should have the same result
assert (result.band1 == result.band2).all()
def two_band_eo_dataset(draw):
crs, height, width, times = draw(dataset_shape())
coordinates = {dim: np.arange(size) for dim, size in zip(crs.dimensions, (height, width))}
coordinates['time'] = times
dimensions = ('time',) + crs.dimensions
shape = (len(times), height, width)
arr = np.random.random_sample(size=shape)
data1 = xr.DataArray(arr,
dims=dimensions,
coords=coordinates,
attrs={'crs': crs})
arr = np.random.random_sample(size=shape)
data2 = xr.DataArray(arr,
dims=dimensions,
coords=coordinates,
attrs={'crs': crs})
name1, name2 = draw(st.lists(variable_name, min_size=2, max_size=2, unique=True))
dataset = xr.Dataset(data_vars={name1: data1, name2: data2},
attrs={'crs': crs})
return dataset
def test_normalised_difference_stats(dataset, output_name):
var1, var2 = list(dataset.data_vars)
ndstat = NormalisedDifferenceStats(var1, var2, output_name)
result = ndstat.compute(dataset)
assert isinstance(result, xr.Dataset)
assert 'time' not in result.dims
assert dataset.crs == result.crs
expected_output_varnames = set(f'{output_name}_{stat_name}' for stat_name in ndstat.stats)
assert set(result.data_vars) == expected_output_varnames
# Check the measurements() function raises an error on bad input_measurements
with pytest.raises(StatsConfigurationError):
invalid_names = [{'name': 'foo'}]
ndstat.measurements(invalid_names)
# Check the measurements() function returns something reasonable
input_measurements = [{'name': name} for name in (var1, var2)]
output_measurements = ndstat.measurements(input_measurements)
measurement_names = set(m['name'] for m in output_measurements)
assert expected_output_varnames == measurement_names
def compute(self, data):
is_integer_type = np.issubdtype(data.water.dtype, np.integer)
if not is_integer_type:
raise StatsProcessingError("Attempting to count bit flags on non-integer data. Provided data is: {}"
.format(data.water))
# 128 == clear and wet, 132 == clear and wet and masked for sea
# The PQ sea mask that we use is dodgy and should be ignored. It excludes lots of useful data
wet = ((data.water == 128) | (data.water == 132)).sum(dim='time')
dry = ((data.water == 0) | (data.water == 4)).sum(dim='time')
clear = wet + dry
with np.errstate(divide='ignore', invalid='ignore'):
frequency = wet / clear
if self.freq_only:
return xarray.Dataset({'frequency': frequency}, attrs=dict(crs=data.crs))
else:
return xarray.Dataset({'count_wet': wet,
'count_clear': clear,
'frequency': frequency}, attrs=dict(crs=data.crs))
def load_data(sub_tile_slice, sources):
"""
Load a masked chunk of data from the datacube, based on a specification and list of datasets in `sources`.
:param sub_tile_slice: A portion of a tile, tuple coordinates
:param sources: a dictionary containing `data`, `spec` and `masks`
:return: :class:`xarray.Dataset` containing loaded data. Will be indexed and sorted by time.
"""
datasets = [load_masked_data(sub_tile_slice, source_prod)
for source_prod in sources] # list of datasets
datasets = _remove_emptys(datasets)
if len(datasets) == 0:
raise EmptyChunkException()
# TODO: Add check for compatible data variable attributes
# flags_definition between pq products is different and is silently dropped
datasets = xarray.concat(datasets, dim='time') # copies all the data
if len(datasets.time) == 0:
raise EmptyChunkException()
# sort along time dim
return datasets.isel(time=datasets.time.argsort()) # copies all the data again
def test_filter_accessor():
ds = xr.Dataset(data_vars={'var1': ('x', [1, 2]), 'var2': ('y', [3, 4])},
coords={'x': [1, 2], 'y': [3, 4]})
filtered = ds.filter(lambda var: 'x' in var.dims)
assert 'var1' in filtered and 'var2' not in filtered
assert 'x' in filtered.coords and 'y' not in filtered.coords
def test_clock_coords(self):
ds = xr.Dataset(
coords={
'mclock': ('mclock', [0, 1, 2],
{self._clock_key: 1, self._master_clock_key: 1}),
'sclock': ('sclock', [0, 2], {self._clock_key: 1}),
'no_clock': ('no_clock', [3, 4])
}
)
assert set(ds.xsimlab.clock_coords) == {'mclock', 'sclock'}
def test_master_clock_dim(self):
attrs = {self._clock_key: 1, self._master_clock_key: 1}
ds = xr.Dataset(coords={'clock': ('clock', [1, 2], attrs)})
assert ds.xsimlab.master_clock_dim == 'clock'
assert ds.xsimlab._master_clock_dim == 'clock' # cache
assert ds.xsimlab.master_clock_dim == 'clock' # get cached value
ds = xr.Dataset()
assert ds.xsimlab.master_clock_dim is None
def test_set_master_clock_dim(self):
ds = xr.Dataset(coords={'clock': [1, 2], 'clock2': [3, 4]})
ds.xsimlab._set_master_clock_dim('clock')
assert self._master_clock_key in ds.clock.attrs
ds.xsimlab._set_master_clock_dim('clock2')
assert self._master_clock_key not in ds.clock.attrs
assert self._master_clock_key in ds.clock2.attrs
with pytest.raises(KeyError):
ds.xsimlab._set_master_clock_dim('invalid_clock')
def test_set_input_vars(self, model):
ds = xr.Dataset()
with pytest.raises(KeyError) as excinfo:
ds.xsimlab._set_input_vars(model, 'invalid_process', var=1)
assert "no process named" in str(excinfo.value)
with pytest.raises(ValueError) as excinfo:
ds.xsimlab._set_input_vars(model, 'some_process', some_param=0,
invalid_var=1)
assert "not valid input variables" in str(excinfo.value)
ds.xsimlab._set_input_vars(model, 'quantity',
quantity=('x', np.zeros(10)))
expected = xr.DataArray(data=np.zeros(10), dims='x')
assert "quantity__quantity" in ds
xr.testing.assert_equal(ds['quantity__quantity'], expected)
# test time and parameter dimensions
ds.xsimlab._set_input_vars(model, model.some_process, some_param=[1, 2])
expected = xr.DataArray(data=[1, 2], dims='some_process__some_param',
coords={'some_process__some_param': [1, 2]})
xr.testing.assert_equal(ds['some_process__some_param'], expected)
del ds['some_process__some_param']
ds['clock'] = ('clock', [0, 1], {self._master_clock_key: 1})
ds.xsimlab._set_input_vars(model, 'some_process',
some_param=('clock', [1, 2]))
expected = xr.DataArray(data=[1, 2], dims='clock',
coords={'clock': [0, 1]})
xr.testing.assert_equal(ds['some_process__some_param'], expected)
# test optional
ds.xsimlab._set_input_vars(model, 'grid')
expected = xr.DataArray(data=5)
xr.testing.assert_equal(ds['grid__x_size'], expected)
def test_set_snapshot_vars(self, model):
ds = xr.Dataset()
ds['clock'] = ('clock', [0, 2, 4, 6, 8],
{self._clock_key: 1, self._master_clock_key: 1})
ds['snap_clock'] = ('snap_clock', [0, 4, 8], {self._clock_key: 1})
ds['not_a_clock'] = ('not_a_clock', [0, 1])
with pytest.raises(KeyError) as excinfo:
ds.xsimlab._set_snapshot_vars(model, None, invalid_process='var')
assert "no process named" in str(excinfo.value)
with pytest.raises(KeyError) as excinfo:
ds.xsimlab._set_snapshot_vars(model, None, quantity='invalid_var')
assert "has no variable" in str(excinfo.value)
ds.xsimlab._set_snapshot_vars(model, None, grid='x')
assert ds.attrs[self._snapshot_vars_key] == 'grid__x'
ds.xsimlab._set_snapshot_vars(model, 'clock',
some_process='some_effect',
quantity='quantity')
expected = {'some_process__some_effect', 'quantity__quantity'}
actual = set(ds['clock'].attrs[self._snapshot_vars_key].split(','))
assert actual == expected
ds.xsimlab._set_snapshot_vars(model, 'snap_clock',
other_process=('other_effect', 'x2'))
expected = {'other_process__other_effect', 'other_process__x2'}
actual = set(ds['snap_clock'].attrs[self._snapshot_vars_key].split(','))
assert actual == expected
with pytest.raises(ValueError) as excinfo:
ds.xsimlab._set_snapshot_vars(model, 'not_a_clock',
quantity='quantity')
assert "not a valid clock" in str(excinfo.value)
def test_run_multi(self):
ds = xr.Dataset()
with pytest.raises(NotImplementedError):
ds.xsimlab.run_multi()
def test_constructor(self, model, input_dataset):
ds = xr.Dataset()
with pytest.raises(ValueError) as excinfo:
DatasetModelInterface(model, ds)
assert "missing master clock dimension" in str(excinfo.value)
invalid_ds = input_dataset.drop('quantity__quantity')
with pytest.raises(KeyError) as excinfo:
DatasetModelInterface(model, invalid_ds)
assert "missing data variables" in str(excinfo.value)
def _set_master_clock_dim(self, dim):
if dim not in self._obj.coords:
raise KeyError("Dataset has no %r dimension coordinate. "
"To create a new master clock dimension,"
"use Dataset.xsimlab.update_clock."
% dim)
if self.master_clock_dim is not None:
self._obj[self.master_clock_dim].attrs.pop(self._master_clock_key)
self._obj[dim].attrs[self._clock_key] = np.uint8(True)
self._obj[dim].attrs[self._master_clock_key] = np.uint8(True)
self._master_clock_dim = dim
def _set_snapshot_clock(self, dim, start=0., end=None,
step=None, nsteps=None, auto_adjust=True):
if self.master_clock_dim is None:
raise ValueError("no master clock dimension/coordinate is defined "
"in Dataset. "
"Use `Dataset.xsimlab._set_master_clock` first")
clock_data = self._set_clock_data(dim, start, end, step, nsteps)
da_master_clock = self._obj[self.master_clock_dim]
if auto_adjust:
kwargs = {'method': 'nearest'}
else:
kwargs = {}
indexer = {self.master_clock_dim: clock_data}
kwargs.update(indexer)
da_snapshot_clock = da_master_clock.sel(**kwargs)
self._obj[dim] = da_snapshot_clock.rename({self.master_clock_dim: dim})
# .sel copies variable attributes
self._obj[dim].attrs.pop(self._master_clock_key)
for attr_name in ('units', 'calendar'):
attr_value = da_master_clock.attrs.get(attr_name)
if attr_value is not None:
self._obj[dim].attrs[attr_name] = attr_value
def run(self, model=None, safe_mode=True):
"""Run the model.
Parameters
----------
model : :class:`xsimlab.Model` object, optional
Reference model. If None, tries to get model from context.
safe_mode : bool, optional
If True (default), it is safe to run multiple simulations
simultaneously. Generally safe mode shouldn't be disabled, except
in a few cases (e.g., debugging).
Returns
-------
output : Dataset
Another Dataset with both model inputs and outputs (snapshots).
"""
model = _maybe_get_model_from_context(model)
if safe_mode:
model = model.clone()
ds_model_interface = DatasetModelInterface(model, self._obj)
out_ds = ds_model_interface.run_model()
return out_ds
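Following the docstring above, a run goes through the Dataset accessor; a sketch, assuming `in_ds` is an input Dataset prepared for an assumed `my_model`:

out_ds = in_ds.xsimlab.run(model=my_model)
# the result holds both the model inputs and the snapshot outputs
print(out_ds.data_vars)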
def run_multi(self):
"""Run multiple models.
Not yet implemented.
See Also
--------
:meth:`xarray.Dataset.xsimlab.run`
"""
# TODO:
raise NotImplementedError()
def test_version_metadata_with_streaming(self, api, opener):
np.random.seed(123)
times = pd.date_range('2000-01-01', '2001-12-31', name='time')
annual_cycle = np.sin(2 * np.pi * (times.dayofyear / 365.25 - 0.28))
base = 10 + 15 * np.array(annual_cycle).reshape(-1, 1)
tmin_values = base + 3 * np.random.randn(annual_cycle.size, 3)
tmax_values = base + 3 * np.random.randn(annual_cycle.size, 3)
ds = xr.Dataset({'tmin': (('time', 'location'), tmin_values),
'tmax': (('time', 'location'), tmax_values)},
{'time': times, 'location': ['IA', 'IN', 'IL']})
var = api.create('streaming_test')
with var.get_local_path(
bumpversion='patch',
dependencies={'arch1': '0.1.0', 'arch2': '0.2.0'}) as f:
ds.to_netcdf(f)
ds.close()
assert var.get_history()[-1]['dependencies']['arch2'] == '0.2.0'
tmin_values = base + 10 * np.random.randn(annual_cycle.size, 3)
ds.update({'tmin': (('time', 'location'), tmin_values)})
with var.get_local_path(
bumpversion='patch', dependencies={'arch2': '1.2.0'}) as f:
with xr.open_dataset(f) as ds:
mem = ds.load()
ds.close()
mem.to_netcdf(f)
assert var.get_history()[-1]['dependencies']['arch2'] == '1.2.0'
assert var.get_history()[-1][
'checksum'] != var.get_history()[-2]['checksum']
def to_netcdf(ds, **kwargs):
"""
Store the given dataset as a netCDF file
This function works essentially the same as the usual
:meth:`xarray.Dataset.to_netcdf` method but can also encode absolute time
units
Parameters
----------
ds: xarray.Dataset
The dataset to store
%(xarray.Dataset.to_netcdf.parameters)s
"""
to_update = {}
for v, obj in six.iteritems(ds.variables):
units = obj.attrs.get('units', obj.encoding.get('units', None))
if units == 'day as %Y%m%d.%f' and np.issubdtype(
obj.dtype, np.datetime64):
to_update[v] = xr.Variable(
obj.dims, AbsoluteTimeEncoder(obj), attrs=obj.attrs.copy(),
encoding=obj.encoding)
to_update[v].attrs['units'] = units
if to_update:
ds = ds.update(to_update, inplace=False)
return xarray_api.to_netcdf(ds, **kwargs)
def decode_coords(ds, gridfile=None, inplace=True):
"""
Sets the coordinates and bounds in a dataset
This static method sets those coordinates and bounds that are marked
in the netCDF attributes as coordinates in :attr:`ds` (without
deleting them from the variable attributes because this information is
necessary for visualizing the data correctly)
Parameters
----------
ds: xarray.Dataset
The dataset to decode
gridfile: str
The path to a separate grid file or a xarray.Dataset instance which
may store the coordinates used in `ds`
inplace: bool, optional
If True, `ds` is modified in place
Returns
-------
xarray.Dataset
`ds` with additional coordinates"""
def add_attrs(obj):
if 'coordinates' in obj.attrs:
extra_coords.update(obj.attrs['coordinates'].split())
if 'bounds' in obj.attrs:
extra_coords.add(obj.attrs['bounds'])
if gridfile is not None and not isinstance(gridfile, xr.Dataset):
gridfile = open_dataset(gridfile)
extra_coords = set(ds.coords)
for k, v in six.iteritems(ds.variables):
add_attrs(v)
add_attrs(ds)
if gridfile is not None:
ds = ds.update({k: v for k, v in six.iteritems(gridfile.variables)
if k in extra_coords}, inplace=inplace)
ds = ds.set_coords(extra_coords.intersection(ds.variables),
inplace=inplace)
return ds
def get_idims(self, arr, coords=None):
"""Get the coordinates in the :attr:`ds` dataset as int or slice
This method returns a mapping from the coordinate names of the given
`arr` to an integer, slice or an array of integers that represent the
coordinates in the :attr:`ds` dataset and can be used to extract the
given `arr` via the :meth:`xarray.Dataset.isel` method.
Parameters
----------
arr: xarray.DataArray
The data array for which to get the dimensions as integers, slices
or list of integers from the dataset in the :attr:`base` attribute
Returns
-------
dict
Mapping from coordinate name to integer, list of integer or slice
See Also
--------
xarray.Dataset.isel, InteractiveArray.idims"""
if coords is None:
coord_items = six.iteritems(arr.coords)
else:
coord_items = ((label, coord) for label, coord in six.iteritems(
arr.coords) if label in coords)
ret = dict(
(label, get_index_from_coord(coord, self.ds.indexes[label]))
for label, coord in coord_items if label in self.ds.indexes)
# handle the coordinates that are not in the dataset
missing = set(arr.dims).difference(ret)
if missing:
warn('Could not get slices for the following dimensions: %r' % (
missing, ))
return ret
def open_dataset(filename_or_obj, decode_cf=True, decode_times=True,
decode_coords=True, engine=None, gridfile=None, **kwargs):
"""
Open an instance of :class:`xarray.Dataset`.
This method has the same functionality as the :func:`xarray.open_dataset`
method except that it supports an additional 'gdal' engine to open
gdal Rasters (e.g. GeoTiffs) and that it supports absolute time units like
``'day as %Y%m%d.%f'`` (if `decode_cf` and `decode_times` are True).
Parameters
----------
%(xarray.open_dataset.parameters.no_engine)s
engine: {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'gdal'}, optional
Engine to use when reading netCDF files. If not provided, the default
engine is chosen based on available dependencies, with a preference for
'netcdf4'.
%(CFDecoder.decode_coords.parameters.gridfile)s
Returns
-------
xarray.Dataset
The dataset that contains the variables from `filename_or_obj`"""
# use the absolute path name (it is safer when saving the project)
if isstring(filename_or_obj) and os.path.exists(filename_or_obj):
filename_or_obj = os.path.abspath(filename_or_obj)
if engine == 'gdal':
from psyplot.gdal_store import GdalStore
filename_or_obj = GdalStore(filename_or_obj)
engine = None
ds = xr.open_dataset(filename_or_obj, decode_cf=decode_cf,
decode_coords=False, engine=engine,
decode_times=decode_times, **kwargs)
if decode_cf:
ds = CFDecoder.decode_ds(
ds, decode_coords=decode_coords, decode_times=decode_times,
gridfile=gridfile, inplace=True)
return ds
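A sketch of the two code paths the docstring describes; the file names are placeholders:

# plain netCDF file, with CF decoding (and absolute time units) applied
ds = open_dataset('model_output.nc')

# GDAL raster such as a GeoTiff, routed through the GdalStore engine
ds_tif = open_dataset('scene.tif', engine='gdal')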
def __init__(self, xarray_obj, *args, **kwargs):
"""
The ``*args`` and ``**kwargs`` are essentially the same as for the
:class:`xarray.DataArray` method, additional ``**kwargs`` are
described below.
Other Parameters
----------------
base: xarray.Dataset
Default: None. Dataset that serves as the origin of the data
contained in this DataArray instance. This will be used if you want
to update the coordinates via the :meth:`update` method. If None,
this instance will serve as a base as soon as it is needed.
decoder: psyplot.CFDecoder
The decoder that decodes the `base` dataset and is used to get
bounds. If not given, a new :class:`CFDecoder` is created
idims: dict
Default: None. dictionary with integer values and/or slices in the
`base` dictionary. If not given, they are determined automatically
%(InteractiveBase.parameters)s
"""
self.arr = xarray_obj
super(InteractiveArray, self).__init__(*args, **kwargs)
self._registered_updates = {}
self._new_dims = {}
self.method = None
def _register_update(self, method='isel', replot=False, dims={}, fmt={},
force=False, todefault=False):
"""
Register new dimensions and formatoptions for updating
Parameters
----------
method: {'isel', None, 'nearest', ...}
Selection method of the xarray.Dataset to be used for setting the
variables from the information in `dims`.
If `method` is 'isel', the :meth:`xarray.Dataset.isel` method is
used. Otherwise it sets the `method` parameter for the
:meth:`xarray.Dataset.sel` method.
%(setup_coords.parameters.dims)s
%(InteractiveBase._register_update.parameters)s
See Also
--------
start_update"""
if self._new_dims and self.method != method:
raise ValueError(
"New dimensions were already specified for with the %s method!"
" I can not choose a new method %s" % (self.method, method))
else:
self.method = method
if 'name' in dims:
self._new_dims['name'] = dims.pop('name')
self._new_dims.update(self.decoder.correct_dims(
next(six.itervalues(self.base_variables)), dims))
InteractiveBase._register_update(
self, fmt=fmt, replot=replot or bool(self._new_dims), force=force,
todefault=todefault)
def _open_ds_from_store(fname, store_mod=None, store_cls=None, **kwargs):
"""Open a dataset and return it"""
if isinstance(fname, xr.Dataset):
return fname
if store_mod is not None and store_cls is not None:
fname = getattr(import_module(store_mod), store_cls)(fname)
return open_dataset(fname, **kwargs)
def test_update(self):
"""Test the update of an :class:`psyplot.data.ArrayList`"""
variables, coords = self._from_dataset_test_variables
ds = xr.Dataset(variables, coords)
psy.register_plotter('test_plotter', module='something',
plotter_name='unimportant',
plotter_cls=tp.TestPlotter)
# add 2 arrays
psy.plot.test_plotter(ds, name=['v0', 'v1'], t=0)
# add a list
psy.plot.test_plotter(ds, t=0, prefer_list=True)
mp = psy.gcp(True)
self.assertEqual(len(mp), 3, msg=mp)
self.assertEqual(len(mp.plotters), 3, msg=mp)
# update the list
mp.update(t=1, fmt2='updated')
for i, plotter in enumerate(mp.plotters):
self.assertEqual(plotter['fmt2'], 'updated',
msg='Plotter of array %i not updated! %s' % (
i, mp[i]))
self.assertEqual(mp[0].time, ds.time[1])
self.assertEqual(mp[1].time, ds.time[1])
for data in mp[2]:
self.assertEqual(data.time, ds.time[1])
def test_1D_cf_bounds(self):
"""Test whether the CF Conventions for 1D bounaries are correct"""
final_bounds = np.arange(-180, 181, 30)
lon = xr.Variable(('lon',), np.arange(-165, 166, 30),
{'bounds': 'lon_bounds'})
cf_bounds = xr.Variable(('lon', 'bnds'), np.zeros((len(lon), 2)))
for i in range(len(lon)):
cf_bounds[i, :] = final_bounds[i:i+2]
ds = xr.Dataset(coords={'lon': lon, 'lon_bounds': cf_bounds})
decoder = psyd.CFDecoder(ds)
self.assertEqual(list(final_bounds),
list(decoder.get_plotbounds(lon)))