本文发布于Cylon的收藏册,转载请著名原文链接~


工作流模型是指 Stackstorm 中 Orquesta 工作流的定义,包含工作流的执行方式,也可以理解为工作流模型就是 Workflow DSL 定义任务执行的有向图

工作流模型

下表是工作流模型 (Workflow Model) 的属性。工作流接受 Input,按预定义顺序执行一组任务 (Task),并返回输出 (Output)。此处的工作流模型是一个有向图 (directed graph),其中 Task 是节点,Taskt 之间的转换及其条件形成边。组成工作流的任务将在 DSL 中定义为名为 Task 的字典, 其中 Key 和 Value 分别是任务名称和任务模型。

Attribute Required Description
version Yes The version of the spec being used in this workflow DSL.
description No The description of the workflow.
input No A list of input arguments for this workflow.
vars No A list of variables defined for the scope of this workflow.
tasks Yes A dictionary of tasks that defines the intent of this workflow.
output No A list of variables defined as output for the workflow.

下面示例是一个 Workflow 的 DSL 定义,说明基础的工作流模型中各个部分的组成

version: 1.0

description: A simple workflow.

# 字符串列表,假设将在运行时提供值或键值对,其中当运行时未提供值时,值使用默认值。
# 这些属性的定义在 Action 的元数据文件中定义,包含默认值
input:
  - arg1
  - arg2: abc

# 一组键值对的变量,可以用于存储数据
vars:
  - var1: 123
  - var2: True
  - var3: null

# 由字典组成任务定义,执行任务顺序由入栈任务转换和出栈条件组成
tasks:
  # 定义两个任务,next为下一个执行的任务
  task1:
    action: core.noop
    next:
      - do: task2
  task2:
    action: core.noop

# 要输出的键值对列表
output:
  - var3: <% ctx().arg1 %>
  - var4:
      var41: 456
      var42: def
  - var5:
      - 1.0
      - 2.0
      - 3.0

with-item 模型

with-item 模型 是workflow 批量处理任务的一种方式,with-item 将遍历每个 item,然后作为参数传给 Action,默认情况下,所有 Item 将同时处理。当配置 concurrency 指定时,将处理最多 concurrency 值的 item 数,其余项目将排队等待执行,当 Item 的 Action 执行完成后,将处理列表中的下一个item。

任务结果是按照与item相同的顺序排列的Action执行结果的列表。所有Action执行必须成功完成,任务才能达到成功状态。如果一个或多个Action执行失败,则该任务将为失败状态。

当有取消或暂停工作流的请求时,任务将分别处于取消或暂停状态,直到执行过程中的所有Action执行完成。一旦这些Action执行完成时,任务将分别进入取消或暂停状态。如果指定了任务的 concurrency 并且还有剩余item,则不会请求新的 Action 执行。当暂停的工作流程恢复时,任务将继续处理剩余的 Item。

Item 可配置参数如下:

Attribute Required Description
items Yes The list of items to execute the action with.
concurrency No The number of items being processed concurrently.

一个简单的with-item 模型示例

以下是一个简单示例,其中包含任务中定义的单个 Item 列表。该任务会收到一个要回显的消息列表。对于不需要并发的项目 Item,有一个速记符号可以将列表直接传递给 with 语句。可以使用 item 函数将各个项目作为输入传递到 Action 中以供执行。

version: 1.0

input:
  - messages

tasks:
  task1:
    with: <% ctx(messages) %>
    action: core.echo message=<% item() %>

当需要并发执行时,使用 with-items 的 schema 去定义 ,如下所示

version: 1.0

input:
  - messages

tasks:
  task1:
    with:
      items: <% ctx(messages) %>
      concurrency: 2
    action: core.echo message=<% item() %>

进阶:为Item命名

