Apache Kafka Binder

优质
小牛编辑
137浏览
2023-12-01

用法

对于使用Apache Kafka绑定器,您只需要使用以下Maven坐标将其添加到您的Spring Cloud Stream应用程序:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

或者,您也可以使用Spring Cloud Stream Kafka Starter。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

Apache Kafka Binder概述

以下可以看到Apache Kafka绑定器操作的简化图。

卡夫卡粘合剂 图13. Kafka Binder

Apache Kafka Binder实现将每个目标映射到Apache Kafka主题。消费者组织直接映射到相同的Apache Kafka概念。分区也直接映射到Apache Kafka分区。

配置选项

本节包含Apache Kafka绑定器使用的配置选项。

有关binder的常见配置选项和属性,请参阅核心文档。

Kafka Binder Properties

spring.cloud.stream.kafka.binder.brokers

Kafka活页夹将连接的经纪人列表。

默认值:localhost

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers允许使用或不使用端口信息指定的主机(例如,host1,host2:port2)。当在代理列表中没有配置端口时,这将设置默认端口。

默认值:9092

spring.cloud.stream.kafka.binder.zkNodes

Kafka绑定器可以连接的ZooKeeper节点列表。

默认值:localhost

spring.cloud.stream.kafka.binder.defaultZkPort

zkNodes允许使用或不使用端口信息指定的主机(例如,host1,host2:port2)。当在节点列表中没有配置端口时,这将设置默认端口。

默认值:2181

spring.cloud.stream.kafka.binder.configuration

客户端属性(生产者和消费者)的密钥/值映射传递给由绑定器创建的所有客户端。由于这些属性将被生产者和消费者使用,所以使用应该限于常见的属性,特别是安全设置。

默认值:空地图。

spring.cloud.stream.kafka.binder.headers

将由活页夹传送的自定义标题列表。

默认值:空。

spring.cloud.stream.kafka.binder.offsetUpdateTimeWindow

以毫秒为单位的频率(以毫秒为单位)保存偏移量。0忽略。

默认值:10000

spring.cloud.stream.kafka.binder.offsetUpdateCount

频率,更新次数,哪些消耗的偏移量会持续存在。0忽略。与offsetUpdateTimeWindow相互排斥。

默认值:0

spring.cloud.stream.kafka.binder.requiredAcks

经纪人所需的acks数量。

默认值:1

spring.cloud.stream.kafka.binder.minPartitionCount

只有设置autoCreateTopicsautoAddPartitions才有效。绑定器在其生成/消耗数据的主题上配置的全局最小分区数。它可以由生产者的partitionCount设置或生产者的instanceCount * concurrency设置的值替代(如果更大)。

默认值:1

spring.cloud.stream.kafka.binder.replicationFactor

如果autoCreateTopics处于活动状态,则自动创建主题的复制因子。

默认值:1

spring.cloud.stream.kafka.binder.autoCreateTopics

如果设置为true,绑定器将自动创建新主题。如果设置为false,则绑定器将依赖于已配置的主题。在后一种情况下,如果主题不存在,则绑定器将无法启动。值得注意的是,此设置与代理的auto.topic.create.enable设置无关,并不影响它:如果服务器设置为自动创建主题,则可以将其创建为元数据检索请求的一部分,并使用默认代理设置。

默认值:true

spring.cloud.stream.kafka.binder.autoAddPartitions

如果设置为true,则绑定器将根据需要创建新的分区。如果设置为false,则绑定器将依赖于已配置的主题的分区大小。如果目标主题的分区计数小于预期值,则绑定器将无法启动。

默认值:false

spring.cloud.stream.kafka.binder.socketBufferSize

Kafka消费者使用的套接字缓冲区的大小(以字节为单位)。

默认值:2097152

Kafka消费者Properties

以下属性仅适用于Kafka消费者,必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.为前缀。

autoRebalanceEnabled

true,主题分区将在消费者组的成员之间自动重新平衡。当false根据spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex为每个消费者分配一组固定的分区。这需要在每个启动的实例上适当地设置spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex属性。在这种情况下,属性spring.cloud.stream.instanceCount通常必须大于1。

