当前位置: 首页 > 知识库问答 >
问题:

批处理PubSub请求

程承恩
2023-03-14
// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'your-topic';
// const data = JSON.stringify({ foo: 'bar' });
// const maxMessages = 10;
// const maxWaitTime = 10000;

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

pubsub
  .topic(topicName)
  .publisher({
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime,
    },
  })
  .publish(dataBuffer)
  .then(results => {
    const messageId = results[0];
    console.log(`Message ${messageId} published.`);
  })
  .catch(err => {
    console.error('ERROR:', err);
  });

共有1个答案

桑璞
2023-03-14

如果您想要批处理消息,那么您需要保持发布服务器,并多次调用它的publish。例如,您可以将代码更改为如下所示:

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();


const topicName = 'my-topic';
const maxMessages = 10;
const maxWaitTime = 10000;
const data1 = JSON.stringify({ foo: 'bar1' });
const data2 = JSON.stringify({ foo: 'bar2' });
const data3 = JSON.stringify({ foo: 'bar3' });

const publisher = pubsub.topic(topicName).publisher({
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime,
    },
  })

function handleResult(p) {
  p.then(results => {
    console.log(`Message ${results} published.`);
  })
  .catch(err => {
    console.error('ERROR:', err);
  });
}

// Publish three messages
handleResult(publisher.publish(Buffer.from(data1)));
handleResult(publisher.publish(Buffer.from(data2)));
handleResult(publisher.publish(Buffer.from(data3)));

消息的批处理由MaxMessagesMaxMilliseconds属性处理。前者指示批处理中包含的最大消息数。后者指示发布批处理所需等待的最大毫秒数。这些属性在更大的批处理(可能更高效)和发布延迟之间进行了权衡。如果要快速发布许多消息,那么MaxMilliseconds属性不会有太大影响;一旦有十条消息准备好了,客户机库就会向Cloud Pub/Sub服务发出发布请求。但是,如果发布是零星的或缓慢的,那么可能会在有十条消息之前发送一批消息。

在上面的示例代码中,我们对三条消息调用publish。这不足以填满一批并发送。因此,在第一次调用publish之后10,000毫秒,这三条消息将作为批处理发送到Cloud pub/sub。

 类似资料:
  • 在Google PubSub的Node.js客户机中,是否可以监视批量发布者队列的大小并按需刷新它? 我正在开发一个Node/Express服务,它使用PubSub批处理发布器将应用程序事件推送到PubSub主题中。我想在批处理发布服务器的队列大小上设置一些指标和警报。此外,当此服务的实例接收到SIGTERM信号时,我希望运行一个关闭过程,以确保消息不会丢失。理想情况下,当发生这种情况时,我希望刷

  • 批处理 本书展示的几个例子中,ElasticSearch提供了高效的批量索引数据的功能,用户只需按批量索引的格式组织数据即可。同时,ElasticSearch也为获取数据和搜索数据提供了批处理功能。值得一提的是,该功能使用方式与批量索引类似,只需把多个请求组合到一起,每个请求可以独立指定索引及索引类型。接下来了解这些功能。 MultiGetMultiGet操作允许用户通过_mget端点在单个请求命

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 问题内容: 我已经实现了当前的一组路由(例如): 他们工作得很漂亮。现在,假设我要为同一API实现“批处理终结点”。它看起来应该像这样: 身体应该像这样: 为此,我想知道如何调用播放框架路由器来传递这些请求?我打算使用与单元测试建议类似的方法: 通过进入的源代码,您会发现如下所示: 所以我的问题是:与复制上面的代码相比,用Play做到这一点的方式是否更简单(我不反对将Scala和Java混合使用)

  • 考虑以下用户情况: 当我执行之前的代码时,预期的结果是我应该得到一个内存不足的异常,因为它是可流动的。阻止长时间订阅请求。最大值项,迭代器将耗尽1 TB的数据,订阅者无法跟上。 为了解决这个问题,我将rebatch请求添加到我的代码中: 当我执行以下代码时,我的预期结果是一切都工作得很好,正如Flowable.rebatch请求的留档所说: 该运算符允许阻止下游通过请求(Long.MAX_值)触发

  • 我在表中总共有8条记录,其中6条在spring批处理调用read时可以使用jpareader。现在我将页面大小和块大小设置为1以进行测试。期望作业运行时,它应该进行6次读取调用,然后它应该逐个处理,逐个写入。但实际上发生的是,它只是调用read 4次(从日志中我可以看到这样读取页面0...1)并处理4个,其中一个由于不匹配写入标准而被过滤掉,然后它只是更新了3个记录,作业标记为成功完成。