0%

Nightingale(一)——源码分析

Nightingale源码分析

Nightingale介绍

夜莺监控是一款开源云原生观测分析工具,采用 All-in-One 的设计理念,集数据采集、可视化、监控告警、数据分析于一体,与云原生生态紧密集成,提供开箱即用的企业级监控分析和告警能力。

夜莺作为一个服务端组件,可以对接不同的TSDB时序数据库作为数据源,支持的TSDB时序数据库PrometheusVictoriaMetricsThanos等等。也可以接入日志类数据源(Elasticsearch,Loki【预】),链路追踪数据源(Jaeger)。只要数据进到这些库里了,夜莺就可以对数据源的数据进行分析、告警、可视化,以及后续的事件处理、告警自愈。

夜莺部署架构

对于网络结构简单或小规模网络场景下,采用「中心汇聚式部署方案」实施比较简单,可以n9e核心组件采用单机或集群方式搭建,集群模式下前端需架设Nginx作为软负载或F5进行硬件设备负载,同时依赖MySQLRedis中间件存储基础的元数据、用户信息等,不存在大数据量问题,因此,不用太考虑性能瓶颈。

Categraf是夜莺团队开发维护的监控采集侧核心组件,类似TelegrafGrafana-AgentDatadog-Agent,希望对所有常见监控对象提供监控数据采集能力,采用All-in-one的设计,不但支持指标采集,也希望支持日志和调用链路的数据采集。Categraf采集器采集了数据推送给夜莺,然后转存到后端数据源,如TSDBElasticSearch等。

Categraf不属于夜莺监控系统组件,夜莺定位是服务端组件,不侧重监控数据采集侧。

categraf采集组件

1、categraf采集器采用推送模式(push),push模式导致采集器存在状态,即采集器要知道自己要推送给哪个服务后端的配置,少量categraf采集器来说无所谓,但是一旦成千上万采集点,甚至几百采集点,维护成本都是比较高的,特别是后端地址发生变更等。

2、push模式还存在接入权限问题,因为往往服务后端和采集器维护是两拨人,服务后端是运维人员,而采集器是项目组人员维护,比较难于控制接入,可能个别项目组大量接入采集点造成服务端压力过大奔溃,从而影响整个系统运行稳定。

3、push模式还存在推送频率问题,categraf组件可以配置推送频率,但是只能在采集器端控制,不同项目组运维人员可能配置不同推送频率,难以从全局控制,或者这么个场景:前期采集点少,数据量不大,推送频率5s,但是后面接入的越来越多,存储不够用,需要下调推送频率15s,没有统一修改调整方式。

中心汇聚式部署方案

所有机房网络域下监控数据采集器都直接推数据给n9e,这个架构最为简单,维护成本最低。集群方式也很简单,只需要部署多节点即可实现。当然,前提是「要求机房网络域结构简单、规模不大场景,即不太关注跨网络域访问安全问题和大规模跨网络域传输数据网络带宽限制等」

中心汇聚式部署方案

对于 n9e 来说,它本身依赖的存储有两个

  • Mysql : 存放配置类别信息,如用户,监控大盘,告警规则等
  • Redis : 存放访问令牌(JWT Token),心跳信息,如机器列表中CPU、内存、时间偏移、核数、操作系统、CPU架构等

数据来源:

  • n9e 可以支持多种采集器 agent,比如 Datadog-Agent,Telegraf,Grafana-Agent,OpenTSDB agent,Node-Exporter,vmagent 都可以对接,不过最推荐的还是 Categraf。

如果非上述场景,则要使用下面的「边缘下沉式混杂部署方案:」

边缘下沉式混杂部署方案

边缘下沉式混杂部署方案

「边缘下沉式混杂部署方案」中涉及到两个核心组件:「n9e-pushgw组件」和「n9e-alert组件」。

「n9e-pushgw组件」提供类似于remote_writeremote_read功能,categraf采集器将数据通过remote_write推送给n9e-pushgw组件,然后转存到tsdb时序数据,n9e服务端查询检索数据时通过remote_read讲求转发到对应机房下的n9e-pushgw组件。n9e-alert组件提供基于tsdb时序库中的指标数据告警功能。

项目目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
./
├── LICENSE //开源协议
├── Makefile //构建脚本
├── README.md
├── README_en.md
├── alert/ //告警引擎相关##**
├── center/ //中心端(节点)管理相关##**
├── cli/ //版本升级相关**
├── cmd/ //项目的可执行文件
├── conf/ //解析配置文件相关
├── doc/ //文档相关
├── docker/ //容器相关
├── etc/ //配置文件目录
├── fe.sh*
├── go.mod //项目版本依赖包
├── go.sum
├── integrations/ //内置模板,监控大盘和告警规则
├── memsto/ //猜测是memory storage,缓存相关
├── models/ //模型定义
├── pkg/ //公开的库代码,可被外部应用程序依赖使用
├── prom/ //对PromQL的支持相关
├── pushgw/ //转发网关(收采集器,发时序库)**
└── storage/ //存储模块,里面有redis##和关系库**