默认值:true

autoCommitOffset

是否在处理邮件时自动提交偏移量。如果设置为false,则入站消息中将显示带有org.springframework.kafka.support.Acknowledgment类型的密钥kafka_acknowledgment的报头。应用程序可以使用此标头来确认消息。有关详细信息,请参阅示例部分。当此属性设置为false时,Kafka binder将ack模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL

默认值:true

autoCommitOnError

只有autoCommitOffset设置为true才有效。如果设置为false,它会禁止导致错误的邮件的自动提交,并且只会为成功的邮件执行提交,允许流在上次成功处理的邮件中自动重播,以防持续发生故障。如果设置为true,它将始终自动提交(如果启用了自动提交)。如果没有设置(默认),它实际上具有与enableDlq相同的值,如果它们被发送到DLQ,则自动提交错误的消息,否则不提交它们。

默认值:未设置。

recoveryInterval

连接恢复尝试之间的间隔,以毫秒为单位。

默认值:5000

resetOffsets

是否将消费者的偏移量重置为startOffset提供的值。

默认值:false

开始偏移

新组的起始偏移量,或resetOffsetstrue时的起始偏移量。允许的值:earliestlatest。如果消费者组被明确设置为消费者'绑定'(通过spring.cloud.stream.bindings.<channelName>.group),那么'startOffset'设置为earliest; 否则对于anonymous消费者组,设置为latest

默认值:null(相当于earliest)。

enableDlq

当设置为true时,它将为消费者发送启用DLQ行为。默认情况下,导致错误的邮件将转发到名为error.<destination>.<group>的主题。DLQ主题名称可以通过属性dlqName配置。对于错误数量相对较少并且重播整个原始主题可能太麻烦的情况,这为更常见的Kafka重播场景提供了另一种选择。

默认值:false

组态

使用包含通用Kafka消费者属性的键/值对映射。

默认值:空地图。

dlqName

接收错误消息的DLQ主题的名称。

默认值:null(如果未指定,将导致错误的消息将转发到名为error.<destination>.<group>的主题)。

Kafka生产者Properties

以下属性仅适用于Kafka生产者,必须以spring.cloud.stream.kafka.bindings.<channelName>.producer.为前缀。

缓冲区大小

上限(以字节为单位),Kafka生产者将在发送之前尝试批量的数据量。

默认值:16384

同步

生产者是否是同步的

默认值:false

batchTimeout

生产者在发送之前等待多长时间,以便允许更多消息在同一批次中累积。(通常,生产者根本不等待,并且简单地发送在先前发送进行中累积的所有消息。)非零值可能会以延迟为代价增加吞吐量。

默认值:0

组态

使用包含通用Kafka生产者属性的键/值对映射。

默认值:空地图。

注意

Kafka绑定器将使用生产者的partitionCount设置作为提示,以创建具有给定分区计数的主题(与minPartitionCount一起使用,最多两个为正在使用的值) 。配置绑定器的minPartitionCount和应用程序的partitionCount时要小心,因为将使用较大的值。如果一个主题已经存在较小的分区计数,并且autoAddPartitions被禁用(默认值),则绑定器将无法启动。如果一个主题已经存在较小的分区计数,并且启用了autoAddPartitions,则会添加新的分区。如果一个主题已经存在的分区数量大于(minPartitionCountpartitionCount)的最大值,则将使用现有的分区计数。

用法示例

在本节中,我们举例说明了上述属性在具体情况下的使用。

示例:设置autoCommitOffset false并依赖手动确认。

该示例说明了如何在消费者应用程序中手动确认偏移量。

此示例要求spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset设置为false。使用相应的输入通道名称作为示例。

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
 public static void main(String[] args) {
   SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }
 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
   Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
   if (acknowledgment != null) {
    System.out.println("Acknowledgment provided");
    acknowledgment.acknowledge();
   }
 }
}
示例:安全配置

Apache Kafka 0.9支持客户端和代理商之间的安全连接。要充分利用此功能,请遵循汇编文档中的Apache Kafka文档以及Kafka 0.9 安全性指导原则。使用spring.cloud.stream.kafka.binder.configuration选项为绑定器创建的所有客户端设置安全属性。

