当前位置: 首页 > 知识库问答 >
问题:

kafka客户端Api问题

钱俊楚
2023-03-14

我使用的是kafka-clients-0.10.1.1(单节点单代理)

auto.create.topics.enable的默认值为true。

1.我正在使用以下方式向主题发送消息:

    kafkaProdcuer<String,String> producer> producer...
    producer.send(new ProducerRecord<String, String>("my- topic","message"));
    producer.close();

用于消费:

    kafkaConsumer<String,String> consumer....
    consumer.subscribe(Arrays.asList("my-topic"));
    ConsumerRecords<String, String> records = consumer.poll(200);

    while(true){
     for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
         }
     }

共有1个答案

勾长卿
2023-03-14

1)您是否总是在消费者中使用相同的group.id?您是否先生产后消费?这可能与消费者群体和抵销管理有关。请看关于消费者抵销行为的答案。

2)不确定你是有意阅读副本还是无意阅读副本。只要该消息未因主题保留策略而被删除,您就可以再次阅读相同的消息,并找到该位置。如果您的意思是无意中,将auto-commit设置为false只是意味着使用者不会为您提交偏移量,那么您必须手动调用commitSync()或commitAsync()来完成此操作。在任何情况下,您的使用者仍有可能处理消息并在提交之前崩溃,在这种情况下,当使用者恢复时,它将再次读取那些已处理但未提交的消息。如果您只想要一次语义化,那么您必须做一些其他的事情,比如用处理过的消息原子地存储偏移量。

3)正如Lhfcws所提到的,在一个流中没有像“所有消息”这样的概念。你可以做的一些事情(技巧)是:

  • 您可以检查轮询返回的记录列表是否为空,并且在配置了一定次数后,中断循环并退出。
  • 如果消息是有序的(您正在从单个分区读取),您可以发送一种特殊的END_OF_DATA消息,当您看到它时,您将关闭使用者。
  • 您可以让使用者读取多条消息,然后退出,下次它将从上次提交的偏移量继续。
 类似资料:
  • ngrok客户端公开了一个REST API,它授予对以下各项的编程访问权限: 收集状态和指标信息 收集和重放捕获的请求 动态启动和停止隧道 基本URL和身份验证 Base URL http://127.0.0.1:4040/api Authentication None ngrok客户端API作为ngrok的本地Web检查接口的一部分公开。因为它在本地接口上提供,所以API没有身份验证。如果您覆盖

  • http://redis.cn/clients.html

  • Docusaurus 提供了一组客户端 API,对于构建网站很有帮助。 组件 <Head/> 这是一个可重用 React 组件,用于管理对 HTML 文档标头(即 <head> 中的标签)的修改。此组件接收纯 HTML 标签并输出纯 HTML 标签,对初学者很友好。此组件始对 React Helmet 的二次包装。 用法示例: import React from 'react'; import

  • 5.3.1. Getting started with the client API 开始 当使用Jersey JAX-RS 客户端支持的依赖关系,请查看依赖。 您可能还希望使用一个自定义连接器实现。在这种情况下,您将需要包含额外的依赖模块包含您想要使用的自定义客户端连接器。请参见“配置自定义连接器”关于如何使用和配置自定义 Jersey 客户端传输连接器。 5.3.2. Creating and

  • 执行kafka客户端的生产者/消费者连接池有意义吗? kafka是否在内部维护已初始化并准备好使用的连接对象列表? 我们希望最小化连接创建的时间,这样在发送/接收消息时就不会有额外的开销。 目前,我们正在使用apache共享池库来保持连接。 任何帮助都将不胜感激。

  • DropWizard在REST的引擎盖下使用泽西。我正在尝试弄清楚如何为我的DropWizard应用程序将公开的RESTfulendpoint编写客户端。 为了这个例子,让我们假设我的DropWizard应用程序有一个< code>CarResource,它为CRUDding cars公开了几个简单的RESTfulendpoint: 因此,我认为结构化的API客户端将类似于: 但是我能找到的关于D