当前位置: 首页 > 编程笔记 >

Rxjava功能操作符的使用方法详解

松正阳
2023-03-14
本文向大家介绍Rxjava功能操作符的使用方法详解,包括了Rxjava功能操作符的使用方法详解的使用技巧和注意事项,需要的朋友参考一下

Rxjava功能个人感觉很好用,里面的一些操作符很方便,Rxjava有:被观察者,观察者,订阅者,

被观察者通过订阅者订阅观察者,从而实现观察者监听被观察者返回的数据

下面把Rxjava常用的模型代码列出来,还有一些操作符的运用:

依赖:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
  compile 'io.reactivex.rxjava2:rxjava:2.1.5'

这个是另一种解析数据的方法,阿里巴巴旗下的,听说是解析最快的解析器。。。。

compile 'com.alibaba:fastjson:1.2.39'
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
import android.widget.TextView;
 
import com.alibaba.fastjson.JSONObject;
 
import java.io.IOException;
import java.util.concurrent.TimeUnit;
 
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
 
public class MainActivity extends AppCompatActivity {
 
  private TextView name;
 
  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
 
    name = (TextView) findViewById(R.id.name);
    //用来调用下面的方法,监听。
    name.setOnClickListener(new View.OnClickListener() {
      @Override
      public void onClick(View v) {
 
        interval();
      }
    });
  }
 
  //例1:Observer
  public void observer() {
    //观察者
    Observer<string> observer = new Observer<string>() {
      @Override
      public void onSubscribe(@NonNull Disposable d) {
 
      }
      @Override
      public void onNext(@NonNull String s) {
        //接收从被观察者中返回的数据
        System.out.println("onNext :" + s);
      }
      @Override
      public void onError(@NonNull Throwable e) {
 
      }
      @Override
      public void onComplete() {
 
      }
    };
    //被观察者
    Observable<string> observable = new Observable<string>() {
      @Override
      protected void subscribeActual(Observer<!--? super String--> observer) {
        observer.onNext("11111");
        observer.onNext("22222");
        observer.onComplete();
      }
    };
    //产生了订阅
    observable.subscribe(observer);
  }
 
  //例2:Flowable
  private void flowable(){
    //被观察者
    Flowable.create(new FlowableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull FlowableEmitter<string> e) throws Exception {
        for (int i = 0; i < 100; i++) {
          e.onNext(i+"");
        }
      }
      //背压的策略,buffer缓冲区        观察者
      //背压一共给了五种策略
      // BUFFER、
      // DROP、打印前128个,后面的删除
      // ERROR、
      // LATEST、打印前128个和最后一个,其余删除
      // MISSING
      //这里的策略若不是BUFFER 那么,会出现著名的:MissingBackpressureException错误
    }, BackpressureStrategy.BUFFER).subscribe(new Consumer<string>() {
      @Override
      public void accept(String s) throws Exception {
        System.out.println("subscribe accept"+s);
        Thread.sleep(1000);
      }
    });
  }
 
  //例3:线程调度器 Scheduler
  public void flowable1(){
    Flowable.create(new FlowableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull FlowableEmitter<string> e) throws Exception {
        for (int i = 0; i < 100; i++) {
          //输出在哪个线程
          System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());
          e.onNext(i+"");
        }
      }
    },BackpressureStrategy.BUFFER)
        //被观察者一般放在子线程
        .subscribeOn(Schedulers.io())
        //观察者一般放在主线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<string>() {
          @Override
          public void accept(String s) throws Exception {
            System.out.println("s"+ s);
            Thread.sleep(100);
            //输出在哪个线程
            System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());
          }
        });
  }
 
 
  //例4:http请求网络,map转化器,fastjson解析器
  public void map1(){
    Observable.create(new ObservableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull final ObservableEmitter<string> e) throws Exception {
        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder()
            .url("https://qhb.2dyt.com/Bwei/login")
            .build();
        client.newCall(request).enqueue(new Callback() {
          @Override
          public void onFailure(Call call, IOException e) {
 
          }
 
          @Override
          public void onResponse(Call call, Response response) throws IOException {
            String result = response.body().string();
            e.onNext(result);
          }
        });
      }
    })
        //map转换器 flatmap(无序),concatmap(有序)
        .map(new Function<string, bean="">() {
      @Override
      public Bean apply(@NonNull String s) throws Exception {
        //用fastjson来解析数据
        return JSONObject.parseObject(s,Bean.class);
      }
    }).subscribe(new Consumer<bean>() {
      @Override
      public void accept(Bean bean) throws Exception {
        System.out.println("bean = "+ bean.toString() );
      }
    });
  }
 
  //常见rxjava操作符
  //例 定时发送消息
  public void interval(){
    Observable.interval(2,1, TimeUnit.SECONDS)
        .take(10)
        .subscribe(new Consumer<long>() {
          @Override
          public void accept(Long aLong) throws Exception {
            System.out.println("aLong = " + aLong);
          }
        });
  }
 
 
  //例 zip字符串合并
  public void zip(){
    Observable observable1 = Observable.create(new ObservableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull ObservableEmitter<string> e) throws Exception {
        e.onNext("1");
        e.onNext("2");
        e.onNext("3");
        e.onNext("4");
        e.onComplete();
 
      }
    });
    Observable observable2 = Observable.create(new ObservableOnSubscribe<string>() {
      @Override
      public void subscribe(@NonNull ObservableEmitter<string> e) throws Exception {
        e.onNext("A");
        e.onNext("B");
        e.onNext("C");
        e.onNext("D");
        e.onComplete();
      }
    });
 
    Observable.zip(observable1, observable2, new BiFunction<string,string,string>() {
      @Override
      public String apply(@NonNull String o, @NonNull String o2) throws Exception {
        return o + o2;
      }
    }).subscribe(new Consumer<string>() {
      @Override
      public void accept(String o) throws Exception {
        System.out.println("o"+ o);
      }
    });
  }

