跳转到主要内容

构建流数据源插件

本教程将指导您如何使用 create-plugin 工具和示例源代码构建 Grafana 流数据源插件。

流数据源插件仅在数据源中有新数据时才刷新插件中的数据。这种方法与仪表板设置为在指定间隔自动刷新的典型方法不同。流的主要优势是消除了每次数据需要更新时重新运行查询的开销。

本教程展示了如何创建具有前端和后端的数据源后端插件。完成教程后,您将构建一个插件,该插件在后台生成随机数并将其作为可视化返回到前端。

示例截图和代码

以下图像显示了使用此数据源的仪表板

Grafana streaming data source.
Grafana 流数据源。

我们将一起构建的代码是 具有 WebSockets 的流数据源 示例的简化版本。请注意,我们不会使用 WebSockets,但在跟随本教程时可以参考此代码。

步骤 1:搭建插件

虽然可以从头开始创建所有插件代码,只要您遵循 Grafana 插件规范并实现所有必要的接口。但是,创建插件的最简单方法是使用我们提供的 @grafana/create-plugin 工具。

  1. 转到您想要创建插件目录的位置,并运行
npx @grafana/create-plugin@latest
注意

关于设置开发环境的完整要求和建议,请参考开始使用

创建插件时会提示您一些关于插件名称、类型、组织以及其他选项的问题。

  1. 对于本教程,将插件的类型设置为数据源,并指定它具有后端部分。

此工具将创建一个包含所有必需代码和依赖项的框架,以运行数据源后端插件。如果您编译代码,您将拥有一个非常简单的后端插件。然而,此生成的代码还不是流插件,我们需要进行一些修改。

第二步:设置前端

由于我们想将一些参数传递给插件的后端部分,我们首先修改/src/components/QueryEditor.tsx。此组件定义了用户可以向查询提供的输入。我们将添加三个数字输入,用于生成我们的数据

  • tickInterval - 生成数据的间隔
  • upperLimit - 随机数的上限
  • lowerLimit - 随机数的下限

要完成此操作,请按照以下步骤操作。

  1. /src/components/QueryEditor.tsx中的QueryEditor函数替换为以下代码
export function QueryEditor({ query, onChange, onRunQuery }: Props) {
const onLowerLimitChange = (event: ChangeEvent<HTMLInputElement>) => {
onChange({ ...query, lowerLimit: event.target.valueAsNumber });
};

const onUpperLimitChange = (event: ChangeEvent<HTMLInputElement>) => {
onChange({ ...query, upperLimit: event.target.valueAsNumber });
};

const onTickIntervalChange = (event: ChangeEvent<HTMLInputElement>) => {
onChange({ ...query, tickInterval: event.target.valueAsNumber });
};

const { upperLimit, lowerLimit, tickInterval } = query;

return (
<>
<InlineField label="Lower Limit" labelWidth={16} tooltip="Random numbers lower limit">
<Input onChange={onLowerLimitChange} onBlur={onRunQuery} value={lowerLimit || ''} type="number" />
</InlineField>
<InlineField label="Upper Limit" labelWidth={16} tooltip="Random numbers upper limit">
<Input onChange={onUpperLimitChange} onBlur={onRunQuery} value={upperLimit || ''} type="number" />
</InlineField>
<InlineField label="Tick interval" labelWidth={16} tooltip="Server tick interval">
<Input onChange={onTickIntervalChange} onBlur={onRunQuery} value={tickInterval || ''} type="number" />
</InlineField>
<>
);
}
  1. src/datasource.ts文件中,通过流通道指示一个查询。此文件是执行插件前端部分的查询的地方。向DataSource类添加以下方法
  query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
const observables = request.targets.map((query, index) => {

return getGrafanaLiveSrv().getDataStream({
addr: {
scope: LiveChannelScope.DataSource,
namespace: this.uid,
path: `my-ws/custom-${query.lowerLimit}-${query.upperLimit}-${query.tickInterval}`, // this will allow each new query to create a new connection
data: {
...query,
},
},
});
});

return merge(...observables);
}

getGrafanaLiveSrv()的调用返回Grafana后端的引用,而getDataStream的调用创建流。如代码片段所示,我们需要提供一些数据以创建流

  • scope - 定义如何使用和控制通道(指定数据源
  • namespace - 我们特定数据源的标识符
  • path - 可以区分由相同数据源创建的通道的部分。在此示例中,我们希望为每个查询创建不同的通道,因此我们将使用查询参数作为路径的一部分。
  • data - 用于在前端和后端之间交换信息

值得注意的是,path可以用于与后端部分交换数据,但对于复杂格式,请使用data属性。

注意

流由参数的组合唯一标识,其格式为:${scope}/${namespace}/${path}。这仅是实现细节,您可能不需要知道这一点,除非用于调试。

第三步:设置插件后端

现在我们需要将必要的代码添加到后端。为此,我们修改 pkg/plugin/datasource.go,这是后端部分定义了处理前端创建的查询的地方。在我们的情况下,因为我们想处理流,所以需要实现 backend.StreamHandler

  1. var 的替换中添加以下部分
var (
_ backend.CheckHealthHandler = (*Datasource)(nil)
_ instancemgmt.InstanceDisposer = (*Datasource)(nil)
_ backend.StreamHandler = (*Datasource)(nil)
)

这意味着我们的 DataSource 将实现 backend.CheckHealthHandlerinstancemgmt.InstanceDisposerbackend.StreamHandler。前两个所需的方法已经在框架中提供;因此,我们只需要实现 backend.StreamHandler 所需的方法。

  1. 让我们先添加 SubscribeStream 方法
func (d *Datasource) SubscribeStream(context.Context, *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusOK,
}, nil
}

