我对使用kafka streams和spring cloud stream相对较新,在使用窗口聚合功能时遇到了困难。
我想做的是
我的代码是:
@EnableBinding(KafkaStreamsProcessor::class)
inner class SessionProcessorApplication {
@StreamListener("input")
@SendTo("output")
fun process(input: KStream<*, UserInteractionEvent>): KStream<*, UserSession> {
return input
.groupBy({ _, v -> v.userProjectId }, Serialized.with(Serdes.String(), UserInteractionEventSerde()))
.windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(15)))
.aggregate(
Initializer<Session>(::Session),
Aggregator<String, UserInteractionEvent, Session> { _, event, session -> session.interactions + event.interaction; session },
Merger<String, Session> { _, session1, session2 -> Session.merge(session1, session2)},
Materialized.`as`<String, Session, SessionStore<Bytes, ByteArray>>("windowed-sessions")
.withKeySerde(Serdes.String()).withValueSerde(SessionSerde()))
.toStream()
.map { windowed, session ->
KeyValue(windowed.key(),
UserSession(windowed.key(),
session.interactions,
Instant.ofEpochSecond(windowed.window().start()),
Instant.ofEpochSecond(windowed.window().end()))
)
}
}
}
我似乎在聚合部分有问题。在尝试刷新窗口会话存储时看到类强制转换异常。我很困惑如何从这里继续。如果有人能指出我哪里出错了,或者一些处理使用带有自定义服务器的会话窗口的留档,我将不胜感激!
非常感谢!
完整堆栈跟踪如下:
我的配置:
spring.cloud.stream.kafka.streams.bindings:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
input:
consumer:
valueSerde: com.teckro.analytics.UserInteractionEventSerde
output:
producer:
valueSerde: com.teckro.analytics.UserSessionSerde
spring.cloud.stream.bindings:
input:
destination: test-interaction
consumer:
headerMode: raw
output:
destination: test-session
producer:
headerMode: raw
我发现您的配置存在一些问题。
默认Serde的配置方式应更改如下:
spring.cloud.stream.kafka.streams.binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings:
input:
consumer:
valueSerde: com.teckro.analytics.UserInteractionEventSerde
output:
producer:
valueSerde: com.teckro.analytics.UserSessionSerde
您似乎正在对所有反序列化/序列化使用本机Serde。您想在配置中包含它。默认情况下,绑定器执行输入/输出序列化。
spring.cloud.stream.bindings:
input:
destination: test-interaction
consumer:
useNativeDecoding: true
output:
destination: test-session
producer:
useNativeEncoding: true
如果问题仍然存在,请在Github上创建一个简单的示例项目并与我们分享。我们会看看。
3.48.1 获取用户某自定义ID某天的小时维度流量数据 通过该接⼝可以根据用户自定义的customid获取某天的小时维度流量数据 地址: https://spark.bokecc.com/api/traffic/user/custom/hourly 需要传递以下参数: 参数 说明 userid 用户ID,必选 customid 自定义ID,必选 date 查询日期,格式yyyy-MM-dd 返
问题内容: 我正在使用Jersey 2.22.1和Jackson 2.6.3编写Web应用程序。我的pom.xml看起来像这样: 目标是将不同的自定义ObjectMappers用于JSON和XML映射。我创建了两个提供程序类:JSONMapperProvider 和XMLMapperProvider 它们都在同一包中,并在Application资源类中注册 如果我向@Produces(MediaT
本文向大家介绍使用jQuery获取data-的自定义属性,包括了使用jQuery获取data-的自定义属性的使用技巧和注意事项,需要的朋友参考一下 废话少说,先上代码 通过jQuery制作组件,可以轻松获取到我们data-的自定义属性,也可以给data-属性来赋值。 获取: 赋值:
我正在使用PostgreSQL、Sequelize和Express开发一个简单的CRUD应用程序。并以本教程作为参考。我的模型或数据库连接似乎有问题。详情如下: 文件夹结构: server |__config |__db.js |__env.js |__models |__missions.js |__router |__routes |__missions.js |__index.js |__i
我试图创建一个Alexa技能,从我的网站上提取数据,当我使用HTTPS时,请求错误被排除,尽管在其他需要api密钥的网站上使用HTTPS是可以的,比如《纽约时报》。这是一个代码片段
这是我的AngularJS代码(如果我删除header选项,它可以正常工作)。 请求如下: 答复: 我添加了和头,但请求仍然失败 大写字母(我的意思是vs)是否会导致失败?如果是,我如何才能让AngularJS停止这样做? Go服务器路由代码: