本文发布于Cylon的收藏册,转载请著名原文链接~


什么是传感器

传感器 (Sensor) 是将外部系统和事件与 StackStorm 集成的一种方式。传感器是 Python 代码片段,它们要么定期轮询某些外部系统,要么被动等待入站事件,通常示例用于每隔一段时间去轮询某一个对象,然后他们将 Trigger 注入 StackStorm,可以通过规则进行匹配,以执行潜在的 Action。

Sensor 是用 Python 编写的,并且必须遵循 StackStorm 定义的传感器接口要求。

什么是触发器

触发器 (Trigger) 是 StackStorm 中用于识别 StackStorm 的传入事件。Trigger 是类型(字符串)和可选参数(对象)的元组。编写 Rule 是为了与 Trigger 一起使用。Sensor 通常会记录 Trigger,但这并不是严格要求的。例如,有一个向 StackStorm 注册的通用Webhooks触发器,它不需要自定义传感器。

Stackstorm内置触发器

默认情况下,StackStorm 会发出一些内部 Trigger,您可以在规则中利用它们。这些触发器可以与非系统触发器区分开来,因为它们的前缀为 “st2”

下面包含每个资源的可用 Trigger 列表:

Action

Reference Description Properties
core.st2.generic.actiontrigger 封装 Action 执行完成的触发器 execution_id,
status, start_timestamp,
action_name, action_ref,
runner_ref,
parameters, result
core.st2.generic.notifytrigger 通知触发器 execution_id,
status, start_timestamp,
end_timestamp,
action_ref,
runner_ref,
channel, route,
message, data
core.st2.action.file_written 触发封装 Action,将文件写入磁盘 ref,
file_path,
host_info
core.st2.generic.inquiry 触发器指示一个新的查询,表示已经进入 “pending” 状态 id,
route

Sensor

Reference Description Properties
core.st2.sensor.process_spawn 触发器去指示传感器,进程开始启动 object
core.st2.sensor.process_exit 触发器指示传感器,进程已经结束 object

如何创建一个 Sensor

创建传感器涉及编写 Python 脚本和定义 Sensor 的 YAML 元数据文件。以下是一个最小化 sensor 的结构示例。

元数据文件:

---
  class_name: "SampleSensor"
  entry_point: "sample_sensor.py"
  description: "Sample sensor that emits triggers."
  trigger_types:
    -
      name: "event"
      description: "An example trigger."
      payload_schema:
        type: "object"
        properties:
          executed_at:
            type: "string"
            format: "date-time"
            default: "2014-07-30 05:04:24.578325"

相关 Python 脚本的结构,在脚本中,必须遵循该结构进行编写

# Copyright 2020 The StackStorm Authors.
# Copyright 2019 Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from st2reactor.sensor.base import Sensor


class SampleSensor(Sensor):
    """
    * self.sensor_service
        - provides utilities like
            - get_logger() - returns logger instance specific to this sensor.
            - dispatch() for dispatching triggers into the system.
    * self._config
        - contains parsed configuration that was specified as
          config.yaml in the pack.
    """

    def setup(self):
        # 该方法系统仅调用一次,可以设置连接外部系统的内容到这里
        pass

    def run(self):
        # 该方法是 sensor 运行的方法
        # 这由系统调用一次。
        #(如果您想定期睡觉并保持与外部系统交互,您将从 PollingSensor 继承。)
        # 例如:例如,个简单的flask应用程序。您可以在此处运行 Flask 应用程序。
		# 您可以使用sensor_service 调度触发器,如下所示:
         	#  您可以将触发器称为dict
            # { "name": ${trigger_name}, "pack": ${trigger_pack} }
            # 或者只是简单地作为字符串引用。
            # i.e. dispatch(${trigger_pack}.${trigger_name}, payload)
            # E.g.: dispatch('examples.foo_sensor', {'k1': 'stuff', 'k2': 'foo'})
            # trace_tag 是想要与dispatch的 TriggerInstance 关联的标签,
            # 通常,trace_tag 是唯一的并且是对外部事件的引用。
        pass

    def cleanup(self):
        # 当 st2 系统宕机时调用此函数。您可以执行清理操作,例如
        # 在此关闭与外部系统的连接。
        pass

    def add_trigger(self, trigger):
        # 当创建触发器时调用此方法
        pass

    def update_trigger(self, trigger):
        # 当触发器更新时调用此方法
        pass

    def remove_trigger(self, trigger):
        # 删除触发器时调用此方法
        pass

