我尝试将一些简单的任务迁移到Flink 1.0.0版本,但失败了,出现以下异常: 该代码由两个通过Kafka主题连接的独立任务组成,其中一个任务是简单消息生成器,另一个任务是简单消息消费者,它使用timeWindowAll计算每分钟消息的到达率。 同样,类似的代码在0.10.2版本中工作时没有任何问题,但现在看起来系统错误地解释了一些事件时间戳,如Long。导致任务失败的MIN\u值。 问题是,我
查看Flink的留档和书籍,我对时间戳有疑问:如果流设置为事件时间模式,这意味着时间戳在进入Flink之前具有源的时间(甚至在通过消息传递队列之前,可能是Kafka),为什么Flink将时间戳作为元数据附加到记录中?幻灯片3根据它们所占的内容具有不同类型的时间戳:https://www.slideshare.net/dataArtisans/apache-flink-training-time-a
我们正在Kubernetes上以应用程序模式运行Flink作业,问题是当作业完成/停止时,作业管理器容器将退出,但1。任务管理器2的部署。作业管理器服务3。除非我们运行kubectl delete来清理它,否则configMap仍然存在。 如果我们手动停止作业,这没什么大不了的,但是如果我们的Flink作业是一个批处理作业,稍后会完成,这意味着我们需要一个外部服务来保持监控作业管理器容器并在完成后
我正在向Flink接收数据流。对于这些数据的每个“实例”,我都有一个时间戳。我可以检测从中获取数据的机器是否正在“生产”或“不生产”,这是通过位于其自身静态类中的自定义平面映射函数完成的。 我想计算机器生产/不生产的时间。我目前的方法是在两个简单列表中收集生产和非生产时间戳。对于数据的每个“实例”,我通过从最早的时间戳中减去最新的时间戳来计算当前的生产/非生产持续时间。但是,这给了我不正确的结果。
我试图开发以下代码,但它不起作用。我想使用apache Flink来延迟时间(在时间戳字段中指定的)与当前日期不同的事件。 样品: > 当前日期:2022-05-06 10:30 事件1[{“user1”:“1”,“user2”:“2”,“timestamp”:“2022-05-06 10:30”}-- 事件2[{“user1”:“1”,“user2”:“2”,“timestamp”:“2022-
尽管Flink有一些内置的工具来处理延迟数据,比如允许延迟,但我想自己处理延迟数据。例如,我想监控延迟事件或将它们保存到数据库中。 我该怎么做?
我想使用Flink SQL将一个Kafka主题转换成一个表,然后将其转换回DataStream。 以下是: 有办法做到这一点吗?谢了。
我试图将数据从kafka主题读入DataStream并注册DataStream,然后使用tableEnvironment.sqlQuery(“sql”)查询数据,当tableEnvironment.execute()没有错误也没有输出时。 依赖关系: flink-streaming-java2.11版本:1.9.0-csa1.0.0.0; Flink-Streaming-Scala2.11版本:1
目前,我有一个Flink集群,它想通过一个模式消费Kafka主题,通过使用这种方式,我们不需要维护一个硬代码Kafka主题列表。 --update--我需要知道主题信息的原因是我们需要这个主题名称作为参数,在即将到来的Flink sink部分中使用。
我正在阅读Flink配置单元连接器代码,查找接口,它有很多实现类。但是我发现实现类使用反射API。为什么不导入依赖项,代码太晦涩了。
我目前正在开发一个Flink应用程序,它使用一些Hadoop依赖项将数据写入S3位置。在本地环境中,它工作得很好,但是当我在EMR集群上部署这个Flink应用程序时,它引发了一个与兼容性问题有关的异常。 我得到的错误消息是 我在POM依赖项中包含了jar的maven依赖项。但它没有探测到它。我使用的Flink版本是1.2.0 但是,当我显式地将兼容性JAR复制到位置时,我不会得到任何异常,并且能够
当我们在Flink SQL CLI中选择作为SQL Server数据库表源的任何表时,为什么Flink作业在从数据库表中获取所有记录后“完成”?是否可以让它“运行”以便从数据库表中获取最新数据?
我们正在尝试使用StreamingFileSink写入S3桶。这是一个简单的工作,从Kafka读到S3。s3的凭据在flink集群中配置。我们使用的Flink1.7.2没有预绑定的Hadoop。正如文档中所建议的,我们已经将flink-s3-fs-hadoop jar添加到flink集群的lib目录中。当我们运行作业时,我们会得到这个特殊的Kerberos异常。我们做错了什么?我们是否缺少任何配置
我正在尝试使用主题列表中的单个kafka使用者组合两个kafka主题,进一步将流中的json字符串转换为POJO。然后,通过keyBy(On事件时间字段)将它们加入,并将它们合并为单个胖json,我计划使用窗口流并在窗口流上应用窗口函数。假设主题A 我有几个问题。 这种方法适合合并主题并创建单个JSON吗 所有窗口流上的窗口函数似乎工作不正常;任何指点都将不胜感激 代码片段: 我得到了- AllW
我有一个用例,我需要处理存储在s3中的文件中的数据,并将处理后的数据写入本地文件。s3 文件会不断添加到存储桶中。 每当一个文件被添加到bucket中时,完整的路径被发布到一个kafka主题。 我想在一份工作中实现以下目标: 从kafka(无界流)读取文件名 一个计算器,它接收文件名,从s3(第二个源)读取内容并创建数据流 处理数据流(为每行添加一些逻辑) 接收到文件 我设法完成了设计的第一、第三