-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
use polars for seaexplorer data file load #120
Changes from 13 commits
c2c4a92
94a6840
20e3080
2c6d458
c579a3c
942c5ea
8630680
eaf2450
5485043
ef8cc90
ce89d1b
a8eb889
94371a2
4bd4571
9a51289
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,5 +13,6 @@ dependencies: | |
- scipy | ||
- bitstring | ||
- pooch | ||
- polars | ||
- pip: | ||
- dbdreader |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,8 +9,8 @@ | |
import xarray as xr | ||
import yaml | ||
import pyglider.utils as utils | ||
import pandas as pd | ||
|
||
import datetime | ||
import polars as pl | ||
|
||
_log = logging.getLogger(__name__) | ||
|
||
|
@@ -25,7 +25,7 @@ def _outputname(f, outdir): | |
for ff in fns: | ||
fnout += ff.lower() + '.' | ||
filenum = int(fns[4]) | ||
return outdir + fnout + 'nc', filenum | ||
return outdir + fnout + 'parquet', filenum | ||
|
||
|
||
def _needsupdating(ftype, fin, fout): | ||
|
@@ -41,7 +41,7 @@ def _sort(ds): | |
def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True, | ||
min_samples_in_file=5, dropna_subset=None, dropna_thresh=1): | ||
""" | ||
Convert seaexplorer text files to raw netcdf files. | ||
Convert seaexplorer text files to raw parquet files. | ||
|
||
Parameters | ||
---------- | ||
|
@@ -125,66 +125,85 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True, | |
_log.info(f'{f} to {fnout}') | ||
if not incremental or _needsupdating(ftype, f, fnout): | ||
_log.info(f'Doing: {f} to {fnout}') | ||
out = pd.read_csv(f, header=0, delimiter=';', | ||
parse_dates=True, index_col=0, | ||
dayfirst=True) | ||
# Try to read the file with polars. If the file is corrupted (rare), file read will fail and file | ||
# is appended to badfiles | ||
try: | ||
out = pl.read_csv(f, sep=';') | ||
except: | ||
_log.warning(f'Could not read {f}') | ||
badfiles.append(f) | ||
continue | ||
# Parse the datetime from nav files (called Timestamp) and pld1 files (called PLD_REALTIMECLOCK) | ||
if "Timestamp" in out.columns: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this get a comment? Why do we need this check? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check is for corrupted files. I encountered this once in ~ 50 missions I've processed so far. I've added a comment |
||
out = out.with_column( | ||
pl.col("Timestamp").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S")) | ||
out = out.rename({"Timestamp": "time"}) | ||
else: | ||
out = out.with_column( | ||
pl.col("PLD_REALTIMECLOCK").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S.%3f")) | ||
out = out.rename({"PLD_REALTIMECLOCK": "time"}) | ||
for col_name in out.columns: | ||
if "time" not in col_name.lower(): | ||
out = out.with_column(pl.col(col_name).cast(pl.Float64)) | ||
# If AD2CP data present, convert timestamps to datetime | ||
if 'AD2CP_TIME' in out.columns: | ||
# Set datestamps with date 00000 to None | ||
out.loc[out.AD2CP_TIME.str[:6] == '000000', | ||
'AD2CP_TIME'] = None | ||
out['AD2CP_TIME'] = pd.to_datetime(out.AD2CP_TIME) | ||
out = out.with_column( | ||
pl.col('AD2CP_TIME').str.strptime(pl.Datetime, fmt="%m%d%y %H:%M:%S", strict=False)) | ||
|
||
# subsetting for heavily oversampled raw data: | ||
if rawsub=='raw' and dropna_subset is not None: | ||
out = out.dropna(subset=dropna_subset, thresh=dropna_thresh) | ||
|
||
with out.to_xarray() as outx: | ||
key = list(outx.coords.keys())[0] | ||
outx = outx.rename({key: 'time'}) | ||
outx['fnum'] = ('time', | ||
int(filenum)*np.ones(len(outx['time']))) | ||
if ftype == 'gli': | ||
outx.to_netcdf(fnout[:-3] + '.nc', 'w') | ||
if rawsub == 'raw' and dropna_subset is not None: | ||
# This check is the polars equivalent of pandas dropna. See docstring note on dropna | ||
out = out.with_column(out.select(pl.col(dropna_subset).is_null().cast(pl.Int64)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs a comment, and if it doesn't cause a slowdown or extra writes maybe would benefit from unpacking into separate calls. Looks like you are dropping repeats, but its too many steps in one line for me to follow without spending too much time ;-) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added a comment on this. It's a bit convoluted looking, but it's just the polars equivalent of pandas.dropna. I didn't want to change the functionality of the dropna option in this PR. We can factor it out as a separate PR though? |
||
.sum(axis=1).alias("null_count")).filter( | ||
pl.col("null_count") <= dropna_thresh) \ | ||
.drop("null_count") | ||
|
||
if ftype == 'gli': | ||
out = out.with_columns([(pl.col("NavState") * 0 + int(filenum)).alias("fnum")]) | ||
out.write_parquet(fnout) | ||
goodfiles.append(f) | ||
else: | ||
if out.select("time").shape[0] > min_samples_in_file: | ||
out.write_parquet(fnout) | ||
goodfiles.append(f) | ||
else: | ||
if outx.indexes["time"].size > min_samples_in_file: | ||
outx.to_netcdf(f'{fnout[:-3]}.nc', 'w', | ||
unlimited_dims=['time']) | ||
goodfiles.append(f) | ||
else: | ||
_log.warning('Number of sensor data points' | ||
'too small. Skipping nc write') | ||
badfiles.append(f) | ||
_log.warning('Number of sensor data points' | ||
'too small. Skipping parquet write') | ||
badfiles.append(f) | ||
if len(badfiles) > 0: | ||
_log.warning('Some files could not be parsed:') | ||
for fn in badfiles: | ||
_log.warning('%s', fn) | ||
if not goodfiles: | ||
_log.warning(f'No valid unprocessed seaexplorer files found in'f'{indir}') | ||
return False | ||
_log.info('All raw files converted to nc') | ||
_log.info('All raw files converted to parquet') | ||
return True | ||
|
||
|
||
def drop_rogue_1970(ds): | ||
def drop_pre_1971_samples(df): | ||
""" | ||
If dates greater than 1971, 1, 1 are observed, drop any dates before | ||
1971-01-01, from the datset and return it. This function removes 1970 | ||
timestamps caused by a SeaExplorer rebooting during a mission. If all dates | ||
are < 1971-01-01, no action is taken | ||
|
||
Parameters: | ||
ds: xarray.DataSet | ||
dataset to check for pre-1971 dates | ||
df: polars.DataFrame | ||
dataframe to check for pre-1971 dates | ||
Returns: | ||
ds: xarray.DataSet | ||
df: polars.DataFrame | ||
""" | ||
dt_1971 = np.datetime64("1971-01-01") | ||
dt_1971 = datetime.datetime(1971, 1, 1) | ||
# If all dates before or after 1971-01-01, return the dataset | ||
if (ds.time > dt_1971).all() or (ds.time < dt_1971).all(): | ||
return ds | ||
return ds.where(ds.time > dt_1971, drop=True) | ||
pre_1971 = df.filter(pl.col("time") < dt_1971) | ||
if len(pre_1971) == len(df): | ||
return pre_1971 | ||
post_1971 = df.filter(pl.col("time") > dt_1971) | ||
if len(post_1971) == len(df): | ||
return post_1971 | ||
return df.filter(pl.col("time") > dt_1971) | ||
|
||
|
||
def merge_rawnc(indir, outdir, deploymentyaml, incremental=False, kind='raw'): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably rename this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've renamed this to the more descriptive There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant rename merege_rawnc.... Though that screws with older scripts, but I'd rename this merge_parquet or whatever, and then alias merge_rawnc so old scripts work |
||
|
@@ -216,49 +235,44 @@ def merge_rawnc(indir, outdir, deploymentyaml, incremental=False, kind='raw'): | |
deployment = yaml.safe_load(fin) | ||
metadata = deployment['metadata'] | ||
id = metadata['glider_name'] | ||
outgli = outdir + '/' + id + '-rawgli.nc' | ||
outpld = outdir + '/' + id + '-' + kind + 'pld.nc' | ||
outgli = outdir + '/' + id + '-rawgli.parquet' | ||
outpld = outdir + '/' + id + '-' + kind + 'pld.parquet' | ||
|
||
_log.info('Opening *.gli.sub.*.nc multi-file dataset from %s', indir) | ||
files = sorted(glob.glob(indir+'/*.gli.sub.*.nc')) | ||
_log.info('Opening *.gli.sub.*.parquet multi-file dataset from %s', indir) | ||
files = sorted(glob.glob(indir + '/*.gli.sub.*.parquet')) | ||
if not files: | ||
_log.warning(f'No *gli*.nc files found in {indir}') | ||
_log.warning(f'No *gli*.parquet files found in {indir}') | ||
return False | ||
with xr.open_dataset(files[0]) as gli: | ||
for fil in files[1:]: | ||
try: | ||
with xr.open_dataset(fil) as gli2: | ||
gli = xr.concat([gli, gli2], dim='time') | ||
except: | ||
pass | ||
gli = drop_rogue_1970(gli) | ||
gli.to_netcdf(outgli) | ||
gli = pl.read_parquet(indir + '/*.gli.sub.*.parquet') | ||
gli = drop_pre_1971_samples(gli) | ||
gli.write_parquet(outgli) | ||
_log.info(f'Done writing {outgli}') | ||
|
||
_log.info('Opening *.pld.sub.*.nc multi-file dataset') | ||
files = sorted(glob.glob(indir+'/*.pld1.'+kind+'.*.nc')) | ||
_log.info('Opening *.pld.sub.*.parquet multi-file dataset') | ||
files = sorted(glob.glob(indir + '/*.pld1.' + kind + '.*.parquet')) | ||
if not files: | ||
_log.warning(f'No *{kind}*.nc files found in {indir}') | ||
_log.warning(f'No *{kind}*.parquet files found in {indir}') | ||
return False | ||
with xr.open_dataset(files[0]) as pld: | ||
for fil in files[1:]: | ||
try: | ||
with xr.open_dataset(fil) as pld2: | ||
pld = xr.concat([pld, pld2], dim='time') | ||
except: | ||
pass | ||
pld = drop_rogue_1970(pld) | ||
pld.to_netcdf(outpld) | ||
pld = pl.read_parquet(indir + '/*.pld1.' + kind + '.*.parquet') | ||
pld = drop_pre_1971_samples(pld) | ||
pld.write_parquet(outpld) | ||
|
||
_log.info(f'Done writing {outpld}') | ||
_log.info('Done merge_rawnc') | ||
return True | ||
|
||
|
||
def _interp_gli_to_pld(gli, ds, val, indctd): | ||
gli_ind = ~np.isnan(val) | ||
valout = np.interp(ds['time'], | ||
gli['time'][gli_ind], | ||
val[gli_ind]) | ||
# switch for if we are comparing two polars dataframes or a polars dataframe and a xarray dataset | ||
if type(ds) is pl.internals.dataframe.frame.DataFrame: | ||
valout = np.interp(ds["time"], | ||
gli.filter(gli_ind)["time"], | ||
val[gli_ind]) | ||
else: | ||
valout = np.interp(ds['time'].astype(int), | ||
np.array(gli.filter(gli_ind)["time"].to_numpy().astype('datetime64[ns]')).astype(int), | ||
val[gli_ind]) | ||
return valout | ||
|
||
|
||
|
@@ -286,9 +300,9 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', | |
device_data = deployment['glider_devices'] | ||
id = metadata['glider_name'] | ||
_log.info(f'Opening combined nav file {indir}/{id}-rawgli.nc') | ||
gli = xr.open_dataset(f'{indir}/{id}-rawgli.nc') | ||
_log.info(f'Opening combined payload file {indir}/{id}-{kind}pld.nc') | ||
sensor = xr.open_dataset(f'{indir}/{id}-{kind}pld.nc') | ||
gli = pl.read_parquet(f'{indir}/{id}-rawgli.parquet') | ||
_log.info(f'Opening combined payload file {indir}/{id}-{kind}pld.parquet') | ||
sensor = pl.read_parquet(f'{indir}/{id}-{kind}pld.parquet') | ||
|
||
# build a new data set based on info in `deploymentyaml.` | ||
# We will use ctd as the interpolant | ||
|
@@ -304,7 +318,8 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', | |
# It oversamples the nav data, but mildly undersamples the optics and | ||
# oxygen.... | ||
if 'timebase' in ncvar: | ||
indctd = np.where(~np.isnan(sensor[ncvar['timebase']['source']]))[0] | ||
vals = sensor.select([ncvar['timebase']['source']]).to_numpy()[:, 0] | ||
indctd = np.where(~np.isnan(vals))[0] | ||
elif 'GPCTD_TEMPERATURE' in list(sensor.variables): | ||
_log.warning('No timebase specified. Using GPCTD_TEMPERATURE as time' | ||
'base') | ||
|
@@ -317,35 +332,43 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', | |
_log.warning('No gpctd or legato data found. Using NAV_DEPTH as time' | ||
'base') | ||
indctd = np.where(~np.isnan(sensor.NAV_DEPTH))[0] | ||
ds['time'] = (('time'), sensor['time'].values[indctd], attr) | ||
ds['time'] = (('time'), sensor.select('time').to_numpy()[indctd, 0], attr) | ||
thenames = list(ncvar.keys()) | ||
for i in ['time', 'timebase', 'keep_variables']: | ||
if i in thenames: | ||
thenames.remove(i) | ||
for name in thenames: | ||
_log.info('interpolating ' + name) | ||
if not('method' in ncvar[name].keys()): | ||
if not ('method' in ncvar[name].keys()): | ||
# variables that are in the data set or can be interpolated from it | ||
if 'conversion' in ncvar[name].keys(): | ||
convert = getattr(utils, ncvar[name]['conversion']) | ||
else: | ||
convert = utils._passthrough | ||
sensorname = ncvar[name]['source'] | ||
if sensorname in list(sensor.variables): | ||
if sensorname in list(sensor.columns): | ||
_log.debug('sensorname %s', sensorname) | ||
val = convert(sensor[sensorname]) | ||
val = convert(sensor.select(sensorname).to_numpy()[:, 0]) | ||
if 'coarsen' in ncvar[name]: | ||
# smooth oxygen data as originally perscribed | ||
coarsen_time = ncvar[name]['coarsen'] | ||
sensor_sub = sensor.coarsen(time=coarsen_time, | ||
boundary='trim').mean() | ||
val2 = sensor_sub[sensorname] | ||
val = _interp_gli_to_pld(sensor_sub, sensor, val2, indctd) | ||
coarse_ints = np.arange(0, len(sensor)/coarsen_time, 1/coarsen_time).astype(int) | ||
sensor_sub = sensor.with_columns(pl.lit(coarse_ints).alias("coarse_ints")) | ||
sensor_sub_grouped = sensor_sub.with_column( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This block needs a comment as well... It says "smooth" oxygen above, but I'm not following this danse with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah that makes more sense! I've renamed to merge_parquet and added |
||
pl.col('time').to_physical() | ||
).groupby( | ||
by=pl.col('coarse_ints'), | ||
maintain_order=True | ||
).mean().with_column( | ||
pl.col('time').cast(pl.Datetime('ms')) | ||
)[:-1, :] | ||
val2 = sensor_sub_grouped.select(sensorname).to_numpy()[:, 0] | ||
val = _interp_gli_to_pld(sensor_sub_grouped, sensor, val2, indctd) | ||
val = val[indctd] | ||
|
||
ncvar['method'] = 'linear fill' | ||
else: | ||
val = gli[sensorname] | ||
val = gli.select(sensorname).to_numpy()[:, 0] | ||
val = convert(val) | ||
# Values from the glider netcdf must be interpolated to match | ||
# the sensor netcdf | ||
|
@@ -355,7 +378,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', | |
ncvar[name].pop('coordinates', None) | ||
attrs = ncvar[name] | ||
attrs = utils.fill_required_attrs(attrs) | ||
ds[name] = (('time'), val.data, attrs) | ||
ds[name] = (('time'), val, attrs) | ||
|
||
# fix lon and lat to be linearly interpolated between fixes | ||
good = np.where(np.abs(np.diff(ds.longitude)) + | ||
|
@@ -397,7 +420,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', | |
ds = ds.isel(time=good) | ||
len_after_drop = len(ds.time) | ||
proportion_kept = len_after_drop / len_before_drop | ||
loss_str = f"{100 * (1-proportion_kept)} % samples removed by timestamp deduplication." | ||
loss_str = f"{100 * (1 - proportion_kept)} % samples removed by timestamp deduplication." | ||
if proportion_kept < 0.5: | ||
raise ValueError(f"{loss_str} Check input data for duplicate timestamps") | ||
elif proportion_kept < 0.999: | ||
|
@@ -441,7 +464,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', | |
ds.ad2cp_time.attrs.pop('units') | ||
ds.to_netcdf(outname, 'w', | ||
encoding={'time': {'units': | ||
'seconds since 1970-01-01T00:00:00Z'}}) | ||
'seconds since 1970-01-01T00:00:00Z'}}) | ||
return outname | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ dependencies: | |
- pytest | ||
- pooch | ||
- matplotlib | ||
- polars | ||
- pip: | ||
- dbdreader | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to add the
badfiles
here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
badfiles added