当前位置: 首页 > 知识库问答 >
问题:

kafka consumer用于动态检测添加的主题

洪国兴
2023-03-14
    null

但问题是,如果主题是动态创建的(我的意思是说在使用者代码启动之后),它将不起作用,但API说它将支持动态主题创建。这里是供你参考的链接。

Kafka版本:0.9.0.1

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaConsumer.html

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Pattern r = Pattern.compile("siddu(\\d)*");

    consumer.subscribe(r, new HandleRebalance());
    try {
         while(true) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(partition.partition()  + ": "  +record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }

任何帮助都是非常感谢的...

共有1个答案

董子平
2023-03-14

在阿帕奇Kafaka的邮件档案中有一个答案。我将其复制如下:

使用者支持一个配置选项“metadata.max.age.ms”,它基本上控制获取主题元数据的频率。默认情况下,这个值设置得相当高(5分钟),这意味着最多需要5分钟来发现与正则表达式匹配的新主题。您可以将其设置得更低,以便更快地发现主题。

所以在你的道具中你可以:

props.put("metadata.max.age.ms", 5000);
 类似资料:
  • 问题内容: 我本周刚开始使用AngularJS进行一个新项目,所以我必须尽快加快速度。 我的要求之一是动态添加html内容,并且该内容可能带有click事件。 因此,我在下面的代码Angular代码中显示了一个按钮,当单击该按钮时,它会动态添加另一个按钮。单击动态添加的按钮,应该添加另一个按钮,但是我无法让ng- click来处理动态添加的按钮 工作代码示例在此处 http://plnkr.co/

  • 将 Azure Monitor 开放遥测添加到 Java 应用程序的有文档记录的方法是下载应用程序洞察代理 3.2.11.jar并使用以下命令:-javaagent:路径/收件人/应用程序洞察-代理-3.2.11.jar。 因此,在Spring Boot中,它可能的运行方式是:

  • 我需要将预填充的PDF/XFA表单设置为只读(因为在没有输入的情况下,例如文本、复选框、单选按钮等可以更改其值)。 对于常规AcroForms PDF和静态XFA表单,我可以通过在PdfStamper实例上调用setFormFlatting(true)来实现这一点。对于动态XFA表单,我必须将XDP的“字段”节点的“访问”属性设置为“只读”。 问题是,如何检测表单是否是动态XFA?不区分静态或动态

  • 有没有办法动态设置@JsonProperty注释,例如: 或者我可以简单地重命名实例的字段吗?如果是这样,请给我一个主意。此外,可以以何种方式与序列化一起使用?

  • Hibernate 的用户曾要求一个既可自动分配新持久化标识(identifier)保存瞬时(transient)对象,又可更新/重新关联脱管(detached)实例的通用方法。saveOrUpdate() 方法实现了这个功能。 // in the first session Cat cat = (Cat) firstSession.load(Cat.class, catID); // in a

  • 我在这里查看的是KafkaConsumer源代码。通过查看源代码(java版本,我不太熟悉),暂停一个topicpartition似乎将其偏移量状态设置为Paused。我试图弄清楚KafkaConsumer在后台执行的to获取操作发生了什么,暂停主题分区是否会停止kafka使用者执行的预取操作,还是它继续预取,但在轮询时不返回该主题分区的ConsumerRecords。