首页天道酬勤为什么要用rabbitmq,springboot netty

为什么要用rabbitmq,springboot netty

张世龙 05-13 12:01 91次浏览

说到远程处理模块远程过程调用协议(RPC ),人们浮现在脑海的是rest风格的API、Dubbo、web服务、Java RMI、CORBA等。

其实,RabbitMQ也提供了RPC功能,而且使用方便。

今天,松哥将通过一个简单的案例向大家展示Spring Boot RabbitMQ是如何实现简单的RPC调用的。

注意

对于RabbitMQ实现RPC调用,一些合作伙伴可能有所误解,你认为这还不容易吗? 创建两个消息队列queue_1和queue_2,首先客户端向queue_1发送消息,服务端监听并接收queue_1的消息后进行处理; 处理完成后,服务器端向queue_2队列发送消息,客户端接收queue_2队列的消息,可以知道服务器端的处理结果。

这种方式不是不行,但是有点麻烦! RabbitMQ提供了一个现成的方案,非常有用。 接下来一起学习吧。

1 .框架首先看看简单的框架图:

这张图很理解问题:

首先,客户端发送消息。 与普通消息相比,此消息有两个重要内容。 一个是correlation_id,它表示此消息的唯一id,另一个是reply_to,它表示消息回复队列的名称。 Server从消息发送队列中检索消息,处理相应的业务逻辑,并在处理完成后将处理结果发送到reply_to指定的回调队列。 客户端可以通过从回调队列读取消息来了解消息的执行情况。 这其实最适合异步调用的处理。

2 .实践接下来通过具体的例子来看看这个是怎么玩的。

2.1客户端开发首先,创建一个名为producer的Spring Boot项目,它是消息生产者。 在创建时添加web和rabbitmq的依赖关系。 下图:

成功创建项目后,首先在application.properties中配置RabbitMQ的基本信息,如下所示:

spring.rabbit MQ.host=localhost spring.rabbit MQ.port=5672 spring.rabbit MQ.username=guest spring.rabit MQ.pasport 后两行:首先是配置消息的确认方式。 通过correlated确认。 只需选中此配置,将来的消息就会带有correlated。最后一行的配置是打开发送失败的退货。

接下来,提供以下配置类:

/** * @author江南一点雨* @微信公众号江南一点雨* @网站http://www.itboyhub.com * @国际站http://www.javaboy.org * @微信a _ Java _ boy * @ @ configurationpublicclassrabbitconfig { publicstaticfinalstringrpc _ queue1=' queue _ publicstaticfinalstringrpc _ queue publicstaticfinalstringrpc _ exchange=' RPC _ exchange '; /** *发送消息设置RPC队列*/@ beanqueuemsgqueue ({ return new queue (RPC _ queue1); } /** *设置返回队列*/@Bean Queue replyQueue () returnnewqueue(RPC_queue2); } /** *开关*/@Bean TopicExchange exchange (() returnnewtopicexchange ) ) RPC_exchange ); } /** *请求队列和交换机绑定*/@ bean绑定msg绑定(() returnbindingbuilder.bind (msg queue ) ).to (exchange ) ) (} /** *队列和交换机绑定(返回*/@Bean Bi

nding replyBinding() { return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2); } /** * 使用 RabbitTemplate发送和接收消息 * 并设置回调队列地址 */ @Bean RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setReplyAddress(RPC_QUEUE2); template.setReplyTimeout(6000); return template; } /** * 给返回队列设置监听器 */ @Bean SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(RPC_QUEUE2); container.setMessageListener(rabbitTemplate(connectionFactory)); return container; }}

这个配置类中我们分别配置了消息发送队列 msgQueue 和消息返回队列 replyQueue,然后将这两个队列和消息交换机进行绑定。这个都是 RabbitMQ 的常规操作,没啥好说的。

在 Spring Boot 中我们负责消息发送的工具是 RabbitTemplate,默认情况下,系统自动提供了该工具,但是这里我们需要对该工具重新进行定制,主要是添加消息发送的返回队列,最后我们还需要给返回队列设置一个监听器。

好啦,接下来我们就可以开始具体的消息发送了:

/** * @author 江南一点雨 * @微信公众号 江南一点雨 * @网站 http://www.itboyhub.com * @国际站 http://www.javaboy.org * @微信 a_java_boy * @GitHub https://github.com/lenve * @Gitee https://gitee.com/lenve */@RestControllerpublic class RpcClientController { private static final Logger logger = LoggerFactory.getLogger(RpcClientController.class); @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public String send(String message) { // 创建消息对象 Message newMessage = MessageBuilder.withBody(message.getBytes()).build(); logger.info("client send:{}", newMessage); //客户端发送消息 Message result = rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE1, newMessage); String response = ""; if (result != null) { // 获取已发送的消息的 correlationId String correlationId = newMessage.getMessageProperties().getCorrelationId(); logger.info("correlationId:{}", correlationId); // 获取响应头信息 HashMap<String, Object> headers = (HashMap<String, Object>) result.getMessageProperties().getHeaders(); // 获取 server 返回的消息 id String msgId = (String) headers.get("spring_returned_message_correlation"); if (msgId.equals(correlationId)) { response = new String(result.getBody()); logger.info("client receive:{}", response); } } return response; }}

这块的代码其实也都是一些常规代码,我挑几个关键的节点说下:

消息发送调用 sendAndReceive 方法,该方法自带返回值,返回值就是服务端返回的消息。服务端返回的消息中,头信息中包含了 spring_returned_message_correlation 字段,这个就是消息发送时候的 correlation_id,通过消息发送时候的 correlation_id 以及返回消息头中的 spring_returned_message_correlation 字段值,我们就可以将返回的消息内容和发送的消息绑定到一起,确认出这个返回的内容就是针对这个发送的消息的。

这就是整个客户端的开发,其实最最核心的就是 sendAndReceive 方法的调用。调用虽然简单,但是准备工作还是要做足够。例如如果我们没有在 application.properties 中配置 correlated,发送的消息中就没有 correlation_id,这样就无法将返回的消息内容和发送的消息内容关联起来。

2.2 服务端开发

再来看看服务端的开发。

首先创建一个名为 consumer 的 Spring Boot 项目,创建项目添加的依赖和客户端开发创建的依赖是一致的,不再赘述。

然后配置 application.properties 配置文件,该文件的配置也和客户端中的配置一致,不再赘述。

接下来提供一个 RabbitMQ 的配置类,这个配置类就比较简单,单纯的配置一下消息队列并将之和消息交换机绑定起来,如下:

/** * @author 江南一点雨 * @微信公众号 江南一点雨 * @网站 http://www.itboyhub.com * @国际站 http://www.javaboy.org * @微信 a_java_boy * @GitHub https://github.com/lenve * @Gitee https://gitee.com/lenve */@Configurationpublic class RabbitConfig { public static final String RPC_QUEUE1 = "queue_1"; public static final String RPC_QUEUE2 = "queue_2"; public static final String RPC_EXCHANGE = "rpc_exchange"; /** * 配置消息发送队列 */ @Bean Queue msgQueue() { return new Queue(RPC_QUEUE1); } /** * 设置返回队列 */ @Bean Queue replyQueue() { return new Queue(RPC_QUEUE2); } /** * 设置交换机 */ @Bean TopicExchange exchange() { return new TopicExchange(RPC_EXCHANGE); } /** * 请求队列和交换器绑定 */ @Bean Binding msgBinding() { return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1); } /** * 返回队列和交换器绑定 */ @Bean Binding replyBinding() { return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2); }}

最后我们再来看下消息的消费:

@Componentpublic class RpcServerController { private static final Logger logger = LoggerFactory.getLogger(RpcServerController.class); @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = RabbitConfig.RPC_QUEUE1) public void process(Message msg) { logger.info("server receive : {}",msg.toString()); Message response = MessageBuilder.withBody(("i'm receive:"+new String(msg.getBody())).getBytes()).build(); CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId()); rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE2, response, correlationData); }}

这里的逻辑就比较简单了:

服务端首先收到消息并打印出来。服务端提取出原消息中的 correlation_id。服务端调用 sendAndReceive 方法,将消息发送给 RPC_QUEUE2 队列,同时带上 correlation_id 参数。

服务端的消息发出后,客户端将收到服务端返回的结果。

OK,大功告成。

2.3 测试

接下来我们进行一个简单测试。

首先启动 RabbitMQ。

接下来分别启动 producer 和 consumer,然后在 postman 中调用 producer 的接口进行测试,如下:

可以看到,已经收到了服务端的返回信息。

来看看 producer 的运行日志:

可以看到,消息发送出去后,同时也收到了 consumer 返回的信息。

可以看到,consumer 也收到了客户端发来的消息。

3. 小结

好啦,一个小小的案例,带小伙伴们体验一把 RabbitMQ 实现 RPC 调用。

公众号江南一点雨后台回复 mq_rpc 可以获取本文案例哦~感兴趣的小伙伴可以试试~

java rpc框架,thrift rpc框架 eth改算法,java实现rpc调用