当前位置: 首页 > 知识库问答 >
问题:

如何设计具有无限阻塞循环的工作线程顶点?

颜欣怡
2023-03-14

我正在尝试构建一个worker verticle,通过采用kotlin的PubSub示例,结合这个关于worker无限阻塞循环处理的答案,将Google cloud PubSub主题订阅与vert.x的事件总线连接起来。

它确实有效,但Vert. X在收到来自PubSub的消息后的某个时间通过抛出异常来不断唠叨Thread阻止的日志(请暂时忽略阻止初始化):

9:15:12 AM: Executing task 'run'...

WARNING: You are a using release candidate 2.0.0-rc5. Behavior of this plugin has changed since 1.3.5. Please see release notes at: https://github.com/GoogleCloudPlatform/app-gradle-plugin.
Missing a feature? Can't get it to work?, please file a bug at: https://github.com/GoogleCloudPlatform/app-gradle-plugin/issues.
:compileKotlin UP-TO-DATE
:compileJava NO-SOURCE
:processResources NO-SOURCE
:classes UP-TO-DATE
Mar 10, 2019 9:15:18 AM io.vertx.core.impl.launcher.commands.Watcher
INFO: Watched paths: [/home/username/IdeaProjects/project_name/./src]
Mar 10, 2019 9:15:18 AM io.vertx.core.impl.launcher.commands.Watcher
INFO: Starting the vert.x application in redeploy mode
:run
Starting vert.x application...
f48ba7fd-a52b-487f-b553-2b74473e58ba-redeploy
Creating topic gcs-project-id:vertx.
Mar 10, 2019 9:15:18 AM com.google.auth.oauth2.DefaultCredentialsProvider warnAboutProblematicCredentials
WARNING: Your application has authenticated using end user credentials from Google Cloud SDK. We recommend that most server applications use service accounts instead. If your application continues to use end user credentials from Cloud SDK, you might receive a "quota exceeded" or "API not enabled" error. For more information about service accounts, see https://cloud.google.com/docs/authentication/.
Mar 10, 2019 9:15:21 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2759 ms, time limit is 2000 ms
Topic gcs-project-id:vertx successfully created.
Creating subscription gcs-project-id:kotlin.
Mar 10, 2019 9:15:22 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 3759 ms, time limit is 2000 ms
Mar 10, 2019 9:15:23 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 4758 ms, time limit is 2000 ms
Mar 10, 2019 9:15:24 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 5759 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
    at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:142)
    at com.google.common.util.concurrent.Futures.getUnchecked(Futures.java:1309)
    at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:52)
    at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
    at com.google.cloud.pubsub.v1.SubscriptionAdminClient.createSubscription(SubscriptionAdminClient.java:359)
    at com.google.cloud.pubsub.v1.SubscriptionAdminClient.createSubscription(SubscriptionAdminClient.java:260)
    at com.example.project.MainVerticle.subscribeTopic(MainVerticle.kt:76)
    at com.example.project.MainVerticle.init(MainVerticle.kt:46)
    at io.vertx.core.impl.DeploymentManager.lambda$doDeploy$8(DeploymentManager.java:492)
    at io.vertx.core.impl.DeploymentManager$$Lambda$28/1902260856.handle(Unknown Source)
    at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:320)
    at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38)
    at io.vertx.core.impl.EventLoopContext$$Lambda$29/1640639994.run(Unknown Source)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

Subscription gcs-project-id:kotlin successfully created.
Listening to messages on kotlin:
Mar 10, 2019 9:15:25 AM io.vertx.core.impl.launcher.commands.VertxIsolatedDeployer
INFO: Succeeded in deploying verticle

Message Id: 462746807438186 Data: Bazinga
Message Id: 462746750387788 Data: Another message

Mar 10, 2019 9:16:25 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-worker-thread-0,5,main] has been blocked for 60171 ms, time limit is 60000 ms
io.vertx.core.VertxException: Thread blocked
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
    at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
    at com.example.project.MainVerticle$start$1.handle(MainVerticle.kt:32)
    at com.example.project.MainVerticle$start$1.handle(MainVerticle.kt:13)
    at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:272)
    at io.vertx.core.impl.ContextImpl$$Lambda$33/1101004004.run(Unknown Source)
    at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)
    at io.vertx.core.impl.TaskQueue$$Lambda$26/1213216872.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

Mar 10, 2019 9:16:26 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-worker-thread-0,5,main] has been blocked for 61172 ms, time limit is 60000 ms
io.vertx.core.VertxException: Thread blocked
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
    at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
    at com.example.project.MainVerticle$start$1.handle(MainVerticle.kt:32)
    at com.example.project.MainVerticle$start$1.handle(MainVerticle.kt:13)
    at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:272)
    at io.vertx.core.impl.ContextImpl$$Lambda$33/1101004004.run(Unknown Source)
    at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)
    at io.vertx.core.impl.TaskQueue$$Lambda$26/1213216872.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

这是源代码:

package com.example.project_name

import com.google.api.gax.rpc.ApiException
import com.google.cloud.pubsub.v1.*
import com.google.pubsub.v1.ProjectSubscriptionName
import com.google.pubsub.v1.ProjectTopicName
import com.google.pubsub.v1.PubsubMessage
import com.google.pubsub.v1.PushConfig
import io.vertx.core.*
import java.util.concurrent.LinkedBlockingDeque


class MainVerticle : MessageReceiver, AbstractVerticle() {
  private val projectId = "gcs-project-id"
  private val topicId = "vertx"
  private val topic: ProjectTopicName = ProjectTopicName.of(projectId, topicId)
  private val subscriptionId = "kotlin"
  private val subscription = ProjectSubscriptionName.of(projectId, subscriptionId)
  private val messages = LinkedBlockingDeque<PubsubMessage>()
  private lateinit var subscriber: Subscriber

  override fun receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer) {
    messages.offer(message)
    consumer.ack()
  }

  override fun start() {
    vertx.executeBlocking<Void>({
      try {
        println("Listening to messages on $subscriptionId:")
        subscriber.awaitRunning()
        while (true) {
          val message = messages.take()
          println("Message Id: ${message.messageId} Data: ${message.data.toStringUtf8()}")
        }
      } finally {
        subscriber.stopAsync()
        it.complete()
      }
    }, { println("done, ${it.cause()}") })
  }

  override fun init(vertx: Vertx?, context: Context?) {
    super.init(vertx, context)
    try {
      createTopic()
      subscribeTopic()
      subscriber = Subscriber.newBuilder(subscription, this).build()
      subscriber.startAsync()
    } catch (e: ApiException) {
      // example : code = ALREADY_EXISTS(409) implies topic already exists
      println("Failed: $e")
    }
  }

  override fun stop(stopFuture: Future<Void>?) {
    super.stop(stopFuture)
    try {
      deleteSub()
      deleteTopic()
    } catch (e: ApiException) {
      println("Failed: $e")
    } finally {
      subscriber.stopAsync()
      stopFuture!!.complete()
    }
  }

  private fun createTopic() { // expects 1 arg: <topic> to create
    println("Creating topic ${topic.project}:${topic.topic}.")
    TopicAdminClient.create().use { topicAdminClient -> topicAdminClient.createTopic(topic) }
    println("Topic ${topic.project}:${topic.topic} successfully created.")
  }

  private fun subscribeTopic() { // expects 2 args: <topic> and <subscription>
    println("Creating subscription ${subscription.project}:${subscription.subscription}.")
    SubscriptionAdminClient.create().use { it.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 0) }
    println("Subscription ${subscription.project}:${subscription.subscription} successfully created.")
  }

  private fun deleteTopic() {
    println("Deleting topic ${topic.project}:${topic.topic}.")
    TopicAdminClient.create().use { it.deleteTopic(topic) }
    println("Topic ${topic.project}:${topic.topic} successfully deleted.")
  }

  private fun deleteSub() { // expects 1 arg: <subscription> to delete
    println("Deleting subscription ${subscription.project}:${subscription.subscription}.")
    SubscriptionAdminClient.create().use { it.deleteSubscription(subscription) }
    println("Subscription ${subscription.project}:${subscription.subscription} successfully deleted.")
  }
}

