当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka Java生产商Scala消费者缺少流

从智明
2023-03-14

我用的是阿帕奇·Kafka。我创建了一个war文件,其中生产者用Java编码,消费者用Scala编码。制作人正在从HTML页面获取数据。我可以看到,生产商发布的大部分数据都是关于消费者的,但有些数据缺失。

这是我的制片人代码

文件1

package com.cts.rest;

import java.util.Properties;

import kafka.producer.ProducerConfig;

public class Configuration {

static ProducerConfig setKafkaProducerParameter() {
    Properties properties = new Properties();
    properties.put("zk.connect", "localhost:2181");
    properties.put("metadata.broker.list", "localhost:9092");
    properties.put("serializer.class", "kafka.serializer.StringEncoder");
    properties.put("acks", 0);  
    ProducerConfig producerConfig = new ProducerConfig(properties);
    return producerConfig;
    }

}

文件2

package com.cts.rest;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


public class RTTSKProducer {

static void sendDataToProducer(String line){

    ProducerConfig producerConfig = configuration.setKafkaProducerParameter();
    Producer<String, String> producer = new Producer<String, String>(producerConfig);       

    String topic = "jsondata";      
    KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, line);
    System.out.print(msg);
    producer.send(msg);
    producer.close();
            }
    }

现在,我使用以下命令检查消费者的消息。

bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic jsondata --from-beginning

我是否缺少任何生产者配置?

共有1个答案

彭宜人
2023-03-14

您可以尝试增加ack配置以确保更耐用。最重要的是,您应该调用带有回调函数的“发送”方法来处理那些没有成功发布到Kafka的消息,如下所示:

producer.send(myRecord,
           new Callback() {
               public void onCompletion(RecordMetadata metadata, Exception e) {
                   if(e != null)
                       e.printStackTrace();
                   System.out.println("The offset of the record we just sent is: " + metadata.offset());
               }
           });
 类似资料:
  • 我的应用程序有一个生产者和一个消费者。我的生产者不定期地生成消息。有时我的队列会是空的,有时我会有一些消息。我想让我的消费者监听队列,当有消息在其中时,接受它并处理这条消息。这个过程可能需要几个小时,如果我的消费者没有完成处理当前消息,我不希望他接受队列中的另一条消息。 我认为AKKA和AWS SQS可以满足我的需求。通过阅读文档和示例,akka-camel似乎可以简化我的工作。 我在github

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • 所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职

  • 一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,

  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统