我们使用Flink 1.2.1,我们通过联合一个流到另一个流来从2个kafka流中消费,并处理联合流。例如stream1.union(stream 2)但是,stream 2的体积是stream 1的100多倍,我们正在经历的是stream 2有巨大的消耗滞后(超过3天的数据),但stream 1的滞后很少。我们已经有9个分区,但1个作为Parallelism,会增加paralelism解决stream 2的消耗滞后,或者我们根本不应该在这种情况下进行联合。
是的,尝试增加stream 2源代码的并行性-它应该有所帮助:env.addSource(kafkaStream2消费者). setParallelism(9)
目前您有一个1核的瓶颈,需要跟上消耗stream 2数据的速度。为了充分利用Kafka的并行性,Flink Kafka消费者并行性应该是
. Union()
不应该导致时滞,AFAIK。
是的,如果处理过程中的延迟实际上是由于使用的操作符(或接收器)受到CPU限制,那么增加并行性应该会有所帮助。
如果问题是接收器端的某些东西无法通过更高的并行性来解决(例如,您正在写入数据库,并且它处于最大摄取速率),那么当然,增加接收器并行性也无济于事。
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我是pyflink的新手。我正在尝试编写一个python程序来从kafka主题读取数据并将数据打印到标准输出。我按照链接Flink Python Datastream API Kafka Producer Sink Serializaion进行了操作。但由于版本不匹配,我一直看到NoSuchMethod odError。我添加了https://repo.maven.apache.org/maven
我正在建立一个新的Kafka集群,为了测试目的,我创建了一个有1个分区和3个副本的主题。 有什么想法哪种配置或其他东西可以帮助我消费更多的数据吗?? 提前致谢
如果每个事件间隔为1秒,并且有2秒的滞后,那么我希望示例输入和输出如下所示。 输入:1,2,3,4,5,6,7... 输出:NA,NA,1,2,3,4,5...
我在Flink的工作中使用Kafka资料来源的信息流,一次阅读50个主题,如下所示: 然后有一些运算符,如:过滤器- 我能获得的最大吞吐量是每秒10k到20k条记录,考虑到源发布了数十万个事件,这相当低,我可以清楚地看到消费者落后于生产者。我甚至试着移除水槽和其他操作员,以确保没有背压,但它仍然是一样的。我正在将我的应用程序部署到Amazon Kinesis data analytics,并尝试了
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka