菜单
开源

otelcol.exporter.kafka

otelcol.exporter.kafka 从其他 otelcol 组件接收日志、指标和跟踪遥测数据并将其发送到 Kafka。

使用 otelcol.exporter.kafka 时,与 otelcol.processor.batch 一起使用很重要,以确保 otelcol.exporter.kafka 不会因为向 Kafka 发送大量小有效负载而变慢。

注意:

当需要时,otelcol.exporter.kafka 是上游 OpenTelemetryCollector kafka 导出器从 otelcol-contrib 分发的一个包装器。如果有必要,错误报告或特性请求将被重定向到上游存储库。

可以通过给它们不同的标签来指定多个 otelcol.exporter.kafka 组件。

用法

alloy
otelcol.exporter.kafka "LABEL" {
  protocol_version = "PROTOCOL_VERSION"
}

参数

支持以下参数:

名称类型描述默认值必须
protocol_version字符串要使用的 Kafka 协议版本。
brokers列表(字符串)要连接到的 Kafka 代理。["localhost:9092"]
topic字符串要发送到的 Kafka 主题。见下文
topic_from_attribute字符串用作消息主题的值应使用的资源属性。""
encoding字符串从 Kafka 读取的有效负载的编码。"otlp_proto"
client_id字符串使用消费者客户端 ID。该 ID 将用于所有生产请求。"sarama"
timeout持续时间每次尝试将数据发送到后端的超时时间。"5s"
resolve_canonical_bootstrap_servers_only布尔值在启动时是否解析然后再反向查找代理 IP。"false"
partition_traces_by_id布尔值是否在发送到 Kafka 的跟踪消息中将跟踪 ID 包含为消息键。"false"
partition_metrics_by_resource_attributes布尔值是否在发送到 Kafka 的指标消息中将排序后的资源属性的哈希包含为消息分区键。"false"

如果未设置 topic,则对不同的遥测信号将使用不同的主题

  • 指标将被发送到 otlp_metrics 主题。
  • 跟踪将被发送到 otlp_spans 主题。
  • 日志将被发送到 otlp_logs 主题。

如果设置了主题,则将为所有遥测信号 - 指标、日志和跟踪使用相同的话题。

如果设置了 topic_from_attribute,则它将覆盖 topic

encoding 参数指定如何编码发送到 Kafka 的消息。《encoding》必须是以下字符串之一

  • 适用于跟踪、日志和指标的编码
    • "otlp_proto":将消息编码为 OTLP protobuf。
    • "otlp_json":将消息编码为 OTLP JSON。
  • 仅适用于跟踪的编码
    • "jaeger_proto":将有效负载序列化到单个 Jaeger proto Span,并按 TraceID 键入。
    • "jaeger_json":使用 jsonpb 将有效负载序列化到单个 Jaeger JSON Span,并按 TraceID 键入。
    • "zipkin_proto":将有效负载序列化到 Zipkin v2 proto Span。
    • "zipkin_json":将有效负载序列化到 Zipkin v2 JSON Span。
  • 仅适用于日志的编码
    • "raw":如果日志记录正文是字节数组,则原样发送。否则,将其序列化到 JSON。忽略资源和记录属性。

"partition_traces_by_id"对 Jaeger 编码导出器没有影响,因为 Jaeger 导出器默认将跟踪 ID 包含在消息键中。

以下块在otelcol.exporter.kafka的定义内部得到支持

层次结构描述必须
认证认证配置连接到 Kafka 代理的认证。
认证 > 明文明文使用明文对 Kafka 代理进行认证。
认证 > saslsasl使用 SASL 对 Kafka 代理进行认证。
认证 > sasl > aws_mskaws_msk在 AWS_MSK_IAM 的情况下使用额外的 SASL 参数。
认证 > tlstls配置连接到 Kafka 代理的 TLS。
认证 > kerberoskerberos使用 Kerberos 对 Kafka 代理进行认证。
元数据元数据配置如何从 Kafka 代理检索元数据。
元数据 > 重试重试配置如何配置元数据检索的重试机制。
重试_on_failure重试_on_failure配置失败请求数据的重试机制。
队列队列配置发送前数据的批处理。
生产者生产者Kafka 生产者配置,
调试度量调试度量配置生成的 metrics,以便监视组件的状态。

>符号表示更深层的嵌套。例如,authentication > tls 指定了在 authentication 块内定义的 tls 块。

认证块

认证 块包含连接到 Kafka 代理时要使用的不同认证机制的定义。它不支持任何参数,完全通过内部块进行配置。

明文块

明文 块配置对 Kafka 代理的纯文本认证。

支持以下参数:

名称类型描述默认值必须
用户名字符串用于纯文本认证的用户名。
密码secret用于纯文本认证的密码。

机制块

机制 块配置对 Kafka 代理的 SASL 认证。

支持以下参数:

名称类型描述默认值必须
用户名字符串用于 SASL 认证的用户名。
密码secret用于 SASL 认证的密码。
机制字符串在认证时使用的 SASL 机制。
版本号码在认证时使用的 SASL 协议的版本。0

可以设置 机制 参数为以下字符串之一

  • "PLAIN"
  • "AWS_MSK_IAM"
  • "SCRAM-SHA-256"
  • "SCRAM-SHA-512"

机制 设置为 "AWS_MSK_IAM" 时,必须也提供 aws_msk 子块。

可以设置 版本 参数为 01

aws_msk 块

aws_msk 块配置在 AWS_MSK_IAM 机制下 SASL 认证时要使用的额外参数。

支持以下参数:

名称类型描述默认值必须
区域字符串基于 MSK 群集的 AWS 区域。
broker_addr字符串用于认证的 MSK 地址。

tl 块

tl 块配置用于连接到 Kafka 代理的 TLS 设置。如果没有提供 tls 块,则不会使用 TLS 进行通信。

支持以下参数:

名称类型描述默认值必须
ca_file字符串CA 文件的路径。
ca_pem字符串CA PEM 编码的文本,用于验证服务器。
cert_file字符串TLS 证书的路径。
cert_pem字符串用于客户端认证的证书 PEM 编码的文本。
insecure_skip_verify布尔值忽略不安全的服务器 TLS 证书。
include_system_ca_certs_pool布尔值是否与证书颁发机构一起加载系统证书颁发机构池。false
不安全布尔值连接到配置的服务器时禁用TLS。
密钥文件字符串TLS证书密钥的路径。
密钥PEMsecret用于客户端认证的密钥PEM编码文本。
最大版本号字符串接受连接的最大TLS版本。"TLS 1.3"
最小版本号字符串接受连接的最小TLS版本。"TLS 1.2"
加密套件列表(字符串)TLS传输可以使用的一组TLS加密套件。[]
重新加载间隔持续时间证书重新加载的持续时间。"0s"
服务器名称字符串当设置时,验证服务器证书的主机名。

如果服务器不支持TLS,必须将insecure参数设置为true

要禁用连接到服务器的tls,请将insecure参数设置为true

如果重新加载间隔设置为"0s",则证书永远不会重新加载。

以下参数对相互排斥,不能同时设置。

  • ca_pemca_file
  • cert_pemcert_file
  • key_pemkey_file

如果留空cipher_suites,则使用一组安全默认值。有关支持的加密套件列表,请参阅Go TLS文档

kerberos块

kerberos块配置对Kafka代理进行Kerberos身份验证。

支持以下参数:

名称类型描述默认值必须
服务名称字符串Kerberos服务名称。
领域字符串Kerberos领域。
使用keytab字符串启用使用keytab而不是密码。
用户名字符串用作认证的Kerberos用户名。
密码secret用于认证的Kerberos密码。
配置文件字符串指向Kerberos位置(例如,/etc/krb5.conf)的路径。
密钥表文件字符串指向密钥表文件的路径(例如,/etc/security/kafka.keytab)。
禁用快速协商布尔值禁用PA-FX-FAST协商。false

use_keytab设置为false时,需要password参数。当use_keytab设置为true时,使用由keytab_file参数指定的文件进行认证。最多只能提供一个passwordkeytab_file参数。

disable_fast_negotiation对于不支持PA-FX-FAST(预认证框架 - 快速)协商的Kerberos实现很有用。

元数据块

metadata块配置如何从Kafka代理检索和存储元数据。

支持以下参数:

名称类型描述默认值必须
包含所有主题布尔值当为true时,维护所有主题的元数据。true

如果include_all_topics参数设置为true,则维护所有主题的完整元数据集而不是迄今为止所需的最低元数据集。包含完整的元数据集对用户来说更方便,但如果您有许多主题和分区,可能会消耗大量内存。

如果Kafka代理在与Alloy组件同时启动时检索元数据可能会失败。可以提供retry子块以自定义重试行为。

重试块

retry块配置在检索失败时如何重试检索元数据。

支持以下参数:

名称类型描述默认值必须
最大重试次数号码重试检索元数据多少次。3
退避时间持续时间重试之间的等待时间。"250ms"

重试失败块

retry_on_failure块配置如何重试失败的Kafka请求。

支持以下参数:

名称类型描述默认值必须
启用布尔值启用重试失败的请求。true
初始间隔持续时间在重试失败的请求之前等待的初始时间。"5s"
最大经过时间持续时间在丢弃失败的批次之前等待的最大时间。"5m"
最大间隔持续时间重试之间的最大时间。"30s"
乘数号码扩展重试之前等待时间的基本因子。1.5
随机化系数号码用于随机化重试前等待时间的因子。0.5

