当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams API:KStream到KTable

杜阳炎
2023-03-14
KStreamBuilder builder = new KStreamBuilder();

KStream<String, Location> locations = builder
        .stream("location_topic")
        .map((k, v) -> {
            // some processing here, omitted form clarity
            Location location = new Location(lat, lon);
            return new KeyValue<>(k, location);
        });

我能做到写作和阅读的中间主题:

// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");

// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");

有没有简单的方法从Kstream中获取Ktable?这是我第一个使用Kafka Streams的应用程序,所以我可能错过了一些明显的东西。

共有1个答案

殷安顺
2023-03-14

更新:

在Kafka2.5中,将添加一个新方法kstream#totable(),它将提供一种方便的方法将kstream转换为ktable。有关详细信息,请参阅:https://cwiki.apache.org/confluence/display/kafka/kip-523%3a+add+kstream%23totable+to+the+streams+DSL

原答案:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream

KTable<String, Long> table = stream.groupByKey().reduce(
    new Reducer<Long>() {
        @Override
        public Long apply(Long aggValue, Long newValue) {
            return newValue;
        }
    },
    "dummy-aggregation-store");

与选项1相比,这种方法在代码方面稍微复杂一些,但其优点是:(a)不需要手动的主题管理;(b)不需要从Kafka重新读取数据。

总的来说,你需要自己决定,你更喜欢哪种方法:

在选项2中,Kafka Streams将创建一个内部changelog主题来备份KTable以进行容错。因此,这两种方法都需要一些额外的Kafka存储,并导致额外的网络流量。总体而言,这是在选项2中稍微复杂的代码和选项1中的手动主题管理之间的权衡。

 类似资料:
  • 问题内容: 在PHP中,将RGB三元组转换为HSV值的最直接方法是什么? 问题答案:

  • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我

  • 问题内容: 对于正在编写的程序,我正在使用base64.b64encode(f.read(image))从一台计算机上传输图像,并尝试在接收脚本中读取它而不将其保存到硬盘驱动器中(以最大程度地减少这种情况)处理时间)。我很难弄清楚如何将图像读取到OpenCV中而不将其保存在本地。 这是我发送图像的代码如下所示: 同时,这是接收它的代码。(这在on_message函数中,因为我正在使用MQTT进行传

  • 签到开关状态 获取签到信息 签到 累计签到排行榜 新版签到 签到开关状态 签到应用具有开关性质,管理员可从后台控制签到是否被开启或者关闭,而开关会在「启动信息」接口中提供。提供格式如下: { "checkin": true // Or "checkin": false } 签到金额格式: { "checkin:attach_balance": 0 } 金额为0时表示未配置 但是

  • 实时了解外勤人员位置活动轨迹及分布,出差也可以异地打卡。 开始你的第一次签到 如何签到 打开手机钉钉-工作-签到 签到按钮自动生成签到时间及签到地点 拍照自带时间和地点水印,提交完成签到 签到设置 根据公司要求选择签到相关设置 查看足迹 签到足迹 ● 点击右下角足迹,直观查看团队足迹,根据部门和日期筛选历史签到记录 ● 选择未签到,可以查看未签到人员,对未签到人员Ding一下 ● 点击足迹分布,可

  • 在上课页面的右下侧点击“活动-签到” 选择签到时间(一般为上课时间),点击"开启签到"。 页面跳转,实时显示目前签到人数,并可随时关闭签到。**