工作流模型是指 Stackstorm 中 Orquesta 工作流的定义,包含工作流的执行方式,也可以理解为工作流模型就是 Workflow DSL 定义任务执行的有向图

工作流模型

下表是工作流模型 (Workflow Model) 的属性。工作流接受 Input,按预定义顺序执行一组任务 (Task),并返回输出 (Output)。此处的工作流模型是一个有向图 (directed graph),其中 Task 是节点,Taskt 之间的转换及其条件形成边。组成工作流的任务将在 DSL 中定义为名为 Task 的字典, 其中 Key 和 Value 分别是任务名称和任务模型。

AttributeRequiredDescription
versionYesThe version of the spec being used in this workflow DSL.
descriptionNoThe description of the workflow.
inputNoA list of input arguments for this workflow.
varsNoA list of variables defined for the scope of this workflow.
tasksYesA dictionary of tasks that defines the intent of this workflow.
outputNoA list of variables defined as output for the workflow.

下面示例是一个 Workflow 的 DSL 定义,说明基础的工作流模型中各个部分的组成

yaml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
version: 1.0

description: A simple workflow.

# 字符串列表,假设将在运行时提供值或键值对,其中当运行时未提供值时,值使用默认值。
# 这些属性的定义在 Action 的元数据文件中定义,包含默认值
input:
  - arg1
  - arg2: abc

# 一组键值对的变量,可以用于存储数据
vars:
  - var1: 123
  - var2: True
  - var3: null

# 由字典组成任务定义,执行任务顺序由入栈任务转换和出栈条件组成
tasks:
  # 定义两个任务,next为下一个执行的任务
  task1:
    action: core.noop
    next:
      - do: task2
  task2:
    action: core.noop

# 要输出的键值对列表
output:
  - var3: <% ctx().arg1 %>
  - var4:
      var41: 456
      var42: def
  - var5:
      - 1.0
      - 2.0
      - 3.0

with-item 模型

with-item 模型 是workflow 批量处理任务的一种方式,with-item 将遍历每个 item,然后作为参数传给 Action,默认情况下,所有 Item 将同时处理。当配置 concurrency 指定时,将处理最多 concurrency 值的 item 数,其余项目将排队等待执行,当 Item 的 Action 执行完成后,将处理列表中的下一个item。

任务结果是按照与item相同的顺序排列的Action执行结果的列表。所有Action执行必须成功完成,任务才能达到成功状态。如果一个或多个Action执行失败,则该任务将为失败状态。

当有取消或暂停工作流的请求时,任务将分别处于取消或暂停状态,直到执行过程中的所有Action执行完成。一旦这些Action执行完成时,任务将分别进入取消或暂停状态。如果指定了任务的 concurrency 并且还有剩余item,则不会请求新的 Action 执行。当暂停的工作流程恢复时,任务将继续处理剩余的 Item。

Item 可配置参数如下:

AttributeRequiredDescription
itemsYesThe list of items to execute the action with.
concurrencyNoThe number of items being processed concurrently.

一个简单的with-item 模型示例

以下是一个简单示例,其中包含任务中定义的单个 Item 列表。该任务会收到一个要回显的消息列表。对于不需要并发的项目 Item,有一个速记符号可以将列表直接传递给 with 语句。可以使用 item 函数将各个项目作为输入传递到 Action 中以供执行。

bash
1
2
3
4
5
6
7
8
9
version: 1.0

input:
  - messages

tasks:
  task1:
    with: <% ctx(messages) %>
    action: core.echo message=<% item() %>

当需要并发执行时,使用 with-items 的 schema 去定义 ,如下所示

yaml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
version: 1.0

input:
  - messages

tasks:
  task1:
    with:
      items: <% ctx(messages) %>
      concurrency: 2
    action: core.echo message=<% item() %>

进阶:为Item命名

Item 也可以被命名,下面示例时和上面相同功能的 workflow,但他为 items 使用 “message” 的作为名称进行标注。在标记时,指定了 message in <% ctx(messages) %> 进行命名,这里 item 被指定为 “message”,在引用时,item 函数也必须指定名称 item(message),这种场景返回的就不是列表,而是一个字典,类似 {"message": "value"} 在处理多个项目时比较有用。

yaml
1
2
3
4
5
6
7
8
9
version: 1.0

input:
  - messages

tasks:
  task1:
    with: message in <% ctx(messages) %>
    action: core.echo message=<% item(message) %>

为多个Item命名

在执行 Action 时,可以将多个 Item 作为 input 传入到 Action中,这里就利用到 item 命名的方式与另外 stackstrom 提供的 zip 函数。zip 与 python zip 相同,将多个元组进行迭代,然后将他们(多个元组相同下标项)的每项压缩为一个元组。

yaml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
个元组进行迭代,然后将他们的每项压缩为一个元组。
version: 1.0

input:
  - hosts
  - commands

tasks:
  task1:
    with: host, command in <% zip(ctx(hosts), ctx(commands)) %>
    action: core.remote hosts=<% item(host) %> cmd=<% item(command) %>

上面示例为,input 有两个列表,hosts 和 commands,这里使用 zip 压缩后将便成为 ,({hosts:xxx}, {commands:xxx}) 这样在执行时通过 item 函数指定名称 item(host),就可以获取到对应的迭代器位的 host,通过这样的方式来进行多列表参数的传入。

下面是一个多参数 with-item 模型的执行示例

yaml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
version: 1.0

description: A workflow demonstrating with items.

input:
  - members
  - test

tasks:
  task1:
    with: host, command in <% zip(ctx(members), ctx(test)) %>
    action: core.echo message="<% item() %>,  resistance is futile!"

output:
  - items: <% task(task1).result.items.select($.result.stdout) %>

通过执行结果可以看出 multiple-Item 的机制

yaml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
st2 execution get 664dd003169d72f36729cb70
id: 664dd003169d72f36729cb70
action.ref: frist.orquesta-with-items
parameters: None
status: succeeded (1s elapsed)
start_timestamp: Wed, 22 May 2024 10:59:15 UTC
end_timestamp: Wed, 22 May 2024 10:59:16 UTC
log: 
  - status: requested
    timestamp: '2024-05-22T10:59:15.698000Z'
  - status: scheduled
    timestamp: '2024-05-22T10:59:15.813000Z'
  - status: running
    timestamp: '2024-05-22T10:59:15.865000Z'
  - status: succeeded
    timestamp: '2024-05-22T10:59:16.918000Z'
result: 
  output:
    items:
    - '{''host'': ''Lakshmi'', ''command'': ''t1''},  resistance is futile!'
    - '{''host'': ''Lindsay'', ''command'': ''t2''},  resistance is futile!'
    - '{''host'': ''Tomaz'', ''command'': ''t3''},  resistance is futile!'
+--------------------------+------------------------+-------+-----------+------------------------------+
| id                       | status                 | task  | action    | start_timestamp              |
+--------------------------+------------------------+-------+-----------+------------------------------+
| 664dd004ed940d64824e33fb | succeeded (0s elapsed) | task1 | core.echo | Wed, 22 May 2024 10:59:16 UTC|
| 664dd004ed940d64824e33fe | succeeded (0s elapsed) | task1 | core.echo | Wed, 22 May 2024 10:59:16 UTC|
| 664dd004ed940d64824e3402 | succeeded (0s elapsed) | task1 | core.echo | Wed, 22 May 2024 10:59:16 UTC|
+--------------------------+------------------------+-------+-----------+------------------------------+

Reference

[1] Sensors and Triggers

[2] How many do you need? - Argo CD Architectures Explained