本文发布于Cylon的收藏册,转载请著名原文链接~
什么是传感器
传感器 (Sensor) 是将外部系统和事件与 StackStorm 集成的一种方式。传感器是 Python 代码片段,它们要么定期轮询某些外部系统,要么被动等待入站事件,通常示例用于每隔一段时间去轮询某一个对象,然后他们将 Trigger 注入 StackStorm,可以通过规则进行匹配,以执行潜在的 Action。
Sensor 是用 Python 编写的,并且必须遵循 StackStorm 定义的传感器接口要求。
什么是触发器
触发器 (Trigger) 是 StackStorm 中用于识别 StackStorm 的传入事件。Trigger 是类型(字符串)和可选参数(对象)的元组。编写 Rule 是为了与 Trigger 一起使用。Sensor 通常会记录 Trigger,但这并不是严格要求的。例如,有一个向 StackStorm 注册的通用Webhooks触发器,它不需要自定义传感器。
Stackstorm内置触发器
默认情况下,StackStorm 会发出一些内部 Trigger,您可以在规则中利用它们。这些触发器可以与非系统触发器区分开来,因为它们的前缀为 “st2”。
下面包含每个资源的可用 Trigger 列表:
Action
Reference | Description | Properties |
---|---|---|
core.st2.generic.actiontrigger | 封装 Action 执行完成的触发器 | execution_id, status, start_timestamp, action_name, action_ref, runner_ref, parameters, result |
core.st2.generic.notifytrigger | 通知触发器 | execution_id, status, start_timestamp, end_timestamp, action_ref, runner_ref, channel, route, message, data |
core.st2.action.file_written | 触发封装 Action,将文件写入磁盘 | ref, file_path, host_info |
core.st2.generic.inquiry | 触发器指示一个新的查询,表示已经进入 “pending” 状态 | id, route |
Sensor
Reference | Description | Properties |
---|---|---|
core.st2.sensor.process_spawn | 触发器去指示传感器,进程开始启动 | object |
core.st2.sensor.process_exit | 触发器指示传感器,进程已经结束 | object |
如何创建一个 Sensor
创建传感器涉及编写 Python 脚本和定义 Sensor 的 YAML 元数据文件。以下是一个最小化 sensor 的结构示例。
元数据文件:
---
class_name: "SampleSensor"
entry_point: "sample_sensor.py"
description: "Sample sensor that emits triggers."
trigger_types:
-
name: "event"
description: "An example trigger."
payload_schema:
type: "object"
properties:
executed_at:
type: "string"
format: "date-time"
default: "2014-07-30 05:04:24.578325"
相关 Python 脚本的结构,在脚本中,必须遵循该结构进行编写
# Copyright 2020 The StackStorm Authors.
# Copyright 2019 Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from st2reactor.sensor.base import Sensor
class SampleSensor(Sensor):
"""
* self.sensor_service
- provides utilities like
- get_logger() - returns logger instance specific to this sensor.
- dispatch() for dispatching triggers into the system.
* self._config
- contains parsed configuration that was specified as
config.yaml in the pack.
"""
def setup(self):
# 该方法系统仅调用一次,可以设置连接外部系统的内容到这里
pass
def run(self):
# 该方法是 sensor 运行的方法
# 这由系统调用一次。
#(如果您想定期睡觉并保持与外部系统交互,您将从 PollingSensor 继承。)
# 例如:例如,个简单的flask应用程序。您可以在此处运行 Flask 应用程序。
# 您可以使用sensor_service 调度触发器,如下所示:
# 您可以将触发器称为dict
# { "name": ${trigger_name}, "pack": ${trigger_pack} }
# 或者只是简单地作为字符串引用。
# i.e. dispatch(${trigger_pack}.${trigger_name}, payload)
# E.g.: dispatch('examples.foo_sensor', {'k1': 'stuff', 'k2': 'foo'})
# trace_tag 是想要与dispatch的 TriggerInstance 关联的标签,
# 通常,trace_tag 是唯一的并且是对外部事件的引用。
pass
def cleanup(self):
# 当 st2 系统宕机时调用此函数。您可以执行清理操作,例如
# 在此关闭与外部系统的连接。
pass
def add_trigger(self, trigger):
# 当创建触发器时调用此方法
pass
def update_trigger(self, trigger):
# 当触发器更新时调用此方法
pass
def remove_trigger(self, trigger):
# 删除触发器时调用此方法
pass
上述是一个最简单的 Sensor 示例。
您的 Sensor 应生成 Python 字典形式的Trigger:
trigger = 'pack.name'
payload = {
'executed_at': '2014-08-01T00:00:00.000000Z'
}
trace_tag = external_event_id
Sensor 通过使用实例化时传递到 Sensor 的 sensor_service 来注入此类 Trigger。
self.sensor_service.dispatch(trigger=trigger, payload=payload, trace_tag=trace_tag)
如果您想要一个定期轮询外部系统的传感器,您可以使用 PollingSensor 而不是 Sensor 作为基类。
# Copyright 2020 The StackStorm Authors.
# Copyright 2019 Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 这里引入的是 PollingSensor,而不是Sensor
from st2reactor.sensor.base import PollingSensor
class SamplePollingSensor(PollingSensor):
"""
* self.sensor_service
- provides utilities like
get_logger() for writing to logs.
dispatch() for dispatching triggers into the system.
* self._config
- contains configuration that was specified as
config.yaml in the pack.
* self._poll_interval
- indicates the interval between two successive poll() calls.
"""
def setup(self):
# Setup stuff goes here. For example, you might establish connections
# to external system once and reuse it. This is called only once by the system.
pass
def poll(self):
# 该方法是 sensor 运行的方法
# 这由系统在每个周期与性能一次,self._poll_interval。
# 例如:假设您想要查询 ec2 并获取有关实例的运行状况信息:
some_data = aws_client.get('')
payload = self._to_payload(some_data)
# _to_triggers 是您编写的用于将数据格式转换为标准 python 字典的东西。
# 这应该遵循为 Trigger 注册的有效负载架构。
self.sensor_service.dispatch(trigger, payload)
# 您可以将触发器称为 dict
dict = { "name": "${trigger_name}", "pack": "${trigger_pack}" }
# 或者简单引用一个字符串,i.e.
dispatch(${trigger_pack}.${trigger_name}, payload)
# 再例如
dispatch('examples.foo_sensor', {'k1': 'stuff', 'k2': 'foo'})
# trace_tag 是想要与dispatch的 TriggerInstance 关联的标签,
# 通常,trace_tag 是唯一的并且是对外部事件的引用。
pass
def cleanup(self):
# 当 st2 系统宕机时调用此函数。您可以执行清理操作,例如
# 在此关闭与外部系统的连接。
pass
def add_trigger(self, trigger):
# 当创建触发器时调用此方法
pass
def update_trigger(self, trigger):
# 当触发器更新时调用此方法
pass
def remove_trigger(self, trigger):
# 删除触发器时调用此方法
pass
上述是一个 Poll Sensor 代码部分是结构的,setup 是装载时执行,poll 是在每个 interval 执行探测,这里的机制是当完成了派发后是不会第二次派发,这里做法是维护了一个列表到类中。
例如
aa2233
ACG-3612
{'ACG-3612': <JIRA Issue: key='ACG-3612', id='212268'>}
注:轮询传感器 (Polling Sensors) 还需要元数据文件中的 poll_interval 参数。这定义了调用
poll()
方法的频率(以秒为单位)。
Sersor如何运行
每个传感器作为单独的进程运行。 st2sensorcontainer
启动 sensor_wrapper.py
,它将您的 Sensor
类(例如上面的SampleSensor 或 SamplePollingSenso r)包装在 st2reactor.container.sensor_wrapper.SensorWrapper
中。
Sensor Service
正如您在上面的示例中看到的,sensor_service 在实例化时被传递给每个传感器类构造函数。
传感器服务 (Sensor Service) 通过公共方法向 Sensor 提供不同的服务。最重要的一种 dispatch
方法是允许 Sensor 将 Trigger 注入系统的方法。所有公共方法描述如下:
- 常用操作,Common Operations
- 数据存储管理操作 Datastore Management Operations
Common Operations
dispatch
调度:此方法允许传感器将触发器注入系统
dispatch(trigger, payload, trace_tag)
例如:
trigger = 'pack.name'
payload = {
'executed_at': '2014-08-01T00:00:00.000000Z'
}
trace_tag = uuid.uuid4().hex
self.sensor_service.dispatch(trigger=trigger, payload=payload, trace_tag=trace_tag)
get_logger
此方法允许 Sensor 实例检索特定于该传感器的记录器实例。
get_logger(name)
例如:
self._logger = self.sensor_service.get_logger(name=self.__class__.__name__)
self._logger.debug('Polling 3rd party system for information')
Datastore Management Operations
除了触发器注入之外,传感器服务还提供读取和操作数据存储的功能。
每个传感器都有一个本地命名空间,默认情况下,所有数据存储操作都对该 Sensor “本地命名空间” 中的键进行操作。如果要对“全局命名空间”进行操作,则需要将参数传递 local=False
给数据存储操作方法。
除其他原因外,如果想在传感器运行之间保留临时数据,此功能非常有用。
TwitterSensor 就是此功能的一个很好的例子。 Twitter 传感器在每次轮询后都会在数据存储中保留最后处理的推文的 ID。这样,如果 Trigger 重新启动或崩溃,传感器可以从中断处恢复,而无需向系统注入重复的 Trigger。
list_values
list_values(local=True, prefix=None)
该方法允许列出数据存储中的值。您还可以通过将 prefix
参数传递给方法来按键名称前缀(键名称开头)进行过滤:
kvps = self.sensor_service.list_values(local=False, prefix='cmdb.')
for kvp in kvps:
print(kvp.name)
print(kvp.value)
get_value
get_value(name, local=True, decrypt=False)
此方法允许您从数据存储中检索单个值:
kvp = self.sensor_service.get_value('cmdb.api_host')
print(kvp.name)
set_value
set_value(name, value, ttl=None, local=True, encrypt=False)
该方法允许在数据存储中存储设置一个值。您还可以选择指定存储值的生存时间 (TTL):
last_id = 12345
self.sensor_service.set_value(name='last_id', value=str(last_id))
Secret 值可以在存储中中加密:
ma_password = 'Sup3rS34et'
self.sensor_service.set_value(name='ma_password', value=ma_password, encrypt=True)
delete_value
delete_value(name, local=True)
该方法允许从存储中删除现有值。如果未找到值,此方法将返回 False,否则返回 True。
self.sensor_service.delete_value(name='my_key')
定义一个 Sersor
例如我们需要制作一个简单 Sensor 的工作示例,每 60 秒注入一次触发器。
需要注意的部分:
-
Sensor 的 Python 的类必需继承 Sensor 或 PollingSensor类,这个由需求而定,必须实现 setup, poll, dispatch等方法
- 如有需求,例如变量声明,也可以重写 _init_ 类
- setup方法,用于初始化示例需要的数据或状态,例如连接三方系统的配置信息,该方法只执行一次
- Sensor runtime 會把实例化并保持运行,按 poll_interval 設置周期去运行 poll 方法
-
Poll方法是真实执行任务的部分,可在 poll 方法中更新示例持有的数据或状态,判断数据是否匹配,
-
然后运行 disptach; 按需派发数据,dispatch方法会派发数据,把数据派发给 _trigger_ref
- _trigger_ref 是 Sensor 设置的 trigger type,也就是 RabbitMQ 的 Queue 名称
元数据定义
---
class_name: "HelloSensor"
entry_point: "sensor1.py"
description: "Test sensor that emits triggers."
trigger_types:
-
name: "event1"
description: "An example trigger."
payload_schema:
type: "object"
Python 代码部分
# Copyright 2020 The StackStorm Authors.
# Copyright 2019 Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
from st2reactor.sensor.base import Sensor
class HelloSensor(Sensor):
def __init__(self, sensor_service, config):
super(HelloSensor, self).__init__(sensor_service=sensor_service, config=config)
self._logger = self.sensor_service.get_logger(name=self.__class__.__name__)
self._stop = False
def setup(self):
pass
def run(self):
while not self._stop:
self._logger.debug("HelloSensor dispatching trigger...")
count = self.sensor_service.get_value("hello_st2.count") or 0
payload = {"greeting": "Yo, StackStorm!", "count": int(count) + 1}
self.sensor_service.dispatch(trigger="hello_st2.event1", payload=payload)
self.sensor_service.set_value("hello_st2.count", payload["count"])
eventlet.sleep(60)
def cleanup(self):
self._stop = True
# Methods required for programmable sensors.
def add_trigger(self, trigger):
pass
def update_trigger(self, trigger):
pass
def remove_trigger(self, trigger):
pass
Sensor的运行与调试
运行
一旦完成传感器的编写,可以使用以下步骤来首次运行传感器:
- 将传感器 Python 文件和 元数据文件 放入 default 包中的 /opt/stackstorm/packs/default/sensors/ ;或者您也可以根据包结构,创建出自定义包并将传感器元件放置在那里 ( /opt/stackstorm/packs/ )
- 使用
st2ctl
注册传感器 。注意传感器注册中的任何错误,一旦注册时出现错误,请修复错误并使用 重新注册 。
st2ctl reload --register-all
- 如果注册成功,传感器将自动运行。
调试
在编写时,很多时候需要调试 Sensor 的运行,而由于环境问题,我们无法做到正常Python程序的调试步骤,必须遵循 Stackstorm 的调试方式。
如果只想运行包中的单个传感器并且该传感器已注册,则可以使用 st2sensorcontainer
来仅运行该单个传感器:
sudo /opt/stackstorm/st2/bin/st2sensorcontainer --config-file=/etc/st2/st2.conf --sensor-ref=pack.SensorClassName
例如:
sudo /opt/stackstorm/st2/bin/st2sensorcontainer --config-file=/etc/st2/st2.conf --sensor-ref=git.GitCommitSensor
示例:Jira服务台
# See ./requirements.txt for requirements.
import os
import urllib3
import json
from jira.client import JIRA
from st2reactor.sensor.base import PollingSensor
class XinMangSensorForPodMapTicket(PollingSensor):
'''
Sensor will monitor for any new projects created in JIRA and
emit trigger instance when one is created.
'''
def __init__(self, sensor_service, config=None, poll_interval=5):
super(XinMangSensorForPodMapTicket, self).__init__(sensor_service=sensor_service,
config=config,
poll_interval=poll_interval)
self._jira_url = None
self._config = {
"url": "https://jira.ticket.com",
"auth_method": "basic",
"username": "jira_automation",
"password": "jira123",
"poll_interval": 30,
"verify": False
}
# The Consumer Key created while setting up the "Incoming Authentication" in
# JIRA for the Application Link.
self._consumer_key = u''
self._rsa_key = None
self._jira_client = None
self._access_token = u''
self._access_secret = u''
self._projects_available = ["SABP"]
self._poll_interval = 30
self._project = "SABP"
self._issues_in_project = None
self._customer_request_type = "创建Pod映射"
self._issue_status = "IN PROGRESS"
self._jql_query = None
self._trigger_name = 'issues_tracker_for_pod_map_ticket'
self._trigger_pack = 'jira'
self._trigger_ref = '.'.join([self._trigger_pack, self._trigger_name])
urllib3.disable_warnings()
def _read_cert(self, file_path):
with open(file_path) as f:
return f.read()
def setup(self):
self._jira_url = self._config['url']
auth_method = self._config['auth_method']
if auth_method == 'oauth':
rsa_cert_file = self._config['rsa_cert_file']
if not os.path.exists(rsa_cert_file):
raise Exception('Cert file for JIRA OAuth not found at %s.' % rsa_cert_file)
self._rsa_key = self._read_cert(rsa_cert_file)
self._poll_interval = self._config.get('poll_interval', self._poll_interval)
oauth_creds = {
'access_token': self._config['oauth_token'],
'access_token_secret': self._config['oauth_secret'],
'consumer_key': self._config['consumer_key'],
'key_cert': self._rsa_key
}
self._jira_client = JIRA(options={'server': self._jira_url},
oauth=oauth_creds)
elif auth_method == 'basic':
basic_creds = (self._config['username'], self._config['password'])
self._jira_client = JIRA(options={'server': self._jira_url, 'verify': self._config['verify']},basic_auth=basic_creds)
#self._jira_client = JIRA(options={'server': self._jira_url},basic_auth=basic_creds)
else:
msg = ('You must set auth_method to either "oauth"',
'or "basic" your jira.yaml config file.')
raise Exception(msg)
self._jql_query = 'project="%s" and "Customer Request Type"="%s" and status="%s"' % (self._project, self._customer_request_type, self._issue_status)
all_issues = self._jira_client.search_issues(self._jql_query, maxResults=None)
self._issues_in_project = {issue.key: issue for issue in all_issues}
def poll(self):
self._detect_new_issues()
def cleanup(self):
pass
def add_trigger(self, trigger):
pass
def update_trigger(self, trigger):
pass
def remove_trigger(self, trigger):
pass
def _detect_new_issues(self):
new_issues = self._jira_client.search_issues(self._jql_query, maxResults=50, startAt=0)
for issue in new_issues:
# issue 满足需求的工单
print(self._issues_in_project)
# 将服务台任务单保留到这个列表中,保证不会重复派发
if issue.key not in self._issues_in_project:
# 记录下未派发的任务,并派发
self._dispatch_issues_trigger(issue)
self._issues_in_project[issue.key] = issue
def _dispatch_issues_trigger(self, issue):
trigger = self._trigger_ref
payload = {}
payload['project'] = self._project
payload['id'] = issue.id
payload['expand'] = issue.raw.get('expand', '')
payload['issue_key'] = issue.key
payload['issue_url'] = issue.self
payload['issue_browse_url'] = self._jira_url + '/browse/' + issue.key
fields = dict()
fields = issue.raw.get('fields', {})
fields_dict = dict()
print(fields['customfield_19402'].strip())
fields_dict['pod_name'] = fields['customfield_19401'].strip()
fields_dict['start_time'] = fields['customfield_19402'].strip()
fields_dict['end_time'] = fields['customfield_19403'].strip()
fields_dict['k8s_cluster'] = fields['customfield_19404']['value']
fields_dict['reporter'] = fields['reporter']['key']
payload['fields'] = fields_dict
if fields_dict['reporter'] != 'a@gmail.com':
self._sensor_service.dispatch(trigger, payload)
Reference
[2] How many do you need? - Argo CD Architectures Explained
本文发布于Cylon的收藏册,转载请著名原文链接~
链接:https://www.oomkill.com/2023/12/stackstorm-sensors/
版权:本作品采用「署名-非商业性使用-相同方式共享 4.0 国际」 许可协议进行许可。