当前位置: 首页 > 知识库问答 >
问题:

vert中的异常。x工作线程

施鸿
2023-03-14

我是vert. x平台的新手。我的项目中有一个标准和一个工作人员垂直层,它们通过eventBus进行通信。工作人员垂直层在循环和数据库访问中执行多个REST API调用。

我的问题是工人垂直完成任务在一些运行没有问题,但有时它抛出以下错误。

线程“vert.x-worker-thread-12”io.vertx.core中的异常。VertxException:连接已关闭

我正在使用科特林协程来处理构造设备(vertx:Vertx)函数,该函数执行大多数REST API调用和数据库访问。

有人能告诉我上述问题的原因吗?还有没有任何方法可以改进<code>constructDevice(vertx:vertx)

    // worker verticle to handle multiple REST API calls and MongoDB database access
    
    class DeviceDiscoverVerticle : CoroutineVerticle() {
        override suspend fun start() {
            val consumer = vertx.eventBus().localConsumer<String>("listDevice")
            consumer.handler { message ->
                CoroutineScope(vertx.dispatcher()).launch {
                    constructDevice(vertx)
                }
                message.reply("discovered")
            }
        }
    }
    
    // standard verticle to communicate with worker verticle 
    
    class ListDeviceVerticle : CoroutineVerticle() {
        override suspend fun start() {
            val reply = awaitResult<Message<String>> { h ->
                vertx.eventBus().request("listDevice", "deviceIPs", h)
            }
            println("Reply received: ${reply.body()}")
        }
    }
    
    fun main() {
        val vertx = Vertx.vertx()
        val workOption = DeploymentOptions().setWorker(true)
        vertx.deployVerticle(DeviceDiscoverVerticle(), workOption)
        vertx.deployVerticle(ListDeviceVerticle())
    }


    suspend fun constructDevice(vertx: Vertx) {
        val deviceRepository = listOf(
            "10.22.0.106",
            "10.22.0.120",
            "10.22.0.115",
            "10.22.0.112"
        )
    
        val webClient = WebClient.create(vertx)
        val config = json { obj("db_name" to "mnSet", "connection_string" to "mongodb://localhost:27017") }
        val mongoClient: MongoClient = MongoClient.create(vertx, config)
        val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true))
        
        // loop through the IP list and calls REST endpoints
        
        val deviceList = deviceRepository.map { deviceIP ->
            val deviceIPconfig: DeviceIPconfig
            val deviceType: DeviceType
            val requestDeviceIP: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/ipconfig/")
            val requestDeviceType: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/information/")
    
            val responseDeviceIP = awaitResult<HttpResponse<Buffer>> { handler ->
                requestDeviceIP.send(handler)
            }
            deviceIPconfig = if (responseDeviceIP.statusCode() == 200) {
                json.parse(DeviceIPconfig.serializer(), responseDeviceIP.bodyAsString())
            } else {
                println("request to device $deviceIP failed with ${responseDeviceIP.statusCode()}")
                DeviceIPconfig()
            }
            
            val responseDeviceType = awaitResult<HttpResponse<Buffer>> { handler ->
                requestDeviceType.send(handler)
            }
            if (responseDeviceType.statusCode() == 200) {
                deviceType = json.parse(DeviceType.serializer(), responseDeviceType.bodyAsString())
                val device = DeviceModel(deviceIPconfig, deviceType)
                json {
                    obj(
                        "_id" to deviceIPconfig.localMac,
                        "device" to json.stringify(DeviceModel.serializer(), device)
                    )
                }
            } else {
                println("request to device $deviceIP failed with ${responseDeviceType.statusCode()}")
                jsonObjectOf()
            }
    
        }.filterNot { it.isEmpty }
        
        // construct data to upload in mongoDB
        
        val activeDeviceIDs = json {
            obj("_id" to "activeDeviceIDs",
                "activeDeviceIDs" to deviceList.map { it.get<String>("_id") })
        }
        val activeDevices = json {
            obj("_id" to "activeDevices",
                "activeDevices" to json { array(deviceList) }
            )
        }
        
        // save the data in MongoDB
        
        mongoClient.save("devices", activeDeviceIDs) { res ->
            if (res.succeeded()) {
                println("saved successfully")
            } else {
                res.cause().printStackTrace()
            }
        }
        mongoClient.save("devices", activeDevices) { res ->
            if (res.succeeded()) {
                println("saved successfully")
            } else {
                res.cause().printStackTrace()
            }
        }
    }

