添加归档任务
Contents
添加归档任务
数值预报模式业务系统的最后一个任务通常负责数据归档和清理工作。下面为本教程的 CMA-TYM 模式系统添加这最后一个任务。
更新工作流定义
更新 ${TUTORIAL_HOME}/def 中的工作流定义文件 cma_tym.py:
1import os
2
3import ecflow
4
5
# Submit/kill command templates shared by every Slurm-backed task, defined once
# so the serial and parallel variants cannot drift apart.  The %NAME% tokens
# are ecFlow variables, substituted by ecFlow when the job file is generated.
_SLURM_JOB_CMD = "slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%"
_SLURM_KILL_CMD = "slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%"


def slurm_serial(class_name="serial"):
    """Return ecFlow variables that submit a task as a serial Slurm job.

    Args:
        class_name: Value stored in the ``CLASS`` variable (job class/queue
            name; presumably read by the ``slurm_serial.h`` include -- confirm).

    Returns:
        A new dict with ``ECF_JOB_CMD``, ``ECF_KILL_CMD`` and ``CLASS``, meant
        to be passed to a node's ``add_variable``.
    """
    return {
        "ECF_JOB_CMD": _SLURM_JOB_CMD,
        "ECF_KILL_CMD": _SLURM_KILL_CMD,
        "CLASS": class_name,
    }


def slurm_parallel(nodes, tasks_per_node=32, class_name="normal"):
    """Return ecFlow variables that submit a task as a multi-node Slurm job.

    Args:
        nodes: Value stored in the ``NODES`` variable (number of nodes).
        tasks_per_node: Value stored in the ``TASKS_PER_NODE`` variable.
        class_name: Value stored in the ``CLASS`` variable (job class/queue
            name; presumably read by a Slurm header include -- confirm).

    Returns:
        A new dict with ``ECF_JOB_CMD``, ``ECF_KILL_CMD``, ``NODES``,
        ``TASKS_PER_NODE`` and ``CLASS``, meant to be passed to a node's
        ``add_variable``.
    """
    # Key order matches the original so the generated .def text is unchanged.
    return {
        "ECF_JOB_CMD": _SLURM_JOB_CMD,
        "ECF_KILL_CMD": _SLURM_KILL_CMD,
        "NODES": nodes,
        "TASKS_PER_NODE": tasks_per_node,
        "CLASS": class_name,
    }
24
25
# Directory of this script.  It is "" when the script is run from its own
# directory; abspath() below then resolves it against the working directory.
current_path = os.path.dirname(__file__)
# The tutorial root is the parent of the script's directory.
tutorial_base = os.path.abspath(os.path.join(current_path, "../"))


def _under_tutorial(relative_path):
    """Return *relative_path* joined onto the tutorial root directory."""
    return os.path.join(tutorial_base, relative_path)


# Workflow definition directory (its include/ and ecffiles/ subdirectories
# become ECF_INCLUDE and ECF_FILES).
def_path = _under_tutorial("def")
# NOTE(review): ecfout_path is not referenced anywhere in this listing.
ecfout_path = _under_tutorial("ecfout")
# Model programs and the run-time working directory, exported to the suite as
# PROGRAM_BASE_DIR and RUN_BASE_DIR.
program_base_dir = _under_tutorial("program/grapes-tym-program")
run_base_dir = _under_tutorial("workdir")
32
defs = ecflow.Defs()

# Forecast horizon in hours.  It feeds the FORECAST_LENGTH suite variable, the
# get_ncep/post/plot task loops and the grapes_monitor meter maximum, which
# previously each repeated the literal 120 independently.
forecast_length = 120
# Spacing (hours) between consecutive get_ncep tasks: 000, 003, 006, ...
ncep_step = 3


def _cat_ecf_script(script_name):
    """Return an ECF_SCRIPT_CMD value that cats *script_name* from ecffiles.

    Used for tasks whose node names (e.g. ``000`` or ``post_000``) differ
    from the shared ``.ecf`` script they run.
    """
    return "cat {def_path}/ecffiles/{script_name}".format(
        def_path=def_path, script_name=script_name
    )


