当前位置: 首页 > 知识库问答 >
问题:

消费者/生产者AWS SQS akka scala与synchrone消费者

宫高义
2023-03-14

我的应用程序有一个生产者和一个消费者。我的生产者不定期地生成消息。有时我的队列会是空的,有时我会有一些消息。我想让我的消费者监听队列,当有消息在其中时,接受它并处理这条消息。这个过程可能需要几个小时,如果我的消费者没有完成处理当前消息,我不希望他接受队列中的另一条消息。

我认为AKKA和AWS SQS可以满足我的需求。通过阅读文档和示例,akka-camel似乎可以简化我的工作。

我在github上找到了这个例子。

我对消费者的配置更感兴趣(Thread.sleep只是为了模拟我的处理) :

class MySqsConsumer extends Consumer {

  //The SQS URI is an in-only message exchange (autoAck=true)
  override def endpointUri = "aws-sqs://sqs-akka-camel?  amazonSQSClient=#client"

  override def receive = {
    case msg: CamelMessage => {
      println("Start received %s" format msg.bodyAs[String])
      Thread.sleep(4000)
      println("Stop received %s" format msg.bodyAs[String])

    }

  }
 }

堆栈跟踪:

...
14:38:53.335 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World1!]
14:38:54.051 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World2!]
14:38:54.753 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World3!]
Start received Hello World1!
Stop received Hello World1!
Start received Hello World2!
Stop received Hello World2!
Start received Hello World3!
...

我的问题是 akka-camel 在接收方法完成之前从队列中读取消息。

我如何让我的消费者在接收新消息之前等待流程结束?如果我使用了错误的工具/库,您能指导我使用新工具吗?

共有2个答案

司徒兴思
2023-03-14

据我所知,你做不到。Camel一直使用队列中的消息。

您可以设置覆盖 def autoAck = false - 在您确认当前消息之前,它不会向 receive() 发送另一条消息。但“隐藏”的骆驼演员仍将消耗。

也许您可以提供一个定制的< code>amazonSQSClient并以某种方式控制它。

夔修伟
2023-03-14

不确定是否使用akka流,但如果使用,请将源发送到Sink.queue

val graph=yourSource.to(Sink.queue)

当具体化时(< code>graph.run),它将返回一个SinkQueue,您可以手动从中提取项目。它会使用背压机制,不用担心不拉物品会有什么后果。

注意:我故意忽略了泛型!但不要忘记他们。

 类似资料:
  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • 所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职

  • 我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(