更新问题:1

@Damian我已经根据您的输入更新了我的问题。为了容易理解,我简化了上面的问题,但是当我试图用promise/未来来实现这些事情时,我在某个点上卡住了。

我的任务是从不同的RESTendpoint获取数据,并从中收缩kotlin类,我想并行使用它。

fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed");
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<List<Future<HttpResponse<Buffer>>>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
    val deviceDevicesPromise: Promise<List<Future<HttpResponse<Buffer>>>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            // this will return Json array and each element of that array needs to be called again in a loop.
            val result = asyncResult.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }
            deviceDevicesPromise.complete(result)
        } else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}

fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): List<Future<HttpResponse<Buffer>>> {

    val deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>> = constructDeviceDevices(deviceIP, webClient)
    // I need to call other rest points similar to this and I need map the result to kotlin class.

   // how do get HTTP response out of each future request in deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>>. 

}

class DeviceDiscoverVerticle : AbstractVerticle() {
        override fun start() {
            val deviceRepository = // list of IP strings
    
            val webClient = WebClient.create(vertx)
            vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
                deviceRepository.forEach { deviceIP ->
                    val futureList = constructDevice(vertx, webClient, deviceIP)
                    CompositeFuture.all(futureList).onComplete { allFuturesResult ->
                            if (allFuturesResult.succeeded()) {
                                // how to handle individual future result here to construct data
                            } else {
                                println("failed")
                            }
                        }
                }
            }
        }

更新的问题:2

@Damian按照你的建议,我已经更新了代码。

fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed")
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/")
    val deviceDevicesPromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicesPromise.complete(asyncResult.result())
        }
        else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}


fun constructDevice(webClient: WebClient, deviceIP: String): Future<DeviceFlow> {
    val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true, isLenient = true))
    val constructDevicePromise: Promise<DeviceFlow> = Promise.promise()
    val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)

    httpDevicesFuture.onComplete { ar ->
        if(ar.succeeded()) {
            val futureList = ar.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }
            CompositeFuture.all(futureList).onComplete { asyncResult ->
                if (asyncResult.succeeded()) {
                    asyncResult.result().list<HttpResponse<Buffer>>().forEach { res ->
                        //not all future in futureList are completed here some of them shows Future{unresolved}
                    }
                    constructDevicePromise.complete(DeviceFlow(label = "xyz"))
                }
                else {
                    constructDevicePromise.fail("failed")
                }
            }

        }
    }
    return constructDevicePromise.future()
}


class DeviceDiscoverVerticle : AbstractVerticle() {
    override fun start() {
        val deviceRepository = //list of IPs

        val webClient = WebClient.create(vertx)
        vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
            deviceRepository.forEach { deviceIP ->
                val constructDeviceFuture = constructDevice(webClient, deviceIP)
                constructDeviceFuture.onComplete {ar ->
                    //println(ar.result().toString())
                }
            }
        }
    }
}

我的问题出在里面

CompositeFuture.all(futureList).onComplete { asyncResult ->
                        if (asyncResult.succeeded()) {
                            asyncResult.result().list<HttpResponse<Buffer>>().forEach {

在这里,大多数未来都没有解决,死刑在这里被绞死。

[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@67d2e79}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@8bad0c6}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@c854509}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]

所以我将<code>改为CompositeFuture.all(futureList)。onCompletetoCompositeFuture.join(未来列表)。onComplete根据垂直方向。x文档加入将等待所有未来完成

join组合等待,直到所有的futures完成,无论是成功还是失败。CompositeFuture.join接受几个未来参数(最多6个),当所有未来都成功时返回成功的未来,当所有未来都完成且至少有一个失败时返回失败的未来

但现在很少有未来变得失败。以下是更改为复合未来.join 后的未来列表的输出

CompositeFuture.join(futureList).onComplete { asyncResult ->
println(futureList)
                            if (asyncResult.succeeded()) { res ->
// println(res) this one gets hanged and not printing all response
                                asyncResult.result().list<HttpResponse<Buffer>>().forEach {



[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5e9d3832}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@379c326a}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@51a39962}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@edcd528}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@293c3e5c}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5f86d3ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@12a329f7}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@7abedb1e}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@3238d4cb}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5bc868d3}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@50af1ecc}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5cc549ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@282f4033}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@41a890b3}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@147d772a}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]

因为我的设备不能处理并发请求而失败的期货很少吗?还有为什么程序执行卡在里面

asyncResult.result().list<HttpResponse<Buffer>>().forEach { 

如果设备并发请求处理有问题,那么这个问题的另一个解决方案是什么。是否可以在 vertx 环境中运行整个剩余调用,并通过事件总线与其通信?

此外,如果我将设备发现虚拟版本部署为标准顶点而不是工作顶点,则应用程序将完全卡在复合未来.all(未来列表).onComplete。

共有2个答案

艾原
2023-03-14

我不熟悉kotlin和协同程序,但我可能对vert有一些建议。x本身。首先,根据文件

在大多数情况下,Web客户端应该在应用程序启动时创建一次,然后重用。否则,您将失去许多好处,例如连接池,并且如果实例没有正确关闭,可能会泄漏资源。

我看到您在constructDevice方法中调用Webclient.create(vertx ),所以您每次发送' listDevice '事件时都创建新的Webclient,以便您可以考虑更改它。

我最近做了一件非常类似的事情,最后使用了期货。请注意,当您调用awaitResult时,您正在阻止线程等待异步执行,如果这是标准的垂直,您确实会收到大量被阻止的线程警告。相反,您可以做的是创建一个promise,在http处理程序内部完成/失败它,在处理程序外部您只返回promise。future()对象。在html" target="_blank">循环之外,您可以处理所有的期货,不同的是期货处理也是异步的,因此您不会阻塞线程。

此外,为了使代码更简洁,并利用vert.x的异步特性,最好将http和mongo处理分成独立的部分,即

    < li > HttpVerticle获取listDevice事件 < li>HttpVerticle为5个不同的请求创建5个未来 < li >当所有期货完成时,触发future . on complete()/composite future . all()并发送“updateDB”事件 < li>MongoVerticle接收并处理“updateDB”事件

您的具体问题可能不会在这里得到解决,但我希望它至少会让您更进一步

在注释之后有一个java未来的例子

public class HttpVerticle extends AbstractVerticle {

WebClient webClient;

@Override
public void start() throws Exception {

    webClient = WebClient.create(vertx);

    vertx.eventBus().consumer("run_multiple_requests", event -> {
        //When event is received this block is handled by some thread from worker pool, let's call it 'main thread'
        Promise<HttpResponse<Buffer>> request1Promise = Promise.promise();
        Promise<HttpResponse<Buffer>> request2Promise = Promise.promise();
        Promise<HttpResponse<Buffer>> request3Promise = Promise.promise();

        //Since webclient is async, all calls will be asynchronous
        webClient.get("ip1", "/endpoint")
                .send(asyncResult -> {
                    //async block #1 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request1Promise.complete(asyncResult.result());
                    } else {
                        request1Promise.fail("Http request failed");
                    }
                });

        //at this point async block #1 is probably still processing
        webClient.get("ip2", "/endpoint")
                .send(asyncResult -> {
                    //async block #2 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request2Promise.complete(asyncResult.result());
                    } else {
                        request2Promise.fail("Http request failed");
                    }
                });

        //at this point async block #1 and #2 are probably still processing
        webClient.get("ip3", "/endpoint")
                .send(asyncResult -> {
                    //async block #3 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request3Promise.complete(asyncResult.result());
                    } else {
                        request3Promise.fail("Http request failed");
                    }
                });

        //retrieving futures from promises
        Future<HttpResponse<Buffer>> future1 = request1Promise.future();
        Future<HttpResponse<Buffer>> future2 = request2Promise.future();
        Future<HttpResponse<Buffer>> future3 = request3Promise.future();

       
        CompositeFuture.all(future1, future2, future3).onComplete(allFuturesResult -> {
            //async block #4 this will be executed only when all futures complete, but since it's async it does
            // not block our 'main thread'
            if (allFuturesResult.succeeded()) {
                //all requests succeeded
                vertx.eventBus().send("update_mongo", someMessage);
            } else {
                //some of the requests failed, handle it here
            }
        });
        
        //at this point async block #1 #2 #3 are probably still processing and #4 is waiting for callback
        //but we leave our event handler and free 'main thread' without waiting for anything
    });
}