四大模块:

  • cli,升级用和主业务无关,先不了解
  • center,中心节点也就是n9e的完整版包含各种管理和配置,会调用alert和pushgw模块
  • alert,告警引擎,告警功能的各种代码
  • pushgw,转发网关,接收各种采集器的数据转发给时序库

alert告警引擎

架构图

  • collector 即 agent,可以采集机器常见指标,原生支持日志监控,支持插件机制,支持业务通过接口直接上报数据;
  • transfer提供 rpc 接口接收 collector 上报的数据,然后通过一致性哈希,将数据转发给多台tsdb和多台judge;
  • tsdb 即 open-falcon 中的 graph 组件,用于存储历史数据,支持配置为双写模式提升系统容灾能力,tsdb 会把监控数据转发一份给 index 建索引;
  • index 是内存索引模块,替换原来的 mysql 方案,在内存里构建索引,便于后续数据检索,在检索的灵活性和检索性能方面大幅提升;
  • judge 是告警引擎,周期性的从 monapi(portal) 同步监控策略,然后对接收到的数据做告警判断,如满足阈值,则生成告警事件推送到 redis 队列;
  • monapi(alarm) 从 redis 队列中读取 judge 生成的事件,进行二次处理,补充一些元信息,生成告警消息(alert),重新推送回 redis 队列;
  • 各发送组件,比如 mail-sender、sms-sender 等,从 redis 读取告警消息,发送告警,抽象出各类 sender 是为了后续定制方便;
  • monapi 集成了原来多个模块的功能,提供接口给 js 调用,api 前缀为 /api/portal,数据查询走 transfer,去除了 open-falcon 中原来的 query 组件,api 前缀为 /api/transfer,索引查询的 api 前缀 /api/index,于是,在前端统一搭建 nginx,即可通过不同 location 将请求转发到不同后端;
  • 数据库仍然使用 MySQL,主要存储的内容包括:用户信息、团队信息、树节点信息、告警策略、监控大盘、屏蔽策略、采集策略、部分组件心跳信息等。

告警引擎目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
tree -LF 3 .
./
├── aconf/ //告警配置
│ └── conf.go
├── alert.go //告警初始化入口
├── astats/ //统计信息
│ └── stats.go
├── common/ //通用方法或结构体
│ ├── conv.go //转换成异常点结构体
│ └── key.go //规则key序列化和标签判断
├── dispatch/ //发送告警
│ ├── consume.go //消费者
│ ├── dispatch.go //生产者
│ ├── log.go //日志记录
│ ├── notify_channel.go //通知通道的map
│ └── notify_target.go //通知目标结构体,维护发送目标
├── eval/ //评估
│ ├── alert_rule.go //告警调度
│ └── eval.go //查询时序库评估告警,查询promql
├── mute/ //屏蔽
│ └── mute.go //告警屏蔽
├── naming/ //告警命名
│ ├── hashring.go //数据源对应哈希环
│ └── heartbeat.go //告警心跳维护
├── process/ //告警处理
│ ├── alert_cur_event.go //告警发生的事件的map
│ └── process.go //告警处理
├── queue/ //告警队列
│ └── queue.go //Gauge类型的告警队列
├── record/ //记录规则
│ ├── prom_rule.go //将记录规则写入时序库
│ ├── sample.go //样本,转换记录用
│ └── scheduler.go //记录规则调度
├── router/ //告警路由
│ ├── router.go //告警路由
│ └── router_event.go //告警事件路由
└── sender/ //告警通知发送渠道
├── callback.go //回调
├── dingtalk.go //钉钉
├── email.go //邮件
├── feishu.go //飞书
├── mm.go //mm
├── plugin.go //脚本
├── plugin_cmd_unix.go //linux脚本
├── plugin_cmd_windows.go //window脚本
├── sender.go //发送通知
├── telegram.go //telegram
├── webhook.go //webhook
└── wecom.go //企业微信

13 directories, 35 files

入口函数在同名的alert.go,只有两个函数Initialize(configDir string, cryptoKey string) (func(), error)Start(alertc aconf.Alert, pushgwc pconf.Pushgw, ...lots of arg... isCenter bool) ,打包的n9e程序通过Start函数接入告警引擎,n9e-alert通过Initialize函数完成一些配置工作,然后也是调用Start函数来实现调用。

其中n9e-alert程序有三个参数,其中configs和crypto-key是Initialize函数需要的参数。

  • configs 配置文件路径,优先读取环境变量中N9E_CONFIGS的值没有设置为默认值etc,
  • crypto-key 加密密钥,默认’’
  • version 是否输出版本,默认false,如果是true,打印在输出到控制台上后立即结束程序

配置文件可支持 toml, json, yaml三种类型;
由于告警配置Alert.Heartbeat中IP属性建议设置一个唯一值,配置文件没有填写会自动填充,其逻辑如下:

  1. 首先通过AliDNS获取本机ip
  2. 没有获取到有效值会尝试获取hostname,获取失败会直接退出程序
  3. 获取hostname中包含localhost会打印一个提示建议用一个唯一值
  4. 在填充完 config.Alert.Heartbeat.IP 后会再根据配置文件HTTP.Port的配置(17000)组合一起设置 config.Alert.Heartbeat.Endpoint 属性

