python连接ecflow

python连接ecflow

创建目录

nwp_xp账户登录服务器,创建目录

1
2
3
4
5
6
# Create a personal working area under /g11/nwp_xp, then an ecflow folder
# holding two sub-directories: ecfout and def.
cd /g11/nwp_xp
mkdir yourname
cd yourname
mkdir ecflow
cd ecflow
mkdir ecfout def

启动服务

启动ecflow_server,记录主机名和端口

1
# Start an ecFlow server: -d is given the ecfout directory, -p the port 42045.
ecflow_start.sh -b -d /g11/nwp_xp/yourname/ecflow/ecfout/ -p 42045
  • -p:指定一个新端口(未被占用)。
  • -d /g11/nwp_xp/yourname/ecflow/ecfout/:存放ecflow服务器相关日志的目录(即前面创建的 ecfout 目录)。

进入ecflowUI

输入命令,后台进入ecflowUI

1
# Launch ecflow_ui as a background job (trailing &) so the terminal stays usable.
ecflow_ui &

添加启动的ecflow服务

ecflow_client命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Query the server's status / statistics
ecflow_client --host=login_a06 --port=42045 --stats

# Manually put the server into the halted state
ecflow_client --host=login_a06 --port=42045 --halt

# Begin (start playing) the suite definition loaded in the server
ecflow_client --host=login_a06 --port=42045 --begin

# Restart the server so it leaves the halted state
ecflow_client --host=login_a06 --port=42045 --restart

# Load a definition file for the first time
ecflow_client --host login_a06 --port 42045 --load=test.def

# Reload: replace the /backup node with the one from backup2.0.def
ecflow_client --host login_a06 --port 42045 --replace /backup backup2.0.def

# Delete the /backup node from the server
ecflow_client --host=login_a06 --port=42045 --delete=/backup

定义一个简单的suite

1
2
# Enter the def directory and open the suite-definition script test.py.
cd def
vim test.py
1
2
3
4
5
6
7
8
9
10
11
import os
from ecflow import Defs, Suite, Task, Edit

# Minimal definition: suite "test" with the ECF_HOME variable and one task, t1.
print("Creating suite definition")

# ECF_HOME is set to the absolute path of the directory holding this script.
current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(current_path)

suite = Suite("test", Edit(ECF_HOME=tutorial_base), Task("t1"))
defs = Defs(suite)
print(defs)

filems之前

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import os
from ecflow import Defs, Suite, Task, Edit

print("Creating suite definition")
current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(current_path)

# Suite "test": the ECF_HOME variable followed by tasks t1 and t2.
suite_items = [Edit(ECF_HOME=tutorial_base), Task("t1"), Task("t2")]
defs = Defs(Suite("test", *suite_items))
print(defs)

# Print whatever check_job_creation() returns for this definition.
print("Checking job creation: .ecf -> .job0")
job_check = defs.check_job_creation()
print(job_check)

# Write the definition next to this script as test.def.
print("Saving definition to file 'test.def'")
def_output_path = os.path.join(tutorial_base, "test.def")
defs.save_as_defs(def_output_path)

family的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import os
from ecflow import Defs, Suite, Task, Edit, Family

def create_family_f1():
    """Return a Family named "family" that holds tasks t1 and t2."""
    members = [Task("t1"), Task("t2")]
    return Family("family", *members)

print("Creating suite definition")
current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(current_path)

# ECF_HOME and ECF_INCLUDE are both set to the directory holding this script.
suite_vars = Edit(ECF_HOME=tutorial_base, ECF_INCLUDE=tutorial_base)
defs = Defs(Suite("test", suite_vars, create_family_f1()))

print(defs)

# Print whatever check_job_creation() returns for this definition.
print("Checking job creation: .ecf -> .job0")
job_check = defs.check_job_creation()
print(job_check)

# Write the definition next to this script as test.def.
print("Saving definition to file 'test.def'")
def_output_path = os.path.join(tutorial_base, "test.def")
defs.save_as_defs(def_output_path)

family

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import os
from ecflow import Defs, Suite, Task, Edit, Family

def create_family_f1():
    """Return Family "family" carrying SLEEP=20 and the tasks t1 and t2."""
    members = [Edit(SLEEP=20)]
    for task_name in ("t1", "t2"):
        members.append(Task(task_name))
    return Family("family", *members)

