消息队列技术,它是分布式应用间交换信息的一种技术。
关于消息队列的一些理解,不明白我们为什么要在自己的系统加入消息队列?
对于一个业务比较小的系统,以前的做法是直接通过一个项目集成了所有业务,比如是一个商城系统,登陆,注册,购物车,支付功能等等,一个系统把所有的事情做完了,但是随着业务的不断扩张,单纯的单一系统,已经不能再适应这个商城系统,我们很多时候回对整个系统进行解耦,进行分布式的开发,把一些功能细化出去,形成各种各样的子系统,例如登陆的模块分离出去,变成了独立的授权验证服务器子系统,支付功能形成一个独立的子系统,那么问题来了,由于两个不同的服务器之间,两者是如何进行消息的同步的呢。,那么我们首先可能会想到诸如resetful api或者socket来实现,其实答案是可以的,不过假如是socket实现的,那么对于我们开发者来说,需要我们解决的问题有很多:
1、我们需要具备网络的底层知识,以及使用相应的客户端语言来实现收发信息,而且还需要定制一套相应的通信协议。
2、我们如何做到消息传递的准确性以及数据已经完整的接收.
3、解决网络中断消息传递的处理问题。
4、在多个系统之间,哪个子系统最先会受到信息.
5、对于异构系统间的数据交换,如何解决.
6、如何处理应用程序阻塞.
7、负载均衡的问题
8、子系统通信之间的地址管理
其实,我们要解决的问题远远不止上面这些,但是我们使用了消息队列的中间件之后,可以轻松的把问题解决掉。总的来说,使用消息队列,可以解决我们开发者在解决分布式架构之间的传递问题。
对于我们开发来者来说,我们一般会采用一些开源的MQ协议来接入我们的系统,下面我简单介绍一些消息队列中间件。
目前比较热门的三种消息队列
ActiveMQ
ActiveMQ 由Apache维护的比较优秀的开源协议。它是可以支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,是支持Apache2.0协议,对于spring项目,我们可以很轻易的嵌入进去。
支持的客户端语言:
Java, C, C++, C#, Ruby, Perl, Python, PHP。
支持的应用协议:
OpenWire、Stomp、XMPP、AMQP、MQTT等等。
对于XMPP,我在前一些文章介绍过,它是可以用来我们的即时通讯的协议,以XML格式传递。而MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议。该协议支持所有平台,目前可以用于物联网的通讯中,前景非常可观.
官方网站:
http://activemq.apache.org/
RabbitMQ
是用Erlang编写的一款基于AMQP协议的消息队列中间件,在开源项目关注度比较高。
支持的客户端语言:
Python、Ruby、.NET、Java、Node.js、C、PHP、Go、Erlang。
支持的协议XMPP、STOMP等
官方网站:
http://www.rabbitmq.com/devtools.html
RocketMQ
它是由阿里维护的一个开源消息队列,它在阿里各个业务线得到了广泛的应用,业务场景如下:
Mysql binlog 、同步订单类应用、流计算IM 等实时消息领域、Cache 同步、削峰填谷,目前阿里云的消息中间件也是由它演变而来的
支持的客户端语言:
Java 、C++
支持的协议:
JMS,MQTT
官方网站:
https://github.com/alibaba/rocketmq?spm=5176.doc29532.2.1.I7qrLz
当然消息的中间件,不止上面这些,比如有关注比较高的Jafka/Kafka、open mq,还有一些商业的消息中间件。这里就不一一介绍了。
大概介绍了MQ的一些知识,我们如何在我们的Spring mvc项目中加入消息队列?
上面的三种消息队列中间件,大家可以根据自己的业务需要,使用一个适合自己的中间件,我这里主要讲解一些如何在spring MVC加入RabbitMQ。
RabbitMQ的使用过程:
大概如下
(1)client连接到消息队列服务器。
(2)声明一个exchange。
(3)声明一个queue。
(4)设置好exchange和queue之间的关系。
(5)客户端投递消息的监听。
RabbitMQ有三种类型的Exchange:direct, topic 和fanout。
direct:相当于点对点的,下图所示
topic:它是将路由键和某模式进行匹配,只有符合规则才会接收到
fanout就是广播模式。
首先,我们知道消息中间件,它是相当于一台服务器程序那样,负责我们客户端的消息的传递,所以我们是要安装消息中间件程序的,而不是简单配置到我们的spring就可以完成事情的,我们spring 配置的只是客户端,这个可能很多人在刚刚接触的时候会有点模糊。
那么我们是如何安装RabbitMQ的
我们刚才介绍过RabbitMQ是有Erlang语音编写的,那么运行RabbitMQ是需要配置一下erlang的环境的。具体各个操作系统的安装方法不一样,我这里不做详细的解释,大家可以自行研究。
安装RabbitMQ
安装环境:Mac +rabbitmq
对于mac 系统,安装Rabbitmq非常简单,可以通过brew工具来安装,当然这个是自行安装了。直接输入如下指令就可以了。
brew install rabbitmq
过了一段时间后,安装完毕。
启动RabbitMq服务器
sudo /usr/local/sbin/rabbitmq-server
这个时候已经成功启动服务器了。我们可以输入http://localhost:15672/#/
看看我们的服务器上面的一些信息,账号和密码为:guest和guest
那么对于客户端的连接,我们可以使用127.0.0.1,端口5672,账号和密码guest
那么我们的spring mvc 项目是如何配置rabbitmq的。
maven管理
我这里是通过maven来关系项目,可以在pom.xml加入spring-rabbit和amqp-client,代码如下:
<!-- http://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.0.RELEASE</version>
</dependency>
<!-- rabbit -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.4</version>
</dependency>
下载好后,我们需要了解连接rabbit的一些步骤,
1、配置连接配置
2、声明template,可以通过注解中使用rabbit来收发信息
3、消息对象json转换类。
4、Queue队列的声明。
5、交换机的定义
配置信息
在这里我编写一个spring管理rabbitmq的xml,我这里只是以topic为例子,其他的几种比如是
direct和
这里我命名为:spring-amqp.xml,具体说明在xml文件里
<beansxmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- rabbit服务器连接的一些配置 -->
<rabbit:connection-factoryid="connectionFactory"
host="127.0.0.1"
port="5672"
username="guest"
password="guest" />
<!-- rabbit服务器连接的一些配置 -->
<rabbit:adminconnection-factory="connectionFactory"id="amqpAdmin"/>
<!-- rabbit模板的配置 -->
<rabbit:templateid="amqpTemplate"connection-factory="connectionFactory"channel-transacted="true"
message-converter="jsonMessageConverter"/>
<!--json消息转换-->
<beanid="jsonMessageConverter"class="org.springframework.amqp.support.converter.JsonMessageConverter">
<propertyname="classMapper">
<beanclass="org.springframework.amqp.support.converter.DefaultClassMapper">
</bean>
</property>
</bean>
<!--消息队列,id和name可以是名字一致的-->
<rabbit:queueid="com.yeehot.topic.queue"name="com.yeehot.topic.queue">
<rabbit:queue-arguments>
<entrykey="x-ha-policy"value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!-- 声明Exchange的类型为topic并设定Exchange的名称 -->
<rabbit:topic-exchangename="yeehot.topic">
<rabbit:bindings>
<!-- 这里的queue是<rabbit:queue 里的ID -->
<rabbit:bindingpattern="com.yeehot.topic"queue="com.yeehot.topic.queue"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--消息的监听,如果有多个子系统的话,可以在子系统中定义,我这里只是同时让他把消息发送和接收放在同一个系统而已。-->
<rabbit:listener-containerconnection-factory="connectionFactory"message-converter="jsonMessageConverter"
channel-transacted="true"concurrency="10"
auto-startup="true">
<!-- queues是我们上面的消息的队列,这句话的意思就是监听上面名字的队列 -->
<rabbit:listenerqueues="com.yeehot.topic.queue"ref="testReceiveMessageListener"method="handleMessage"/>
</rabbit:listener-container>
</beans>
编写信息发送器的代码:
@Controller
public class MessageQueeController {
privatestaticfinal String EXCHANGE = "yeehot.topic";
@Resource(name="amqpTemplate")
private AmqpTemplate amqpTemplate;
@RequestMapping(value="/sendmsg", method=RequestMethod.GET, produces = "text/html;charset=UTF-8")
@ResponseBody
public String showInfo( String msg){
System.out.println("message="+msg);
amqpTemplate.convertAndSend(EXCHANGE, "com.yeehot.topic",msg);
return msg;
}
}
编写接收信息的代码:
@Component
public class TestReceiveMessageListener {
public void handleMessage(String result) {
if(result==null)
{
System.out.println("oh ,no");
return;
}
System.out.println("receive msg="+result);
//处理逻辑
}
}
运行:
我们在浏览器输入我们的项目地址
http://192.168.3.114:8080/Yeehot-Program-King/sendmsg?msg=ming,可以发现控制台会打印出如下信息,说明我们的消息监听已经成功了。
今天的消息队列就说到这里,更多关于技术的知识,订阅我的头条号:一点热。欢迎收藏与转发
本文暂时没有评论,来添加一个吧(●'◡'●)