fun main(vararg args: String) {
  Vertx.vertx().deployVerticle(MainVerticle(), DeploymentOptions().apply {
    isWorker = true
  })
}

我显然错过了一些东西。此外,如果您有更好的方法可以将Google的PubSub库(具有自己的异步循环)与Vert.X集成/统一,我很乐意听到我的原始示例方法。

共有1个答案

东郭瀚玥
2023-03-14

问题出在您的 while 循环上。

在这种情况下,“阻塞”并不意味着你可以永远保持运行。您对的调用。complete()从未达到,并且在某一点上达到Vert。x会抱怨这一点。

请参阅有关运行阻止代码的手册,特别是“警告”部分。

为了解决您的问题,您需要以一种或另一种方式安排对messages.take()的调用,例如使用set周期。在间隔处理程序中,使用执行阻止清空队列,然后通过调用完成()来返回控制权,无论是在您安排消息处理之前还是之后,这取决于您是否关心结果。

 类似资料:
  • 我有一个带感应帽的覆盆子皮。我制作了一个二进制时钟,我想在Sense hat的显示器上显示并保持更新。然而,我想要的能力,开关时钟与操纵杆中间。一切都很好,除了我的时钟的更新循环阻止任何新的输入一旦启动。 我一直在考虑如何解决这个问题。如何允许脚本/时钟保持运行,并且仍然接受来自操纵杆的新操作。但是一旦while循环开始,我就卡住了。我不知道该用谷歌搜索什么。我已经开始研究async/await,

  • 我有4-5个工作线程处理大型消息队列。我还有另一段代码,它使用2-3个worker运行。我想在处理大型消息队列时阻止所有其他工作者。 我正在使用JDK6和Jms 编辑: 队列进程工作者从未终止。当没有消息时,它们阻塞队列。这些工作者由执行器线程池管理,如果我使用读写锁,其中一个工作者也会被阻塞。此外,如果使用循环屏障,那么我必须终止线程,以便重新传递阻塞的第二个进程。由于工作者是由线程池管理的,所

  • 问题内容: Node.JS的最大优点是它具有非阻塞性。它是单线程的,因此不需要为每个新的传入连接生成新的线程。 在事件循环(实际上是单线程)的后面,有一个“非阻塞工作程序”。这个东西不再是单线程的,所以(据我了解),它可以为每个任务产生一个新线程。 也许我误会了一些东西,但是优势到底在哪里。如果要处理的任务很多,那么“非阻塞工作”会不会变成“阻塞工作人员”? 谢谢克里斯蒂安 问题答案: 您需要阅读

  • 线程实例的join()方法可用于将一个线程的执行开始“连接”到另一个线程的执行结束,这样一个线程在另一个线程结束之前不会开始运行。如果对线程实例调用join(),则当前运行的线程将阻塞,直到线程实例完成执行 但是如果我有多个线程并且当我在循环内部调用join时。所有线程并行运行。但是根据连接的概念,首先连接的线程应该完成,然后只有主线程才允许连接其他线程。 } 在上面的代码中,如果第一个线程被连接

  • 正在学习一些关于多线程的教程,并编写了一些代码来创建一个无限线程。然而,无限循环似乎不起作用。循环内的打印例程只运行一次。找到下面的代码以供参考。不明白我做错了什么。我正在使用MSVC编译器。

  • 我有一个vert。x标准Verticle基本上,它解析HttpRequest并准备JsonObject,然后我通过事件总线发送JsonObject。在另一个Worker verticale中,该事件被消耗,并将启动执行(包括对Penthao数据集成Java API的调用),它正在阻止API。完成“.kjb”文件的执行大约需要30分钟。但是vert。x不断警告Worker线程块,所以我的问题是ver