Skip to content

Commit

Permalink
update HQ model and tests for new csv path validation
Browse files Browse the repository at this point in the history
  • Loading branch information
emlys committed Jan 19, 2024
1 parent d63437d commit 5ca1ec3
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 65 deletions.
65 changes: 61 additions & 4 deletions src/natcap/invest/habitat_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import numpy
from osgeo import gdal
import pandas
import pygeoprocessing
import taskgraph

Expand All @@ -26,6 +25,9 @@
# Validation message used when the path column expected for a provided LULC
# scenario is absent from the threats table. Formatted with ``column_name``.
MISSING_COLUMN_MSG = gettext(
"The column '{column_name}' was not found in the Threat Data table for "
"the corresponding input LULC scenario.")
# Validation message for threat path entries that were rejected. Formatted
# with ``threat_list``; ``validate`` passes the collected
# (threat, path column) pairs.
MISSING_THREAT_RASTER_MSG = gettext(
"A threat raster for threats: {threat_list} was not found or it "
"could not be opened by GDAL.")
# Leading text of the duplicate-path validation message.
# NOTE(review): presumably the duplicated paths are appended by the caller
# -- confirm against ``validate``.
DUPLICATE_PATHS_MSG = gettext("Threat paths must be unique. Duplicates: ")

MODEL_SPEC = {
Expand Down Expand Up @@ -414,7 +416,17 @@ def execute(args):
for threat, row in threat_df.iterrows():
LOGGER.debug(f"Validating path for threat: {threat}")
threat_table_path_col = _THREAT_SCENARIO_MAP[lulc_key]
threat_path = row[threat_table_path_col]

threat_validate_result = _validate_threat_path(
row[threat_table_path_col], lulc_key)
if threat_validate_result == 'error':
raise ValueError(
'There was an Error locating a threat raster from '
'the path in CSV for column: '
f'{_THREAT_SCENARIO_MAP[lulc_key]} and threat: '
f'{threat}.')

threat_path = threat_validate_result

threat_path_dict['threat' + lulc_key][threat] = threat_path
# save threat paths in a list for alignment and resize
Expand Down Expand Up @@ -563,7 +575,6 @@ def execute(args):

# for each land cover raster provided compute habitat quality
for lulc_key, lulc_path in lulc_path_dict.items():
print(lulc_key, lulc_path)
LOGGER.info(f'Calculating habitat quality for landuse: {lulc_path}')

threat_decay_task_list = []
Expand Down Expand Up @@ -597,7 +608,6 @@ def execute(args):
exit_landcover = False

# adjust each threat/threat raster for distance, weight, and access
print(threat_df)
for threat, row in threat_df.iterrows():
LOGGER.debug(
f'Calculating threat: {threat}.\nThreat data: {row}')
Expand Down Expand Up @@ -1014,6 +1024,35 @@ def exp_op(dist):
target_path=target_path)


def _validate_threat_path(threat_path, lulc_key):
"""Check ``threat_path`` is a valid raster file against ``lulc_key``.
Check to see that the path is a valid raster and if not use ``lulc_key``
to determine how to handle the non valid raster.
Args:
threat_path (str): path on disk for a possible raster file.
lulc_key (str): an string indicating which land cover this threat
path is associated with. Can be: '_b' | '_c' | '_f'
Returns:
If ``threat_path`` is a valid raster file then,
return ``threat_path``.
If ``threat_path`` is not valid then,
return ``None`` if ``lulc_key`` == '_b'
return 'error` otherwise
"""
# Checking threat path exists to control custom error messages
# for user readability.
if threat_path:
return threat_path
else:
if lulc_key == '_b':
return None
else:
return 'error'


