当前位置: 首页 > 知识库问答 >
问题:

同一Kinesis流的多个不同消费者

水恩
2023-03-14

我有一个Kinesis生产者,它将单一类型的消息写入流。我想在多个完全不同的消费者应用程序中处理这个流。因此,给定主题/流的具有单个发布者的发布/订阅。我还想利用检查点来确保每个消费者处理写入流的每条消息。

最初,我对所有消费者和生产者使用相同的应用程序名称。但是,一旦我启动多个消费者,我就开始收到以下错误:

通用域名格式。amazonaws。服务。运动。模型InvalidArgumentException:StartingSequenceNumber 4956423629634456659779527257172303925768853369405442在账户*******下创建的流包中的shard shardId-000000000000上的GetShardIterator中使用,因为它不是来自此流,所以无效。(服务:AmazonKinesis;状态代码:400;错误代码:InvalidArgumentException;请求ID:…)

这似乎是因为消费者在使用相同的应用程序名称时与他们的检查点发生冲突。

从阅读文档来看,使用检查点进行发布/订阅的唯一方法似乎是拥有一个针对每个消费者的流应用程序,这要求每个生产者了解所有可能的消费者。这比我想要的更紧密地结合在一起;这真的只是一个队列。

Kafka似乎支持我想要的:任意使用给定的主题/分区,因为消费者完全可以控制自己的检查点。如果我想要有检查点的酒吧/酒吧,我唯一的选择是搬到Kafka还是其他的选择?

我的RecordProcessor代码,在每个消费者中都是相同的:

override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
  log.trace("Received record(s) from kinesis")
  for {
    record <- processRecordsInput.getRecords
    json   <- jawn.parseByteBuffer(record.getData).toOption
    msg    <- decode[T](json.toString).toOption
  } yield subscriber ! msg
  processRecordsInput.getCheckpointer.checkpoint()
}

代码解析消息并将其发送给订阅者。目前,我只是将所有消息标记为已成功接收。我可以在AWS Kinesis仪表板上看到正在发送的消息,但不会发生读取,可能是因为每个应用程序都有自己的AppName,并且看不到任何其他消息。

共有1个答案

柴嘉禧
2023-03-14

您想要的模式,一个发布者的模式

你是怎么做到的?您需要为每个消费者指定不同的应用程序名称。这样,一个消费者的检查点信息就不会与另一个消费者的检查点信息发生冲突。

检查对此的第一个响应:https://forums.aws.amazon.com/message.jspa?messageID=554375

 类似资料:
  • 由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。

  • 我有一个spring boot项目,我是spring-kafka来连接底层的kafka事件枢纽。 我不得不在同一节消费者课上听2个不同的话题。我有两种方法可以这样做。 一个是要有两个这样的Kafka听众: 另一种方法是在同一个kafkaListener中有两个主题,如下所示 ===================edit===============application.yml中的Kafka属性

  • 我有4个分区和4个消费者(例如A、B、C、D)。如何使用使用者组配置哪个使用者将从哪个分区读取数据。我用的是Kafka的春靴。

  • 在我的Spring Boot Kafka应用程序中,我有以下使用者配置: 消费者: 如果我理解正确的话,现在我有一个消费者的实例。我想增加post消费者的数量,假设有5个消费者将消费来自${kafka.topic.post.send}的不同(不同)消息,以加快消息消费。 它是否像添加工厂一样简单。setConcurrency(5) 至我的PostKafkAlisterContainerFactor

  • 我有1个消费者群体和5个消费者。也有5个分区,因此每个消费者得到1个分区。 CLI还显示 bin/Kafka-console-consumer . sh-bootstrap-server localhost:9092-Topic Topic-1-from-beginning-partition { n }正确显示每个分区的不同消息。 然而,我经常看到两个或两个以上的消费者在处理同一条信息,而且对于

  • 因此,我有一个AWS动态流,在这里我为多个消费者发布事件。对于大多数人来说,接收热门数据很重要,这意味着他们中的许多人可能会同时轮询和读取最新数据。根据AWS文档,增加分片的数量将提高并行度,而每秒读取的数量最多为5次/秒。我的问题是是否(以及如何?)添加更多的碎片是否有助于解决我的所有消费者都是最新的并试图从同一个碎片读取新的传入数据的情况?似乎这种每秒读取数限制自动对您可以拥有的消费者数量进行