当前位置: 首页 > 知识库问答 >
问题:

在SubscriptionClient上调用RegisterMessageHandler时出现超时异常

司空镜
2023-03-14

我试图在运行在Android中的SubscriptionClient实例上调用RegisterMessageHandler方法。

在将消息发布到特定主题(通过自动测试)20秒左右之后,我在我的Android应用程序中收到了几个超时异常,该应用程序实际上包含了SubscriptionClient实例的RegisterMessageHandler。

输出窗口

这里有个错误:

**System.TimeoutException:**“Timeout”

member x.Enable () =

    async {

        if not (isEnabled) then

            subscriptionClient <- new SubscriptionClient(connectionString, "Topic.courier-requested", "Subscription.all-messages")
            subscriptionClient.OperationTimeout <- TimeSpan.FromMinutes(1.0)

            let! rulesFound     = subscriptionClient.GetRulesAsync() |> Async.AwaitTask
            let  hasDefaultRule = rulesFound.Any(fun r -> r.Name = RuleDescription.DefaultRuleName)

            if hasDefaultRule then
                do! subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName) |> Async.AwaitTask

            else
                let msgOptions = MessageHandlerOptions(fun args -> exceptionReceivedHandler(args))
                msgOptions.AutoComplete         <- false
                msgOptions.MaxAutoRenewDuration <- TimeSpan.FromMinutes(1.0)
                msgOptions.MaxConcurrentCalls   <- 1

                subscriptionClient.RegisterMessageHandler(processMessageAsync, msgOptions)

    } |> Async.StartAsTask
let processMessageAsync (message:Message) (_:CancellationToken) = 

    let json    = Encoding.UTF8.GetString(message.Body)
    ...

    subscriptionClient.CompleteAsync(message.SystemProperties.LockToken) |> Async.AwaitTask |> Async.RunSynchronously
    Task.CompletedTask

Android测试-失败

下面的测试发布了一条消息,该消息在大约20秒后导致我的Android应用出现超时异常:

[<Fact>]
let ``Publish message courier-requested message to servicebus topic``() =

    // Setup
    let connectionstring = ConfigurationManager.ConnectionStrings.["servicebus_testEnv"].ConnectionString

    // Test
    async {

        let client    = TopicClient(connectionstring, "Topic.courier-requested")
        let data      = "test_data"
        let message   = Message(Encoding.UTF8.GetBytes(data))
        let courierId = "b965f552-31a4-4644-a9c6-d86dd45314c4"
        message.Label <- sprintf "courier-id(%s)" courierId

        do! client.SendAsync(message) |> Async.AwaitTask
        do! client.CloseAsync()       |> Async.AwaitTask
    }

控制台测试-通过

[<EntryPoint>]
let main argv =

    printfn "Welcome to Subscription.Console"

    async {

        let subscriber = Subscriber(connectionString)
        do! subscriber.Listen()

    } |> Async.RunSynchronously


    Console.ReadKey() |> ignore
    0 // return an integer exit code
type Subscriber(connectionString:string) =

    let mutable subscriptionClient : SubscriptionClient = null

    let exceptionReceivedHandler (args:ExceptionReceivedEventArgs) =
        printfn "Got an exception: %A" args.Exception
        Task.CompletedTask

    let processMessageAsync (message:Message) (_:CancellationToken) = 

        let json = Encoding.UTF8.GetString(message.Body)

        subscriptionClient.CompleteAsync(message.SystemProperties.LockToken) |> Async.AwaitTask |> Async.RunSynchronously
        Task.CompletedTask

    member x.Listen() =

        async {

            subscriptionClient <- new SubscriptionClient(connectionString, "Topic.courier-requested", "Subscription.all-messages")
            subscriptionClient.OperationTimeout <- TimeSpan.FromMinutes(1.0)

            let! rulesFound     = subscriptionClient.GetRulesAsync() |> Async.AwaitTask
            let  hasDefaultRule = rulesFound.Any(fun r -> r.Name = RuleDescription.DefaultRuleName)

            if hasDefaultRule then
                do! subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName) |> Async.AwaitTask

            else

                let msgOptions = MessageHandlerOptions(fun args -> exceptionReceivedHandler(args))
                msgOptions.AutoComplete         <- false
                msgOptions.MaxAutoRenewDuration <- TimeSpan.FromMinutes(1.0)
                msgOptions.MaxConcurrentCalls   <- 1

                subscriptionClient.RegisterMessageHandler(processMessageAsync, msgOptions)
        }

我将以下内容应用于我的连接字符串:

TransportType=AmqpWebSockets;

我参考了以下链接:

https://github.com/azure/azure-service-bus-dotnet/issues/529

共有1个答案

闻华容
2023-03-14

Android模拟器没有连接到Internet。

 类似资料:
  • 2.)AVD没有互联网问题(原因稍后解释)。 3.)与我的volley singleton类或请求队列无关(原因稍后解释)。 所以我想我在Volley/Request Future的用法上犯了某种错误。

  • 这是我的客户端代码 这是我的Rest控制器代码: 如果我在调用后使用 String.class 作为返回类型,则相同的代码也有效。但不是响应实体类。我做错了什么,如果我需要客户端也相同的响应实体

  • 但是,该方法(authContext.AcquireTokenAsync)与其他参数(如客户端id等)很好地配合使用,如下所示。result=await authContext.AcquireTokenSilentAsync(ResourceId,clientId); UWP是否对具有ClientCredential(clientid,key)的AuthContext.AcquireTokenSi

  • 我已经设置了docker并创建了四个容器(每个容器都安装了一个mongodb实例,并启用了Autetication),以便使用Arbiter启用mongodb复制。我还向主机公开了MongoDB端口,以便外部应用程序能够连接它,但我无法从Eclipse与JAVA Spring应用程序连接。

  • 我有一个数十万对象的列表。当每一个运行时,它都会根据给定的值执行一个可能很长的计算。正因为如此,我希望异步运行每个任务(最好是通过使用某种执行器),并在30秒后检索每次计算的结果,取消那些没有及时完成的结果。(所得值在其他地方使用。) 到目前为止,我就是这样实现它的: ArrayList存储每个要执行的,然后将其发送到ExecutorService以运行所有任务。我遇到的问题是,任务似乎是同步启动

  • 有没有人能告诉我如何用Google SQL Cloud解决GAE中的这个错误我用localhost运行没有问题,但是当我部署到GAE时,它会显示这个错误