当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud StreamListener@Output KStream Serdes似乎不起作用

赵骏奇
2023-03-14

我有一个流监听器,作为

@StreamListener(target = "requesti")
@SendTo("responseo")
public KStream<UUID,Account> process(KStream<UUID, Account> events) {
    // Predicate<UUID, Event> isAccount = (key, value) ->
    // value.getEntity().getClass().equals(Account.class);

    // @formatter:off
    return events
            //.filter(isAccount)
            .peek((key, value) -> {
                log.debug("Processing {} {}", key, value);
            });
            /*
            .filter(isAccount)
            .map((key, value) -> process(value))

            .peek((key, value) -> {
                log.debug("Processed {} {}", key, value);
            });
            */
    // @formatter:on

}

其中@input(“requesti”)配置如下所示;

spring.cloud.stream.kafka.streams.bindings.requesti.consumer.application-id=repo-event-consumer
spring.cloud.stream.bindings.requesti.destination=request
spring.cloud.stream.bindings.requesti.content-type=application/json
spring.cloud.stream.bindings.requesti.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.responseo.consumer.application-id=repo-response-producer
spring.cloud.stream.bindings.responseo.destination=response
spring.cloud.stream.bindings.responseo.content-type=application/json
spring.cloud.stream.bindings.responseo.producer.header-mode=raw
spring.cloud.stream.bindings.responseo.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.responseo.producer.key-serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.bindings.responseo.producer.value-serde=org.springframework.kafka.support.serializer.JsonSerde

发送记录ProducerRecord(TOPIC=Request,Partition=null,headers=recordheaders(headers=[recordheader(key=Key_TypeId,值=[106,97,118,97,46,117,116,105,108,85,85,85,73,68]),readheader(键=TypeId,值=[117,107,46,111,114,103,46,99,97,112,117,108,116,101,115,46,99,117,98,101,118,105,99,101,115,97,99,111,117,110,116,109,100,101,108,46,65,99,117,117,117,117,100,101,108,46,65,99,117,117,116]),isReadOnly=true),key=6f0f50e2-3add-4d22-a370-CAC66D016AF0,value=account(),回调org.springframework.kafka.core.kafkatemplate$$lambda$582/533392019@85AB964到主题请求分区2

默认的serdeConfig为

    spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde

回购

共有1个答案

邢飞昂
2023-03-14

下面的示例演示了jsonserde如何使用Kafka Streams绑定器进行出站:https://github.com/schacko-samples/json-serde-example。运行示例并确保其工作。查看application.yml以获得配置细节。我在提供的自述文件中添加了一些细节。

 类似资料:
  • 我的代码看起来像 我的文件如下所示 当我运行程序时,我看到 我怎样才能修好它呢?

  • 问题内容: 我在使用该功能时遇到了麻烦。 我只需要知道SQL查询是否返回零行。 我已经尝试过以下简单的语句: 类型是哪里。上面的代码似乎不起作用。无论是否为空,它将始终打印该消息。 我检查了SQL查询本身,当存在行时它正确返回了非空结果。 关于如何确定查询是否已返回0行的任何想法?我用谷歌搜索,找不到任何答案。 问题答案: ResultSet.getFetchSize()不返回结果数!从这里: 使

  • 问题内容: 即使将属性设置为,我仍然会插入重复的条目。 我设置了使用定期在。我没有用表 问题答案: 我没有使用JPA创建表 然后,您应该在语句中向表中添加唯一约束,例如,如果您使用的是MySQL:

  • 根据我所发现和看到的一切,这似乎是正确的。打印$query时,结果如下所示: “在客户(名字、名字、姓氏、地址、城市、州、邮政编码、电子邮件、性别)中插入值(?,,,,,,,,,,?,,,?)” 参数应该已经用bindValues()中的变量填充。所以,举个例子。。。 插入到客户(第一名、中间名、最后名、地址、城市、州、邮编、电子邮件、性别)值(比尔、霍普金斯、123大街、...) 我想继续使用这

  • 我是Java新手,正在努力学习。我目前陷入困境,不知道为什么无法从文件夹导入。

  • 问题内容: 我正在使用JDBC连接到MySQL服务器(我认为没有连接池)。在连接网址中,我有 但是我的连接仍然超时。我什至检查了它的错误。但是,当我尝试使用连接时,出现以下异常。 我知道在Java 1.6中可以使用它来检查连接,但是我正在使用Java 1.5 有没有办法确保它不会超时?还是我必须升级到Java 1.6? 问题答案: 我有同样的问题,这绝对令人发疯。这是文档在MySQL网站上的内容(

  • 问题内容: 只需创建一个首选大小的简单按钮即可。该方法似乎不起作用。我要去哪里错了? 问题答案: 您的框架受布局管理器的控制,它正在决定如何最好地布局组件,并覆盖您使用所指定的值 现代GUI需要在各种不同的图形环境中运行(甚至在同一OS上),例如,包括不同的DPI,屏幕大小和字体设置。 布局管理器使您不必担心(减少)这些问题,强烈建议您使用它们 看一眼 使用布局管理器 布局管理器的可视指南 更多细

  • 问题内容: 我只是在阅读这个问题,想尝试使用别名方法,而不是使用功能包装器方法,但是我似乎无法使其在Firefox 3或3.5beta4或GoogleChrome中在调试窗口和在测试网页中。 萤火虫: 如果将其放在网页中,则对myAlias的调用给我这个错误: Chrome(为清楚起见,插入了>>>): 在测试页中,我得到了相同的“非法调用”。 难道我做错了什么?有人可以复制吗? 另外,奇怪的是,