例如,要将security.protocol设置为SASL_SSL,请设置:

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

所有其他安全属性可以以类似的方式设置。

使用Kerberos时,请按照参考文档中的说明创建和引用JAAS配置。

Spring Cloud Stream支持使用JAAS配置文件并使用Spring Boot属性将JAAS配置信息传递到应用程序。

使用JAAS配置文件

可以通过使用系统属性为Spring Cloud Stream应用程序设置JAAS和(可选)krb5文件位置。以下是使用JAAS配置文件启动带有SASL和Kerberos的Spring Cloud Stream应用程序的示例:

 java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
  --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
  --spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
  --spring.cloud.stream.bindings.input.destination=stream.ticktock \
  --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用Spring Boot属性

作为使用JAAS配置文件的替代方案,Spring Cloud Stream提供了一种使用Spring Boot属性为Spring Cloud Stream应用程序设置JAAS配置的机制。

以下属性可用于配置Kafka客户端的登录上下文。

spring.cloud.stream.kafka.binder.jaas.loginModule

登录模块名称。在正常情况下不需要设置。

默认值:com.sun.security.auth.module.Krb5LoginModule

spring.cloud.stream.kafka.binder.jaas.controlFlag

登录模块的控制标志。

默认值:required

spring.cloud.stream.kafka.binder.jaas.options

使用包含登录模块选项的键/值对映射。

默认值:空地图。

以下是使用Spring Boot配置属性启动带有SASL和Kerberos的Spring Cloud Stream应用程序的示例:

 java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
  --spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
  --spring.cloud.stream.bindings.input.destination=stream.ticktock \
  --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
  --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
  --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
  --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
  --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
  --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM

这相当于以下JAAS文件:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/kafka_client.keytab"
  principal="kafka-client-1@EXAMPLE.COM";
};

如果所需的主题已经存在于代理上,或将由管理员创建,则自动创建可以被关闭,并且仅需要发送客户端JAAS属性。作为设置spring.cloud.stream.kafka.binder.autoCreateTopics的替代方法,您可以简单地从应用程序中删除代理依赖关系。有关详细信息,请参阅基于绑定器的应用程序的类路径中排除Kafka代理jar。

注意

不要在同一应用程序中混合JAAS配置文件和Spring Boot属性。如果-Djava.security.auth.login.config系统属性已存在,则Spring Cloud Stream将忽略Spring Boot属性。

注意

使用autoCreateTopicsautoAddPartitions如果使用Kerberos,请务必小心。通常应用程序可能使用Kafka和Zookeeper中没有管理权限的主体,并且依赖Spring Cloud Stream创建/修改主题可能会失败。在安全环境中,我们强烈建议您使用Kafka工具管理性地创建主题并管理ACL。

使用绑定器与Apache Kafka 0.10

Spring Cloud Stream Kafka binder中的默认Kafka支持是针对Kafka版本0.10.1.1的。粘合剂还支持连接到其他0.10版本和0.9客户端。为了做到这一点,当你创建包含你的应用程序的项目时,包括spring-cloud-starter-stream-kafka,你通常会对默认的绑定器做。然后将这些依赖项添加到pom.xml文件中的<dependencies>部分的顶部以覆盖依赖关系。

以下是将应用程序降级到0.10.0.1的示例。由于它仍在0.10行,因此可以保留默认的spring-kafkaspring-integration-kafka版本。

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.10.0.1</version>
  <exclusions>
  <exclusion>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
  </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.1</version>
</dependency>

这是使用0.9.0.1版本的另一个例子。

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.0.5.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-kafka</artifactId>
  <version>2.0.1.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.9.0.1</version>
  <exclusions>
  <exclusion>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
  </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.1</version>
</dependency>
注意

以上版本仅为了举例而提供。为获得最佳效果,我们建议您使用最新的0.10兼容版本的项目。

从基于绑定器的应用程序的类路径中排除Kafka代理jar

Apache Kafka Binder使用作为Apache Kafka服务器库一部分的管理实用程序来创建和重新配置主题。如果在运行时不需要包含Apache Kafka服务器库及其依赖关系,因为应用程序将依赖于管理中配置的主题,Kafka binder允许排除Apache Kafka服务器依赖关系从应用程序。

