我已经实现了Kafka消费者,现在我有了一个场景。 从Kafka流2.2.5中读取数据。通过Srpingboot发布 加载数据库表1 将数据从表1复制到表2 清理桌子1 要执行上述操作,我需要使用quartz的调度作业(已编写)暂停/恢复Kafka使用者,该作业将数据从表1复制到表2。但是在这个活动中,我希望我的Kafka听众暂停,一旦复制完成,它应该继续。 我的实施:
我在“提交”主题中放置了一个json对象。我想使用Kafka流使用消息,但出现了一个错误 日志显示: 线程“test-streams-469f5ee6-d0de-472e-a602-a7b6d11f2e1c-StreamThread-1”组织中出现异常。阿帕奇。Kafka。流。错误。StreamsException:无法配置值serde类组织。阿帕奇。Kafka。常见的序列化。Serdes$包装器
好日子, 我想知道kafka队列是否可以保存数据几秒钟并释放数据。 我收到一条来自Kafka主题的消息,解析完数据后,我将其保存在内存中一段时间(10秒)(这会随着唯一消息的出现而增加),每条消息都有自己的计时器),我希望Kafka告诉我该消息已过期(10秒),以便我可以继续执行其他任务。 但由于Flink/Kafka是事件驱动的,我希望Kafka有某种圆形计时轮,可以在10秒后向消费者复制信息的
我不知道当您从Apache Kafka中摄取数据时,水印应该如何工作。 我读到Flink通过从消息中获取时间戳来自动处理水印,但他们没有指定从何处开始。从消息负载、从标头还是从CreateTime 以下格式的事件: Topic
我是Scala和Apache Flink的初学者,但到目前为止,一切都很顺利。我正在尝试使用Flink应用程序中序列化到AVRO的Kafka事件。我阅读了文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-反序列化模式)和google搜索了很多小时,但我仍然在同一页上。我有一
我正在使用Apache Flink 1.3.2的集群。我们正在使用Kafka消息,自从将代理升级到1.1.0(从0.10.2)后,我们经常在日志中发现此错误: 因此,有时我们在处理过程中会遇到缺失事件。我们在工作中使用FlinkKafkaConsumer010。 已启用检查点(间隔10秒,超时1分钟,检查点之间的最小暂停时间为5秒,最大并发检查点时间为1秒。E2E的平均持续时间不到1秒,甚至不到半
从本文档中,我可以阅读以下内容: 默认情况下,记录将使用Kafka ConsumerRecord中嵌入的时间戳作为事件时间。您可以定义自己的水印策略,从记录本身提取事件时间,并向下游发出水印: 本文档详细描述了如何定义水印策略。 如果用户不提供可选的水印策略,将使用什么水印策略? 我这样问是因为我发现源代码中的默认值是空的。 我们正在运行Flink 1.11。
我们希望在读取消息表单kafka时实现并行性。因此我们想在flinkkafkaconsumer中指定分区号。它将从kafka中的所有分区读取消息,而不是特定的分区号。以下是示例代码: 请建议任何更好的选择来获得并行性。
我第一次试着让它工作,所以请容忍我。我正在尝试学习Kafka的检查点设置和处理“错误”消息,在不丢失状态的情况下重新启动。 用例:使用检查点。从Kafka那里读取一个整数流,保持一个连续的和。如果读到“坏”Kafka消息,请重新启动应用程序,跳过“坏”消息,保持状态。我的流看起来像这样: set1,5 set1,7 set1,foobar set1,6 我希望我的应用程序保留它看到的整数的运行总和
假设一个主题有3个kafka分区,我希望我的事件按小时窗口,使用事件时间。 当某个分区位于当前窗口之外时,kafka使用者是否会停止读取该分区?还是打开一个新窗口?如果它正在打开新的窗口,那么,如果一个分区的事件时间与其他分区相比会非常倾斜,那么从理论上讲,它不可能打开无限数量的窗口,从而耗尽内存吗?当我们重播一些历史时,这种情况尤其可能发生。 我一直试图从阅读留档中得到这个答案,但是在分区上找不
我试图遵循示例:https://blog.knoldus.com/a-quick-demo-kafka-to-flink-to-cassandra/我试图从kafka解析我的Shippingorder JSON消息并将其解析为对象。然后按一些属性对其进行分组,但在平面图步骤时出现错误。 我的sbt文件: 我的主文件。 我的订单对象 运行此作业时出错 我不知道这个错误。请解释并帮助我解决这个问题。
所以,我试图在我的Flink Kafka流媒体工作中启用EXACTLY_ONCE语义以及检查点。 但是我没有让它工作,所以我尝试从Github下载测试示例代码:https://github.com/apache/flink/blob/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kaf
我将log4j2与一个JSON属性文件一起使用,我将其命名为log4j2。json并尝试添加Kafka追加器。有很多使用旧格式或xml的示例,但很难在JSON中获得正确的格式。这很可能是一个愚蠢的问题,但我一直在努力让它发挥作用,在任何地方都找不到任何例子。我很想放弃在json中配置log4j2,转而使用XML,但我觉得这应该相当简单。 这是一个用XML定义的示例kafka appender。 l
我正在试用Flink的新Python流API,并尝试使用<代码>运行我的脚本/flink-1.6.1/bin/pyflink-stream。sh示例/阅读Kafka的\u。py 。python脚本相当简单,我只是尝试使用现有主题并将所有内容发送到stdout(或日志目录中的*.out文件,默认情况下,输出方法在该目录中发出数据)。 我从maven repos中抓取了一些jar文件,即、和并将它们复
我有一个启用了Kafka的Azure Event Hub,我正试图从Google Cloud的数据流服务连接到它,以将数据流式传输到Google Big Query。我成功地可以使用Kafka CLI与Azure Event Hub交谈。但是,使用GCP,5分钟后,我在GCP数据流作业窗口中收到超时错误。 Azure EH已启用Kafka- 为了设置启用Kafka的事件中心,我遵循此GitHub页