Github Issue #331: refactor extract tiles so it is testable and no longer depends on process id.

Merge branch 'feature_331_refactor_extract_tiles' into develop
bikegeek committed Dec 6, 2019
2 parents 50d52af + ffe4e5b commit 7161c75
Showing 1 changed file with 109 additions and 36 deletions.
145 changes: 109 additions & 36 deletions ush/extract_tiles_wrapper.py
@@ -67,56 +67,41 @@ def run_at_time(self, input_dict):
None: invokes regrid_data_plane to create a netCDF file from two
extratropical storm track files.
"""

# Do some set up
time_info = time_util.ti_calculate(input_dict)
init_time = time_info['init_fmt']

# get the process id to be used to identify the output
# amongst different users and runs.
cur_pid = str(os.getpid())
tmp_dir = os.path.join(self.config.getdir('TMP_DIR'), cur_pid)
tmp_dir = self.config.getdir('TMP_DIR')
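# [Editor's note, not part of the commit] tmp_dir is now simply the configured
# TMP_DIR (no per-process suffix), so the output location is deterministic across
# runs; cleanup() removes this directory at the end of run_at_time().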
self.logger.info("Begin extract tiles")

cur_init = init_time[0:8]+"_"+init_time[8:10]

# Check that there are tc_pairs data files (.tcst) which are needed as input
# to the extract tiles wrapper
tc_pairs_nc_output_regex = ".*.tcst"
output_files_list = util.get_files(self.tc_pairs_dir, tc_pairs_nc_output_regex, self.logger)
if len(output_files_list) == 0:
self.log_error("No tc pairs data found at {}"\
.format(self.tc_pairs_dir))
# Before proceeding, make sure we have input data.
if not self.tc_files_exist():
self.log_error("No tc pairs data found at {}" \
.format(self.tc_pairs_dir))
sys.exit(1)

# Create the name of the filter file we need to find. If
# the file doesn't exist, then run TC_STAT
# the filter file doesn't yet exist, then run TC_STAT
filter_filename = "filter_" + cur_init + ".tcst"
filter_name = os.path.join(self.filtered_out_dir, cur_init,
filter_filename)

if util.file_exists(filter_name) and not self.overwrite_flag:
self.logger.debug("Filter file exists, using Track data file: {}"\
.format(filter_name))
else:
# Create the storm track by applying the
# filter options defined in the config/param file.
# Use TCStatWrapper to build up the tc_stat command and invoke
# the MET tool tc_stat to perform the filtering.
tiles_list = util.get_files(self.tc_pairs_dir, ".*tcst", self.logger)
tiles_list_str = ' '.join(tiles_list)

tcs = TCStatWrapper(self.config, self.logger)
tcs.build_tc_stat(self.filtered_out_dir, cur_init,
tiles_list_str, self.addl_filter_opts)

# Remove any empty files and directories that can occur
# from filtering.
util.prune_empty(filter_name, self.logger)
# Invoke MET tool tc stat via the tc stat wrapper...
filtering_ok = self.do_filtering(cur_init, filter_name)
if filtering_ok != 0:
self.log_error("There was a problem running MET tc stat, please check your METplus "\
"config file settings.s")
sys.exit(1)

# Now get unique storm ids from the filter file,
# filter_yyyymmdd_hh.tcst
sorted_storm_ids = util.get_storm_ids(filter_name, self.logger)

# Check for empty sorted_storm_ids, if empty,
# Useful debugging info: Check for empty sorted_storm_ids, if empty,
# continue to the next time.
if not sorted_storm_ids:
# No storms found for init time, cur_init
@@ -125,6 +110,84 @@ def run_at_time(self, input_dict):
self.logger.debug(msg)
return

# Process each storm in the sorted_storm_ids list
# Iterate over each filter file in the output directory and
# search for the presence of the storm id. Store this
# corresponding row of data into a temporary file in the
# tmp directory, where each file contains the information for a single storm.
tmp_files_created = self.create_results_files(sorted_storm_ids, cur_init, filter_name, tmp_dir)
if tmp_files_created != 0:
self.log_error("There was a problem with processing storms from the filtered result, "\
"please check your METplus config file settings or your write permissions for your "\
"tmp directory.")

self.cleanup(tmp_dir)
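# [Editor's note, not part of the commit] The refactored flow is now:
# tc_files_exist() -> do_filtering() (only when the filter file is missing)
# -> create_results_files() -> cleanup(), each of which can be tested
# separately (see the sketches below).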

def tc_files_exist(self):
''' Check that there are tc_pairs data files (.tcst) which are needed as input
to the extract tiles wrapper
Args:
Return:
True if .tcst files exist, False otherwise
'''

tc_pairs_nc_output_regex = ".*.tcst"
output_files_list = util.get_files(self.tc_pairs_dir, tc_pairs_nc_output_regex, self.logger)
if len(output_files_list) == 0:
return False
else:
return True
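# --- Hedged illustration, not part of this commit ---
# The point of Issue #331 is that helpers such as tc_files_exist() can now be unit
# tested without a full METplus run. A minimal pytest-style sketch, assuming the
# wrapper module is importable as extract_tiles_wrapper, the class is named
# ExtractTilesWrapper, and util.get_files() returns the files under tc_pairs_dir
# that match the given regex:
import logging

from extract_tiles_wrapper import ExtractTilesWrapper  # import path/class name assumed


def _bare_wrapper(tc_pairs_dir):
    # Skip __init__ so no METplus config object is required for the test.
    wrapper = object.__new__(ExtractTilesWrapper)
    wrapper.tc_pairs_dir = str(tc_pairs_dir)
    wrapper.logger = logging.getLogger("extract_tiles_test")
    return wrapper


def test_tc_files_exist(tmp_path):
    # Empty directory: no .tcst input, so the check should fail.
    assert not _bare_wrapper(tmp_path).tc_files_exist()

    # One tc_pairs output file present: the check should pass.
    (tmp_path / "tc_pairs_sample.tcst").write_text("dummy header line\n")
    assert _bare_wrapper(tmp_path).tc_files_exist()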

def do_filtering(self, cur_init, filter_name):
''' run TC_STAT because the filter output file hasn't yet been created
by the MET tc stat tool.
Args:
@param cur_init: The current init time of interest
@param filter_name: The full filename of the file that will contain filtered results generated
by the MET tool tc stat.
Return:
0 if the MET tc stat tool completed successfully, creating the filter_name
file (defined in run_at_time()) in the output directory.
'''

# Create the storm track by applying the
# filter options defined in the config/param file.
# Use TCStatWrapper to build up the tc_stat command and invoke
# the MET tool tc_stat to perform the filtering.
tiles_list = util.get_files(self.tc_pairs_dir, ".*tcst", self.logger)
tiles_list_str = ' '.join(tiles_list)

tcs = TCStatWrapper(self.config, self.logger)
tcs.build_tc_stat(self.filtered_out_dir, cur_init,
tiles_list_str, self.addl_filter_opts)

# Remove any empty files and directories that can occur
# from filtering.
util.prune_empty(filter_name, self.logger)

return 0
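# --- Hedged illustration, not part of this commit ---
# With the tc_stat invocation isolated here, do_filtering() can be tested with its
# heavy dependencies patched out. The patch targets assume extract_tiles_wrapper
# imports TCStatWrapper and util at module level (as the calls above suggest);
# adjust the names if the real imports differ.
from unittest import mock

import extract_tiles_wrapper as etw  # import path assumed


def test_do_filtering_builds_tc_stat(tmp_path):
    wrapper = object.__new__(etw.ExtractTilesWrapper)   # class name assumed
    wrapper.config = mock.MagicMock()
    wrapper.logger = mock.MagicMock()
    wrapper.tc_pairs_dir = str(tmp_path)                 # empty dir -> empty tiles list
    wrapper.filtered_out_dir = str(tmp_path / "filtered")
    wrapper.addl_filter_opts = ""

    filter_name = str(tmp_path / "filter_20191206_00.tcst")
    with mock.patch.object(etw, "TCStatWrapper") as tcs, \
         mock.patch.object(etw.util, "prune_empty") as prune:
        assert wrapper.do_filtering("20191206_00", filter_name) == 0

    tcs.return_value.build_tc_stat.assert_called_once()
    prune.assert_called_once_with(filter_name, wrapper.logger)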

def create_results_files(self, sorted_storm_ids, cur_init, filter_name, tmp_dir):
''' Create the tmp files that contain the filtered results, one tmp file per storm, then
invoke retrieve_and_regrid to create the final output as netCDF forecast and analysis (obs)
files.
Args:
@param sorted_storm_ids: The list of unique storm ids found in the filter file
@param cur_init: The current init time of interest
@param filter_name: The full file name of the filter file generated by tc stat
@param tmp_dir: The location of the tmp directory where these tmp files will be saved
Return:
0 if successful in creating the tmp files (one per storm) in the tmp_dir
'''
# Process each storm in the sorted_storm_ids list
# Iterate over each filter file in the output directory and
# search for the presence of the storm id. Store this
@@ -139,30 +202,40 @@ def run_at_time(self, input_dict):
tmp_filename = "filter_" + cur_init + "_" + cur_storm
full_tmp_filename = os.path.join(tmp_dir, tmp_filename)

# If this full_tmp_filename already exists for this storm
# (from a previous run), then remove it. Otherwise the file
# will continue to be appended with the same information.
# This in turn will lead to errors when this file is being read/parsed.
if os.path.exists(full_tmp_filename):
os.remove(full_tmp_filename)
storm_match_list = util.grep(cur_storm, filter_name)
with open(full_tmp_filename, "a+") as tmp_file:
# copy over header information
tmp_file.write(header)
for storm_match in storm_match_list:
tmp_file.write(storm_match)

# Perform regridding of the forecast and analysis files
# to an n X n degree tile centered on the storm (dimensions
# are indicated in the config/param file).
feature_util.retrieve_and_regrid(full_tmp_filename, cur_init,
cur_storm, self.filtered_out_dir,
self.config)

# end of for cur_storm
return 0
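# [Editor's note, not part of the commit] Hypothetical example of the per-storm
# naming above: with cur_init = "20191206_00" and cur_storm = "ML1200942014", the
# file written to tmp_dir would be <TMP_DIR>/filter_20191206_00_ML1200942014,
# holding the filter-file header followed by every row of filter_name that
# mentions that storm id.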


def cleanup(self, tmp_dir):
'''Remove any empty files and directories in the extract_tiles output
directory and the tmp directory.
Input:
@param tmp_dir: The full path to the tmp directory
'''
util.prune_empty(self.filtered_out_dir, self.logger)

# Clean up the tmp directory if it exists
if os.path.isdir(tmp_dir):
util.rmtree(tmp_dir)

return 0

if __name__ == "__main__":
util.run_stand_alone("extract_tiles_wrapper", "ExtractTiles")
