当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream将值生成为包含JSON的字符串,而不仅仅是JSON

秦宏硕
2023-03-14

在使用SpringCloudStream的流处理应用程序中,我获取一个输入流(键入一个整数),并在其上调用selectKey,以创建一个具有相同值但具有不同键(字符串)的新主题。输入主题中包含正确JSON格式的记录,例如:

"key": {
  "id": 1
},
"value": {
  "id": 1,
  "public_id": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a", ...

问题是,流处理应用程序创建的主题的是包含JSON的字符串,而不是正确的JSON,即:

"key": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a",
"value": "{\"id\":1,\"publicId\":\"4273b60f-6fe6-40be-8602-d0b3ed2ecf2a\"}"

代码如下:

@StreamListener
@SendTo("output")
fun process(@Input("input") stream: KStream<Int, MyObj>): KStream<String, MyObj> =
         stream.selectKey { _, value -> value.publicId }

上面的函数所做的是消耗输入流,并生成一个输出流(被发送到输出)。该输出流与输入流具有相同的值,但只是一个不同的键。(在这种情况下,键来自值的公共ID属性。)

应用程序。yml如下:

spring.cloud.stream:
  bindings:
    input:
      destination: input-topic
    output:
      destination: output-output
  kafka:
    streams:
      binder:
        application-id: test-app-id-1
      bindings:
        input:
          consumer:
            keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
        output:
          producer:
            keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde

我缺什么了吗?这实际上是一个问题,还是JSON可以作为字符串存储在Spring Cloud Stream生成的消息中?

其他我尝试过但没有产生影响的事情:

  • 使用本机解码/编码
  • spring.cloud.stream.bindings.output.content-type 设置为 application/json
  • 使用映射而不是选择键

共有1个答案

齐涛
2023-03-14

这意味着您将公共ID:"4273b60f-6fe6-40be-8602-d0b3ed2ecf2a"作为字符串而不是POJO发送。

如果这是您要发送的内容,则应使用StringSerde而不是JsonSerde

编辑

我刚刚用一个Java应用程序测试了它,它像预期的那样工作...

@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class So58538297Application {

    public static void main(String[] args) {
        SpringApplication.run(So58538297Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public KStream<String, Foo> process(@Input(Processor.INPUT) KStream<String, Foo> stream) {
        return stream.selectKey((key, value) -> value.getBar());
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        ObjectMapper mapper = new ObjectMapper();
        return args -> {
            template.send(Processor.INPUT, mapper.writeValueAsString(new Foo("baz")));
        };
    }

    @KafkaListener(id = "outputGroup", topics = Processor.OUTPUT)
    public void out(String in, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("out:" + in + ", key:" + key);
    }

    @KafkaListener(id = "copyOfInput", topics = Processor.INPUT)
    public void in(String in) {
        System.out.println("in:" + in);
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

    }

}

spring.application.name=so58538297

spring.kafka.consumer.auto-offset-reset=earliest

in:{"bar":"baz"}
out:{"bar":"baz"}, key:baz
 类似资料:
  • 使用。但仍不确定为什么上面的方法不起作用。 不管怎样,多亏了之前有人建议这个答案。:)

  • 问题内容: 我想检查a是否仅包含数字。我用这个: 但意识到它也允许和。基本上,我要确保只能包含数字,而不能包含其他字符。由于和都是数字,所以不是正确的方法。也许我需要一个正则表达式?有小费吗? 问题答案: 怎么样

  • 问题内容: 这个想法是读取String并确认它不包含任何数字字符。因此,“ smith23”之类的内容将不被接受。 问题答案: 你想要什么?速度还是简单?为了提高速度,请选择基于循环的方法。为简单起见,请使用一种基于内衬RegEx的方法。 速度 简单

  • 我能够通过使用下面这个问题的注释提供的代码来解决这个问题。所有其他帖子都是有效的! 我使用的有用的东西来自第一个评论。虽然提供的所有示例代码似乎也是有效的!

  • 问题内容: 我正在尝试使用以下命令选择MySQL中仅包含字母数字字符的所有行: 但是,它将返回所有行,而不管它们包含非字母数字字符的事实。 问题答案: 试试这个代码: 这样可以确保所有字符都匹配。

  • 我正在编写一个程序,将二进制数转换为十进制数,我只是一个新手程序员。我一直试图确定输入的字符串是二进制数,这意味着它只包含1和0。到目前为止,我的代码是: 也就是说,无论输入什么(然后退出),即使输入了二进制数,程序都会输出“您没有输入二进制数”。我确信我的if语句是错误的或我的字符声明是错误的,但是我已经尝试了我个人知道的一切来纠正这个问题,但我在网上找不到任何帮助。任何帮助都将不胜感激。