当前位置: 首页 > 知识库问答 >
问题:

Spring-Kafka集成 1.0.0.发布 生产者问题

杨起运
2023-03-14

我无法使用Spring Kafka集成发布消息,尽管我的Kafka Java客户端工作正常。

Java代码在Windows上运行,Kafka在Linux上运行。

 KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
    ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>("test-cass");
    producerMetadata.setValueClassType(String.class);
    producerMetadata.setKeyClassType(String.class);
    Encoder<String> encoder = new StringEncoder<String>();
    producerMetadata.setValueEncoder(encoder);
    producerMetadata.setKeyEncoder(encoder);

    ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<String, String>(producerMetadata, "172.16.1.42:9092");

    ProducerConfiguration<String, String> config = new ProducerConfiguration<String, String>(producerMetadata, producer.getObject());
    kafkaProducerContext.setProducerConfigurations(Collections.singletonMap("test-cass", config));
    KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<String, String>(kafkaProducerContext);
    handler.handleMessage(MessageBuilder.withPayload("foo")
            .setHeader("messagekey", "3")
            .setHeader("topic", "test-cass")
            .build());

我得到以下错误

"C:\Program Files\Java\jdk1.7.0_71\bin\java" -Didea.launcher.port=7542 "-Didea.launcher.bin.path=C:\Program Files (x86)\JetBrains\IntelliJ IDEA 13.1.6\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.7.0_71\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\jce.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\jfxrt.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\resources.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\rt.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.7.0_71\jre\lib\ext\zipfs.jar;C:\projects\SpringCassandraInt\target\classes;C:\Users\hs\.m2\repository\org\springframework\data\spring-data-cassandra\1.1.2.RELEASE\spring-data-cassandra-1.1.2.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\data\spring-cql\1.1.2.RELEASE\spring-cql-1.1.2.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\spring-context\4.1.4.RELEASE\spring-context-4.1.4.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\spring-aop\4.1.4.RELEASE\spring-aop-4.1.4.RELEASE.jar;C:\Users\hs\.m2\repository\aopalliance\aopalliance\1.0\aopalliance-1.0.jar;C:\Users\hs\.m2\repository\org\springframework\spring-beans\4.0.9.RELEASE\spring-beans-4.0.9.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\spring-core\4.1.2.RELEASE\spring-core-4.1.2.RELEASE.jar;C:\Users\hs\.m2\repository\commons-logging\commons-logging\1.1.3\commons-logging-1.1.3.jar;C:\Users\hs\.m2\repository\org\springframework\spring-expression\4.1.2.RELEASE\spring-expression-4.1.2.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\spring-tx\4.1.4.RELEASE\spring-tx-4.1.4.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\data\spring-data-commons\1.9.2.RELEASE\spring-data-commons-1.9.2.RELEASE.jar;C:\Users\hs\.m2\repository\org\slf4j\slf4j-api\1.7.10\slf4j-api-1.7.10.jar;C:\Users\hs\.m2\repository\org\slf4j\jcl-over-slf4j\1.7.10\jcl-over-slf4j-1.7.10.jar;C:\Users\hs\.m2\repository\com\datastax\cassandra\cassandra-driver-dse\2.0.4\cassandra-driver-dse-2.0.4.jar;C:\Users\hs\.m2\repository\com\datastax\cassandra\cassandra-driver-core\2.0.4\cassandra-driver-core-2.0.4.jar;C:\Users\hs\.m2\repository\io\netty\netty\3.9.0.Final\netty-3.9.0.Final.jar;C:\Users\hs\.m2\repository\com\codahale\metrics\metrics-core\3.0.2\metrics-core-3.0.2.jar;C:\Users\hs\.m2\repository\com\google\guava\guava\15.0\guava-15.0.jar;C:\Users\hs\.m2\repository\org\liquibase\liquibase-core\3.1.1\liquibase-core-3.1.1.jar;C:\Users\hs\.m2\repository\org\yaml\snakeyaml\1.13\snakeyaml-1.13.jar;C:\Users\hs\.m2\repository\ch\qos\logback\logback-classic\1.1.2\logback-classic-1.1.2.jar;C:\Users\hs\.m2\repository\ch\qos\logback\logback-core\1.1.2\logback-core-1.1.2.jar;C:\Users\hs\.m2\repository\org\springframework\integration\spring-integration-core\4.1.2.RELEASE\spring-integration-core-4.1.2.RELEASE.jar;C:\Users\hs\.m2\repository\org\projectreactor\reactor-core\1.1.4.RELEASE\reactor-core-1.1.4.RELEASE.jar;C:\Users\hs\.m2\repository\com\goldmansachs\gs-collections\5.0.0\gs-collections-5.0.0.jar;C:\Users\hs\.m2\repository\com\goldmansachs\gs-collections-api\5.0.0\gs-collections-api-5.0.0.jar;C:\Users\hs\.m2\repository\com\lmax\disruptor\3.2.1\disruptor-3.2.1.jar;C:\Users\hs\.m2\repository\io\gatling\jsr166e\1.0\jsr166e-1.0.jar;C:\Users\hs\.m2\repository\org\springframework\retry\spring-retry\1.1.1.RELEASE\spring-retry-1.1.1.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\spring-messaging\4.1.4.RELEASE\spring-messaging-4.1.4.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\integration\spring-integration-stream\4.1.2.RELEASE\spring-integration-stream-4.1.2.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\integration\spring-integration-xml\4.1.2.RELEASE\spring-integration-xml-4.1.2.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\spring-oxm\4.1.4.RELEASE\spring-oxm-4.1.4.RELEASE.jar;C:\Users\hs\.m2\repository\org\springframework\ws\spring-xml\2.2.0.RELEASE\spring-xml-2.2.0.RELEASE.jar;C:\Users\hs\.m2\repository\com\jayway\jsonpath\json-path\1.2.0\json-path-1.2.0.jar;C:\Users\hs\.m2\repository\net\minidev\json-smart\2.1.0\json-smart-2.1.0.jar;C:\Users\hs\.m2\repository\net\minidev\asm\1.0.2\asm-1.0.2.jar;C:\Users\hs\.m2\repository\asm\asm\3.3.1\asm-3.3.1.jar;C:\Users\hs\.m2\repository\org\springframework\integration\spring-integration-kafka\1.0.0.RELEASE\spring-integration-kafka-1.0.0.RELEASE.jar;C:\Users\hs\.m2\repository\org\apache\avro\avro-compiler\1.7.6\avro-compiler-1.7.6.jar;C:\Users\hs\.m2\repository\org\apache\avro\avro\1.7.6\avro-1.7.6.jar;C:\Users\hs\.m2\repository\org\codehaus\jackson\jackson-core-asl\1.9.13\jackson-core-asl-1.9.13.jar;C:\Users\hs\.m2\repository\org\codehaus\jackson\jackson-mapper-asl\1.9.13\jackson-mapper-asl-1.9.13.jar;C:\Users\hs\.m2\repository\com\thoughtworks\paranamer\paranamer\2.3\paranamer-2.3.jar;C:\Users\hs\.m2\repository\org\xerial\snappy\snappy-java\1.0.5\snappy-java-1.0.5.jar;C:\Users\hs\.m2\repository\org\apache\commons\commons-compress\1.4.1\commons-compress-1.4.1.jar;C:\Users\hs\.m2\repository\org\tukaani\xz\1.0\xz-1.0.jar;C:\Users\hs\.m2\repository\commons-lang\commons-lang\2.6\commons-lang-2.6.jar;C:\Users\hs\.m2\repository\org\apache\velocity\velocity\1.7\velocity-1.7.jar;C:\Users\hs\.m2\repository\commons-collections\commons-collections\3.2.1\commons-collections-3.2.1.jar;C:\Users\hs\.m2\repository\com\yammer\metrics\metrics-annotation\2.2.0\metrics-annotation-2.2.0.jar;C:\Users\hs\.m2\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;C:\Users\hs\.m2\repository\org\apache\kafka\kafka_2.10\0.8.1.1\kafka_2.10-0.8.1.1.jar;C:\Users\hs\.m2\repository\org\apache\zookeeper\zookeeper\3.3.4\zookeeper-3.3.4.jar;C:\Users\hs\.m2\repository\log4j\log4j\1.2.15\log4j-1.2.15.jar;C:\Users\hs\.m2\repository\javax\mail\mail\1.4\mail-1.4.jar;C:\Users\hs\.m2\repository\javax\activation\activation\1.1\activation-1.1.jar;C:\Users\hs\.m2\repository\javax\jms\jms\1.1\jms-1.1.jar;C:\Users\hs\.m2\repository\com\sun\jdmk\jmxtools\1.2.1\jmxtools-1.2.1.jar;C:\Users\hs\.m2\repository\com\sun\jmx\jmxri\1.2.1\jmxri-1.2.1.jar;C:\Users\hs\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\hs\.m2\repository\net\sf\jopt-simple\jopt-simple\3.2\jopt-simple-3.2.jar;C:\Users\hs\.m2\repository\org\scala-lang\scala-library\2.10.1\scala-library-2.10.1.jar;C:\Users\hs\.m2\repository\com\101tec\zkclient\0.3\zkclient-0.3.jar;C:\Program Files (x86)\JetBrains\IntelliJ IDEA 13.1.6\lib\idea_rt.jar" com.intellij.rt.execution.application.AppMain com.agillic.dialogue.kafka.outbound.SpringKafkaTest
15:39:11.736 [main] INFO  o.s.i.k.support.ProducerFactoryBean - Using producer properties => {metadata.broker.list=172.16.1.42:9092, compression.codec=0}
2015-02-19 15:39:12 INFO  VerifiableProperties:68 - Verifying properties
2015-02-19 15:39:12 INFO  VerifiableProperties:68 - Property compression.codec is overridden to 0
2015-02-19 15:39:12 INFO  VerifiableProperties:68 - Property metadata.broker.list is overridden to 172.16.1.42:9092
15:39:12.164 [main] INFO  o.s.b.f.config.PropertiesFactoryBean - Loading properties file from URL [jar:file:/C:/Users/hs/.m2/repository/org/springframework/integration/spring-integration-core/4.1.2.RELEASE/spring-integration-core-4.1.2.RELEASE.jar!/META-INF/spring.integration.default.properties]
15:39:12.208 [main] DEBUG o.s.i.k.o.KafkaProducerMessageHandler - org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler@5204db6b received message: GenericMessage [payload=foo, headers={timestamp=1424356752208, id=00c483d9-ecf8-2937-4a2c-985bd3afcae4, topic=test-cass, messagekey=3}]
Exception in thread "main" org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler@5204db6b]; nested exception is java.lang.NullPointerException
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
    at com.agillic.dialogue.kafka.outbound.SpringKafkaTest.main(SpringKafkaTest.java:40)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.lang.NullPointerException
    at org.springframework.integration.kafka.support.KafkaProducerContext.getTopicConfiguration(KafkaProducerContext.java:58)
    at org.springframework.integration.kafka.support.KafkaProducerContext.send(KafkaProducerContext.java:190)
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:81)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    ... 6 more

