我使用的是运行在AWS中的spark独立集群(spark and spark-streaming-kafka version 1.6.1),并对检查点目录StreamingContext.getorCreate(config.sparkConfig.CheckpointDir,createStreamingContext)
使用S3桶,每个工作节点上没有调度延迟和足够的磁盘空间。
没有更改任何Kafka客户端初始化参数,非常肯定Kafka的结构没有更改:
val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker)
val topics = Set(kafkaConfig.topic)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
也不明白为什么当直接使用者描述说消耗的偏移量是由流本身消耗的
时,我仍然需要在创建流上下文时使用检查点目录?
这通常是通过将spark.streaming.backpressure.enabled
设置为true来启用背压的结果。通常情况下,当背压算法看到有更多的数据进入比它所习惯的,它开始将每一批限制到一个相当小的大小,直到它能够重新“稳定”自己。这有时会出现误报,并导致流的处理速度减慢。
如果您想稍微调整一下启发式,它使用了一些未记录的标志(只要确保您知道自己在做什么):
val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
如果您想要血淋淋的细节,那么pidrateestimator
就是您需要的。
我对Kafka有一个概念上的问题。 我们有许多机器在一个主题上充当消费者,有许多分区。这些机器运行在不同的硬件设置上,将会有比其他机器具有更高吞吐量的用户。 现在,使用者和一个或多个分区之间存在直接的相关性。
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
我正在与Kafka和阿帕奇·Flink合作。我正在尝试使用apache Flink中的一个kafka主题中的记录(这些记录是avro格式的)。下面是我正在尝试的一段代码。 使用自定义反序列化器来反序列化主题中的avro记录。 我发送到主题“test-topic”的数据的Avro模式如下所示。 我正在使用的自定义反序列化器如下所示。 我的flink应用程序就是这样写的。 我得到的输出是{“name”