当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Kafka流:无法获取WindowedAggregateSessions使用自定义Serdes

宋博易
2023-03-14

我对使用kafka streams和spring cloud stream相对较新,在使用窗口聚合功能时遇到了困难。

我想做的是

  1. 获取我的初始UserInteractionEvents流,并按userProjectId(字符串)对它们进行分组
  2. 创建这些事件的窗口会话,15分钟不活动
  3. 将这些窗口会话聚合到自定义会话对象中
  4. 然后将这些会话对象转换为另一个自定义UserSession对象

我的代码是:

    @EnableBinding(KafkaStreamsProcessor::class)
    inner class SessionProcessorApplication {

        @StreamListener("input")
        @SendTo("output")
        fun process(input: KStream<*, UserInteractionEvent>): KStream<*, UserSession> {
            return input
                .groupBy({ _, v -> v.userProjectId }, Serialized.with(Serdes.String(), UserInteractionEventSerde()))
                .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(15)))
                .aggregate(
                        Initializer<Session>(::Session),
                        Aggregator<String, UserInteractionEvent, Session> { _, event, session ->  session.interactions + event.interaction; session  },
                        Merger<String, Session> { _, session1, session2 ->  Session.merge(session1, session2)},
                        Materialized.`as`<String, Session, SessionStore<Bytes, ByteArray>>("windowed-sessions")
                        .withKeySerde(Serdes.String()).withValueSerde(SessionSerde()))
                .toStream()
                .map { windowed, session ->
                    KeyValue(windowed.key(),
                            UserSession(windowed.key(),
                                    session.interactions,
                                    Instant.ofEpochSecond(windowed.window().start()),
                                    Instant.ofEpochSecond(windowed.window().end()))
                    )
                }
        }
    }

我似乎在聚合部分有问题。在尝试刷新窗口会话存储时看到类强制转换异常。我很困惑如何从这里继续。如果有人能指出我哪里出错了,或者一些处理使用带有自定义服务器的会话窗口的留档,我将不胜感激!

非常感谢!

完整堆栈跟踪如下:

我的配置:

spring.cloud.stream.kafka.streams.bindings:
  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  input:
    consumer:
      valueSerde: com.teckro.analytics.UserInteractionEventSerde
  output:
    producer:
      valueSerde: com.teckro.analytics.UserSessionSerde

spring.cloud.stream.bindings:
  input:
    destination: test-interaction
    consumer:
      headerMode: raw
  output:
    destination: test-session
    producer:
      headerMode: raw

共有1个答案

池麒
2023-03-14

我发现您的配置存在一些问题。

默认Serde的配置方式应更改如下:

spring.cloud.stream.kafka.streams.binder.configuration:
  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings:
  input:
    consumer:
      valueSerde: com.teckro.analytics.UserInteractionEventSerde
  output:
    producer:
      valueSerde: com.teckro.analytics.UserSessionSerde

您似乎正在对所有反序列化/序列化使用本机Serde。您想在配置中包含它。默认情况下,绑定器执行输入/输出序列化。

spring.cloud.stream.bindings:
  input:
    destination: test-interaction
    consumer:
      useNativeDecoding: true
  output:
    destination: test-session
    producer:
      useNativeEncoding: true

如果问题仍然存在,请在Github上创建一个简单的示例项目并与我们分享。我们会看看。

 类似资料:
  • 3.48.1 获取用户某自定义ID某天的小时维度流量数据 通过该接⼝可以根据用户自定义的customid获取某天的小时维度流量数据 地址: https://spark.bokecc.com/api/traffic/user/custom/hourly 需要传递以下参数: 参数 说明 userid 用户ID,必选 customid 自定义ID,必选 date 查询日期,格式yyyy-MM-dd 返

  • 问题内容: 我正在使用Jersey 2.22.1和Jackson 2.6.3编写Web应用程序。我的pom.xml看起来像这样: 目标是将不同的自定义ObjectMappers用于JSON和XML映射。我创建了两个提供程序类:JSONMapperProvider 和XMLMapperProvider 它们都在同一包中,并在Application资源类中注册 如果我向@Produces(MediaT

  • 本文向大家介绍使用jQuery获取data-的自定义属性,包括了使用jQuery获取data-的自定义属性的使用技巧和注意事项,需要的朋友参考一下 废话少说,先上代码 通过jQuery制作组件,可以轻松获取到我们data-的自定义属性,也可以给data-属性来赋值。 获取: 赋值:

  • 我正在使用PostgreSQL、Sequelize和Express开发一个简单的CRUD应用程序。并以本教程作为参考。我的模型或数据库连接似乎有问题。详情如下: 文件夹结构: server |__config |__db.js |__env.js |__models |__missions.js |__router |__routes |__missions.js |__index.js |__i

  • 我试图创建一个Alexa技能,从我的网站上提取数据,当我使用HTTPS时,请求错误被排除,尽管在其他需要api密钥的网站上使用HTTPS是可以的,比如《纽约时报》。这是一个代码片段

  • 这是我的AngularJS代码(如果我删除header选项,它可以正常工作)。 请求如下: 答复: 我添加了和头,但请求仍然失败 大写字母(我的意思是vs)是否会导致失败?如果是,我如何才能让AngularJS停止这样做? Go服务器路由代码: