当前位置: 首页 > 知识库问答 >
问题:

kafka.common.FailedToSendMessageException: kafka produce error

席宜修
2023-03-14

我正在读取一个 json 文件并尝试使用 kafka 生成它。这是我的代码:

public class FlatFileDataProducer {

    private String topic = "JsonTopic";
    private Producer<String, String> producer = null;
    KeyedMessage<String, String> message = null;
    public JsonReader reader;

    public void run(String jsonPath) throws ClassNotFoundException, FileNotFoundException, IOException, ParseException{
        reader = new JsonReader();
        System.out.println("---------------------");
        System.out.println("JSON FILE PATH IS : "+jsonPath);
        System.out.println("---------------------");
        Properties prop = new Properties();
        prop.put("metadata.broker.list", "192.168.63.145:9092");
        prop.put("serializer.class", "kafka.serializer.StringEncoder");
        // prop.put("partitioner.class", "example.producer.SimplePartitioner");
        prop.put("request.required.acks", "1");


        ProducerConfig config = new ProducerConfig(prop);
        producer = new Producer<String, String>(config);
        List<Employee> emp = reader.readJsonFile(jsonPath);     
        for (Employee employee : emp) 
        {
            System.out.println("---------------------");
            System.out.println(employee.toString());
            System.out.println("---------------------");
            message = new KeyedMessage<String, String>(topic, employee.toString());

            producer.send(message);
            producer.close();

        }
         System.out.println("Messages to Kafka successfully");
    }

读取 json 文件的代码是:

public List<Employee> readJsonFile(String path) throws FileNotFoundException, IOException, ParseException{
        Employee employee = new Employee();
        parser=new JSONParser();
        Object obj = parser.parse(new FileReader(path));
        JSONObject jsonObject = (JSONObject) obj;
        employee.setId(Integer.parseInt(jsonObject.get("id").toString()));      
        employee.setName((String)jsonObject.get("name"));
        employee.setSalary(Integer.parseInt(jsonObject.get("salary").toString()));
        list.add(employee);
        return list;
    }

但当我执行程序时,问题1:

> [root@sandbox ~]# java -jar sparkkafka.jar /root/customer.json
> JSON FILE PATH IS : /root/customer.json
>  log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). log4j:WARN Please
> initialize the log4j system properly.
> 1,Smith,25
> Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages
> after 3 tries.
>         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
>         at kafka.producer.Producer.send(Producer.scala:77)
>         at kafka.javaapi.producer.Producer.send(Producer.scala:33)
>         at com.up.jsonType.FlatFileDataProducer.run(FlatFileDataProducer.java:41)
>         at com.up.jsonType.FlatFileDataProducer.main(FlatFileDataProducer.java:49)

它给出了错误,但是当我检查cosumer shell时,我得到如下结果:对于JSON文件中的一行,我在shell中看到4个条目..问题2:

[root @ sandbox bin]#[root @ sandbox bin]#。/Kafka-console-consumer . sh-zookeeper localhost:2181-topic JSON topic-从头开始

1,Smith,25
1,Smith,25
1,Smith,25
1,Smith,25

我得到了相同数据的4倍。

共有2个答案

汤承允
2023-03-14

能否尝试添加以下属性:

prop.put("producer.type","async");
邵兴文
2023-03-14

您需要删除以下两个属性:

    //prop.put("request.required.acks", "1");
    //prop.put("producer.type","async");

此属性实际上会考虑确认。

 类似资料:

相关问答

相关文章

相关阅读