当前位置: 首页 > 知识库问答 >
问题:

如何编写脚本/代码来阻止apache kafka使用者使用消息?

姚建树
2023-03-14
private static Consumer<String, String> createConsumer() {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    // Create the consumer using props.
    consumer = new KafkaConsumer<>(props);
    // Subscribe to the topic.
    consumer.subscribe(Collections.singletonList(TOPIC));
    return consumer;
}

public static void main(String args[]) throws Exception {
    runConsumer();
}

private static void runConsumer(){
    Consumer<String, String> consumer = createConsumer();
    try{
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), record.value()); 
            }
            consumer.commitAsync();
        }
    }catch(WakeupException wue){
        System.out.println("Wake Up Exception Occured");
        wue.printStackTrace();
    }
    catch(Exception e){
        e.printStackTrace();
    }finally {
        consumer.close();
    }
}`

共有1个答案

鲜于华容
2023-03-14

使用close方法关闭KafKaconsumer只需调用Consumer.close();

公共无效关闭()

关闭使用者,等待最多30秒的默认超时以进行任何需要的清理。如果启用了自动提交,这将在可能的默认超时内提交当前偏移量。有关详细信息,请参见close(long,TimeUnit)。注意,wakeup()不能用于中断关闭。

 类似资料:
  • 我们有一个应用程序,它使用来自Kafka主题(3个分区)的消息,丰富数据,并将记录保存在DB(Spring JPA)中,然后将消息发布到另一个Kafka主题(在同一个代理上),所有这些都通过使用Camel 2.4.1和Spring Boot 2.1.7进行编排。释放 我们想为 kafka 消费者-生产者组合实现“exactly-once”语义。 消费者设置: 生产者设置: 豆接线: 骆驼路线: 但

  • 我在使用spring WebSockets时遇到以下错误: 用例:在我们的服务器端代码中,我们有一个在数据库中搜索值的功能。如果这些值不在数据库中。它将访问servlet并获取数据。第二部分,即访问servlet并获取数据是异步调用。 因此,对于一个请求,我们必须在数据库中搜索多个内容… 示例:在request中,我们得到了一个参数channel:1这个channel映射到多个ID,比如1映射到1

  • 问题内容: 例如,我有一个名为的文件。其内容是: 所以在这里我想为每个人添加图片链接 如何编写脚本以将密钥添加到每个人并添加person.name.lowercase +“ .png”作为值? 在此过程结束时,将对people.json进行编辑并将其保存到硬件中,而不是内存中。 非常感谢你。 问题答案: 这是一个完整的程序,使用JavaScript(使用node.js),可以完成所需的工作: 作为

  • 问题内容: 我正在尝试使用一个数据库中存在但另一个数据库中没有的information_schema.routines查询存储过程定义的列表。 这给了我一行查询的定义,所以当我有这样的评论时 SQL被注释掉了,我不能使用该定义来创建SP。 如何使用适当的换行符将其解析回去? 还是 有更好的方法(使用代码)来获取这些脚本? 问题答案: 存储过程仅在Management Studio中显示在一行上。如

  • 本文向大家介绍如何使用JavaScript集更快地编写代码?,包括了如何使用JavaScript集更快地编写代码?的使用技巧和注意事项,需要的朋友参考一下 要了解如何使用集合使代码更快,我们必须首先了解必须使用集合而不是数组的场景- 由于Set仅包含唯一元素,因此如果我们事先知道我们希望避免将重复数据保存到我们的结构中会更容易。 设置像的基本操作,,,等...很容易实现的有效基础上,原生内置业务提

  • 警告:在命令行界面上使用密码可能不安全。 但是,我必须使用存储密码的环境变量()来执行上述语句,因为我希望从Terminal内部在bash脚本中迭代运行该命令,并且我绝对不喜欢等待提示符出现并强迫我在一个脚本中输入50或100次密码的想法。所以我的问题是: > 压制警告是否可行?正如我所说的,该命令工作正常,但当我循环运行该命令50或100次时,窗口变得非常混乱。 我是否应该服从警告信息,不在我的