@validation.invest_validator
def validate(args, limit_to=None):
"""Validate args to ensure they conform to ``execute``'s contract.
Expand Down Expand Up @@ -1081,7 +1120,18 @@ def validate(args, limit_to=None):
bad_threat_columns.append(threat_table_path_col)
break

# Threat path from threat CSV is relative to CSV
threat_path = row[threat_table_path_col]

threat_validate_result = _validate_threat_path(
threat_path, lulc_key)
if threat_validate_result == 'error':
bad_threat_paths.append(
(threat, threat_table_path_col))
continue

threat_path = threat_validate_result

if threat_path:
# check for duplicate absolute threat path names that
# cause errors when trying to write aligned versions
Expand All @@ -1096,6 +1146,13 @@ def validate(args, limit_to=None):
['threats_table_path'],
MISSING_COLUMN_MSG.format(column_name=bad_threat_columns[0])))

if bad_threat_paths:
validation_warnings.append((
['threats_table_path'],
MISSING_THREAT_RASTER_MSG.format(threat_list=bad_threat_paths)
))
invalid_keys.add('threats_table_path')

if duplicate_paths:
validation_warnings.append((
['threats_table_path'],
Expand Down
116 changes: 55 additions & 61 deletions tests/test_habitat_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def make_threats_raster(
if threat_values is None:
threat_values = [1, 1]

for time_index, suffix in enumerate(['_b', '_c', '_f']):
for time_index, suffix in enumerate(['_c', '_f']):
for (i, threat), value in zip(enumerate(threat_names), threat_values):
threat_array = numpy.zeros((side_length, side_length), dtype=dtype)
raster_path = os.path.join(folder_path, threat + suffix + '.tif')
Expand Down Expand Up @@ -914,6 +914,52 @@ def test_habitat_quality_no_fut_column(self):
self.fail("HQ failed when using threat data CSV missing FUT_PATH"
f" column. \n {str(e)}")

def test_habitat_quality_bad_rasters(self):
    """Habitat Quality: raise error on threats that aren't real rasters."""
    from natcap.invest import habitat_quality

    workspace = self.workspace_dir
    args = {
        'half_saturation_constant': '0.5',
        'workspace_dir': workspace,
        'n_workers': -1,
        'sensitivity_table_path': os.path.join(
            workspace, 'sensitivity_samp.csv'),
        'lulc_cur_path': os.path.join(workspace, 'lc_samp_cur_b.tif'),
        'threats_table_path': os.path.join(workspace, 'threats_samp.csv'),
    }

    make_sensitivity_samp_csv(args['sensitivity_table_path'])

    # Current LULC: rows 0-49 hold class 1, rows 50-99 hold class 2.
    lulc_array = numpy.ones((100, 100), dtype=numpy.int8)
    lulc_array[50:, :] = 2
    make_raster_from_array(lulc_array, args['lulc_cur_path'])

    # Ask the helper for empty threat rasters in the workspace so the files
    # named in the table exist but are not usable rasters.
    make_threats_raster(workspace, make_empty_raster=True)

    # Threat table rows; BASE_PATH is deliberately left blank.
    threat_table_lines = [
        'MAX_DIST,WEIGHT,THREAT,DECAY,BASE_PATH,CUR_PATH,FUT_PATH\n',
        '0.04,0.7,threat_1,linear,,threat_1_c.tif,threat_1_f.tif\n',
        '0.07,1.0,threat_2,exponential,,threat_2_c.tif,'
        'threat_2_f.tif\n',
    ]
    with open(args['threats_table_path'], 'w') as open_table:
        open_table.writelines(threat_table_lines)

    with self.assertRaises(ValueError) as cm:
        habitat_quality.execute(args)

    # The model is expected to surface the raster-open failure to the user.
    self.assertIn(
        'File could not be opened as a GDAL raster', str(cm.exception))

def test_habitat_quality_lulc_current_only(self):
"""Habitat Quality: on missing base and future LULC rasters."""
from natcap.invest import habitat_quality
Expand Down Expand Up @@ -1039,9 +1085,9 @@ def test_habitat_quality_lulc_baseline_current(self):
open_table.write(
'MAX_DIST,WEIGHT,THREAT,DECAY,BASE_PATH,CUR_PATH,FUT_PATH\n')
open_table.write(
'0.04,0.7,threat_1,linear,threat_1_b.tif,threat_1_c.tif,threat_1_f.tif\n')
'0.04,0.7,threat_1,linear,,threat_1_c.tif,threat_1_f.tif\n')
open_table.write(
'0.07,1.0,threat_2,exponential,threat_2_b.tif,threat_2_c.tif,'
'0.07,1.0,threat_2,exponential,,threat_2_c.tif,'
'threat_2_f.tif\n')

habitat_quality.execute(args)
Expand Down Expand Up @@ -1326,6 +1372,7 @@ def test_habitat_quality_validation_bad_threat_path(self):
make_sensitivity_samp_csv(args['sensitivity_table_path'])

# intentionally do not make the threat rasters

args['threats_table_path'] = os.path.join(
args['workspace_dir'], 'threats_samp.csv')

