Reactive Streams

Reactive Streams1.概述在本文中,我们将介绍Java9ReactiveStreams。简而言之,我们将能够使用Flow类,它包含用于构建反应流处理逻辑的主要构建块。ReactiveStreams是具有无阻塞背压的异步流处理的标准。此规范在ReactiveManifesto中定义,并且有各种实现,例如,RxJava或Akka-Streams。2.ReactiveAPI概述要构建Flow,我们可…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

1.概述
在本文中,我们将介绍Java 9 Reactive Streams。简而言之,我们将能够使用Flow类,它包含用于构建反应流处理逻辑的主要构建块。
Reactive Streams是具有无阻塞背压的异步流处理的标准。此规范在Reactive Manifesto中定义,并且有各种实现,例如,RxJava或Akka-Streams。

2.Reactive API概述
要构建Flow, 我们可以使用三个主要抽象并将它们组合成异步处理逻辑。
每个Flow都需要处理Publisher实例发布给它的事件 ; he Publisher 有一个方法 – subscribe().
如果任何订阅者想要接收由其发布的事件,则他们需要订阅给定的发布者。
消息接收者需要实现订阅者接口。通常,这是每个Flow处理的结束,因为它的实例不会进一步发送消息。我们可以将Subscriber视为一个接收器。这有四个需要重写的方法 – onSubscribe(),onNext(),onError()和onComplete()。
如果我们想要转换传入消息并将其进一步传递给下一个订阅服务器,我们需要实现处理器接口。
这既可以作为订阅者,也可以作为发布者,因为它处理这些消息并将其发送以进行进一步处理。

3.发布和使用消息
假设我们要创建一个简单的Flow,其中我们有一个Publisher发布消息,一个简单的订阅者在消息到达时消费一个消息。让我们创建一个EndSubscriber类。我们需要实现Subscriber接口。接下来,我们将覆盖所需的方法。所述onSubscribe()方法在处理开始之前调用。订阅的实例作为参数传递。它是一个用于控制订阅服务器和发布服务器之间的消息流的类:

public class EndSubscriber<T> implements Subscriber<T> {
    private Subscription subscription;
    public List<T> consumedElements = new LinkedList<>();
 
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
}

Jetbrains全家桶1年46,售后保障稳定

我们还初始化一个空列表的consumedElements那将在测试中使用。
现在,我们需要从Subscriber接口实现其余方法。这里的主要方法是onNext() – 每当Publisher发布一条新消息时都会调用它:

  @Override
public void onNext(T item) {
    System.out.println("Got : " + item);
    subscription.request(1);
}

请注意,当我们在onSubscribe()方法中启动订阅时,当我们处理消息时,我们需要在Subscription上调用request()方法,以表示当前订阅服务器已准备好使用更多消息。
最后,我们需要实现onError() – 只要在处理中抛出一些异常就会调用它,以及onComplete() -当Publisher关闭时调用:

@Override
public void onError(Throwable t) {
    t.printStackTrace();
}
 
@Override
public void onComplete() {
    System.out.println("Done");
}

让我们为Processing Flow编写一个测试。我们将使用SubmissionPublisher类 – 来自java.util.concurrent的构造- 它实现了Publisher接口。

@Test
public void whenSubscribeToIt_thenShouldConsumeAll() 
  throws InterruptedException {
  
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    EndSubscriber<String> subscriber = new EndSubscriber<>();
    publisher.subscribe(subscriber);
    List<String> items = List.of("1", "x", "2", "x", "3", "x");
 
    // when
    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
    items.forEach(publisher::submit);
    publisher.close();
 
    // then
     await().atMost(1000, TimeUnit.MILLISECONDS)
       .until(
         () -> assertThat(subscriber.consumedElements)
         .containsExactlyElementsOf(items)
     );
}

注意,我们在EndSubscriber的实例上调用close()方法。它将在给定Publisher的每个订阅服务器下面调用onComplete()回调。

运行该程序将产生以下输出:

Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done

4.消息的转换
假设我们想要在发布服务器和订阅服务器之间构建类似的逻辑。我们将创建实现Processor并扩展SubmissionPublisher的TransformProcessor类。
我们将传入一个将输入转换为输出的函数:

public class TransformProcessor<T, R> 
  extends SubmissionPublisher<R> 
  implements Flow.Processor<T, R> {
 
    private Function<T, R> function;
    private Flow.Subscription subscription;
 
    public TransformProcessor(Function<T, R> function) {
        super();
        this.function = function;
    }
 
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
 
    @Override
    public void onNext(T item) {
        submit(function.apply(item));
        subscription.request(1);
    }
 
    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }
 
    @Override
    public void onComplete() {
        close();
    }
}

现在让我们编写一个快速测试,其中包含Publisher发布String元素的处理流程。
我们的TransformProcessor将String解析为Integer – 这意味着需要在此处进行转换:

@Test
public void whenSubscribeAndTransformElements_thenShouldConsumeAll()
  throws InterruptedException {
  
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    TransformProcessor<String, Integer> transformProcessor 
      = new TransformProcessor<>(Integer::parseInt);
    EndSubscriber<Integer> subscriber = new EndSubscriber<>();
    List<String> items = List.of("1", "2", "3");
    List<Integer> expectedResult = List.of(1, 2, 3);
 
    // when
    publisher.subscribe(transformProcessor);
    transformProcessor.subscribe(subscriber);
    items.forEach(publisher::submit);
    publisher.close();
 
    // then
     await().atMost(1000, TimeUnit.MILLISECONDS)
       .until(() -> 
         assertThat(subscriber.consumedElements)
         .containsExactlyElementsOf(expectedResult)
     );
}

注意,在基本Publisher上调用close()方法将导致调用TransformProcessor上的onComplete()方法。

请注意,处理链中的所有Publisher都需要以这种方式关闭。

5.使用订阅控制消息需求
让我们修改我们的EndSubscriber只消耗N个消息。我们将传递该数字作为howMuchMessagesConsume构造函数参数:

public class EndSubscriber<T> implements Subscriber<T> {
  
    private AtomicInteger howMuchMessagesConsume;
    private Subscription subscription;
    public List<T> consumedElements = new LinkedList<>();
 
    public EndSubscriber(Integer howMuchMessagesConsume) {
        this.howMuchMessagesConsume 
          = new AtomicInteger(howMuchMessagesConsume);
    }
 
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
 
    @Override
    public void onNext(T item) {
        howMuchMessagesConsume.decrementAndGet();
        System.out.println("Got : " + item);
        consumedElements.add(item);
        if (howMuchMessagesConsume.get() > 0) {
            subscription.request(1);
        }
    }
    //...
     
}

让我们编写一个测试,我们只想从给定的Subscription中使用一个元素:

@Test
public void whenRequestForOnlyOneElement_thenShouldConsumeOne()
  throws InterruptedException {
  
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    EndSubscriber<String> subscriber = new EndSubscriber<>(1);
    publisher.subscribe(subscriber);
    List<String> items = List.of("1", "x", "2", "x", "3", "x");
    List<String> expected = List.of("1");
 
    // when
    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
    items.forEach(publisher::submit);
    publisher.close();
 
    // then
    await().atMost(1000, TimeUnit.MILLISECONDS)
      .until(() -> 
        assertThat(subscriber.consumedElements)
       .containsExactlyElementsOf(expected)
    );
}

虽然发布者发布了六个元素,但我们的EndSubscriber只消耗一个元素,因为它表示只处理那个元素的需求。

六,结论
我们了解了如何创建由发布服务器和订阅服务器组成的处理流程。我们使用处理器转换元素创建了一个更复杂的处理流程。最后,我们使用Subscription来控制订阅者对元素的需求。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/222988.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 一种基于Linux文件系统的数据恢复方法_武汉病例轨迹

    一种基于Linux文件系统的数据恢复方法_武汉病例轨迹问题阶段起因:昨天晚上思路不是很清晰(上了一天班回来有点蒙),还是强忍着疲惫想搞事情,结果悲剧了……本来想拿SD卡做一张linux烧录卡,烧录脚本是很久以前写的,有git记录,一直不成功,就回退了几次提交,然后执行的时候没有给脚本传参(/dev/sd**),结果脚本中默认磁盘设备为/dev/sdb,在现在电脑上是一块数据磁盘,执行到一半的时候由于某些原因意外退出,但还是有一些命令执行,比

    2025年5月30日
    0
  • pychram 激活码【最新永久激活】

    (pychram 激活码)2021最新分享一个能用的的激活码出来,希望能帮到需要激活的朋友。目前这个是能用的,但是用的人多了之后也会失效,会不定时更新的,大家持续关注此网站~IntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,下面是详细链接哦~https://javaforall.net/100143.html…

    2022年3月31日
    57
  • iframe自适应高度_jquery取iframe文本

    iframe自适应高度_jquery取iframe文本超级简单的方法,也不用写什么判断浏览器高度、宽度啥的。下面的两种方法自选其一就行了。一个是放在和iframe同页面的,一个是放在test.html页面的。注意别放错地方了哦。iframe代码

    2022年10月12日
    1
  • linux空文件夹删不掉_linux可以遍历删除空目录吗

    linux空文件夹删不掉_linux可以遍历删除空目录吗请关注本头条号,每天坚持更新原创干货技术文章。如需学习视频,请在微信搜索公众号“智传网优”直接开始自助视频学习。1.rmdir命令简介本文主要介绍rmdir命令,该命令用于删除Linux上的空目录。对于非空目录,请使用rm命令。2.rmdir命令选项-p或–parents:删除指定目录后,若该目录的上层目录已变成空目录,则将其一并删除;–ignore-fail-on-non-empty:此…

    2025年5月27日
    0
  • linux怎么关闭防火墙命令,Linux怎么用命令永久关闭防火墙

    有时防火墙会限制我们下载上传操作等,而Linux操作系统想要关闭防火墙有很多命令。具体有哪些呢?下面由学习啦小编为大家整理了linux中永久关闭防火墙命令的相关知识,希望对大家有帮助!Linux永久关闭防火墙命令1)永久性生效,重启后不会复原开启:chkconfigiptableson关闭:chkconfigiptablesoff2)即时生效,重启后复原开启:serviceiptab…

    2022年4月8日
    220
  • 有约束最优化问题MATLAB_约束条件下的最优化问题

    有约束最优化问题MATLAB_约束条件下的最优化问题最近在做天线多目标优化的实例,因此接触到了NSGA-Ⅱ算法,所以想分享以下我个人的学习内容与经历,仅作参考,如果内容有误,也希望各位能够指出来,大家一起进行交流指正。内容将分为以下几个模块,内容可能较多,如果觉得不错的话,可以点赞????,收藏或者转发哦!目录NSGA-Ⅱ算法简介非支配集排序锦标赛选择模拟二进制交叉多项式变异精英保留策略参考文献NSGA-Ⅱ算法简介NSGA-Ⅱ算法由Deb等人首次提出,其思想为带有精英保留策略的快速非支配多目标优化算法,是一种基于Pareto最优解的多目标优化算法。

    2022年10月11日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号