Apache Kafka Binder
用法
对于使用Apache Kafka绑定器,您只需要使用以下Maven坐标将其添加到您的Spring Cloud Stream应用程序:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
或者,您也可以使用Spring Cloud Stream Kafka Starter。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
Apache Kafka Binder概述
以下可以看到Apache Kafka绑定器操作的简化图。
图13. Kafka BinderApache Kafka Binder实现将每个目标映射到Apache Kafka主题。消费者组织直接映射到相同的Apache Kafka概念。分区也直接映射到Apache Kafka分区。
配置选项
本节包含Apache Kafka绑定器使用的配置选项。
有关binder的常见配置选项和属性,请参阅核心文档。
Kafka Binder Properties
- spring.cloud.stream.kafka.binder.brokers
Kafka活页夹将连接的经纪人列表。
默认值:
localhost
。- spring.cloud.stream.kafka.binder.defaultBrokerPort
brokers
允许使用或不使用端口信息指定的主机(例如,host1,host2:port2
)。当在代理列表中没有配置端口时,这将设置默认端口。默认值:
9092
。- spring.cloud.stream.kafka.binder.zkNodes
Kafka绑定器可以连接的ZooKeeper节点列表。
默认值:
localhost
。- spring.cloud.stream.kafka.binder.defaultZkPort
zkNodes
允许使用或不使用端口信息指定的主机(例如,host1,host2:port2
)。当在节点列表中没有配置端口时,这将设置默认端口。默认值:
2181
。- spring.cloud.stream.kafka.binder.configuration
客户端属性(生产者和消费者)的密钥/值映射传递给由绑定器创建的所有客户端。由于这些属性将被生产者和消费者使用,所以使用应该限于常见的属性,特别是安全设置。
默认值:空地图。
- spring.cloud.stream.kafka.binder.headers
将由活页夹传送的自定义标题列表。
默认值:空。
- spring.cloud.stream.kafka.binder.offsetUpdateTimeWindow
以毫秒为单位的频率(以毫秒为单位)保存偏移量。
0
忽略。默认值:
10000
。- spring.cloud.stream.kafka.binder.offsetUpdateCount
频率,更新次数,哪些消耗的偏移量会持续存在。
0
忽略。与offsetUpdateTimeWindow
相互排斥。默认值:
0
。- spring.cloud.stream.kafka.binder.requiredAcks
经纪人所需的acks数量。
默认值:
1
。- spring.cloud.stream.kafka.binder.minPartitionCount
只有设置
autoCreateTopics
或autoAddPartitions
才有效。绑定器在其生成/消耗数据的主题上配置的全局最小分区数。它可以由生产者的partitionCount
设置或生产者的instanceCount
*concurrency
设置的值替代(如果更大)。默认值:
1
。- spring.cloud.stream.kafka.binder.replicationFactor
如果
autoCreateTopics
处于活动状态,则自动创建主题的复制因子。默认值:
1
。- spring.cloud.stream.kafka.binder.autoCreateTopics
如果设置为
true
,绑定器将自动创建新主题。如果设置为false
,则绑定器将依赖于已配置的主题。在后一种情况下,如果主题不存在,则绑定器将无法启动。值得注意的是,此设置与代理的auto.topic.create.enable
设置无关,并不影响它:如果服务器设置为自动创建主题,则可以将其创建为元数据检索请求的一部分,并使用默认代理设置。默认值:
true
。- spring.cloud.stream.kafka.binder.autoAddPartitions
如果设置为
true
,则绑定器将根据需要创建新的分区。如果设置为false
,则绑定器将依赖于已配置的主题的分区大小。如果目标主题的分区计数小于预期值,则绑定器将无法启动。默认值:
false
。- spring.cloud.stream.kafka.binder.socketBufferSize
Kafka消费者使用的套接字缓冲区的大小(以字节为单位)。
默认值:
2097152
。
Kafka消费者Properties
以下属性仅适用于Kafka消费者,必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.
为前缀。
- autoRebalanceEnabled
当
true
,主题分区将在消费者组的成员之间自动重新平衡。当false
根据spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
为每个消费者分配一组固定的分区。这需要在每个启动的实例上适当地设置spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
属性。在这种情况下,属性spring.cloud.stream.instanceCount
通常必须大于1。默认值:
true
。- autoCommitOffset
是否在处理邮件时自动提交偏移量。如果设置为
false
,则入站消息中将显示带有org.springframework.kafka.support.Acknowledgment
类型的密钥kafka_acknowledgment
的报头。应用程序可以使用此标头来确认消息。有关详细信息,请参阅示例部分。当此属性设置为false
时,Kafka binder将ack模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
。默认值:
true
。- autoCommitOnError
只有
autoCommitOffset
设置为true
才有效。如果设置为false
,它会禁止导致错误的邮件的自动提交,并且只会为成功的邮件执行提交,允许流在上次成功处理的邮件中自动重播,以防持续发生故障。如果设置为true
,它将始终自动提交(如果启用了自动提交)。如果没有设置(默认),它实际上具有与enableDlq
相同的值,如果它们被发送到DLQ,则自动提交错误的消息,否则不提交它们。默认值:未设置。
- recoveryInterval
连接恢复尝试之间的间隔,以毫秒为单位。
默认值:
5000
。- resetOffsets
是否将消费者的偏移量重置为
startOffset
提供的值。默认值:
false
。- 开始偏移
新组的起始偏移量,或
resetOffsets
为true
时的起始偏移量。允许的值:earliest
,latest
。如果消费者组被明确设置为消费者'绑定'(通过spring.cloud.stream.bindings.<channelName>.group
),那么'startOffset'设置为earliest
; 否则对于anonymous
消费者组,设置为latest
。默认值:null(相当于
earliest
)。- enableDlq
当设置为true时,它将为消费者发送启用DLQ行为。默认情况下,导致错误的邮件将转发到名为
error.<destination>.<group>
的主题。DLQ主题名称可以通过属性dlqName
配置。对于错误数量相对较少并且重播整个原始主题可能太麻烦的情况,这为更常见的Kafka重播场景提供了另一种选择。默认值:
false
。- 组态
使用包含通用Kafka消费者属性的键/值对映射。
默认值:空地图。
- dlqName
接收错误消息的DLQ主题的名称。
默认值:null(如果未指定,将导致错误的消息将转发到名为
error.<destination>.<group>
的主题)。
Kafka生产者Properties
以下属性仅适用于Kafka生产者,必须以spring.cloud.stream.kafka.bindings.<channelName>.producer.
为前缀。
- 缓冲区大小
上限(以字节为单位),Kafka生产者将在发送之前尝试批量的数据量。
默认值:
16384
。- 同步
生产者是否是同步的
默认值:
false
。- batchTimeout
生产者在发送之前等待多长时间,以便允许更多消息在同一批次中累积。(通常,生产者根本不等待,并且简单地发送在先前发送进行中累积的所有消息。)非零值可能会以延迟为代价增加吞吐量。
默认值:
0
。- 组态
使用包含通用Kafka生产者属性的键/值对映射。
默认值:空地图。
注意 | Kafka绑定器将使用生产者的 |
用法示例
在本节中,我们举例说明了上述属性在具体情况下的使用。
示例:设置autoCommitOffset
false并依赖手动确认。
该示例说明了如何在消费者应用程序中手动确认偏移量。
此示例要求spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset
设置为false。使用相应的输入通道名称作为示例。
@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
示例:安全配置
Apache Kafka 0.9支持客户端和代理商之间的安全连接。要充分利用此功能,请遵循汇编文档中的Apache Kafka文档以及Kafka 0.9 安全性指导原则。使用spring.cloud.stream.kafka.binder.configuration
选项为绑定器创建的所有客户端设置安全属性。
例如,要将security.protocol
设置为SASL_SSL
,请设置:
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
所有其他安全属性可以以类似的方式设置。
使用Kerberos时,请按照参考文档中的说明创建和引用JAAS配置。
Spring Cloud Stream支持使用JAAS配置文件并使用Spring Boot属性将JAAS配置信息传递到应用程序。
使用JAAS配置文件
可以通过使用系统属性为Spring Cloud Stream应用程序设置JAAS和(可选)krb5文件位置。以下是使用JAAS配置文件启动带有SASL和Kerberos的Spring Cloud Stream应用程序的示例:
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用Spring Boot属性
作为使用JAAS配置文件的替代方案,Spring Cloud Stream提供了一种使用Spring Boot属性为Spring Cloud Stream应用程序设置JAAS配置的机制。
以下属性可用于配置Kafka客户端的登录上下文。
- spring.cloud.stream.kafka.binder.jaas.loginModule
登录模块名称。在正常情况下不需要设置。
默认值:
com.sun.security.auth.module.Krb5LoginModule
。- spring.cloud.stream.kafka.binder.jaas.controlFlag
登录模块的控制标志。
默认值:
required
。- spring.cloud.stream.kafka.binder.jaas.options
使用包含登录模块选项的键/值对映射。
默认值:空地图。
以下是使用Spring Boot配置属性启动带有SASL和Kerberos的Spring Cloud Stream应用程序的示例:
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
这相当于以下JAAS文件:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="kafka-client-1@EXAMPLE.COM";
};
如果所需的主题已经存在于代理上,或将由管理员创建,则自动创建可以被关闭,并且仅需要发送客户端JAAS属性。作为设置spring.cloud.stream.kafka.binder.autoCreateTopics
的替代方法,您可以简单地从应用程序中删除代理依赖关系。有关详细信息,请参阅基于绑定器的应用程序的类路径中排除Kafka代理jar。
注意 | 不要在同一应用程序中混合JAAS配置文件和Spring Boot属性。如果 |
注意 | 使用 |
使用绑定器与Apache Kafka 0.10
Spring Cloud Stream Kafka binder中的默认Kafka支持是针对Kafka版本0.10.1.1的。粘合剂还支持连接到其他0.10版本和0.9客户端。为了做到这一点,当你创建包含你的应用程序的项目时,包括spring-cloud-starter-stream-kafka
,你通常会对默认的绑定器做。然后将这些依赖项添加到pom.xml文件中的<dependencies>
部分的顶部以覆盖依赖关系。
以下是将应用程序降级到0.10.0.1的示例。由于它仍在0.10行,因此可以保留默认的spring-kafka
和spring-integration-kafka
版本。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
这是使用0.9.0.1版本的另一个例子。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
注意 | 以上版本仅为了举例而提供。为获得最佳效果,我们建议您使用最新的0.10兼容版本的项目。 |
从基于绑定器的应用程序的类路径中排除Kafka代理jar
Apache Kafka Binder使用作为Apache Kafka服务器库一部分的管理实用程序来创建和重新配置主题。如果在运行时不需要包含Apache Kafka服务器库及其依赖关系,因为应用程序将依赖于管理中配置的主题,Kafka binder允许排除Apache Kafka服务器依赖关系从应用程序。
如果您使用上述建议的Kafka依赖关系的非默认版本,则只需要包含kafka代理依赖项。如果您使用默认的Kafka版本,请确保从spring-cloud-starter-stream-kafka
依赖关系中排除kafka broker jar,如下所示。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
</exclusion>
</exclusions>
</dependency>
如果您排除Apache Kafka服务器依赖关系,并且该主题不在服务器上,那么如果在服务器上启用了自动主题创建,则Apache Kafka代理将创建该主题。请注意,如果您依赖此,则Kafka服务器将使用默认数量的分区和复制因子。另一方面,如果在服务器上禁用自动主题创建,则在运行应用程序之前必须注意创建具有所需数量分区的主题。
如果要完全控制分区的分配方式,请保留默认设置,即不要排除kafka代理程序jar,并确保将spring.cloud.stream.kafka.binder.autoCreateTopics
设置为true
,这是默认设置。
Dead-Letter主题处理
因为不可能预料到用户如何处理死信消息,所以框架不提供任何标准的机制来处理它们。如果死刑的原因是短暂的,您可能希望将邮件路由到原始主题。但是,如果问题是一个永久性的问题,那可能会导致无限循环。以下spring-boot
应用程序是如何将这些消息路由到原始主题的示例,但在三次尝试后将其移动到第三个“停车场”主题。该应用程序只是从死信主题中读取的另一个spring-cloud-stream应用程序。5秒内没有收到消息时终止。
这些示例假定原始目的地是so8400out
,而消费者组是so8400
。
有几个注意事项
- 当主应用程序未运行时,请考虑仅运行重新路由。否则,瞬态错误的重试将很快用尽。
- 或者,使用两阶段方法 - 使用此应用程序路由到第三个主题,另一个则从那里路由到主题。
- 由于这种技术使用消息标头来跟踪重试,所以它不会与
headerMode=raw
一起使用。在这种情况下,请考虑将一些数据添加到有效载荷(主应用程序可以忽略)。 - 必须将
x-retries
添加到headers
属性spring.cloud.stream.kafka.binder.headers=x-retries
和主应用程序,以便标头在应用程序之间传输。 - 由于kafka是发布/订阅,所以重播的消息将被发送给每个消费者组,即使是那些首次成功处理消息的消费者组。
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.output.producer.partitioned=true
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.bindings.parkingLot.producer.partitioned=true
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
应用@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private MessageChannel parkingLot;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> reRoute(Message<?> failed) {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries.intValue() < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
parkingLot.send(MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, terminating");
return;
}
}
}
public interface TwoOutputProcessor extends Processor {
@Output("parkingLot")
MessageChannel parkingLot();
}
}