我的目标是使用Flink KafkaSource阅读来自Kafka主题的所有消息。我尝试用批处理和流模式执行。问题如下:当我将env.setParallelism设置为高于2时,我必须使用包含bug的接收器。于是,我设置了例如:< code > streamexecutionenvironment . setparallelism(1); 我想使用的Kafka主题包含3个分区。这是我的代码片段:
我需要一些关于Flink流媒体的帮助。我在下面生成了一个简单的Hello world类型的代码。这将流式传输来自RabbitMQ的Avro消息,并将其持久化到HDFS。我希望有人可以查看代码,也许它可以帮助其他人。 我发现的Flink流媒体的大多数示例都会将结果发送到std out。我实际上想把数据保存到Hadoop中。我读到,理论上,你可以和Flink一起去任何你喜欢的地方。实际上,我还没有找到
我有一个java应用程序,它对通过查询数据库中的表获得的批进行flink批处理,并将其输入kafka主题。我将如何定期安排这项工作。有flink调度程序吗?例如,我的java应用程序应该在后台持续运行,flink调度程序应该定期从数据库查询表,flink批处理它并将其输入kafka(flink批操作和输入Kafca已经在我的应用程序中完成了)。如果有人有这方面的建议,请帮忙。
我正在使用kafka主题作为数据流,并使用FlatMapFunction来处理数据。处理包括用更多的数据来丰富来自流的实例,这些数据从数据库获取,执行查询以收集输出,但它感觉这不是最好的方法。 阅读文档,我知道我可以从数据库查询创建数据集,但我只看到了批处理的示例。 我可以使用DataStream和DataSet执行合并/减少(或其他操作)来完成吗? 使用数据集而不是直接访问数据库可以提高性能吗?
我的目标是使用Kafka作为源设置一个高吞吐量集群 我在主服务器和辅助服务器上设置了一个2节点集群,配置如下。 flink-conf.yaml大师 Worker flink-conf.yaml 主节点上的文件如下所示: 两个节点上的 flink 设置位于具有相同名称的文件夹中。我通过运行 这将启动Worker节点上的任务管理器。 我的输入源是Kafka。以下是片段。 这是我的水槽功能 这是我的po
我有一个基于Apache Flink的流媒体应用程序,具有以下设置: < li >数据源:每分钟生成数据。 < li >使用CountWindow的窗口流,size=100,slide=1(滑动计数窗口)。 < li>ProcessWindowFunction对窗口中的数据应用一些计算(比如F(x))。 < li >使用输出流的数据接收器 这很好。现在,我想让用户能够提供一个函数G(x ),并将其
我们有一个kafka事件流,我们希望使用一些驻留在MySQL DB中的元数据来丰富它。 元数据每隔几个小时就会发生变化。本质上,我们希望定期读取数据库,并使用新的元数据不断丰富事件。 一种方法是将广播状态与每隔几分钟/小时读取一次 DB 的定期源一起使用。广播此流并使用它来加入。但问题可能是广播流的首次读取可能晚于从Kafka Stream读取的某些消息。 有没有更好的方法?
我是pyflink的新手。我正在尝试编写一个python程序来从kafka主题读取数据并将数据打印到标准输出。我按照链接Flink Python Datastream API Kafka Producer Sink Serializaion进行了操作。但由于版本不匹配,我一直看到NoSuchMethod odError。我添加了https://repo.maven.apache.org/maven
我有一个我真的无法解决的问题。所以我有一个kafka流,其中包含一些这样的数据: 我想用另一个值“bookingId”替换“adId”。此值位于csv文件中,但我无法真正弄清楚如何使其工作。 这是我的映射csv文件: 所以我的输出最好是这样的 该文件可以每小时至少刷新一次,因此它应该会接收对它的更改。 我目前有一个不适合我的代码: 代码只运行一次,然后停止,因此它不会使用csv文件转换kafka中
我正在学习Flink,我从使用DataStream的简单字数统计开始。为了增强处理能力,我过滤了输出,以仅显示找到3个或更多单词的结果。 我想创建一个WindowFunction,根据找到的单词值对输出进行排序。我试图实现的WindowFunction根本不编译。我正在努力定义WindowFunction接口的apply方法和参数。
需要一些建议,我已经使用scala创建了一个flink作业来消费来自Kafka的消息。但是消息是用base64编码压缩的。我已经试过这个代码了 代码由于它不是有效的Json格式而失败。 然后我尝试使用SimpleStringSchema(),就像下面的代码一样 Kafka的信息完美地消耗了,但是输出如下 如何将此数据解码为有效的JSON? 此致敬意
我使用MQTT消费者作为我的flink作业的数据源。我想知道如何将数据偏移保存到检查点中,以确保flink集群在故障后重新启动时不会丢失数据。我看过很多介绍apache flink如何管理kafka消费者补偿的文章。有人知道apache flink是否有自己的功能来管理MQTT使用者吗?谢谢
我遇到的问题是,当Kafka和Flink作业重新启动时,Flink Kafka消费者偏移量会重置为0,因此即使我启用了检查点并且我在Flink作业中启用了精确一次语义学,数据也会被重新处理。 这是我的环境详细信息 < li >在Kubernetes下奔跑 < li>Kafka源主题有10个分区,没有复制。 < li >Kafka有三个经纪人。 < li>Flink checkpointing启用了
我们有一个具有 20 个独立管道的流式处理作业,每个管道具有一个或多个 Kafka 主题源。 当我们使用一个新的jar(我又添加了一个管道)重新启动作业,并且AllowNonRestoredState=true时,我们注意到从检查点恢复Operatorstate的奇怪行为。 我们当前用于添加管道的配置是静态的,我们基本上正在更改代码以添加任何新管道。 我们没有为任何运算符设置任何UID。 当我们从
我想在flink中测试一次端到端的处理。我的工作是: Kafka资料来源- 我在mapper1中放了一个< code > thread . sleep(100000),然后运行了这个作业。我在停止作业时获取了保存点,然后从mapper1中删除了< code > thread . sleep(100000),我希望该事件应该会被重放,因为它没有下沉。但这并没有发生,乔布斯正在等待新的事件。 我的Ka