添加产品后处理任务

模式生成的二进制数据不便于分发和展示,因此通常需要对模式原始输出进行后处理,生成数据产品和图片产品。

本节将添加后处理任务和图片产品制作任务,并介绍如何在触发器中使用标尺(meter)。这两类任务仅在有台风时运行。

更新工作流定义

更新 ${TUTORIAL_HOME}/def 目录中的工作流定义文件 cma_tym.py:

  1import os
  2
  3import ecflow
  4
  5
  6def slurm_serial(class_name="serial"):
  7    variables = {
  8        "ECF_JOB_CMD": "slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%",
  9        "ECF_KILL_CMD": "slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%",
 10        "CLASS": class_name,
 11    }
 12    return variables
 13
 14
 15def slurm_parallel(nodes, tasks_per_node=32, class_name="normal"):
 16    variables = {
 17        "ECF_JOB_CMD": "slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%",
 18        "ECF_KILL_CMD": "slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%",
 19        "NODES": nodes,
 20        "TASKS_PER_NODE": tasks_per_node,
 21        "CLASS": class_name,
 22    }
 23    return variables
 24
 25
 26current_path = os.path.dirname(__file__)
 27tutorial_base = os.path.abspath(os.path.join(current_path, "../"))
 28def_path = os.path.join(tutorial_base, "def")
 29ecfout_path = os.path.join(tutorial_base, "ecfout")
 30program_base_dir = os.path.join(tutorial_base, "program/grapes-tym-program")
 31run_base_dir = os.path.join(tutorial_base, "workdir")
 32
 33defs = ecflow.Defs()
 34
 35with defs.add_suite("cma_tym") as suite:
 36    suite.add_variable("PROGRAM_BASE_DIR", program_base_dir)
 37    suite.add_variable("RUN_BASE_DIR", run_base_dir)
 38
 39    suite.add_variable("ECF_INCLUDE", os.path.join(def_path, "include"))
 40    suite.add_variable("ECF_FILES", os.path.join(def_path, "ecffiles"))
 41
 42    suite.add_variable("USE_GRAPES", ".false.")
 43    suite.add_variable("FORECAST_LENGTH", 120)
 44    suite.add_variable("GMF_TINV", 3)
 45    suite.add_variable("RMF_TINV", 3)
 46    suite.add_variable("USE_GFS", 12)
 47
 48    suite.add_variable("ECF_DATE", "20220704")
 49    suite.add_variable("HH", "00")
 50
 51    suite.add_limit("total_tasks", 10)
 52    suite.add_inlimit("total_tasks")
 53
 54    with suite.add_task("copy_dir") as tk_copy_dir:
 55        pass
 56
 57    with suite.add_task("get_message") as tk_get_message:
 58        tk_get_message.add_trigger("./copy_dir == complete")
 59        tk_get_message.add_variable(slurm_serial("serial"))
 60        tk_get_message.add_event("arrived")
 61        tk_get_message.add_event("peaceful")
 62
 63    with suite.add_family("get_ncep") as fm_get_ncep:
 64        fm_get_ncep.add_trigger("./get_message == complete")
 65        fm_get_ncep.add_variable(slurm_serial("serial"))
 66        for hour in range(0, 120 + 1, 3):
 67            hour_string = "{hour:03}".format(hour=hour)
 68            with fm_get_ncep.add_task(hour_string) as tk_hour:
 69                tk_hour.add_variable("FFF", hour_string)
 70                tk_hour.add_variable(
 71                    "ECF_SCRIPT_CMD",
 72                    "cat {def_path}/ecffiles/getgmf_ncep.ecf".format(def_path=def_path)
 73                )
 74
 75    with suite.add_task("extgmf") as tk_extgmf:
 76        tk_extgmf.add_trigger("./get_ncep == complete")
 77        tk_extgmf.add_variable(slurm_serial("serial"))
 78
 79    with suite.add_task("pregmf") as tk_pregmf:
 80        tk_pregmf.add_trigger("./extgmf == complete")
 81        tk_pregmf.add_variable(slurm_serial("serial"))
 82
 83    with suite.add_task("dobugs") as tk_dobugs:
 84        tk_dobugs.add_trigger("./pregmf == complete")
 85        tk_dobugs.add_variable(slurm_serial("serial"))
 86
 87    with suite.add_task("psi") as tk_psi:
 88        tk_psi.add_trigger("./dobugs == complete")
 89        tk_psi.add_variable(slurm_parallel(4, 32, "normal"))
 90
 91    with suite.add_task("gcas") as tk_psi:
 92        tk_psi.add_trigger("./psi == complete")
 93        tk_psi.add_variable(slurm_parallel(4, 32, "normal"))
 94
 95    with suite.add_family("model") as fm_model:
 96        fm_model.add_trigger("./gcas == complete")
 97        with fm_model.add_task("grapes") as tk_grapes:
 98            tk_grapes.add_event("clean_ready")
 99            tk_grapes.add_variable(slurm_parallel(128, 32, "normal"))
