
Commit 654f703

addresses #4
1 parent ccc8e79 commit 654f703

File tree

2 files changed: +48 −35 lines


computation_scripts/master.py

+21 −21
@@ -413,27 +413,27 @@ def concatenate_outputs() -> None:
     if not qouts:
         raise FileNotFoundError("No Qout files found. RAPID probably not run correctly.")

-    unique_start_dates = sorted({os.path.basename(f).split('_')[2] for f in qouts})
-
-    for unique_start_date in unique_start_dates:
-        with xr.open_zarr(local_zarr) as retro_ds:
-            chunks = retro_ds.chunks
-            with xr.open_mfdataset(
-                    [qout for qout in qouts if unique_start_date in qout],
-                    combine='nested',
-                    concat_dim='rivid',
-                    preprocess=drop_coords
-            ).reindex(rivid=retro_ds['rivid']) as new_ds:
-                earliest_date = np.datetime_as_string(new_ds.time[0].values, unit="h")
-                latest_date = np.datetime_as_string(new_ds.time[-1].values, unit="h")
-                CL.log_message('RUNNING', f'Appending to zarr: {earliest_date} to {latest_date}')
-                logging.info(f'Appending to zarr: {earliest_date} to {latest_date}')
-                (
-                    new_ds
-                    .chunk({"time": chunks["time"][0], "rivid": chunks["rivid"][0]})
-                    .to_zarr(local_zarr, mode='a', append_dim='time', consolidated=True)
-                )
-                logging.info(f'Finished appending')
+    with xr.open_zarr(local_zarr) as retro_ds:
+        chunks = retro_ds.chunks
+        with xr.open_mfdataset(
+                qouts,
+                combine='nested',
+                concat_dim='rivid',
+                parallel=True,
+                preprocess=drop_coords
+        ).reindex(rivid=retro_ds['rivid']) as new_ds:
+            earliest_date = np.datetime_as_string(new_ds.time[0].values, unit="h")
+            latest_date = np.datetime_as_string(new_ds.time[-1].values, unit="h")
+            new_ds = new_ds.round(decimals=3)
+            new_ds = new_ds.where(new_ds['Qout'] >= 0.0, 0.0)
+            CL.log_message('RUNNING', f'Appending to zarr: {earliest_date} to {latest_date}')
+            logging.info(f'Appending to zarr: {earliest_date} to {latest_date}')
+            (
+                new_ds
+                .chunk({"time": chunks["time"][0], "rivid": chunks["rivid"][0]})
+                .to_zarr(local_zarr, mode='a', append_dim='time', consolidated=True)
+            )
+            logging.info(f'Finished appending')
     return

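The new code path drops the per-start-date loop: all Qout files are opened in one open_mfdataset call, the discharge is rounded to three decimals and negative values are clamped to zero, and the result is appended to the store in a single pass. A minimal standalone sketch of the same pattern, assuming a hypothetical zarr store at retro.zarr and Qout NetCDF files under outputs/ (these stand in for the script's local_zarr and qouts; the project's drop_coords preprocess hook is omitted, and parallel=True requires dask):

import glob

import xarray as xr

# hypothetical stand-ins for the script's local_zarr and qouts
local_zarr = 'retro.zarr'
qouts = sorted(glob.glob('outputs/Qout_*.nc'))

with xr.open_zarr(local_zarr) as retro_ds:
    chunks = retro_ds.chunks  # reuse the chunk sizes already in the store
    with xr.open_mfdataset(qouts, combine='nested', concat_dim='rivid', parallel=True) as new_ds:
        new_ds = new_ds.reindex(rivid=retro_ds['rivid'])   # align river order with the store
        new_ds = new_ds.round(decimals=3)                  # trim floating-point noise
        new_ds = new_ds.where(new_ds['Qout'] >= 0.0, 0.0)  # clamp negative discharge to zero
        (
            new_ds
            .chunk({'time': chunks['time'][0], 'rivid': chunks['rivid'][0]})
            .to_zarr(local_zarr, mode='a', append_dim='time', consolidated=True)
        )

Rechunking to the store's existing chunk sizes before to_zarr keeps the appended data compatible with the chunk layout already on disk, which appending along time requires.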
downloader_scripts/check_and_download_era.py

+27 −14
@@ -164,17 +164,30 @@ def download_era5() -> None:
         return

     print('converting to daily cumulative')
-    for downloaded_file in downloaded_files:
-        daily_cumulative_file_name = os.path.basename(downloaded_file).replace('.nc', '_daily_cumulative.nc')
-        with xr.open_dataset(downloaded_file) as ds:
-            print(f'processing {downloaded_file}')
-
-            if ds['time'].shape[0] == 0:
-                print(f'No time steps were downloaded- the shape of the time array is 0.')
-                print(f'Removing {downloaded_file}')
-                os.remove(downloaded_file)
-                continue
-
+    year_1 = year_month_combos[0][0]
+    month_1 = year_month_combos[0][1]
+    if len(year_month_combos) > 1:
+        year_2 = year_month_combos[1][0]
+        month_2 = year_month_combos[1][1]
+    else:
+        year_2 = year_1
+        month_2 = month_1
+    day_1 = min({d.day for d in date_range if d.year == year_1 and d.month == month_1})
+    day_2 = max({d.day for d in date_range if d.year == year_2 and d.month == month_2})
+    daily_cumulative_file_name = f'era5_{year_1}{str(month_1).zfill(2)}{str(day_1).zfill(2)}-{year_2}{str(month_2).zfill(2)}{str(day_2).zfill(2)}_daily_cumulative.nc'
+    with xr.open_mfdataset(downloaded_files,
+                           concat_dim='time',
+                           combine='nested',
+                           parallel=True,
+                           chunks={'time': 'auto', 'lat': 'auto', 'lon': 'auto'},  # prevents odd slicing behavior and missing data
+                           ) as ds:
+        print(f'processing {", ".join(downloaded_files)}')
+
+        if ds['time'].shape[0] == 0:
+            print('No time steps were downloaded - the shape of the time array is 0.')
+            print(f'Removing {", ".join(downloaded_files)}')
+            {os.remove(downloaded_file) for downloaded_file in downloaded_files}
+        else:
             if 'expver' in ds.dims:
                 print('expver in dims')
                 # find the time steps where the runoff is not nan when expver=1

@@ -211,10 +224,10 @@ def download_era5() -> None:
             ds.to_netcdf(os.path.join(era_dir, daily_cumulative_file_name))
             print(f'uploading {daily_cumulative_file_name}')
             subprocess.call(['aws', 's3', 'cp', os.path.join(era_dir, daily_cumulative_file_name),
-                             os.path.join(s3_era_bucket, os.path.basename(downloaded_file))])
+                             os.path.join(s3_era_bucket, os.path.basename(daily_cumulative_file_name))])

-            # remove the original file
-            os.remove(downloaded_file)
+            # remove the original files
+            {os.remove(downloaded_file) for downloaded_file in downloaded_files}

             # remove the consolidated file
             os.remove(os.path.join(era_dir, daily_cumulative_file_name))
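The monthly files are now merged into one consolidated dataset whose filename encodes the full date span of the download window rather than a per-file name. A runnable sketch of that naming logic, with hypothetical year_month_combos and date_range values standing in for what the real script derives from the requested dates (using :02d formatting in place of str().zfill(2), with the same result):

import datetime

# hypothetical inputs: two consecutive months covering 2024-01-15 through 2024-02-18
year_month_combos = [(2024, 1), (2024, 2)]
date_range = [datetime.date(2024, 1, 15) + datetime.timedelta(days=i) for i in range(35)]

year_1, month_1 = year_month_combos[0]
if len(year_month_combos) > 1:
    year_2, month_2 = year_month_combos[1]
else:
    year_2, month_2 = year_1, month_1

# first day present in the first month, last day present in the last month
day_1 = min(d.day for d in date_range if d.year == year_1 and d.month == month_1)
day_2 = max(d.day for d in date_range if d.year == year_2 and d.month == month_2)

name = f'era5_{year_1}{month_1:02d}{day_1:02d}-{year_2}{month_2:02d}{day_2:02d}_daily_cumulative.nc'
print(name)  # era5_20240115-20240218_daily_cumulative.nc

With these inputs the sketch prints era5_20240115-20240218_daily_cumulative.nc, matching the span-based name the script now uploads to S3.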
