当前位置: 首页 > 知识库问答 >
问题:

grpc流式传输时定期重置连接

嵇财
2023-03-14

服务器:

override fun subscribe(request: Subscribe, responseObserver: StreamObserver<SubscriptionEvent>) {
        sessionStore.grpcHandler(responseObserver, request.sessionId) { session ->
            eventStream.stream(session.id)
                .doOnNext {
                    try {
                        if ((responseObserver as ServerCallStreamObserver).isCancelled) {
                            log.debug { "Stopping to stream events, seems like client cancelled it" }
                            responseObserver.onCompleted()
                            return@doOnNext
                        }

                        responseObserver.onNext(it)
                    } catch (e: StatusRuntimeException) {
                        log.error("Could not stream an event", e)
                    }
                }
                .doOnError { throwable ->
                    log.error("Subscription failed", throwable)
                }
                .subscribe()
        }
    }

客户:

fun subscribe(sessionId: String, tenantId: String, botId: String) {
        subscriptionsThreadPool.submit {
            try {
                subscriptionService.withDeadlineAfter(Long.MAX_VALUE, TimeUnit.SECONDS).subscribe(
                    Subscribe.newBuilder().setSessionId(sessionId).build(),
                    SubscribeStreamObserver(sessionId, tenantId, botId)
                )

                finishLatch.await()
            } catch (e: Throwable) {
                log.error("Could not subscribe to connector-service", e)
            }
        }
    }

服务器正在使用https://github.com/LogNet/grpc-spring-boot-starter

客户端的netty配置(值得一提的是,grpc服务器前面没有任何代理):

private fun rpcChannel(): ManagedChannel =
        NettyChannelBuilder
            .forTarget(properties.connectorServiceUrl)
            .usePlaintext()
            .build()

一旦我启动客户端订阅(即调用订阅方法来流式处理事件),它将花费多达4分钟的时间,直到失败,并出现UNAVAILABE Connection reset异常。通常是3-4分钟。我确实尝试设置了所有可能的netty配置属性,但没有任何帮助。这是日志。。

服务器:

2021-07-06 11:56:26.045 DEBUG [/] [-worker-ELG-3-1] io.grpc.netty.NettyServerHandler         : Connection Error

java.io.IOException: Connection reset by peer
    at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at java.base/sun.nio.ch.SocketDispatcher.read(Unknown Source)
    at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(Unknown Source)
    at java.base/sun.nio.ch.IOUtil.read(Unknown Source)
    at java.base/sun.nio.ch.IOUtil.read(Unknown Source)
    at java.base/sun.nio.ch.SocketChannelImpl.read(Unknown Source)
    at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Unknown Source)

2021-07-06 11:56:26.045 DEBUG [/] [-worker-ELG-3-1] io.grpc.netty.NettyServerHandler         : [id: 0x1f9596dc, L:/172.17.0.110:8081 - R:/46.5.255.46:58262] OUTBOUND GO_AWAY: lastStreamId=2147483647 errorCode=2 length=24 bytes=436f6e6e656374696f6e2072657365742062792070656572
2021-07-06 11:56:26.046 DEBUG [/] [-worker-ELG-3-1] i.g.n.NettyServerTransport.connections   : Transport failed

客户:

2021-07-06 13:56:25.996 DEBUG [/] [-worker-ELG-1-1] io.grpc.netty.NettyClientHandler         : Caught a connection error

java.net.SocketException: Connection reset
    at java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:367)
    at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:398)
    at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:832)

2021-07-06 13:56:26.008 DEBUG [/] [-worker-ELG-1-1] io.grpc.netty.NettyClientHandler         : [id: 0x52cea98f, L:/192.168.178.20:57940 - R:/116.202.155.130:30192] OUTBOUND GO_AWAY: lastStreamId=0 errorCode=2 length=16 bytes=436f6e6e656374696f6e207265736574
2021-07-06 13:56:26.013 DEBUG [/] [-worker-ELG-1-1] io.grpc.netty.NettyClientHandler         : Network channel is closed

