RabbitMQ(六):回调队列callback queue、关联标识correlation id、实现简单的RPC系统

RabbitMQ(六):回调队列callback queue、关联标识correlation id、实现简单的RPC系统

博客翻译自:RabbitMQ Tutorials Java版


RabbitMQ(一):Hello World程序

RabbitMQ(二):Work Queues、循环分发、消息确认、持久化、公平分发

RabbitMQ(三):Exchange交换器–fanout

RabbitMQ(四):Exchange交换器–direct

RabbitMQ(五):Exchange交换器–topic

RabbitMQ(六):回调队列callback queue、关联标识correlation id、实现简单的RPC系统

RabbitMQ(七):常用方法说明 与 学习小结


远程过程调用(RPC):

在第二篇博客中,我们学会了如何使用工作队列将耗时的任务分发给多个工作者。但假如我们想调用远程电脑上的一个函数(或方法)并等待函数执行的结果,这时候该怎么办呢?好吧,这是一个不同的故事。这种模式通常称为远程过程调用RPC(Remote Procedure Call)。

在今天的教程中,我们将会使用RabbitMQ来建立一个RPC系统:一个客户端和一个可扩展的RPC服务端。因为我们没有任何现成的耗时任务,我们将会创建一个假的RPC服务,它将返回斐波那契数(Fibonacci numbers)。


客户端接口(Client interface):

为了演示如何使用RPC服务,我们将创建一个简单的客户端类。它负责暴露一个名为call的方法,该方法将发送一个RPC请求并阻塞,直到接收到回答。

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

回调队列(Callback queue):

使用RabbitMQ来做RPC很容易。客户端发送一个请求消息,服务端以一个响应消息回应。为了可以接收到响应,需要与请求(消息)一起,发送一个回调的队列。我们使用默认的队列(Java独有的):

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

消息属性
AMQP 0-9-1协议预定义了消息的14种属性。大部分属性都很少用到,除了下面的几种:

  • deliveryMode:标记一个消息是持久的(值为2)还是短暂的(2以外的任何值),你可能还记得我们的第二个教程中用到过这个属性。
  • contentType:描述编码的mime-typemime-type of the encoding)。比如最常使用JSON格式,就可以将该属性设置为application/json
  • ③ replyTo:通常用来命名一个回调队列。
  • ④ correlationId:用来关联RPC的响应和请求。

我们需要引入一个新的类:

import com.rabbitmq.client.AMQP.BasicProperties;

关联标识(Correlation Id):

在上面的方法中,我们为每一个RPC请求都创建了一个新的回调队列。这样做显然很低效,但幸好我们有更好的方式:让我们为每一个客户端创建一个回调队列

这样做又引入了一个新的问题,在回调队列中收到响应后不知道到底是属于哪个请求的。这时候,CorrelationId就可以派上用场了。对每一个请求,我们都创建一个唯一性的值作为CorrelationId。之后,当我们从回调队列中收到消息的时候,就可以查找这个属性,基于这一点,我们就可以将一个响应和一个请求进行关联。如果我们看到一个不知道的 CorrelationId值,我们就可以安全地丢弃该消息,因为它不属于我们的请求。

你可能会问,为什么要忽视回调队列中的不知道的消息,而不是直接以一个错误失败(failing with an error)。这是由于服务端可能存在的竞争条件。尽管不会,但这种情况仍有可能发生:RPC服务端在发给我们答案之后就挂掉了,还没来得及为请求发送一个确认信息。如果发生这种情况,重启后的RPC服务端将会重新处理该请求(因为没有给RabbitMQ发送确认消息,RabbitMQ会重新发送消息给RPC服务)。这就是为什么我们要在客户端优雅地处理重复响应,并且理想情况下,RPC服务要是幂等的。


总结:

RabbitMQ(六):回调队列callback queue、关联标识correlation id、实现简单的RPC系统

我们的RPC系统的工作流程如下:

当客户端启动后,它会创建一个异步的独特的回调队列。对于一个RPC请求,客户端将会发送一个配置了两个属性的消息:一个是replyTo属性,设置为这个回调队列;另一个是correlation id属性,每一个请求都会设置为一个具有唯一性的值。这个请求将会发送到rpc_queue队列。

RPC工作者(即图中的server)将会等待rpc_queue队列的请求。当有请求到来时,它就会开始干活(计算斐波那契数)并将结果通过发送消息来返回,该返回消息发送到replyTo指定的队列。

客户端将等待回调队列返回数据。当返回的消息到达时,它将检查correlation id属性。如果该属性值和请求匹配,就将响应返回给程序。


放在一块:

计算斐波那契数的任务如下:

private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

我们定义了斐波那契函数,它假设只会输入正整数(不要期望该函数在输入很大的数的时候可以好好工作,它可能是最慢的递归实现)。

RPC服务RPCServer.java的代码如下:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    //模拟的耗时任务,即计算斐波那契数
    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) {
        //创建连接和通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = null;
        try {
            connection = factory.newConnection();
            final Channel channel = connection.createChannel();

            //声明队列
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

            //一次只从队列中取出一个消息
            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            //监听消息(即RPC请求)
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                            .Builder()
                            .correlationId(properties.getCorrelationId())
                            .build();

                    //收到RPC请求后开始处理
                    String response = "";
                    try {
                        String message = new String(body, "UTF-8");
                        int n = Integer.parseInt(message);
                        System.out.println(" [.] fib(" + message + ")");
                        response += fib(n);
                    } catch (RuntimeException e) {
                        System.out.println(" [.] " + e.toString());
                    } finally {
                        //处理完之后,返回响应(即发布消息)
                        System.out.println("[server current time] : " + System.currentTimeMillis());
                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));

                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

            //loop to prevent reaching finally block
            while (true) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException _ignore) {
                }
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null)
                try {
                    connection.close();
                } catch (IOException _ignore) {
                }
        }
    }
}

RPC服务的代码很直白:

  • (1)开始先建立连接、通道并声明队列。
  • (2)我们可能会运行多个服务进程,为了负载均衡我们通过设置 prefetchCount =1将任务分发给多个服务进程
  • (3)我们使用了basicConsume来连接队列,并通过一个DefaultConsumer对象提供回调。这个DefaultConsumer对象将进行工作并返回响应。

我们的RPC客户端RPCClient代码如下:

package com.maxwell.rabbitdemo;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;

    //定义一个RPC客户端
    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
    }

    //真正地请求
    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                if (properties.getCorrelationId().equals(corrId)) {
                    System.out.println("[client current time] : " + System.currentTimeMillis());
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });

        return response.take();
    }

    //关闭连接
    public void close() throws IOException {
        connection.close();
    }

    public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
            //创建一个RPC客户端
            fibonacciRpc = new RPCClient();
            System.out.println(" [x] Requesting fib(30)");
            //RPC客户端发送调用请求,并等待影响,直到接收到
            response = fibonacciRpc.call("30");
            System.out.println(" [.] Got '" + response + "'");
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (fibonacciRpc != null) {
                try {
                    //关闭RPC客户的连接
                    fibonacciRpc.close();
                } catch (IOException _ignore) {
                }
            }
        }
    }
}

客户端代码看起来有一些复杂:

  • (1)建立连接和通道,并声明了一个独特的回调队列。
  • (2)订阅这个回调队列,所以我们可以接收RPC响应。
  • (3)call方法执行RPC请求。在call方法中,我们首先生成一个具有唯一性的correlationId值并存在变量corrId中。我们的DefaultConsumer中的实现方法handleDelivery会使用这个值来获取争取的响应。然后,我们发布了这个请求消息,并设置了replyTocorrelationId这两个属性。好了,现在我们可以坐下来耐心等待响应到来了。
  • (4)由于我们的消费者处理(指handleDelivery方法)是在子线程进行的,因此我们需要在响应到来之前暂停主线程(否则主线程结束了,子线程接收到了影响传给谁啊)。使用BlockingQueue是一种解决方案。在这里我们创建了一个阻塞队列ArrayBlockingQueue并将它的容量设为1,因为我们只需要接受一个响应就可以啦。handleDelivery方法所做的很简单,当有响应来的时候,就检查是不是和correlationId匹配,匹配的话就放到阻塞队列ArrayBlockingQueue中。
  • 同时,主线程正等待影响。
  • (5)最终将影响返回给用户了。

