我有两个Kafka主题-推荐
和点击
。第一个主题包含由唯一Id(称为recommendationsId
)键入的recommendations对象。每个产品都有一个用户可以单击的URL。
clicks
主题获取通过单击推荐给用户的产品URL生成的消息。它是如此设置的,这些单击消息也由推荐id
键控。
请注意
>
每个单击对象都会有一个相应的推荐对象。
click对象的时间戳将晚于Recommensions对象。
建议和相应的点击之间的间隔可能是几秒钟到几天(最多7天)。
我的目标是使用Kafka streams join连接这两个主题。我不清楚的是我应该使用KStream x KStream连接还是KStream x KTable连接。
我通过加入clicks
streambyrecommendations
表来实现KStream x KTable
join。但是,如果建议是在joiner启动之前生成的,而单击是在joiner启动之后到达的,则我无法看到任何连接的clicks建议对。
我是否使用了正确的连接?我应该使用KStream x KStream
连接吗?如果是的话,为了最多过去7天能够加入一个有推荐的点击,我是不是应该把窗口大小设置为7天呢?在这种情况下,我是否也需要设置“保留”期限?
我执行KStream x KTable
连接的代码如下。请注意,我已经定义了类建议
和单击
及其相应的serde。单击消息只是普通的字符串(url)。此URL字符串与
推荐
对象连接,以创建一个单击
对象,该对象被发送到jointTopic
。
public static void main(String[] args){
if(args.length!=4){
throw new RuntimeException("Expected 3 params: bootstraplist clickTopic recsTopic jointTopic");
}
final String booststrapList = args[0];
final String clicksTopic = args[1];
final String recsTopic = args[2];
final String jointTopic = args[3];
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_joiner_id");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JoinSerdes.CLICK_SERDE.getClass().getName());
KStreamBuilder builder = new KStreamBuilder();
// load clicks as KStream
KStream<String, String> clicksStream = builder.stream(Serdes.String(), Serdes.String(), clicksTopic);
// load recommendations as KTable
KTable<String, Recommendations> recsTable = builder.table(Serdes.String(), JoinSerdes.RECS_SERDE, recsTopic);
// join the two
KStream<String, Click> join = clicksStream.leftJoin(recsTable, (click, recs) -> new Click(click, recs));
// emit the join to the jointTopic
join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);
// let the action begin
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
只要在joiner(上面的程序)运行后生成了建议和单击,这就可以正常工作。但是,如果在运行joiner之前为其生成建议的单击到达,我看不到任何连接发生。我该如何解决这个问题?
如果解决方案是使用
KStream x KStream
join,请帮助我了解应选择的窗口大小和保留期。
你的总体观察是正确的。从概念上讲,两种方法都可以得到正确的结果。如果使用流表联接,则有两个缺点(不过在未来的Kafka版本中可能会重新讨论和改进)
null
(即,您得到重试逻辑)时将单击事件写回输入主题,连续单击单个建议可能会出现问题,您可能需要在应用程序代码中对此进行说明
如果您使用流-流连接,并且在推荐后7天可能会发生单击,则您的窗口大小必须为7天-否则,单击将不会与推荐连接。
对于流-流连接,保留时间的答案有点不同。它必须至少7天,因为窗口大小是7天。否则,您将删除您的“运行窗口”的记录。您还可以将保留期设置得更长,以便能够处理“后期数据”。假设用户在窗口时间帧结束时(推荐的7天时间跨度结束前5分钟)点击,但点击仅在1小时后报告给您的应用程序。如果您的保留期为7天,则此延迟到达的记录将无法再处理(因为建议已被删除)。如果您设置了较大的保留期,例如,8天,您仍然可以处理延迟记录。这取决于你的应用程序/语义需要你想要使用什么保留时间。
摘要:从实现的角度来看,使用流连接比使用流表连接简单。但是,内存/磁盘节省是可以预期的,并且可能会很大,具体取决于您的点击流数据速率。
我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?
我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我
当kafka streams应用程序运行且kafka突然停机时,应用程序进入“等待”模式,发送警告日志的消费者和生产者线程无法连接,当kafka恢复时,一切(理论上)都应该恢复正常。我正在尝试获取有关此情况的警报,但我无法找到捕获该警报的位置并发送日志/度量。我尝试了以下方法: 但这只发生在异常情况下,而不是这里 扩展并将属性更改为我的类,从而扩展了此接口。 我知道Kafka有自己的衡量标准,我可
我正在导入一个DB,其中包含一些表示多对多和一对多关系的链接表。 1-到目前为止,根据我对Kafka流的理解,我似乎需要为每个链接表提供一个流,以便执行聚合。KTable将不可用,因为记录是按键更新的。但是,聚合的结果可能是Ktable中的一个。 2-然后是外键上的连接问题。似乎唯一的方法是通过GlobalKtable。link-table-topic->link-table-stream->li
我有一个我真的无法解决的问题。所以我有一个kafka流,其中包含一些这样的数据: 我想用另一个值“bookingId”替换“adId”。此值位于csv文件中,但我无法真正弄清楚如何使其工作。 这是我的映射csv文件: 所以我的输出最好是这样的 该文件可以每小时至少刷新一次,因此它应该会接收对它的更改。 我目前有一个不适合我的代码: 代码只运行一次,然后停止,因此它不会使用csv文件转换kafka中
我们已经成功地使用了MySQL - 使用jdbc独立连接器的kafka数据摄取,但现在在分布式模式下使用相同的连接器(作为kafka connect服务)时面临问题。 用于独立连接器的命令,工作正常 - 现在,我们已经停止了这一项,并以分布式模式启动了kafka connect服务,如下所示 2 个节点当前正在运行具有相同连接服务。 连接服务已启动并正在运行,但它不会加载 下定义的连接器。 应该对