syncStats是两个用于监控同步状态的Exporter:duration和sync_number

  • duration是定时任务使用时长

  • sync_number是定时任务同步数量。

alertStats是6个用于监控告警状态的Exporter

  • samples_received_total:从各个接收接口接收到的监控数据总量
  • alerts_total:产生的告警总量
  • alert_queue_size:内存中的告警事件队列的长度
  • sample_queue_size:数据转发队列,各个队列的长度
  • http_request_duration_seconds:一些重要的请求,比如接收数据的请求,应该统计一下延迟情况
  • forward_duration_seconds:发往后端TSDB,延迟如何

告警引擎启动入口-Start

  1. 查询数据中用户,用户团队,订阅告警,记录规则信息并生成时序库的统计指标;

    1
    2
    3
    4
    //新建UserCacheType类型,查询数据库中user表,设置标签为sync_users的统计指标值,并启用单独线程9s同步一次数据,并返回指针,用户
    userCache := memsto.NewUserCache(ctx, syncStats)
    //查询user_group表,设置标签为sync_user_groups的统计指标值,用户团队
    userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
    • users 是用户表,保存了用户基础数据,除了通过管理员在页面上手动添加,还通过可以对接公司内部 SSO 系统来添加用户(当用户通过 SSO 登录的时候会自动把这个用户的信息插入 users 表)
    • user_group 是团队表,默认添加了 demo-root-group 团队。
  2. 初始化设置告警渠道,渠道联系的机器人和系统配置的告警模版等配置;

    1
    2
    //启用单独线程初始化,获取config表数据,保证数据库有告警内置告警渠道,渠道机器人和告警模版
    go models.InitNotifyConfig(ctx, alertc.Alerting.TemplatesDir)
    • configs 是夜莺平台配置表,存放系统中的多种配置信息。就只保存有配置键和配置值
  3. 按心跳配置更新告警引擎中的服务列表 ;

    1
    naming := naming.NewNaming(ctx, alertc.Heartbeat)
  4. 初始化时序库相关;

    1
    writers := writer.NewWriters(pushgwc)
  5. 记录规则调度,告警记录调度;

    1
    2
    3
    4
    //记录规则调度
    record.NewScheduler(alertc, recordingRuleCache, promClients, writers, alertStats)
    //告警规则调度
    eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, datasourceCache, promClients, naming, ctx, alertStats)
  6. 创建告警发送Notify实例,创建告警消费实例Consumer;

    1
    dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, alertc.Alerting, ctx)
  7. 单独线程 更新告警内置模版Tpl文件内容,统计信息,邮件发送;

    1
    2
    3
    4
    5
    6
    7
    8
    //创建告警消费实例Consumer
    go dp.ReloadTpls()
    //消费告警,
    go consumer.LoopConsume()
    //每秒统计一次队列数量
    go queue.ReportQueueSize(alertStats)
    // 告警发送邮件方式处理
    go sender.InitEmailSender(notifyConfigCache.GetSMTP())
  8. 单独线程 消费告警;

    1
    2
    //消费告警,
    go consumer.LoopConsume()

发送告警

通知目标

notify_target.go

结构体 NotifyTarget

  • 字段:
    • userMap:一个映射,键是用户ID,值是NotifyChannels类型,用于存储用户的通知渠道信息。
    • webhooks:一个映射,键是Webhook的URL,值是models.Webhook类型,用于存储Webhook信息。
    • callbacks:一个映射,键是字符串,值是空结构体,用于存储回调信息。
  • 描述NotifyTarget结构体维护所有需要发送的目标用户和通道/回调/钩子信息。

方法分析:

方法 NewNotifyTarget() *NotifyTarget

  • 返回:返回一个新的NotifyTarget实例,其中的映射都是空的。

方法 OrMerge(other *NotifyTarget)

  • 参数other是另一个NotifyTarget实例。
  • 逻辑:将other中的信息按照”或”的方式合并到当前实例中。

方法 AndMerge(other *NotifyTarget)

  • 参数other是另一个NotifyTarget实例。
  • 逻辑:将other中的信息按照”与”的方式合并到当前实例中。

方法 merge(other *NotifyTarget, f func(NotifyChannels, NotifyChannels))

  • 参数:
    • other:另一个NotifyTarget实例。
    • f:一个函数,用于合并NotifyChannels
  • 逻辑:合并other中的信息到当前实例中。

方法 ToChannelUserMap() map[string][]int64

  • 返回:返回一个映射,键是通道名,值是用户ID的切片。

方法 ToCallbackList() []string

  • 返回:返回一个包含所有回调的切片。

方法 ToWebhookList() []*models.Webhook

  • 返回:返回一个包含所有Webhook的切片。

类型 NotifyTargetDispatch

  • 描述:一个函数类型,用于定义从告警事件到信息接收者的路由策略。

函数 NotifyGroupDispatch(...) *NotifyTarget

  • 逻辑:处理告警规则的组订阅关系。

函数 GlobalWebhookDispatch(...) *NotifyTarget

  • 逻辑:获取并处理全局Webhook。

函数 EventCallbacksDispatch(...) *NotifyTarget

  • 逻辑:处理事件回调。

生产者

dispatch.go

结构体 Dispatch

  • 字段:
    • alertRuleCachenotifyConfigCache:各种缓存类型,用于存储警报规则、用户、用户组、警报订阅、目标和通知配置。
    • alertingaconf.Alerting 类型,用于警报配置。
    • SendersExtraSenders:键为字符串,值为 sender.Sender 实例的映射,代表不同的发送机制。Senders 用于存储默认的发送器,而 ExtraSenders 用于存储额外的发送器。
    • tpls:键为字符串,值为模板的映射,用于存储不同通道的消息模板。
    • BeforeSenderHook:函数,它接受一个 models.AlertCurEvent 参数并返回一个布尔值。在发送消息之前被调用,如果返回 true,则继续发送消息;如果返回 false,则停止发送消息。
      • AlertCurEvent:当前警报事件的模型结构体
    • ctx:上下文对象。
    • RwLock:读写锁,用于保护 Senderstpls 的并发访问。

方法:

  • NewDispatch:这是 Dispatch 的构造函数,用于创建并初始化 Dispatch 的实例。

  • ReloadTplsrelaodTpls:这两个方法用于重新加载模板,并更新 Senders 映射。ReloadTpls 方法会周期性地调用 relaodTpls 方法来重新加载模板。

  • HandleEventNotify:这个方法是处理警报或恢复事件的主逻辑。它首先从缓存中获取警报规则,然后填充事件的用户和用户组信息,然后处理订阅关系和移除订阅关系的逻辑,最后发送消息。

  • handleSubshandleSub:这两个方法用于处理订阅规则的事件。handleSubs 方法获取订阅规则并调用 handleSub 方法来处理每个订阅规则的事件。

  • Send:这个方法用于处理事件的发送。它首先调用 BeforeSenderHook 函数来检查是否需要发送消息,然后通过不同的通道发送消息,并处理事件回调、全局 Webhooks 和插件通知。

    夜莺内置了邮件、企微机器人、钉钉机器人、飞书机器人等通知方式,如果用户在告警规则配置中设置了这些通知方式,夜莺内置就会调用对应的通知方式完成消息推送,如果用户设置了自定义的通知方式,夜莺没有内置对应的通知方法,就会在日志中打印 ’no sender channel: xxx’,这个时候就需要用户在脚本里实现通知方法了。

    • 调用 sender.SendCallbacks 函数来处理事件回调。
    • 调用 sender.SendWebhooks 函数来处理全局 Webhooks。
    • 调用 sender.MayPluginNotify 函数来处理插件通知(通过脚本)。
  • genNoticeBytes:这个方法用于生成通知的字节序列。它创建一个 Notice 结构体,然后将其序列化为 JSON 格式的字节序列。

  • fillUsers:这个方法用于填充 models.AlertCurEventNotifyGroupsObjNotifyUsersObj 字段。它从缓存中获取用户和用户组信息,并设置这些字段。

函数分析:

  • mapKeys:从映射中提取键并返回一个切片。

消费者

结构体:

Consumer 结构体是告警事件的消费者,它包含以下字段:

  • alerting:告警配置。
  • ctx:上下文。
  • dispatchDispatch 类型的指针,用于处理告警事件。

NeNewConsumer 函数:用于创建一个新的Consumer实例。

  • 参数:

    • alerting:告警配置。

    • ctx:上下文。

    • dispatchDispatch 类型的指针。

LoopConsume 方法:是Consumer的主要方法,它循环消费告警事件队列中的事件,并调用consume方法进行处理。

  • 流程:

    • 创建一个信号量,用于控制并发消费的数量。

    • 循环从告警事件队列中取出事件。

    • 如果队列为空,则暂停一段时间后继续。

    • 调用consume方法处理取出的事件。

consume方法:用于处理一组告警事件。

  • 参数:

    • events:告警事件的数组。

    • sema:信号量。

  • 流程:

    • 遍历告警事件数组。

    • 对每个事件,获取信号量的许可,然后启动一个协程调用consumeOne方法处理该事件。

consumeOne 方法:用于处理一个告警事件。

  • 参数:

    • event:告警事件。
  • 流程:

    • 记录事件日志。

    • 解析事件的规则名称、规则注释和注解。

    • 调用persist方法持久化事件。

    • 如果事件已恢复并且不需要通知,则返回。

    • 调用dispatchHandleEventNotify方法处理事件。

persist 方法:用于持久化告警事件。

  • 参数:

    • event:告警事件。
  • 流程:

    • 如果事件的状态不为0,则返回。

    • 如果当前实例不是中心实例,则将事件发送到中心实例进行持久化。

    • 否则,调用models.EventPersist方法持久化事件。

告警发送渠道

nightingale-6.1.0/alert/sender/sender.go

结构和类型:

  • Sender:一个接口,定义了发送消息通知的方法。任何实现了Send(ctx MessageContext)方法的类型都满足这个接口。
  • MessageContext:一个结构体,代表由事件生成的告警通知的上下文。它包含用户、告警规则和当前告警事件的信息。

