RxJava 操作符flatmap

RxJava 操作符flatmap有如下场景:在前段调用后端的API时,经常会出现回调嵌套的情况。假设我们有两个API,queryA和queryB.并且queryB的运行依赖于queryA的结果。那么我们的程序在一般的情况下可能是这个样子。想象有如下的代码:是不是感觉非常不舒服?假如嵌套的API再多几层,那么这将是个灾难。一个人开发的时候可能不觉得有什么问题,但是可以想象做codereview或者新入项目组的同事

大家好,又见面了,我是你们的朋友全栈君。

有如下场景:

在前段调用后端的API时,经常会出现回调嵌套的情况。假设我们有两个API,queryA 和 queryB. 并且queryB的运行依赖于queryA的结果。那么我们的程序在一般的情况下可能是这个样子。

想象有如下的代码:
server类

嵌套调用API

是不是感觉非常不舒服?假如嵌套的API再多几层,那么这将是个灾难。一个人开发的时候可能不觉得有什么问题,但是可以想象做code review或者新入项目组的同事看到你这复杂的嵌套时的表情。

是时候让flatmap出现啦!让我们把程序稍微改造一下,在Server类里面把makeRequest的方式变成RxJava的Observable的形式(我会在例子之后解释为什么要用flatmap()):
使用Observable

开始调用

看上去好像没觉得有都简洁是么?你等着我给你看看假如嵌套多几层之后:
这里写图片描述

看到了么?在RxJava的链式调用下,所有之前需要嵌套的地方都被flatMap()隔开了。代码可读性大大增加!假如你的IDE支持java 8的话,你可以体验更美妙的事情:lambda!

这里写图片描述

在抛弃了可恶的匿名类之后,代码更加简洁了!

看来很多同学都不太理解为何要用flatmap来解决嵌套回调。那咱们就深入点。

flatMap()究竟是用来干嘛的?简单的说,flatmap把一个Observable变成多个Observable,然后把得到的多个Obervable的元素一个个的发射出去。

假设你有一个API queryA,用这个query可以通过一个userID得到一个指定用户的所有关注的明星的userID(但是原始数据只是String),你需要把这些userID一个个打印出来。

那么我们在调用queryA的时候就已经构建了一个Obervable了,我们暂且叫他O1.在O1每发射结果的同时,我们需要调用把返回的String结果变成另一个Observable,O2,O2含有所有的明星userID,并且一个个发射出去。代码例子:
flatmap

大家可以看到,新的observable2是一个JsonObject类型的。因为在第三行注释之后,我们返回了一个(可以是多个)新的包含所有userID的observable,RxJava会将这个(或者多个)Observable平铺发射.

或者大家可以参考抛物线大神文章中的学生和课程的例子 大概在文章的中间

总之flatmap最核心的功能大家可以理解为:转换,可一个Obervable转换成多个Observable,再讲结果平铺发射。

在我们的例子中,是一个observable(O1)变成另一个observable(O2),是一对一的关系,因为queryA只会返回一个String s的结果,所以我们只会将一个observable(O2)平铺并发射。

所以这和nested callback有什么关系呢?

关系大了!

再想想,我们的刚刚做的和nested callback不同的地方差在哪里?唯一不通的地方是在获取所有明星userID的时候,我们是一个同步的操作,也就是说userID全部都包含在queryA返回的结果里面。我们用同步的方法把其封装成一个JSONArray.

假如queryA返回的只是用户关注明星userID的url呢?假如在第二部我们需要再做一步API call呢?

是的,就算我们还需要一步API call,程序的结构还是一样的:
异步获取list

flatmap最核心的功能大家可以理解为:转换,可一个Obervable转换成多个Observable,再将结果平铺发射。

至于转换的Observable里面的操作是同步还是异步,我们不需要去过多的思考,Rxjava已经完美的解决了。

参考:http://www.jianshu.com/p/0f926fda682b

延伸:实例项目(体会flatMap用法)

一. 添加依赖

