添加更多任务
Contents
添加更多任务#
下面继续添加两个数据预处理任务:
extgmf 任务解码全球模式资料,生成二进制文件
pregmf 任务对解码后的全球模式资料进行预处理
修改工作流定义#
更新 ${TUTORIAL_HOME}/def 目录中的工作流定义文件 cma_tym.py:
import os

import ecflow


def slurm_serial(class_name="serial"):
    """Return the job submit/kill command variables and CLASS for a node."""
    return {
        "ECF_JOB_CMD": "slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%",
        "ECF_KILL_CMD": "slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%",
        "CLASS": class_name,
    }


current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(os.path.join(current_path, "../"))  # parent of this script's directory
def_path = os.path.join(tutorial_base, "def")
ecfout_path = os.path.join(tutorial_base, "ecfout")
program_base_dir = os.path.join(tutorial_base, "program/grapes-tym-program")
run_base_dir = os.path.join(tutorial_base, "workdir")

defs = ecflow.Defs()

with defs.add_suite("cma_tym") as suite:
    suite.add_variable("PROGRAM_BASE_DIR", program_base_dir)
    suite.add_variable("RUN_BASE_DIR", run_base_dir)
    # Directories holding the include headers and the .ecf task scripts.
    suite.add_variable("ECF_INCLUDE", os.path.join(def_path, "include"))
    suite.add_variable("ECF_FILES", os.path.join(def_path, "ecffiles"))
    # Model settings exposed to the task scripts as suite variables.
    suite.add_variable("USE_GRAPES", ".false.")
    suite.add_variable("FORECAST_LENGTH", 120)
    suite.add_variable("GMF_TINV", 3)
    suite.add_variable("RMF_TINV", 3)
    suite.add_variable("USE_GFS", 12)
    # Date (YYYYMMDD) and hour (HH) used by this run.
    suite.add_variable("ECF_DATE", "20220704")
    suite.add_variable("HH", "00")
    # Limit "total_tasks" (10 tokens), applied suite-wide through the inlimit.
    suite.add_limit("total_tasks", 10)
    suite.add_inlimit("total_tasks")
    # Nodes below are chained: each trigger waits for the previous node.
    with suite.add_task("copy_dir") as tk_copy_dir:
        pass

    with suite.add_task("get_message") as tk_get_message:
        tk_get_message.add_trigger("./copy_dir == complete")
        tk_get_message.add_variable(slurm_serial("serial"))
        tk_get_message.add_event("arrived")
        tk_get_message.add_event("peaceful")

    with suite.add_family("get_ncep") as fm_get_ncep:
        fm_get_ncep.add_trigger("./get_message == complete")
        fm_get_ncep.add_variable(slurm_serial("serial"))
        for hour in range(0, 120 + 1, 3):  # one task per hour: 000, 003, ..., 120
            hour_string = "{hour:03}".format(hour=hour)
            with fm_get_ncep.add_task(hour_string) as tk_hour:
                tk_hour.add_variable("FFF", hour_string)
                tk_hour.add_variable(  # every hour task reads the same script
                    "ECF_SCRIPT_CMD",
                    "cat {def_path}/ecffiles/getgmf_ncep.ecf".format(def_path=def_path)
                )
    # Added in this step: extgmf runs after get_ncep, pregmf after extgmf.
    with suite.add_task("extgmf") as tk_extgmf:
        tk_extgmf.add_trigger("./get_ncep == complete")
        tk_extgmf.add_variable(slurm_serial("serial"))

    with suite.add_task("pregmf") as tk_pregmf:
        tk_pregmf.add_trigger("./extgmf == complete")
        tk_pregmf.add_variable(slurm_serial("serial"))

# Show the definition, then write it to cma_tym.def inside def_path.
print(defs)
def_output_path = os.path.join(def_path, "cma_tym.def")
defs.save_as_defs(def_output_path)
第 64-70 行添加了两个任务 extgmf 和 pregmf。
更新 ecFlow 上的工作流:
cd ${TUTORIAL_HOME}/def
python cma_tym.py
ecflow_client --port 43083 --replace /cma_tym cma_tym.def
在 ecFlowUI 中可以通过 Force/complete 命令将多个节点设为 complete 状态:
创建任务脚本#
在 ${TUTORIAL_HOME}/def/ecffiles 目录下创建 ecf 脚本 extgmf.ecf:
#!/bin/ksh
# extgmf task: decode the global model data into binary files by running
# TcPro.pl with only the -B switch enabled.
# NOTE(review): CYCLE_RUN_DIR, CYCLE_GMF_DIR, PROGRAM_SCRIPT_DIR,
# COMPONENT_PROJECT_BASE, CYCLE_RUN_BASE_DIR and START_TIME are not set in
# this script; presumably they come from the included headers - confirm.
%include <slurm_serial.h>
%include <head.h>
%include <configure.h>
#=============================================
# Work inside the bckg_data sub-directory of the cycle run directory.
RUN_DIR=${CYCLE_RUN_DIR}/bckg_data
cd ${RUN_DIR}
# Copy bckg.inc from the data_proc/000 directory into the working directory.
cp ${CYCLE_GMF_DIR}/data_proc/000/bckg.inc bckg.inc
# Step switches handed to TcPro.pl below (1 = on, 0 = off); only dobckg is on.
# NOTE(review): "bckg" presumably means background field data - confirm.
dobckg=1
dopost=0
doplot=0
dodata=0
upload=0
# Run TcPro.pl with the switches above, the forecast length (-f), the
# RMF_TINV (-i) and GMF_TINV (-I) intervals, and START_TIME as its argument.
${PROGRAM_SCRIPT_DIR}/TcPro.pl \
-B $dobckg \
-P $dopost \
-G $doplot \
-M $dodata \
-U $upload \
-S ${COMPONENT_PROJECT_BASE} \
-D ${CYCLE_RUN_BASE_DIR} \
-f ${FORECAST_LENGTH} \
-i ${RMF_TINV} \
-I ${GMF_TINV} \
${START_TIME}
#---------------------------------------
%include <tail.h>
在 ${TUTORIAL_HOME}/def/ecffiles 目录下创建 ecf 脚本 pregmf.ecf:
#!/bin/ksh
# pregmf task: preprocess the decoded global model data (do_xb.csh, pre_xb.x,
# xbctl.csh) and move the resulting xb files into CYCLE_VTX_DIR.
# NOTE(review): CYCLE_RUN_DIR, CYCLE_VTX_DIR, PROGRAM_SCRIPT_DIR,
# PROGRAM_BIN_DIR and START_TIME are not set in this script; presumably they
# come from the included headers - confirm.
%include <slurm_serial.h>
%include <head.h>
%include <configure.h>
#==============================
# Work directly in the cycle run directory.
RUN_DIR=${CYCLE_RUN_DIR}
cd ${RUN_DIR}
#===========================#
# Delete any existing namelist and xb data file before regenerating them
# (presumably left over from an earlier run of this task).
rm -f namelist_xb
rm -f xb${START_TIME}000.dat
# NOTE(review): do_xb.csh presumably writes namelist_xb, which pre_xb.x then
# reads to produce the xb data file - confirm against those programs.
${PROGRAM_SCRIPT_DIR}/do_xb.csh ${START_TIME}
${PROGRAM_BIN_DIR}/pre_xb.x
#===========================#
# NOTE(review): xbctl.csh presumably creates the .ctl files moved below.
${PROGRAM_SCRIPT_DIR}/xbctl.csh xb ${START_TIME} 000
# Move the data file and both control files to CYCLE_VTX_DIR.
mv -f xb${START_TIME}000.dat ${CYCLE_VTX_DIR}
mv -f xb${START_TIME}000.ctl ${CYCLE_VTX_DIR}
mv -f Postvar.ctl ${CYCLE_VTX_DIR}
#---------------------------------------
%include <tail.h>
运行任务#
恢复节点 cma_tym,ecFlow 会自动运行后续任务。