我的问题是,我无法足够快地轮询我的队列,以保持我的队列为空或接近空。我最初的想法是,我可以让使用者以x/s的速率通过Camel从SQS接收消息。从那里,我可以简单地创建更多的消费者,以达到我需要的消息处理速度。
我的消费者:
import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}
class MyConsumer() extends Consumer {
def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
var count = 0
def receive = {
case msg: CamelMessage => {
count += 1
}
case _ => {
println("Got something else")
}
}
override def postStop(){
println("Count for actor: " + count)
}
}
如图所示,我设置了delay=1
和&maxmessagesperpoll=10
以提高消息的速率,但是我无法生成具有相同endpoint的多个使用者。
我在文档中读到,默认情况下假定endpoint不支持多个使用者。
我相信SQSendpoint也是如此,因为生成多个使用者将只给我一个使用者,在运行系统一分钟后,输出消息是count for actor:x
,而不是其他输出count for actor:0
的消息。
如果这是有用的;使用当前的实现,我可以在单个消费者上读取大约33条消息/秒。
这是从Akka中的SQS队列读取消息的正确方式吗?如果是这样,有没有办法让它向外扩展,以便我可以将我的消息消耗率提高到接近900条消息/秒?
遗憾的是,Camel目前不支持在SQS上并行使用消息。
http://camel.465427.n5.nabble.com/amazon-sqs-listener-as-multi-threaded-td5741541.html
为了解决这个问题,我编写了自己的Actor来使用AWS-Java-SDK轮询批处理消息SQS。
def receive = {
case BeginPolling => {
// re-queue sending asynchronously
self ! BeginPolling
// traverse the response
val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry]
val messages = sqs.receiveMessage(receiveMessageRequest).getMessages
messages.toList.foreach {
node => {
deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle))
//log.info("Node body: {}", node.getBody)
filterSupervisor ! node.getBody
}
}
if(deleteEntryList.size() > 0){
val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList)
sqs.deleteMessageBatch(deleteMessageBatchRequest)
}
}
case _ => {
log.warning("Unknown message")
}
}
我的应用程序有一个生产者和一个消费者。我的生产者不定期地生成消息。有时我的队列会是空的,有时我会有一些消息。我想让我的消费者监听队列,当有消息在其中时,接受它并处理这条消息。这个过程可能需要几个小时,如果我的消费者没有完成处理当前消息,我不希望他接受队列中的另一条消息。 我认为AKKA和AWS SQS可以满足我的需求。通过阅读文档和示例,akka-camel似乎可以简化我的工作。 我在github
我在使用者组中轮询来自 Kafka 的消息时遇到问题。我的使用者对象分配给给定的分区 之后,消费者向该分区分配: 之后,我可以计算分区内的消息 和 ..... 在我的主题中有超过30000条消息。问题是我只收到一条消息。 具有< code > max _ poll _ records = 200 < code > AUTO _ OFFSET _ RESET 的消费者配置是最早的 这是我的函数,我正
假设我的使用者从一个代理轮询,该代理有多个主题,每个主题有多个分区。我在同一个消费群体中总共有5个消费者。如果我的每个消费者都进行投票,将返回的数据顺序是什么? topicD-分区5 我的问题是,在这个单一的1轮询中,在按顺序移动到下一个主题/分区之前,我会收到来自该主题/分区的所有可用消息吗?意思例如: 在一次投票循环中,我收到了这个... 或者在那个单一的1轮询循环中,有可能接收到这个消息顺序
我正在查看关于使用Quarkus从SQS消费的指南。 问题是我想在无休止的循环中执行它,例如每10秒获取一次新消息,并使用Hibernate Reactive从消息中插入一些数据到数据库中。 我创建了一个Quarkus调度程序,但由于它不支持返回Uni,我不得不阻止Hibernate Responsive的响应,因此出现了这个错误 使用Quarkus和reactive实现我所需的最佳方法是什么?
我想在特定时间停止对特定主题的轮询。 Spring防尘套2.X Springkafka 2.5.5 Kafka版本2.5.1 比如即使有消息进来测试题目分区,消息也是从00到01堆在分区里,没有消耗。 01点之后,我想再次使用有关TEST主题的消息。 如何暂停和恢复?
Kafka 消费者在每个投票中轮询 500 条消息。我们禁用了, 假设我们已成功处理 100 条消息,偏移量也为 100 现在在第101条消息中,我们遇到了一个错误,我们没有提交偏移量 但是因为我们已经有了500条消息,所以我们处理了第102条消息,我们成功地处理了它,并且我们还提交了第102条消息的偏移量。 雀: 第 101 条消息会发生什么。 如何克服这个问题。