当前位置: 首页 > 软件库 > 云计算 > >

sqs-consumer

授权协议 View license
开发语言 JavaScript
所属分类 云计算
软件类型 开源软件
地区 不详
投 递 者 欧阳勇军
操作系统 跨平台
开源组织
适用人群 未知
 软件概览

sqs-consumer

Build SQS-based applications without the boilerplate. Just define an async function that handles the SQS message processing.

Installation

npm install sqs-consumer --save

Usage

const { Consumer } = require('sqs-consumer');

const app = Consumer.create({
  queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name',
  handleMessage: async (message) => {
    // do some work with `message`
  }
});

app.on('error', (err) => {
  console.error(err.message);
});

app.on('processing_error', (err) => {
  console.error(err.message);
});

app.start();
  • The queue is polled continuously for messages using long polling.
  • Messages are deleted from the queue once the handler function has completed successfully.
  • Throwing an error (or returning a rejected promise) from the handler function will cause the message to be left on the queue. An SQS redrive policy can be used to move messages that cannot be processed to a dead letter queue.
  • By default messages are processed one at a time – a new message won't be received until the first one has been processed. To process messages in parallel, use the batchSize option detailed below.
  • By default, the default Node.js HTTP/HTTPS SQS agent creates a new TCP connection for every new request (AWS SQS documentation). To avoid the cost of establishing a new connection, you can reuse an existing connection by passing a new SQS instance with keepAlive: true.
const { Consumer } = require('sqs-consumer');
const AWS = require('aws-sdk');

const app = Consumer.create({
  queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name',
  handleMessage: async (message) => {
    // do some work with `message`
  },
  sqs: new AWS.SQS({
    httpOptions: {
      agent: new https.Agent({
        keepAlive: true
      })
    }
  })
});

app.on('error', (err) => {
  console.error(err.message);
});

app.on('processing_error', (err) => {
  console.error(err.message);
});

app.start();

Credentials

By default the consumer will look for AWS credentials in the places specified by the AWS SDK. The simplest option is to export your credentials as environment variables:

export AWS_SECRET_ACCESS_KEY=...
export AWS_ACCESS_KEY_ID=...

If you need to specify your credentials manually, you can use a pre-configured instance of the AWS SQS client:

const { Consumer } = require('sqs-consumer');
const AWS = require('aws-sdk');

AWS.config.update({
  region: 'eu-west-1',
  accessKeyId: '...',
  secretAccessKey: '...'
});

const app = Consumer.create({
  queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name',
  handleMessage: async (message) => {
    // ...
  },
  sqs: new AWS.SQS()
});

app.on('error', (err) => {
  console.error(err.message);
});

app.on('processing_error', (err) => {
  console.error(err.message);
});

app.on('timeout_error', (err) => {
 console.error(err.message);
});

app.start();

API

Consumer.create(options)

Creates a new SQS consumer.

Options

  • queueUrl - String - The SQS queue URL
  • region - String - The AWS region (default eu-west-1)
  • handleMessage - Function - An async function (or function that returns a Promise) to be called whenever a message is received. Receives an SQS message object as it's first argument.
  • handleMessageBatch - Function - An async function (or function that returns a Promise) to be called whenever a batch of messages is received. Similar to handleMessage but will receive the list of messages, not each message individually. If both are set, handleMessageBatch overrides handleMessage.
  • handleMessageTimeout - Number - Time in ms to wait for handleMessage to process a message before timing out. Emits timeout_error on timeout. By default, if handleMessage times out, the unprocessed message returns to the end of the queue.
  • attributeNames - Array - List of queue attributes to retrieve (i.e. ['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']).
  • messageAttributeNames - Array - List of message attributes to retrieve (i.e. ['name', 'address']).
  • batchSize - Number - The number of messages to request from SQS when polling (default 1). This cannot be higher than the AWS limit of 10.
  • visibilityTimeout - Number - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.
  • heartbeatInterval - Number - The interval (in seconds) between requests to extend the message visibility timeout. On each heartbeat the visibility is extended by adding visibilityTimeout to the number of seconds since the start of the handler function. This value must less than visibilityTimeout.
  • terminateVisibilityTimeout - Boolean - If true, sets the message visibility timeout to 0 after a processing_error (defaults to false).
  • waitTimeSeconds - Number - The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning.
  • authenticationErrorTimeout - Number - The duration (in milliseconds) to wait before retrying after an authentication error (defaults to 10000).
  • pollingWaitTimeMs - Number - The duration (in milliseconds) to wait before repolling the queue (defaults to 0).
  • sqs - Object - An optional AWS SQS object to use if you need to configure the client manually

consumer.start()

Start polling the queue for messages.

consumer.stop()

Stop polling the queue for messages.

consumer.isRunning

Returns the current polling state of the consumer: true if it is actively polling, false if it is not.

