我试图创建一个简单的程序来打印一个Kafka主题的Kstream。我不断地得到一个NPE和完全没有想法。
我已经使用了spring cloud-stream-binder-kafka-streams依赖项,并且我正在使用spring cloud的最新版本“Finchley.m9”。
我写的代码是:
@Component
@EnableBinding(KafkaStreamsProcessor.class)
public class EventListener{
@StreamListener("input")
public void listen(KStream<String,String> kstream){
kstream.print();
}
}
Application.Properties具有:
spring.cloud.stream.bindings.input.destination=slot-events
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.binder.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.binder.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.applicationId=listener
spring.cloud.stream.kafka.streams.binder.configuration.zookeeper.connect=localhost:2181
当我启动服务时,我在控制台上不断得到以下错误:
018-03-31 22:57:52.641 INFO 26301 --- [ main] sStreamListenerSetupMethodOrchestrator$1 : values:
application.id = default
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 1000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
timestamp.extractor = null
value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect = localhost:2181
2018-03-31 22:57:52.656 INFO 26301 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2018-03-31 22:57:52.685 INFO 26301 --- [ main] ConditionEvaluationReportLoggingListener :
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2018-03-31 22:57:52.693 ERROR 26301 --- [ main] o.s.boot.SpringApplication : Application run failed
java.lang.IllegalStateException: java.lang.NullPointerException
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.adaptAndRetrieveInboundArguments(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:273) ~[spring-cloud-stream-binder-kafka-streams-2.0.0.RC3.jar!/:2.0.0.RC3]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:154) ~[spring-cloud-stream-binder-kafka-streams-2.0.0.RC3.jar!/:2.0.0.RC3]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:195) ~[spring-cloud-stream-2.0.0.RC3.jar!/:2.0.0.RC3]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:167) ~[spring-cloud-stream-2.0.0.RC3.jar!/:2.0.0.RC3]
at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_161]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) ~[spring-cloud-stream-2.0.0.RC3.jar!/:2.0.0.RC3]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) ~[spring-cloud-stream-2.0.0.RC3.jar!/:2.0.0.RC3]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:777) ~[spring-beans-5.0.4.RELEASE.jar!/:5.0.4.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:868) ~[spring-context-5.0.4.RELEASE.jar!/:5.0.4.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.0.4.RELEASE.jar!/:5.0.4.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:388) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1246) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1234) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
at dg.athena.sideprojects.kafkastreampoc.KafkastreampocApplication.main(KafkastreampocApplication.java:15) [classes!/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_161]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_161]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_161]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_161]
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [kafkastreampoc-0.0.1-SNAPSHOT.jar:na]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [kafkastreampoc-0.0.1-SNAPSHOT.jar:na]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [kafkastreampoc-0.0.1-SNAPSHOT.jar:na]
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [kafkastreampoc-0.0.1-SNAPSHOT.jar:na]
Caused by: java.lang.NullPointerException: null
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.getkStream(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:294) ~[spring-cloud-stream-binder-kafka-streams-2.0.0.RC3.jar!/:2.0.0.RC3]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.adaptAndRetrieveInboundArguments(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:235) ~[spring-cloud-stream-binder-kafka-streams-2.0.0.RC3.jar!/:2.0.0.RC3]
... 24 common frames omitted
谁能建议一下吗?
这是一个在最新快照中得到修复的bug。您能尝试将绑定器升级到1.0.0.build-Snapshot,然后再试一次吗?
使用StreamBridge,我将包含两种不同类型的对象的消息发送到单个Kafka主题。有没有办法定义一个能够使用两种类型消息的Spring Cloud Stream的功能消费者?
我正在开发SpringCloudStream的Brooklyn.Release版本。我的用例具有多个接收器的HttpSource。当我将初学者应用程序依赖项添加到应用程序中并使用它时,如下所示: 我的聚合应用程序是 一直得到如下响应:
问题内容: 我 必须 使用商业Java库,并且想从Python中做到这一点。Jython非常强大,在后面有一些发行版本的情况下,我对此表示满意。但是,我 也想 使用NumPy,它显然不适用于Jython。CPype和Java数字库等选项不受欢迎。前者基本上已经死了。后者大多不成熟,缺乏NumPy的易用性和广泛接受性。我的问题是:如何使Jython和Python代码互操作?从Cpython或其他方式
问题内容: 我正在建立一个图表,我想接收每个月的数据。 这是我的第一个有效的要求: 但是,就我而言,我必须编写12个请求以接收前12个月的数据,有什么技巧可以避免写: 我听说过INTERVAL,我去了MySQL文档,但是我没有实现它。 有使用INTERVAL的例子吗? 问题答案: 您需要: 应该管用。
问题内容: 我一直在阅读此选择器,并得到相互矛盾的答案。 在:/ deep /和::shadow在CSS选择器中是什么意思? 我们看: 正如Joel H.在评论中指出的那样,Chrome自此之后就弃用了/ deep /组合器,并在IE中给出了语法错误。 我们看: / deep /不再存在,所以我认为我们不应该支持它。>>>是新版本,可能应该支持 我们看: / deep /选择器还具有别名>>>。我
问题内容: 我想在我的Flask应用程序中包含一个sass编译器。有一种普遍接受的方法吗? 问题答案: Flask-Assets扩展(使用webassets库)可以用于此目的。以下是将其配置为使用SCSS的pyScss编译器(在Python中实现)的方法: 并在模板中包括以下内容: SCSS文件也将在调试模式下进行编译。 pyScss只支持SCSS语法,但也有其他过滤器(,和)使用原始的Ruby实
问题内容: 我正在尝试学习python,mongodb和flask,并使用了Miguel Grinberg的VERY EXCELLENT博客,他在blog.miguelgrinberg.com上提供了很多教程 我有一个小型的RESTful服务器,可以正常工作,但现在想从mongo而不是mysql中提取内容 我可以使用下面的代码提取一条mongo记录,但正在努力使其呈现。 我在下面的代码中使用箭头来
修复我的函数,使用聚合查询,计算每个用户的推文数量,将它们添加到数组中,并返回推文最多的5个用户。 使用聚合查询,计算每个用户的tweet数量。在相同的阶段,使用累积每个用户的所有tweet文本。 将你的输出限制在推特最多的5个用户。 结果文档应仅包括以下字段: (用户的屏幕名称), (为用户找到的推文数量), (为用户找到的tweet文本列表) 为了实现前面的目标,我正在测试以下代码: 首先,我