compile 'com.squareup.retrofit2:retrofit:2.0.0-beta4'
    compile 'io.reactivex:rxjava:1.1.1'

    compile 'com.squareup.retrofit2:converter-gson:2.0.0-beta4'
    //Gson解析器

    compile 'io.reactivex:rxandroid:1.1.0'
    //rxjava中用到的AndroidSchedulers.mainThread()

    compile 'com.squareup.retrofit2:adapter-rxjava:2.0.0-beta4'
    这里使用的是retrofit2.0,默认为okhttp3.0

二. 定义请求接口,转换HTTPAPI为Java接口

public interface IgankApi {
    @GET("a.json")
    Call<List<GirlEntity>> getGirl();

    @GET("data/%E7%A6%8F%E5%88%A9/{count}/{page}")
    Call<GirlJsonData> getGirl(@Path("count") int count, @Path("page") int page);

//与rxjava结合api
    @GET("data/%E7%A6%8F%E5%88%A9/{count}/{page}")
   Observable<GirlJsonData> getG(@Path("count") int count, @Path("page") int page);

}

三. 接着使用类Retrofit生成 接口的实现,使用了动态代理。

   public static IgankApi getIgankApi() {

        if (igankApi == null) {
            synchronized (IgankApi.class) {
                if (igankApi == null) {
                    Retrofit retrofit = new Retrofit.Builder().baseUrl("http://gank.io/api/")
                            .addConverterFactory(gsonConverterFactory)
                            .client(okHttpClient)
                            .addCallAdapterFactory(rxJavaCallAdapterFactory)
                            .build();
                    igankApi = retrofit.create(IgankApi.class);
                }
            }
        }
        return igankApi;
    }

四. 调用接口

private void getImg() {
        NetUtils.getIgankApi().getG(10, 2).flatMap(new Func1<GirlJsonData, Observable<List<GirlEntity>>>() {
            @Override
            public Observable<List<GirlEntity>> call(GirlJsonData girlJsonData) {
                return Observable.just(girlJsonData.getResults());
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<List<GirlEntity>>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(List<GirlEntity> girlEntities) {
                        girlEntityList.addAll(girlEntities);
                        loge(girlEntityList.get(1).getUrl() + "");
                        myRecycleViewAdapter.notifyDataSetChanged();
                    }
                });

    }

项目代码:https://github.com/jdsjlzx/Girl


再比如下面场景:

访问某个接口时并不能直接访问,而需要填入一个在线获取的 token ,代码应该怎么写?

Callback 方式,可以使用嵌套的 Callback:

@GET("/token")
public void getToken(Callback<String> callback);

@GET("/user")
public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback);

...

getToken(new Callback<String>() {
    @Override
    public void success(String token) {
        getUser(userId, new Callback<User>() {
            @Override
            public void success(User user) {
                userView.setUser(user);
            }

            @Override
            public void failure(RetrofitError error) {
                // Error handling
                ...
            }
        };
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
});

使用RxJava 的话,代码如下:

@GET("/token")
public Observable<String> getToken();

@GET("/user")
public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);

...

getToken()
    .flatMap(new Func1<String, Observable<User>>() {
        @Override
        public Observable<User> call(String token) {
            return getUser(token, userId);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });

使用操作符flatMap() 就搞定了逻辑,依然是一条链。


flatMap()操作符的作用是将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,还是太抽象了。

简单说就是将 一个List 或者数组中的每一条数据都 转换成一个 Observable对象。

场景:

假如我们写了个网络请求, 然后拿出所有请求的数据, 现在我们不需要那么多数据, 我们只需要city字段和WD字段(运行过上段代码就知道了), 而且这次我不在只给你一个url了,而是给你多个url。

那我们就开始写代码吧(代码是最好的老师):

public static final String HOST = "http://www.weather.com.cn";
    List<String> values = new ArrayList<>();
    private String TAG = "SecondActivity2";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        values.add("/adat/sk/101010100.html");
        values.add("/adat/sk/101010100.html");
        values.add("/adat/sk/101010100.html");
        values.add("/adat/sk/101010100.html");
        values.add("/adat/sk/101010100.html");

        Observable.just(values).flatMap(new Func1<List<String>, Observable<?>>() {
            @Override
            public Observable<?> call(List<String> strings) {
                return Observable.from(strings);
            }
        }).cast(String.class).map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                return doNetTaskForString(HOST + s); //取出想要的字段,这里我就不取出来了
            }
        }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i(TAG, "value: " + s); 
                    }
                });
    }

    @NonNull
    @Override
    public int getContentView() {
        return R.layout.activity_second2;
    }

    private synchronized String doNetTaskForString(String s) {

        HttpClient client = new DefaultHttpClient();

        Log.i(TAG, "url:" + s);
        HttpGet get = new HttpGet(s);
        String result;
        try {
            HttpResponse response = client.execute(get);
            Log.i(TAG, "state code :" + response.getStatusLine().getStatusCode());
            if (200 == response.getStatusLine().getStatusCode()) {
                result = EntityUtils.toString(response.getEntity(), HTTP.UTF_8);
            } else {
                result = "状态行非200";
            }

        } catch (Exception e1) {
            result = "抛出了异常" + e1.getMessage();
            e1.printStackTrace();
        }
        return result;

    }

