菜单
开源

loki.source.kafka

loki.source.kafka 使用消费者组从 Kafka 读取消息并将它们转发到其他 loki.* 组件。

该组件为给定的参数启动一个新的 Kafka 消费者组并将传入条目发散到 forward_to 中的接收者列表。

在使用 loki.source.kafka 之前,Kafka 应至少有一个生产者将事件写入至少一个主题。请遵循 Kafka 快速入门 步骤以开始使用 Kafka。

可以通过给它们不同的标签指定多个 loki.source.kafka 组件。

用法

alloy
loki.source.kafka "LABEL" {
  brokers    = BROKER_LIST
  topics     = TOPIC_LIST
  forward_to = RECEIVER_LIST
}

参数

loki.source.kafka 支持以下参数

名称类型描述默认必需
brokerslist(string)连接到 Kafka 的代理列表。
topicslist(string)The list of Kafka topics to consume.
group_idstringThe Kafka consumer group id."loki.source.kafka"
assignorstringThe consumer group rebalancing strategy to use."range"
versionstringKafka version to connect to."2.2.1"
use_incoming_timestampboolWhether or not to use the timestamp received from Kafka.false
labelsmap(string)The labels to associate with each received Kafka event.{}
forward_tolist(LogsReceiver)要发送日志条目的接收者列表。
relabel_rulesRelabelRules对日志条目应用的重命名规则。{}

assignor values can be either "range", "roundrobin", or "sticky".

将 `labels` 参数的标签应用于组件读取的每条消息。

relabel_rules字段可以利用来自loki.relabel组件的rules导出值,在将日志条目发送到forward_to中的接收者列表之前,应用一个或多个重标记规则。

除自定义标签外,还可用以下以__开头的前缀内部标签

  • __meta_kafka_message_key
  • __meta_kafka_message_offset
  • __meta_kafka_topic
  • __meta_kafka_partition
  • __meta_kafka_member_id
  • __meta_kafka_group_id

在转发日志条目之前,所有以__开头的标签都会被删除。为了保留这些标签,请使用loki.relabel组件对其进行重标记,并将其rules导出传递给relabel_rules参数。

以下块在loki.source.kafka定义内部受支持

层次名称描述必需
身份验证身份验证可选的Kafka代理身份验证配置。
身份验证 > tls_configtls_config可选的Kafka代理身份验证配置。
身份验证 > sasl_configsasl_config可选的Kafka代理身份验证配置。
身份验证 > sasl_config > tls_configtls_config可选的Kafka代理身份验证配置。
身份验证 > sasl_config > oauth_configoauth_config可选的Kafka代理身份验证配置。

身份验证块

authentication块定义了与Kafka事件代理通信时的身份验证方法。

名称类型描述默认必需
typestring身份验证类型。"none"

type支持以下值:"none""ssl""sasl"。如果使用"ssl",则必须设置tls_config块。如果使用"sasl",则必须设置sasl_config块。

tls_config块

名称类型描述默认必需
ca_pemstring用于验证服务器的CA PEM编码文本。
ca_filestring用于验证服务器的CA证书。
cert_pemstring客户端身份验证的PEM编码证书文本。
cert_filestring客户端身份验证的证书文件。
insecure_skip_verifybool禁用服务器证书的验证。
key_filestring客户端身份验证的密钥文件。
key_pemsecret客户端身份验证的PEM编码密钥文本。
min_versionstring可接受的最低TLS版本。
server_namestring服务器名称扩展,用于指示服务器名称。

以下参数对互斥,且不能同时设置

  • ca_pemca_file
  • cert_pemcert_file
  • key_pemkey_file

在配置客户端身份验证时,必须同时提供客户端证书(使用cert_pemcert_file)和客户端密钥(使用key_pemkey_file)。

如果未提供min_version,则可接受的最低TLS版本将继承自Go的默认最低版本,TLS 1.2。如果提供了min_version,则必须将其设置为以下字符串之一

  • "TLS10" (TLS 1.0)
  • "TLS11" (TLS 1.1)
  • "TLS12" (TLS 1.2)
  • "TLS13" (TLS 1.3)

sasl_config块

sasl_config块定义了监听器期望将Kafka消息发送到的监听地址和端口。

名称类型描述默认必需
mechanismstring指定客户端用于与代理进行身份验证的SASL机制。"PLAIN"
userstring用于SASL身份验证的用户名。""
passwordsecret用于SASL身份验证的密码。""
use_tlsbool如果为true,则SASL身份验证在TLS上执行。false

oauth_config块

当SASL机制设置为OAUTHBEARER时,需要oauth_config

名称类型描述默认必需
token_providerstring要使用的OAuth提供者。唯一受支持的提供者是azure""
scopeslist(string)要在访问令牌中设置的作用域[]

导出字段

loki.source.kafka不导出任何字段。

组件健康状态

loki.source.kafka仅在配置无效时报告为不健康。

调试信息

loki.source.kafka不公开额外的调试信息。

示例

本示例从指定的代理和主题消费Kafka事件,然后使用Kafka时间戳将它们转发到loki.write组件。

alloy
loki.source.kafka "local" {
  brokers                = ["localhost:9092"]
  topics                 = ["quickstart-events"]
  labels                 = {component = "loki.source.kafka"}
  forward_to             = [loki.relabel.kafka.receiver]
  use_incoming_timestamp = true
  relabel_rules          = loki.relabel.kafka.rules
}

loki.relabel "kafka" {
  forward_to      = [loki.write.local.receiver]

  rule {
    source_labels = ["__meta_kafka_topic"]
    target_label  = "topic"
  }
}

loki.write "local" {
  endpoint {
    url = "loki:3100/api/v1/push"
  }
}

兼容组件

loki.source.kafka可接受以下组件的参数

注意

连接某些组件可能不合理,或者组件可能需要更进一步的配置才能正确连接。请参阅链接文档以获取更多信息。