总结

以上就是本文关于Rxjava功能操作符的使用方法详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Javaweb应用使用限流处理大量的并发请求详解、分享一个简单的java爬虫框架、Java线程之线程同步synchronized和volatile详解等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!

 类似资料:
  • 主要内容:RxJava 过滤操作符 介绍,RxJava 过滤操作符 示例RxJava 过滤操作符 介绍 以下是用于从 Observable 中选择性地发送信息的运算符。 运算符 描述 Debounce 仅在发生超时时才发送项目而不发送另一个项目。 Distinct 只发送独特的物品。 ElementAt 仅发出由 Observable 发出的 n 个索引处的项目。 Filter 只发出那些通过给定谓词函数的项目。 First 发出通过给定条件的第一个项目或第一个项目。

  • 主要内容:RxJava 转换操作符 介绍,RxJava 转换操作符 示例RxJava 转换操作符 介绍 以下是用于转换从 Observable 发出的信息的运算符。 运算符 描述 Buffer 定期将 Observable 中的项目收集到包中,然后发出包而不是项目。 FlatMap 用于嵌套的 observable。将项目转换为 Observable。然后将项目展平为单个 Observable。 GroupBy 将一个 Observable 分成按键组织的一组 Obs

  • 主要内容:RxJava 创建操作符 介绍,RxJava 创建操作符 示例RxJava 创建操作符 介绍 以下是用于创建 Observable 的运算符。 运算符 描述 Create 从头开始创建一个 Observable 并允许以编程方式调用观察者方法。 Defer 在观察者订阅之前不要创建 Observable。为每个观察者创建一个新的 observable。 Empty/Never/Throw 创建一个行为受限的 Observable。 From 将对象/数据结构

  • 本文向大家介绍详解ABP框架中Session功能的使用方法,包括了详解ABP框架中Session功能的使用方法的使用技巧和注意事项,需要的朋友参考一下 如果一个应用程序需要登录,则它必须知道当前用户执行了什么操作。因此ASP.NET在展示层提供了一套自己的SESSION会话对象,而ABP则提供了一个可以在任何地方 获取当前用户和租户的IAbpSession接口。 关于IAbpSession 需要获

  • 本文向大家介绍Python使用xlwt模块操作Excel的方法详解,包括了Python使用xlwt模块操作Excel的方法详解的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Python使用xlwt模块操作Excel的方法。分享给大家供大家参考,具体如下: 部分摘自官网文档. 该模块安装很简单 先来个简单的例子: 运行后 会在当前目录生成一个Excel_test.xls 官方例子: 运行这

  • 本文向大家介绍JSONP和批量操作功能的实现方法,包括了JSONP和批量操作功能的实现方法的使用技巧和注意事项,需要的朋友参考一下 推荐一个好用的在线 Markdown 编辑器,比我自己用 Python 编译成 markdown 要方便多了。 [http://mahua.jser.me] markdown简明语法教程 [http://www.appinn.com/markdown/] 好东西会让人