100
101        with fm_model.add_task("grapes_monitor") as tk_grapes_monitor:
102            tk_grapes_monitor.add_trigger("./grapes:clean_ready == set or ./grapes == complete")
103            tk_grapes_monitor.add_meter("forecastHours", -1, 120)
104
105    with suite.add_family("post") as fm_post:
106        last_hour = None
107        for hour in range(0, 120 + 1, 1):
108            with fm_post.add_task("post_{hour:03}".format(hour=hour)) as tk_hour:
109                trigger = "../model/grapes_monitor:forecastHours >= {hour} or ../model/grapes_monitor == complete".format(hour=hour)
110                if last_hour is not None:
111                    trigger = "./post_{last_hour:03} == complete and ({trigger})".format(last_hour=last_hour, trigger=trigger)
112                tk_hour.add_trigger(trigger)
113                tk_hour.add_variable(slurm_serial("serial"))
114                tk_hour.add_variable("FFF", "{hour:03}".format(hour=hour))
115                tk_hour.add_variable(
116                    "ECF_SCRIPT_CMD",
117                    "cat {def_path}/ecffiles/post.ecf".format(def_path=def_path)
118                )
119            last_hour = hour
120
121    with suite.add_family("prods") as fm_prods:
122        with fm_prods.add_family("plot") as fm_plot:
123            for hour in range(0, 120 + 1, 1):
124                with fm_plot.add_task("plot_{hour:03}".format(hour=hour)) as tk_hour:
125                    tk_hour.add_trigger("../../post/post_{hour:03} == complete".format(hour=hour))
126                    tk_hour.add_variable(slurm_serial("serial"))
127                    tk_hour.add_variable("FFF", "{hour:03}".format(hour=hour))
128                    tk_hour.add_variable(
129                        "ECF_SCRIPT_CMD",
130                        "cat {def_path}/ecffiles/plot.ecf".format(def_path=def_path)
131                    )
132
133
134print(defs)
135def_output_path = str(os.path.join(def_path, "cma_tym.def"))
136defs.save_as_defs(def_output_path)

新增代码解析:

  • 105-119 行(with suite.add_family("post") 代码块)创建 post 节点,并逐小时创建 post 后处理任务,任务脚本为 post.ecf。
    • 每个时效的 post 任务都需要模式已输出该时效的数据,grapes_monitor 的 forecastHours 标尺指示模式当前的积分进度。使用 meter 作为触发条件时,如果用 == 判断,可能会因为 meter 变化太快而错过某个值,导致任务无法被触发。所以我们一般使用 >= 作为判断条件,同时加上 grapes_monitor 任务已完成(complete)的条件,以确保在任何情况下该任务都会被正确触发。

    • post 任务需要串行执行,所以从时效 001 开始,每个 post 任务都将前一个时效 post 任务完成作为触发条件之一。

  • 121-131 行(with suite.add_family("prods") 代码块)创建 prods 节点及其下的 plot 节点,并逐小时创建 plot 绘图任务,任务脚本为 plot.ecf。每个 plot 任务在对应时效的 post 任务完成后触发。

创建任务脚本

在 ${TUTORIAL_HOME}/def/ecffiles 目录中创建 ecf 脚本 post.ecf:

#!/bin/ksh
# post.ecf - post-process the model output for a single forecast hour.
# One copy runs per post_FFF task; the FFF ecFlow variable (substituted by
# ecFlow when the job file is generated) selects the hour.
# NOTE: ecFlow preprocesses every line of this file, comments included, so keep
# the ecFlow micro character out of comments.
%include <slurm_serial.h>
%include <head.h>
%include <configure.h>
#--------------------------------------

# Per the tutorial, this header makes the task run only when a typhoon is present.
%include <check_message.h>

#===========================#
# Work in the cycle run directory and pick up this task's forecast hour.
run_dir=${CYCLE_RUN_BASE_DIR}
forecast_hour=%FFF%
cd $run_dir
#===========================#
# TcPro.pl stage switches (presumably 1 = on, 0 = off); only dopost is set here.
dobckg=0
dopost=1
doplot=0
dodata=0
upload=0

# CYCLE_RUN_BASE_DIR, PROGRAM_SCRIPT_DIR, COMPONENT_PROJECT_BASE, FORECAST_LENGTH,
# RMF_TINV, GMF_TINV and START_TIME are not set in this script; presumably they
# are exported by configure.h - TODO confirm.
# -b and -e both receive forecast_hour, so only this one hour is handled
# (assuming they are the begin/end of the hour range).
  ${PROGRAM_SCRIPT_DIR}/TcPro.pl \
    -B $dobckg \
    -P $dopost \
    -G $doplot \
    -M $dodata \
    -U $upload \
    -S ${COMPONENT_PROJECT_BASE} \
    -D ${CYCLE_RUN_BASE_DIR} \
    -f ${FORECAST_LENGTH} \
    -i $RMF_TINV \
    -I $GMF_TINV \
    -b ${forecast_hour} \
    -e ${forecast_hour} \
    ${START_TIME}

#---------------------------------------

# Presumably reports completion to ecFlow (standard tail header).
%include <tail.h>

注意脚本中的 %include <check_message.h> 一行:引入该头文件表示该任务仅在有台风时运行。下文的 plot.ecf 同样引入了该头文件。

在 ${TUTORIAL_HOME}/def/ecffiles 目录中创建 ecf 脚本 plot.ecf:

#!/bin/ksh
# plot.ecf - draw the image products for a single forecast hour.
# One copy runs per plot_FFF task (triggered after post_FFF completes); the FFF
# ecFlow variable selects the hour.
# NOTE: ecFlow preprocesses every line of this file, comments included, so keep
# the ecFlow micro character out of comments.
%include <slurm_serial.h>
%include <head.h>
%include <configure.h>
#--------------------------------------

# Per the tutorial, this header makes the task run only when a typhoon is present.
%include <check_message.h>

#===========================#
# Work in the cycle run directory and pick up this task's forecast hour.
run_dir=${CYCLE_RUN_BASE_DIR}
forecast_hour=%FFF%
cd $run_dir
#===========================#
# TcPro.pl stage switches (presumably 1 = on, 0 = off); only doplot is set here.
dobckg=0
dopost=0
doplot=1
dodata=0
upload=0

# upload is 0 here; other upload values noted by the original authors:
# upload=10 plot
# upload=20 micaps

# Same TcPro.pl call as post.ecf, plus -u 0 and -w forecast_hour, which only
# the plot step passes (their meaning is not shown here). The environment
# variables used below are presumably exported by configure.h - TODO confirm.
  ${PROGRAM_SCRIPT_DIR}/TcPro.pl \
    -B $dobckg \
    -P $dopost \
    -G $doplot \
    -M $dodata \
    -U $upload \
    -S ${COMPONENT_PROJECT_BASE} \
    -D ${CYCLE_RUN_BASE_DIR} \
    -f ${FORECAST_LENGTH} \
    -i $RMF_TINV \
    -I $GMF_TINV \
    -b ${forecast_hour} \
    -e ${forecast_hour} \
    -u 0 \
    -w ${forecast_hour} \
    ${START_TIME}

#---------------------------------------

# Presumably reports completion to ecFlow (standard tail header).
%include <tail.h>