def family_new():
    """Return Family "F_N" with SLEEP=80; its task t2 also sets its own SLEEP=10."""
    family_sleep = Edit(SLEEP=80)
    plain_task = Task("t1")
    task_with_own_sleep = Task("t2", Edit(SLEEP=10))
    return Family("F_N", family_sleep, plain_task, task_with_own_sleep)


print("Creating suite definition")
current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(current_path)

# Suite "test": shared variables first, then the two families in order.
suite_items = [
    Edit(ECF_HOME=tutorial_base, ECF_INCLUDE=tutorial_base),
    create_family_f1(),
    family_new(),
]
defs = Defs(Suite("test", *suite_items))

print(defs)

# Print whatever check_job_creation() returns for this definition.
print("Checking job creation: .ecf -> .job0")
job_check = defs.check_job_creation()
print(job_check)

# Write the definition next to this script as test.def.
print("Saving definition to file 'test.def'")
def_output_path = os.path.join(tutorial_base, "test.def")
defs.save_as_defs(def_output_path)

触发器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import os
from ecflow import Defs, Suite, Task, Edit, Family, Trigger

def create_family_f1():
    """Return Family "family" in which t2 carries the trigger "t1 == complete".

    Uses ecflow.Trigger, which this script's ecflow import line must include;
    without it the call below raises NameError.
    """
    return Family(
        "family",
        Edit(SLEEP=20),
        Task("t1"),
        # t2 is given a trigger expression that refers to its sibling t1.
        Task("t2", Trigger("t1 == complete")),
    )

print("Creating suite definition")
current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(current_path)

# ECF_HOME and ECF_INCLUDE are both set to the directory holding this script.
suite_vars = Edit(ECF_HOME=tutorial_base, ECF_INCLUDE=tutorial_base)
suite = Suite("test", suite_vars, create_family_f1())
defs = Defs(suite)

print(defs)

# Print whatever check_job_creation() returns for this definition.
print("Checking job creation: .ecf -> .job0")
job_check = defs.check_job_creation()
print(job_check)

# Write the definition next to this script as test.def.
print("Saving definition to file 'test.def'")
def_output_path = os.path.join(tutorial_base, "test.def")
defs.save_as_defs(def_output_path)

添加事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import os
from ecflow import Defs, Suite, Task, Edit, Family, Trigger, Event

def create_family_f1():
    """Return Family "family" demonstrating events.

    t2 has the trigger "t1 == complete" and declares events a and b;
    t3 is triggered on "t2:a" and t4 on "t2:b".
    """
    sleep_var = Edit(SLEEP=20)
    t1 = Task("t1")
    t2 = Task("t2", Trigger("t1 == complete"), Event("a"), Event("b"))
    t3 = Task("t3", Trigger("t2:a"))
    t4 = Task("t4", Trigger("t2:b"))
    return Family("family", sleep_var, t1, t2, t3, t4)

print("Creating suite definition")
current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(current_path)

# ECF_HOME and ECF_INCLUDE are both set to the directory holding this script.
suite_items = [
    Edit(ECF_HOME=tutorial_base, ECF_INCLUDE=tutorial_base),
    create_family_f1(),
]
defs = Defs(Suite("test", *suite_items))

print(defs)

# Print whatever check_job_creation() returns for this definition.
print("Checking job creation: .ecf -> .job0")
job_check = defs.check_job_creation()
print(job_check)

# Write the definition next to this script as test.def.
print("Saving definition to file 'test.def'")
def_output_path = os.path.join(tutorial_base, "test.def")
defs.save_as_defs(def_output_path)

添加COMPLETE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import os
from ecflow import Defs, Suite, Task, Edit, Family, Trigger, Event, Complete

def create_family_f1():
    """Return Family "family" demonstrating a complete expression.

    t2 has the trigger "t1 == complete" and declares events a and b;
    t3 is triggered on "t2:a"; t4 has the trigger "t2 == complete" plus the
    complete expression "t2:b".
    """
    sleep_var = Edit(SLEEP=20)
    t1 = Task("t1")
    t2 = Task("t2", Trigger("t1 == complete"), Event("a"), Event("b"))
    t3 = Task("t3", Trigger("t2:a"))
    t4 = Task("t4", Trigger("t2 == complete"), Complete("t2:b"))
    return Family("family", sleep_var, t1, t2, t3, t4)

print("Creating suite definition")
current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(current_path)