现在,可以动手实验了。首先,执行RPC服务端,让它等待请求的到来。

 [x] Awaiting RPC requests

然后,执行RPC客户端,即RPCClient中的main方法,发起请求:

[x] Requesting fib(30)
[client current time] : 1500474305838
 [.] Got '832040'

可以看到,客户端很快就接受到了请求,回头看RPC服务端的时间:

 [.] fib(30)
[server current time] : 1500474305835

上面这种设计并不是RPC服务端的唯一实现,但是它有以下几个重要的优势:

  • ① 如果RPC服务端很慢,你可以通过运行多个实例就可以实现扩展。
  • ② 在RPC客户端,RPC要求发送和接受一个消息。非同步的方法queueDeclare是必须的。这样,RPC客户端只需要为一个RPC请求只进行一次网络往返。

但我们的代码仍然太简单,并没有处理更复杂但也非常重要的问题,像:

  • ① 如果没有服务端在运行,客户端该怎么办
  • ② 客户端应该为一次RPC设置超时吗
  • ③ 如果服务端发生故障并抛出异常,它还应该返回给客户端吗?
  • ④ 在处理消息前,先通过边界检查、类型判断等手段过滤掉无效的消息等

 

说明:

①与原文略有出入,如有疑问,请参阅原文

②原文均是编译后通过javacp命令直接运行程序,我是在IDE中进行的,相应的操作做了修改。

③添加了客户端和服务端执行时间。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/114604.html原文链接:https://javaforall.net

(0)
上一篇 2021年10月5日 下午12:00
下一篇 2021年10月5日 下午1:00


相关推荐

  • C++ lamda表达式[通俗易懂]

    C++ lamda表达式[通俗易懂]简要介绍了c++中的lamda表达式和其用法

    2022年6月3日
    35
  • cssselector_css样式库

    cssselector_css样式库指定列的宽度和数量:columns属性是一个速记属性设置列宽和列数。columns是column-width与column-count的简写属性。语法:columns:列的宽度列数

    2022年8月30日
    5
  • android读取短信_android发短信代码

    android读取短信_android发短信代码在Tasker中利用短信的通知实现短信内容转发到微信之前的教程是利用Tasker自带的短信变量来实现的,这有一个无法解决问题:在360,魅族,坚果等品牌的手机中无法获取短信的消息内容。利用短信的通知则可以获取短信的内容,并且对于华为手机来说也可以不用关闭短信验证码保护功能,所以相较于原来的方法更简单,方便。应用通知的变量是一个本地数组变量%evtprm(),这个变量数组包含%evtprm…

    2022年10月13日
    3
  • WiMAX技术知识概述

    WiMAX技术知识概述WiMax 是什么 WiMax 有什么优缺点 WiMax 是什么 WiMax WorldwideInt 即全球微波互联接入 WiMAX 也叫 802 16 无线城域网或 802 16 WiMAX 是一项新兴的宽带无线接入技术 能提供面向互联网的高速连接 数据传输距离最远可达 50km WiMAX 还具有 QoS 保障 传输速率高 业务丰富多样等优点 WiMAX 的技术起点较高 采用了代表未来通信技术发展方向的 OFDM OFDMA AAS MIMO 等先进技术 随着

    2026年3月19日
    2
  • 子查询关键字-ALL、ANY、SOME、IN、EXISTS「建议收藏」

    子查询关键字-ALL、ANY、SOME、IN、EXISTS「建议收藏」子查询关键字-ALL、ANY、SOME、IN、EXISTSALLselectfromwherec>all(查询语句)等价于selectfromwherec>result1andc>result2andc>result3特点: 1:all与子查询返回的所有值比较为true则返回true 2:ALL可以与=><>=<=<>结合使用 3:all表示指定列中的值必须要大于子查询集中的每一个值

    2022年7月27日
    12
  • ICMP报文分析

    ICMP报文分析

    2021年11月14日
    55

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号