with defs.add_suite("cma_tym") as suite:
    # Locations of the model programs and the run-time working directory.
    suite.add_variable("PROGRAM_BASE_DIR", program_base_dir)
    suite.add_variable("RUN_BASE_DIR", run_base_dir)

    # Where ecFlow looks for %include headers and task .ecf scripts.
    suite.add_variable("ECF_INCLUDE", os.path.join(def_path, "include"))
    suite.add_variable("ECF_FILES", os.path.join(def_path, "ecffiles"))

    # Model run configuration; presumably read by the task scripts through
    # configure.h -- confirm.
    suite.add_variable("USE_GRAPES", ".false.")
    suite.add_variable("FORECAST_LENGTH", forecast_length)
    suite.add_variable("GMF_TINV", 3)
    suite.add_variable("RMF_TINV", 3)
    suite.add_variable("USE_GFS", 12)

    # Cycle being run: date and start hour.
    suite.add_variable("ECF_DATE", "20220704")
    suite.add_variable("HH", "00")

    # Every task in the suite draws from the "total_tasks" limit (size 10),
    # which caps how many of them ecFlow runs at the same time.
    suite.add_limit("total_tasks", 10)
    suite.add_inlimit("total_tasks")

    with suite.add_task("copy_dir") as tk_copy_dir:
        pass

    with suite.add_task("get_message") as tk_get_message:
        tk_get_message.add_trigger("./copy_dir == complete")
        tk_get_message.add_variable(slurm_serial("serial"))
        tk_get_message.add_event("arrived")
        tk_get_message.add_event("peaceful")

    # One download task per input hour; all of them run getgmf_ncep.ecf.
    with suite.add_family("get_ncep") as fm_get_ncep:
        fm_get_ncep.add_trigger("./get_message == complete")
        fm_get_ncep.add_variable(slurm_serial("serial"))
        for hour in range(0, forecast_length + 1, ncep_step):
            hour_string = "{hour:03}".format(hour=hour)
            with fm_get_ncep.add_task(hour_string) as tk_hour:
                tk_hour.add_variable("FFF", hour_string)
                tk_hour.add_variable("ECF_SCRIPT_CMD", _cat_ecf_script("getgmf_ncep.ecf"))

    with suite.add_task("extgmf") as tk_extgmf:
        tk_extgmf.add_trigger("./get_ncep == complete")
        tk_extgmf.add_variable(slurm_serial("serial"))

    with suite.add_task("pregmf") as tk_pregmf:
        tk_pregmf.add_trigger("./extgmf == complete")
        tk_pregmf.add_variable(slurm_serial("serial"))

    with suite.add_task("dobugs") as tk_dobugs:
        tk_dobugs.add_trigger("./pregmf == complete")
        tk_dobugs.add_variable(slurm_serial("serial"))

    with suite.add_task("psi") as tk_psi:
        tk_psi.add_trigger("./dobugs == complete")
        tk_psi.add_variable(slurm_parallel(4, 32, "normal"))

    # Previously bound to ``tk_psi`` by copy-paste, which clobbered the psi
    # task handle; each task now has its own name.
    with suite.add_task("gcas") as tk_gcas:
        tk_gcas.add_trigger("./psi == complete")
        tk_gcas.add_variable(slurm_parallel(4, 32, "normal"))

    with suite.add_family("model") as fm_model:
        fm_model.add_trigger("./gcas == complete")
        with fm_model.add_task("grapes") as tk_grapes:
            tk_grapes.add_event("clean_ready")
            tk_grapes.add_variable(slurm_parallel(128, 32, "normal"))

        # Starts once grapes sets clean_ready (or completes) and owns the
        # forecastHours meter that the post tasks trigger on.
        with fm_model.add_task("grapes_monitor") as tk_grapes_monitor:
            tk_grapes_monitor.add_trigger("./grapes:clean_ready == set or ./grapes == complete")
            tk_grapes_monitor.add_meter("forecastHours", -1, forecast_length)

    # One post task per forecast hour.  Each starts when the forecastHours
    # meter reaches its hour (or the monitor completes) and, except the
    # first, only after the previous hour's post task, so hours run in order.
    with suite.add_family("post") as fm_post:
        last_hour = None
        for hour in range(0, forecast_length + 1):
            with fm_post.add_task("post_{hour:03}".format(hour=hour)) as tk_hour:
                trigger = (
                    "../model/grapes_monitor:forecastHours >= {hour}"
                    " or ../model/grapes_monitor == complete"
                ).format(hour=hour)
                if last_hour is not None:
                    trigger = "./post_{last_hour:03} == complete and ({trigger})".format(
                        last_hour=last_hour, trigger=trigger
                    )
                tk_hour.add_trigger(trigger)
                tk_hour.add_variable(slurm_serial("serial"))
                tk_hour.add_variable("FFF", "{hour:03}".format(hour=hour))
                tk_hour.add_variable("ECF_SCRIPT_CMD", _cat_ecf_script("post.ecf"))
            last_hour = hour

    # One plot task per forecast hour, each waiting only on its own post task.
    with suite.add_family("prods") as fm_prods:
        with fm_prods.add_family("plot") as fm_plot:
            for hour in range(0, forecast_length + 1):
                with fm_plot.add_task("plot_{hour:03}".format(hour=hour)) as tk_hour:
                    tk_hour.add_trigger("../../post/post_{hour:03} == complete".format(hour=hour))
                    tk_hour.add_variable(slurm_serial("serial"))
                    tk_hour.add_variable("FFF", "{hour:03}".format(hour=hour))
                    tk_hour.add_variable("ECF_SCRIPT_CMD", _cat_ecf_script("plot.ecf"))

    # Final task of the cycle: runs after the whole prods family completes.
    # Its script is only a placeholder in this tutorial, but the task still
    # marks when the workflow finished.
    with suite.add_task("archive") as tk_archive:
        tk_archive.add_trigger("./prods == complete")
        tk_archive.add_variable(slurm_serial("serial"))
