RxNetty

Netty 响应式扩展(Rx) 适配器
授权协议 Apache
开发语言 Java
所属分类 程序开发、 高性能网络开发库
软件类型 开源软件
地区 不详
投 递 者 太叔景曜
操作系统 跨平台
开源组织 Netflix
适用人群 未知
 软件概览

RxNetty 是 Netty 响应式扩展(Rx) 适配器。

代码示例:

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import java.nio.charset.Charset;
public final class RxNettyExample {
    public static void main(String... args) throws InterruptedException {
        HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080, (request, response) -> {
            System.out.println("Server => Request: " + request.getPath());
            try {
                if ("/error".equals(request.getPath())) {
                    throw new RuntimeException("forced error");
                }
                response.setStatus(HttpResponseStatus.OK);
                response.writeString("Path Requested =>: " + request.getPath() + '\n');
                return response.close();
            } catch (Throwable e) {
                System.err.println("Server => Error [" + request.getPath() + "] => " + e);
                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                response.writeString("Error 500: Bad Request\n");
                return response.close();
            }
        });
        server.start();
        RxNetty.createHttpGet("http://localhost:8080/")
               .flatMap(response -> response.getContent())
               .map(data -> "Client => " + data.toString(Charset.defaultCharset()))
               .toBlocking().forEach(System.out::println);
        RxNetty.createHttpGet("http://localhost:8080/error")
               .flatMap(response -> response.getContent())
               .map(data -> "Client => " + data.toString(Charset.defaultCharset()))
               .toBlocking().forEach(System.out::println);
        RxNetty.createHttpGet("http://localhost:8080/data")
               .flatMap(response -> response.getContent())
               .map(data -> "Client => " + data.toString(Charset.defaultCharset()))
               .toBlocking().forEach(System.out::println);
        server.shutdown();
    }
}

输出:

Server => Request: /
Client => Path Requested =>: /

Server => Request: /error
Server => Error [/error] => java.lang.RuntimeException: forced error
Client => Error 500: Bad Request

Server => Request: /data
Client => Path Requested =>: /data
 相关资料
  • 函数响应式编程是一个来自90年代后期受微软的一名计算机科学家Erik Meijer启发的思想,用来设计和开发微软的Rx库。 Rx 是微软.NET的一个响应式扩展。Rx借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。开发者可以使用Observables模拟异步数据流,使用LINQ语法查询Observables,并且很容易管理调度器的并发。 Rx让众所周知的概念变得易于实现和消费,

  • 扩展说明 日志输出适配扩展点。 扩展接口 org.apache.dubbo.common.logger.LoggerAdapter 扩展配置 <dubbo:application logger="xxx" /> 或者: -Ddubbo:application.logger=xxx 已知扩展 org.apache.dubbo.common.logger.slf4j.Slf4jLoggerAdap

  • 我想在我的列表中实现部分。我有一个任务列表。列表有一个自定义适配器,它扩展了回收器视图滑动适配器,因为我已经实现了对回收器视图的滑动手势。 现在,任务列表与已完成和待处理的任务一起显示。每个列表项都有一个复选框,显示任务已完成或挂起。 如果选中复选框,则任务完成,反之亦然。现在我想在这篇文章中用页眉做两个部分。一个用于已完成的任务,另一个用于待完成的任务。 所以完成的任务应该显示在完成的部分中,反

  • http://javatar.iteye.com/blog/690845 我们平台的产品越来越多,产品的功能也越来越多。平台的产品为了适应各 BU 和部门以及产品线的需求,势必会将很多不相干的功能凑在一起,客户可以选择性的使用。为了兼容更多的需求,每个产品,每个框架,都在不停的扩展,而我们经常会选择一些扩展的扩展方式,也就是将新旧功能扩展成一个通用实现。我想讨论是,有些情况下也可以考虑增量式的扩展

  • 目的 使用部署配置来部署多个 Pod,并以此扩展缩放应用。 环境 openshift v3.11.16/kubernetes v1.11.0 步骤 创建工程1. CLI 登录到 OCP $ oc login https://master.example.com:8443 -u admin -p admin2. 创建工程 $ oc new-project lab08 创建一个新应用,测试缩放1. 创

  • 我正在做一个项目,该项目将有许多JavaFX应用程序,这些应用程序具有相似但又足够不同的功能,因此我创建了一个抽象基类来扩展Application以处理常见的功能并指示它们需要做什么,还创建了一系列具体的类来扩展这些功能。然而,当我试图跑的时候,我得到 应用程序构造函数java.lang.Reflect.InvocationTargetException位于java.base/jdk.intern

  • 主要内容:介绍,实现,MediaPlayer.java,AdvancedMediaPlayer.java,VlcPlayer.java,Mp4Player.java,MediaAdapter.java,AudioPlayer.java,AdapterPatternDemo.java适配器模式(Adapter Pattern)是作为两个不兼容的接口之间的桥梁。这种类型的设计模式属于结构型模式,它结合了两个独立接口的功能。 这种模式涉及到一个单一的类,该类负责加入独立的或不兼容的接口功能。举个真实的