上述是一个最简单的 Sensor 示例。

您的 Sensor 应生成 Python 字典形式的Trigger:

trigger = 'pack.name'
payload = {
    'executed_at': '2014-08-01T00:00:00.000000Z'
}
trace_tag = external_event_id

Sensor 通过使用实例化时传递到 Sensor 的 sensor_service 来注入此类 Trigger。

self.sensor_service.dispatch(trigger=trigger, payload=payload, trace_tag=trace_tag)

如果您想要一个定期轮询外部系统的传感器,您可以使用 PollingSensor 而不是 Sensor 作为基类。

# Copyright 2020 The StackStorm Authors.
# Copyright 2019 Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# 这里引入的是 PollingSensor,而不是Sensor
from st2reactor.sensor.base import PollingSensor


class SamplePollingSensor(PollingSensor):
    """
    * self.sensor_service
        - provides utilities like
            get_logger() for writing to logs.
            dispatch() for dispatching triggers into the system.
    * self._config
        - contains configuration that was specified as
          config.yaml in the pack.
    * self._poll_interval
        - indicates the interval between two successive poll() calls.
    """

    def setup(self):
        # Setup stuff goes here. For example, you might establish connections
        # to external system once and reuse it. This is called only once by the system.
        pass

    def poll(self):
        # 该方法是 sensor 运行的方法
        # 这由系统在每个周期与性能一次,self._poll_interval。
		# 例如:假设您想要查询 ec2 并获取有关实例的运行状况信息:
        some_data = aws_client.get('')
        payload = self._to_payload(some_data)
        	# _to_triggers 是您编写的用于将数据格式转换为标准 python 字典的东西。
            # 这应该遵循为 Trigger 注册的有效负载架构。
       		self.sensor_service.dispatch(trigger, payload)
       		# 您可以将触发器称为 dict
        	dict = { "name": "${trigger_name}", "pack": "${trigger_pack}" }
           	# 或者简单引用一个字符串,i.e. 
           	dispatch(${trigger_pack}.${trigger_name}, payload)
            # 再例如
            dispatch('examples.foo_sensor', {'k1': 'stuff', 'k2': 'foo'})
            # trace_tag 是想要与dispatch的 TriggerInstance 关联的标签,
            # 通常,trace_tag 是唯一的并且是对外部事件的引用。
        pass

    def cleanup(self):
        # 当 st2 系统宕机时调用此函数。您可以执行清理操作,例如
        # 在此关闭与外部系统的连接。
        pass

    def add_trigger(self, trigger):
        # 当创建触发器时调用此方法
        pass

    def update_trigger(self, trigger):
        # 当触发器更新时调用此方法
        pass

    def remove_trigger(self, trigger):
        # 删除触发器时调用此方法
        pass

上述是一个 Poll Sensor 代码部分是结构的,setup 是装载时执行,poll 是在每个 interval 执行探测,这里的机制是当完成了派发后是不会第二次派发,这里做法是维护了一个列表到类中。

例如

aa2233
ACG-3612
{'ACG-3612': <JIRA Issue: key='ACG-3612', id='212268'>}

注:轮询传感器 (Polling Sensors) 还需要元数据文件中的 poll_interval 参数。这定义了调用 poll() 方法的频率(以秒为单位)。

Sersor如何运行

每个传感器作为单独的进程运行。 st2sensorcontainer 启动 sensor_wrapper.py ,它将您的 Sensor 类(例如上面的SampleSensor 或 SamplePollingSenso r)包装在 st2reactor.container.sensor_wrapper.SensorWrapper 中。