当然,这段代码可以(并且应该)短得多,所有这些都是硬编码的,没有任何数组和循环,只是为了清晰起见

如果您使用logback或log4j(其他可能也是),您可以将[%t]放在日志模式中,它会在日志消息中显示您的线程名称,对我个人来说,了解所有这些异步块的流程非常有帮助

还有一件事,通过此设置,所有三个请求实际上将同时发送,因此请确保http服务器能够同时处理多个请求。

许法
2023-03-14

知道了一些你想要达到的目的,首先在方法< code > constructDeviceDevices()中,我将返回类型改为just Future

然后,在<code>constructDevice()

基本上,你被卡住了,因为你试图在异步世界中重现顺序执行,特别是当你试图从structDevice()方法返回一个值的时候,这意味着我们实际上想要等待所有的执行在处理这一行代码的时候完成,而在vert. x中不是这样。

它看起来像这样(语法可能已关闭,因此将其视为伪代码)

    fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed");
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
    val deviceDevicesPromise: Future<HttpResponse<Buffer>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicesPromise.complete(asyncResult.result())
        } else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}

fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): Future<SomeDomainObject> {

    //Type of below promise depends on what you are mapping responses to. It may also be a list of mapped objects
    val constructDevicePromise: Promise<SomeDomainObject> = Promise.promise()
    val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)

    httpDevicesFuture.onComplete { ar ->
        if (ar.succeeded()) {
            val futureList: List<Future<HttpResponse<Buffer>>>
            //loop through ar.result() and populate deviceDevicesFuture list

            CompositeFuture.all(futureList).onComplete { allFuturesResult ->
                if (allFuturesResult.succeeded()) {
                    // here you have access to allFuturesResult.list() method
                    // at this point you know all futures have finished, you can retrieve result from them (you may need to cast them from Object)
                    // when you have List<HttpResponse> you map it to whatever you want
                    val myMappedObject: SomeDomainObject = mappingResult()
                    constructDevicePromise.complete(myMappedObject)
                } else {
                    constructDevicePromise.fail("failed")
                }
            }
        }
    }

    return constructDevicePromise.future()
}

class DeviceDiscoverVerticle : AbstractVerticle() {
    override fun start() {
        val deviceRepository = // list of IP strings

        val webClient = WebClient.create(vertx)
        vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
            deviceRepository.forEach { deviceIP ->
                //here dependent on your logic, you handle each future alone or create a list and handle them together
                val constructDeviceFuture: Future<SomeDomainObject> = constructDevice(vertx, webClient, deviceIP)
                constructDeviceFuture.onComplete(ar -> {
                    ar.result() // <- this is your mapped object
                    eventBus.send("SOME_NEXT_LOGIC", serializedDomainObject)
                })
            }
            
            //if you need to handle all devices at once, once again you need to make CompositeFuture from all responses of constructDevice
        }
    }
}

更新2响应

关于复合未来.all():你错过了一件事,复合未来.all()等到所有的未来都成功或至少一个失败。即使有一个失败了,它也不会等待其他人(这就像逻辑上的AND,没有必要等待其余的,因为我们已经知道结果了)。另一方面,CompositeFuture.join()只是等待所有期货完成,但如果其中任何一个失败,最终的未来也将失败(但你至少应该得到所有期货的结果)。

这实际上是您在输出中看到的,使用< code > composite future . all()您会得到一堆已完成的期货,其中一个失败了,其余的都没有解决。

这部分还缺少一点:

vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
        deviceRepository.forEach { deviceIP ->
            val constructDeviceFuture = constructDevice(webClient, deviceIP)
            constructDeviceFuture.onComplete {ar ->
                //println(ar.result().toString())
            }
        }
    }

您没有检查是否为ar。succeed(),如果您愿意,您将看到最终的未来实际上是失败的,这就是为什么最终结果不是您所期望的。

