当前位置: 首页 > 知识库问答 >
问题:

多个消费者使用spring kafka,并在spring之外编写回复主题

昝晗昱
2023-03-14

我正在使用Spring-Kafka,并试图实现请求回复模式。我的用例是,客户端调用具有有效负载的Restendpoint,我将此消息发送到kafka主题(请求主题)。我有火花工作,消耗此消息,处理它,并在另一个Kafka主题(回复主题)发送响应。一旦消息被写回回复主题,我的Web应用程序应该使用此消息,并作为超文本传输协议响应返回给客户端。

到目前为止我所取得的成就。

我使用SpringKafka来解决这个用例。我可以将请求正文作为Kafka消息发送到请求主题。Spring Kafka在发送Kafka消息之前,正在生成一个Kafka_correlationId作为Kafka头。我已经注册了producertinterceptor,获得了生成的correlationId并将其传递到消息体中。

在spark job中,我可以使用这个kafka消息,对其进行处理,并在回复主题中发送回消息时,添加与生成的消息具有相同值的消息头kafka_correlationId。

当我只有一个消费者时,用例工作得非常好。

什么不起作用。

现在,我已经部署了我的web应用程序的两个实例,回复主题有两个具有相同用户组Id的分区。

App-instance-1 : consuming from partition-0
App-instance-2 : consuming from partition-1

如果我的请求转到应用实例-1,如果我的火花作业能够写入回复主题的partion-0,我能够得到响应。但是,如果火花作业写在回复主题的partion-1,因为App-incent-1正在监听只有partion-0,我不能得到响应和应用程序失败与超时例外。另一个应用实例的类似情况。

请让我知道我应该配置什么来实现这一点。

共有2个答案

许博易
2023-03-14

我在回答我自己的问题,以便它能帮助别人。

根据@Gary Russel的输入,我继续在消息中设置REPLY_分区头。

我没有静态地将分区分配给消费者(因为我不确定我的应用程序中会有多少消费者),而是选择动态地标识分配的分区,并在REPLY_PARTITION头中传递它。下面是实现相同的代码。

定义侦听回复主题的bean

@Bean
public KafkaMessageListenerContainer<String, JsonNode> replyContainer(ConsumerFactory<String, JsonNode> cf) {
    ContainerProperties containerProperties = new ContainerProperties("reply-topic");
    return new KafkaMessageListenerContainer<>(cf, containerProperties);
}

要获得分配的分区,

@Autowired
private KafkaMessageListenerContainer<String, JsonNode> replyContainer;

    /**
     * <P>
     *     gets first assigned partition
     * </P>
     * @return
     */
    private Integer getAssignedPartition() throws Exception {
        Integer partitionId = null;
        if (replyContainer.getAssignedPartitions() != null) {
            for (TopicPartition assignedPartition : replyContainer.getAssignedPartitions()) {
                if(assignedPartition.topic().equalsIgnoreCase("reply-topic")){
                    partitionId = assignedPartition.partition();
                }
            }
        }
        if(partitionId == null){
            //throw exception
        }
        return partitionId;
    }

    /**
     * <P>
     *     int to byte array
     * </P>
     * @param value
     * @return
     */
    private static byte[] toByteArray(int value) {
        return new byte[] {
                (byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value
        };
    }

发送和等待回复主题的方法。

/**
 * <P>
 *     Sends message to request topic and wait for reply-topic response.
 * </P>
 * @param request
 * @return
 * @throws Exception
 */
public JsonNode process(JsonNode request) throws Exception {
    // create producer record
    Integer partitionId = getAssignedPartition();
    ProducerRecord<String, JsonNode> record = new ProducerRecord<>(requestTopic, request);
    // set reply topic in header
    record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
    record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, toByteArray(partitionId) ));
    RequestReplyFuture<String, JsonNode, JsonNode> sendAndReceive = kafkaTemplate.sendAndReceive(record);
    
    // get consumer record
    ConsumerRecord<String, JsonNode> consumerRecord = sendAndReceive.get(2, TimeUnit.SECONDS);
    // return consumer value
    return consumerRecord.value();
}
凌意
2023-03-14

有两种解决方案-

  • 还要设置REPLY_分区头(并让spark应用程序将应答发送到该分区),并静态分配分区

请参阅文档。

使用单个回复配置时,只要每个实例侦听不同的分区,就可以对多个模板使用相同的回复主题。使用单个回复主题进行配置时,每个实例必须使用不同的group.id。在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例找到相关ID。这对于自动缩放可能很有用,但是额外的流量开销和丢弃每个不需要的回复的小成本。当您使用此设置时,我们建议您将模板的share dReplyTopic设置为true,这将降低对DEBUG的意外回复的日志级别,而不是默认的ERROR。

 类似资料:
  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 我怎样才能暗示SpringKafka把每一个话题传播给一个不同的消费者呢? 干杯

  • 我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。

  • 问题内容: 我有一个主题列表(目前为10个),其规模将来可能会增加。我知道我们可以在每个主题中产生多个线程(每个主题)使用,但是就我而言,如果主题数量增加,那么从主题中使用的线程数量就会增加,这是我不希望的,因为主题不是太频繁地获取数据,因此线程将处于理想状态。 有没有办法让一个消费者从所有主题中消费?如果是,那我们如何实现呢?另外,Kafka将如何维护偏移量?请提出答案。 问题答案: 我们可以使

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用