伊奥。grpc版本是1.37.0

有什么想法吗?

共有1个答案

艾自强
2023-03-14

这听起来像是网络路径上的一个设备在一段时间空闲后正在终止连接。它可以是代理、NAT或防火墙。

如果你能找到设备,你可以配置它。但通常情况下,你不能很好地配置这些东西。

gRPC支持为这种情况设计的保持活动。一段时间不活动后,grpc将导致活动,只是为了确保连接仍然良好并通知网络设备连接仍在使用。

您可以在客户端或服务器端配置keepalive。如果网络设备是服务器部署的一部分,那么让服务器管理keeepalive是最好的。由于客户机有一个不可路由的IP,我想问题是,这种情况下是客户机前面的NAT。因此,在客户端上配置keepalive更有意义,因为不同的客户端可能有不同的需求。

        NettyChannelBuilder
            .forTarget(properties.connectorServiceUrl)
            .usePlaintext()
            // Enable keepalive, with a time a bit smaller
            // than the observed resets
            .keepAliveTime(150, TimeUnit.SECONDS)
            .build()

为了防止滥用,gRPC服务器默认情况下将保留时间限制为不低于5分钟。因此,您还需要更改服务器。我没有使用grpc spring boot starter,但根据他们的文档,您似乎会使用:

@Component
public class MyGRpcServerBuilderConfigurer extends GRpcServerBuilderConfigurer{
        @Override
        public void configure(ServerBuilder<?> serverBuilder) {
            ((NettyServerBuilder) serverBuilder)
                .permitKeepAliveTime(150, TimeUnit.SECONDS);
        }
    };
}

 类似资料:
  • 我目前有一个依赖于通过安全套接字传输的JSON的原始RPC设置,但我想切换到gRPC。不幸的是,我还需要访问windows上的AF\U UNIX(Microsoft最近开始支持,但gRPC尚未实现)。 由于我有一个现有的工作连接(使用另一个库进行管理),我的首选方法是将其与GRPC结合使用来发送/接收命令,而不是JSON解析,但我正在努力确定实现这一点的最佳方法。 我已经看到将自定义传输插入gRP

  • 我正在尝试编写同时使用gRPC和REST的服务。实现技术有Java、Spring-Boot和gRPC。使用场景示例如下: 目的是有外部客户端可以通过RESTendpoint和/或通过进行gRPC调用与应用程序交互。在内部,有“网关”服务提供外部接口,并负责在外部客户端和执行实际工作的“域”服务之间传输/路由请求和响应。内部服务将通过gRPC进行通信。 外部客户端不知道如何在内部处理事情,域服务没有

  • 我通过在对象列表上进行流式处理来调用异步客户端方法。该方法返回Future。 迭代调用后返回的Futures列表的最佳方法是什么(以便处理那些首先出现的Future)? 注意:异步客户端只返回Future而不是CompletableFuture。 代码如下:

  • 问题内容: 我知道Java中有一个函数可以使用method 将标准输出流设置为任何用户定义的值。 但是,是否有任何方法可以将标准输出重置为先前存储的标准输出还是标准输出? 问题答案: 您可以通过持有标准的文件描述符。要重置标准以打印到控制台,您可以 另一种方法是仅保留原始对象,如下所示:

  • null null 但问题是我想以编程的方式完成这个任务。 我实际上想做的是: 步骤 但正如我所提到的,Firefox对MKV说不。 所以我尝试了hls的东西,但我不能完全得到命令来生成流,也不能在飞行中播放。 我是在命令行上开始的 所以我想再做一次 我要请求服务器播放视频文件 它生成一个子进程FFmpeg,该进程执行代码转换 向客户端发送流 客户端应该能够搜索到底,并且应该播放该内容。

  • 是否可以在OptaPlanner约束流中使用除inner join以外的其他join类型?在我的情况下,我需要做一个左联接。