otelcol.exporter.kafka
otelcol.exporter.kafka
从其他 otelcol
组件接收日志、指标和跟踪遥测数据并将其发送到 Kafka。
使用 otelcol.exporter.kafka
时,与 otelcol.processor.batch
一起使用很重要,以确保 otelcol.exporter.kafka
不会因为向 Kafka 发送大量小有效负载而变慢。
注意:
当需要时,otelcol.exporter.kafka
是上游 OpenTelemetryCollectorkafka
导出器从otelcol-contrib
分发的一个包装器。如果有必要,错误报告或特性请求将被重定向到上游存储库。
可以通过给它们不同的标签来指定多个 otelcol.exporter.kafka
组件。
用法
otelcol.exporter.kafka "LABEL" {
protocol_version = "PROTOCOL_VERSION"
}
参数
支持以下参数:
名称 | 类型 | 描述 | 默认值 | 必须 |
---|---|---|---|---|
protocol_version | 字符串 | 要使用的 Kafka 协议版本。 | 是 | |
brokers | 列表(字符串) | 要连接到的 Kafka 代理。 | ["localhost:9092"] | 否 |
topic | 字符串 | 要发送到的 Kafka 主题。 | 见下文 | 否 |
topic_from_attribute | 字符串 | 用作消息主题的值应使用的资源属性。 | "" | 否 |
encoding | 字符串 | 从 Kafka 读取的有效负载的编码。 | "otlp_proto" | 否 |
client_id | 字符串 | 使用消费者客户端 ID。该 ID 将用于所有生产请求。 | "sarama" | 否 |
timeout | 持续时间 | 每次尝试将数据发送到后端的超时时间。 | "5s" | 否 |
resolve_canonical_bootstrap_servers_only | 布尔值 | 在启动时是否解析然后再反向查找代理 IP。 | "false" | 否 |
partition_traces_by_id | 布尔值 | 是否在发送到 Kafka 的跟踪消息中将跟踪 ID 包含为消息键。 | "false" | 否 |
partition_metrics_by_resource_attributes | 布尔值 | 是否在发送到 Kafka 的指标消息中将排序后的资源属性的哈希包含为消息分区键。 | "false" | 否 |
如果未设置 topic
,则对不同的遥测信号将使用不同的主题
- 指标将被发送到
otlp_metrics
主题。 - 跟踪将被发送到
otlp_spans
主题。 - 日志将被发送到
otlp_logs
主题。
如果设置了主题,则将为所有遥测信号 - 指标、日志和跟踪使用相同的话题。
如果设置了 topic_from_attribute
,则它将覆盖 topic
。
encoding
参数指定如何编码发送到 Kafka 的消息。《encoding》必须是以下字符串之一
- 适用于跟踪、日志和指标的编码
"otlp_proto"
:将消息编码为 OTLP protobuf。"otlp_json"
:将消息编码为 OTLP JSON。
- 仅适用于跟踪的编码
"jaeger_proto"
:将有效负载序列化到单个 Jaeger protoSpan
,并按 TraceID 键入。"jaeger_json"
:使用jsonpb
将有效负载序列化到单个 Jaeger JSON Span,并按 TraceID 键入。"zipkin_proto"
:将有效负载序列化到 Zipkin v2 proto Span。"zipkin_json"
:将有效负载序列化到 Zipkin v2 JSON Span。
- 仅适用于日志的编码
"raw"
:如果日志记录正文是字节数组,则原样发送。否则,将其序列化到 JSON。忽略资源和记录属性。
"partition_traces_by_id"
对 Jaeger 编码导出器没有影响,因为 Jaeger 导出器默认将跟踪 ID 包含在消息键中。
块
以下块在otelcol.exporter.kafka
的定义内部得到支持
层次结构 | 块 | 描述 | 必须 |
---|---|---|---|
认证 | 认证 | 配置连接到 Kafka 代理的认证。 | 否 |
认证 > 明文 | 明文 | 使用明文对 Kafka 代理进行认证。 | 否 |
认证 > sasl | sasl | 使用 SASL 对 Kafka 代理进行认证。 | 否 |
认证 > sasl > aws_msk | aws_msk | 在 AWS_MSK_IAM 的情况下使用额外的 SASL 参数。 | 否 |
认证 > tls | tls | 配置连接到 Kafka 代理的 TLS。 | 否 |
认证 > kerberos | kerberos | 使用 Kerberos 对 Kafka 代理进行认证。 | 否 |
元数据 | 元数据 | 配置如何从 Kafka 代理检索元数据。 | 否 |
元数据 > 重试 | 重试 | 配置如何配置元数据检索的重试机制。 | 否 |
重试_on_failure | 重试_on_failure | 配置失败请求数据的重试机制。 | 否 |
队列 | 队列 | 配置发送前数据的批处理。 | 否 |
生产者 | 生产者 | Kafka 生产者配置, | 否 |
调试度量 | 调试度量 | 配置生成的 metrics,以便监视组件的状态。 | 否 |
>符号表示更深层的嵌套。例如,
authentication > tls
指定了在 authentication
块内定义的 tls
块。
认证块
认证
块包含连接到 Kafka 代理时要使用的不同认证机制的定义。它不支持任何参数,完全通过内部块进行配置。
明文块
明文
块配置对 Kafka 代理的纯文本认证。
支持以下参数:
名称 | 类型 | 描述 | 默认值 | 必须 |
---|---|---|---|---|
用户名 | 字符串 | 用于纯文本认证的用户名。 | 是 | |
密码 | secret | 用于纯文本认证的密码。 | 是 |
机制块
机制
块配置对 Kafka 代理的 SASL 认证。
支持以下参数:
名称 | 类型 | 描述 | 默认值 | 必须 |
---|---|---|---|---|
用户名 | 字符串 | 用于 SASL 认证的用户名。 | 是 | |
密码 | secret | 用于 SASL 认证的密码。 | 是 | |
机制 | 字符串 | 在认证时使用的 SASL 机制。 | 是 | |
版本 | 号码 | 在认证时使用的 SASL 协议的版本。 | 0 | 否 |
可以设置 机制
参数为以下字符串之一
"PLAIN"
"AWS_MSK_IAM"
"SCRAM-SHA-256"
"SCRAM-SHA-512"
当 机制
设置为 "AWS_MSK_IAM"
时,必须也提供 aws_msk
子块。
可以设置 版本
参数为 0
或 1
。
aws_msk 块
aws_msk
块配置在 AWS_MSK_IAM 机制下 SASL 认证时要使用的额外参数。
支持以下参数:
名称 | 类型 | 描述 | 默认值 | 必须 |
---|---|---|---|---|
区域 | 字符串 | 基于 MSK 群集的 AWS 区域。 | 是 | |
broker_addr | 字符串 | 用于认证的 MSK 地址。 | 是 |
tl 块
tl
块配置用于连接到 Kafka 代理的 TLS 设置。如果没有提供 tls
块,则不会使用 TLS 进行通信。
支持以下参数:
名称 | 类型 | 描述 | 默认值 | 必须 |
---|---|---|---|---|
ca_file | 字符串 | CA 文件的路径。 | 否 | |
ca_pem | 字符串 | CA PEM 编码的文本,用于验证服务器。 | 否 | |
cert_file | 字符串 | TLS 证书的路径。 | 否 | |
cert_pem | 字符串 | 用于客户端认证的证书 PEM 编码的文本。 | 否 | |
insecure_skip_verify | 布尔值 | 忽略不安全的服务器 TLS 证书。 | 否 | |
include_system_ca_certs_pool | 布尔值 | 是否与证书颁发机构一起加载系统证书颁发机构池。 | false | 否 |
不安全 | 布尔值 | 连接到配置的服务器时禁用TLS。 | 否 | |
密钥文件 | 字符串 | TLS证书密钥的路径。 | 否 | |
密钥PEM | secret | 用于客户端认证的密钥PEM编码文本。 | 否 | |
最大版本号 | 字符串 | 接受连接的最大TLS版本。 | "TLS 1.3" | 否 |
最小版本号 | 字符串 | 接受连接的最小TLS版本。 | "TLS 1.2" | 否 |
加密套件 | 列表(字符串) | TLS传输可以使用的一组TLS加密套件。 | [] | 否 |
重新加载间隔 | 持续时间 | 证书重新加载的持续时间。 | "0s" | 否 |
服务器名称 | 字符串 | 当设置时,验证服务器证书的主机名。 | 否 |
如果服务器不支持TLS,必须将insecure
参数设置为true
。
要禁用连接到服务器的tls
,请将insecure
参数设置为true
。
如果重新加载间隔
设置为"0s"
,则证书永远不会重新加载。
以下参数对相互排斥,不能同时设置。
ca_pem
和ca_file
cert_pem
和cert_file
key_pem
和key_file
如果留空cipher_suites
,则使用一组安全默认值。有关支持的加密套件列表,请参阅Go TLS文档。
kerberos块
kerberos
块配置对Kafka代理进行Kerberos身份验证。
支持以下参数:
名称 | 类型 | 描述 | 默认值 | 必须 |
---|---|---|---|---|
服务名称 | 字符串 | Kerberos服务名称。 | 否 | |
领域 | 字符串 | Kerberos领域。 | 否 | |
使用keytab | 字符串 | 启用使用keytab而不是密码。 | 否 | |
用户名 | 字符串 | 用作认证的Kerberos用户名。 | 是 | |
密码 | secret | 用于认证的Kerberos密码。 | 否 | |
配置文件 | 字符串 | 指向Kerberos位置(例如,/etc/krb5.conf )的路径。 | 否 | |
密钥表文件 | 字符串 | 指向密钥表文件的路径(例如,/etc/security/kafka.keytab )。 | 否 | |
禁用快速协商 | 布尔值 | 禁用PA-FX-FAST协商。 | false | 否 |
当use_keytab
设置为false
时,需要password
参数。当use_keytab
设置为true
时,使用由keytab_file
参数指定的文件进行认证。最多只能提供一个password
或keytab_file
参数。
disable_fast_negotiation
对于不支持PA-FX-FAST(预认证框架 - 快速)协商的Kerberos实现很有用。
元数据块
metadata
块配置如何从Kafka代理检索和存储元数据。
支持以下参数:
名称 | 类型 | 描述 | 默认值 | 必须 |
---|---|---|---|---|
包含所有主题 | 布尔值 | 当为true时,维护所有主题的元数据。 | true | 否 |
如果include_all_topics
参数设置为true
,则维护所有主题的完整元数据集而不是迄今为止所需的最低元数据集。包含完整的元数据集对用户来说更方便,但如果您有许多主题和分区,可能会消耗大量内存。
如果Kafka代理在与Alloy组件同时启动时检索元数据可能会失败。可以提供retry
子块以自定义重试行为。
重试块
retry
块配置在检索失败时如何重试检索元数据。
支持以下参数:
名称 | 类型 | 描述 | 默认值 | 必须 |
---|---|---|---|---|
最大重试次数 | 号码 | 重试检索元数据多少次。 | 3 | 否 |
退避时间 | 持续时间 | 重试之间的等待时间。 | "250ms" | 否 |
重试失败块
retry_on_failure
块配置如何重试失败的Kafka请求。
支持以下参数:
名称 | 类型 | 描述 | 默认值 | 必须 |
---|---|---|---|---|
启用 | 布尔值 | 启用重试失败的请求。 | true | 否 |
初始间隔 | 持续时间 | 在重试失败的请求之前等待的初始时间。 | "5s" | 否 |
最大经过时间 | 持续时间 | 在丢弃失败的批次之前等待的最大时间。 | "5m" | 否 |
最大间隔 | 持续时间 | 重试之间的最大时间。 | "30s" | 否 |
乘数 | 号码 | 扩展重试之前等待时间的基本因子。 | 1.5 | 否 |
随机化系数 | 号码 | 用于随机化重试前等待时间的因子。 | 0.5 | 否 |
当 enabled
为 true
时,失败的批次会在指定的时间间隔后重试。《initial_interval》参数指定了第一次重试尝试前等待的时间。如果请求持续失败,重试之间的等待时间会通过《multiplier》参数指定的系数增加,该系数必须大于 1.0
。《max_interval》参数指定了重试之间最长等待时间。
randomization_factor
参数在为重试 Alloy 实例之间添加抖动时很有用。如果 randomization_factor
大于 0
,则重试前的等待时间乘以一个位于 [ I - randomization_factor * I, I + randomization_factor * I]
范围内的随机因子,其中 I
是当前间隔。
如果批量数据在指定的时间 max_elapsed_time
内未被成功发送,则会被丢弃。如果 max_elapsed_time
设置为 "0s"
,则失败请求将无限期重试直到成功。
队列阻塞
《queue》阻塞配置了在将数据发送到 gRPC 服务器之前,数据要经过的内存缓冲区。
支持以下参数:
名称 | 类型 | 描述 | 默认值 | 必须 |
---|---|---|---|---|
启用 | 布尔值 | 启用在向客户端发送数据之前的内存缓冲。 | true | 否 |
num_consumers | 号码 | 并行发送写入队列的批次的读取器数量。 | 10 | 否 |
queue_size | 号码 | 队列中在同一时间内允许的最大未写入批次数量。 | 1000 | 否 |
当 enabled
为 true
时,首先将数据写入内存缓冲区,然后再将其发送到配置的服务器。只要未发送批次的数量不超过配置的《queue_size》,发送到组件的 input
导出字段的批次就会添加到缓冲区。
queue_size
决定了可容忍的端点中断时间。假设每秒钟 100 请求,默认队列大小 1000
提供了大约 10 秒的中断容忍时间。要计算 queue_size
的正确值,请将每秒平均出去请求数量乘以中断容忍的时间(秒)。过大的值可能导致内存不足(OOM)杀死。
《num_consumers》参数控制了有多少个读取器从缓冲区中读取并发送数据。较大的 《num_consumers》 值可以使数据更快地发送,但以增加网络流量为代价。
生产者阻塞
《producer》阻塞配置了在获取元数据失败时如何重试获取。
支持以下参数:
名称 | 类型 | 描述 | 默认值 | 必须 |
---|---|---|---|---|
max_message_bytes | 号码 | 消息允许的最大字节数。 | 1000000 | 否 |
required_acks | 号码 | 控制何时认为消息已传输。 | 1 | 否 |
compression | 字符串 | 重试之间的等待时间。 | "none" | 否 |
flush_max_messages | 号码 | 重试之间的等待时间。 | 0 | 否 |
有关 required_acks
的更多信息,请参阅 sarama 文档。
《compression》可以设置为 none
、gzip
、snappy
、lz4
或 zstd
。有关更多信息,请参阅 Sarama 文档。
debug_metrics 阻塞
《debug_metrics》阻塞配置了此组件生成以监控其状态的指标。
支持以下参数:
名称 | 类型 | 描述 | 默认值 | 必须 |
---|---|---|---|---|
disable_high_cardinality_metrics | 布尔值 | 是否禁用某些高基数指标。 | true | 否 |
level | 字符串 | 控制包装收集器发出的指标的详细程度。 | "detailed" | 否 |
disable_high_cardinality_metrics
是 Grafana Alloy 对 OpenTelemetry Collector 中的 telemetry.disableHighCardinalityMetrics
功能门的等效。它删除可能导致高基数指标的特性。例如,从 HTTP 和 gRPC 连接的指标中删除具有 IP 地址和端口的属性。
注意:
如果配置了disable_high_cardinality_metrics
,则它仅适用于otelcol.exporter.*
和otelcol.receiver.*
组件。
level
是 Alloy 对 OpenTelemetry Collector 中的 telemetry.metrics.level
功能门的等效。可能的值是 "none"
、"basic"
、"normal"
和 "detailed"
。
导出字段
以下字段被导出,并可由其他组件引用
名称 | 类型 | 描述 |
---|---|---|
输入 | otelcol.Consumer | 其他组件可以使用该值向其发送遥测数据。 |
input
接受来自任何遥测信号(指标、日志或跟踪)的otelcol.Consumer
数据。
组件健康状态
只有当给定无效配置时,otelcol.exporter.kafka
才会报告为不健康。
调试信息
otelcol.exporter.kafka
不公开任何特定组件的调试信息。
示例
此示例通过批处理程序将遥测数据转发了,最终将其发送到Kafka。
otelcol.receiver.otlp "default" {
http {}
grpc {}
output {
metrics = [otelcol.processor.batch.default.input]
logs = [otelcol.processor.batch.default.input]
traces = [otelcol.processor.batch.default.input]
}
}
otelcol.processor.batch "default" {
output {
metrics = [otelcol.exporter.kafka.default.input]
logs = [otelcol.exporter.kafka.default.input]
traces = [otelcol.exporter.kafka.default.input]
}
}
otelcol.exporter.kafka "default" {
brokers = ["localhost:9092"]
protocol_version = "2.0.0"
}
兼容的组件
otelcol.exporter.kafka
的可导出内容可以由以下组件消费
- 消费OpenTelemetry
otelcol.Consumer
的组件
注意:
连接某些组件可能不合理,或者组件可能需要进一步配置才能正确连接。有关更多详细信息,请参阅相关文档。