# ECF_HOME and ECF_INCLUDE are both set to the directory holding this script.
suite_vars = Edit(ECF_HOME=tutorial_base, ECF_INCLUDE=tutorial_base)
defs = Defs(Suite("test", suite_vars, create_family_f1()))

print(defs)

# Print whatever check_job_creation() returns for this definition.
print("Checking job creation: .ecf -> .job0")
job_check = defs.check_job_creation()
print(job_check)

# Write the definition next to this script as test.def.
print("Saving definition to file 'test.def'")
def_output_path = os.path.join(tutorial_base, "test.def")
defs.save_as_defs(def_output_path)

添加METER

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import os
from ecflow import Defs, Suite, Task, Edit, Family, Trigger, Event, Complete, Meter

def create_family_f1():
    """Return Family "family" demonstrating a meter.

    t1 owns a meter named "progress" built with the arguments (1, 100, 90);
    t5, t6 and t7 each carry the trigger "t1:progress ge 30". t2..t4 keep the
    event / complete wiring: t2 after "t1 == complete" with events a and b,
    t3 on "t2:a", t4 on "t2 == complete" with complete expression "t2:b".
    """
    members = [
        Edit(SLEEP=20),
        Task("t1", Meter("progress", 1, 100, 90)),
        Task("t2", Trigger("t1 == complete"), Event("a"), Event("b")),
        Task("t3", Trigger("t2:a")),
        Task("t4", Trigger("t2 == complete"), Complete("t2:b")),
    ]
    # The last three tasks differ only by name; each gets its own Trigger.
    for task_name in ("t5", "t6", "t7"):
        members.append(Task(task_name, Trigger("t1:progress ge 30")))
    return Family("family", *members)

print("Creating suite definition")
current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(current_path)

# ECF_HOME and ECF_INCLUDE are both set to the directory holding this script.
suite_items = [
    Edit(ECF_HOME=tutorial_base, ECF_INCLUDE=tutorial_base),
    create_family_f1(),
]
defs = Defs(Suite("test", *suite_items))

print(defs)

# Print whatever check_job_creation() returns for this definition.
print("Checking job creation: .ecf -> .job0")
job_check = defs.check_job_creation()
print(job_check)

# Write the definition next to this script as test.def.
print("Saving definition to file 'test.def'")
def_output_path = os.path.join(tutorial_base, "test.def")
defs.save_as_defs(def_output_path)

时间依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import os
from ecflow import Defs, Suite, Task, Family, Edit, Trigger, Event, Complete, Meter, Time, Day, Date

def create_family_f1():
    """Return Family "family" with the meter / event / complete example tasks.

    t1 owns the meter "progress" (arguments 1, 100, 90); t2 follows
    "t1 == complete" and declares events a and b; t3 is triggered on "t2:a";
    t4 has trigger "t2 == complete" and complete expression "t2:b"; t5..t7
    each have the trigger "t1:progress ge 30".
    """
    members = [
        Edit(SLEEP=20),
        Task("t1", Meter("progress", 1, 100, 90)),
        Task("t2", Trigger("t1 == complete"), Event("a"), Event("b")),
        Task("t3", Trigger("t2:a")),
        Task("t4", Trigger("t2 == complete"), Complete("t2:b")),
    ]
    # The last three tasks differ only by name; each gets its own Trigger.
    for task_name in ("t5", "t6", "t7"):
        members.append(Task(task_name, Trigger("t1:progress ge 30")))
    return Family("family", *members)

def create_family_f2():
    """Return Family "family" whose five tasks each carry time dependencies.

    NOTE(review): the family name "family" is the same one create_family_f1
    uses.
    """
    sleep_var = Edit(SLEEP=20)
    # Three-field time string; presumably start, finish and increment.
    t1 = Task("t1", Time("02:15 05:15 00:30"))
    # A day-of-week dependency combined with a time.
    t2 = Task("t2", Day("thursday"), Time("02:40"))
    # A date pattern with wildcards combined with a time.
    t3 = Task("t3", Date("11.*.*"), Time("03:00"))
    # Same clock value with and without the leading "+".
    t4 = Task("t4", Time("+00:02"))
    t5 = Task("t5", Time("00:02"))
    return Family("family", sleep_var, t1, t2, t3, t4, t5)

print("Creating suite definition")
current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(current_path)

