当前位置: 首页 > 知识库问答 >
问题:

Vert.x反应式Kafka客户端:写的时候链接不起作用?

贺桐
2023-03-14

我使用的是io.vertx.reactivex.kafka.client.producer。KafkaProducer客户端。客户端有一个<code>rxWrite<code>函数,它返回<code>Single

我已经编写了以下工作示例。

test():测试链接和日志记录的函数

    fun test(): Single<Int> {
    val data = Single.just(ArrayList<String>().apply {
        add("Hello")
        add("World")
    })

    data.flattenAsObservable<String> { list -> list }
        .flatMap { advertiser ->
       //does not work with writeKafka
            writeError(advertiser).toObservable().doOnError({ println("Error $data") })
        }
        .subscribe({ record -> println(record) }, { e -> println("Error2 $e") })

    return data.map { it.size }
}

writeKafka:将给定的字符串写入Kafka并返回Single

fun writeKafka(param: String): Single<RecordMetadata> {
 //null topic to produce IllegalArgumentException()
    val record = KafkaProducerRecord.create(null, UUID.randomUUID().toString(), param)
    return kafkaProducer.rxWrite(record)
}

writeError:始终返回具有相同类型错误的单个

fun writeError(param: String): Single<RecordMetadata> {
    return Single.error<RecordMetadata>(IllegalArgumentException())
}

所以当我调用< code>writeKafka时,它只打印< code>Error2,但是如果我使用< code>writeError,它同时打印< code>Error和< code>Error2。看来< code>writeKafka返回的single还在等待结果,但是后来为什么连< code>Error2都打印出来了?

我是RxJava2中的新手,有人可以指出其中的任何错误吗?

共有1个答案

芮瑾瑜
2023-03-14

阅读并发布错误堆栈非常重要,这样可以隔离问题。

在本例中,看起来您从<code>create<code>获得<code>IllegalArgumentException<code>并且您没有得到任何<code>单个返回kafkaProducer。rxWrite(record)根本不执行,实际上会使<code>平面图错误从不起作用,因此只打印“Error2”。

 类似资料:
  • 我想在我的反应式sql客户端事务中使用Kotlin协程。为了简单起见,我打算使用提供的助手函数<code>io.vertx.mutiny.sqlclient。此处文档中提到的池#withTransaction。由于传递的函数不是协程挂起函数,因此当我试图编译以下代码时,我得到了一个类似<code>的错误:挂起函数只能在协程体中调用 withTransaction的函数头如下所示 我问自己,是否还有

  • 我试图测试Spring反应式Webclient的默认超时。为此,我创建了一个需要 10 小时才能返回响应的 rest endpoint。 我使用spring-reactive Webclient创建了一个rest客户端。但我看到,springReactiveWebclient一直在等待10个小时。 spring reactive Webclient没有任何默认超时吗?

  • 执行kafka客户端的生产者/消费者连接池有意义吗? kafka是否在内部维护已初始化并准备好使用的连接对象列表? 我们希望最小化连接创建的时间,这样在发送/接收消息时就不会有额外的开销。 目前,我们正在使用apache共享池库来保持连接。 任何帮助都将不胜感激。

  • 当我试图从http客户端响应对象读取正文时,会出现以下错误。我并不总是得到异常,所以我想这是一个与完整未来相关的线程问题。你知道我做错了什么吗?我使用Vert.x3.8.1 引发异常的代码: http客户端响应对象由以下代码创建: 尸体被找回...

  • 我想使用Quarkus中的JooqDSL来构建我的SQL(并希望执行它们) 因此,我添加了以下Quarkus JOOQ扩展。 因为我想在我的项目中使用反应式PG SQL客户端,所以我问自己,例如JOOQ的< code>fetch()方法是否会阻塞线程?它是与引擎盖下的反应式vertx客户端兼容还是使用阻塞式客户端?看起来像后者,因为它不返回未来或者类似的东西。 在这种情况下,我应该只使用JOOQ来

  • 我试图使用Spring反应式WebClient将文件上传到Spring控制器。控制器非常简单,看起来像这样: 当我使用这个控制器与cURL一切正常 multipartFile转到正确的参数,其他参数进入Map。 当我尝试从WebClient做同样的事情时,我被卡住了。我的代码如下所示: 这会导致400错误 有人知道如何解决这个问题吗?