1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| import os from ecflow import Defs, Suite, Task, Family, Edit, Trigger, Event, Complete, Meter, Time, Day, Date, Cron, Label, \ RepeatString, RepeatInteger, RepeatDate
def create_family_f1():
    """Build the family named "family" holding tasks t1..t7.

    The family carries an ``Edit(SLEEP=20)`` setting. ``t1`` owns a meter
    called "progress"; ``t2`` owns events "a" and "b"; the remaining tasks
    are connected to those through trigger/complete expressions.

    Returns:
        The constructed ``Family`` node.
    """
    sleep_setting = Edit(SLEEP=20)
    progress_trigger = "t1:progress ge 30"

    tasks = [
        Task("t1", Meter("progress", 1, 100, 90)),
        # NOTE(review): "t0" is not one of the tasks created in this family
        # (only t1..t7 are) -- confirm whether this reference is intentional.
        Task("t2", Trigger("t0 == complete"), Event("a"), Event("b")),
        Task("t3", Trigger("t2:a")),
        Task("t4", Trigger("t2 == complete"), Complete("t2:b")),
    ]
    # t5, t6 and t7 all share the same trigger on t1's progress meter.
    tasks.extend(
        Task("t{}".format(index), Trigger(progress_trigger))
        for index in (5, 6, 7)
    )

    return Family("family", sleep_setting, *tasks)
def create_family_f2():
    """Build the family named "family" with time attributes on some nodes.

    The family itself gets ``Edit(SLEEP=20)`` and ``Time("+00:05")``.
    Tasks t1..t3 have no attributes; t4 and t5 each carry their own
    ``Time`` attribute ("+00:02" and "00:02" respectively).

    Returns:
        The constructed ``Family`` node.
    """
    family_attrs = [Edit(SLEEP=20), Time("+00:05")]
    plain_tasks = [Task(name) for name in ("t1", "t2", "t3")]
    timed_tasks = [
        Task("t4", Time("+00:02")),
        Task("t5", Time("00:02")),
    ]
    return Family("family", *(family_attrs + plain_tasks + timed_tasks))
def create_family_house_keeping():
    """Build the family named "777" containing the "clear_log" task.

    The task carries a ``Cron`` attribute for "06:30" restricted by
    ``days_of_week=[4]``.

    Returns:
        The constructed ``Family`` node.
    """
    schedule = Cron("06:30", days_of_week=[4])
    clear_log = Task("clear_log", schedule)
    return Family("777", clear_log)
def create_family_f3():
    """Build family "f3" with one task, "t1", carrying an empty "info" label.

    Returns:
        The constructed ``Family`` node.
    """
    info_label = Label("info", "")
    labelled_task = Task("t1", info_label)
    return Family("f3", labelled_task)
def create_family_f4():
    """Build family "f4" with repeats attached at three nesting levels.

    Layout:
        f4  -- Edit(SLEEP=2), RepeatString "NAME" over "a".."f"
          f5  -- RepeatInteger "VALUE" from 1 to 10
            t1  -- RepeatDate "DATE" from 20220811 to 20221231,
                   plus empty labels "info" and "date"

    Returns:
        The constructed outer ``Family`` node.
    """
    names = ["a", "b", "c", "d", "e", "f"]
    outer_setting = Edit(SLEEP=2)
    outer_repeat = RepeatString("NAME", names)

    inner_repeat = RepeatInteger("VALUE", 1, 10)
    dated_task = Task(
        "t1",
        RepeatDate("DATE", 20220811, 20221231),
        Label("info", ""),
        Label("date", ""),
    )
    inner_family = Family("f5", inner_repeat, dated_task)

    return Family("f4", outer_setting, outer_repeat, inner_family)
# --- Build the suite definition ------------------------------------------
print("Creating suite definition")
# The directory containing this script is used for both ECF_HOME and
# ECF_INCLUDE, and is where the definition file is written below.
current_path = os.path.dirname(__file__)
tutorial_base = os.path.abspath(current_path)

# Suite "test" is assembled from four of the family builders, in this order.
# create_family_f1 is not added to the suite here.
defs = Defs(
    Suite(
        "test",
        Edit(ECF_HOME=tutorial_base, ECF_INCLUDE=tutorial_base),
        *(
            build_family()
            for build_family in (
                create_family_f2,
                create_family_f4,
                create_family_house_keeping,
                create_family_f3,
            )
        )
    )
)
print(defs)

# --- Check job creation ----------------------------------------------------
print("Checking job creation: .ecf -> .job0")
print(defs.check_job_creation())

# --- Save the definition next to this script -------------------------------
print("Saving definition to file 'test.def'")
def_output_path = os.path.join(tutorial_base, "test.def")
defs.save_as_defs(def_output_path)
|