函数解析:

  • NewSender(key string, tpls map[string]*template.Template, smtp ...aconf.SMTPConfig) Sender:根据给定的key创建一个新的Senderkey是一个字符串,它决定了创建哪种类型的Sendertpls是一个映射,它包含了用于生成消息的模板。smtp是一个可变参数,它包含SMTP配置。
  • BuildMessageContext(rule *models.AlertRule, events []*models.AlertCurEvent, uids []int64, userCache *memsto.UserCacheType) MessageContext:构建并返回一个MessageContext。它使用给定的告警规则、告警事件、用户ID和用户缓存来创建MessageContext
  • buildTplMessage(tpl *template.Template, events []*models.AlertCurEvent) string:使用给定的模板和告警事件来构建消息。它遍历所有的告警事件,并使用模板来生成每个事件的消息。然后,它将所有的消息连接在一起,并返回结果。

邮箱发送

nightingale-6.1.0/alert/sender/email.go

工作流程
  • InitEmailSender函数初始化电子邮件发送器,创建一个gomail.Message类型的通道mailch,并调用startEmailSender函数。
  • startEmailSender函数创建一个gomail.Dialer,并在无限循环中从mailch通道中读取消息并发送它们。如果在发送消息时出现错误,它将尝试重新连接到SMTP服务器并重新发送消息。
  • Send方法创建一个gomail.Message并将其发送到mailch通道。
    • mailch 是一个 Go 语言中的通道(channel),用于在 Go 协程之间传递 *gomail.Message 类型的消息。通道是 Go 语言中一种非常重要的并发编程构造,它允许在不同的 Go 协程之间安全地传递数据。

回调发送

nightingale-6.1.0/alert/sender/callback.go

结构体:

  • TaskForm:用于创建Ibex任务的表单。
  • TaskCreateReply:Ibex任务创建的响应。

函数解析:

  • SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType, ibexConf aconf.Ibex)
    • 遍历每个URL,发送回调通知。
    • 如果URL以”${ibex}”开头并且事件没有恢复,则处理Ibex任务。
    • 否则,发送JSON格式的事件到指定的URL,并记录响应和状态码。
  • handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType, ibexConf aconf.Ibex)
    • 从URL中解析任务模板ID和主机。
    • 获取任务模板并检查权限。
    • 构建并发送Ibex任务,并记录任务ID和其他相关信息。
  • canDoIbex(ctx *ctx.Context, username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error)
    • 检查用户是否有权限执行Ibex任务。
    • 如果用户是管理员,返回true。
    • 否则,检查目标的组ID是否与任务模板的组ID匹配。

脚本发送

nightingale-6.1.0/alert/sender/plugin.go

  • MayPluginNotify(noticeBytes []byte, notifyScript models.NotifyScript)
    • 参数:
      • noticeBytes:要发送的通知内容的字节切片。
      • notifyScript:包含通知脚本配置的结构体。
    • 逻辑:
      • 如果noticeBytes为空,则函数直接返回,不执行任何操作。
      • 否则,调用alertingCallScript函数,并传入noticeBytesnotifyScript作为参数。
  • alertingCallScript(stdinBytes []byte, notifyScript models.NotifyScript)
    • 参数:
      • stdinBytes:作为标准输入传递给脚本的字节切片。
      • notifyScript:包含通知脚本配置的结构体。
    • 逻辑:
      • 检查notifyScriptEnable字段和Content字段。如果EnablefalseContent为空,则函数直接返回。
      • 根据notifyScriptType字段,确定脚本的文件路径(fpath)。如果Type为1,则fpath就是Content的值;否则,fpath被设置为”.notify_scriptt”。
      • 如果fpath对应的文件不存在或其内容与Content不匹配,则将Content写入fpath对应的文件,并设置文件的权限为0777。
      • 使用exec.Command创建一个命令,该命令执行fpath对应的脚本,并将stdinBytes作为标准输入传递给脚本。
      • 启动命令,并设置命令的超时时间为notifyScriptTimeout字段的值。如果命令超时,则尝试杀死命令,并记录错误日志。
      • 如果命令执行成功,则记录命令的输出;如果命令执行失败,则记录错误日志和命令的输出。

告警服务整体流程

1. 初始化阶段(Initialize 函数):

  • 配置初始化: 通过conf.InitConfig函数读取配置文件和密钥,初始化配置。
  • 日志系统初始化: 使用logx.Init函数初始化日志系统。
  • 上下文创建: 使用ctx.NewContext创建新的上下文。
  • 缓存初始化:
    • 各种状态和告警统计的初始化。
    • 初始化目标缓存、业务组缓存、告警静音缓存、告警规则缓存、通知配置缓存、数据源缓存、用户缓存和用户组缓存。
  • Prometheus客户端创建: 用于查询Prometheus的数据。
  • 外部处理器创建: 用于处理外部告警。
  • 告警服务启动: 调用Start函数启动告警服务。
  • HTTP服务启动: 使用Gin框架启动HTTP服务。

