当前位置: 首页 > 知识库问答 >
问题:

Cosmos DB变更馈送的多个消费者

郁明诚
2023-03-14

我使用Change Feed处理器库(或者Azure Functions Cosmos DB trigger)来订阅集合更新。如何为同一个集合的提要设置多个独立的(非竞争的)消费者?

一种方法是使用多个租赁集合,例如,leases1leases2等,但这有点浪费。

有没有办法只用一个租约集合就做到这一点?(例如,通过在某处指定消费者组名称,类似于事件中心处理器)

共有2个答案

高皓
2023-03-14

我注意到直接通过变更提要处理器库使用变更提要与通过函数集成使用变更提要之间存在一些不一致。

使用变更馈送处理器库时,会生成如下文档:

{
    "id": "somegraph.documents.azure.com_obtRAA==_obtRAJvr8AU=..0",
    "_etag": "\"47006e54-0000-0000-0000-59d4fdf20000\"",
    "state": 2,
    "PartitionId": "0",
    "Owner": "CosmosChangeIngestionServiceType",
    "ContinuationToken": "\"143641\"",
    "SequenceNumber": 3322,
    "_rid": "obtRAIhO1RIFAAAAAAAAAA==",
    "_self": "dbs/obtRAA==/colls/obtRAIhO1RI=/docs/obtRAIhO1RIFAAAAAAAAAA==/",
    "_attachments": "attachments/",
    "_ts": 1507130866
}

从函数中生成的代码可疑地省略了< code>Owner属性,并将其设置为null。我的理解是,这个< code>Owner字段区分了变更提要消费者,并允许多个消费者跟踪同一个租约集合中的进度(这显然是理想的)。所以我不确定这是一个bug还是我在设置函数绑定时遗漏了什么,但是看起来目前每个租约集合只能有一个函数消费者。

刚刚与 Cosmos 团队进行了每周一次的通话,并询问了他们这个具体问题,以及其他租赁存储提供商(如表存储)的状态。他们应该在一天结束之前回复我们,并做出一些澄清。当我们取回官方信息时,我将进一步更新。

齐阎宝
2023-03-14

您可以为Azure Function Cosmos DB触发器定义leaseCollection Prefix。在Azure门户中,只需单击您的函数,然后单击集成,然后单击高级编辑器,这将打开您的function.json。在那里,您可以在触发器上定义属性,例如:

"bindings": [
    {
      "type": "cosmosDBTrigger",
      "name": "documents",
      "direction": "in",
      "leaseCollectionName": "leases",
      "connectionStringSetting": "myDatabase_DOCUMENTDB",
      "databaseName": "myDbName",
      "collectionName": "myCollectionName",
      "createLeaseCollectionIfNotExists": false,
      "leaseCollectionPrefix": "myFunctionSpecificValue"
    }

其他设置记录在文档下:

以下设置自定义内部更改馈送机制和租赁收款用途,并可在功能中进行设置。高级编辑器中的json和相应的属性名称:

    < Li > < code > leasescollectionprefix :设置时,它会向在此函数的租约集合中创建的租约添加前缀,从而有效地允许两个单独的Azure函数通过使用不同的前缀来共享同一租约集合。 < li> feedPollDelay:该位置位时,它定义在清空所有当前更改后,轮询分区以查找feed上的新更改之间的延迟时间,以毫秒为单位。默认值为5000 (5秒)。 < Li > < code > lease acquire interval :设置时,它以毫秒为单位定义启动任务的间隔,以计算分区是否均匀分布在已知的主机实例中。默认值为13000 (13秒)。 < Li > < code > leaseExpirationInterval :设置时,它定义代表一个分区的租约的租用间隔,以毫秒为单位。如果在此时间间隔内没有续订租约,将导致租约到期,分区的所有权将转移到另一个实例。默认值为60000 (60秒)。 < Li > < code > lease renewinterval :设置时,它定义实例当前持有的分区的所有租约的续订间隔(以毫秒为单位)。默认值为17000 (17秒)。 < Li > < code > check point frequency :设置时,它定义租用检查点之间的间隔,以毫秒为单位。默认值总是在函数调用成功后。 < Li > < code > maxItemsPerInvocation :设置时,它自定义每次函数调用接收的最大项数。
 类似资料:
  • 我正在尝试使用cosmos db change feed(我指的是https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed-processor和https://github.com/azure/azure-cosmos-dotnet-v2/tree/master/samples/code-samples/changeFeedProce

  • 问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在

  • 在Kafka文献中: Kafka的处理方式不同。我们的主题被划分为一组完全有序的分区,每个分区在任何给定时间都由一个使用者使用。这意味着消费者在每个分区中的位置只是一个整数,即要消费的下一条消息的偏移量。这使得消耗量的状态非常小,每个分区只有一个数字。这种状态可以定期检查。这使得消息确认的等价物非常便宜。 然而,按照同一份文件中的快速入门指南,我很容易就能: 使用单个分区创建主题 创建一个游戏机制

  • 由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。

  • 我的问题与单个消费者从多个话题消费有关。假设所有主题都加载了1M个记录,一个使用者必须处理这些记录。它将按照什么顺序从主题中读取(我的意思是首先读取哪个主题/分区,等等) Kafka内部资料的任何链接会有帮助吗?

  • 有一个基本示例,它对1个消费者起作用。它接收消息。但是添加一个额外的消费者将被忽略。 consumer2的“22”事件从未引发问题。如果我使用命令行工具检查该主题,则该主题的数据存在