当前位置: 首页 > 知识库问答 >
问题:

Apache Flink动态接收器数

梁丘赞
2023-03-14

我正在使用Apache Flink和Kafka消费者阅读Kafka主题中的一些值。我还有一个通过读取文件获得的流。

根据收到的值,我想就不同的Kafka主题编写这个流。

基本上,我有一个与许多孩子联系在一起的领导者的网络。对于每个孩子,领导者需要编写在孩子特定的Kafka主题中读取的流,以便孩子可以阅读它。当子级启动时,它会在从领导者读取的Kafka主题中注册自己。问题是我不知道我有多少个孩子。

例如,我从Kafka主题中阅读1,我只想在一个名为Topic1的Kafka主题中编写流。我读了1-2,我想写两个Kafka主题(Topic1Topic2)。

我不知道这是否可能,因为为了写这个主题,我使用了Kafka生产者和addSink方法,据我理解(从我的尝试来看),Flink似乎需要知道汇的数量先验。

但是,难道没有办法获得这种行为吗?

共有2个答案

颛孙博易
2023-03-14

现在不建议使用KeyedSeriazationSchema。相反,您必须使用"KafkaSeriazationSchema"

同样可以通过重写serialize方法来实现。

    public ProducerRecord<byte[], byte[]> serialize(
String inputString, @Nullable Long aLong){ 
        return new ProducerRecord<>(customTopicName,
 key.getBytes(StandardCharsets.UTF_8), inputString.getBytes(StandardCharsets.UTF_8));
}
楚嘉玉
2023-03-14

如果我能很好地理解你的问题,我认为你可以用一个接收器来解决它,因为你可以根据正在处理的记录来选择Kafka主题。似乎源代码中的一个元素可能会写入多个主题,在这种情况下,您需要一个FlatMapFunction将每个源代码记录复制N次(每个输出主题一个)。我建议将(topic,record)作为一对(akaTuple2)输出。

DataStream<Tuple2<String, MyValue>> stream = input.flatMap(new FlatMapFunction<>() {
    public void flatMap(MyValue value, Collector<Tupple2<String, MyValue>> out) {
        for (String topic : topics) {
            out.collect(Tuple2.of(topic, value));
        }
    }
});

然后,您可以使用前面通过创建FlinkKafkaProducer计算出的主题,该主题带有一个KeyedSerializationSchema,在其中实现getTargetTopic,以返回对中的第一个元素。

stream.addSink(new FlinkKafkaProducer10<>(
        "default-topic",
        new KeyedSerializationSchema<>() {
            public String getTargetTopic(Tuple2<String, MyValue> element) {
                return element.f0;
            }
            ...
        },
        kafkaProperties)
);
 类似资料:
  • 我在试着让我的弹性搜索下沉并运行。然而,我得到了以下错误,并正在耗尽如何修复它的想法。任何帮助都很感激。以下是错误: 我运行的是CDH 5.3和elasticsearch 1.4.2

  • 我的应用程序的MainActivity从用户那里获取一些数据来计算他/她的BMI和BMR,并将这些数据用于应用程序的主要部分,即另一个名为DailyActivity的活动。我希望这样,用户第一次启动应用程序时,它会要求提供数据。但在第一次之后,每次用户启动应用程序时,都应使用之前输入的数据直接启动DailyActivity。(按下后退按钮也不应显示MainActivity)。我如何实现这一点?

  • 问题内容: 对于我来说,目前尚不清楚,在这种情况下,我想使用值接收器而不是始终使用指针接收器。 回顾一下文档: 该 文档 还说:“对于基本类型,切片和小型结构之类的类型,值接收器非常便宜,因此,除非该方法的语义要求使用指针,否则值接收器是高效且清晰的。” 首先, 它说“非常便宜”,但问题是它比指针接收器便宜。因此,我做了一个小的基准测试(基于要点的代码),向我展示了,即使对于只有一个字符串字段的结

  • 技巧 有人问我,如何通过选项来指定动态连接器,而不使用缺省系统自带的动态连接器。我后来查了下ld的手册,有这么一个选项: -Ifile --dynamic-linker=file Set the name of the dynamic linker. This is only meaningful when generating dynamically linked ELF ex

  • 我在我的崩溃,有没有人面临这样的问题? Thx,提前 异常java.lang.RuntimeException:无法启动接收方android.support.v4.media.session.mediabuttonreceiver:java.illegalstateException:找不到任何处理[REDACTED_DOMAIN_NAME]_button的服务或媒体浏览器服务实现 (ZygoTe

  • 我目前正在使用SharedReferences跟踪通过AlarmManager启动的BroadcastReceiver中要执行工作的项列表。除了一个特定的场景外,一切都很好。当我触发一个新项目来执行工作时,让它完成工作,然后删除该项目(全部通过SharedReferences编辑),它在应用程序运行时工作得很好。当列表中没有任何内容,我打开任务管理器并终止应用程序时,该项突然出现在Broadcas