当前位置: 首页 > 知识库问答 >
问题:

如果消息是由生产者产生的,如何从Kafka经纪人那里得到确认?

耿联
2023-03-14

有没有其他方法可以得到认可?

@Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        System.out.println("Called Callback method");
        if (metadata != null) {
            System.out.println("message(" + key + ", " + message
                    + ") sent to partition(" + metadata.partition() + "), "
                    + "offset(" + metadata.offset() + ") in " + elapsedTime
                    + " ms");
        } else {
            exception.printStackTrace();
        }

    }

props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "mytopic");
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);

KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props);
long runtime = new Date().getTime(); 
String ip = "192.168.2."+ rnd.nextInt(255); 
String msg = runtime + ".www.ppop.com," + ip;
producer.send(new ProducerRecord<String, byte[]>("mytopic", msg.getBytes()), `new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));`

我使用kafka-client api 0.9.1和broker版本0.8.2。

共有1个答案

习哲彦
2023-03-14

所以我不能百分之百地确定Kafaka的哪种版本与哪种版本相一致。目前我使用的是0.8.2,我知道0.9引入了一些突破性的变化,但我不能肯定地告诉你现在什么是可行的/不可行的。

一个非常强烈的建议是,我会使用对应于您的代理版本的Kafka-Client版本。如果您使用的是Broker0.8.2,那么我也会使用kakfa-client0.8.2。

您从未提供过如何使用它的代码,所以我只是在暗中猜测。但是我已经通过在Producer中使用这个方法在Kafka0.8.2中实现了回调特性。下面是方法签名。

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
KafkaProducer<String, String> prod = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> record = //data to send to kafka
prod.send(record, new Callback() {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e != null) {
      e.printStackTrace();
    } else {
      //implement logic here, or call another method to process metadata
      System.out.println("Callback");
    }
  }
}); 
 类似资料:
  • 我有一组Kafka代理实例作为集群运行。我有一个客户正在生产数据给Kafka: 当我们使用tcpdump进行监控时,我可以看到只有到broker1和broker2的连接被建立,而对于broker3,没有来自我的生产者的连接。我有一个只有一个分区的单一主题。 我的问题是: > 为什么在我的情况下,我无法连接到broker3?或者至少我的网络监控没有显示我的制作人与broker3建立了连接? 如果我能

  • 我有两台机器localhost和192.168.1.110来运行两台独立的单机kafka。 kafka2.11-0.10.0.0 bin/kafka-console-producer.sh--broker-list 192.168.1.110:9092--topic test这是一条消息[2016-08-24 18:15:27,441]错误将消息发送到topic test时出错,关键字:null,

  • 为什么消费者连接到zookeeper来检索分区位置?kafka制作者必须连接到其中一个代理来检索元数据。 我的观点是,当每个经纪人都已经有了所有必要的元数据来告诉生产者发送信息的位置时,动物园管理员到底有什么用?经纪人不能把同样的信息发送给消费者吗? 我可以理解为什么经纪人拥有元数据,而不必在每次向他们发送新消息时都与动物园管理员建立连接。动物园管理员有什么功能是我错过的吗?我发现很难想到为什么在

  • 我很感激你在这方面的帮助。 我正在构建一个ApacheKafka消费者,以订阅另一个已经运行的Kafka。现在,我的问题是,当我的制作人将消息推送到服务器时。。。我的消费者没有收到。。我在打印的日志中得到以下信息: 我不确定我是否遗漏了任何重要的配置。。。但是,我可以使用WireShark看到一些来自我的服务器的消息,但是我的消费者没有消费这些消息。。。。 我的代码是示例消费者示例的精确副本:ht

  • 在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?

  • 我正在使用Spring Boot中的。Java 8 我的主要目的是,消费者不应重复使用信息。 1)调用表获取100行并将其发送到kafka 2) 假设我处理了70行(我得到了成功确认),然后Kafka宕机了(Kafka在RETRY机制计时内无法恢复) 因此,当我重新启动Spring启动应用程序时,我如何确保不再发送这70条消息。 一种选择是我可以在数据库表消息 中使用标志。 还有其他有效的方法吗?