2. 告警服务启动阶段(Start 函数):

  • 告警订阅缓存初始化: 用于缓存告警订阅信息。
  • 记录规则缓存初始化: 用于缓存记录规则。
  • 告警配置初始化: 确保数据库中有告警内置告警渠道、渠道机器人和告警模板。
  • 服务列表更新: 根据心跳配置更新告警引擎中的服务列表。
  • 时序数据库写入初始化: 初始化写入监控数据的时序数据库。
  • 记录规则调度器创建: 用于调度记录规则。
  • 告警规则调度器创建: 用于调度告警规则。
  • 告警发送实例创建: 创建用于发送告警的实例。
  • 告警消费实例创建: 创建用于消费告警的实例。
  • 告警模板重载: 重载告警模板。
  • 告警消费循环: 启动告警消费循环。
  • 队列大小报告: 定期报告队列大小。
  • 邮件发送器初始化: 初始化用于发送邮件的发送器。

3. 告警处理流程:

  • 创建Dispatch实例: 用于处理告警分发。
  • 模板重载: 重载告警模板。
  • 告警消费: 消费告警队列中的告警。
  • 告警处理: 根据告警规则和订阅信息处理告警。
  • 告警发送: 将处理后的告警发送到指定的通道。

4. 告警发送流程:

  • 构建消息上下文: 根据告警规则和事件构建消息上下文。
  • 发送消息: 通过指定的发送器发送消息。
  • 发送回调: 发送告警回调。
  • 发送Webhooks: 发送告警Webhooks。
  • 插件通知: 通过插件发送告警通知。

user服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package models

import (
// 导入所需的包
)

// 定义常量,表示消息通知渠道的类型
const (
Dingtalk = "dingtalk" // 钉钉
Wecom = "wecom" // 企业微信
Feishu = "feishu" // 飞书
FeishuCard = "feishucard" // 飞书卡片消息
Mm = "mm" // 钉钉移动应用
Telegram = "telegram" // 电报
Email = "email" // 邮件
EmailSubject = "mailsubject" // 邮件主题

DingtalkKey = "dingtalk_robot_token" // 钉钉机器人令牌配置键
WecomKey = "wecom_robot_token" // 企业微信机器人令牌配置键
FeishuKey = "feishu_robot_token" // 飞书机器人令牌配置键
MmKey = "mm_webhook_url" // 钉钉移动应用 Webhook URL 配置键
TelegramKey = "telegram_robot_token" // 电报机器人令牌配置键
)

// 默认消息通知渠道列表
var (
DefaultChannels = []string{Dingtalk, Wecom, Feishu, Mm, Telegram, Email, FeishuCard}
)

// 用户模型
type User struct {
Id int64 `json:"id" gorm:"primaryKey"`
Username string `json:"username"` // 用户名
Nickname string `json:"nickname"` // 昵称
Password string `json:"-"` // 密码(在 JSON 中不显示)
Phone string `json:"phone"` // 电话号码
Email string `json:"email"` // 电子邮件
Portrait string `json:"portrait"` // 头像
Roles string `json:"-"` // 用户角色(不在 JSON 中显示)
RolesLst []string `json:"roles" gorm:"-"` // 用户角色列表(在 JSON 中显示)
Contacts ormx.JSONObj `json:"contacts"` // 联系人信息,结构为 map[string]string
Maintainer int `json:"maintainer"` // 是否是管理员,0 表示不是管理员,1 表示是管理员
CreateAt int64 `json:"create_at"` // 创建时间
CreateBy string `json:"create_by"` // 创建者
UpdateAt int64 `json:"update_at"` // 更新时间
UpdateBy string `json:"update_by"` // 更新者
Admin bool `json:"admin" gorm:"-"` // 是否是管理员(在 JSON 中显示)
}

// 定义表名
func (u *User) TableName() string {
return "users"
}

// 将数据库数据转化为前端可用数据的函数,暂未实现
func (u *User) DB2FE() error {
return nil
}

// 用户的字符串表示形式,主要输出用户信息
func (u *User) String() string {
bs, err := u.Contacts.MarshalJSON()
if err != nil {
return err.Error()
}

return fmt.Sprintf("<id:%d username:%s nickname:%s email:%s phone:%s contacts:%s>", u.Id, u.Username, u.Nickname, u.Email, u.Phone, string(bs))
}

// 检查用户是否是管理员
func (u *User) IsAdmin() bool {
for i := 0; i < len(u.RolesLst); i++ {
if u.RolesLst[i] == AdminRole {
return true
}
}
return false
}

// 验证用户信息的有效性
func (u *User) Verify() error {
u.Username = strings.TrimSpace(u.Username)

if u.Username == "" {
return errors.New("用户名为空")
}

if str.Dangerous(u.Username) {
return errors.New("用户名包含无效字符")
}

if str.Dangerous(u.Nickname) {
return errors.New("昵称包含无效字符")
}

if u.Phone != "" && !str.IsPhone(u.Phone) {
return errors.New("电话号码无效")
}

if u.Email != "" && !str.IsMail(u.Email) {
return errors.New("电子邮件无效")
}

return nil
}

