菜单
文档breadcrumb arrow Grafana Alloybreadcrumb arrow 参考breadcrumb arrow 组件breadcrumb arrow otelcolbreadcrumb arrow otelcol.receiver.kafka
开源

otelcol 接收器 Kafka

otelcol.receiver.kafka 从 Kafka 代理接收遥测数据并将其转发到其他 otelcol.* 组件。

注意otelcol.receiver.kafka 是在 otelcol-contrib 分发中上游 OpenTelemetry Collector kafka 接收器的包装器。如有必要,错误报告或功能请求将重定向到上游存储库。

可以通过提供不同的标签来指定多个 otelcol.receiver.kafka 组件。

用法

alloy
otelcol.receiver.kafka "LABEL" {
  brokers          = ["BROKER_ADDR"]
  protocol_version = "PROTOCOL_VERSION"

  output {
    metrics = [...]
    logs    = [...]
    traces  = [...]
  }
}

参数

支持以下参数

名称类型描述默认值必需的
brokersarray(string)要连接的 Kafka 代理。yes
protocol_versionstring要使用的 Kafka 协议版本。yes
topicstring要从其中读取的 Kafka 主题。下面将说明no
encodingstring从 Kafka 读取的有效载荷编码。"otlp_proto"no
group_idstring消费消息的消费者组。"otel-collector"no
client_idstring要使用的消费者客户端 ID。"otel-collector"no
initial_offsetstring如果没有先前提交偏移量,请使用初始偏移量。"latest"no
resolve_canonical_bootstrap_servers_onlybool是否在启动时解析并反向查找代理 IP。"false"no

如果未设置 topic,则将为不同的遥测信号使用不同的主题

  • 将从 otlp_metrics 主题接收指标。
  • 将从 otlp_spans 主题接收跟踪。
  • 将从 otlp_logs 主题接收日志。

如果将 topic 设置为特定值,则必须在输出块中设置与主题中存储的数据对应的信号类型。例如,如果将 topic 设置为 "my_telemetry",则 "my_telemetry" 主题只能包含指标、日志或跟踪之一。如果它只包含指标,则应将 otelcol.receiver.kafka 配置为仅输出指标。

encoding 参数确定如何解码从 Kafka 读取的消息。 encoding 必须是以下字符串之一

  • "otlp_proto":将消息解码为 OTLP protobuf。
  • "jaeger_proto":将消息解码为单个 Jaeger protobuf span。
  • "jaeger_json":将消息解码为单个 Jaeger JSON span。
  • "zipkin_proto":将消息解码为Zipkin protobuf跨度列表。
  • "zipkin_json":将消息解码为Zipkin JSON跨度列表。
  • "zipkin_thrift":将消息解码为Zipkin Thrift跨度列表。
  • "raw":将日志消息的字节数据复制到日志记录的主体中。
  • "text":将日志消息解码为文本并插入到日志记录的主体中。默认情况下使用UTF-8进行解码。可以选择不同的编码,方法是用text_<编码>。例如,text_utf-8text_shift_jis
  • "json":解码JSON有效负荷并将其插入到日志记录的主体中。
  • "azure_resource_logs":将有效负荷从Azure资源日志格式转换为OTLP日志。

"otlp_proto"必须用于从Kafka读取所有遥测类型;其他编码是信号特定的。

initial_offset必须是"latest""earliest"

以下块在otelcol.receiver.kafka的定义中受支持

层次结构描述必需的
验证验证配置连接到Kafka代理的验证。no
验证 > 明文明文使用明文对Kafka代理进行身份验证。no
验证 > saslsasl使用SASL对Kafka代理进行身份验证。no
验证 > sasl > aws_mskaws_msk使用AWS_MSK_IAM时,添加SASL的额外参数。no
验证 > tlstls配置连接到Kafka代理的TLS。no
验证 > kerberoskerberos使用Kerberos对Kafka代理进行身份验证。no
元数据元数据配置如何从Kafka代理检索元数据。no
元数据 > 重试重试配置如何重试元数据检索。no
auto Commitauto Commit配置如何自动将更新的主题偏移量提交回Kafka代理。no
消息标记消息标记配置何时将Kafka消息标记为已读。no
header_extractionheader_extraction从Kafka记录中提取头信息。no
debug_metricsdebug_metrics配置由此组件生成的指标,以监控其状态。no
输出输出配置将收到的遥测数据发送到何处。yes

>符号表示更深层次的嵌套。例如,authentication > tls指的是在authentication块内部定义的tls块。

