博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
消息中间件--ActiveMQ&JMS消息服务
阅读量:4562 次
发布时间:2019-06-08

本文共 11230 字,大约阅读时间需要 37 分钟。

### 消息中间件 ###

----------

**消息中间件**

1. 消息中间件的概述
2. 消息中间件的应用场景
  * 异步处理
  * 应用解耦
  * 流量削峰
  * 消息通信
 
----------

### JMS消息服务 ###

----------

**JMS的概述**

1. JMS消息服务的概述
2. JMS消息模型
  * P2P模式
  * Pub/Sub模式
 
3. 消息消费的方式
  * 同步的方式---手动
  * 异步的方式---listener监听
 
4. JMS编程模型
----------

### 消息中间件:ActiveMQ ###

----------

**ActiveMQ的下载与安装**

1. ActiveMQ的下载与安装
  * 下载ActiveMQ的压缩文件,解压apache-activemq-5.14.5-bin.zip文件
  * 双击运行:activemq.bat文件,启动服务
 
2. 测试ActiveMQ是否安装成功
  * 打开浏览器,输入:http://localhost:8161
 
3. 点击Manage ActiveMQ broker连接,可以查看ActiveMQ中已经发布的消息等
  * 用户名密码都是:admin
----------

**ActiveMQ的消息队列方式入门**(P2P模式)

1. 在父工程的pom.xml文件中引入ActiveMQ和Spring整合JMS的坐标依赖
org.apache.activemq
activemq-all
5.2.0
org.springframework
spring-jms
4.2.4.RELEASE
org.apache.xbean
xbean-spring
3.7
 
2. ActiveMQ的向消息队列中发送消息的入门程序(没有使用Spring整合JMS的方式)
@Testpublic void sendQueueMessage() throws JMSException {// 1 创建连接工厂ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); // 2 使用工厂,创建连接Connection connection = factory.createConnection(); // 3 启动连接connection.start(); // 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 5 创建队列队形(myQueue--队列的名字)/topic-----------session创建Queue queue = session.createQueue("myQueue");// 6 创建生产者-----------session创建MessageProducer producer = session.createProducer(queue);// 7 创建消息----文本消息-------session创建TextMessage message = session.createTextMessage();message.setText("helloworld!!!"); // 8 发送消息producer.send(message); // 9 提交事务session.commit();session.close();connection.close();}
3. ActiveMQ从消息队列中获取消息
@Testpublic void receiverQueueMessage() throws JMSException {// 1 创建连接工厂ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();// 2 使用工厂,创建连接Connection connection = factory.createConnection();// 3 启动连接connection.start();// 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);// 5 创建队列队形(hello--队列的名字)/topic-----------session创建Queue queue = session.createQueue("myQueue");// 6 创建消费者-----------session创建MessageConsumer consumer = session.createConsumer(queue); // 7 接收消息----text格式TextMessage receive = (TextMessage) consumer.receive();String text = receive.getText();System.out.println("接收到的消息====" + text); // 8 提交事务session.commit();session.close();connection.close(); }
4. 使用监听器的方式,从队列中消费消息
/***异步方式Queue接受用Listener方式接受,多用如果有多个监听listener,则交替执行* @throws Exception*/@Testpublic void receiverQueueListener() throws Exception{// 1 创建连接工厂ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();// 2 使用工厂,创建连接Connection connection = factory.createConnection();// 3 启动连接connection.start();// 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务//死循环的不能用事物Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5 创建队列队形(hello--队列的名字)/topic-----------session创建Queue queue = session.createQueue("myQueue");// 6 创建消费者-----------session创建MessageConsumer consumer = session.createConsumer(queue); //7 // 给消费者添加监听器consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message msg) {TextMessage message = (TextMessage) msg;try {System.out.println("Listener1111111111接收到的消息是=="+message.getText());} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}); while(true){}// 使用监听器的方式不能关闭,需要监听器一直工作// session.commit();// session.close();// connection.close();}

 

**ActiveMQ的消息订阅方式入门**(Pub/Sub模式

/*** Topic发送* @throws JMSException*/@Testpublic void sendTopicMessage() throws JMSException {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();Connection connection = factory.createConnection();connection.start();Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建消息订阅Topic topic = session.createTopic("myTopic");// 创建生产者MessageProducer producer = session.createProducer(topic);// 创建消息,一组可以存储key value的消息MapMessage message = session.createMapMessage();message.setString("username", "cgx");message.setString("password", "123456");// 发送消息producer.send(message);// 提交事务session.commit();session.close();connection.close();}
/*** Topic接受** @throws JMSException*/@Testpublic void testReceiverMessage() throws JMSException {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();Connection connection = factory.createConnection();connection.start();Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建消息订阅Topic topic = session.createTopic("myTopic");// 创建消费者MessageConsumer consumer = session.createConsumer(topic);// 接收消息MapMessage message = (MapMessage) consumer.receive();System.out.println(message.getString("username"));System.out.println(message.getString("password")); session.commit();session.close();connection.close();}
/*** Topic接受Listener监听方式** @throws Exception*/@Testpublic void receiverQueueListener() throws Exception {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();Connection connection = factory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建消息订阅Topic topic = session.createTopic("myTopic");// 创建消费者MessageConsumer consumer = session.createConsumer(topic); // 给消费者添加监听器consumer添加监听consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message msg) {MapMessage message = (MapMessage) msg;try {System.out.println(message.getString("username"));System.out.println(message.getString("password"));} catch (JMSException e) {e.printStackTrace();}}}); while (true) { } }

 

### Spring整合ActiveMQ ###★★★★★

----------
 
**Spring整合ActiveMQ**★★★★★
 
1. 创建applicationContext-mq.xml的配置文件,导入约束★★★★★

 

2. 具体的配置如下★★★★★
applicationContext-mq.xml===================mq的消息发送(消息生产者)

 

 
3. 发送消息的代码如下★★★★★
  3.1.Queue方式:★★★★★
@Autowired@Qualifier(value="jmsQueueTemplate")private JmsTemplate queueTemplate;//Queue  /*** Queue发送消息---spring框架*/@Testpublic void sendQueueMessage() {// 发送消息 构造参数指定目标,因为配置文件中的队列和订阅模式是通过id与false和true进行区分queueTemplate.send("myQueue", new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {// 使用session创建消息,发送TextMessage textMessage = session.createTextMessage("测试结合spring框架发送queue消息");return textMessage;}});}
  3.2.Topic方式:★★★★★
@Autowired@Qualifier(value = "jmsTopicTemplate")private JmsTemplate topicTemplate;//Topic /*** Topic发送消息---spring框架*/@Testpublic void sendTopicMessage() {topicTemplate.send("spring_topic", new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {MapMessage mapMessage = session.createMapMessage();mapMessage.setString("username", "mdzz");return mapMessage;}});}

 

4. 接收消息的代码如下==========不提倡手动,要用监听器异步获取
/*** Queue接收消息---spring框架* 同步手动:不提倡* receive("myQueue")要写目标,不写目标的话会报找不到目标的错误NO defaultDestination*/@Testpublic void receiverMessage() {//接收消息textMessage类型TextMessage textMessage = (TextMessage) queueTemplate.receive("myQueue"); try {System.out.println(textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}
 
**Spring配置监听器**★★★★★★★★★★★★★★★
 
1. 自定义监听器代码的编写----接收消息---spring框架---实现MessageListener接口★★★★★
  1.1.Queue:★★★★★
@Component(value="queueConsumer1")public class QueueListener implements MessageListener { @Overridepublic void onMessage(Message arg0) {// 把arg0强转TextMessage textMessage = (TextMessage) arg0;try {// 输出消息System.out.println(textMessage.getText());} catch (JMSException e) {e.printStackTrace();}} }
 
  1.2.Topic:发送一个,两个都会接受★★★★★topic特点:有几个监听几个都会同时收到
@Componentpublic class TopicConsumer1 implements MessageListener { @Overridepublic void onMessage(Message arg0) {MapMessage mapMessage = (MapMessage) arg0;try {System.out.println("TopicConsumer1===="+mapMessage.getString("username"));} catch (JMSException e) {e.printStackTrace();}} } @Componentpublic class TopicConsumer2 implements MessageListener {    //...}
 
2. 编写配置文件
applicationContext-mq-consumer.xml=============mq的消息接受(负责监听接受消息)
 
3.不用启动项目,把spring配置文件applicationContext-mq-consumer.xml启动起来,可以用采用下面方法
新建一个test类,让他一直启动着,这样就一直加载spring的配置文件
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("classpath:applicationContext-mq-consumer.xml")public class SpringQueueListenerTest { @Testpublic void test(){while(true);}}

 

4.只要发送端(发送消息---spring框架)一启动,监听器就会监听到,就会输出:测试结合spring框架发送queue消息★★★★★

spring整合总结:

消息发送

  1. 创建spring容器
  2. 从容器中获取JMSTemplate对象,发送消息
  3. 定义Destination
  4. 使用JMSTemplate对象发送消息
消息接受
  1. 创建一个类实现MessageListener 接口。业务处理在此类中实现。
  2.在spring容器中配置DefaultMessageListenerContainer对象,引用MessageListener 实现类对象接收消息。

项目整合ActiveMQ:

1. 消息生产者整合ActiveMQ

  消息生产者只需要发送消息
  需要把JMSTemplate和Destination交给spring进行管理

 部分代码:
/**===========================activeMQ消息发送========================================*/// 发送消息!!!this.send("save", item.getId());}@Autowiredprivate JmsTemplate jmsTemplate;@Autowiredprivate Destination destination;/*** 此方法就是用来发送消息的* 考虑:1、发送什么数据?2、我需要什么数据?* 在消息中需要:1、消息的标识:save,delete,update;2、商品的ID*/private void send(final String type, final Long itemId) {// TODO Auto-generated method stubjmsTemplate.send(destination, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {//创建消息体TextMessage textMessage = new ActiveMQTextMessage();//设置消息内容Map
map = new HashMap<>();map.put("type", type);map.put("itemId", itemId);try {ObjectMapper mapper = new ObjectMapper();textMessage.setText(mapper.writeValueAsString(map));} catch (Exception e) {e.printStackTrace();}return textMessage;}});}

 

2. 消息消费改造
  在search-service添加
  ItemMessageListener:

/**===========================activeMQ消息发送========================================*/@Autowiredprivate SearchService searchService;@Overridepublic void onMessage(Message message) {//先判断此消息类型是否是TextMessageif(message instanceof TextMessage){//如果是,强转TextMessage textMessage = (TextMessage)message;try {//获取消息:jsonString json = textMessage.getText();//杰克逊第三作用:直接解析json数据ObjectMapper mapper = new ObjectMapper();JsonNode jsonNode = mapper.readTree(json);String type = jsonNode.get("type").asText();Long itemId = jsonNode.get("itemId").asLong();//根据解析出来的type,判断此type=save的时候我应该调用indexSearch方法if("save".equals(type)){searchService.indexItem(itemId);}} catch (Exception e) {e.printStackTrace();}}}

索引库增加商品会触发mq:

SearchServiceImpl:

@Overridepublic void indexItem(Long itemId) throws Exception {Item item = this.itemMapper.selectByPrimaryKey(itemId);SolrInputDocument doc = new SolrInputDocument();doc.addField("id", item.getId());doc.addField("item_title", item.getTitle());doc.addField("item_image", item.getImage());doc.addField("item_cid", item.getCid());doc.addField("item_price", item.getPrice());doc.addField("item_status", item.getStatus());this.cloudSolrServer.add(doc);this.cloudSolrServer.commit();}

 

 

转载于:https://www.cnblogs.com/soul-wonder/p/8910220.html

你可能感兴趣的文章
《Python数据科学手册》第五章机器学习的笔记
查看>>
ubuntu16.04 配置爬虫环境
查看>>
Centos7,PHP7安装swoole
查看>>
02_ListActive中响应事件 并LogCat输出
查看>>
doubleclick adx note
查看>>
Celery框架
查看>>
[c#]asp.net开发微信公众平台(4)关注事件、用户记录、回复文本消息
查看>>
[转载,感觉写的非常详细]DUBBO配置方式详解
查看>>
linux Valgrind使用说明-内存泄漏
查看>>
Android在Eclipse上的环境配置
查看>>
面向对象(五)
查看>>
android平台下使用点九PNG技术
查看>>
Python学习3,列表
查看>>
最长回文子串
查看>>
JAVA基础-JDBC(一)
查看>>
js中for和while运行速度比较
查看>>
简单理解什么是递归(阶乘演示)
查看>>
http协议
查看>>
js倒计时,页面刷新时,不会从头计时
查看>>
洛谷1156垃圾陷阱
查看>>