我有以下控制器
@RestController
public void MyController {
@GetMapping("/foo")
public Flux<Foo> getFoos() { /* return a flux of Foos*/ }
}
和一个假装客户
public interface MyFeignClient {
@RequestLine("GET /foo")
Mono<Foo> getFoos();
}
public class MyClients {
public static MyFeignClient myFeignClient() {
return ReactorFeign.builder().target(MyFeignClient.class, "http://localhost:8080");
}
}
但是当我打电话的时候
StepVerifier.create(myFeignClient.foo())
.consumeNextWith(foo -> println(foo))
.verifyCompleted();
我收到这个错误
java.lang.AssertionError:预期“consumeNextWith”失败(预期:onNext();actual:onError(feign.FeignException:无法反序列化com.example.Foo在[Source:(BufferedReader);line:1,column:1]读取GET时超出START_ARRAY标记http://localhost:8080/foo))
reactor.test.ErrorFormatter.assertionError(ErrorFormater.java:105)位于reactor.test。ErrorFormatter.failPrefix(ErrorFormatter.java:94)位于reator.test.ErrorFormatter。fail(Error格式化器.java:64)位于reactor.test.错误格式化器.failOptional(Error格式器.java:79)位于reaction.test.DefaultStepVerifierBuilder.lambda$consumeNextWith$1(DefaultStep Verifier Builder.java:256)reactor.test.DefaultStepVerifierBuilder$SignalEvent.test(DefaultStep Verifier Builder.java:2112reactor.core.publisher.FluxSubscribeOn$Subscriber.onError(FluxSubscription.java:157)处reactor.corece.publisher.FlexTake$TakeSubscribers.onEror(FluxTake$take.java:138)的reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriberreactor.core.publisher.FluxSubscribeOn$subscribe_Subscriber.requestUpstream(FluxSubscription.java:131),位于reactor.corece.publisher.FlushSource.subscribe(FluxSource.java:52)的reactor.coreactive.ReactiveInvocationHandler.lambda$invokeMethod$0(ReactiveInvocationHandler.java:99)reactor.core.publisher.Flux.subscribe(Flux.java:7777)位于reactor.corece.publisher.FlushSubscribeOn$SubscriberOnSubscribe.run(FluxSubscribe On.java:194),位于reactor.core.scheduler.WorkerTask.call(WorkerTask.java:264),位于java.base/java.util.concurrent.FutureTask.run(FutureAttask.java:264)java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ScheduledThreadPool Executor java:304)位于java.base.java.util.consurrent.ThreadPoolExecutor.runWorker(ThreadPoOLExecuttor.java:1128),位于java.base/java.til.concurrent.SthreadPool执行器$Worker.run(ThreadpoolExecutior.java:628反序列化[Source:(BufferedReader);line:1,column:1]处读取GET的START_ARRAY标记中com.example.Foo的实例http://localhost:8080/foo在feign.reactive.ReactiveInvocationHandler$1.request(ReactiveInvocationHandler.java:109)的feign.SynchronousMethodHandler.executeAndDecode(SynchronousMet方法处理程序.java:162)的feign.feign.FeignException.errorReading(FeignException.java:130)…还有13个原因是:com.fasterxml.jjackson.databind.exc.MismatchedInputException:无法反序列化com.example.Foo
在com.fasterxml.jockson.databind.reportInputException.from(MismatchedInputExeption.java:63)的[Source:(BufferedReader);line:1,column:1]处的START_ARRAY标记的实例com.fasterxml.jockson.databind.DeserializationContext.handleUnexpectedToken(DeserialiizationContext.java:1139位于com.fasterxml.jockson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethHandler.java:147)处的feign.SSynchronoosMethod_Handler.decode(SyncronousmethodHanlder.java:183)处的jeign.jJackson.JacksonDecoder.decode(JacksonDecoder.java:61)…还有15个
我在这里做错了什么?我该如何修复它?
一个简单的谷歌搜索显示,假装客户端不支持反应式。他们有一个正在孵化的项目试图创造支持
添加Webflux支持问题
假核心
所以回答你的问题
“我做错了什么?”
您已选择使用不支持生产者/使用者的 Http 客户端。
“我怎么能修好它?”
通过使用假装客户端,而是使用支持非阻塞操作的 HttpClient,例如,Spring Web 客户端。
我将feign与OkHttpClient一起使用,并希望使用重试机制对返回状态代码200、ok和空正文的endpoint进行短轮询。用假动作可能吗?在状态代码2xx上没有调用自定义错误解码器。还有其他更喜欢的方式来进行短轮询吗?顺便说一句,我在用科特林。
我对Spring Reactive还是个新手。 我想将一个Spring反应式回购通量响应转换为另一种类型的单声道响应: 我已经尝试了提供的运算符(转换是我需要的闭包,但它仍然将repo结果的每一个参数都提供为reactive参数),但最终不能得到任何我知道如何使用的东西。
因此,我从文档中了解到,并行通量本质上是将通量元素划分为单独的轨道。(本质上类似于分组)。就线程而言,这将是调度程序的工作。让我们考虑一下这样的情况。所有这些都将在通过runOn()方法提供的同一个调度程序实例上运行。让我们考虑如下情况: 现在让我们打大约100个电话 如果我们使用parailFlux: 因此,如果我的理解是正确的,那么它似乎非常相似。那么,平行磁通相对于磁通的优势是什么?什么时候
我实现了一个自定义ErrorDecoder: 我想用这个ist实现的目的是转发错误响应。我的问题是,如果响应状态为404,则ErrorResource为null。 例如,如果我将返回的响应状态更改为400,则错误资源包含响应。 提前感谢!
我现在的问题是,我到底能实现什么/如何实现这些,或者是否有一种方法来扩展ErrorDecoder,使我能够处理这些错误消息。我想我应该能够将它们放入解码器,甚至实现/覆盖HttpClient,但我不确定正确/最好的方法是什么。
我用的是Spring助焊剂。我需要从不同的来源组装一个物体。如何确保两个流都返回了所需的数据? 比如: