Creating Parallel Tasks#

Next, we create two parallel tasks:

  • psi: generates the lateral boundary and initial fields

  • gcas: cloud analysis

Parallel tasks are similar to the serial tasks created earlier; the only difference is that we additionally need to specify the number of nodes to run on and the number of CPUs to use per node.

Updating the Workflow Definition#

Update the workflow definition file cma_tym.py in ${TUTORIAL_HOME}/def:

 1 import os
 2
 3 import ecflow
 4
 5
 6 def slurm_serial(class_name="serial"):
 7     variables = {
 8         "ECF_JOB_CMD": "slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%",
 9         "ECF_KILL_CMD": "slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%",
10         "CLASS": class_name,
11     }
12     return variables
13
14
15 def slurm_parallel(nodes, tasks_per_node=32, class_name="normal"):
16     variables = {
17         "ECF_JOB_CMD": "slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%",
18         "ECF_KILL_CMD": "slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%",
19         "NODES": nodes,
20         "TASKS_PER_NODE": tasks_per_node,
21         "CLASS": class_name,
22     }
23     return variables
24
25
26 current_path = os.path.dirname(__file__)
27 tutorial_base = os.path.abspath(os.path.join(current_path, "../"))
28 def_path = os.path.join(tutorial_base, "def")
29 ecfout_path = os.path.join(tutorial_base, "ecfout")
30 program_base_dir = os.path.join(tutorial_base, "program/grapes-tym-program")
31 run_base_dir = os.path.join(tutorial_base, "workdir")
32
33 defs = ecflow.Defs()
34
35 with defs.add_suite("cma_tym") as suite:
36     suite.add_variable("PROGRAM_BASE_DIR", program_base_dir)
37     suite.add_variable("RUN_BASE_DIR", run_base_dir)
38
39     suite.add_variable("ECF_INCLUDE", os.path.join(def_path, "include"))
40     suite.add_variable("ECF_FILES", os.path.join(def_path, "ecffiles"))
41
42     suite.add_variable("USE_GRAPES", ".false.")
43     suite.add_variable("FORECAST_LENGTH", 120)
44     suite.add_variable("GMF_TINV", 3)
45     suite.add_variable("RMF_TINV", 3)
46     suite.add_variable("USE_GFS", 12)
47
48     suite.add_variable("ECF_DATE", "20220704")
49     suite.add_variable("HH", "00")
50
51     suite.add_limit("total_tasks", 10)
52     suite.add_inlimit("total_tasks")
53
54     with suite.add_task("copy_dir") as tk_copy_dir:
55         pass
56
57     with suite.add_task("get_message") as tk_get_message:
58         tk_get_message.add_trigger("./copy_dir == complete")
59         tk_get_message.add_variable(slurm_serial("serial"))
60         tk_get_message.add_event("arrived")
61         tk_get_message.add_event("peaceful")
62
63     with suite.add_family("get_ncep") as fm_get_ncep:
64         fm_get_ncep.add_trigger("./get_message == complete")
65         fm_get_ncep.add_variable(slurm_serial("serial"))
66         for hour in range(0, 120 + 1, 3):
67             hour_string = "{hour:03}".format(hour=hour)
68             with fm_get_ncep.add_task(hour_string) as tk_hour:
69                 tk_hour.add_variable("FFF", hour_string)
70                 tk_hour.add_variable(
71                     "ECF_SCRIPT_CMD",
72                     "cat {def_path}/ecffiles/getgmf_ncep.ecf".format(def_path=def_path)
73                 )
74
75     with suite.add_task("extgmf") as tk_extgmf:
76         tk_extgmf.add_trigger("./get_ncep == complete")
77         tk_extgmf.add_variable(slurm_serial("serial"))
78
79     with suite.add_task("pregmf") as tk_pregmf:
80         tk_pregmf.add_trigger("./extgmf == complete")
81         tk_pregmf.add_variable(slurm_serial("serial"))
82
83     with suite.add_task("dobugs") as tk_dobugs:
84         tk_dobugs.add_trigger("./pregmf == complete")
85         tk_dobugs.add_variable(slurm_serial("serial"))
86
87     with suite.add_task("psi") as tk_psi:
88         tk_psi.add_trigger("./dobugs == complete")
89         tk_psi.add_variable(slurm_parallel(4, 32, "normal"))
90
91     with suite.add_task("gcas") as tk_gcas:
92         tk_gcas.add_trigger("./psi == complete")
93         tk_gcas.add_variable(slurm_parallel(4, 32, "normal"))
94
95
96 print(defs)
97 def_output_path = str(os.path.join(def_path, "cma_tym.def"))
98 defs.save_as_defs(def_output_path)

Explanation of the new code:

  • Lines 15-23 define the slurm_parallel function, which returns the variables needed to submit a parallel Slurm job.

  • Lines 87-93 define the two parallel tasks, psi and gcas. They run in the parallel queue normal, and each task requests 4 nodes with 32 tasks per node, as sketched below.
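
Since add_variable is called with a dictionary, each key becomes a separate ecFlow variable on the task. In the generated cma_tym.def, the psi task should therefore look roughly as follows (a sketch; ordering and quoting may differ slightly):

task psi
  trigger ./dobugs == complete
  edit ECF_JOB_CMD 'slsubmit6 %ECF_JOB% %ECF_NAME% %ECF_TRIES% %ECF_TRYNO% %ECF_HOST% %ECF_PORT%'
  edit ECF_KILL_CMD 'slcancel4 %ECF_RID% %ECF_NAME% %ECF_HOST% %ECF_PORT%'
  edit NODES '4'
  edit TASKS_PER_NODE '32'
  edit CLASS 'normal'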

Suspend the cma_tym node, then update the workflow on the ecFlow server:

cd ${TUTORIAL_HOME}/def
python cma_tym.py
ecflow_client --port 43083 --replace /cma_tym cma_tym.def
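
The suspend step can be done in ecFlowUI or from the command line; a minimal sketch using ecflow_client on the same port (run --suspend before the --replace above, and --resume once the replace has succeeded):

ecflow_client --port 43083 --suspend /cma_tym
ecflow_client --port 43083 --resume /cma_tym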

Creating the Header File#

Create the header file slurm_parallel.h in ${TUTORIAL_HOME}/def/include:

## This is a header file for a Slurm parallel job.
#SBATCH -J GRAPES
#SBATCH -p %CLASS%
#SBATCH -N %NODES%
#SBATCH --ntasks-per-node=%TASKS_PER_NODE%
#SBATCH -o %ECF_JOBOUT%
#SBATCH -e %ECF_JOBOUT%.err
#SBATCH --comment=GRAPES
#SBATCH -t 00:60:00
#SBATCH --no-requeue
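
When ecFlow generates the job file, the %CLASS%, %NODES% and %TASKS_PER_NODE% placeholders are substituted with the task's variables. For the psi task defined above (CLASS=normal, NODES=4, TASKS_PER_NODE=32), the resulting directives in the submitted job should look roughly like this:

#SBATCH -p normal
#SBATCH -N 4
#SBATCH --ntasks-per-node=32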

Creating Task Scripts#

Create the ecf script psi.ecf in ${TUTORIAL_HOME}/def/ecffiles:

#!/bin/ksh
%include <slurm_parallel.h>
#SBATCH -t 00:15:00
%include <head.h>
%include <configure.h>
#--------------------------------------

do_static=0

run_dir=${CYCLE_RUN_DIR}
cd ${run_dir}

cp -vr ${PROGRAM_CON_DIR}/grapes/run/* .

rm -f bogus.dat
rm -f namelist.input
rm -f grapesinput grapesbdy

echo "[INFO] use cma-ncep bckg"
${PROGRAM_SCRIPT_DIR}/do_grapesd01.csh \
  ${START_TIME} ${END_TIME} ${START_TIME} ${GMF_TINV} ${FORECAST_LENGTH} ${RMF_TINV}

if [ -s ${MSG_DIR}/tc_report_${START_TIME}.txt -o -s ${MSG_DIR}/tc_message_global_${START_TIME} ] ;then
  ln -sf ${CYCLE_VTX_DIR}/bogus${START_TIME}000.dat bogus.dat
  sed -e "s#do_bogus.*=#do_bogus = .true., ! #" namelist.input > namelist.tmp
  mv -f namelist.tmp namelist.input
fi

if [ ${do_static} -eq 1 -o ! -f static_data ] ; then
  ln -sf ${GEOGDIR} ./geog_data
  sed -e "s#do_static_data.*=#do_static_data = .true., ! #" namelist.input > namelist.tmp
  mv -f namelist.tmp namelist.input
fi

ulimit -s unlimited

# module load compiler/intel/composer_xe_2017.2.174
# module load mpi/intelmpi/2017.2.174
export I_MPI_PMI_LIBRARY=/opt/gridview/slurm17/lib/libpmi.so

srun ${PROGRAM_BIN_DIR}/psi.exe

#---------------------------------------
%include <tail.h>

Note that the third line of the script (#SBATCH -t 00:15:00) sets the wall-clock limit to 15 minutes, overriding the 60-minute limit defined in the slurm_parallel.h header file.

Note

Specifying an accurate wall-clock limit helps jobs queue more efficiently: tasks with a short time limit are easier for the scheduler to fit in than tasks without a tight limit.
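
To check which time limit actually took effect after submission, the limit of a queued or running job can be listed with squeue; a sketch (the user name and format string are only illustrative; %l prints the time limit):

squeue -u wangdp -o "%.10i %.9P %.8j %.10l %R"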

Create the ecf script gcas.ecf in ${TUTORIAL_HOME}/def/ecffiles:

#!/bin/ksh
%include <slurm_parallel.h>
#SBATCH -t 00:10:00
%include <head.h>
%include <configure.h>
#--------------------------------------
obs_dir="/g1/COMMONDATA/obs/OPER/NWPC"

run_dir=${CYCLE_RUN_DIR}
cd $run_dir

#===========================#
# check grapesinput namelist.input

cp ${PROGRAM_SCRIPT_DIR}/gcasnamelist.sh .

ln -sf grapesinput grapesinput${START_TIME}00
rm -f qcqr_gcas_${START_TIME}00 radar_temperatue.dat

./gcasnamelist.sh  $START_TIME $obs_dir 1 .false.

ulimit -s unlimited

module load compiler/intel/composer_xe_2017.2.174
module load mpi/intelmpi/2017.2.174
export I_MPI_PMI_LIBRARY=/opt/gridview/slurm17/lib/libpmi.so

srun ${PROGRAM_BIN_DIR}/gcas_new.exe

#============================

iret=$?
if [ $iret -eq 0 -a -e qcqr_gcas_${START_TIME}00 ];then
   echo "gcas complete...."
else
   echo "gcas failed ...."
   exit 1
fi

cat namelist.input | sed -e "s#warm_start .*=#warm_start = .T., ! #" | sed -e "s#do_cld .*=#do_cld = .T., ! #" > namelist.tmp
mv namelist.tmp namelist.input

#---------------------------------------
%include <tail.h>

Running the Tasks#

Run the psi and gcas tasks in ecFlowUI and check the output logs.

Note

Computing resources on CMA-PI are fairly tight and the system is busy, so parallel jobs submitted by ecFlow may have to wait in the queue. In the figure below, the gcas job is in the submitted state, which indicates that it is probably still queued.

../_images/ecflow-ui-submitted-task.png

Use the squeue command to view queued jobs:

squeue -u wangdp
   JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
43420772    normal   GRAPES   wangdp PD       0:00      4 (Priority)
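
If a job stays pending for a long time, scontrol shows more detail about the pending reason and the effective time limit (using the job ID from the example output above):

scontrol show job 43420772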