Events

Each consumer is an EventEmitter and emits the following events:

Event Params Description
error err, [message] Fired when an error occurs interacting with the queue. If the error correlates to a message, that error is included in Params
processing_error err, message Fired when an error occurs processing the message.
timeout_error err, message Fired when handleMessageTimeout is supplied as an option and if handleMessage times out.
message_received message Fired when a message is received.
message_processed message Fired when a message is successfully processed and removed from the queue.
response_processed None Fired after one batch of items (up to batchSize) has been successfully processed.
stopped None Fired when the consumer finally stops its work.
empty None Fired when the queue is empty (All messages have been consumed).

AWS IAM Permissions

Consumer will receive and delete messages from the SQS queue. Ensure sqs:ReceiveMessage and sqs:DeleteMessage access is granted on the queue being consumed.

Contributing

See contributing guidelines.

  • Amazon sqs是亚马逊提供的线上消息队列服务, 可以实现应用程序解耦,以及可靠性保证。 sqs提供了两种消息队列, 一种是标准消息队列, 一种是先进先出队列(FIFO), 其区别是FIFO是严格有序的,即消息接收的顺序是按照消息发送的顺序来的, 而标准队列是尽最大可能有序, 即不保证一定为有序, 此外FIFO还保证了消息在一定时间内不能重复发出,即使是重复发了, 它也不会把消息发送到队列上。

  • AMAZON SQS(2)Java Client Consumer and Producer Configuration File build.sbt "com.amazonaws" % "aws-java-sdk" % "1.10.6", // First class, SQS Client builder, SQSQueue.scala package com.sillycat.jobscon

  • 1、问:什么是可见性超时?       可见性超时是一个时段,在这个时段内,Amazon SQS 会阻止其他处理组件接收和处理某条消息。有关更多信息,请参阅 Amazon SQS 开发人员指南中的可见性超时。        这个特性,在工作当中,我们可以提高基于sqs消息服务的并发能力。比如有1000个sqs client在监听同一个消息队列,因为每个消息内容不一样,而且可见期间,只能被一个cli

  • 将数据发送到AWS-SQS  然后通过Apache Camel接收SQS消息然后路由分发到不同的子系统中 一、AWS SQS简介 Amazon Simple Queue Service (SQS) 是一项快速可靠、可扩展且完全托管的消息队列服务。SQS 使得云应用程序的组件去耦合大大简化,并且具有较高的成本效益。您可以使用 SQS 来传输任何容量的数据,使用任意的吞吐量,而不会丢失消息或要求其他服

  • 介绍 (Introduction) This article introduces the concept of message queues and discusses the strengths and weaknesses of three specific message queue services: Beanstalkd, IronMQ and Amazon SQS. 本文介绍了消息队

  • An Amazon SQS message has three basic states: Sent to a queue by a producer. Received from the queue by a consumer. Deleted from the queue. A message is considered to be stored after it is sent to a q

  • SQS (Simple Queue Service) 概念 简单消息服务,类似于Rabbit MQ,主要作用是作为消息中间件,解耦应用程序的各个组件 SQS不保证消息FIFO,但能保证消息的高可靠 消息的生命周期 生产者将消息发送到消息队列,此时消息的状态变为in-flight 消费者从消息队列中获取消息,此时消息的状态变为processed,在visibility timeout的时间段以内,其

  • SQS即Simple Queue Service, 是一个分布式的消息队列服务,使用它非常简单,消息队列服务可以用来buffer burst, 使整个服务异步处理,不要求组件始终可用. 开发人员最初使用 Amazon SQS 时只需用到五个 API: CreateQueue、SendMessage、ReceiveMessage、ChangeMessageVisibility 和 DeleteMes

  • Amazon SQS 消息相关接口测试用例 因项目功能需求,对接了Amazon的SQS消息队列,封装了常用的接口,本文是对部分接口测试用例的简要说明。 1.发送消息 1.1.发送单条消息到标准队列 @Test public void sendMessage() { try { // 队列url String queueUrl = "https://sqs.cn-northwes

  •   因为我们新浪项目的需要,接触了一下SINA SQS(Sina Simple Queue Service ),使用它可以创建一个队列,然后从不同的地方往里面放东西,然后又可以在不同的地方不停的往外取东西,其实它就是一个分布式的消息传递通道。其实的它的前身是Amazon SQS,包括亚马逊简单队列服务也是基于它。包括现在很热的云计算也可以使用它来做数据传递。   队列的基础结构通常是在公司网络的内

  • 及时处理信息 设置可见性超时取决于你的应用程序需要多长时间来处理和删除一条消息。例如,如果你的应用程序需要10秒来处理一条消息,而你将可见性超时设置为15分钟,那么如果前一次处理尝试失败,你必须等待相对较长的时间来尝试再次处理该消息。或者,如果你的应用程序需要10秒来处理一条消息,但你只把可见性超时设置为2秒,那么当原来的消费者还在处理该消息时,另一个消费者就会收到一条重复的消息。 为了确保有足够

  • SQS: queue system-receivers have to poll or pull messages from SQS-standard/FIFO SNS: notification system-publish subscribe system, receiver will be pushed to subscribers when they are sent by publish

  • SQS http://blog.turret.io/rabbitmq-vs-amazon-sqs-a-short-comparison/ https://stackoverflow.com/questions/46880229/migrate-from-amqp-to-amazon-sns-sqs-need-to-understand-concepts https://sookocheff.com

  • 概念 web–> SQS --> worker 是EB用来做均衡负载和自动扩容的办法 原理 如果一个操作需要很长时间才会完成,而网页要求500ms之内完成响应,那么单单有主机器来完成这个操作就显得非常困难。 解决这个困难的方法有几种。 一种是可以使用异步处理机制,让其他计算机协同处理。但是在高负载的情况下,协同处理的进程也会被任务占满。如果提前起好多几个计算器,在负载小的时候会浪费资源 另一种就是

  • SQS Amazon Simple Queue Service (SQS) 是一种完全托管的消息队列服务,可让您分离和扩展微服务、分布式系统和无服务器应用程序。SQS 消除了与管理和运营消息型中间件相关的复杂性和开销,并使开发人员能够专注于重要工作。借助 SQS,您可以在软件组件之间发送、存储和接收任何规模的消息,而不会丢失消息,并且无需其他服务即可保持可用。使用 AWS 控制台、命令行界面或您选

  • AMAZON SQS(1)PHP Producer 1. Some Basic Information Price: 1 million 2$ Send Throughput: Receive Throughput: Message Latency: 500 ms Message Size: 256 KB Batch Size: 10 message 2. Env Set Up Install c

  • Amazon SQS 短轮询和长轮询 默认是短轮询(Short Polling)。像http一样,客户端请求,服务器端马上返回。 但是SQS服务器有多个,消息分布在各个服务器上。请求的时候,并不是查看所有服务器来确实是否有消息,而只是查看一部分服务器(采样),如果刚好消息不在所有查看的服务器上,会返回空响应(即使消息实际存在,实在存在于查看的服务器意外的服务器上)。 即使是空响应,也是收费的,AW

 相关资料
  • 我有一个连接到lambda的队列(fifo)。我想在lambda中向标准队列发送一条消息。但没有发送/接收任何消息。然而,如果我尝试从非SQS连接的lambda(通过AppSync)发送它,它会工作。 我查过: lambda有权发送SQS消息(您可以在那里看到) 由于我已成功地从另一个lambda(非SQS)向标准队列发送消息,因此正确配置了标准队列 SQS URL是否正确 控制台中不会显示任何错

  • 我需要创建一个队列进行处理。队列本身的容量相对较低。每小时可能会有1000封信。每个任务的执行可能需要大约一分钟的时间,并且几乎在项目添加到队列后立即进行处理。 我有没有理由想要实现RabbitMQ而不是像Amazon SQS这样的现成的东西?为什么应用程序需要自己的队列系统而不是像SQS这样的东西?

  • 我正在查看关于使用Quarkus从SQS消费的指南。 问题是我想在无休止的循环中执行它,例如每10秒获取一次新消息,并使用Hibernate Reactive从消息中插入一些数据到数据库中。 我创建了一个Quarkus调度程序,但由于它不支持返回Uni,我不得不阻止Hibernate Responsive的响应,因此出现了这个错误 使用Quarkus和reactive实现我所需的最佳方法是什么?

  • 我们现有的服务使用子队列的概念,这样用户就不会接收到不是为他们准备的消息。例如: /MyQueueName/user1/ /MyQueueName/user2/ SQS似乎不具备创建子队列的能力,也不具备在消息属性上进行筛选的灵活性。我可以允许所有的消息发送到所有的计算机,然后只有在内容相关的情况下才采取行动,但这似乎是浪费,特别是考虑到我们正在谈论的区域2万+子队列目前。 如何在SQS上最好地实

  • 我的Sring引导应用程序监听Amazon SQS队列。现在,我需要实现正确的消息确认--我需要接收一条消息,做一些业务逻辑,只有在此之后,如果成功,我需要ack消息(从队列中删除消息)。例如,如果我的业务逻辑出现错误,消息必须重新排队。 现在我不知道如何从我的听众那里获取信息。

  • 我的测试处理器: 所以我有下一种情况:1)骆驼成功地从“test-camel-start”队列读取消息2)基处理器成功地处理消息3)在重试将消息传递到“test-camel-success”时失败 我收到了下一个日志跟踪: Log说“请求必须包含参数MessageBody”。 我不知道为什么这个消息体没有出现。