当前位置: 首页 > 知识库问答 >
问题:

如何使用Spring integration配置具有多个分区的kafka producer主题

柴阳云
2023-03-14

我读了很多文章,但没有找到如何使用Spring Integration Kafka配置具有多分区主题(在运行时创建主题)的Producer。

我正在使用github链接来理解并为我的应用程序配置kafka。

请提供解决方案

还有一点,KafKaheader.MessageKey的用途是什么。

    KafkaProducerContext<String, SMSNotificationVO> ctx = new KafkaProducerContext<String, SMSNotificationVO>();
    ProducerMetadata<String, SMSNotificationVO> meta = new ProducerMetadata<String, SMSNotificationVO>("sms_topic");
    meta.setValueClassType(SMSNotificationVO.class);
    meta.setKeyClassType(String.class);
    meta.setValueEncoder(new SMSObjectSerializer());

    ProducerMetadata<String, SMSNotificationVO> meta1 = new ProducerMetadata<String, SMSNotificationVO>("sms_topic_10");
    meta1.setValueClassType(SMSNotificationVO.class);
    meta1.setKeyClassType(String.class);
    meta1.setValueEncoder(new SMSObjectSerializer());
    Properties producerProperties = new Properties();
    producerProperties.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG, "1");//"message.send.max.retries"
    ProducerFactoryBean<String, SMSNotificationVO> producer = new ProducerFactoryBean<String, SMSNotificationVO>(meta,"192.168.1.147:9092");
    ProducerFactoryBean<String, SMSNotificationVO> producer1 = new ProducerFactoryBean<String, SMSNotificationVO>(meta1,"192.168.1.147:9092", producerProperties);


    ProducerConfiguration<String, SMSNotificationVO> p = new ProducerConfiguration<String, SMSNotificationVO>(meta, producer.getObject());
    ProducerConfiguration<String, SMSNotificationVO> p1 = new ProducerConfiguration<String, SMSNotificationVO>(meta1, producer1.getObject());

    Map<String, ProducerConfiguration<String, SMSNotificationVO>> map = new HashMap<String, ProducerConfiguration<String, SMSNotificationVO>>();
    map.put("sms_topic", p);
    map.put("sms_topic_10", p1);

    ctx.setProducerConfigurations(map);

    // java code to send message
    Map<String, String> params = new HashMap<>();
    params.put("Topic", "TEST MULTIPLE TOPIC : this is sms_topic_1");
    List<String> smsRecipients = new ArrayList<>();
    smsRecipients.add("9953225211");


     String message = "This Test message from junit topic sms_topic_1";
     Integer messageType =1;

    SMSNotificationVO vo = new SMSNotificationVO();
    vo.setMessage(message);
    vo.setMessageType(messageType);
    vo.setParams(params);
    vo.setSmsRecipients(smsRecipients);


    try{
     KafkaProducerMessageHandler<String, SMSNotificationVO> handler = new KafkaProducerMessageHandler<String, SMSNotificationVO>(ctx);
     handler.setMessageKeyExpression(new LiteralExpression("sms_topic_10"));
     handler.handleMessage(MessageBuilder.withPayload(vo)
             //if i remove the below comments I will get null pointer exception
             //.setHeader(KafkaHeaders.MESSAGE_KEY, "some key")
             //.setHeader(KafkaHeaders.PARTITION_ID, "1")
                .setHeader(KafkaHeaders.TOPIC, "sms_topic_10")
                .build());

    }catch(Exception e){
        e.printStackTrace();
        Assert.fail("Kafka SMS Producer fail");
    }

}  
}`

我得到空指针异常。下面是提及日志:

`org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler@7b94089b]; nested exception is java.lang.NullPointerException
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at dd.kafka.producer.OutboundTests.mytest(OutboundTests.java:85)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Caused by: java.lang.NullPointerException at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:130)
at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:127)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:127)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:53)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at org.springframework.integration.kafka.support.ProducerConfiguration.send(ProducerConfiguration.java:70)
at org.springframework.integration.kafka.support.KafkaProducerContext.send(KafkaProducerContext.java:197)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:81)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 24 more`

谢谢

共有1个答案

唐声
2023-03-14

主题与生产者无关。在向主题发送或从主题接收消息之前,您应该预先配置主题。

您可以在运行时通过Spring Integration高级API使用adminutils.createTopic()创建主题,这只是为了方便测试,在生产中应该避免。

KafkaHeaders.MessageKey完全映射到标准的KafkaMessageKey并且可以使用yes来确定主题中的目标分区

因此,您肯定应该访问官方的Apache Kafka文档,了解如何创建带有“多分区”的主题,以及在producer端对partitionMessageKey参数做什么。

 类似资料:
  • 我想要任何关于Kafka如何维护消息序列的信息/解释,当消息被写入多个分区的主题时。例如,我有多个消息生成器,每个消息生成器按顺序生成消息,并用超过1个分区编写Kafka主题。在这种情况下,消费者组将如何工作来消费消息。

  • 我有一个带有2个分区的源主题,我正在用同一个应用程序启动2个kafka streams应用程序。id,但不同的接收器主题。 1) 这两个应用程序实例是否会从不同的分区接收数据? 2)如果其中一个应用程序被杀死,另一个实例会自动从两个实例中消耗吗? 3) 我如何证明上述情况?

  • 我使用的是Kafka 0.8.2,而我的使用者却出现了一个错误:“OFFSET commit FAILL with...”。当查看主题“__consumer_offsets”时。我看到它有50个分区计数。正常吗?我只能通过删除所有的Kafka日志并重新启动我的Kafka服务器来解决这个问题。是否有一种方法,我可以删除这个主题时,它达到一定数量的分区,还是我提交的偏移量是错误的? 下面是我提交偏移的

  • 我已经在c中创建了kafka消费者,并创建了一个具有10个分区的主题,当我尝试使用消费者读取数据时,它仅从2个分区读取,然后说没有更多的消息。我尝试使用这两种方法,即订阅和分配,但它们都不起作用。我应该如何将所有10个分区分配给单个使用者,这是将分区分配给使用者的正确方法吗?我已经使用此存储库构建了自定义消费者 https://github.com/edenhill/librdkafka/blob

  • 假设一个主题有3个kafka分区,我希望我的事件按小时窗口,使用事件时间。 当某个分区位于当前窗口之外时,kafka使用者是否会停止读取该分区?还是打开一个新窗口?如果它正在打开新的窗口,那么,如果一个分区的事件时间与其他分区相比会非常倾斜,那么从理论上讲,它不可能打开无限数量的窗口,从而耗尽内存吗?当我们重播一些历史时,这种情况尤其可能发生。 我一直试图从阅读留档中得到这个答案,但是在分区上找不

  • 对集群设置有点困惑: > Zookeeper可以通过配置myid(1,2,3...)将其设置为群集例如,在zoo.cfg文件中具有zookeeper1:2888:3888、zookeeper2:2889:3889 对于Kafka,在server.properties文件中,是否必须为参数zookeeper.connect指定zookeeper服务器的完整列表,还是只要1就足够了?有什么不同吗?我见