Sensor Service

正如您在上面的示例中看到的,sensor_service 在实例化时被传递给每个传感器类构造函数。

传感器服务 (Sensor Service) 通过公共方法向 Sensor 提供不同的服务。最重要的一种 dispatch 方法是允许 Sensor 将 Trigger 注入系统的方法。所有公共方法描述如下:

  • 常用操作,Common Operations
  • 数据存储管理操作 Datastore Management Operations

Common Operations

dispatch

调度:此方法允许传感器将触发器注入系统

dispatch(trigger, payload, trace_tag)

例如:

trigger = 'pack.name'
payload = {
    'executed_at': '2014-08-01T00:00:00.000000Z'
}
trace_tag = uuid.uuid4().hex

self.sensor_service.dispatch(trigger=trigger, payload=payload, trace_tag=trace_tag)

get_logger

此方法允许 Sensor 实例检索特定于该传感器的记录器实例。

get_logger(name)

例如:

self._logger = self.sensor_service.get_logger(name=self.__class__.__name__)
self._logger.debug('Polling 3rd party system for information')

Datastore Management Operations

除了触发器注入之外,传感器服务还提供读取和操作数据存储的功能。

每个传感器都有一个本地命名空间,默认情况下,所有数据存储操作都对该 Sensor “本地命名空间” 中的键进行操作。如果要对“全局命名空间”进行操作,则需要将参数传递 local=False 给数据存储操作方法。

除其他原因外,如果想在传感器运行之间保留临时数据,此功能非常有用。

TwitterSensor 就是此功能的一个很好的例子。 Twitter 传感器在每次轮询后都会在数据存储中保留最后处理的推文的 ID。这样,如果 Trigger 重新启动或崩溃,传感器可以从中断处恢复,而无需向系统注入重复的 Trigger。

list_values

list_values(local=True, prefix=None)

该方法允许列出数据存储中的值。您还可以通过将 prefix 参数传递给方法来按键名称前缀(键名称开头)进行过滤:

kvps = self.sensor_service.list_values(local=False, prefix='cmdb.')

for kvp in kvps:
    print(kvp.name)
    print(kvp.value)

get_value

get_value(name, local=True, decrypt=False)

此方法允许您从数据存储中检索单个值:

kvp = self.sensor_service.get_value('cmdb.api_host')
print(kvp.name)

set_value

set_value(name, value, ttl=None, local=True, encrypt=False)

该方法允许在数据存储中存储设置一个值。您还可以选择指定存储值的生存时间 (TTL):

last_id = 12345
self.sensor_service.set_value(name='last_id', value=str(last_id))

Secret 值可以在存储中中加密:

ma_password = 'Sup3rS34et'
self.sensor_service.set_value(name='ma_password', value=ma_password, encrypt=True)

delete_value

delete_value(name, local=True)

该方法允许从存储中删除现有值。如果未找到值,此方法将返回 False,否则返回 True。

self.sensor_service.delete_value(name='my_key')

定义一个 Sersor

例如我们需要制作一个简单 Sensor 的工作示例,每 60 秒注入一次触发器。

需要注意的部分:

  • Sensor 的 Python 的类必需继承 Sensor 或 PollingSensor类,这个由需求而定,必须实现 setup, poll, dispatch等方法

    • 如有需求,例如变量声明,也可以重写 _init_ 类
    • setup方法,用于初始化示例需要的数据或状态,例如连接三方系统的配置信息,该方法只执行一次
    • Sensor runtime 會把实例化并保持运行,按 poll_interval 設置周期去运行 poll 方法
  • Poll方法是真实执行任务的部分,可在 poll 方法中更新示例持有的数据或状态,判断数据是否匹配,

  • 然后运行 disptach; 按需派发数据,dispatch方法会派发数据,把数据派发给 _trigger_ref

    • _trigger_ref 是 Sensor 设置的 trigger type,也就是 RabbitMQ 的 Queue 名称

元数据定义

---
class_name: "HelloSensor"
entry_point: "sensor1.py"
description: "Test sensor that emits triggers."
trigger_types:
  -
    name: "event1"
    description: "An example trigger."
    payload_schema:
      type: "object"

Python 代码部分

# Copyright 2020 The StackStorm Authors.
# Copyright 2019 Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import eventlet

from st2reactor.sensor.base import Sensor


class HelloSensor(Sensor):
    def __init__(self, sensor_service, config):
        super(HelloSensor, self).__init__(sensor_service=sensor_service, config=config)
        self._logger = self.sensor_service.get_logger(name=self.__class__.__name__)
        self._stop = False

    def setup(self):
        pass

    def run(self):
        while not self._stop:
            self._logger.debug("HelloSensor dispatching trigger...")
            count = self.sensor_service.get_value("hello_st2.count") or 0
            payload = {"greeting": "Yo, StackStorm!", "count": int(count) + 1}
            self.sensor_service.dispatch(trigger="hello_st2.event1", payload=payload)
            self.sensor_service.set_value("hello_st2.count", payload["count"])
            eventlet.sleep(60)

    def cleanup(self):
        self._stop = True

    # Methods required for programmable sensors.
    def add_trigger(self, trigger):
        pass

    def update_trigger(self, trigger):
        pass

    def remove_trigger(self, trigger):
        pass

Sensor的运行与调试

运行

一旦完成传感器的编写,可以使用以下步骤来首次运行传感器:

  1. 将传感器 Python 文件和 元数据文件 放入 default 包中的 /opt/stackstorm/packs/default/sensors/ ;或者您也可以根据包结构,创建出自定义包并将传感器元件放置在那里 ( /opt/stackstorm/packs/ )
  2. 使用 st2ctl 注册传感器 。注意传感器注册中的任何错误,一旦注册时出现错误,请修复错误并使用 重新注册 。
st2ctl reload --register-all
  1. 如果注册成功,传感器将自动运行。

调试

在编写时,很多时候需要调试 Sensor 的运行,而由于环境问题,我们无法做到正常Python程序的调试步骤,必须遵循 Stackstorm 的调试方式。

如果只想运行包中的单个传感器并且该传感器已注册,则可以使用 st2sensorcontainer 来仅运行该单个传感器:

sudo /opt/stackstorm/st2/bin/st2sensorcontainer --config-file=/etc/st2/st2.conf --sensor-ref=pack.SensorClassName

例如:

sudo /opt/stackstorm/st2/bin/st2sensorcontainer --config-file=/etc/st2/st2.conf --sensor-ref=git.GitCommitSensor

示例:Jira服务台

# See ./requirements.txt for requirements.
import os
import urllib3
import json
from jira.client import JIRA
from st2reactor.sensor.base import PollingSensor

class XinMangSensorForPodMapTicket(PollingSensor):
    '''
    Sensor will monitor for any new projects created in JIRA and
    emit trigger instance when one is created.
    '''
    def __init__(self, sensor_service, config=None, poll_interval=5):
        super(XinMangSensorForPodMapTicket, self).__init__(sensor_service=sensor_service,
                                                 config=config,
                                                 poll_interval=poll_interval)

        self._jira_url = None
        self._config  = { 
                "url": "https://jira.ticket.com",
                "auth_method": "basic",
                "username": "jira_automation",
                "password": "jira123",
                "poll_interval": 30,
                "verify": False
        }
        # The Consumer Key created while setting up the "Incoming Authentication" in
        # JIRA for the Application Link.
        self._consumer_key = u''
        self._rsa_key = None
        self._jira_client = None
        self._access_token = u''
        self._access_secret = u''
        self._projects_available = ["SABP"]
        self._poll_interval = 30
        self._project = "SABP"
        self._issues_in_project = None
        self._customer_request_type = "创建Pod映射"
        self._issue_status = "IN PROGRESS"
        self._jql_query = None
        self._trigger_name = 'issues_tracker_for_pod_map_ticket'
        self._trigger_pack = 'jira'
        self._trigger_ref = '.'.join([self._trigger_pack, self._trigger_name])
        urllib3.disable_warnings()

    def _read_cert(self, file_path):
        with open(file_path) as f:
            return f.read()

    def setup(self):
        self._jira_url = self._config['url']
        auth_method = self._config['auth_method']

        if auth_method == 'oauth':
            rsa_cert_file = self._config['rsa_cert_file']
            if not os.path.exists(rsa_cert_file):
                raise Exception('Cert file for JIRA OAuth not found at %s.' % rsa_cert_file)
            self._rsa_key = self._read_cert(rsa_cert_file)
            self._poll_interval = self._config.get('poll_interval', self._poll_interval)
            oauth_creds = {
                'access_token': self._config['oauth_token'],
                'access_token_secret': self._config['oauth_secret'],
                'consumer_key': self._config['consumer_key'],
                'key_cert': self._rsa_key
            }

            self._jira_client = JIRA(options={'server': self._jira_url},
                                     oauth=oauth_creds)
        elif auth_method == 'basic':
            basic_creds = (self._config['username'], self._config['password'])
            self._jira_client = JIRA(options={'server': self._jira_url, 'verify': self._config['verify']},basic_auth=basic_creds)
            #self._jira_client = JIRA(options={'server': self._jira_url},basic_auth=basic_creds)
        else:
            msg = ('You must set auth_method to either "oauth"',
                   'or "basic" your jira.yaml config file.')
            raise Exception(msg)
        self._jql_query = 'project="%s" and "Customer Request Type"="%s" and status="%s"' % (self._project, self._customer_request_type, self._issue_status)
        all_issues = self._jira_client.search_issues(self._jql_query, maxResults=None)
        self._issues_in_project = {issue.key: issue for issue in all_issues}

    def poll(self):
        self._detect_new_issues()

    def cleanup(self):
        pass

    def add_trigger(self, trigger):
        pass

    def update_trigger(self, trigger):
        pass

    def remove_trigger(self, trigger):
        pass

    def _detect_new_issues(self):
        new_issues = self._jira_client.search_issues(self._jql_query, maxResults=50, startAt=0)
        for issue in new_issues:
            # issue 满足需求的工单
            print(self._issues_in_project)
            # 将服务台任务单保留到这个列表中,保证不会重复派发
            if issue.key not in self._issues_in_project:
                # 记录下未派发的任务,并派发
                self._dispatch_issues_trigger(issue)
                self._issues_in_project[issue.key] = issue

    def _dispatch_issues_trigger(self, issue):
        trigger = self._trigger_ref
        payload = {}
        payload['project'] = self._project
        payload['id'] = issue.id
        payload['expand'] = issue.raw.get('expand', '')
        payload['issue_key'] = issue.key
        payload['issue_url'] = issue.self
        payload['issue_browse_url'] = self._jira_url + '/browse/' + issue.key
        fields = dict()
        fields = issue.raw.get('fields', {})
        fields_dict = dict()
        print(fields['customfield_19402'].strip())
        fields_dict['pod_name'] = fields['customfield_19401'].strip()
        fields_dict['start_time'] = fields['customfield_19402'].strip()
        fields_dict['end_time'] = fields['customfield_19403'].strip()
        fields_dict['k8s_cluster'] = fields['customfield_19404']['value']

        fields_dict['reporter'] = fields['reporter']['key']
        payload['fields'] = fields_dict
        if fields_dict['reporter'] != 'a@gmail.com':
            self._sensor_service.dispatch(trigger, payload)

Reference

[1] Sensors and Triggers

[2] How many do you need? - Argo CD Architectures Explained

本文发布于Cylon的收藏册,转载请著名原文链接~

链接:https://www.oomkill.com/2023/12/stackstorm-sensors/

版权:本作品采用「署名-非商业性使用-相同方式共享 4.0 国际」 许可协议进行许可。