我正在使用azure服务总线,我得到的锁已过期。
如何我已经实现了锁定1天,但我仍然得到错误
我的代码:
import json
from typing import Any, Dict, Generator, List, Optional
from azure.servicebus import (AutoLockRenewer, ServiceBusClient,
ServiceBusMessage)
class Queue():
def __init__(
self,
connection_string: Optional[str] = None,
queue_name: Optional[str] = None,
max_lock_renewal_duration: Optional[int] = 86400,
logging_enable: Optional[bool] = True
):
self.queue_name = queue_name
self.connection_string = connection_string
self.logging_enable = logging_enable
self.max_lock_renewal_duration = max_lock_renewal_duration
self.client = ServiceBusClient.from_connection_string(
conn_str=self.connection_string, logging_enable=logging_enable)
self.sender = self.client.get_queue_sender(queue_name=self.queue_name)
self.renewer = AutoLockRenewer(
max_lock_renewal_duration=max_lock_renewal_duration)
self.receiver = self.client.get_queue_receiver(
queue_name=self.queue_name, auto_lock_renewer=self.renewer)
def get_msgs(
self,
max_message_count: Optional[int] = 1,
max_wait_time: Optional[int] = 5
) -> Generator[ServiceBusMessage, None, None]:
for msg in self.receiver.receive_messages(
max_message_count=max_message_count,
max_wait_time=max_wait_time
):
yield msg
def abandon_msg(self, msg: ServiceBusMessage) -> None:
self.receiver.abandon_message(msg)
q = Queue(connection_string="secret",
queue_name="my_queue",
logging_enable=True
)
for msg in q.get_msgs(max_message_count=100, max_wait_time=5):
print("process my messages")
print("some reason abondan")
错误:
---日志记录错误---回溯(最近一次通话最后一次):
文件“/test/.env/lib64/python3.6/site packages/azure/servicebus/_servicebus_receiver.py”,第782行,在放弃消息self中_在第415行,用重试(消息,消息放弃)文件“/test/.env/lib64/python3.6/site packages/azure/servicebus/_servicebus_receiver.py”结算消息。自动更新错误,azure。服务巴士。例外。ServiceBusError:消息锁上的锁已过期。
我的目标是放弃消息,这样它就会重试,还有一种方法,我们可以增加重试时间,而不是像每abandon_msg延迟300秒那样无休止地排队?
默认情况下,锁定时间将设置为30秒,我们可以将其增加到5分钟。
设置锁定期为1天将被拒绝从控制台本身,如下截图:
更新到最大锁定时间后的屏幕截图:
但是,锁可以更新,直到功能完成。因为我们可以使用自动锁更新功能,您可以指定想要继续更新锁的时间长度。
下面的代码将帮助我们自动更新会话锁100秒
renewer.register(receiver, receiver.session, max_lock_renewal_duration=100)
print('Register session into AutoLockRenewer.')
received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=5)
time.sleep(100) # message handling for long period (E.g. application logic)
for msg in received_msgs:
receiver.complete_message(msg)
要了解有关每个参数的更多信息,请参阅auto_lock_renew。py来自azure sdk for python。
从这里还可以查看_auto_lock_renew_task的定义。
建议:尽量增加锁定期,减少更新锁定期。
通过使用Azure服务总线主题和订阅,我能够在两个系统之间传递消息。但有时,我会遇到锁过期的异常。如何避免? 异常-消息处理程序遇到异常Microsoft.Azure.ServiceBus.MessageLockLostException:提供的锁无效。锁定过期,或者消息已从队列中删除。 下面是代码:
有人能解释一下新发布的WindowsServiceBus(内部部署,而不是Azure)和NServiceBus之间的区别吗? 寻找详细的答案什么窗口SB可能会丢失,因为我很熟悉NSErviceBus可以做什么: 它是真正的服务总线,而不仅仅是使用队列的消息代理吗? 是否可以支持消息多态?(消息子类化其他消息和支持此层次结构的处理程序 长时间运行的进程和相关性 向外扩展
我在Azure服务总线中构建了一个支持多队列订阅的服务,但是我正在得到一些奇怪的行为。 我的subscription singleton类有一个如下所示的方法: 其思想是,您为特定类型的消息订阅Azure Service Bus,该消息直接对应于队列。在订阅中,传入一个委托以了解如何处理消息。 谁能告诉我,我需要做什么不同的,以确保消费者拥有的消息,直到完成?
我更习惯于为总线队列提供服务,但在与Azure Function应用程序一起使用时遇到了挑战。 我们有Azure函数应用程序,它通过ServiceBugTrigger从服务总线队列读取数据。根据此链接,Azure Function App在内部(在队列触发器和函数执行端)管理队列消息PeekLock,我们不需要在流程结束时完成()消息。 我的队列消息锁定持续时间设置为3min(这对于我的执行来说已
我正在开发一个客户端,它可以从Windows服务总线读取消息,该消息是使用发送的。净额。客户端是使用Java开发的,据我所知,它创建会话,但当它创建会话时,使用者抛出一个JMSException,它只告诉我以下消息:amqp:不允许 有线索吗? 顺致敬意,
我一直在尝试创建一个客户端来连接我的Azure服务总线队列。我一直在参考此文档: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-queues#send-消息到队列。 我的代码: 在创建客户端时,我遇到以下错误: 通用域名格式。微软蔚蓝色的服务总线。原语。Service