Expand All @@ -1340,9 +1387,10 @@ def test_habitat_quality_validation_bad_threat_path(self):
'threat_2_f.tif\n')

validate_result = habitat_quality.validate(args, limit_to=None)
self.assertIn(
'File not found',
validate_result[0][1])
self.assertTrue(
validate_result,
"expected failed validations instead didn't get any.")
self.assertIn('File not found', validate_result[0][1])

def test_habitat_quality_missing_cur_threat_path(self):
"""Habitat Quality: test for missing threat paths in current."""
Expand Down Expand Up @@ -1498,10 +1546,7 @@ def test_habitat_quality_misspelled_cur_threat_path(self):
habitat_quality.execute(args)

actual_message = str(cm.exception)
self.assertIn(
'There was an Error locating a threat raster from '
'the path in CSV for column: cur_path and threat: threat_1',
actual_message)
self.assertIn('File not found', actual_message)

def test_habitat_quality_validate_missing_cur_threat_path(self):
"""Habitat Quality: test validate for missing threat paths in cur."""
Expand Down Expand Up @@ -1591,8 +1636,6 @@ def test_habitat_quality_validate_missing_fut_threat_path(self):
args['threats_table_path'] = os.path.join(
args['workspace_dir'], 'threats_samp.csv')

print(os.listdir(args['workspace_dir']))

with open(args['threats_table_path'], 'w') as open_table:
open_table.write(
'MAX_DIST,WEIGHT,THREAT,DECAY,BASE_PATH,CUR_PATH,FUT_PATH\n')
Expand All @@ -1611,55 +1654,6 @@ def test_habitat_quality_validate_missing_fut_threat_path(self):
threat_list=[('threat_1', 'fut_path')]),
validate_result[0][1])

def test_habitat_quality_validate_misspelled_cur_threat_path(self):
    """Habitat Quality: test validate for a misspelled cur threat path."""
    from natcap.invest import habitat_quality

    workspace = self.workspace_dir
    args = {
        'half_saturation_constant': '0.5',
        'results_suffix': 'regression',
        'workspace_dir': workspace,
        'n_workers': -1,
    }

    args['access_vector_path'] = os.path.join(workspace, 'access_samp.shp')
    make_access_shp(args['access_vector_path'])

    # One LULC raster per scenario; the lower half of each raster carries
    # the scenario's own class value (1, 2, 3) over a background of ones.
    for lulc_val, scenario in enumerate(['_bas_', '_cur_', '_fut_'], start=1):
        lulc_key = 'lulc' + scenario + 'path'
        lulc_array = numpy.ones((100, 100), dtype=numpy.int8)
        lulc_array[50:, :] = lulc_val
        args[lulc_key] = os.path.join(
            workspace, 'lc_samp' + scenario + 'b.tif')
        make_raster_from_array(lulc_array, args[lulc_key])

    args['sensitivity_table_path'] = os.path.join(
        workspace, 'sensitivity_samp.csv')
    make_sensitivity_samp_csv(args['sensitivity_table_path'])

    make_threats_raster(
        workspace, threat_values=[1, 1],
        dtype=numpy.int8, gdal_type=gdal.GDT_Int32)

    args['threats_table_path'] = os.path.join(workspace, 'threats_samp.csv')

    # threat_1's CUR_PATH names 'threat_1_cur.tif', the misspelled path this
    # test expects validation to report as not found.
    threat_table_lines = [
        'MAX_DIST,WEIGHT,THREAT,DECAY,BASE_PATH,CUR_PATH,FUT_PATH\n',
        '0.04,0.7,threat_1,linear,,threat_1_cur.tif,threat_1_c.tif\n',
        '0.07,1.0,threat_2,exponential,,threat_2_c.tif,'
        'threat_2_f.tif\n',
    ]
    with open(args['threats_table_path'], 'w') as open_table:
        open_table.writelines(threat_table_lines)

    validate_result = habitat_quality.validate(args, limit_to=None)
    first_message = validate_result[0][1]
    self.assertIn(
        f'"{self.workspace_dir}/threat_1_cur.tif": File not found',
        first_message)

def test_habitat_quality_validate_duplicate_threat_path(self):
"""Habitat Quality: test validate for duplicate threat paths."""
from natcap.invest import habitat_quality
Expand Down

0 comments on commit 5ca1ec3

Please sign in to comment.