我们面临的情况是,只要有滞后,我们的akka流kaka消费者处理率就会下降。当我们在分区中没有任何延迟的情况下启动它时,处理速度会突然增加。
MSK群集-10个主题-每个40个分区=
为了在系统中实现高吞吐量和并行性,我们实现了akka-stream-kafka消费者分别订阅每个主题分区,从而在消费者和分区之间实现1:1映射。
这是消费者设置:
因此,我们总共启动了分布在不同实例中的420个消费者。根据RangeAssignor分区策略(默认一),每个分区将分配给不同的消费者,400个消费者将使用400个分区,20个消费者将保持未使用状态。我们已经验证了这个分配,看起来不错。
使用的实例类型:c5。xlarge公司
MSK配置:
Apache Kafka版本-2.4.1.1
经纪人总数-9(分布在3个AZ)
经纪人类型:Kafka。m5.大型
每个区域的经纪人:3
汽车创造话题。启用=真
default.replication.factor=3
最小insync。副本=2
num.io.threads=8
网络数量。Thread=5
num.partitions=40
数字副本。取数器=2
复制品缓慢移动时间最大ms=30000
socket.receive.buffer.bytes=102400
插座要求最大字节数=104857600
插座邮寄缓冲器字节=102400
不干净。领导选举启用=真
动物园管理员。一场超时。ms=18000
log.retention.ms=259200000
这是我们为每个消费者使用的配置
akka.kafka.consumer {
kafka-clients {
bootstrap.servers = "localhost:9092"
client.id = "consumer1"
group.id = "consumer1"
auto.offset.reset="latest"
}
aws.glue.registry.name="Registry1"
aws.glue.avroRecordType = "GENERIC_RECORD"
aws.glue.region = "region"
kafka.value.deserializer.class="com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer"
# Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
# configured by `consumer.metadata-request-timeout`
connection-checker {
#Flag to turn on connection checker
enable = true
# Amount of attempts to be performed after a first connection failure occurs
# Required, non-negative integer
max-retries = 3
# Interval for the connection check. Used as the base for exponential retry.
check-interval = 15s
# Check interval multiplier for backoff interval
# Required, positive number
backoff-factor = 2.0
}
}
akka.kafka.committer {
# Maximum number of messages in a single commit batch
max-batch = 10000
# Maximum interval between commits
max-interval = 5s
# Parallelism for async committing
parallelism = 1500
# API may change.
# Delivery of commits to the internal actor
# WaitForAck: Expect replies for commits, and backpressure the stream if replies do not arrive.
# SendAndForget: Send off commits to the internal actor without expecting replies (experimental feature since 1.1)
delivery = WaitForAck
# API may change.
# Controls when a `Committable` message is queued to be committed.
# OffsetFirstObserved: When the offset of a message has been successfully produced.
# NextOffsetObserved: When the next offset is observed.
when = OffsetFirstObserved
}
akka.http {
client {
idle-timeout = 10s
}
host-connection-pool {
idle-timeout = 10s
client {
idle-timeout = 10s
}
}
}
consumer.parallelism=1500
我们使用以下代码来实现从Kafka到空水槽的流程
override implicit val actorSystem = ActorSystem("Consumer1")
override implicit val materializer = ActorMaterializer()
override implicit val ec = system.dispatcher
val topicsName = "Set of Topic Names"
val parallelism = conf.getInt("consumer.parallelism")
val supervisionDecider: Supervision.Decider = {
case _ => Supervision.Resume
}
val commiter = committerSettings.getOrElse(CommitterSettings(actorSystem))
val supervisionStrategy = ActorAttributes.supervisionStrategy(supervisionDecider)
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topicsName))
.mapAsync(parallelism) {
msg =>
f(msg.record.key(), msg.record.value())
.map(_ => msg.committableOffset)
.recoverWith {
case _ => Future.successful(msg.committableOffset)
}
}
.toMat(Committer.sink(commiter).withAttributes(supervisionStrategy))(DrainingControl.apply)
.withAttributes(supervisionStrategy)
代码中的库版本
"com.typesafe.akka" %% "akka-http" % "10.1.11",
"com.typesafe.akka" %% "akka-stream-kafka" % "2.0.3",
"com.typesafe.akka" %% "akka-stream" % "2.5.30"
观察结果如下:,
我们希望所有消费者都能并行运行并实时处理消息。这3天的处理延迟给我们造成了严重的停机时间。我尝试跟踪给定的链接,但我们已经在固定版本上了https://github.com/akka/alpakka-kafka/issues/549
有人能帮助我们在消费者配置或其他问题方面缺少什么吗?
每个主题每个分区的偏移量滞后图
在我看来,这个滞后图似乎表明您的整个系统无法处理所有负载,而且看起来一次只有一个分区真正取得了进展。
这种现象向我表明,在f中进行的处理最终会限制某些队列的清除速度,而mapAsync阶段的并行度太高,导致分区之间相互竞争。由于Kafka消费者批次记录(默认情况下为500个批次,假设消费者的延迟超过500个记录)如果tha并行度高于该值,则所有这些记录基本上与块同时进入队列。看起来mapAsync中的并行度是1500;考虑到Kafka默认500批大小的明显使用,这似乎太高了:它没有理由大于Kafka批大小,如果您希望分区之间的消耗率更均匀,它应该大大小于该批大小。
如果没有关于f中发生了什么的详细信息,很难说这个队列是什么以及应该减少多少并行性。但我可以分享一些一般准则:
I/O绑定/阻塞情况将通过您的实例上非常低的CPU利用率来证明。如果您正在为每个目标主机填充队列,您将看到有关“超出已配置的max-open请求值”的日志消息。
另一件值得注意的事情是,由于Kafka消费者天生就是阻塞的,因此阿尔帕克卡Kafka消费者参与者在其自己的调度程序中运行,默认情况下,调度程序的大小为16,这意味着每个主机一次最多只能有16个消费者或生产者工作。设置akka。Kafka。默认调度程序。线程池执行器。将池大小至少固定到应用程序启动时的消费者数量(每7个主题配置6个消费者中有42个)可能是个好主意。Alpakka Kafka dispatcher中的线程不足可能会导致消费者重新平衡,从而中断消费。
我建议,在不做任何其他更改的情况下,为了跨分区实现更均匀的消耗率,设置
akka.kafka.default-dispatcher.thread-pool-executor.fixed-pool-size = 42
consumer.parallelism = 50
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我正在建立一个新的Kafka集群,为了测试目的,我创建了一个有1个分区和3个副本的主题。 有什么想法哪种配置或其他东西可以帮助我消费更多的数据吗?? 提前致谢
我想检查手动分配给特定主题的消费者组的滞后,这可能吗。我使用的是Kafka-0.10.0.1。我用的是shKafka跑步课。shKafka。管理ConsumerGroupCommand-new consumer-description-bootstrap server localhost:9092-group test但它说不存在组,所以我想知道当我们手动分配分区时,是否可以检查使用者的延迟。
这是一个关于Kafka和信息如何被消费的非常基本的问题,但不幸的是,我在这一点上找不到任何答案。 假设我想过度分区,那么我将得到比消费者多10倍的分区。过度分区是必需的,因为我希望能够扩展(在未来并行处理更多的消息)。 1 个主题分为 1000 个分区,由 100 个使用者使用 =- 我的问题是: > 消息是如何为每个消费者消费的:它是以循环方式完成的吗?如果不是,分发是如何完成的? 有没有保证消
我们有一个基于spring boot的事务性Kafka制作人!使用的版本如下 spring-boot-starter-父-2.3.0。释放 spring-kafka-2.5.0。释放 我们的kafka(集群)版本是2.1. x! 作为生产者,我们启用了幂等性,定义了事务id前缀,并在事务中执行kafka模板调用。我们还有一个将隔离级别设置为只读的使用者! 现在我们遇到了一个行为,不知道如何推断,
我们所有的30个主题都是用kafka中的10个分区创建的。我们正在按分区监控所有主题/group p-id的滞后。 我们正在使用Fluentd插件从kafka读取和路由日志。该插件是使用高级消费者实现的。我们为插件的单个主题配置了一些消费者,为多个主题配置了一些消费者。总的来说,除了3个主题之外,数据正在流经,没有问题。 问题是,对于正在处理的30个主题中的3个,我们发现分区滞后值不一致,即查看特