当前位置: 首页 > 知识库问答 >
问题:

如何捕获与Kafka流连接条件不匹配的Kafka记录?

东方新霁
2023-03-14

我正在通过连接kstream和KTable来丰富数据。kstream包含车辆发送的消息,ktable包含车辆数据。我遇到的问题是,我想从表中没有相应联接键的流捕获消息。Kafka流静默跳过它们没有连接匹配的记录。是否有某种方法可以将这些记录发送到不同的主题,以便以后可以对它们进行处理?

StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, VinMappingInfo> vinMappingTable = builder.table(vinInfoTopic, Consumed.with(Serdes.String(), valueSerde));
        KStream<String, VehicleMessage> vehicleStream = builder.stream(sourceTopic);
        vehicleStream.join(vinMappingTable, (vehicleMsg, vinInfo) -> {
            log.info("joining {} with vin info {}", vehicleMsg.getPayload().getId(), vinInfo.data.vin);
            vehicleMsg.setVin(vinInfo.data.vin);
            return vehicleMsg;
        }, Joined.with(null, null, valueSerde))
                .to(destinationTopic);

        final Topology topology = builder.build();
        log.info("The topology of connected processor nodes: \n {}", topology.describe());
        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.cleanUp();
        streams.start();

共有1个答案

梁存
2023-03-14

可以使用左联接:

java prettyprint-override">stream.leftJoin(table,...);

这确保输入流中的所有记录都在输出流中。对于这种情况,将使用apply(streamValue,null)调用ValueJoiner

 类似资料:
  • 我有两个Kafka主题-和。第一个主题包含由唯一Id(称为)键入的recommendations对象。每个产品都有一个用户可以单击的URL。 主题获取通过单击推荐给用户的产品URL生成的消息。它是如此设置的,这些单击消息也由键控。 请注意 > 每个单击对象都会有一个相应的推荐对象。 click对象的时间戳将晚于Recommensions对象。 建议和相应的点击之间的间隔可能是几秒钟到几天(最多7天

  • 我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?

  • 我正在尝试使用CockroachDB (v2.0.6)作为我的一个Kafka主题的接收器。 我找不到任何专门用于CockroachDB的Kafka连接器,所以我决定使用Confluent的jdbc sink连接器,因为CockroachDB支持postgreSQL语法。 我在Kafka Connect上使用的连接字符串如下 这基本上是我在现有工作的Postgres接收器连接器上所做的唯一更改。 不

  • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我

  • 我正在使用从oracle db获取数据,并按下(两个键 我有一个Kafka流收听这个主题,并有avro Genericrecord。当我启动流时,我开始得到<code>ClassCastException:java.lang.Long不能强制转换为org.apache.avro.generic。GenericRecordconnect生成的架构具有数据类型为“long”的字段 有人对如何解决这个问

  • 当kafka streams应用程序运行且kafka突然停机时,应用程序进入“等待”模式,发送警告日志的消费者和生产者线程无法连接,当kafka恢复时,一切(理论上)都应该恢复正常。我正在尝试获取有关此情况的警报,但我无法找到捕获该警报的位置并发送日志/度量。我尝试了以下方法: 但这只发生在异常情况下,而不是这里 扩展并将属性更改为我的类,从而扩展了此接口。 我知道Kafka有自己的衡量标准,我可