您可以使用Flink Catalogs连接到元数据存储库,请参见https://nightlies.apache.org/Flink/flink-docs-master/docs/dev/table/Catalogs/
如何确保我总是从Kafka主题的一开始就与Flink一起消费? Kafka0.9。x consumer是Flink 1.0.2的一部分,它似乎不再是Kafka,而是Flink来控制偏移量: Flink在内部快照偏移量,作为其分布式检查点的一部分。Kafka/动物园管理员promise的补偿只是为了让外界对进展的看法与Flink对进展的看法保持同步。通过这种方式,监控和其他工作可以了解Flink K
我正在使用Apache Beam的kafkaIO阅读一个主题,该主题在Confluent schema Registry中有一个avro模式。我可以反序列化消息并写入文件。但最终我想写给BigQuery。我的管道无法推断架构。我如何提取/推断模式并将其附加到管道中的数据,以便我的下游进程(写入BigQuery)能够推断模式? 下面是我使用模式注册表url设置反序列化器的代码,以及我从Kafka读到
我们有一个Kafka主题,有源源不断的数据。为了处理它,我们有一个无状态的Flink管道,它使用该主题并写入另一个主题。 我们是不是漏掉了什么?我们误会什么了吗?有没有更好的解决办法? 谢了!
我通过以下说明创建了一个主题: 然后,我测试了这个主题是否有正确的数据。之后,我想在Flink程序中打印这个主题。我的计划是: 但是我得到了这个信息(因为信息太长了,我不得不写一些): [main]INFOorg.apache.flink.streaming.api.environment.LocalStream环境-在本地嵌入式Flink迷你集群上运行作业[main]INFOorg.apache
我用Flink来读写来自不同Kafka主题的数据。具体来说,我使用的是FlinkKafkaConsumer和FlinkkafKapProducer。 我想知道是否有可能根据我程序中的逻辑或记录本身的内容,将我正在阅读和写作的Kafka主题更改为“即时”。 例如,如果读取带有新字段的记录,我希望创建一个新主题,并开始将带有该字段的记录转移到新主题。 谢谢。
我正在尝试每 提供了Kafka主题中的数据,但它不保留顺序。我在循环中做错了什么?此外,必须将Flink的版本从< code>1.13.5更改为< code>1.12.2。 我最初使用的是< code > Flink < code > 1 . 13 . 5 、< code >连接器和< code>2.11的< code>Scala。我到底错过了什么?