如果您使用上述建议的Kafka依赖关系的非默认版本,则只需要包含kafka代理依赖项。如果您使用默认的Kafka版本,请确保从spring-cloud-starter-stream-kafka依赖关系中排除kafka broker jar,如下所示。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  <exclusions>
  <exclusion>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.11</artifactId>
  </exclusion>
  </exclusions>
</dependency>

如果您排除Apache Kafka服务器依赖关系,并且该主题不在服务器上,那么如果在服务器上启用了自动主题创建,则Apache Kafka代理将创建该主题。请注意,如果您依赖此,则Kafka服务器将使用默认数量的分区和复制因子。另一方面,如果在服务器上禁用自动主题创建,则在运行应用程序之前必须注意创建具有所需数量分区的主题。

如果要完全控制分区的分配方式,请保留默认设置,即不要排除kafka代理程序jar,并确保将spring.cloud.stream.kafka.binder.autoCreateTopics设置为true,这是默认设置。

Dead-Letter主题处理

因为不可能预料到用户如何处理死信消息,所以框架不提供任何标准的机制来处理它们。如果死刑的原因是短暂的,您可能希望将邮件路由到原始主题。但是,如果问题是一个永久性的问题,那可能会导致无限循环。以下spring-boot应用程序是如何将这些消息路由到原始主题的示例,但在三次尝试后将其移动到第三个“停车场”主题。该应用程序只是从死信主题中读取的另一个spring-cloud-stream应用程序。5秒内没有收到消息时终止。

这些示例假定原始目的地是so8400out,而消费者组是so8400

有几个注意事项

  • 当主应用程序未运行时,请考虑仅运行重新路由。否则,瞬态错误的重试将很快用尽。
  • 或者,使用两阶段方法 - 使用此应用程序路由到第三个主题,另一个则从那里路由到主题。
  • 由于这种技术使用消息标头来跟踪重试,所以它不会与headerMode=raw一起使用。在这种情况下,请考虑将一些数据添加到有效载荷(主应用程序可以忽略)。
  • 必须将x-retries添加到headers属性spring.cloud.stream.kafka.binder.headers=x-retries和主应用程序,以便标头在应用程序之间传输。
  • 由于kafka是发布/订阅,所以重播的消息将被发送给每个消费者组,即使是那些首次成功处理消息的消费者组。
application.properties
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.output.producer.partitioned=true
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.bindings.parkingLot.producer.partitioned=true
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
应用
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {
  private static final String X_RETRIES_HEADER = "x-retries";
  public static void main(String[] args) {
    SpringApplication.run(ReRouteDlqKApplication.class, args).close();
  }
  private final AtomicInteger processed = new AtomicInteger();
  @Autowired
  private MessageChannel parkingLot;
  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public Message<?> reRoute(Message<?> failed) {
    processed.incrementAndGet();
    Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
    if (retries == null) {
      System.out.println("First retry for " + failed);
      return MessageBuilder.fromMessage(failed)
          .setHeader(X_RETRIES_HEADER, new Integer(1))
          .setHeader(BinderHeaders.PARTITION_OVERRIDE,
             failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
          .build();
    }
    else if (retries.intValue() < 3) {
      System.out.println("Another retry for " + failed);
      return MessageBuilder.fromMessage(failed)
          .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
          .setHeader(BinderHeaders.PARTITION_OVERRIDE,
             failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
          .build();
    }
    else {
      System.out.println("Retries exhausted for " + failed);
      parkingLot.send(MessageBuilder.fromMessage(failed)
          .setHeader(BinderHeaders.PARTITION_OVERRIDE,
             failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
          .build());
    }
    return null;
  }
  @Override
  public void run(String... args) throws Exception {
    while (true) {
      int count = this.processed.get();
      Thread.sleep(5000);
      if (count == this.processed.get()) {
        System.out.println("Idle, terminating");
        return;
      }
    }
  }
  public interface TwoOutputProcessor extends Processor {
    @Output("parkingLot")
    MessageChannel parkingLot();
  }
}