验证块

authentication块包含了连接到Kafka代理时使用的不同身份验证机制的定义。它不支持任何参数,并且完全通过内部块进行配置。

明文块

plaintext块配置了对Kafka代理的明文身份验证。

支持以下参数

名称类型描述默认值必需的
用户名string用于明文验证的用户名。yes
密码机密用于明文验证的密码。yes

sasl块

sasl块配置了对Kafka代理的SASL身份验证。

支持以下参数

名称类型描述默认值必需的
用户名string用于SASL验证的用户名。yes
密码机密用于SASL验证的密码。yes
机制string身份验证时使用的SASL机制。yes
版本数字在身份验证时使用的SASL协议的版本。0no

机制参数可以设置以下字符串之一

  • "PLAIN"
  • "AWS_MSK_IAM"
  • "SCRAM-SHA-256"
  • "SCRAM-SHA-512"

机制设置为"AWS_MSK_IAM"时,必须还提供aws_msk子块。

版本参数可以设置为01

aws_msk块

aws_msk块配置了在使用AWS_MSK_IAM机制时的SASL身份验证的额外参数。

支持以下参数

名称类型描述默认值必需的
区域stringMSK集群所在的AWS区域。yes
代理地址string用于身份验证的MSK地址。yes

tls块

tls区域配置用于连接到Kafka代理的TLS设置。如果没有提供tls区域,通信将不会使用TLS。

支持以下参数

名称类型描述默认值必需的
ca_filestringCA文件的路径。no
ca_pemstring用于验证服务器的CA PEM编码的文本。no
cert_filestringTLS证书的路径。no
cert_pemstring用于客户端认证的证书PEM编码的文本。no
insecure_skip_verify布尔值忽略不安全的服务器TLS证书。no
include_system_ca_certs_pool布尔值是否在证书颁发机构旁边加载系统证书颁发机构池。falseno
insecure布尔值在连接到配置的服务器时禁用TLS。no
key_filestringTLS证书私钥的路径。no
key_pem机密用于客户端认证的密钥PEM编码的文本。no
max_versionstring连接可接受的最大TLS版本。"TLS 1.3"no
min_versionstring连接可接受的最小TLS版本。"TLS 1.2"no
cipher_suites列表(string)TLS传输可以使用的一组TLS cipher suites列表。[]no
reload_interval持续时间在证书重新加载之后的时间。"0s"no
server_namestring当设置时,验证服务器证书的主机名。no

如果服务器不支持TLS,您必须将insecure参数设置为true

要禁用连接到服务器的TLS,将insecure参数设置为true

如果将reload_interval设置为"0s",证书永不重新加载。

以下参数对相互排斥,不能同时设置

  • ca_pemca_file
  • cert_pemcert_file
  • key_pemkey_file

如果留空cipher_suites,将使用一组安全默认列表。有关支持的cipher suites列表,请参阅Go TLS文档

kerberos block

kerberos区域配置对Kafka代理进行Kerberos身份验证。

支持以下参数

名称类型描述默认值必需的
service_namestringKerberos服务名。no
realmstringKerberos域。no
use_keytabstring启用使用密钥表而不是密码。no
用户名string用于认证的Kerberos用户名。yes
密码机密用于认证的Kerberos密码。no
config_filestringKerberos配置文件路径(例如,/etc/krb5.conf)。no
keytab_filestring密钥表文件路径(例如,/etc/security/kafka.keytab)。no
disable_fast_negotiationbool禁用PA-FX-FAST协商。falseno

use_keytab设置为false时,必须提供password参数。当use_keytab设置为true时,将使用keytab_file参数指向的文件进行认证。最多提供一个passwordkeytab_file

disable_fast_negotiation对不支持PA-FX-FAST(预认证框架-快速)协商的Kerberos实现很有用。

metadata block

metadata区域配置如何从Kafka代理检索和存储元数据。

支持以下参数

名称类型描述默认值必需的
include_all_topicsbool当为true时,维护所有主题的元数据。trueno

如果将include_all_topics参数设置为true,将维护所有主题的完整元数据集,而不是迄今为止所需的最低元数据集。包括完整的元数据集对用户来说更方便,但如果您有大量主题和分区,则可能会消耗大量内存。

如果Kafka代理在与Alloy组件同时启动时检索元数据可能会失败。可以提供retry子区域来自定义重试行为。

retry block

retry区域配置在检索失败时如何重试检索元数据。

支持以下参数

名称类型描述默认值必需的
max_retries数字重试检索元数据多少次。3no
回退持续时间重试之间的等待时间。"250ms"no

自动提交块

autocommit 块配置了如何将更新的主题偏移量自动提交回 Kafka 协议代理。

支持以下参数

名称类型描述默认值必需的
启用bool启用自动提交更新的主题偏移量。trueno
间隔持续时间自动提交的频率。"1s"no

消息标记块

message_marking 块配置了何时将 Kafka 消息标记为已读。

支持以下参数

名称类型描述默认值必需的
after_executionbool在将遥测数据转发给其他组件后标记消息。falseno
include_unsuccessfulbool是否应将失败转发标记为已读。falseno

默认情况下,从 Kafka 协议代理检索到的 Kafka 消息在检索后立即被标记为已读。如果 after_execution 参数为 true,则消息在将遥测数据转发到在 输出块 中指定的组件后才会被读取。

after_execution 为 true 时,只有在消息成功解码且将数据转发的组件没有返回错误时,消息才会被标记为已读。如果 include_unsuccessful 参数为 true,即使解码或转发失败,消息也会被标记为已读。如果 after_execution 为 false,则设置 include_unsuccessful 无效。

警告:如果消息处理返回可能永久性错误(如解码失败)并设置 after_execution 为 true 和 include_unsuccessful 为 false,则会阻塞整个 Kafka 分区。

header_extraction 块

header_extraction 块配置了如何从 Kafka 记录中提取头信息。

支持以下参数

名称类型描述默认值必需的
extract_headersbool启用将头字段附加到资源属性。falseno
headers列表(string)要从 Kafka 记录中提取的头列表。[]no

headers 参数不允许使用正则表达式。仅执行精确匹配。

debug_metrics 块

debug_metrics 块配置了该组件生成的用于监控其状态的指标。

支持以下参数

名称类型描述默认值必需的
disable_high_cardinality_metrics布尔值是否禁用某些高基数指标。trueno
级别string控制封装收集器发出的指标的详细程度。"detailed"no

disable_high_cardinality_metrics 是 Grafana Alloy 中对应于 OpenTelemetry Collector 中的 telemetry.disableHighCardinalityMetrics 功能标志。它删除可能导致高基数指标的特性。例如,关于 HTTP 和 gRPC 连接的指标中带有 IP 地址和端口号的属性被删除。

注意

如果已配置,则 disable_high_cardinality_metrics 只适用于 otelcol.exporter.*otelcol.receiver.* 组件。

level 是 OpenTelemetry Collector 中 telemetry.metrics.level 功能标志的 Alloy 对应物。可能的值是 "none""basic""normal""detailed"

output 块

output 块配置了一组组件,这些组件负责将生成的遥测数据转发到其他组件。

支持以下参数

名称类型描述默认值必需的
日志list(otelcol.Consumer)要发送到日志的消费者列表。[]no
指标list(otelcol.Consumer)要发送到指标的消费者列表。[]no
跟踪list(otelcol.Consumer)要发送到跟踪的消费者列表。[]no

您必须指定 output 块,但其所有参数都是可选的。默认情况下,遥测数据会被丢弃。根据需要配置 metricslogstraces 参数,以便将遥测数据发送到其他组件。

导出字段

otelcol.receiver.kafka 不导出任何字段。

组件健康

otelcol.receiver.kafka 只有在提供无效配置的情况下才会报告为不健康。

调试信息

otelcol.receiver.kafka 不公开任何特定于组件的调试信息。

示例

此示例在最终将读取的遥测数据发送到具有 OTLP 功能的端点之前将其通过批处理器转发。

alloy
otelcol.receiver.kafka "default" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  output {
    metrics = [otelcol.processor.batch.default.input]
    logs    = [otelcol.processor.batch.default.input]
    traces  = [otelcol.processor.batch.default.input]
  }
}

otelcol.processor.batch "default" {
  output {
    metrics = [otelcol.exporter.otlp.default.input]
    logs    = [otelcol.exporter.otlp.default.input]
    traces  = [otelcol.exporter.otlp.default.input]
  }
}

otelcol.exporter.otlp "default" {
  client {
    endpoint = env("OTLP_ENDPOINT")
  }
}

兼容组件

otelcol.receiver.kafka 可以接受来自以下组件的参数

注意

连接某些组件可能有悖常理,或者可能需要进一步配置才能正确连接。请参阅链接文档以获取更多详细信息。