现在只是在猜测发生了什么。您可能会(在某种程度上)杀死这个rest API(我假设每个vertx事件都是相同的API),因为有这么多并发请求,如果您在单个请求处理程序中放入一些毫秒精度的日志消息,您可能会看到请求之间的间隔只有几毫秒。我假设API能够为少数请求提供服务,然后下一个请求由于某些异常/块/超时或其他原因而失败,其他所有请求可能根本没有得到响应,或者等待直到到达某个超时。如果您将Verticle定义为标准,当任何事情持续超过两秒时,您将收到警告。此外,有一个线程处理所有这些事情,因此如果一个请求挂起很长时间,标准Verticle在这段时间内将完全没有响应。这可能是您陷入<code>CompositeFuture的原因。join()方法。

所以现在你可以做几件事:

>

  • 您可以将并发执行更改为顺序执行。基本上,您不是事先创建n个未来,而是为单个元素创建一个未来,然后调用 future.compose(ar -

    您可以创建另一个标准垂直层,它将只响应单个事件,该事件将调用“/设备/$设备”endpoint。现在在您现在拥有的代码中,当您循环遍历初始超文本传输协议响应时,而不是产生20个更多的HTTP请求,您只需向eventBus发送20个事件。当您只有一个垂直层实例处理特定消息时,并且它是一个只有一个线程的标准垂直层,实际上目前应该只处理一条消息,并且应该只是排队。这也非常容易调整,因为您可以增加垂直实例的数量,并且您将拥有与垂直实例数量一样多的并发请求。

    你提到完全在vertx之外处理它,我认为这根本没有必要,但是如果你认为它最适合你,那么它就很简单了。如果你已经有了来自某个地方的Vertx对象,那么将该对象传递给其他类的构造函数是没有问题的。在那里,你可以有自己的超文本传输协议客户端、自己的方法,基本上是你想要的任何东西,在某个时候,当你决定要使用vert. x时,你可以调用vertx.eventBus(). send()并触发一些将由vert. x处理的逻辑。要记住的最重要的事情是不要创建多个Vertx对象实例,因为它们将有单独的事件总线。实际上是作为留档状态

    垂直…此模型完全可选,垂直。如果您不想,x不会强制您以这种方式创建应用程序。

    因此,您可以让常规应用程序在任何框架中编写,并且在某些时候仍然只是实例化Vertx对象,执行单个任务,然后返回到您的基本框架,但老实说,我认为您非常接近解决此:)

  •  类似资料:
    • 我正在我的一个工人垂直站中进行阻塞服务呼叫,该垂直站记录了一个警告。这是通过增加时限来“解决”的,但是,我 表示什么?是主顶点的某种痕迹吗?谢谢。

    • 我试图在Kotlin Vert中实现Postgres发布/订阅。x应用程序,但它似乎不起作用。以下是我尝试过的:

    • 在我当前的设置中,我使用Hazelcast群集管理器的默认多播选项。当我链接容器化Vertx模块的实例(通过Docker网络链接)时,我可以看到它们正在成功创建Hazelcast集群。但是,当我尝试从一个模块在事件总线上发布事件时,另一个模块不会对此做出反应。我不确定Hazelcast集群中的网络设置与事件总线的网络设置有何关联。 目前,我的每个Vert都有以下编程配置。x模块,每个模块部署在do

    • 我是vert的新手。x、 我不想把代码片段/示例作为vert。x github页面上满是它们。 我在寻找一些事实和最佳实践。 我正在编码一个应用程序,它的主要顶点是一个HttpServer,处理Restful请求。 我使用maven shade插件打包应用程序,如下所述:http://vertx.io/blog/my-first-vert-x-3-application/ 第一个问题: 运行应用程

    • 我有一个vert。x标准Verticle基本上,它解析HttpRequest并准备JsonObject,然后我通过事件总线发送JsonObject。在另一个Worker verticale中,该事件被消耗,并将启动执行(包括对Penthao数据集成Java API的调用),它正在阻止API。完成“.kjb”文件的执行大约需要30分钟。但是vert。x不断警告Worker线程块,所以我的问题是ver

    • 我有一个示例项目:https://github.com/svprdga/web-reactive-frameworks-comparison/tree/master/vertx-ktorm 我想从CLI运行它,但是我不能。我试图创建jar文件,但是当我运行它时,它提示: 有什么帮助吗?