otelcol 接收器 Kafka
otelcol.receiver.kafka
从 Kafka 代理接收遥测数据并将其转发到其他 otelcol.*
组件。
注意:
otelcol.receiver.kafka
是在otelcol-contrib
分发中上游 OpenTelemetry Collectorkafka
接收器的包装器。如有必要,错误报告或功能请求将重定向到上游存储库。
可以通过提供不同的标签来指定多个 otelcol.receiver.kafka
组件。
用法
otelcol.receiver.kafka "LABEL" {
brokers = ["BROKER_ADDR"]
protocol_version = "PROTOCOL_VERSION"
output {
metrics = [...]
logs = [...]
traces = [...]
}
}
参数
支持以下参数
名称 | 类型 | 描述 | 默认值 | 必需的 |
---|---|---|---|---|
brokers | array(string) | 要连接的 Kafka 代理。 | yes | |
protocol_version | string | 要使用的 Kafka 协议版本。 | yes | |
topic | string | 要从其中读取的 Kafka 主题。 | 下面将说明 | no |
encoding | string | 从 Kafka 读取的有效载荷编码。 | "otlp_proto" | no |
group_id | string | 消费消息的消费者组。 | "otel-collector" | no |
client_id | string | 要使用的消费者客户端 ID。 | "otel-collector" | no |
initial_offset | string | 如果没有先前提交偏移量,请使用初始偏移量。 | "latest" | no |
resolve_canonical_bootstrap_servers_only | bool | 是否在启动时解析并反向查找代理 IP。 | "false" | no |
如果未设置 topic
,则将为不同的遥测信号使用不同的主题
- 将从
otlp_metrics
主题接收指标。 - 将从
otlp_spans
主题接收跟踪。 - 将从
otlp_logs
主题接收日志。
如果将 topic
设置为特定值,则必须在输出块中设置与主题中存储的数据对应的信号类型。例如,如果将 topic
设置为 "my_telemetry"
,则 "my_telemetry"
主题只能包含指标、日志或跟踪之一。如果它只包含指标,则应将 otelcol.receiver.kafka
配置为仅输出指标。
encoding
参数确定如何解码从 Kafka 读取的消息。 encoding
必须是以下字符串之一
"otlp_proto"
:将消息解码为 OTLP protobuf。"jaeger_proto"
:将消息解码为单个 Jaeger protobuf span。"jaeger_json"
:将消息解码为单个 Jaeger JSON span。"zipkin_proto"
:将消息解码为Zipkin protobuf跨度列表。"zipkin_json"
:将消息解码为Zipkin JSON跨度列表。"zipkin_thrift"
:将消息解码为Zipkin Thrift跨度列表。"raw"
:将日志消息的字节数据复制到日志记录的主体中。"text"
:将日志消息解码为文本并插入到日志记录的主体中。默认情况下使用UTF-8进行解码。可以选择不同的编码,方法是用text_<编码>
。例如,text_utf-8
或text_shift_jis
。"json"
:解码JSON有效负荷并将其插入到日志记录的主体中。"azure_resource_logs"
:将有效负荷从Azure资源日志格式转换为OTLP日志。
"otlp_proto"
必须用于从Kafka读取所有遥测类型;其他编码是信号特定的。
initial_offset
必须是"latest"
或"earliest"
。
块
以下块在otelcol.receiver.kafka
的定义中受支持
层次结构 | 块 | 描述 | 必需的 |
---|---|---|---|
验证 | 验证 | 配置连接到Kafka代理的验证。 | no |
验证 > 明文 | 明文 | 使用明文对Kafka代理进行身份验证。 | no |
验证 > sasl | sasl | 使用SASL对Kafka代理进行身份验证。 | no |
验证 > sasl > aws_msk | aws_msk | 使用AWS_MSK_IAM时,添加SASL的额外参数。 | no |
验证 > tls | tls | 配置连接到Kafka代理的TLS。 | no |
验证 > kerberos | kerberos | 使用Kerberos对Kafka代理进行身份验证。 | no |
元数据 | 元数据 | 配置如何从Kafka代理检索元数据。 | no |
元数据 > 重试 | 重试 | 配置如何重试元数据检索。 | no |
auto Commit | auto Commit | 配置如何自动将更新的主题偏移量提交回Kafka代理。 | no |
消息标记 | 消息标记 | 配置何时将Kafka消息标记为已读。 | no |
header_extraction | header_extraction | 从Kafka记录中提取头信息。 | no |
debug_metrics | debug_metrics | 配置由此组件生成的指标,以监控其状态。 | no |
输出 | 输出 | 配置将收到的遥测数据发送到何处。 | yes |
>
符号表示更深层次的嵌套。例如,authentication > tls
指的是在authentication
块内部定义的tls
块。
验证块
authentication
块包含了连接到Kafka代理时使用的不同身份验证机制的定义。它不支持任何参数,并且完全通过内部块进行配置。
明文块
plaintext
块配置了对Kafka代理的明文身份验证。
支持以下参数
名称 | 类型 | 描述 | 默认值 | 必需的 |
---|---|---|---|---|
用户名 | string | 用于明文验证的用户名。 | yes | |
密码 | 机密 | 用于明文验证的密码。 | yes |
sasl块
sasl
块配置了对Kafka代理的SASL身份验证。
支持以下参数
名称 | 类型 | 描述 | 默认值 | 必需的 |
---|---|---|---|---|
用户名 | string | 用于SASL验证的用户名。 | yes | |
密码 | 机密 | 用于SASL验证的密码。 | yes | |
机制 | string | 身份验证时使用的SASL机制。 | yes | |
版本 | 数字 | 在身份验证时使用的SASL协议的版本。 | 0 | no |
机制
参数可以设置以下字符串之一
"PLAIN"
"AWS_MSK_IAM"
"SCRAM-SHA-256"
"SCRAM-SHA-512"
当机制
设置为"AWS_MSK_IAM"
时,必须还提供aws_msk
子块。
版本
参数可以设置为0
或1
。
aws_msk块
aws_msk
块配置了在使用AWS_MSK_IAM
机制时的SASL身份验证的额外参数。
支持以下参数
名称 | 类型 | 描述 | 默认值 | 必需的 |
---|---|---|---|---|
区域 | string | MSK集群所在的AWS区域。 | yes | |
代理地址 | string | 用于身份验证的MSK地址。 | yes |
tls块
tls
区域配置用于连接到Kafka代理的TLS设置。如果没有提供tls
区域,通信将不会使用TLS。
支持以下参数
名称 | 类型 | 描述 | 默认值 | 必需的 |
---|---|---|---|---|
ca_file | string | CA文件的路径。 | no | |
ca_pem | string | 用于验证服务器的CA PEM编码的文本。 | no | |
cert_file | string | TLS证书的路径。 | no | |
cert_pem | string | 用于客户端认证的证书PEM编码的文本。 | no | |
insecure_skip_verify | 布尔值 | 忽略不安全的服务器TLS证书。 | no | |
include_system_ca_certs_pool | 布尔值 | 是否在证书颁发机构旁边加载系统证书颁发机构池。 | false | no |
insecure | 布尔值 | 在连接到配置的服务器时禁用TLS。 | no | |
key_file | string | TLS证书私钥的路径。 | no | |
key_pem | 机密 | 用于客户端认证的密钥PEM编码的文本。 | no | |
max_version | string | 连接可接受的最大TLS版本。 | "TLS 1.3" | no |
min_version | string | 连接可接受的最小TLS版本。 | "TLS 1.2" | no |
cipher_suites | 列表(string) | TLS传输可以使用的一组TLS cipher suites列表。 | [] | no |
reload_interval | 持续时间 | 在证书重新加载之后的时间。 | "0s" | no |
server_name | string | 当设置时,验证服务器证书的主机名。 | no |
如果服务器不支持TLS,您必须将insecure
参数设置为true
。
要禁用连接到服务器的TLS,将insecure
参数设置为true
。
如果将reload_interval
设置为"0s"
,证书永不重新加载。
以下参数对相互排斥,不能同时设置
ca_pem
和ca_file
cert_pem
和cert_file
key_pem
和key_file
如果留空cipher_suites
,将使用一组安全默认列表。有关支持的cipher suites列表,请参阅Go TLS文档。
kerberos block
kerberos
区域配置对Kafka代理进行Kerberos身份验证。
支持以下参数
名称 | 类型 | 描述 | 默认值 | 必需的 |
---|---|---|---|---|
service_name | string | Kerberos服务名。 | no | |
realm | string | Kerberos域。 | no | |
use_keytab | string | 启用使用密钥表而不是密码。 | no | |
用户名 | string | 用于认证的Kerberos用户名。 | yes | |
密码 | 机密 | 用于认证的Kerberos密码。 | no | |
config_file | string | Kerberos配置文件路径(例如,/etc/krb5.conf )。 | no | |
keytab_file | string | 密钥表文件路径(例如,/etc/security/kafka.keytab )。 | no | |
disable_fast_negotiation | bool | 禁用PA-FX-FAST协商。 | false | no |
当use_keytab
设置为false
时,必须提供password
参数。当use_keytab
设置为true
时,将使用keytab_file
参数指向的文件进行认证。最多提供一个password
或keytab_file
。
disable_fast_negotiation
对不支持PA-FX-FAST(预认证框架-快速)协商的Kerberos实现很有用。
metadata block
metadata
区域配置如何从Kafka代理检索和存储元数据。
支持以下参数
名称 | 类型 | 描述 | 默认值 | 必需的 |
---|---|---|---|---|
include_all_topics | bool | 当为true时,维护所有主题的元数据。 | true | no |
如果将include_all_topics
参数设置为true
,将维护所有主题的完整元数据集,而不是迄今为止所需的最低元数据集。包括完整的元数据集对用户来说更方便,但如果您有大量主题和分区,则可能会消耗大量内存。
如果Kafka代理在与Alloy组件同时启动时检索元数据可能会失败。可以提供retry
子区域来自定义重试行为。
retry block
retry
区域配置在检索失败时如何重试检索元数据。
支持以下参数
名称 | 类型 | 描述 | 默认值 | 必需的 |
---|---|---|---|---|
max_retries | 数字 | 重试检索元数据多少次。 | 3 | no |
回退 | 持续时间 | 重试之间的等待时间。 | "250ms" | no |
自动提交块
autocommit
块配置了如何将更新的主题偏移量自动提交回 Kafka 协议代理。
支持以下参数
名称 | 类型 | 描述 | 默认值 | 必需的 |
---|---|---|---|---|
启用 | bool | 启用自动提交更新的主题偏移量。 | true | no |
间隔 | 持续时间 | 自动提交的频率。 | "1s" | no |
消息标记块
message_marking
块配置了何时将 Kafka 消息标记为已读。
支持以下参数
名称 | 类型 | 描述 | 默认值 | 必需的 |
---|---|---|---|---|
after_execution | bool | 在将遥测数据转发给其他组件后标记消息。 | false | no |
include_unsuccessful | bool | 是否应将失败转发标记为已读。 | false | no |
默认情况下,从 Kafka 协议代理检索到的 Kafka 消息在检索后立即被标记为已读。如果 after_execution
参数为 true,则消息在将遥测数据转发到在 输出块 中指定的组件后才会被读取。
当 after_execution
为 true 时,只有在消息成功解码且将数据转发的组件没有返回错误时,消息才会被标记为已读。如果 include_unsuccessful
参数为 true,即使解码或转发失败,消息也会被标记为已读。如果 after_execution
为 false,则设置 include_unsuccessful
无效。
警告:如果消息处理返回可能永久性错误(如解码失败)并设置
after_execution
为 true 和include_unsuccessful
为 false,则会阻塞整个 Kafka 分区。
header_extraction 块
header_extraction
块配置了如何从 Kafka 记录中提取头信息。
支持以下参数
名称 | 类型 | 描述 | 默认值 | 必需的 |
---|---|---|---|---|
extract_headers | bool | 启用将头字段附加到资源属性。 | false | no |
headers | 列表(string) | 要从 Kafka 记录中提取的头列表。 | [] | no |
headers
参数不允许使用正则表达式。仅执行精确匹配。
debug_metrics 块
debug_metrics
块配置了该组件生成的用于监控其状态的指标。
支持以下参数
名称 | 类型 | 描述 | 默认值 | 必需的 |
---|---|---|---|---|
disable_high_cardinality_metrics | 布尔值 | 是否禁用某些高基数指标。 | true | no |
级别 | string | 控制封装收集器发出的指标的详细程度。 | "detailed" | no |
disable_high_cardinality_metrics
是 Grafana Alloy 中对应于 OpenTelemetry Collector 中的 telemetry.disableHighCardinalityMetrics
功能标志。它删除可能导致高基数指标的特性。例如,关于 HTTP 和 gRPC 连接的指标中带有 IP 地址和端口号的属性被删除。
注意
如果已配置,则disable_high_cardinality_metrics
只适用于otelcol.exporter.*
和otelcol.receiver.*
组件。
level
是 OpenTelemetry Collector 中 telemetry.metrics.level
功能标志的 Alloy 对应物。可能的值是 "none"
、"basic"
、"normal"
和 "detailed"
。
output 块
output
块配置了一组组件,这些组件负责将生成的遥测数据转发到其他组件。
支持以下参数
名称 | 类型 | 描述 | 默认值 | 必需的 |
---|---|---|---|---|
日志 | list(otelcol.Consumer) | 要发送到日志的消费者列表。 | [] | no |
指标 | list(otelcol.Consumer) | 要发送到指标的消费者列表。 | [] | no |
跟踪 | list(otelcol.Consumer) | 要发送到跟踪的消费者列表。 | [] | no |
您必须指定 output
块,但其所有参数都是可选的。默认情况下,遥测数据会被丢弃。根据需要配置 metrics
、logs
和 traces
参数,以便将遥测数据发送到其他组件。
导出字段
otelcol.receiver.kafka
不导出任何字段。
组件健康
otelcol.receiver.kafka
只有在提供无效配置的情况下才会报告为不健康。
调试信息
otelcol.receiver.kafka
不公开任何特定于组件的调试信息。
示例
此示例在最终将读取的遥测数据发送到具有 OTLP 功能的端点之前将其通过批处理器转发。
otelcol.receiver.kafka "default" {
brokers = ["localhost:9092"]
protocol_version = "2.0.0"
output {
metrics = [otelcol.processor.batch.default.input]
logs = [otelcol.processor.batch.default.input]
traces = [otelcol.processor.batch.default.input]
}
}
otelcol.processor.batch "default" {
output {
metrics = [otelcol.exporter.otlp.default.input]
logs = [otelcol.exporter.otlp.default.input]
traces = [otelcol.exporter.otlp.default.input]
}
}
otelcol.exporter.otlp "default" {
client {
endpoint = env("OTLP_ENDPOINT")
}
}
兼容组件
otelcol.receiver.kafka
可以接受来自以下组件的参数
注意
连接某些组件可能有悖常理,或者可能需要进一步配置才能正确连接。请参阅链接文档以获取更多详细信息。