如何使用InfluxDB和Flux查询语言创建Grafana警报

Grant Pinkos
作者:Grant Pinkos

最后更新于2023年7月18日

高级

简介

Grafana警报 代表了一种强大的新方法来处理系统可观察性和事件响应管理。虽然警报平台最知名的是与Prometheus的强大集成,但该系统可以与InfluxDB等众多流行数据源配合工作。在本教程中,我们将学习如何使用InfluxDB和较新的Flux查询语言创建Grafana警报。我们将涵盖五种常见场景,从最基础的到最复杂的。这五个场景结合起来,将为几乎任何类型的警报查询提供极好的指南,您希望使用Grafana和Flux创建这些警报。

在我们深入探讨警报场景之前,考虑到InfluxDB的两个流行查询语言的发展是值得的:InfluxQL和Flux。最初,InfluxDB使用InfluxQL作为它们的查询语言,该语言使用类似SQL的语法。但从InfluxDB v1.8版本开始,公司引入了Flux,这是一种“专为查询、分析和在数据上执行操作而设计的开源函数式数据脚本语言”。“Flux”,其官方文档继续说明,“将查询、处理、写入和执行数据的相关代码统一为单一语法。该语言旨在易于使用、阅读、灵活、可组合、可测试、可贡献和可分享。”

在接下来的五个示例中,我们将看到新Flux查询语言是多么强大和灵活。我们还将看到Flux与Grafana警报的配合是多么完美。

示例1:在值高于或低于设置阈值时创建警报

我们的第一个示例使用了一个常见的InfluxDB和Grafana警报的实际场景。对于IoT和边缘应用来说非常受欢迎,InfluxDB擅长现场实时可观察性。在本例中,以及实际上在许多后续示例中,我们将考虑一个假设的场景,即我们在监控一家制造厂中的多个液体储罐。这个场景,基于InfluxDB和警报的实际应用,将允许我们通过Grafana的各个警报设置进行工作,从简单到复杂逐步进行。

对于示例1,让我们考虑以下场景:我们正在监控一个储罐,A5,我们为其存储实时温度数据。我们需要确保这个储罐的温度始终高于30°C且低于60°C。

我们希望编写一个Grafana警报,每当储罐A5的温度超过30°C的阈值或60°C的上限阈值时,它就会触发。

为此,我们需要:创建一个Grafana警报规则,添加一个Flux查询,然后向警报规则中添加表达式。

创建Grafana警报规则

  1. 打开Grafana警报菜单并选择警报规则
  2. 点击新建警报规则
  3. 给你的警报规则起一个名字,然后选择Grafana托管警报。对于InfluxDB,你将始终创建一个Grafana托管规则

将初始Flux查询添加到警报规则

仍然在警报规则页面中的步骤2部分,你会看到三个框:一个查询编辑器(A),然后是两个标记为BC的部分。您将使用这三个部分来构建您的规则。让我们逐一进行。

首先,我们想查询我们假想的InfluxDB实例中的数据,以获得储罐A5的温度的时间序列图。为此,您需要从下拉菜单中选择您的InfluxDB数据源,然后编写如下查询:

```
 from(bucket: "RetroEncabulator")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "TemperatureData")
|> filter(fn: (r) => r["Tank"] == "A5")
|> filter(fn: (r) => r["_field"] == "Temperature")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> yield(name: "mean")
```

这是一个相当典型的Flux查询。让我们逐个函数地来分析它。我们从使用from()函数开始,选择我们的罐数据所在的正确桶。然后我们使用range()函数根据时间限制过滤我们的行。然后我们通过三个filter()函数来缩小我们的结果。我们选择一个特定的测量值(InfluxDB中的一个特殊关键字),然后是我们的问题罐A5,然后是一个特定的字段(InfluxDB中的另一个特殊关键字)。在这之后,我们将数据传递给aggregateWindow()函数,它将数据下采样到特定的时间段,然后最终使用yield()函数,该函数指定我们想要的最终结果:平均数

这个Flux查询将生成如下时间序列图表

grafana alerts from flux queries

向Grafana警报规则添加表达式

现在数据显示在我们的规则设置中,下一步是创建一个表达式。移动到B部分。对于这种情况,我们想创建一个Reduce表达式,将上述结果减少到一个单一值。在这张图片中,你可以看到我们选择将时间序列数据从输入ALast值减少。在这种情况下,它返回了53摄氏度的值用于A5罐。

grafana alerts from flux queries

最后,我们需要创建Grafana将对其发出警报的数学表达式。在我们的情况下,我们将编写一个包含两个由OR ||运算符分隔的条件的表达式。我们希望在部分B的结果小于30或大于60时触发警报。这看起来像$B < 30 || $B > 60

grafana alerts from flux queries

将警报条件设置为C - expression。我们现在可以预览我们的警报。以下是当状态为正常时此警报的预览

grafana alerts from flux queries

以下是当状态为警报时的此警报的预览

grafana alerts from flux queries

注意,上述Reduce表达式是必要的。如果没有它,当预览结果时,Grafana会显示警报定义B的评估结果格式无效:看起来像是时间序列数据,只能对缩减数据发出警报

💡提示:如果您的地区仍然固执地使用华氏度,我们可以在aggregateWindow语句之前添加一个map()函数将值从°C转换为°F。注意,我们不是创建一个新字段。我们只是在重映射现有值。

flux
|> map(fn: (r) => ({r with _value: r._value * 1.8 + 32.0}))

结论

使用这三个步骤,您可以创建一个基于Flux的Grafana警报,该警报将根据单个数据源的两个阈值之一触发。但是,如果您需要根据多个条件和多个时间序列触发警报怎么办?在第2个示例中,我们将涵盖这种情况。

示例2:如何从两个查询和两个条件创建Grafana警报

让我们来稍微改变一下局面,比如说例子二,离开我们的想象中的制造工厂。想象一下你是《回到未来》中的伟大的埃米特·布朗博士的助理,博士交给你的任务是:“每次当时间旅行的两个条件都满足时,向我的警报器发送警报:当一个车辆的时速达到88英里/小时,一个物体产生1.21吉瓦特的电力。”

让我们假设我们正在InfluxDB和Grafana中跟踪这些数据。也假设上述每个数据源都来自不同的器皿。我们应该如何进行警报?我们应该如何使用Grafana和Flux从两个不同的数据源来警报两种不同的条件?

向Grafana警报规则添加两个Flux查询

就像我们在例子1中所做的那样,我们先来模拟我们的查询。我们查询交通工具数据的方式与我们的上一个查询非常相似。我们使用from()range()和一系列的filter()函数。然后我们使用AggregateWindow()yield()进一步筛选我们的数据。在这种情况下,结果是追踪1983款德洛瑞安的时速的时间序列。

flux
from(bucket: "vehicles")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "VehicleData")
|> filter(fn: (r) => r["VehicleType"] == "DeLorean")
|> filter(fn: (r) => r["VehicleYear"] == "1983")
|> filter(fn: (r) => r["_field"] == "velocity")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> yield(name: "mean")

我们的第二个查询将在我们的电力资源(希尔谷钟楼上的闪电)达到所需的1.21吉瓦特时触发警报。这样的查询将与我们的车辆速度查询非常相似。

flux
from(bucket: "HillValley")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "ElectricityData")
|> filter(fn: (r) => r["Location"] == "clocktower")
|> filter(fn: (r) => r["Source"] == "lightning")
|> filter(fn: (r) => r["_field"] == "power")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> yield(name: "mean")

我们现在准备好使用表达式来修改这些数据。

向Grafana警报规则添加表达式

  1. 现在,让我们使用相同的步骤将每个查询减少到最后的(最新的)值。减少查询A到一个值可能看起来像这样

    grafana alerts from flux queries

  2. 这里我们正在减少查询B

    grafana alerts from flux queries

  3. 现在,在第C节中,我们需要创建一个要警报的数学表达式。在这种情况下,我们将使用AND&&运算符来指定必须满足两个条件:C的值(来自查询A的减少值)必须大于88.0,而D的值(来自查询B的减少值)必须大于1.21。我们将其写成$C > 88.0 && $D > 1.21

    grafana alerts from flux queries

这里是我们警报的预览

grafana alerts from flux queries

💡提示:如果你的InfluxDB中的数据小数位数过多(如上面显示的1.2104705741732575),你希望你的Grafana警报更易于阅读,请尝试使用{{ printf “%.2f” $values.D.Value }}。例如,在摘要说明中,我们可以编写以下内容

{{  $values.D.Labels.Source }} at the {{  $values.D.Labels.Location }} has generated {{ printf "%.2f"  $values.D.Value }} jigowatts.`

它将显示如下:基于Flux查询的Grafana警报)

您可以参考我们关于警报信息模板的文档,以了解更多关于这个强大功能的信息。

结论

在这个例子中,我们展示了如何创建一个基于Flux的警报,它使用两个不同查询的两项不同条件,这些查询使用来自两个不同数据源的数据。例如三将换挡并解决另一个流行的警报场景:如何根据聚合值(每日)创建警报。

示例3:如何根据聚合(每日)值创建Grafana警报

Grafana社区论坛中,最常见的要求之一是绘制每日的电力消耗和生产。这种类型的数据通常存储在InfluxDB中。在这个例子中,我们将看到如何将时间序列数据聚合为日值,然后对此进行警报。

假设我们的电表每小时向InfluxDB发送一次读数,包含该小时的千瓦时(kWh)总消耗量。我们想要编写一个查询,将每小时这些值聚合为日值,然后创建一个当耗电(kWh)超过每日5000 kWh时触发的警报。

将初始Flux查询添加到您的Grafana警报规则

  1. 让我们从检查我们的7天周期每小时数据的典型查询及其结果时间图开始。以下是一个这样的查询示例

    flux
    from(bucket: "RetroEncabulator")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "ElectricityData")
    |> filter(fn: (r) => r["Location"] == "PlantD5")
    |> filter(fn: (r) => r["_field"] == "power_consumed")
    |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
    |> yield(name: "power")

    我们可以看到这里与示例1和2中相同的Flux函数模式。这样的查询将生成以下类似的图形

    grafana alerts from flux queries

  2. 现在让我们调整我们的查询以计算每日使用情况。对于许多数据源,这可能是一个相当复杂的操作。但使用Flux,通过简单更改aggregateWindow函数参数,我们可以在相同的7天周期内计算每日使用情况

    flux
    from(bucket: "RetroEncabulator")
      |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
      |> filter(fn: (r) => r["_measurement"] == "ElectricityData")
      |> filter(fn: (r) => r["Location"] == "PlantD5")
      |> filter(fn: (r) => r["_field"] == "power_consumed")
      |> aggregateWindow(every: 1d, fn: sum)
      |> yield(name: "power")

    注意我们如何调整我们的aggregateWindow()函数到aggregateWindow(every: 1d, fn: sum)。这会产生如下图所示的图形

    grafana alerts from flux queries

  3. 将表达式添加到Grafana警报规则中。

    现在我们已经正确地执行了每日查询,我们可以继续使用以前的相同模式,添加表达式以减少我们的结果并执行数学运算。

    如前所述,让我们将查询简化为单个值

    grafana alerts from flux queries

    现在创建一个要警报的数学表达式并设置评估行为。在这种情况下,我们想要写入$B > 5000

    grafana alerts from flux queries

    现在我们正在警报我们的每日电耗,只要超过5000千瓦时就会进行警报。以下是我们警报的预览

    grafana alerts from flux queries

结论

将绘图和汇总电能是结合InfluxDB和Grafana的常见用例。使用Flux,我们看到了 grouping我们的数据按天然后对此进行警报是多么容易。在我们的下两个示例中,我们将检查Grafana警报的更复杂形式:多维警报。

示例4:使用Flux创建动态(多维)Grafana警报

让我们回到示例1中的流体储罐,但这一次假设我们有5个储罐(A5、B4、C3、D2和E1)。我们现在正在跟踪五个储罐的温度:A5、B4、C3、D2和E1。

我们想要创建一个单一的多维警报,在任一储罐的温度低于30°C或高于60°C时提醒我们。

将初始Flux查询添加到您的Grafana警报规则

我们像往常一样开始编写我们的初始查询。这非常类似于我们的示例1中的查询,但请注意我们的第三个filter()函数捕获了来自所有五个储罐的数据,而不仅仅是A5

flux
from(bucket: "HyperEncabulator")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "TemperatureData")
|> filter(fn: (r) => r["MeasType"] == "actual")
|> filter(fn: (r) => r["Tank"] == "A5" or r["Tank"] == "B4" or r["Tank"] == "C3" or r["Tank"] == "D2" or r["Tank"] == "E1")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> yield(name: "mean")

💡技巧:如果储罐每晚从23:00到07:00关闭,它们可能会低于30°C的阈值。如果不想在这些小时内收到警报,可以使用Flux函数hourSelection()按指定的小时范围内的时间值筛选行。

flux
|> hourSelection(start: 7, stop: 23)`

上面的查询将生成如下所示的时间序列图

grafana alerts from flux queries

向Grafana警报规则添加表达式

  1. 我们创建一个Reduce表达式,将每个储罐的时间序列减少为单个值。这给我们五个不同的温度值

    grafana alerts from flux queries)

  2. 创建一个要警报的数学表达式。这是与示例1中完全相同的表达式,$B < 30 || $B > 60

    grafana alerts from flux queries

如我们所见,有三个储罐在可接受的阈值内,而有两个储罐越过了上限。这将对储罐D2E1触发展警报。

结论

使用多维警报我们可以避免重复。但如果情况更复杂会怎样?在接下来的最后一个示例中,我们将检查如何使用多维警报创建最动态的警报。

示例5:如何使用多个查询和多个阈值与Flux创建动态(多维)Grafana警报

对于这个最终示例,让我们继续用我们的五个流体储罐及其五个数据集。再次假设每个储罐都有一个温度控制器,其设定点值存储在InfluxDB中。让我们混合一下,假设每个储罐都有一个不同的设定点,其中我们始终需要在其特定允许范围内。

我们想要创建一个单一的多维警报,涵盖每个储罐的每个唯一场景,当任何储罐的温度超过其特定允许范围时触发警报。

为了更好地直观展示这一挑战,以下是一个表格,展示了我们的五个罐体、它们的温度设定点和允许的范围

罐体设定点允许范围(±3)
A54542至48
B45552至58
C36057至63
D27269至75
E18077至83

使用Grafana警报,我们可以创建一个单维规则,涵盖所有五个罐体,并可以使用Flux比较每个罐体的设定点和实际值。换句话说,一个多维警报可以监控五个不同的罐体,每个罐体的设定点和实际值不同,但都有一个共同的“允许阈值”(即温差±3度)。

将初始Flux查询添加到您的Grafana警报规则

让我们从数据查询开始。它与我们的过去查询类似,但现在更加复杂。我们必须添加额外的函数来形成合适格式的数据,包括pivot()map()rename()keep()drop()

flux
from(bucket: "HyperEncabulator")
 |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
 |> filter(fn: (r) => r["_measurement"] == "TemperatureData")
 |> filter(fn: (r) => r["MeasType"] == "actual" or r["MeasType"] == "setpoint")
 |> filter(fn: (r) => r["Tank"] == "A5" or r["Tank"] == "B4" or r["Tank"] == "C3" or r["Tank"] == "D2" or r["Tank"] == "E1")
 |> filter(fn: (r) => r["_field"] == "Temperature")
 |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
 |> pivot(rowKey:["_time"], columnKey: ["MeasType"], valueColumn: "_value")
 |> map(fn: (r) => ({ r with _value: (r.setpoint - r.actual)}))
 |> rename(columns: {_value: "difference"})
 |> keep(columns: ["_time", "difference", "Tank"])
 |> drop(columns: ["actual", "setpoint"])
 |> yield(name: "mean")

注意上面我们计算了实际值与设定点之间的差异。Grafana解析从InfluxDB的结果的方式是,如果发现一个_value列,就假设它是一个时间序列。快速解决方法是添加以下rename()函数。

flux
 |> rename(columns: {_value: "something"})

上面的查询结果产生了这个时间序列。

grafana alerts from flux queries

向Grafana警报规则添加表达式

  1. 再次,我们为上述查询创建一个Reduce表达式,将上述每个结果减少到单个值。这个值表示每个罐体的设定点与其实际实时温度之间的温度差。

    grafana alerts from flux queries

  2. 现在我们创建一个数学表达式进行警报。这次我们创建一个条件,检查我们的Reduce计算的绝对值是否大于3,即abs($(B))>3.0

    grafana alerts from flux queries

现在我们可以看到,有两个罐体D2E1计算结果为真。当我们预览警报时,我们可以看到这两个罐体会触发通知并将它们的状态从正常变为警报

grafana alerts from flux queries grafana alerts from flux queries

结论

Flux查询和Grafana统一警报是识别您数据集中或整个系统中的任何可告警条件的强大组合。有关Grafana警报的更多信息,请访问此处文档。有关Flux查询语言的更多信息,请访问此文档