Process finished with exit code 1

共有1个答案

呼延钱明
2023-03-14

实际上,当我们引入<code>KafkaHeaders</code>时,我们做了适当的文档更改:https://github.com/spring-projects/spring-integration-kafka/blob/master/README.md.参见重要说明:

自上次Milestone以来,我们引入了带有常量的Kafkahead ers接口。消息键和主题默认标头现在需要kafka_前缀。从早期版本迁移时,您需要在…上指定headers.message键和主题表达式=headers.topic,或者使用或消息构建器将上游标头更改为Kafkahead ers的新标头。当然,如果您使用常量值,请在适配器上配置它们。

更新

关于NullPointerExcsion:这真的是一个问题。请随意提出JIRA票证,我们会处理的。我们甚至欢迎您的贡献!

 类似资料:
  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 我有一个用例“XML文件==>Kafka主题==>Build REST API to Query”来自Kafka主题的数据。我熟悉将数据转换为Avro格式,并编写到kafka主题。 您能建议如何发布XML吗?

  • 我正在使用Spring Kafka 2.3.9编写一个Kafka制作人,该制作人假设向一个主题发布大约200000条消息。例如,我有一个从数据库中提取的200000个对象的列表,我想将这些对象的json消息发布到一个主题。 我写的制作人在发布1000条消息方面做得很好。然后它创建了一些空指针错误(我已经包括了下面的屏幕截图)。 在调试过程中,我发现Kafka Producer网络线程的数量非常高。

  • 一、生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程: Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。 接下来,数据被传给分区器。如果之前已经在 Prod

  • Kafka文件说,幂等生产者是可能的,与相同的生产者会话,我无法理解这一点。 比方说,Kafka为每条消息添加序列号,最后一个序列号在Kafka中维护(不确定它维护在哪里)。 它是如何生成序列号的,它保存在哪里? 为什么当制作人崩溃并再次出现时,它不能保持序列? 我怎样才能使它在制作人会话之间真正幂等?

  • Kafka为每条消息生成偏移量。假设,我正在生成消息5,偏移量将从1到5。 但是,在事务生产者中,比如说,我产生了5条消息并提交,然后是5条消息但中止,然后是5条消息提交。 > 那么,最后提交的5条消息的偏移量是6到10还是11到15? 如果我不放弃或不promise呢。这些信息还会被发布吗? Kafka是如何忽略未promise的补偿的?因此,kafka提交日志是基于偏移量的。它是否使用事务使用