编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

WebSocket订阅ActiveMQ消息(websocket 消息确认)

wxchong 2024-08-21 03:07:54 开源技术 7 ℃ 0 评论


ActiveMQ安装

下载

ActiveMQ官网: http://activemq.apache.org

找到最新的稳定版,下载并解压到一个全英文路径

启动

进入bin目录,在控制台执行命令

activemq-admin.bat start

打开浏览器

http://127.0.0.1:8161

默认账号: admin/admin

JMS 的两种模式

  • 在点对点的消息传递时,目的地称为 队列 queue
  • 在发布订阅消息传递中,目的地称为 主题 topic

SpringBoot后端

application.yaml配置

#=============== activemq配置 =======================
activemq:
broker-url: failover:(tcp://192.168.1.99:61616)?initialReconnectDelay=1000&startupMaxReconnectAttempts=2
user: admin
password: admin
# 在考虑结束之前等待的时间
close-timeout: 15s
# 默认代理URL是否应该在内存中。如果指定了显式代理,则忽略此值。
in-memory: true
# 是否在回滚消息之前停止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
non-blocking-redelivery: false
# 等待消息发送响应的时间。设置为0等待永远。
send-timeout: 0s
# 消息队列名称
pool:
enabled: true
max-connections
: 5
idle-timeout: 60s

#=============== jsm消费配置 =======================
jms:
listener:
# 默认开启多少个消费者
concurrency: 3
#在这里消费者是可以随动的。最大配置消费者并行最大数量
max-concurrency: 3

自定义配置类


@Configuration
@EnableConfigurationProperties
({ActiveMQProperties.class, JmsProperties.class})
public class ActiveMqConfiguration {
/**
* Jms message template jms messaging template.
*
*
@param jmsConnectionFactory the jms connection factory
*
@return the jms messaging template
*/
@Bean
public JmsMessagingTemplate jmsMessageTemplate(ConnectionFactory jmsConnectionFactory) {
return new JmsMessagingTemplate(jmsConnectionFactory);
}
/**
* 队列模式 - 连接工厂
*
*
@param connectionFactory the connection factory
*
@return the jms listener container factory
*/
@Bean({"queueListener"})
public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// false 代表队列模式
factory.setPubSubDomain(false);
return
factory;
}
/**
* 订阅模式 - 连接工厂
*
*
@param connectionFactory the connection factory
*
@return the jms listener container factory
*/
public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// true 代表topic模式
factory.setPubSubDomain(true);
return
factory;
}
}

创建消息通道

@Slf4j
@Configuration

public class MyActiveMqConfig {

/**
* 告警消息接收, 消息类型全部为 {
@link javax.jms.TextMessage}
*
*
@param msg the msg
*/
@JmsListener(
destination = "warm-queue" ,
containerFactory = "queueListener",
concurrency = "${spring.jms.listener.concurrency:3}"
)
public void receiveAlarmMsg(String msg) {
log.info("Received alarm mq msg : {}", msg); }
/**
* Receive business msg.
*
*
@param msg the msg
*/
@JmsListener(
destination = "buz-queue",
containerFactory = "queueListener",
concurrency = "${spring.jms.listener.concurrency:3}"
)
public void receiveBusinessMsg(String msg) {
log.info("Received business mq msg : {}" , msg); }
/**
* Receive resource msg.
*
*
@param msg the msg
*/
@JmsListener(
destination = "res-queue",
containerFactory = "queueListener",
concurrency = "${spring.jms.listener.concurrency:3}"
)
public void receiveResourceMsg(String msg) {
log.info("Received resource mq msg : {}", msg);
}
/**
* 模拟接收 push msg
*
*
@param msg the push msg
*/
@JmsListener(
destination = "push-org-1",
containerFactory = "queueListener",
concurrency = "${spring.jms.listener.concurrency:3}"
)
public void receivePushMsg(String msg) {
log.info("Received push msg : {}", msg);
}
/**
* 告警队列
*
*
@return the queue
*/
@Bean
public Queue alarmQueue() {
return new ActiveMQQueue("warm-queue" + messageProperties.getEnv());
}
/**
* 数据队列
*
*
@return the queue
*/
@Bean
public Queue businessQueue() {
return new ActiveMQQueue("buz-queue" + messageProperties.getEnv());
}
/**
* 资源数据队列
*
*
@return the queue
*/
@Bean
public Queue resourceQueue() {
return new ActiveMQQueue("res-queue" + messageProperties.getEnv());
}
/**
* 模拟测试前端订阅的队列
*
*
@return the queue
*/
@Bean
public Topic pushTopic() {
return new ActiveMQTopic("topic-push");
}

}

后端发送消息

/**
* 发送消息
*
@param message */
private static void sendMessage(String message) {
JmsMessagingTemplate jmt = SpringUtils.
getBean(JmsMessagingTemplate.class);
log.info("向前端推送消息: {}", message);
jmt.convertAndSend("topic-push", message);
}

前端接收消息

下载stomp.js

https://svn.apache.org/repos/asf/activemq/trunk/activemq-web-demo/src/main/webapp/websocket/stomp.js

初始化

var client = Stomp.client(url);

client.connect(username, password, onconnect);

Onconnect是回调方法,在里面做订阅

订阅

client.subscribe(destination, function(message) {

//消息内容主体是message.body

});

如:

var onconnect = function(frame) {

client.msg("connected to Stomp");

client.subscribe(destination, function(message) {

client.msg(message.body);

});

client.msg("subscribe to " + destination);

};

完整示例

<script type="text/javascript" src="jquery-1.4.2.min.js"></script>

<script type="text/javascript" src="stomp.js"></script>

<script type="text/javascript">

var url = 'ws://192.168.1.99:61614/stomp/’;

var username= 'admin';

var password= admin;

var destination = 'topic-push';

$(function(){

$('#connect_btn').click(function() {

var client = Stomp.client(url);

client.msg = function(str) {

var t = document.createTextNode(str);

var p = document.createElement("p");

p.appendChild(t);

$("#messages").append(p);

};


var onconnect = function(frame) {

client.msg("connected to Stomp");

client.subscribe(destination, function(message) {

client.msg(message.body);

});

client.msg("subscribe to " + destination);

};


client.connect(username, password, onconnect);

});

$('#send_btn').click(function() {

var text = $('#send_form_input').val();

if (text) {

client.send(destination, {foo: 1}, text);

$('#send_form_input').val("");

}

return false;

});

});

</script>


<div id='connect'>

<dl>

<dd><input type="button" id='connect_btn' value="Connect"></dd>

<dd><input type="button" id='send_btn' value="send"></dd>

</dl>

<input id='send_form_input' placeholder="message to send />

</div>

<div id="messages">

</div>

安装包里示例

在exmaples/stomp/websocket目录

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表