我有一个项目,我需要订阅与数据库中注册的任务相关的特定主题。每个任务都与一个主题相关,可以执行任务元数据中描述的特定转换。任务可以由其他系统注册、更新或禁用,flink作业必须反映这些更改,而无需重新启动作业。 基于此,我尝试了类似的方法https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html其中我有两个源函数,一个以
我试图使用ApacheFlink ML包的随机选择模型。 我不知道如何使用Kafka作为数据源,我知道它需要一个DataSet而不是DataStream,但我似乎无法窗口我的Kafka DataStream成为一个DataSet。 有没有一种方法可以将我的流视为一系列小数据集。例如,有没有办法说流中每10个元素匹配一个模式(按元素唯一ID滑动窗口),就将它们视为固定大小的数据集,并检测这个固定大小
我正在编写一个Flink-Kafka集成程序,如下所示,但Kafka出现超时错误: 从终端我可以看到Kafka和zookeeper正在运行,但当我从Intellij运行上面的程序时,它显示了这个错误: 组织。阿帕奇。Kafka。常见的错误。TimeoutException:在60000毫秒后更新元数据失败。2017年12月15日14:42:50作业执行切换到失败状态。[错误](run-main-0
我的要求是将数据发送到不同的ES接收器(基于数据)。例如:如果数据包含特定信息,则将其发送到sink1,否则将其发送到sink2等(基本上是根据数据动态发送到任何一个接收器)。我还想分别为ES sink1、ES sink2、ES sink3等设置并行度。 有什么简单的方法可以在flink中实现上述目标吗? 我的解决方案:(但并不满意) 我可以想出一个解决方案,但有中间Kafka主题,我写(topi
我用Flink来读写来自不同Kafka主题的数据。具体来说,我使用的是FlinkKafkaConsumer和FlinkkafKapProducer。 我想知道是否有可能根据我程序中的逻辑或记录本身的内容,将我正在阅读和写作的Kafka主题更改为“即时”。 例如,如果读取带有新字段的记录,我希望创建一个新主题,并开始将带有该字段的记录转移到新主题。 谢谢。
例如,我想用和测试Kafka/Flink的集成。 该过程将是: 与Flink一起阅读Kafka主题 用Flink进行一些操作 和Flink一起写另一个Kafka主题 以字符串为例,从输入主题中读取字符串,转换为大写,写入新主题。 问题是如何测试流量? 当我说测试时,这是单元/集成测试。 谢谢!
我试图用Kafka不等式运行一个简单的Apache Flink脚本,但在执行过程中一直存在问题。脚本应该阅读来自Kafka制作人的消息,对其进行详细阐述,然后将处理结果再次发送回其他主题。我从这里得到了这个例子:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Simple-Flink-Kafka-Test-td
我试图阅读和打印从Kafka使用Apache Flink的原型消息。 我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/ Flink消费者代码是: 反序列化器代码是:
我指的是Kafka源代码连接器的Flink 1.14版本,代码如下。 我期待以下要求。 在最新的应用程序开始时,必须阅读Kafka主题的最新偏移量 在检查点上,它必须将消耗的偏移量提交给Kafka 重新启动后(当应用程序手动终止/系统错误时),它必须从最后提交的偏移量中选取,并且必须消耗使用者延迟,然后再使用新的事件提要 有了Flink新的KafkaConsumer API(KafkaSource
场景:阿帕奇·Flink、Kafka、协议缓冲区数据消费者。 数据源是协议缓冲区格式的Kafka主题(多个主题:主题#1,主题#3,主题#3)。消费者是Apache Flink消费者。每个主题都有一个独特的原型定义。 我试图在Apache Flink中开发一个通用的数据摄取工作,将Kafka的数据摄取到数据库中。 如何为Apache Flink实现通用protobuf反序列化程序?我正在寻找实现,
我是Apache Flink的新手,所以我目前正在尝试做一些实验。我正在读Kafka的一个主题,然后在控制台上打印出来。打印大约100kkafka消息后,它抛出异常。日志输出如下。 我正在使用一个自定义类来扩展AbstractDeserializationSchema,以反序列化kafka记录值。我甚至尝试过在其中添加一些异常处理,但没有触发。 我使用Kafka的代码非常简单: 即使在异常之后,输
我正在使用Apache Flink和Kafka消费者阅读Kafka主题中的一些值。我还有一个通过读取文件获得的流。 根据收到的值,我想就不同的Kafka主题编写这个流。 基本上,我有一个与许多孩子联系在一起的领导者的网络。对于每个孩子,领导者需要编写在孩子特定的Kafka主题中读取的流,以便孩子可以阅读它。当子级启动时,它会在从领导者读取的Kafka主题中注册自己。问题是我不知道我有多少个孩子。
我有一个使用Flink Global Window的管道,它具有基于事件时间(从到达元素的时间戳)的自定义触发器和从窗口中删除不必要元素并将其传递给ProcessFunction的Evictor, 像这样的东西: 我知道我可以用提供水印操作的TestHarness测试我的WindowProcessFunction,但是我需要测试整个流程,触发退出器ProcessFunction。 我还尝试了使用T
当我使用 pyflink 蜂巢 sql 读取数据插入到 es 中时,抛出以下示例:环境: flink 1.11.2 flink-sql-连接器-hive-3.1.2_2.11-1.11.2.jar hive 3.1.2 如何解决这个问题?
我在两个无界流之间有一个简单的间隔连接。这适用于较小的工作负载,但对于较大的(正式生产环境),它不再有效。通过观察输出,我可以看到FlinkSQL作业仅在扫描整个主题(并因此读入内存?)后才触发/发出记录,但我希望作业在找到ingle匹配后立即触发记录。因为在我的正式生产环境中,作业无法承受将整个表读入内存。 我正在做的间隔连接与这里提供的示例非常相似:https://github.com/ver