Item 也可以被命名,下面示例时和上面相同功能的 workflow,但他为 items 使用 “message” 的作为名称进行标注。在标记时,指定了 message in <% ctx(messages) %> 进行命名,这里 item 被指定为 “message”,在引用时,item 函数也必须指定名称 item(message),这种场景返回的就不是列表,而是一个字典,类似 {"message": "value"} 在处理多个项目时比较有用。

version: 1.0

input:
  - messages

tasks:
  task1:
    with: message in <% ctx(messages) %>
    action: core.echo message=<% item(message) %>

为多个Item命名

在执行 Action 时,可以将多个 Item 作为 input 传入到 Action中,这里就利用到 item 命名的方式与另外 stackstrom 提供的 zip 函数。zip 与 python zip 相同,将多个元组进行迭代,然后将他们(多个元组相同下标项)的每项压缩为一个元组。

个元组进行迭代,然后将他们的每项压缩为一个元组。
version: 1.0

input:
  - hosts
  - commands

tasks:
  task1:
    with: host, command in <% zip(ctx(hosts), ctx(commands)) %>
    action: core.remote hosts=<% item(host) %> cmd=<% item(command) %>

上面示例为,input 有两个列表,hosts 和 commands,这里使用 zip 压缩后将便成为 ,({hosts:xxx}, {commands:xxx}) 这样在执行时通过 item 函数指定名称 item(host),就可以获取到对应的迭代器位的 host,通过这样的方式来进行多列表参数的传入。

下面是一个多参数 with-item 模型的执行示例

version: 1.0

description: A workflow demonstrating with items.

input:
  - members
  - test

tasks:
  task1:
    with: host, command in <% zip(ctx(members), ctx(test)) %>
    action: core.echo message="<% item() %>,  resistance is futile!"

output:
  - items: <% task(task1).result.items.select($.result.stdout) %>

通过执行结果可以看出 multiple-Item 的机制

st2 execution get 664dd003169d72f36729cb70
id: 664dd003169d72f36729cb70
action.ref: frist.orquesta-with-items
parameters: None
status: succeeded (1s elapsed)
start_timestamp: Wed, 22 May 2024 10:59:15 UTC
end_timestamp: Wed, 22 May 2024 10:59:16 UTC
log: 
  - status: requested
    timestamp: '2024-05-22T10:59:15.698000Z'
  - status: scheduled
    timestamp: '2024-05-22T10:59:15.813000Z'
  - status: running
    timestamp: '2024-05-22T10:59:15.865000Z'
  - status: succeeded
    timestamp: '2024-05-22T10:59:16.918000Z'
result: 
  output:
    items:
    - '{''host'': ''Lakshmi'', ''command'': ''t1''},  resistance is futile!'
    - '{''host'': ''Lindsay'', ''command'': ''t2''},  resistance is futile!'
    - '{''host'': ''Tomaz'', ''command'': ''t3''},  resistance is futile!'
+--------------------------+------------------------+-------+-----------+------------------------------+
| id                       | status                 | task  | action    | start_timestamp              |
+--------------------------+------------------------+-------+-----------+------------------------------+
| 664dd004ed940d64824e33fb | succeeded (0s elapsed) | task1 | core.echo | Wed, 22 May 2024 10:59:16 UTC|
| 664dd004ed940d64824e33fe | succeeded (0s elapsed) | task1 | core.echo | Wed, 22 May 2024 10:59:16 UTC|
| 664dd004ed940d64824e3402 | succeeded (0s elapsed) | task1 | core.echo | Wed, 22 May 2024 10:59:16 UTC|
+--------------------------+------------------------+-------+-----------+------------------------------+

Reference

[1] Sensors and Triggers

[2] How many do you need? - Argo CD Architectures Explained

本文发布于Cylon的收藏册,转载请著名原文链接~

链接:https://www.oomkill.com/2024/05/stackstorm-sensors/

版权:本作品采用「署名-非商业性使用-相同方式共享 4.0 国际」 许可协议进行许可。