ActiveMQ安装
下载
ActiveMQ官网: http://activemq.apache.org
找到最新的稳定版,下载并解压到一个全英文路径
启动
进入bin目录,在控制台执行命令
activemq-admin.bat start
打开浏览器
http://127.0.0.1:8161
默认账号: admin/admin
JMS 的两种模式
- 在点对点的消息传递时,目的地称为 队列 queue
- 在发布订阅消息传递中,目的地称为 主题 topic
SpringBoot后端
application.yaml配置
#=============== activemq配置 =======================
activemq:
broker-url: failover:(tcp://192.168.1.99:61616)?initialReconnectDelay=1000&startupMaxReconnectAttempts=2
user: admin
password: admin
# 在考虑结束之前等待的时间
close-timeout: 15s
# 默认代理URL是否应该在内存中。如果指定了显式代理,则忽略此值。
in-memory: true
# 是否在回滚消息之前停止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
non-blocking-redelivery: false
# 等待消息发送响应的时间。设置为0等待永远。
send-timeout: 0s
# 消息队列名称
pool:
enabled: true
max-connections: 5
idle-timeout: 60s
#=============== jsm消费配置 =======================
jms:
listener:
# 默认开启多少个消费者
concurrency: 3
#在这里消费者是可以随动的。最大配置消费者并行最大数量
max-concurrency: 3
自定义配置类
@Configuration
@EnableConfigurationProperties({ActiveMQProperties.class, JmsProperties.class})
public class ActiveMqConfiguration {
/**
* Jms message template jms messaging template.
*
* @param jmsConnectionFactory the jms connection factory
* @return the jms messaging template
*/
@Bean
public JmsMessagingTemplate jmsMessageTemplate(ConnectionFactory jmsConnectionFactory) {
return new JmsMessagingTemplate(jmsConnectionFactory);
}
/**
* 队列模式 - 连接工厂
*
* @param connectionFactory the connection factory
* @return the jms listener container factory
*/
@Bean({"queueListener"})
public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// false 代表队列模式
factory.setPubSubDomain(false);
return factory;
}
/**
* 订阅模式 - 连接工厂
*
* @param connectionFactory the connection factory
* @return the jms listener container factory
*/
public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// true 代表topic模式
factory.setPubSubDomain(true);
return factory;
}
}
创建消息通道
@Slf4j
@Configuration
public class MyActiveMqConfig {
/**
* 告警消息接收, 消息类型全部为 {@link javax.jms.TextMessage}
*
* @param msg the msg
*/
@JmsListener(
destination = "warm-queue" ,
containerFactory = "queueListener",
concurrency = "${spring.jms.listener.concurrency:3}"
)
public void receiveAlarmMsg(String msg) {
log.info("Received alarm mq msg : {}", msg); }
/**
* Receive business msg.
*
* @param msg the msg
*/
@JmsListener(
destination = "buz-queue",
containerFactory = "queueListener",
concurrency = "${spring.jms.listener.concurrency:3}"
)
public void receiveBusinessMsg(String msg) {
log.info("Received business mq msg : {}" , msg); }
/**
* Receive resource msg.
*
* @param msg the msg
*/
@JmsListener(
destination = "res-queue",
containerFactory = "queueListener",
concurrency = "${spring.jms.listener.concurrency:3}"
)
public void receiveResourceMsg(String msg) {
log.info("Received resource mq msg : {}", msg);
}
/**
* 模拟接收 push msg
*
* @param msg the push msg
*/
@JmsListener(
destination = "push-org-1",
containerFactory = "queueListener",
concurrency = "${spring.jms.listener.concurrency:3}"
)
public void receivePushMsg(String msg) {
log.info("Received push msg : {}", msg);
}
/**
* 告警队列
*
* @return the queue
*/
@Bean
public Queue alarmQueue() {
return new ActiveMQQueue("warm-queue" + messageProperties.getEnv());
}
/**
* 数据队列
*
* @return the queue
*/
@Bean
public Queue businessQueue() {
return new ActiveMQQueue("buz-queue" + messageProperties.getEnv());
}
/**
* 资源数据队列
*
* @return the queue
*/
@Bean
public Queue resourceQueue() {
return new ActiveMQQueue("res-queue" + messageProperties.getEnv());
}
/**
* 模拟测试前端订阅的队列
*
* @return the queue
*/
@Bean
public Topic pushTopic() {
return new ActiveMQTopic("topic-push");
}
}
后端发送消息
/**
* 发送消息
* @param message */
private static void sendMessage(String message) {
JmsMessagingTemplate jmt = SpringUtils.getBean(JmsMessagingTemplate.class);
log.info("向前端推送消息: {}", message);
jmt.convertAndSend("topic-push", message);
}
前端接收消息
下载stomp.js
https://svn.apache.org/repos/asf/activemq/trunk/activemq-web-demo/src/main/webapp/websocket/stomp.js
初始化
var client = Stomp.client(url);
client.connect(username, password, onconnect);
Onconnect是回调方法,在里面做订阅
订阅
client.subscribe(destination, function(message) {
//消息内容主体是message.body
});
如:
var onconnect = function(frame) {
client.msg("connected to Stomp");
client.subscribe(destination, function(message) {
client.msg(message.body);
});
client.msg("subscribe to " + destination);
};
完整示例
<script type="text/javascript" src="jquery-1.4.2.min.js"></script>
<script type="text/javascript" src="stomp.js"></script>
<script type="text/javascript">
var url = 'ws://192.168.1.99:61614/stomp/’;
var username= 'admin';
var password= admin;
var destination = 'topic-push';
$(function(){
$('#connect_btn').click(function() {
var client = Stomp.client(url);
client.msg = function(str) {
var t = document.createTextNode(str);
var p = document.createElement("p");
p.appendChild(t);
$("#messages").append(p);
};
var onconnect = function(frame) {
client.msg("connected to Stomp");
client.subscribe(destination, function(message) {
client.msg(message.body);
});
client.msg("subscribe to " + destination);
};
client.connect(username, password, onconnect);
});
$('#send_btn').click(function() {
var text = $('#send_form_input').val();
if (text) {
client.send(destination, {foo: 1}, text);
$('#send_form_input').val("");
}
return false;
});
});
</script>
<div id='connect'>
<dl>
<dd><input type="button" id='connect_btn' value="Connect"></dd>
<dd><input type="button" id='send_btn' value="send"></dd>
</dl>
<input id='send_form_input' placeholder="message to send />
</div>
<div id="messages">
</div>
安装包里示例
在exmaples/stomp/websocket目录
本文暂时没有评论,来添加一个吧(●'◡'●)