其他函数解析:

  1. func (u *User) Add(ctx *ctx.Context) error: 这个方法用于向数据库中添加新用户。它首先检查数据库中是否已存在具有相同用户名的用户,如果存在则返回错误。然后,它设置用户的创建时间和更新时间为当前时间,并调用 Insert 函数将用户信息插入数据库。

  2. func (u *User) Update(ctx *ctx.Context, selectField interface{}, selectFields ...interface{}) error: 这个方法用于更新用户信息。它首先验证用户信息的有效性,然后使用给定的选择字段更新数据库中的用户信息。

  3. func (u *User) UpdateAllFields(ctx *ctx.Context) error: 这个方法用于更新用户的所有字段,而不是仅更新指定字段。它首先验证用户信息的有效性,然后将用户的更新时间设置为当前时间,并更新数据库中的用户信息。

  4. func (u *User) UpdatePassword(ctx *ctx.Context, password, updateBy string) error: 这个方法用于更新用户的密码。它接受一个新密码和更新者的信息,然后更新用户的密码、更新时间和更新者,并将这些更改保存到数据库中。

  5. func (u *User) Del(ctx *ctx.Context) error: 这个方法用于删除用户。它使用事务来执行两个删除操作:首先删除与用户相关的用户组成员信息,然后删除用户自身的信息。

  6. func (u *User) ChangePassword(ctx *ctx.Context, oldpass, newpass string) error: 这个方法用于更改用户的密码。它接受旧密码和新密码作为参数,并使用 CryptoPass 函数将它们加密。然后,它检查旧密码是否与数据库中存储的密码匹配,如果匹配则更新密码为新密码。

  7. func UserGet(ctx *ctx.Context, where string, args ...interface{}) (*User, error): 这个函数用于根据给定的条件从数据库中获取用户信息。它接受一个查询条件 where 和可选的查询参数 args,并返回满足条件的第一个用户对象。

  8. func UserGetByUsername(ctx *ctx.Context, username string) (*User, error): 这个函数用于根据用户名从数据库中获取用户信息。

  9. func UserGetById(ctx *ctx.Context, id int64) (*User, error): 这个函数用于根据用户ID从数据库中获取用户信息。

  10. func InitRoot(ctx *ctx.Context): 这个函数用于初始化根用户。它首先尝试获取名为 “root” 的用户,如果该用户不存在,则不执行任何操作。如果用户存在但密码长度小于等于 31,则将用户的密码重新加密并更新到数据库中。

  11. func PassLogin(ctx *ctx.Context, username, pass string) (*User, error): 这个函数用于通过用户名和密码进行登录验证。它首先根据用户名从数据库中获取用户信息,然后将提供的密码与数据库中存储的密码进行比较,如果匹配则返回用户对象,否则返回错误。

  12. func LdapLogin(ctx *ctx.Context, username, pass, roles string, ldap *ldapx.SsoClient) (*User, error): 这个函数用于通过 LDAP 进行登录验证。它接受用户名、密码、角色信息和 LDAP 客户端作为参数。首先,它使用 LDAP 客户端来验证用户名和密码,然后尝试从数据库中获取与用户名匹配的用户信息。如果用户不存在,则创建一个新用户,并从 LDAP 中获取用户的属性(昵称、电子邮件、电话号码等),然后将用户信息保存到数据库中。

  13. func UserTotal(ctx *ctx.Context, query string) (num int64, err error): 这个函数用于获取满足指定查询条件的用户总数。可以通过传递一个查询字符串 query 来筛选用户。如果 query 不为空,则根据用户名、昵称、电话号码或电子邮件进行模糊匹配筛选用户;如果 query 为空,则获取所有用户的总数。

  14. func UserGets(ctx *ctx.Context, query string, limit, offset int) ([]User, error): 这个函数用于根据查询条件获取一定数量的用户列表。它接受查询条件 query、每页返回的用户数量 limit 和偏移量 offset 作为参数,并返回用户列表。同样,如果 query 不为空,则根据用户名、昵称、电话号码或电子邮件进行模糊匹配筛选用户。

  15. func UserGetAll(ctx *ctx.Context) ([]*User, error): 这个函数用于获取所有用户的列表。如果当前上下文不是中心,则使用 poster.GetByUrls 函数从外部服务获取用户列表;否则,从数据库中获取用户列表,并设置用户的角色信息和管理员标志。

  16. func UserGetsByIds(ctx *ctx.Context, ids []int64) ([]User, error): 这个函数用于根据一组用户ID获取用户列表。它接受一个整数切片 ids,包含要获取的用户的ID。然后,它从数据库中获取这些用户,并设置用户的角色信息和管理员标志。

  17. func (u *User) CanModifyUserGroup(ctx *ctx.Context, ug *UserGroup) (bool, error): 这个方法用于检查用户是否有权限修改指定的用户组。它首先检查用户是否是管理员,如果是管理员则具有权限。然后,它检查用户是否是用户组的创建者,如果是创建者也具有权限。最后,它检查用户是否是用户组的成员,如果是成员也具有权限。

  18. func (u *User) CanDoBusiGroup(ctx *ctx.Context, bg *BusiGroup, permFlag ...string) (bool, error): 这个方法用于检查用户是否有权限执行指定的业务组操作。它首先检查用户是否是管理员,如果是管理员则具有权限。然后,它检查用户是否属于任何一个具有指定权限标志的用户组,如果属于任何一个用户组则具有权限。

  19. func (u *User) CheckPerm(ctx *ctx.Context, operation string) (bool, error): 这个方法用于检查用户是否具有执行指定操作的权限。它首先检查用户是否是管理员,如果是管理员则具有权限。否则,它调用 RoleHasOperation 函数检查用户的角色是否具有执行该操作的权限。

  20. func UserStatistics(ctx *ctx.Context) (*Statistics, error): 这个函数用于获取与用户相关的统计信息。如果当前上下文不是中心,则使用 poster.GetByUrls 函数从外部服务获取统计信息;否则,从数据库中获取统计信息,包括用户总数和最后更新时间。

  21. func (u *User) NopriIdents(ctx *ctx.Context, idents []string) ([]string, error): 这个方法用于根据用户的权限过滤一组标识符。如果用户是管理员,则返回原始标识符列表;否则,根据用户的用户组和业务组权限过滤标识符列表。