当用户尝试订阅频道时,将调用此代码。您可以在此处实现权限检查,但在这个例子中,我们只想让用户在每次尝试中都能成功连接;因此,我们只返回后端的 SubscribeStreamStatusOK

  1. 实现 PublishStream 方法
func (d *Datasource) PublishStream(context.Context, *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
return &backend.PublishStreamResponse{
Status: backend.PublishStreamStatusPermissionDenied,
}, nil
}

当用户尝试向频道发布数据时,将调用此代码。由于我们不想在建立第一个连接后从前端接收任何数据,因此我们将只返回 backend.PublishStreamStatusPermissionDenied

  1. 为了防止随机错误,将默认提供的 CheckHealth 方法更改为以下代码
  func (d *Datasource) CheckHealth(_ context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
return &backend.CheckHealthResult{
Status: backend.HealthStatusOk,
Message: "Data source is working",
}, nil
}

请注意,此代码与流插件本身无关。这只是为了避免不更改默认设置而导致的随机错误。

  1. 最后,实现 RunStream 方法
func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
q := Query{}
json.Unmarshal(req.Data, &q)

s := rand.NewSource(time.Now().UnixNano())
r := rand.New(s)

ticker := time.NewTicker(time.Duration(q.TickInterval) * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
// we generate a random value using the intervals provided by the frontend
randomValue := r.Float64()*(q.UpperLimit-q.LowerLimit) + q.LowerLimit

err := sender.SendFrame(
data.NewFrame(
"response",
data.NewField("time", nil, []time.Time{time.Now()}),
data.NewField("value", nil, []float64{randomValue})),
data.IncludeAll,
)

if err != nil {
Logger.Error("Failed send frame", "error", err)
}
}
}
}

一旦实现,此方法将在每个连接上调用一次,并且我们可以解析前端发送的数据,以便 Grafana 可以连续发送数据。在这个特定的例子中,我们将使用前端发送的数据来定义发送帧的频率和随机数生成的范围。

因此,方法的第一个部分解析数据,查询被定义为 struct

  1. 将以下代码添加到 ./pkg/plugin/query.go
  type Query struct {
UpperLimit float64 `json:"upperLimit"`
LowerLimit float64 `json:"lowerLimit"`
TickInterval float64 `json:"tickInterval"`
}

在这个例子中,我们创建了一个基于计时器的计时器和无限循环。每当计时器超时时,我们将进入 select 的第二个情况并生成一个随机数 randomValue。然后我们将使用 data.NewFrame 创建一个数据帧,包含 randomValue 和当前时间,并使用 sender.SendFrame 发送它。此循环将无限期运行,直到我们关闭通道。

提示

您可以删除由框架创建的 QueryData 方法以及与之相关的所有代码,因为我们不再实现 QueryDataHandler

第4步:修改 plugin.json

Grafana 需要在第一次加载时识别新插件是一个流插件。为此,只需将 "streaming":true 属性添加到您的 src/plugin.json 中。它应该看起来像这样

{
...
"id": "grafana-streamingbackendexample-datasource",
"metrics": true,
"backend": true,
...
"streaming": true
}

步骤 5:构建插件

插件开发过程中的编码部分已完成。现在我们需要生成我们的插件包,以便能够在 Grafana 中运行。由于我们的插件既有前端也有后端部分,我们分两步进行。

  1. 构建前端
npm install
npm run build

这应该会下载所有依赖项,并在 ./dist 目录中创建前端插件文件。

  1. 编译后端代码并生成插件二进制文件。为此,您应该已安装 mage,然后只需运行
mage -v

此命令编译 Go 代码,并在 ./dist 中生成所有 Grafana 支持平台的二进制文件。

步骤 6:运行插件

一旦插件构建完成,您可以复制 ./dist 及其内容,将其重命名为您插件的 id,将其放入 Grafana 实例的插件路径中,并进行测试。

但是,有一种更简单的方法来做这些。

  1. 只需运行
npm run server

此命令使用 docker compose 运行 Grafana 容器,并将构建的插件放在正确的位置。

  1. 要验证构建,请转到 https://127.0.0.1:3000 上的 Grafana。

步骤 7:测试插件

  1. https://127.0.0.1:3000 上的 Grafana,使用默认凭据:用户名 admin 和密码 admin。如果您没有出现登录页面,请在页面顶部点击 登录 并输入凭据。

  2. 添加数据源。由于我们在 docker compose 环境中运行,我们不需要安装它,它将直接可用。转到 连接 > 数据源,使用以下图像所示的左侧菜单

  3. 打开一个新页面,然后点击 添加数据源。Grafana 打开另一个页面,您可以在其中搜索我们刚刚创建的数据源名称。

  4. 点击数据源的卡片,然后点击下一页上的 保存 & 测试

  5. 在右上角点击 构建仪表板。在下一页上,点击一个按钮以 + 添加可视化。在对话框中,点击新添加的数据源。数据开始如下显示

  6. 为了更好地可视化数据,将可视化时间范围更改为例如从现在开始的 1 分钟,并应用它。

到那时,您应该开始看到实时数据。如果您想,可以更改上限和下限或刻度间隔。这将生成一个新的查询,面板将更新。您也可以添加更多查询。

Grafana streaming data source.
Grafana 流数据源。

您已成功创建了一个既有前端也有后端的前端 Grafana 流数据源插件。插件已准备好供您根据特定需求进行定制。