当前位置: 首页 > 知识库问答 >
问题:

如何使用spring boot app中的“li apache kafka客户端”从kafka producer发送大消息(1MB以上)?

阎伟志
2023-03-14

如何在spring boot app中使用li apache kafka客户端从kafka制作人向kafka消费者发送大消息(1MB以上)?以下是li apache kafka客户端的GitHub链接:https://github.com/linkedin/li-apache-kafka-clients

我已经导入了。li apache kafka客户端的jar文件,并为producer设置以下配置:

props.put("large.message.enabled", "true");
props.put("max.message.segment.bytes", 1000 * 1024);
props.put("segment.serializer", DefaultSegmentSerializer.class.getName());

对于消费者:

message.assembler.buffer.capacity, 
max.tracked.messages.per.partition, 
exception.on.message.dropped, 
segment.deserializer.class

但对于大型消息,仍然会出现错误。请帮我解决错误。

下面是我的代码,请告诉我需要在哪里创建LiKafkaProducer:

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.boot.server}")
    private String kafkaServer;


    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

    @Bean
    public Map<String, Object> producerConfig() {
        // TODO Auto-generated method stub
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put("bootstrap.servers", "localhost:9092");
        config.put("acks", "all");
        config.put("retries", 0);
        config.put("batch.size", 16384);
        config.put("linger.ms", 1);
        config.put("buffer.memory", 33554432);

        // The following properties are used by LiKafkaProducerImpl
        config.put("large.message.enabled", "true");
        config.put("max.message.segment.bytes", 1000 * 1024);
        config.put("segment.serializer", DefaultSegmentSerializer.class.getName());
        config.put("auditor.class", LoggingAuditor.class.getName());

        return config;
    }
}

@RestController
@RequestMapping("/kafkaProducer")
public class KafkaProducerController {

    @Autowired
    private KafkaSender sender;

    @PostMapping
    public ResponseEntity<List<Student>> sendData(@RequestBody List<Student> student){

        sender.sendData(student);
        return new ResponseEntity<List<Student>>(student, HttpStatus.OK);
    }
}

@Service
public class KafkaSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.topic.name}")
    private String topicName;

    public void sendData(List<Student> student) {

        // TODO Auto-generated method stub
        Map<String, Object> headers = new HashMap<>();
        headers.put(KafkaHeaders.TOPIC, topicName);
        headers.put("payload", student.get(0));

        // Construct a JSONObject from a Map.
        JSONObject HeaderObject = new JSONObject(headers);
        System.out.println("\nMethod-2: Using new JSONObject() ==> " + HeaderObject);
        final String record = HeaderObject.toString();

        Message<String> message = MessageBuilder.withPayload(record).setHeader(KafkaHeaders.TOPIC, topicName)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "Message")
                .build();

        kafkaTemplate.send(topicName, message.toString());
    }
}

共有1个答案

乐正秦斩
2023-03-14

您需要实现自己的消费工厂(ConsumerFactory)和生产工厂(ProducerFactory),以分别创建LiKafkaConsumer和LiKafkaProducer。

您应该能够子类化框架提供的默认工厂。

 类似资料:
  • 我正在通过在WebSocketClientFlow上遵循doc来尝试客户端websocket。

  • 问题内容: 我将实现类似于Facebook通知和此网站的内容(StackOverflow的通知会通知我们是否有人为我们的问题写评论/答案等)。请注意,用户将使用我的应用程序作为网站而不是移动应用程序。 我遇到以下获取结果的答案,但我需要推送结果而不是获取结果。 根据建议,我在实体类中创建了一个简单方法,并向其中添加了@PostPersist,但此方法不起作用,因此基于此答案,我添加了persist

  • 问题内容: 我从socket.io + node.js开始,我知道如何在本地发送消息和广播功能:-所有连接的客户端都收到相同的消息。 现在,我想知道如何向特定的客户端发送私人消息,我的意思是一个套接字,用于2个人之间的私人聊天(客户端到客户端流)。谢谢。 问题答案: 当用户连接时,它应使用唯一的用户名(例如电子邮件)向服务器发送消息。 一对用户名和套接字应存储在这样的对象中: 在客户端上,使用以下

  • 我需要一个推送消息服务器的方案。 现在我选择AutobahnJs+AutobahnPython方案。 AutobahnJs库使用html5的websocket连接AutobahnPython使用的服务器 在javascript中,使用订阅通道,每个客户端使用相同的订阅通道。clientD将消息发布到通道(与订阅通道相同),服务器将消息传递给订阅该通道的所有客户端。 这里有一个问题,我需要应用程序(

  • 在文档的socket.io客户端套接字部分http://socket.io/docs/client-api/#io#socket指的是套接字文档,这意味着服务器套接字对象和客户端套接字对象是相同的。

  • 我尝试将Spring与websocket一起使用。我从本教程开始调查。 在我的侧客户端,我有类似的东西来初始化到服务器的连接: 它工作得很好,在我的控制器中,我可以在下面的类中执行我的过程: 现在我想做的是让一个线程向监听“/主题/问候”的客户端发送消息。我这样写Runnable类: 这样完成了我的控制器: 该方法采用光电控制器。fireGreeting按我的要求调用,但客户端没有发生任何事情。有