当前位置: 首页 > 知识库问答 >
问题:

导致IllegalArgumentException的KStream.Map中的Null值:有效负载不能为Null

晁砚
2023-03-14

我正在创建一个spring cloud Kafka流应用程序。我有一个输入主题和一个输出主题,我试图使用KStream.map函数对输入主题应用KStream键值转换操作
问题如果转换后的值为null,该函数将抛出IllegalArgumentException
我的问题是:
1:异常的原因?尽管文档中说:“忽略具有空键或空值的输入记录”
2:处理无状态/有状态操作中异常的最佳实践?一个try/catch围绕整个处理计划是否就足够了?或者我应该在每个转换函数中都有一个try/catch(例如,筛选、映射、联接、减少)?

spring:
  application:
    name:kafka-streams-test
  cloud.stream:
    kafka.streams:
      binder:
        brokers: localhost:9093
        configuration:
          commit.interval.ms: 1000
          security.protocol: SASL_PLAINTEXT
          sasl.mechanism: GSSAPI
          sasl.kerberos.service.name: kafka
        serdeError: logAndContinue
      bindings:
        streams-words-input:
          consumer:
            application-id: Input-Words
        streams-words-output:
          consumer:
            application-id: Output-Words
    bindings:
      streams-words-input:
        destination: streams-words-input
      streams-words-output:
        destination: streams-words-output
@StreamListener()
@SendTo("streams-words-output")
public KStream<String, Long> createWords(
    @Input("streams-words-input") final KStream<String, String> wordsInput){
    return wordsInput
            .map((key,value) -> KeyValue.pair(key, null));
}
java.lang.IllegalArgumentException: Payload must not be null
at org.springframework.util.Assert.notNull(Assert.java:198)
at org.springframework.messaging.support.MessageBuilder.<init>(MessageBuilder.java:57)
at org.springframework.messaging.support.MessageBuilder.withPayload(MessageBuilder.java:179)
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate.lambda$serializeOnOutbound$0(KafkaStreamsMessageConversionDelegate.java:86)
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate$$Lambda$743/1725151361.apply(Unknown Source)
at org.apache.kafka.streams.kstream.internals.AbstractStream$2.apply(AbstractStream.java:87)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:33)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:48)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)

共有1个答案

贺高飞
2023-03-14

您看到的异常来自Spring消息传递的长期规则--“没有有效负载为空的消息”。换句话说,如果没有什么要交流,就没有消息要发送。

也就是说,KStream和它如何处理这种情况显然有一个问题,所以我建议在Kafka Binder中提出一个问题。同时,您可以轻松地向管道添加filter操作,以过滤掉空值。

 类似资料:
  • 问题内容: 我有一个动态编写的查询(通过Joomla的OO PHP)将一些值插入MySQL数据库。用户填写的表单上有一个用于美元金额的字段,如果他们将该字段留空,我希望输入系统的值为NULL。我已经在查询运行时将查询写到了错误日志中。这是查询的样子: 但是,当我查看数据库表时,尽管ActivatedDT正确设置为NULL,但BalanceInit和BalanceCurrent均为0.00。Acti

  • 当我启动时,遇到一个奇怪的错误: 这是我的类: 当参数被指定为非空时,该错误指出它为空;但它是可为null的(),并且方法在源代码中标记为。 我在其他Kotlin项目中没有遇到过这个错误。我使用的是Kotlin版本1.1.2-5;与1.1.2-3有相同的误差。

  • 问题内容: 为什么以下计算会产生负值? 问题答案: 正在评估其中的每个表达式(当然是在编译时;它是一个常量),而不是。结果在某个时候溢出。因此,只需使所有操作数文字变长即可: 当然,仅使 一些 操作数变长就可以逃脱,但是我倾向于发现仅更改所有内容会更容易。 综上所述,如果您要查找“ 30天的毫秒数”,最好使用:

  • 问题内容: 我正在使用Java Jersey 2.17构建RESTful Web服务。客户。我正在使用ExtJS 5开发。 我在服务上的课程 Main.java UserRi.java ResponseCorsFilter.java 目前,我一直坚持删除内容。在客户端上,我从表单面板中获取记录并调用擦除功能。请求/响应看起来像这样: 在控制台上,我看到了 也可以通过以下jQuery.ajax()调

  • 我正在做一个Spring Boot项目,Spring Boot版本1.5.4.release.我们需要用户Hibernate5.2.10,所以我将hibernate.version设置为5.2.10。最终。有错误:java.lang.IllegalStateExctive:EntityManagerFactory必须为null。git上的代码https://github.com/shuanshua