# Only create_family_f2() is added to the suite in this script.
suite_vars = Edit(ECF_HOME=tutorial_base, ECF_INCLUDE=tutorial_base)
defs = Defs(Suite("test", suite_vars, create_family_f2()))

print(defs)

# Print whatever check_job_creation() returns for this definition.
print("Checking job creation: .ecf -> .job0")
job_check = defs.check_job_creation()
print(job_check)

# Write the definition next to this script as test.def.
print("Saving definition to file 'test.def'")
def_output_path = os.path.join(tutorial_base, "test.def")
defs.save_as_defs(def_output_path)

cron

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os
from ecflow import Defs, Suite, Task, Family, Edit, Trigger, Event, Complete, Meter, Time, Day, Date, Cron

def create_family_f1():
    """Return Family "family" with the meter / event / complete example tasks.

    t1 owns the meter "progress" (arguments 1, 100, 90); t2 follows
    "t1 == complete" and declares events a and b; t3 is triggered on "t2:a";
    t4 has trigger "t2 == complete" and complete expression "t2:b"; t5..t7
    each have the trigger "t1:progress ge 30".
    """
    members = [
        Edit(SLEEP=20),
        Task("t1", Meter("progress", 1, 100, 90)),
        Task("t2", Trigger("t1 == complete"), Event("a"), Event("b")),
        Task("t3", Trigger("t2:a")),
        Task("t4", Trigger("t2 == complete"), Complete("t2:b")),
    ]
    # The last three tasks differ only by name; each gets its own Trigger.
    for task_name in ("t5", "t6", "t7"):
        members.append(Task(task_name, Trigger("t1:progress ge 30")))
    return Family("family", *members)

def create_family_f2():
    """Return Family "family" with Time("+00:05") set on the family itself.

    t1..t3 carry no time attribute of their own; t4 and t5 add Time("+00:02")
    and Time("00:02") respectively.
    """
    members = [Edit(SLEEP=20), Time("+00:05")]
    for task_name in ("t1", "t2", "t3"):
        members.append(Task(task_name))
    members.append(Task("t4", Time("+00:02")))
    members.append(Task("t5", Time("00:02")))
    return Family("family", *members)

def create_family_house_keeping():
    """Return Family "777" whose single task clear_log carries a cron attribute."""
    # Cron at 06:30 restricted to days_of_week=[4].
    weekly_cron = Cron("06:30", days_of_week=[4])
    clear_log = Task("clear_log", weekly_cron)
    return Family("777", clear_log)

print("Creating suite definition")
current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(current_path)

# Suite "test": shared variables, then the time family and the cron family.
suite_items = [
    Edit(ECF_HOME=tutorial_base, ECF_INCLUDE=tutorial_base),
    create_family_f2(),
    create_family_house_keeping(),
]
defs = Defs(Suite("test", *suite_items))

print(defs)

# Print whatever check_job_creation() returns for this definition.
print("Checking job creation: .ecf -> .job0")
job_check = defs.check_job_creation()
print(job_check)

# Write the definition next to this script as test.def.
print("Saving definition to file 'test.def'")
def_output_path = os.path.join(tutorial_base, "test.def")
defs.save_as_defs(def_output_path)

label

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import os
from ecflow import Defs, Suite, Task, Family, Edit, Trigger, Event, Complete, Meter, Time, Day, Date, Cron, Label

def create_family_f1():
    """Return Family "family" with the meter / event / complete example tasks.

    t1 owns the meter "progress" (arguments 1, 100, 90); t2 follows
    "t1 == complete" and declares events a and b; t3 is triggered on "t2:a";
    t4 has trigger "t2 == complete" and complete expression "t2:b"; t5..t7
    each have the trigger "t1:progress ge 30".
    """
    return Family(
        "family",
        Edit(SLEEP=20),
        Task("t1", Meter("progress", 1, 100, 90)),
        # The trigger must name t1: this family defines no task called t0,
        # so the previous "t0 == complete" expression could never resolve.
        Task("t2", Trigger("t1 == complete"), Event("a"), Event("b")),
        Task("t3", Trigger("t2:a")),
        Task("t4", Trigger("t2 == complete"), Complete("t2:b")),
        Task("t5", Trigger("t1:progress ge 30")),
        Task("t6", Trigger("t1:progress ge 30")),
        Task("t7", Trigger("t1:progress ge 30")),
    )

def create_family_f2():
    """Return Family "family" with Time("+00:05") set on the family itself.

    t1..t3 carry no time attribute of their own; t4 and t5 add Time("+00:02")
    and Time("00:02") respectively.
    """
    members = [Edit(SLEEP=20), Time("+00:05")]
    for task_name in ("t1", "t2", "t3"):
        members.append(Task(task_name))
    members.append(Task("t4", Time("+00:02")))
    members.append(Task("t5", Time("00:02")))
    return Family("family", *members)

def create_family_house_keeping():
    """Return Family "777" whose single task clear_log carries a cron attribute."""
    # Cron at 06:30 restricted to days_of_week=[4].
    weekly_cron = Cron("06:30", days_of_week=[4])
    clear_log = Task("clear_log", weekly_cron)
    return Family("777", clear_log)

def create_family_f3():
    """Return Family "f3" whose task t1 has a label "info" with an empty value."""
    info_label = Label("info", "")
    labelled_task = Task("t1", info_label)
    return Family("f3", labelled_task)

print("Creating suite definition")
current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(current_path)

# Suite "test": shared variables, then the time, cron and label families.
suite_items = [
    Edit(ECF_HOME=tutorial_base, ECF_INCLUDE=tutorial_base),
    create_family_f2(),
    create_family_house_keeping(),
    create_family_f3(),
]
defs = Defs(Suite("test", *suite_items))

print(defs)

# Print whatever check_job_creation() returns for this definition.
print("Checking job creation: .ecf -> .job0")
job_check = defs.check_job_creation()
print(job_check)

# Write the definition next to this script as test.def.
print("Saving definition to file 'test.def'")
def_output_path = os.path.join(tutorial_base, "test.def")
defs.save_as_defs(def_output_path)

repeat和嵌套family

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import os
from ecflow import Defs, Suite, Task, Family, Edit, Trigger, Event, Complete, Meter, Time, Day, Date, Cron, Label, \
RepeatString, RepeatInteger, RepeatDate

def create_family_f1():
    """Return Family "family" with the meter / event / complete example tasks.

    t1 owns the meter "progress" (arguments 1, 100, 90); t2 follows
    "t1 == complete" and declares events a and b; t3 is triggered on "t2:a";
    t4 has trigger "t2 == complete" and complete expression "t2:b"; t5..t7
    each have the trigger "t1:progress ge 30".
    """
    return Family(
        "family",
        Edit(SLEEP=20),
        Task("t1", Meter("progress", 1, 100, 90)),
        # The trigger must name t1: this family defines no task called t0,
        # so the previous "t0 == complete" expression could never resolve.
        Task("t2", Trigger("t1 == complete"), Event("a"), Event("b")),
        Task("t3", Trigger("t2:a")),
        Task("t4", Trigger("t2 == complete"), Complete("t2:b")),
        Task("t5", Trigger("t1:progress ge 30")),
        Task("t6", Trigger("t1:progress ge 30")),
        Task("t7", Trigger("t1:progress ge 30")),
    )

def create_family_f2():
    """Return Family "family" with Time("+00:05") set on the family itself.

    t1..t3 carry no time attribute of their own; t4 and t5 add Time("+00:02")
    and Time("00:02") respectively.
    """
    members = [Edit(SLEEP=20), Time("+00:05")]
    for task_name in ("t1", "t2", "t3"):
        members.append(Task(task_name))
    members.append(Task("t4", Time("+00:02")))
    members.append(Task("t5", Time("00:02")))
    return Family("family", *members)

def create_family_house_keeping():
    """Return Family "777" whose single task clear_log carries a cron attribute."""
    # Cron at 06:30 restricted to days_of_week=[4].
    weekly_cron = Cron("06:30", days_of_week=[4])
    clear_log = Task("clear_log", weekly_cron)
    return Family("777", clear_log)

def create_family_f3():
    """Return Family "f3" whose task t1 has a label "info" with an empty value."""
    info_label = Label("info", "")
    labelled_task = Task("t1", info_label)
    return Family("f3", labelled_task)

