我正在使用Spring-Kafka,并试图实现请求回复模式。我的用例是,客户端调用具有有效负载的Restendpoint,我将此消息发送到kafka主题(请求主题)。我有火花工作,消耗此消息,处理它,并在另一个Kafka主题(回复主题)发送响应。一旦消息被写回回复主题,我的Web应用程序应该使用此消息,并作为超文本传输协议响应返回给客户端。
到目前为止我所取得的成就。
我使用SpringKafka来解决这个用例。我可以将请求正文作为Kafka消息发送到请求主题。Spring Kafka在发送Kafka消息之前,正在生成一个Kafka_correlationId作为Kafka头。我已经注册了producertinterceptor,获得了生成的correlationId并将其传递到消息体中。
在spark job中,我可以使用这个kafka消息,对其进行处理,并在回复主题中发送回消息时,添加与生成的消息具有相同值的消息头kafka_correlationId。
当我只有一个消费者时,用例工作得非常好。
什么不起作用。
现在,我已经部署了我的web应用程序的两个实例,回复主题有两个具有相同用户组Id的分区。
App-instance-1 : consuming from partition-0
App-instance-2 : consuming from partition-1
如果我的请求转到应用实例-1,如果我的火花作业能够写入回复主题的partion-0,我能够得到响应。但是,如果火花作业写在回复主题的partion-1,因为App-incent-1正在监听只有partion-0,我不能得到响应和应用程序失败与超时例外。另一个应用实例的类似情况。
请让我知道我应该配置什么来实现这一点。
我在回答我自己的问题,以便它能帮助别人。
根据@Gary Russel的输入,我继续在消息中设置REPLY_分区头。
我没有静态地将分区分配给消费者(因为我不确定我的应用程序中会有多少消费者),而是选择动态地标识分配的分区,并在REPLY_PARTITION头中传递它。下面是实现相同的代码。
定义侦听回复主题的bean。
@Bean
public KafkaMessageListenerContainer<String, JsonNode> replyContainer(ConsumerFactory<String, JsonNode> cf) {
ContainerProperties containerProperties = new ContainerProperties("reply-topic");
return new KafkaMessageListenerContainer<>(cf, containerProperties);
}
要获得分配的分区,
@Autowired
private KafkaMessageListenerContainer<String, JsonNode> replyContainer;
/**
* <P>
* gets first assigned partition
* </P>
* @return
*/
private Integer getAssignedPartition() throws Exception {
Integer partitionId = null;
if (replyContainer.getAssignedPartitions() != null) {
for (TopicPartition assignedPartition : replyContainer.getAssignedPartitions()) {
if(assignedPartition.topic().equalsIgnoreCase("reply-topic")){
partitionId = assignedPartition.partition();
}
}
}
if(partitionId == null){
//throw exception
}
return partitionId;
}
/**
* <P>
* int to byte array
* </P>
* @param value
* @return
*/
private static byte[] toByteArray(int value) {
return new byte[] {
(byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value
};
}
发送和等待回复主题的方法。
/**
* <P>
* Sends message to request topic and wait for reply-topic response.
* </P>
* @param request
* @return
* @throws Exception
*/
public JsonNode process(JsonNode request) throws Exception {
// create producer record
Integer partitionId = getAssignedPartition();
ProducerRecord<String, JsonNode> record = new ProducerRecord<>(requestTopic, request);
// set reply topic in header
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, toByteArray(partitionId) ));
RequestReplyFuture<String, JsonNode, JsonNode> sendAndReceive = kafkaTemplate.sendAndReceive(record);
// get consumer record
ConsumerRecord<String, JsonNode> consumerRecord = sendAndReceive.get(2, TimeUnit.SECONDS);
// return consumer value
return consumerRecord.value();
}
有两种解决方案-
请参阅文档。
使用单个回复配置时,只要每个实例侦听不同的分区,就可以对多个模板使用相同的回复主题。使用单个回复主题进行配置时,每个实例必须使用不同的group.id
。在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例找到相关ID。这对于自动缩放可能很有用,但是额外的流量开销和丢弃每个不需要的回复的小成本。当您使用此设置时,我们建议您将模板的share dReplyTopic
设置为true
,这将降低对DEBUG的意外回复的日志级别,而不是默认的ERROR。
我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。
我怎样才能暗示SpringKafka把每一个话题传播给一个不同的消费者呢? 干杯
在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费
我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。
问题内容: 我有一个主题列表(目前为10个),其规模将来可能会增加。我知道我们可以在每个主题中产生多个线程(每个主题)使用,但是就我而言,如果主题数量增加,那么从主题中使用的线程数量就会增加,这是我不希望的,因为主题不是太频繁地获取数据,因此线程将处于理想状态。 有没有办法让一个消费者从所有主题中消费?如果是,那我们如何实现呢?另外,Kafka将如何维护偏移量?请提出答案。 问题答案: 我们可以使
我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用