当前位置: 首页 > 知识库问答 >
问题:

在Kafka中获取最后一个事件-JAVA

丁兴德
2023-03-14

实际上,我用@KafkaListener来阅读一个主题中的事件,我想阅读100个事件,然后放一个线程。在一定时间内睡觉。我的问题是,当线程返回时,侦听器继续执行我读到的最后一个事件,但我想在线程睡眠时放弃这些事件,继续执行主题中的最后一个事件。

比如:

1-100-捕获

睡线

101-500

线程返回

501-601-捕获

101-500事件可以被丢弃

代码:

@KafkaListener(topics = "topic")
public void consumeBalance(ConsumerRecord<String, String> payload) throws InterruptedException {

    this.valorMaximoDeRequest = this.valorMaximoDeRequest + 1;
    if (this.valorMaximoDeRequest <= 100) {
        log.info("Encontrou evento )");
        log.info("Key: " + payload.key() + ", Value:" + payload.value());
        log.info("Partition:" + payload.partition() + ",Offset:" + payload.offset());

        JsonObject jsonObject = new Gson().fromJson(payload.value(), JsonObject.class);
        String accountId = jsonObject.get("accountId").getAsString();
        log.info(">>>>>>>>> accountId: " + accountId);

    } else {
        this.valorMaximoDeRequest = 0;
        Thread.sleep(60*1000);
    }

}

kafka配置:

@Bean
    public Map<String, Object> kafkaFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put("specific.avro.reader", Boolean.TRUE);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokers");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put("security.protocol", "SASL_PLAINTEXT");
        return props;
    }

共有1个答案

蒯华彩
2023-03-14

首先,您不应该强制监听线程Hibernate。消费者可能被认为已经死亡,并触发消费者重新平衡。您最好对消费者使用暂停和恢复。请参见https://docs.spring.io/spring-kafka/docs/current/reference/html/#pause-resume

然后,如果你想跳过当消费者睡着时发布的记录,你必须在消费者醒来时寻找。请参见https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek但是这并不简单:当消费者活跃或不拥有分区时,Kafka消费者不允许你寻找。

 类似资料:
  • 我正在从事一个小型Drools项目,因为我想了解更多关于使用规则引擎的知识。我有一个名为Event的类,它有以下字段: <代码>字符串标记 可以是任何字符串的标记 我在我的知识库中插入了数百个事件实例,现在我想得到3个最近的事件,它们都标记为“OK”(确定)。我想出了以下代码,它可以工作: 但是我有一种感觉,应该有更好的方法来做到这一点。这很冗长,不容易重复使用:如果我想获取具有

  • 问题内容: 我正在使用String split方法,并且我想拥有最后一个元素。数组的大小可以更改。 例: 我想分割上面的字符串并得到最后一个项目: 我不知道在运行时数组的大小:( 问题答案: 将数组保存在局部变量中,并使用数组的字段查找其长度。减去1以说明它是基于0的: 注意:如果原始字符串仅由分隔符组成,例如或,则为0,这将引发ArrayIndexOutOfBoundsException。示例:

  • 是否可能只得到最新的产品的木商订单?我知道笔记作为例子是可能的: 我能为订单中的产品做些类似的吗?

  • 所以我试图弄清楚当玩家到达我的世界中的虚空时如何获得最后的伤害原因。但是,我找不到如何获得损坏器的方法。 这是我的事件处理器 不知怎的,当我(球员)受伤时,它会发出两条消息。有人能帮我吗?谢谢

  • 我想检索进程的最后2个版本: 预期结果将是: 我写了这个查询,但我不喜欢它。。。我不能将它作为子查询包含在中<代码>从中选择(从中选择子字符串索引(按顺序排列(ORDER BY

  • 问题内容: 所以我有一个弹性的搜索索引,并且要向它发送带有时间戳的文档。我想知道是否有一种方法可以根据时间戳提取最后一个文档。即说要弹性给我最后一次的文档。 谢谢。 问题答案: 是的,您只需索取一个文档()并通过减少时间戳进行排序