当前位置: 首页 > 知识库问答 >
问题:

合并单个流中的Kafka记录值

林涵映
2023-03-14

我有一个主题,它接收可能包含部分数据的JSON记录。我想合并这些数据,所以我试图在最终的数据记录内收集尽可能多的信息。

 t1: { id: '1234', attribute1: 'foo' }
 t2: { id: '1234', attribute2: 'bar' }

合并记录值后所需的流:

 t1: { id: '1234', attribute1: 'foo' }
 t2: { id: '1234', attribute1: 'bar', attribute2: 'bar' }
 //key of the topic is id
 KStream<String, MyObject> input = ...
 return input.groupByKey().reduce((current, newEvent) -> return newEvent.merge(current)).toStream();

编辑:流定义是正确的,默认情况下reduce不会向下游发送所有消息,而是在这样做之前缓存它们。要禁用此行为,配置属性:

 cache.max.bytes.buffering: 0

必须设置。

共有1个答案

栾鸣
2023-03-14
public class MergeStreams {

    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();

        final String rockTopic = allProps.getProperty("input.rock.topic.name");
        final String classicalTopic = allProps.getProperty("input.classical.topic.name");
        final String allGenresTopic = allProps.getProperty("output.topic.name");

        KStream<String, SongEvent> rockSongs = builder.stream(rockTopic);
        KStream<String, SongEvent> classicalSongs = builder.stream(classicalTopic);
        KStream<String, SongEvent> allSongs = rockSongs.merge(classicalSongs);

        allSongs.to(allGenresTopic);
        return builder.build();
    }

    public void createTopics(Properties allProps) {
        AdminClient client = AdminClient.create(allProps);

        List<NewTopic> topics = new ArrayList<>();

        topics.add(new NewTopic(
                allProps.getProperty("input.rock.topic.name"),
                Integer.parseInt(allProps.getProperty("input.rock.topic.partitions")),
                Short.parseShort(allProps.getProperty("input.rock.topic.replication.factor"))));

        topics.add(new NewTopic(
                allProps.getProperty("input.classical.topic.name"),
                Integer.parseInt(allProps.getProperty("input.classical.topic.partitions")),
                Short.parseShort(allProps.getProperty("input.classical.topic.replication.factor"))));

        topics.add(new NewTopic(
                allProps.getProperty("output.topic.name"),
                Integer.parseInt(allProps.getProperty("output.topic.partitions")),
                Short.parseShort(allProps.getProperty("output.topic.replication.factor"))));

        client.createTopics(topics);
        client.close();
    }

    public Properties loadEnvProperties(String fileName) throws IOException {
        Properties allProps = new Properties();
        FileInputStream input = new FileInputStream(fileName);
        allProps.load(input);
        input.close();

        return allProps;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
        }

        MergeStreams ms = new MergeStreams();
        Properties allProps = ms.loadEnvProperties(args[0]);
        allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        allProps.put(SCHEMA_REGISTRY_URL_CONFIG, allProps.getProperty("schema.registry.url"));
        Topology topology = ms.buildTopology(allProps);

        ms.createTopics(allProps);

        final KafkaStreams streams = new KafkaStreams(topology, allProps);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach shutdown handler to catch Control-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close(Duration.ofSeconds(5));
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

需要更多详细信息,请点击此处

 类似资料:
  • 给定:我在Kafka中有两个主题,假设主题A和主题B。Kafka流从主题A中读取一条记录,对其进行处理,并产生与所消耗记录相对应的多条记录(假设recordA和recordB)。现在的问题是我如何使用Kafka流来实现这一点。 在这里,读取的记录是消息;处理之后,它返回一个消息列表。如何将此列表划分为两个生产者流?任何帮助都将不胜感激。

  • 我有2个Kafka的主题流完全相同的内容从不同的来源,所以我可以有高可用性的情况下,其中一个来源失败。我正在尝试使用Kafka Streams0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何关于失败的消息,并且当所有源都启动时没有重复的消息。 当使用KStream的方法时,其中一个主题可以毫无问题地关闭(次要主题),但是当主主题关闭时,将不会向输出主题发送任何内容。这似乎是因为,

  • 有没有办法从Kafka主题中删除单个记录?我知道有一个脚本kafka-delete-records.sh删除指定主题和分区上指定偏移量之前的记录,但是我希望能够删除我指定的偏移量上的记录。有办法做到吗? 这不是在Java而是在裸露的Kafka实例上。

  • 我用Flink的table API创建了一个表。 当运行SQL以查看记录时,我得到: 我知道有一些坏的avro记录被推送到Kafka主题中。在JSON格式中,有一个选项可以通过设置来跳过/过滤这些记录。当从合流avro格式读取时,我们可以跳过这些记录吗? 这并不理想,但不幸的是,尽管有一个模式注册表,但我无法控制要推送到Kafka的内容。

  • 我有一个KStream KStream DSL如下所示: 阅读一些文章(例如Kafka流窗口) 但我想补充一点,这对我来说并不适用: Java编译器抛出以下错误: 老实说,我不明白。参数是正确的;虚拟现实类型是“历史”。 你知道我错过了什么吗? 这个windowedBy KTable的想法是让一个状态为一件“事情”保存所有事件一天。假设生成了一个新警报,我想将一天内“某物”的所有事件附加到警报上。

  • 合并 在执行pull之后,进行下一次push之前,如果其他人进行了推送内容到远程数据库的话,那么你的push将被拒绝。 这种情况下,在读取别人push的变更并进行合并操作之前,你的push都将被拒绝。这是因为,如果不进行合并就试图覆盖已有的变更记录的话,其他人push的变更(图中的提交C)就会丢失。 合并的时候,Git会自动合并已有的变更点!不过,也存在不能自动合并的情况。在下一个页面,我们会为大