enabledtrue 时,失败的批次会在指定的时间间隔后重试。《initial_interval》参数指定了第一次重试尝试前等待的时间。如果请求持续失败,重试之间的等待时间会通过《multiplier》参数指定的系数增加,该系数必须大于 1.0。《max_interval》参数指定了重试之间最长等待时间。

randomization_factor 参数在为重试 Alloy 实例之间添加抖动时很有用。如果 randomization_factor 大于 0,则重试前的等待时间乘以一个位于 [ I - randomization_factor * I, I + randomization_factor * I] 范围内的随机因子,其中 I 是当前间隔。

如果批量数据在指定的时间 max_elapsed_time 内未被成功发送,则会被丢弃。如果 max_elapsed_time 设置为 "0s",则失败请求将无限期重试直到成功。

队列阻塞

《queue》阻塞配置了在将数据发送到 gRPC 服务器之前,数据要经过的内存缓冲区。

支持以下参数:

名称类型描述默认值必须
启用布尔值启用在向客户端发送数据之前的内存缓冲。true
num_consumers号码并行发送写入队列的批次的读取器数量。10
queue_size号码队列中在同一时间内允许的最大未写入批次数量。1000

enabledtrue 时,首先将数据写入内存缓冲区,然后再将其发送到配置的服务器。只要未发送批次的数量不超过配置的《queue_size》,发送到组件的 input 导出字段的批次就会添加到缓冲区。

queue_size 决定了可容忍的端点中断时间。假设每秒钟 100 请求,默认队列大小 1000 提供了大约 10 秒的中断容忍时间。要计算 queue_size 的正确值,请将每秒平均出去请求数量乘以中断容忍的时间(秒)。过大的值可能导致内存不足(OOM)杀死。

《num_consumers》参数控制了有多少个读取器从缓冲区中读取并发送数据。较大的 《num_consumers》 值可以使数据更快地发送,但以增加网络流量为代价。

生产者阻塞

《producer》阻塞配置了在获取元数据失败时如何重试获取。

支持以下参数:

名称类型描述默认值必须
max_message_bytes号码消息允许的最大字节数。1000000
required_acks号码控制何时认为消息已传输。1
compression字符串重试之间的等待时间。"none"
flush_max_messages号码重试之间的等待时间。0

有关 required_acks 的更多信息,请参阅 sarama 文档

《compression》可以设置为 nonegzipsnappylz4zstd。有关更多信息,请参阅 Sarama 文档

debug_metrics 阻塞

《debug_metrics》阻塞配置了此组件生成以监控其状态的指标。

支持以下参数:

名称类型描述默认值必须
disable_high_cardinality_metrics布尔值是否禁用某些高基数指标。true
level字符串控制包装收集器发出的指标的详细程度。"detailed"

disable_high_cardinality_metrics 是 Grafana Alloy 对 OpenTelemetry Collector 中的 telemetry.disableHighCardinalityMetrics 功能门的等效。它删除可能导致高基数指标的特性。例如,从 HTTP 和 gRPC 连接的指标中删除具有 IP 地址和端口的属性。

注意:

如果配置了 disable_high_cardinality_metrics,则它仅适用于 otelcol.exporter.*otelcol.receiver.* 组件。

level 是 Alloy 对 OpenTelemetry Collector 中的 telemetry.metrics.level 功能门的等效。可能的值是 "none""basic""normal""detailed"

导出字段

以下字段被导出,并可由其他组件引用

名称类型描述
输入otelcol.Consumer其他组件可以使用该值向其发送遥测数据。

input接受来自任何遥测信号(指标、日志或跟踪)的otelcol.Consumer数据。

组件健康状态

只有当给定无效配置时,otelcol.exporter.kafka才会报告为不健康。

调试信息

otelcol.exporter.kafka不公开任何特定组件的调试信息。

示例

此示例通过批处理程序将遥测数据转发了,最终将其发送到Kafka。

alloy
otelcol.receiver.otlp "default" {
  http {}
  grpc {}

  output {
    metrics = [otelcol.processor.batch.default.input]
    logs    = [otelcol.processor.batch.default.input]
    traces  = [otelcol.processor.batch.default.input]
  }
}

otelcol.processor.batch "default" {
  output {
    metrics = [otelcol.exporter.kafka.default.input]
    logs    = [otelcol.exporter.kafka.default.input]
    traces  = [otelcol.exporter.kafka.default.input]
  }
}

otelcol.exporter.kafka "default" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"
}

兼容的组件

otelcol.exporter.kafka的可导出内容可以由以下组件消费

  • 消费OpenTelemetry otelcol.Consumer的组件

注意:

连接某些组件可能不合理,或者组件可能需要进一步配置才能正确连接。有关更多详细信息,请参阅相关文档。