打印log:
这里写图片描述

这段代码里我们又用到的新的操作符 cast .

cast的作用就是 在发射之前强制将Observable发射的所有数据转换为指定类型。
这里写图片描述

再举个例子。
这里写图片描述

这里写图片描述

这里结合了retrofit 在flatMap中通过一个城市的str,返回一个Observable,这个Observable的参数是一些获取到的天气信息结构WeatherData,这样在后面subscrib中就可以对其进行处理了。

flatMap多线程执行任务

单线程
有如下代码:

 private Observable<String> processUrlIpByOneFlatMap() {
        return Observable.just(
                "http://www.baidu.com/",
                "http://www.google.com/",
                "https://www.bing.com/")
                .flatMap(new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(String s) {
                        return createIpObservable(s);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        printLog(tvLogs, "Consume Data <- ", s);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        printErrorLog(tvLogs, "throwable call()", throwable.getMessage());
                    }
                });
    }

    //根据主机获取ip
    private Observable<String> createIpObservable(final String url) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    String ip = getIPByUrl(url);
                    subscriber.onNext(ip);
                    printLog(tvLogs, "Emit Data -> ",url+" : " +ip);
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                    //subscriber.onError(e);
                    subscriber.onNext(null);
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                    //subscriber.onError(e);
                    subscriber.onNext(null);
                }
                subscriber.onCompleted();
            }
        });
    }

执行结果:

Emit Data -> 'http://www.baidu.com/ : 115.239.211.112'
Main Thread:false, Thread Name:RxCachedThreadScheduler-1
Consume Data <- '115.239.211.112'
Main Thread:true, Thread Name:main

Emit Data -> 'http://www.google.com/ : 216.58.199.100'
Main Thread:false, Thread Name:RxCachedThreadScheduler-1
Consume Data <- '216.58.199.100'
Main Thread:true, Thread Name:main

Emit Data -> 'https://www.bing.com/ : 202.89.233.104'
Main Thread:false, Thread Name:RxCachedThreadScheduler-1
Consume Data <- '202.89.233.104'
Main Thread:true, Thread Name:main

我们从上面的输出结果可以看出,效果和使用map操作符的效果是一样。

我们同时也发现线程的名称(Thread Name)都是 RxCachedThreadScheduler-1 ,说明他们是通过一个线程来完成所有的任务的。

多线程

如果任务很多,仅仅通过一个线程去做,效率上是不是有点低呢?如果我想使用多个线程来完成这些任务该怎么做呢?

很简单,只需要在创建Observable的时候加上subscribeOn(Schedulers.io()) 即可。完整代码如下:

//根据主机获取ip
    private Observable<String> createIpObservable(final String url) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    String ip = getIPByUrl(url);
                    subscriber.onNext(ip);
                    printLog(tvLogs, "Emit Data -> ",url+" : " +ip);
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                    //subscriber.onError(e);
                    subscriber.onNext(null);
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                    //subscriber.onError(e);
                    subscriber.onNext(null);
                }
                subscriber.onCompleted();
            }
        })
        .subscribeOn(Schedulers.io());
    }

执行结果:

Consume Data <-202.89.233.103’ 
Main Thread:true, Thread Name:main 
Emit Data -> ‘https://www.bing.com/ : 202.89.233.103’ 
Main Thread:false, Thread Name:RxCachedThreadScheduler-8

