你能帮助我如何使用Kafka流实现这一点吗?
场景:将订单数据的所有发票分组。在实时流媒体中,可能会延迟接收发票。因此,我们希望在加入之前等待20分钟将所有发票分组。
示例:订单“x”有3张发票,预计将在20分钟内收到。
预期输出:订单和3张发票应作为输出主题中的单个数据提供。
我们有下面的拓扑结构来实现这一点。
>
我们正在根据订单密钥对发票进行分组。我们设置了20分钟的翻滚窗口
将订单数据与生成的发票组关联
将输出写入新主题
问题:步骤3未等待步骤2完成。加入在收到订单后立即执行的操作。因此,我们没有得到预期的产出。
我们尝试使用joinwindows实现同样的功能。但是,由于联接窗口是滑动窗口,因此我们在输出主题中获得了重复的数据。
对于上面的示例,如果我们使用联接窗口而不是滚动窗口,我们将获得3个输出数据,订单分别有1张发票、2张发票和3张发票。
请帮助我解决此问题或建议任何替代方法
代码段:
KTable<Windowed<String>, List<InvoiceList>> invoiceList= invoiceStream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(1200)))
.aggregate(() -> new ArrayList<InvoiceList>(),
(key, newValue, agg) -> {
new KeyValue<>(key, agg.add(newValue));
return agg;
},
Materialized.as("invoice-list").with(Serdes.String(), new ArrayListSerde<InvoiceList>(AppSerdes.InvoiceList())))
KStream<String, Order> orderOutput=
orderStream.join(invoiceList, Joiner);
orderOutput.to(AppConfig.OutputTopic.OUTPUT_ORDER,Produced.with(Serdes.String(), AppSerdes.Order()));
这种结合在我们的情况下起作用。因此,我们将其作为两个独立的流接收,并在消费者上添加了自定义逻辑来处理我们的用例。
谢谢
我想,订单先到,然后是发票,而不是相反。如果我的假设是正确的,那么你的逻辑就行不通了。因为当订单进入您的KStream时,可能没有发票,因此连接不会获取任何发票。请记住,KStream KTable连接是非窗口连接,可以像查找KTable(changelog流)一样使用。
问题内容: 我试图了解哪些是应用程序的“物理”限制。 在客户端: 在服务器端: 在OSX中达到文件限制(256)时,统计信息如下 让我感到困惑的是: 如果我强行关闭连接(这是我想对客户端执行的操作,为什么我仍在使用文件句柄(因此达到文件限制),请执行以下操作:编辑:添加延迟似乎使服务器可以保持呼吸并且永远不会达到文件限制)? 有没有一种方法可以完全关闭套接字,以便可以确定很少达到文件限制(我知道可
我对MongoDB中的多对多关系实现有一个特定的问题。 我收集了歌曲和艺术家的作品(数百万份文档)。在这里,这首歌可以被许多艺术家演唱,一个艺术家可以唱许多首歌。所以我在两个集合中都遵循了文档引用的方法。像这样... 1.歌曲集:- 2.艺术家收藏:- 但这里的问题是,在删除艺术家的同时,我必须从歌曲所有文档中的艺术家数组中删除一个艺术家,如果该文档中有艺术家,反之亦然。这会导致原子性问题。我如何
我有一个关于Hibernate ManyToMany映射的问题。我有两个类 A 和 B,它们之间的映射是由 Hibernate 解析的 ManyToMany 映射: 用户和组的外键是“A_id”和“B_id”。联接表称为A_B。 现在,我想加上C。我想A_B与C有关系,与C创建多对多关系,A_B我可以称之为A_B_C。 编辑:所以我会创建一个_B实体,A和B和A_B的关系为2 @OneToMany
问题内容: 我有2张桌子: 电影:movieID 用户:userID 这些表通过Queue表具有多对多关系,并带有一个附加属性listOrder: 队列:movieID,userID,listOrder 我正在尝试使用EclipseLink对此模型建模,但是却收到“不兼容映射”错误。这是我的代码的示例: QueueItemPK的目的是使我可以拥有movieID和userID的复合主键。我不确定这是
本文向大家介绍使用java连接Redis,Maven管理操作,包括了使用java连接Redis,Maven管理操作的使用技巧和注意事项,需要的朋友参考一下 pom配置 创建db.properties文件 书写工具类 书写测试类 补充知识:JAVA使用Redis所需的MAVEN的POM文件 redis不仅可以通过命令行进行操作,同时redis也可以通过javaAPI进行操作,这是操作redis所需的
> 我有3个实体用户、应用程序和角色。 null null Role1.getUsers().Add(user);Role1.getUsers().Add(user); role2.getUsers().add(user);role2.getUsers().add(user); user.getApplications().add(app1);user.getApplications().add(