
Commit 145a8d1

rileyhales and ybaaniya authored

include ungauged bias correction (#44)

* include ungauged bias correction
* changed function to use zarr format instead of parquet
* correct table names and dataset structures
* increment version number
* correct module __all__ list

---------

Co-authored-by: yubinbaaniya <[email protected]>
Co-authored-by: ybaaniya <[email protected]>

1 parent 39c8eeb commit 145a8d1
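
The headline change is a new public function, sfdc_bias_correction, in geoglows/bias.py (full diff below). A minimal sketch of how it might be called, assuming the retrospective simulation and SFDC transformer datasets are reachable; the river id here is a hypothetical placeholder, not a value from this commit:

import geoglows

river_id = 110000001  # hypothetical 9 digit GEOGLOWS River ID; substitute a real one
df = geoglows.bias.sfdc_bias_correction(river_id)

# the returned frame has a DateTime index and two columns (per the new docstring)
print(df[['Simulated', 'Bias Corrected Simulation']].head())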

File tree

4 files changed: +383 -4 lines changed


geoglows/__init__.py (+1 -1)
@@ -12,6 +12,6 @@
     'bias', 'plots', 'data', 'analyze', 'streams', 'tables', 'streamflow',
     'get_metadata_table_path', 'set_metadata_table_path',
 ]
-__version__ = '1.7.1'
+__version__ = '1.8.0'
 __author__ = 'Riley Hales'
 __license__ = 'BSD 3-Clause Clear License'

geoglows/_constants.py (+65 -1)
@@ -10,12 +10,76 @@
     os.path.join(os.path.dirname(__file__), 'data', 'metadata-tables.parquet')
 )

+SABER_ASSIGN_TABLE_PATH = os.getenv(
+    'PYGEOGLOWS_SABER_ASSIGN_TABLE_PATH',
+    os.path.join(os.path.dirname(__file__), 'data', 'saber-assign-table.parquet'))
+
+SFDC_TABLE_PATH = os.getenv(
+    'PYGEOGLOWS_SFDC_TABLE_PATH',
+    os.path.join(os.path.dirname(__file__), 'data', 'sfdc-removing-zero.parquet'))
+
+FDC_TABLE_PATH = os.getenv(
+    'PYGEOGLOWS_FDC_TABLE_PATH',
+    os.path.join(os.path.dirname(__file__), 'data', 'simulated_monthly.parquet'))
+
+
+TRANSFORMER_TABLE_ENV_KEY = 'PYGEOGLOWS_TRANSFORMER_TABLE_URI'
+SFDC_ZARR_ENV_KEY = 'PYGEOGLOWS_SFDC_ZARR_URI'
+
+
+def get_sfdc_zarr_uri() -> str:
+    return os.getenv(SFDC_ZARR_ENV_KEY, 's3://rfs-v2/transformers/sfdc.zarr')
+
+
+def set_sfdc_zarr_uri(uri: str) -> None:
+    os.environ[SFDC_ZARR_ENV_KEY] = uri
+    return
+
+
+def get_transformer_table_uri() -> str:
+    return os.getenv(TRANSFORMER_TABLE_ENV_KEY, 's3://rfs-v2/transformers/transformer_table.parquet')
+
+
+def set_transformer_table_uri(uri: str) -> None:
+    os.environ[TRANSFORMER_TABLE_ENV_KEY] = uri
+    return
+

 def get_metadata_table_path() -> str:
-    return METADATA_TABLE_PATH
+    return os.getenv('PYGEOGLOWS_METADATA_TABLE_PATH', METADATA_TABLE_PATH)


 def set_metadata_table_path(path: str) -> str:
     global METADATA_TABLE_PATH
     METADATA_TABLE_PATH = path
     return METADATA_TABLE_PATH
+
+
+def get_saber_assign_table_path() -> str:
+    return SABER_ASSIGN_TABLE_PATH
+
+
+def set_saber_assign_table_path(path: str) -> str:
+    global SABER_ASSIGN_TABLE_PATH
+    SABER_ASSIGN_TABLE_PATH = path
+    return SABER_ASSIGN_TABLE_PATH
+
+
+def get_sfdc_table_path() -> str:
+    return SFDC_TABLE_PATH
+
+
+def set_sfdc_table_path(path: str) -> str:
+    global SFDC_TABLE_PATH
+    SFDC_TABLE_PATH = path
+    return SFDC_TABLE_PATH
+
+
+def get_fdc_table_path() -> str:
+    return FDC_TABLE_PATH
+
+
+def set_fdc_table_path(path: str) -> str:
+    global FDC_TABLE_PATH
+    FDC_TABLE_PATH = path
+    return FDC_TABLE_PATH
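
A minimal sketch of how the new helpers above could be used to point the package at alternate data locations; the local paths are hypothetical, and the imports come from the private _constants module in case the helpers are not re-exported at the package level:

import os

from geoglows._constants import (
    get_sfdc_zarr_uri, set_sfdc_zarr_uri,
    get_saber_assign_table_path, set_saber_assign_table_path,
)

# getters fall back to the bundled defaults when the env var is unset
print(get_sfdc_zarr_uri())  # s3://rfs-v2/transformers/sfdc.zarr

# the URI setter writes through to the PYGEOGLOWS_SFDC_ZARR_URI environment variable
set_sfdc_zarr_uri('/data/mirrors/sfdc.zarr')  # hypothetical local mirror
assert os.environ['PYGEOGLOWS_SFDC_ZARR_URI'] == '/data/mirrors/sfdc.zarr'

# the table path setters instead rebind a module-level global
set_saber_assign_table_path('/data/tables/saber-assign-table.parquet')  # hypothetical
print(get_saber_assign_table_path())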

geoglows/bias.py (+211)
@@ -6,6 +6,10 @@
 import numpy as np
 import pandas as pd
 from scipy import interpolate
+from scipy import stats
+
+from .data import sfdc_for_river_id
+from .data import retrospective

 __all__ = [
     'correct_historical',
@@ -48,6 +52,213 @@ def correct_historical(simulated_data: pd.DataFrame, observed_data: pd.DataFrame
     return corrected


+def sfdc_bias_correction(river_id: int) -> pd.DataFrame:
+    """
+    Corrects the bias in the simulated data for a given river_id using the SFDC method. This method is based on the
+    https://github.com/rileyhales/saber-hbc repository.
+
+    Args:
+        river_id: a 9 digit integer that is a valid GEOGLOWS River ID number
+
+    Returns:
+        pandas DataFrame with a DateTime index and columns with Simulated flow and Bias Corrected Simulation flow.
+    """
+    simulated_data = retrospective(river_id=river_id)
+    sfdc_b = sfdc_for_river_id(river_id=river_id)
+
+    # compute the simulated flow duration curve for each calendar month
+    sim_fdc_b = []
+    for i in range(1, 13):
+        mdf = fdc(simulated_data[simulated_data.index.month == i].values.flatten()).rename(columns={'Q': 'fdc'}).reset_index()
+        mdf['month'] = i
+        sim_fdc_b.append(mdf)
+    sim_fdc_b = pd.concat(sim_fdc_b, axis=0)
+
+    # Process each month from January (1) to December (12)
+    monthly_results = []
+    for month in sorted(set(simulated_data.index.month)):
+        mon_sim_b = simulated_data[simulated_data.index.month == month].dropna().clip(lower=0)
+        qb_original = mon_sim_b[river_id].values.flatten()
+        scalar_fdc = sfdc_b[sfdc_b['month'] == month][['p_exceed', 'sfdc']].set_index('p_exceed')
+        sim_fdc_b_m = sim_fdc_b[sim_fdc_b['month'] == month][['p_exceed', 'fdc']].set_index('p_exceed')
+
+        # map flows to exceedance percentiles, then percentiles to scalar adjustment factors
+        flow_to_percent = _make_interpolator(sim_fdc_b_m.values.flatten(),
+                                             sim_fdc_b_m.index,
+                                             extrap='nearest',
+                                             fill_value=None)
+
+        percent_to_scalar = _make_interpolator(scalar_fdc.index,
+                                               scalar_fdc.values.flatten(),
+                                               extrap='nearest',
+                                               fill_value=None)
+        p_exceed = flow_to_percent(qb_original)
+        scalars = percent_to_scalar(p_exceed)
+        qb_adjusted = qb_original / scalars
+
+        # Create a DataFrame for this month's results
+        month_df = pd.DataFrame({
+            'date': mon_sim_b.index,
+            'Simulated': qb_original,
+            'Bias Corrected Simulation': qb_adjusted
+        })
+
+        # Append the month's DataFrame to the results list
+        monthly_results.append(month_df)
+    return pd.concat(monthly_results).set_index('date').sort_index()
+
+
+def make_sfdc(simulated_df: pd.DataFrame, gauge_df: pd.DataFrame,
+              use_log: bool = False, fix_seasonally: bool = True, empty_months: str = 'skip',
+              drop_outliers: bool = False, outlier_threshold: int or float = 2.5,
+              filter_scalar_fdc: bool = False, filter_range: tuple = (0, 80),
+              extrapolate: str = 'nearest', fill_value: int or float = None,
+              fit_gumbel: bool = False, fit_range: tuple = (10, 90),
+              metadata: bool = False, ) -> pd.DataFrame:
+    """
+    Makes a scalar flow duration curve (SFDC) mapping from simulated to observed (gauge) flow data. The SFDC is used
+    in SABER to correct timeseries data for an ungauged river_id.
+
+    Args:
+        simulated_df: A dataframe with a datetime index and a single column of simulated streamflow values
+        gauge_df: A dataframe with a datetime index and a single column of observed streamflow values
+        extrapolate: Method to use for extrapolation. Options: nearest, const, linear, average, max, min
+        fill_value: Value to use for extrapolation when extrapolate='const'
+        filter_range: Lower and upper bounds of the filter range
+        drop_outliers: Flag to exclude outliers
+        outlier_threshold: Number of std deviations from mean to exclude from flow duration curve
+
+    Returns:
+        pandas DataFrame indexed by exceedance probability with a column of scalars, the ratio of the simulated to
+        the observed flow duration curve (computed per month when fix_seasonally is True).
+    """
+    if fix_seasonally:
+        # list of the unique months in the historical simulation. should always be 1->12 but just in case...
+        monthly_results = []
+        for month in sorted(set(simulated_df.index.strftime('%m'))):
+            # filter data to current iteration's month
+            mon_obs_a = gauge_df[gauge_df.index.month == int(month)].dropna().clip(lower=0)
+            if mon_obs_a.empty:
+                if empty_months == 'skip':
+                    continue
+                else:
+                    raise ValueError(f'Invalid value for argument "empty_months". Given: {empty_months}.')
+
+            mon_sim_b = simulated_df[simulated_df.index.month == int(month)].dropna().clip(lower=0)
+            monthly_results.append(make_sfdc(
+                mon_sim_b, mon_obs_a,
+                fix_seasonally=False, empty_months=empty_months,
+                drop_outliers=drop_outliers, outlier_threshold=outlier_threshold,
+                filter_scalar_fdc=filter_scalar_fdc, filter_range=filter_range,
+                extrapolate=extrapolate, fill_value=fill_value,
+                fit_gumbel=fit_gumbel, fit_range=fit_range, )
+            )
+        # combine the results from each month into a single dataframe (sorted by exceedance probability) and return it
+        return pd.concat(monthly_results).sort_index()
+
+    if drop_outliers:
+        simulated_df = _drop_outliers_by_zscore(simulated_df, threshold=outlier_threshold)
+        gauge_df = _drop_outliers_by_zscore(gauge_df, threshold=outlier_threshold)
+
+    # compute the flow duration curves
+    sim_fdc = fdc(simulated_df)
+    obs_fdc = fdc(gauge_df)
+
+    # calculate the scalar flow duration curve
+    scalar_fdc = sfdc(sim_fdc, obs_fdc)
+
+    return scalar_fdc
+
+
+def fdc(flows: np.array, steps: int = 101, col_name: str = 'Q') -> pd.DataFrame:
+    """
+    Compute flow duration curve (exceedance probabilities) from a list of flows
+
+    Args:
+        flows: array of flows
+        steps: number of steps (exceedance probabilities) to use in the FDC
+        col_name: name of the column in the returned dataframe
+
+    Returns:
+        pd.DataFrame with index 'p_exceed' and column 'Q' (or col_name)
+    """
+    # calculate the flow value at each exceedance probability
+    exceed_prob = np.linspace(100, 0, steps)
+    fdc_flows = np.nanpercentile(flows, exceed_prob)
+    df = pd.DataFrame(fdc_flows, columns=[col_name, ], index=exceed_prob)
+    df.index.name = 'p_exceed'
+    return df
+
+
+def _drop_outliers_by_zscore(df: pd.DataFrame, threshold: float = 3) -> pd.DataFrame:
+    """
+    Drop outliers from a dataframe by their z-score and a threshold
+
+    Based on https://stackoverflow.com/questions/23199796/detect-and-exclude-outliers-in-pandas-data-frame
+
+    Args:
+        df: dataframe to drop outliers from
+        threshold: z-score threshold
+
+    Returns:
+        pd.DataFrame with outliers removed
+    """
+    return df[(np.abs(stats.zscore(df)) < threshold).all(axis=1)]
+
+
+def sfdc(sim_fdc: pd.DataFrame, obs_fdc: pd.DataFrame) -> pd.DataFrame:
+    """
+    Compute the scalar flow duration curve (exceedance probabilities) from two flow duration curves
+
+    Args:
+        sim_fdc: simulated flow duration curve
+        obs_fdc: observed flow duration curve
+
+    Returns:
+        pd.DataFrame with index (exceedance probabilities) and a column of scalars
+    """
+    scalars_df = pd.DataFrame(np.divide(sim_fdc.values.flatten(), obs_fdc.values.flatten()),
+                              columns=['scalars', ],
+                              index=sim_fdc.index
+                              )
+    scalars_df.replace(np.inf, np.nan, inplace=True)
+    scalars_df.dropna(inplace=True)
+    return scalars_df
+
+
+def _make_interpolator(x: np.array, y: np.array, extrap: str = 'nearest',
+                       fill_value: int or float = None) -> interpolate.interp1d:
+    """
+    Make an interpolator from two arrays
+
+    Args:
+        x: x values
+        y: y values
+        extrap: method for extrapolation: nearest, const, linear, average, max, min
+        fill_value: value to use when extrap='const'
+
+    Returns:
+        interpolate.interp1d
+    """
+    # todo check that flows are not negative and have sufficient variance - even for small variance in SAF
+    # if np.max(y) - np.min(y) < 5:
+    #     logger.warning('The y data has similar max/min values. You may get unanticipated results.')
+
+    # make interpolator which converts the percentiles to scalars
+    if extrap == 'nearest':
+        return interpolate.interp1d(x, y, fill_value='extrapolate', kind='nearest')
+    elif extrap == 'const':
+        if fill_value is None:
+            raise ValueError('Must provide the fill_value kwarg when extrap="const"')
+        return interpolate.interp1d(x, y, fill_value=fill_value, bounds_error=False)
+    elif extrap == 'linear':
+        return interpolate.interp1d(x, y, fill_value='extrapolate')
+    elif extrap == 'average':
+        return interpolate.interp1d(x, y, fill_value=np.mean(y), bounds_error=False)
+    elif extrap == 'max' or extrap == 'maximum':
+        return interpolate.interp1d(x, y, fill_value=np.max(y), bounds_error=False)
+    elif extrap == 'min' or extrap == 'minimum':
+        return interpolate.interp1d(x, y, fill_value=np.min(y), bounds_error=False)
+    else:
+        raise ValueError('Invalid extrapolation method provided')
+
+
 def correct_forecast(forecast_data: pd.DataFrame, simulated_data: pd.DataFrame,
                      observed_data: pd.DataFrame, use_month: int = 0) -> pd.DataFrame:
     """
