Skip to content

Commit

Permalink
Add missing arch_tars dependencies (#3319)
Browse files Browse the repository at this point in the history
The `gfs_arch_tars` job currently does not depend on `gempak` jobs,
even though it archives data produced by them. This PR will introduce
that dependency. Additionally, there are several missing dependencies
for cleanup when the arch_tar job is not executed. Nearly all of the
job's dependencies need to be replicated for cleanup in case arch_tar
doesn't run. This PR will address this problem as well.

Resolves #3294
  • Loading branch information
AntonMFernando-NOAA authored Feb 18, 2025
1 parent 0da3832 commit 012c5ea
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 25 deletions.
4 changes: 3 additions & 1 deletion env/HERA.env
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ elif [[ "${step}" = "gempak" ]]; then
export CFP_MP="YES"

export NTHREADS_GEMPAK=${NTHREADS1}
[[ ${NTHREADS_GEMPAK} -gt ${max_threads_per_task} ]] && export NTHREADS_GEMPAK=${max_threads_per_task}
if [[ ${NTHREADS_GEMPAK} -gt ${max_threads_per_task} ]]; then
export NTHREADS_GEMPAK=${max_threads_per_task}
fi

elif [[ "${step}" = "fit2obs" ]]; then

Expand Down
2 changes: 2 additions & 0 deletions modulefiles/module_base.hera.lua
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,5 @@ prepend_path("MODULEPATH", pathJoin("/scratch1/NCEPDEV/global/glopara/git/Fit2Ob
load(pathJoin("fit2obs", (os.getenv("fit2obs_ver") or "None")))

whatis("Description: GFS run environment")

load(pathJoin("imagemagick", (os.getenv("imagemagick_ver") or "None")))
2 changes: 2 additions & 0 deletions versions/run.hera.ver
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ export perl_ver=5.38.0

source "${HOMEgfs:-}/versions/spack.ver"
export spack_mod_path="/scratch1/NCEPDEV/nems/role.epic/spack-stack/spack-stack-${spack_stack_ver}/envs/${spack_env}/install/modulefiles/Core"

export imagemagick_ver=7.1.1-11
124 changes: 100 additions & 24 deletions workflow/rocoto/gfs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2399,6 +2399,22 @@ def arch_tars(self):
dep_dict = {'type': 'task', 'name': f'{self.run}_mos_{job}'}
deps.append(rocoto.add_dependency(dep_dict))

if self.options['do_gempak']:
if self.run in ['gdas']:
dep_dict = {'type': 'task', 'name': f'{self.run}_gempakmetancdc'}
deps.append(rocoto.add_dependency(dep_dict))
elif self.run in ['gfs']:
dep_dict = {'type': 'task', 'name': f'{self.run}_gempakmeta'}
deps.append(rocoto.add_dependency(dep_dict))
if self.app_config.mode in ['cycled']:
dep_dict = {'type': 'task', 'name': f'{self.run}_gempakncdcupapgif'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_goes']:
dep_dict = {'type': 'task', 'name': f'{self.run}_npoess_pgrb2_0p5deg'}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'metatask', 'name': f'{self.run}_gempakgrb2spec'}
deps.append(rocoto.add_dependency(dep_dict))

if self.options['do_metp'] and self.run in ['gfs']:
deps2 = []
# taskvalid only handles regular tasks, so just check the first metp job exists
Expand Down Expand Up @@ -2441,36 +2457,96 @@ def cleanup(self):
deps.append(rocoto.add_dependency(dep_dict))

else:
dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_archtar']:
dep_dict = {'type': 'task', 'name': f'{self.run}_arch_tars'}
if self.app_config.mode in ['cycled']:
if self.run in ['gfs']:
dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlprod'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_vminmon']:
dep_dict = {'type': 'task', 'name': f'{self.run}_vminmon'}
deps.append(rocoto.add_dependency(dep_dict))
elif self.run in ['gdas']:
dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlprod'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_fit2obs']:
dep_dict = {'type': 'task', 'name': f'{self.run}_fit2obs'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_verfozn']:
dep_dict = {'type': 'task', 'name': f'{self.run}_verfozn'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_verfrad']:
dep_dict = {'type': 'task', 'name': f'{self.run}_verfrad'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_vminmon']:
dep_dict = {'type': 'task', 'name': f'{self.run}_vminmon'}
deps.append(rocoto.add_dependency(dep_dict))
if self.run in ['gfs'] and self.options['do_tracker']:
dep_dict = {'type': 'task', 'name': f'{self.run}_tracker'}
deps.append(rocoto.add_dependency(dep_dict))

if self.options['do_gempak']:
if self.run in ['gdas']:
dep_dict = {'type': 'task', 'name': f'{self.run}_gempakmetancdc'}
if self.run in ['gfs'] and self.options['do_genesis']:
dep_dict = {'type': 'task', 'name': f'{self.run}_genesis'}
deps.append(rocoto.add_dependency(dep_dict))
elif self.run in ['gfs']:
dep_dict = {'type': 'task', 'name': f'{self.run}_gempakmeta'}
if self.run in ['gfs'] and self.options['do_genesis_fsu']:
dep_dict = {'type': 'task', 'name': f'{self.run}_genesis_fsu'}
deps.append(rocoto.add_dependency(dep_dict))
if self.app_config.mode in ['cycled']:
dep_dict = {'type': 'task', 'name': f'{self.run}_gempakncdcupapgif'}
# Post job dependencies
dep_dict = {'type': 'metatask', 'name': f'{self.run}_atmos_prod'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_wave']:
dep_dict = {'type': 'metatask', 'name': f'{self.run}_wavepostsbs'}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'task', 'name': f'{self.run}_wavepostpnt'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_wave_bnd']:
dep_dict = {'type': 'task', 'name': f'{self.run}_wavepostbndpnt'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_goes']:
dep_dict = {'type': 'task', 'name': f'{self.run}_npoess_pgrb2_0p5deg'}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'metatask', 'name': f'{self.run}_gempakgrb2spec'}
if self.options['do_ocean']:
if self.run in ['gfs']:
dep_dict = {'type': 'metatask', 'name': f'{self.run}_ocean_prod'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_ice']:
if self.run in ['gfs']:
dep_dict = {'type': 'metatask', 'name': f'{self.run}_ice_prod'}
deps.append(rocoto.add_dependency(dep_dict))
# MOS job dependencies
if self.run in ['gfs'] and self.options['do_mos']:
mos_jobs = ["stn_prep", "grd_prep", "ext_stn_prep", "ext_grd_prep",
"stn_fcst", "grd_fcst", "ext_stn_fcst", "ext_grd_fcst",
"stn_prdgen", "grd_prdgen", "ext_stn_prdgen", "ext_grd_prdgen",
"wx_prdgen", "wx_ext_prdgen"]
for job in mos_jobs:
dep_dict = {'type': 'task', 'name': f'{self.run}_mos_{job}'}
deps.append(rocoto.add_dependency(dep_dict))

if self.options['do_gempak']:
if self.run in ['gdas']:
dep_dict = {'type': 'task', 'name': f'{self.run}_gempakmetancdc'}
deps.append(rocoto.add_dependency(dep_dict))
elif self.run in ['gfs']:
dep_dict = {'type': 'task', 'name': f'{self.run}_gempakmeta'}
deps.append(rocoto.add_dependency(dep_dict))
if self.app_config.mode in ['cycled']:
dep_dict = {'type': 'task', 'name': f'{self.run}_gempakncdcupapgif'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_goes']:
dep_dict = {'type': 'task', 'name': f'{self.run}_npoess_pgrb2_0p5deg'}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'metatask', 'name': f'{self.run}_gempakgrb2spec'}
deps.append(rocoto.add_dependency(dep_dict))

if self.options['do_metp'] and self.run in ['gfs']:
deps2 = []
# taskvalid only handles regular tasks, so just check the first metp job exists
dep_dict = {'type': 'taskvalid', 'name': f'{self.run}_metpg2g1', 'condition': 'not'}
deps2.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'metatask', 'name': f'{self.run}_metp'}
deps2.append(rocoto.add_dependency(dep_dict))
deps.append(rocoto.create_dependency(dep_condition='or', dep=deps2))
if self.options['do_metp'] and self.run in ['gfs']:
deps2 = []
# taskvalid only handles regular tasks, so just check the first metp job exists
dep_dict = {'type': 'taskvalid', 'name': f'{self.run}_metpg2g1', 'condition': 'not'}
deps2.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'metatask', 'name': f'{self.run}_metp'}
deps2.append(rocoto.add_dependency(dep_dict))
deps.append(rocoto.create_dependency(dep_condition='or', dep=deps2))

dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy'}
deps.append(rocoto.add_dependency(dep_dict))
if self.options['do_archtar']:
dep_dict = {'type': 'task', 'name': f'{self.run}_arch_tars'}
deps.append(rocoto.add_dependency(dep_dict))

dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

Expand Down

0 comments on commit 012c5ea

Please sign in to comment.