跳转到主要内容

构建流数据源插件

本教程将教会您如何使用 create-plugin 工具和示例源代码构建 Grafana 流数据源插件。

流数据源插件仅在您的数据源中有新数据可访问时才会刷新插件中的数据。这种方法与仪表板自动在指定间隔刷新数据的典型方法不同。流式传输的主要优势是消除了每次数据需要更新时重新运行查询的开销。

本教程展示了如何创建具有前端和后端的数据源后端插件。完成本教程后,您将构建出一个插件,该插件在后端生成随机数,作为可视化返回到前端。

示例截图和代码

以下图片显示了使用此数据源的仪表板

Grafana streaming data source.
Grafana 流数据源。

我们将一起构建的代码是 使用 WebSocket 的流数据源 示例的简化版本。请注意,我们不使用 WebSocket,但在跟随本教程时,您可以将此代码作为参考。

步骤 1:搭建插件

您可以从头开始创建所有插件代码,只要您遵循 Grafana 插件规范并实现所有必要的接口。然而,创建插件最简单的方法是使用我们提供的 @grafana/create-plugin 工具。

  1. 访问您想创建插件目录的位置,并运行
npx @grafana/create-plugin@latest
注意

有关完整的前提条件和设置开发环境的建议,请参阅开始使用

创建插件将提示您有关插件名称、类型、组织以及许多其他选项的问题。

  1. 对于本教程,请输入数据源作为插件类型,并指定它有一个后端部分。

工具将创建一个包含运行数据源后端插件所需的所有代码和依赖项的基本骨架。如果您编译代码,您将拥有一个非常简单的后端插件。然而,这个生成的代码还不是流式插件,我们需要进行一些修改。

步骤 2:设置前端']}}

由于我们想要将一些参数传递给插件的后端部分,我们首先修改/src/components/QueryEditor.tsx。该组件定义了用户可以为查询提供的输入。我们将添加三个数字输入,用于生成我们的数据

  • tickInterval - 生成数据的间隔
  • upperLimit - 随机数的上限
  • lowerLimit - 随机数的下限

为此,请按照以下步骤操作。

  1. /src/components/QueryEditor.tsx中的QueryEditor函数替换为以下代码
export function QueryEditor({ query, onChange, onRunQuery }: Props) {
const onLowerLimitChange = (event: ChangeEvent<HTMLInputElement>) => {
onChange({ ...query, lowerLimit: event.target.valueAsNumber });
};

const onUpperLimitChange = (event: ChangeEvent<HTMLInputElement>) => {
onChange({ ...query, upperLimit: event.target.valueAsNumber });
};

const onTickIntervalChange = (event: ChangeEvent<HTMLInputElement>) => {
onChange({ ...query, tickInterval: event.target.valueAsNumber });
};

const { upperLimit, lowerLimit, tickInterval } = query;

return (
<>
<InlineField label="Lower Limit" labelWidth={16} tooltip="Random numbers lower limit">
<Input onChange={onLowerLimitChange} onBlur={onRunQuery} value={lowerLimit || ''} type="number" />
</InlineField>
<InlineField label="Upper Limit" labelWidth={16} tooltip="Random numbers upper limit">
<Input onChange={onUpperLimitChange} onBlur={onRunQuery} value={upperLimit || ''} type="number" />
</InlineField>
<InlineField label="Tick interval" labelWidth={16} tooltip="Server tick interval">
<Input onChange={onTickIntervalChange} onBlur={onRunQuery} value={tickInterval || ''} type="number" />
</InlineField>
<>
);
}
  1. src/datasource.ts文件中,通过流通道指定一个查询。该文件定义了插件前端部分执行的查询。向DataSource类添加以下方法
  query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
const observables = request.targets.map((query, index) => {

return getGrafanaLiveSrv().getDataStream({
addr: {
scope: LiveChannelScope.DataSource,
namespace: this.uid,
path: `my-ws/custom-${query.lowerLimit}-${query.upperLimit}-${query.tickInterval}`, // this will allow each new query to create a new connection
data: {
...query,
},
},
});
});

return merge(...observables);
}

getGrafanaLiveSrv()的调用返回一个Grafana后端的引用,并且getDataStream的调用创建了一个流。如代码片段所示,我们需要提供一些数据来创建流

  • scope - 定义通道的使用和控制方式(指定数据源
  • namespace - 我们特定数据源的唯一标识符
  • path - 可区分由同一数据源创建的通道的部分。在本例中,我们希望为每个查询创建不同的通道,因此我们将查询参数作为路径的一部分。
  • data - 用于在前端和后端之间交换信息

值得一提的是,path可以用于与后端部分交换数据,但是对于复杂的格式,请使用data属性。

注意

流由参数组合唯一标识,其格式为:${scope}/${namespace}/${path}。这仅是实现细节,您可能不需要了解这一点,除非用于调试。

步骤 3:设置插件后端']}}

现在我们需要向后端添加必要的代码。要这样做,我们更改pkg/plugin/datasource.go,这是定义处理前端创建的查询的后端部分的地方。在我们的情况下,由于我们想要处理流,我们需要实现backend.StreamHandler

  1. 将以下部分添加到替换的var
var (
_ backend.CheckHealthHandler = (*Datasource)(nil)
_ instancemgmt.InstanceDisposer = (*Datasource)(nil)
_ backend.StreamHandler = (*Datasource)(nil)
)

这意味着我们的DataSource将实现backend.CheckHealthHandlerinstancemgmt.InstanceDisposerbackend.StreamHandler。前两个所需的方法已经在支架中提供,因此我们只需要实现必要的backend.StreamHandler方法。

  1. 让我们先添加SubscribeStream方法
func (d *Datasource) SubscribeStream(context.Context, *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusOK,
}, nil
}

以下代码将在用户尝试订阅频道时被调用。在这里可以实现权限检查,但在这个例子中我们只想用户在每次尝试中都成功连接;因此,我们只返回后端:SubscribeStreamStatusOK

  1. 实现PublishStream方法
func (d *Datasource) PublishStream(context.Context, *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
return &backend.PublishStreamResponse{
Status: backend.PublishStreamStatusPermissionDenied,
}, nil
}

这将在用户尝试向频道发布时被调用。由于我们不希望在第一个连接创建后接收来自前端的任何数据,因此我们只需要返回backend.PublishStreamStatusPermissionDenied

  1. 为了避免随机错误,将默认提供的CheckHealth方法更改为以下代码
  func (d *Datasource) CheckHealth(_ context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
return &backend.CheckHealthResult{
Status: backend.HealthStatusOk,
Message: "Data source is working",
}, nil
}

请注意,此代码与流插件本身无关。它只是为了防止不更改默认设置而导致的随机错误。

  1. 最后,实现RunStream方法
func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
q := Query{}
json.Unmarshal(req.Data, &q)

s := rand.NewSource(time.Now().UnixNano())
r := rand.New(s)

ticker := time.NewTicker(time.Duration(q.TickInterval) * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
// we generate a random value using the intervals provided by the frontend
randomValue := r.Float64()*(q.UpperLimit-q.LowerLimit) + q.LowerLimit

err := sender.SendFrame(
data.NewFrame(
"response",
data.NewField("time", nil, []time.Time{time.Now()}),
data.NewField("value", nil, []float64{randomValue})),
data.IncludeAll,
)

if err != nil {
Logger.Error("Failed send frame", "error", err)
}
}
}
}

一旦实现,此方法将针对每个连接调用一次。在这里,我们可以解析前端发送的数据,以便Grafana可以持续发送数据。在这个特定的例子中,我们将使用前端发送的数据来定义发送框架的频率和随机数的生成范围。

因此,方法的第一部分解析数据,查询被定义为struct

  1. 将以下代码添加到./pkg/plugin/query.go
  type Query struct {
UpperLimit float64 `json:"upperLimit"`
LowerLimit float64 `json:"lowerLimit"`
TickInterval float64 `json:"tickInterval"`
}

在这个例子中,我们基于计时器创建了一个计时器和无限循环。当计时器超时时,我们将进入select的第二种情况,并生成一个随机数randomValue。之后,我们将使用data.NewFrame创建一个数据帧,其中包含randomValue和当前时间,并使用sender.SendFrame发送。这个循环将无限期运行,直到我们关闭通道。

提示

您可以移除由支架创建的QueryData方法以及与其相关的所有代码,因为我们不再实现QueryDataHandler

第4步:修改plugin.json

Grafana需要在第一次加载时识别出新插件是一个流插件。要实现这一点,只需将"streaming":true属性添加到您的src/plugin.json中。它应该看起来像这样

{
...
"id": "grafana-streamingbackendexample-datasource",
"metrics": true,
"backend": true,
...
"streaming": true
}

第5步:构建插件

插件开发过程的编码部分已完成。现在我们需要生成插件包,以便能够在Grafana中运行它。由于我们的插件既有前端部分也有后端部分,所以我们分两步来做。

  1. 构建前端
npm install
npm run build

这将下载所有依赖项,并在./dist目录中创建前端插件文件。

  1. 编译后端代码并生成插件二进制文件。为此,您应该已经安装了mage,然后只需运行
mage -v

此命令编译Go代码并为所有Grafana支持的平台生成二进制文件,存放在./dist目录。

步骤6:运行插件

插件构建完成后,您可以复制./dist目录及其内容,将其重命名为你的插件id,并将其放在你的Grafana实例的插件路径中,随后进行测试。

然而,有一个更简单的方法来完成这一切。

  1. 只需运行
npm run server

该命令通过docker compose运行Grafana容器,并将构建好的插件放在正确的位置。

  1. 为了验证构建结果,请访问https://localhost:3000上的Grafana。

步骤7:测试插件

  1. https://localhost:3000的Grafana,使用默认凭据:用户名admin和密码admin。如果你没有被引导到登录页面,请点击页面顶部的登录并输入凭据。

  2. 添加数据源。由于我们运行在docker compose环境中,我们不需要安装它,它将直接可用。请前往连接 > 数据源,使用以下图像所示的左侧菜单

  3. 新页面打开后,点击添加数据源。Grafana将打开另一个页面,在那里您可以查找我们刚刚创建的数据源名称。

  4. 点击数据源卡片,然后在下一页上点击保存 & 测试

  5. 点击右上角的构建仪表板。在下一页上,点击按钮以+ 添加可视化。在对话框中,点击新添加的数据源。数据开始如下显示

  6. 为了更好地可视化数据,请将可视化时间范围更改为从现在开始的1分钟,并应用。

此时,你应该开始看到实时数据。如果您想改变上限、下限或刻度间隔,您可以这样做。这将生成新的查询,并更新面板。如果您想,还可以添加更多查询。

Grafana streaming data source.
Grafana 流数据源。

您已成功创建一个具有前端和后端的Grafana流数据源插件。该插件已准备好供您自定义以满足特定需求。