当前位置: 首页 > 知识库问答 >
问题:

Flink流未完成

林彬
2023-03-14

我正在使用kafka和elasticsearch设置flink流处理器。我想重播我的数据,但当我将并行度设置为1以上时,它不会完成程序,我认为这是因为Kafka流只看到一条消息,将其标识为流的结尾。


    public CustomSchema(Date _endTime) {
        endTime = _endTime;
    }

@Override
    public boolean isEndOfStream(CustomTopicWrapper nextElement) {
        if (this.endTime != null && nextElement.messageTime.getTime() >= this.endTime.getTime()) {
            return true;
        }
        return false;
    }

有没有办法告诉flink消费群中的所有线程在一个线程完成后立即结束?

共有1个答案

郎弘业
2023-03-14

如果实现了自己的SourceFunction,请使用Flink SourceFunction中的示例所示的取消方法。FlinkKafkaConsumerBase类还具有cancel方法。

 类似资料:
  • 我正在尝试加入apache flink中的两个流以获得一些结果。 我的项目的当前状态是,我正在获取twitter数据并将其映射到一个2元组中,其中保存用户的语言和定义时间窗口中的推文总和。我这样做是为了每种语言的推文数量和每种语言的转发。推文/转发聚合在其他进程中运行良好。 我现在想得到一个时间窗口内转发次数占所有推文次数的百分比。 因此我使用以下代码: 当我打印或时,输出似乎很好。我的问题是我从

  • 我们正在努力计算 1 分钟翻滚时间窗口内不同类型的事件的最大并发计数。 这些事件就像传感器数据,这些数据是从我们的桌面代理每分钟收集的,然而,一些代理得到了一个错误的时间戳,比如说,它甚至比现在晚了几个小时。 所以,我的问题是如何处理/删除这些事件,目前我只是应用过滤器(s = 我的第一个问题是,如果我不这样做,我怀疑这个坏的“未来”事件会触发窗口计算,即使是那些不完整的数据窗口 第二个问题是,我

  • 我是Apache Flink的新手,正在尝试了解一些与Kafka一起扩展Flink流媒体作业的最佳实践。我无法找到合适答案的一些问题包括: 您可以/应该运行多少个流作业?运行太多流是否存在可扩展性问题?太多是多少? 如果我们运行假设2,000个流来满足业务需求,那么管理这些流的最佳方法是什么? 从一个流读取流数据到另一个流的首选方式是什么?我们可以加入流、执行连续查询等吗...? 提前感谢您的支持

  • 主要内容:1.分流,2.Union聚合,3.Connect 连接,4.Join 合流,5.总结分流和合流 分流的方式: 侧输出流 合流的方式: Union, Connect, Join, CoGroup 1.分流 所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,得到完全平等的多个子 DataStream,如图 8-1 所示。一般来说,我们会定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。 1.1 简单实现 其实根据条件筛选数据的

  • 我正在使用JBPM 5.4.0和通过spring容器创建的注入以及本地任务服务。我在会话中注册了一个LocalHttItemHandler,但即使在将我的任务标记为Complete(完成时没有问题)之后,流也不会继续。这里会出什么问题? 即使使用LocalTaskService,我也需要启动任务服务器吗?

  • 我正在浏览Apache Flink的基本WordCount示例。这是代码: 当我尝试在群集中部署此作业时,请使用: 我得到这个例外: 我不明白为什么,因为我是Flink的新手。请帮助我理解这个问题。谢谢你。 当我尝试直接从IDE运行此代码而不将JAR部署到集群时,它完全可以正常工作。