当前位置: 首页 > 知识库问答 >
问题:

Kafka流|连接流的聚合

太叔天宇
2023-03-14

我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。

        StreamsBuilder builder = new StreamsBuilder();
        KStream<Long, BidMessage> bidStream = builder.stream("bid", Consumed.with(new LongSerde(), new BidMessageSerde()).withTimestampExtractor(new BidMessageTimestampExtractor()));
        KStream<Long, ClickMessage> clickStream = builder.stream("click", Consumed.with(new LongSerde(), new ClickMessageSerde()).withTimestampExtractor(new ClickMessageTimestampExtractor()));

        KStream<String, BidMessage> newBidStream = bidStream.selectKey((key, value) -> value.getRequestId());
        KStream<String, CLickMessage> newClickStream = impStream.selectKey((key, value) -> value.getRequestId());

        KStream<String, BidMergedMessage> result = newBidStream.leftJoin(newImpStream,
                getValueJoiner(),
                JoinWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(0)),
                Joined.with(Serdes.String(), new BidMessageSerde(), new ClickMessageSerde()));

        result.groupBy((key, value) -> "" + value.getClientId(), Grouped.with(Serdes.String(), newBidMergedSerde()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(40)))
                .aggregate(() -> new AggResult(0, 0), (key, value, aggregate) -> {
                    if (value.getClickId() != null) {
                        aggregate.clicks_++;
                    }
                    aggregate.bids_++;
                    return aggregate;
                }, Materialized.with(Serdes.String(),new AggResultJsonSerde()))
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .foreach((key, value) -> {
                    logger.info("{}-{}, clientId : {}, Value: {}", new Date(key.window().start()), new Date(key.window().end()),key.key(), value);
                });

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

它是否可以修复以避免因连接而重复?

共有2个答案

龙俊英
2023-03-14

我可以想出两个选择。

>

  • 首先要尝试的是在KStream中添加一个filter

    使用内部联接newBidStream。加入(newImpStream…

    -比尔

  • 史商震
    2023-03-14

    我不确定我是否完全遵循了你的要求,但问题似乎是你只想在没有与RHS合并或加入成功的情况下计算出价。但是由于RHS主题中的一些缓慢,您偶尔会得到两个结果,第一个是“未合并”,然后在RHS记录到达时进行合并。

    您可以在结果KStream上添加TransformValues运算符,并使用状态存储来跟踪传入的记录。当一个成功的连接中出现了一个副本时,可以查看statestore并删除带有空RHS的记录(如果存在),然后转发正确的连接结果。

    为了转发没有成功连接的记录,您可以考虑使用<代码>标点>()/代码>定期地通过该存储并发出没有匹配的记录,并在您感觉到连接发生的时间内已经在StestStor中。

    Kafka教程中的本教程也可以作为指南。

     类似资料:
    • 我有两个Kafka主题-和。第一个主题包含由唯一Id(称为)键入的recommendations对象。每个产品都有一个用户可以单击的URL。 主题获取通过单击推荐给用户的产品URL生成的消息。它是如此设置的,这些单击消息也由键控。 请注意 > 每个单击对象都会有一个相应的推荐对象。 click对象的时间戳将晚于Recommensions对象。 建议和相应的点击之间的间隔可能是几秒钟到几天(最多7天

    • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我

    • 我正在导入一个DB,其中包含一些表示多对多和一对多关系的链接表。 1-到目前为止,根据我对Kafka流的理解,我似乎需要为每个链接表提供一个流,以便执行聚合。KTable将不可用,因为记录是按键更新的。但是,聚合的结果可能是Ktable中的一个。 2-然后是外键上的连接问题。似乎唯一的方法是通过GlobalKtable。link-table-topic->link-table-stream->li

    • 我有一个KStream,其中包含从主题到1的数据,如下所示: 和KTable,构造如下: 稍后,主题To2中出现以下消息: 现在,我希望我的KTable能够反映这些变化,并且看起来像这样: 但看起来是这样的: 我想我缩小了范围:显然聚合的只在第一次调用--之后聚合总是接收作为最后一个参数,例如。 其中,在第一次调用(通过初始值设定项创建)时为,但在第二次调用时为。 有什么想法吗? 编辑2 编辑3

    • 我们已经成功地使用了MySQL - 使用jdbc独立连接器的kafka数据摄取,但现在在分布式模式下使用相同的连接器(作为kafka connect服务)时面临问题。 用于独立连接器的命令,工作正常 - 现在,我们已经停止了这一项,并以分布式模式启动了kafka connect服务,如下所示 2 个节点当前正在运行具有相同连接服务。 连接服务已启动并正在运行,但它不会加载 下定义的连接器。 应该对

    • 当kafka streams应用程序运行且kafka突然停机时,应用程序进入“等待”模式,发送警告日志的消费者和生产者线程无法连接,当kafka恢复时,一切(理论上)都应该恢复正常。我正在尝试获取有关此情况的警报,但我无法找到捕获该警报的位置并发送日志/度量。我尝试了以下方法: 但这只发生在异常情况下,而不是这里 扩展并将属性更改为我的类,从而扩展了此接口。 我知道Kafka有自己的衡量标准,我可