Taskflow 使用小结
TaskFlow是OpenStack开源的Python库,他的优势:
- 可伸缩
- 简单创建任务级对象
- 任务可插拔
- 支持回滚容错机制
api的文档参考:https://docs.openstack.org/taskflow/ocata/
一. 定义完整的工作流
一个完整的taskflow包含了一下几个环节:
- 创建task
- 声明flow
- 构建engine
1.1 构建task
构建task的方式是继承task类,然后修改其excute方法,这里可以指定任务的返回结果为res,注意还可以改写revert函数来重定义回滚的操作。
from taskflow import task
class TaskA(task.Task):
default_provides = 'res'
def execute(self, a:int):
res = a * a
return res
def revert(self, a):
print("a执行失败。。。")
1.2 构建flow
在构建完task之后,则是需要定义flow,一般的线性flow的定义如下:
from taskflow.patterns import linear_flow as lf
flow = lf.Flow('main').add(
Task1(),
)
1.3 定义engine
需要定义一个engine来运行整个工作流,用于task的执行、停止、继续和恢复。
from taskflow import engines
engine = engines.load(flow)
engine.run()
# 拿到所有的结果
context = engine.storage.fetch_all()
二. 工作流形式
taskflow的工作流组合形式分为3种:
- 串行工作流
- 并行工作流
- 图流
2.1 串行工作流——linear_flow
顾名思义,就是多个工作流利用串行的方式将多个flow拼接在一起,然后依次顺序执行,每个flow依赖于他的前一个的flow。
graph LR
A[taskA]:::highlight --> B[TaskB]
B[TaskB] --> C[TaskC]
注意:这里在构建串行的工作流后,我们可以选择engine时候,选择engine='serial'
from taskflow.patterns import linear_flow as lf
from taskflow import engines
class TaskA(task.Task):
def execute(self):
print('task A')
class TaskB(task.Task):
def execute(self):
print('task B')
flow = lf.Flow('merge_ab').add(
TaskA(),
TaskB())
engine = engines.load(flow, engine='serial')
2.2 并行工作流——unordered_flow
多个工作流可以被指定为并行执行,python里面的多线程一样,谁先抢到资源谁就先执行,等到三个都执行完毕了,这个流就结束了。
graph LR
A[taskA]:::highlight
B[TaskB]
C[TaskB]
注意:这里在构建并行的工作流后,我们可以选择engine时候,选择engine='parallel'
from taskflow.patterns import unordered_flow as uf
from taskflow import engines
class TaskA(task.Task):
def execute(self):
print('task A')
class TaskB(task.Task):
def execute(self):
print('task B')
flow = uf.Flow('merge_ab').add(
TaskA(),
TaskB())
engine = engines.load(flow, engine='parallel')
2.3 图流——graph_flow
对于两两有相关关联的工作流,比如图结构中的两个节点,两个节点之间有一条边(这里指代两节点有依赖关系),即taskA执行的时候需要taskB,而taskB执行的时候依赖了taskA。一般来说这种方式用的最少。
graph TD
A[taskA] <--> B[taskB]
from taskflow import task
from taskflow.patterns import graph_flow as gf
from taskflow import engines
class TaskA(task.Task):
def execute(self):
print("TaskA executed")
class TaskB(task.Task):
def requires(self):
return ['data_a']
def execute(self, data_a):
print("TaskB executed")
def build_flow():
flow = gf.Flow('demo_flow')
# 获取节点对象
a_node = flow.add(TaskA())
b_node = flow.add(TaskB())
# 节点间建立依赖
a_node.precede(b_node) # 正确调用层级
return flow
engines.run(build_flow())
三. 完整的项目的例子
3.1 串行和并行工作流混合
对于需要构建多个并行工作流得到多个各自的结果后,然后需要构建一个基于三者结果合并计算的工作流,那么这里可以构建串行和并行的混合工作流,如下所示
graph LR
A[taskA]:::highlight --> D[taskD]
B[TaskB]--> D[taskD]
C[TaskB] --> D[taskD]
那么对于上述串并行混合的工作流而言,可以参考下面代码,其中包含了项目中可能用到的:
- 多线程并行运行
- 多flow组合形式
- 每个task结合了输入和输出
from taskflow import task
from taskflow.patterns import unordered_flow as uf
from taskflow.patterns import linear_flow as lf
from taskflow import engines
from concurrent.futures.thread import ThreadPoolExecutor
pool_executor = ThreadPoolExecutor(max_workers=3)
# 定义并行任务类,继承 task.Task 并声明 provides
class TaskA(task.Task):
def execute(self, a:int):
res_a = a * a
print(f'task a result:{res_a}')
return res_a
class TaskB(task.Task):
def execute(self, b:int):
res_b = b * b
print(f'task b result:{res_b}')
return res_b
class TaskC(task.Task):
def execute(self, c: int):
res_c = c + 2
print(f'task c result:{res_c}')
return res_c
class FinalTask(task.Task):
def execute(self, res_a, res_b, res_c):
final_res = res_a + res_b + res_c
print(f'task final result:{final_res}')
return final_res
# 创建主流程
def build_flow():
parallel_flow = uf.Flow('parallel_tasks').add(
TaskA(name='taska',
provides='res_a'),
TaskB(name='taskb',
provides='res_b'),
TaskC(name='taskc',
provides='res_c'),
)
main_flow = lf.Flow('main').add(
parallel_flow,
FinalTask(name='final',
provides='final_res'),
)
return main_flow
if __name__ == "__main__":
store = {
'a':1, 'b':2, 'c':3
}
flow = build_flow()
eng = engines.load(flow, store=store, engine='parallel', executor=pool_executor)
eng.run()
context = eng.storage.fetch_all()
result = context.get('final_res')
a_res = context.get('res_a')
print(f'task A res: {a_res}, final_res: {result}')
###