Emit Data -> ‘http://www.google.com/ : 216.58.203.36’ 
Main Thread:false, Thread Name:RxCachedThreadScheduler-7 
Consume Data <-216.58.203.36’ 
Main Thread:true, Thread Name:main

Emit Data -> ‘http://www.baidu.com/ : 115.239.211.112’ 
Main Thread:false, Thread Name:RxCachedThreadScheduler-6 
Consume Data <-115.239.211.112’ 
Main Thread:true, Thread Name:main 

从运行可以看出,执行完成任务的不是一个线程了,而是三个不同的线程 RxCachedThreadScheduler-8 、RxCachedThreadScheduler-7、RxCachedThreadScheduler-6 。

但是发现一个问题,输出的结果的顺序乱了,不是我们输入的baidu.com/google.com/bing.com顺序了。

那怎么办呢?

这时候concatMap操作符就闪亮登场了,详情:
http://blog.csdn.net/jdsjlzx/article/details/51720741

参考文章:http://blog.csdn.net/johnny901114/article/details/51532776

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/136110.html原文链接:https://javaforall.net

(0)
上一篇 2022年6月4日 上午8:00
下一篇 2022年6月4日 上午8:00


相关推荐

  • bs是cs的一种吗_cs客户端和bs客户端

    bs是cs的一种吗_cs客户端和bs客户端一,B/S结构是WEB兴起后的一种网络结构模式,WEB浏览器是客户端最主要的应用软件。这种模式统一了客户端,将系统功能实现的核心部分集中到二,区别(C/S与B/S):1.硬件环境不同:C/S一般建立在专用的网络上,小范围里的网络环境,局域网之间再通过专门2.对安全要求不同:C/S一般面向相对固定的用户群,对信息安全的控制能力很强。一般高度机密的信息系统采用C/S结构适宜。可以通过B/S发布…

    2022年10月17日
    5
  • OpenClaw企业部署实战:多用户管理、权限隔离与安全加固完整指南

    OpenClaw企业部署实战:多用户管理、权限隔离与安全加固完整指南

    2026年3月13日
    4
  • latex中希腊字母_LaTeX怎么念

    latex中希腊字母_LaTeX怎么念日常写论文,ppt作汇报等,经常需要编写公式,为方便查阅希腊字母对应latex表示,特写此表格,以便大家查阅。小写 Latex表示 大写 Latex表示 \alpha A A \beta B B \gamma \Gamma \delta \Delta \epsilon …

    2022年10月13日
    3
  • 电机控制foc算法讲解_电机算法需求

    电机控制foc算法讲解_电机算法需求最近做完了一个直流无刷电机的电机调速项目,查阅了各种大神所写的博客和论文,在这里我只做一下小小的总结;FOC(FiledOrientedControl)是采用数学方法实现三相马达的力矩与励磁的解耦控制。主要是对电机的控制电流进行矢量分解,变成励磁电流IdIdId和交轴电流IqIqIq,励磁电流主要是产生励磁,控制的是磁场的强度,而交轴电流是用来控制力矩,所以在实际使用过程中,我们常…

    2025年12月3日
    5
  • 【路由】静态路由「建议收藏」

    【路由】静态路由「建议收藏」静态路由1、静态路由的概念1.1、概念1.2、注意事项1.3、弊端2、静态路由的配置须知2.1、出接口为BMA类型2.2、出接口为P2P类型2.3、出接口为NBMA类型3、默认路由3.1、概念3.2、实验场景3.3、适用场景3.4、注意事项4、浮动静态路由4.1、静态路由负载均衡的实验场景4.2、静态路由负载均衡讲解4.3、静态路由负载分担的优点4.4、浮动静态路由的原理4.5、浮动静态路由的应用场景1、静态路由的概念1.1、概念1.2、注意事项1.3、弊端2、静态路由的配置须知2.1、出接口为

    2025年11月25日
    7
  • springmvc源码下载_idea jar包导入

    springmvc源码下载_idea jar包导入1.首先需要下载源码码云地址:https://gitee.com/mirrors/Spring-Framework.git很快推荐GitHub地址:https://github.com/spring-projects/spring-framework.git不推荐然后打开idea,下载源码2.下载并配置gradle环境下载地址:https://services…

    2022年8月12日
    33

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号