脚本运行

说明:在运行脚本的时候,相关程序直接获取前端展示的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
func alertingCallScript(stdinBytes []byte, notifyScript models.NotifyScript, stats *astats.Stats) {
// not enable or no notify.py? do nothing
config := notifyScript
if !config.Enable || config.Content == "" {
return
}

channel := "script"
stats.AlertNotifyTotal.WithLabelValues(channel).Inc()
fpath := ".notify_scriptt"
if config.Type == 1 {
fpath = config.Content
} else {
rewrite := true
if file.IsExist(fpath) {
oldContent, err := file.ToString(fpath)
if err != nil {
logger.Errorf("event_script_notify_fail: read script file err: %v", err)
stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc()
return
}

if oldContent == config.Content {
rewrite = false
}
}

if rewrite {
_, err := file.WriteString(fpath, config.Content)
if err != nil {
logger.Errorf("event_script_notify_fail: write script file err: %v", err)
stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc()
return
}

err = os.Chmod(fpath, 0777)
if err != nil {
logger.Errorf("event_script_notify_fail: chmod script file err: %v", err)
stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc()
return
}
}
fpath = "./" + fpath
}

cmd := exec.Command(fpath)
cmd.Stdin = bytes.NewReader(stdinBytes)

// combine stdout and stderr
var buf bytes.Buffer
cmd.Stdout = &buf
cmd.Stderr = &buf

err := startCmd(cmd)
if err != nil {
logger.Errorf("event_script_notify_fail: run cmd err: %v", err)
return
}

err, isTimeout := sys.WrapTimeout(cmd, time.Duration(config.Timeout)*time.Second)

if isTimeout {
if err == nil {
logger.Errorf("event_script_notify_fail: timeout and killed process %s", fpath)
}

if err != nil {
logger.Errorf("event_script_notify_fail: kill process %s occur error %v", fpath, err)
stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc()
}
return
}

if err != nil {
logger.Errorf("event_script_notify_fail: exec script %s occur error: %v, output: %s", fpath, err, buf.String())
stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc()
return
}

logger.Infof("event_script_notify_ok: exec %s output: %s", fpath, buf.String())
}
  1. 检查通知脚本是否启用:
    • 首先,它检查 notifyScript 中的 Enable 字段是否为 true,以及 Content 字段是否非空。
    • 如果通知脚本未启用或未提供有效的脚本内容,则函数会立即返回,不执行任何后续操作。
  2. 增加统计信息:
    • 如果通知脚本被启用,函数会使用 stats 统计信息对象记录一次通知事件,以标识这次通知是通过 “script” 渠道发送的。
  3. 确定脚本文件路径:
    • 函数根据通知脚本的类型(config.Type)来决定脚本文件的路径。
    • 如果类型是 1,脚本路径将使用 config.Content 字段指定的内容。
    • 否则,函数会检查是否已经存在名为 “.notify_scriptt” 的脚本文件。
    • 如果已存在且内容与配置中的内容相同,函数将不会重写脚本文件,否则将覆盖文件内容。
  4. 执行脚本命令:
    • 使用确定的脚本文件路径创建一个 exec.Command 对象。
    • 将输入数据 stdinBytes 传递给脚本的标准输入。
    • 执行脚本命令,并捕获标准输出和标准错误的内容。
  5. 处理命令执行结果:
    • 如果在启动命令时发生错误,函数将记录错误消息并返回。
    • 如果命令运行超时,函数将记录相应的错误消息,包括超时和进程被终止的信息。
    • 如果命令执行出错,函数将记录错误消息和标准输出/标准错误的内容,并增加错误的统计信息。
    • 如果命令成功执行,函数将记录成功的消息以及命令的标准输出内容。

参考文章

mp.weixin.qq.com/s/GPIQ4-8o1z7bNJq7M7IvXg

手把手教你使用钉钉发送告警 (qq.com)

夜莺接入飞书告警保姆教程_xiangzilong的博客-CSDN博客