def create_family_f4():
    """Return Family "f4" showing repeats at three nesting levels.

    f4 repeats the string variable NAME over "a".."f", the nested family f5
    repeats the integer VALUE from 1 to 10, and f5's task t1 repeats DATE
    from 20220811 to 20221231 and carries two empty labels, info and date.
    """
    repeated_task = Task(
        "t1",
        RepeatDate("DATE", 20220811, 20221231),
        Label("info", ""),
        Label("date", ""),
    )
    inner_family = Family("f5", RepeatInteger("VALUE", 1, 10), repeated_task)
    name_repeat = RepeatString("NAME", ["a", "b", "c", "d", "e", "f"])
    return Family("f4", Edit(SLEEP=2), name_repeat, inner_family)

print("Creating suite definition")
current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(current_path)

# Suite "test": shared variables followed by four families, in this order.
suite_items = [
    Edit(ECF_HOME=tutorial_base, ECF_INCLUDE=tutorial_base),
    create_family_f2(),
    create_family_f4(),
    create_family_house_keeping(),
    create_family_f3(),
]
defs = Defs(Suite("test", *suite_items))

print(defs)

# Print whatever check_job_creation() returns for this definition.
print("Checking job creation: .ecf -> .job0")
job_check = defs.check_job_creation()
print(job_check)

# Write the definition next to this script as test.def.
print("Saving definition to file 'test.def'")
def_output_path = os.path.join(tutorial_base, "test.def")
defs.save_as_defs(def_output_path)

备份

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import os
from ecflow import Defs, Suite, Task, Family, Edit, Trigger, Event, Complete, Meter,\
Time, Day, Date, Cron, Label, RepeatString, RepeatInteger, RepeatDate, InLimit, Limit

print("Creating suite definition")

# Directory of this script, plus its "include" sub-directory, as absolute paths.
current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(current_path)
include_dir = os.path.join(current_path, "include")
include = os.path.abspath(include_dir)

# Source directory to back up. Its entries are listed here, so this line raises
# if the directory is not readable on the host running the script.
# NOTE(review): new_base is not used anywhere else in the visible script.
base = "/g1/u/nwp_sp/"
new_base = os.listdir(base)


def nwp_sp_backup():
    """Return Family "nwp_sp" describing the backup workflow.

    The family carries four variables and Time("03:00"), and contains the
    sub-family "steps" whose tasks are chained by triggers:
    mkdir -> cp -> tar -> rm, each waiting for the previous one to complete.
    """
    variables = Edit({"SLEEP": 20, "atime": "date", "ctime": 14, "day20220822": 1661097600})
    start_time = Time("03:00")
    steps = Family(
        "steps",
        Task("mkdir"),
        Task("cp", Trigger("mkdir == complete")),
        Task("tar", Trigger("cp == complete")),
        Task("rm", Trigger("tar == complete")),
    )
    return Family("nwp_sp", variables, start_time, steps)

# Suite "backup": ECF_HOME is this script's directory, ECF_INCLUDE its
# "include" sub-directory; the suite repeats ECF_DATE from 20220823 to 20251231.
suite_items = [
    Edit(ECF_HOME=tutorial_base, ECF_INCLUDE=include),
    RepeatDate("ECF_DATE", 20220823, 20251231),
    nwp_sp_backup(),
]
defs = Defs(Suite("backup", *suite_items))
print(defs)

# Print whatever check_job_creation() returns for this definition.
print("Checking job creation: .ecf -> .job0")
job_check = defs.check_job_creation()
print(job_check)

# Write the definition next to this script as backup2.0.def.
print("Saving definition to file 'backup2.0.def'")
def_output_path = os.path.join(tutorial_base, "backup2.0.def")
defs.save_as_defs(def_output_path)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
%include <head.h>

echo "Test"

# The ECF_DATE variable is substituted into this script before the job runs;
# its value names the archive created at the end.
date_time=%ECF_DATE%
file_dir="/g11/nwp_xp/zhangqq/ecflow/backup/"

cd "$file_dir"

# Start every run from an empty staging directory.
rm -rf project
mkdir project

# From here on a failing command no longer aborts the job.
set +e

echo "Copy Start"
# Iterate over the directory entries with a glob. The earlier form wrapped the
# directory path in backticks, which tries to execute the directory as a
# command and leaves the loop with nothing to copy.
# NOTE(review): plain cp skips sub-directories; confirm whether -r is wanted.
for i in /g1/u/nwp_sp/*; do
    cp "$i" project/
done

echo "Copy End"

# NOTE(review): -z writes gzip-compressed data although the name ends in .tar;
# the name is kept unchanged in case other tasks refer to it.
tar -czf "${date_time}.tar" project/

%include <tail.h>