当前位置: 首页 > 知识库问答 >
问题:

Spring 云流应用程序在一次垃圾输入后退出

柳景胜
2023-03-14

我对使用KStream组件的Spring云流应用程序有问题。它正在监听一个输入并在处理后将消息定向到一个输出。

它期待一个JSON字符串进来,并在到达时尝试将其转换为Spring Tuple。将消息发送出去时会发生相反的情况。

问题是,当Sysadmin想要使用kafka控制台生成器测试主题时。例如sh…并打印字符串

“哈哈”

在其中,整个Spring云流应用程序将立即消亡,只有以下例外:

java.lang.RuntimeException: com.fasterhtml" target="_blank">xml.jackson.core.JsonParseException: Unrecognized token 'lol': was expecting ('true', 'false' or 'null')
 at [Source: lol; line: 1, column: 7]
    at org.springframework.tuple.JsonStringToTupleConverter.convert(JsonStringToTupleConverter.java:71) ~[spring-tuple-1.0.0.RELEASE.jar:na]
    at org.springframework.tuple.JsonStringToTupleConverter.convert(JsonStringToTupleConverter.java:31) ~[spring-tuple-1.0.0.RELEASE.jar:na]
    at org.springframework.tuple.TupleBuilder.fromString(TupleBuilder.java:153) ~[spring-tuple-1.0.0.RELEASE.jar:na]
    at org.springframework.cloud.stream.converter.TupleJsonMessageConverter.convertFromInternal(TupleJsonMessageConverter.java:90) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:175) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:167) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:55) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.cloud.stream.binder.kstream.KStreamListenerParameterAdapter$1.apply(KStreamListenerParameterAdapter.java:66) ~[spring-cloud-stream-binder-kstream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) ~[kafka-streams-0.10.1.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) ~[kafka-streams-0.10.1.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180) ~[kafka-streams-0.10.1.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) ~[kafka-streams-0.10.1.1.jar:na]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lol': was expecting ('true', 'false' or 'null')
 at [Source: lol; line: 1, column: 7]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) ~[jackson-core-2.8.10.jar:2.8.10]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) ~[jackson-core-2.8.10.jar:2.8.10]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2839) ~[jackson-core-2.8.10.jar:2.8.10]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1903) ~[jackson-core-2.8.10.jar:2.8.10]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:749) ~[jackson-core-2.8.10.jar:2.8.10]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3850) ~[jackson-databind-2.8.10.jar:2.8.10]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3799) ~[jackson-databind-2.8.10.jar:2.8.10]
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2397) ~[jackson-databind-2.8.10.jar:2.8.10]
    at org.springframework.tuple.JsonStringToTupleConverter.convert(JsonStringToTupleConverter.java:44) ~[spring-tuple-1.0.0.RELEASE.jar:na]

我希望这个框架对这种行为至少有一定的容错能力。你不能指望输入总是很好很漂亮。于是我查阅了Spring文档:https://docs . Spring . io/Spring-cloud-stream/docs/current/reference/html single/# _ configuration _ options

此外,还有一些配置选项,用于在出现故障时重试逻辑的隐藏实现。例如< code>maxAttempts参数。但是这个参数已经使用了默认值3,但是我没有看到Spring cloud stream应用程序试图从这个错误中恢复过来。

所以我想知道为Spring云流应用程序构建一些不良输入容忍度的推荐方法是什么。

应用程序的配置如下所示:

spring:
    cloud:
        stream:
          bindings:
            input:
              content-type: application/json
              destination: inbound
              group: fraud
              consumer:
                headerMode: raw
            output:
              content-type: application/x-spring-tuple
              destination: outbound
              producer:
                headerMode: raw
                useNativeEncoding: true

spring.cloud.stream.kstream.binder.configuration:
  key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

共有1个答案

红明德
2023-03-14

在Spring Cloud Stream 1.3.x (Ditmars)中,对Kafka流的错误处理支持非常有限。事实上,由应用程序来处理1.3 kafka streams库中的任何错误。然而,在2.0.0中,我们增加了对KIP-161的支持。https://cwiki.apache.org/confluence/display/KAFKA/KIP-161:流反序列化异常处理程序

在kafka streams binder的2.0.0版本中使用这个新特性,您可以记录跳过记录,或者记录在反序列化错误时失败。除此之外,绑定器还提供了DLQ发送异常处理程序的实现。所有这些文档仍在2.0版本中更新。一旦准备好了,我会在这里更新文档链接。但是,这里是它的要点。

spring.cloud.stream.kafka.streams.binder.serdeError: sendToDlq (or logAndFail or logAndSkip)

spring.cloud.stream.kafka.stream.bindings.input.consumer.dlqName:[dlq name] - 如果未提供,则为错误。来访主题]。[组名]。

然后您将在DLQ主题中看到反序列化的错误记录。同样,这些html" target="_blank">功能仅在2.0.0.BUILD-SNAPSHT中可用,并且将是即将发布的2.0.0.RC1版本的一部分。

 类似资料:
  • 在C++中,垃圾回收机制(自动回收没有被引用的内存区域)是可选的;也就是说在编译器中并不是一定要实现垃圾回收器。尽管如此,C++0x还是定义了垃圾回收器的功能。与此同时,C++0x还提供了应用程序二进制接口(ABI: Application Binary Interface)来辅助控制垃圾回收器的行为。 我们用“safely derived pointer”(3.7.3.3)(译注:我搜索后发现,

  • 我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入主题。如何在Spring Cloud Streams Kafka应用程序中完成?

  • 我用firebase实时数据库和firebase身份验证方法创建了这个聊天应用程序,但我真的很困惑,我不知道如何在聊天应用程序中获得用户的最后一次露面。 下面是我在onCreate medthod中尝试的代码。 任何想法或替代方法都将不胜感激。

  • 问题内容: 我的应用程序包含A,B和C这三个活动。我通过“确定”按钮从A移到B,并且我想使用Android设备的默认后退按钮从B移到A。但是,当我按下按钮时,整个应用程序将关闭。我该如何解决这个问题? 问题答案: 我怀疑您是通过“确定”按钮onclick监听器调用的。不要那样做 从活动堆栈中删除您的活动。 在这里阅读更多。

  • 问题内容: 我正在开发使用Facebook SDK登录的ios应用程序。我在情节提要中将a设置为初始View Controller,用户从中使用FB帐户登录。 我有另一个ViewController,一旦用户登录,它将正确加载。 我正在检查AppDelegate文件,如果不是nil,则直接加载第二个ViewController,因为用户已经登录。 但是,如果我退出该应用程序并重新启动它,则始终为零

  • 我正在开发移动应用程序与离子有飞溅 截至目前,我正在使用以下代码进行配置 此外,我在第一次启动页面上使用了双击退出,下面是一段代码。 所以在启动页面,若我在5秒内点击两次后退按钮,它就会关闭。我可以在“打开的应用程序”列表中看到应用程序仍处于打开状态。(android手机中的第一个或最后一个按钮)。 问题是,如果我在双击退出后重新启动应用程序,那么它会显示一段时间的白屏和启动屏幕(但没有闪屏)。我