136
# Show the generated definition, then write it into def_path as cma_tym.def,
# the file later passed to `ecflow_client --replace`.
print(defs)
def_output_path = os.path.join(def_path, "cma_tym.def")
defs.save_as_defs(def_output_path)
新增代码解析:
第 133-135 行(with suite.add_task("archive") 代码段)新增 archive 任务,其触发条件为 ./prods == complete,即在 prods 家族全部完成后才运行。
先挂起 cma_tym 节点,然后重新生成工作流定义文件并更新 ecFlow 服务器上的工作流:
# cma_tym.py lives in ${TUTORIAL_HOME}/def and writes cma_tym.def into that
# same directory, so both commands below must run from there.
cd "${TUTORIAL_HOME}/def"
python cma_tym.py
ecflow_client --port 43083 --replace /cma_tym cma_tym.def
查看 ecFlowUI:
创建任务脚本
本教程仅模拟归档过程,archive 任务不执行任何操作。
在 ${TUTORIAL_HOME}/def/ecffiles 中创建 ecf 脚本 archive.ecf:
#!/bin/ksh
%include <slurm_serial.h>
%include <head.h>
%include <configure.h>
#--------------------------------------
# Placeholder archive step: this tutorial only simulates archiving, so the
# task just enters the cycle run directory and prints a message.
# CYCLE_RUN_BASE_DIR is presumably set by configure.h -- confirm.
run_dir=${CYCLE_RUN_BASE_DIR}
# Quoted so a run directory containing spaces still works.
cd "${run_dir}"
echo "ignore archive..."
#---------------------------------------
%include <tail.h>
备注
即使不需要执行归档操作,也推荐在工作流末尾设置一个单独的任务,例如本例中空的 archive 任务。通过这个任务,可以方便地查看工作流运行的结束时间。