日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
干貨|SpringBootJMS(ActiveMQ)API實(shí)踐應(yīng)用詳解

[[350573]]

 前言

Active是一種開源的,實(shí)現(xiàn)了JMS1.1規(guī)范的,面向消息(MOM)的中間件,為應(yīng)用程序提供高效的、可擴(kuò)展的、穩(wěn)定的和安全的企業(yè)級(jí)消息通信。AC-tiveMQ使用Apache提供的授權(quán),任何人都可以對其實(shí)現(xiàn)代碼進(jìn)行修改。

ActiveMQ的設(shè)計(jì)目標(biāo)是提供標(biāo)準(zhǔn)的,面向消息的,能夠跨越多語言和多系統(tǒng)的應(yīng)用集成消息通信中間件。ActiveMQ實(shí)現(xiàn)了JMS標(biāo)準(zhǔn)并提供了很多附加的特性。本文將帶大家詳細(xì)介紹ActiveMQ的API的使用。

1. JMS的概念?

「什么是JMS呢:」

  • JMS---------JAVA Message Service
  • JAVA的消息服務(wù),是sun公司提供的接口,只是一個(gè)規(guī)范,這個(gè)規(guī)范就類似于JDBC是一樣的,使用的時(shí)候是需要當(dāng)前規(guī)范的實(shí)現(xiàn)產(chǎn)品的。

「JMS能干什么呢:」

  • 能夠?qū)⑿畔l(fā)布到目的地
  • 可以從目的地來消費(fèi)這個(gè)消息

2、兩種通信模型

「隊(duì)列的通信概念:」

  • 特點(diǎn):當(dāng)我們同一個(gè)隊(duì)列有多個(gè)消費(fèi)者的時(shí)候,多個(gè)消費(fèi)者的數(shù)據(jù)之和才是原來隊(duì)列中的所有數(shù)據(jù)
  • 隊(duì)列的通信模型最大的適用場景:流量的消峰,高并發(fā)的處理

「主題的通信模型:」

  • 特點(diǎn):當(dāng)我們隊(duì)列有多個(gè)消費(fèi)者的時(shí)候,那么這多個(gè)消費(fèi)者消費(fèi)到的數(shù)據(jù)是一樣的
  • 主題消費(fèi)者通信模型的適用場景:微服務(wù)下服務(wù)之間的異步通信

3. MQ的實(shí)現(xiàn)產(chǎn)品

「實(shí)現(xiàn)產(chǎn)品:」

  • ActiveMQ
  • RabbitMQ
  • RockerMQ
  • Kafka(這個(gè)設(shè)計(jì)的初衷是做分布式的日志的,后來因?yàn)槿罩居袊?yán)格的順序問題,這個(gè)時(shí)候人們就用Kafka來做消息隊(duì)列了)

4、JMS中常見的名詞

「常見的名詞:」

  • ActiveMQConnectionFactory:這個(gè)是創(chuàng)建連接的工廠
  • ConnectionFactory:連接的工廠
  • Connection:連接JAVA對MQ的一個(gè)連接
  • Destination:目的地
  • 生產(chǎn)者(Producer)
  • 消費(fèi)者(Consumer)
  • Session:會(huì)話(每一次對MQ的操作都稱為一次會(huì)話)
  • Queue:隊(duì)列
  • Topic:主題

5、什么是消息隊(duì)列

「消息隊(duì)列簡單的說就是用來存放臨時(shí)數(shù)據(jù)的地方:」

  • 生產(chǎn)者----------->存儲(chǔ)介質(zhì)上
  • 消費(fèi)者----------->存儲(chǔ)介質(zhì)上

「消息隊(duì)列類似于快遞公司:」

  • 你可以將東西交給快遞公司
  • 目標(biāo)人也可以從快遞公司去取東西

6. ActiveMQ是什么

「含義:」

  • ActiveMQ就是一個(gè)JMS的實(shí)現(xiàn)產(chǎn)品,它能夠?qū)崿F(xiàn)JMS下的所有功能

7、ActiveMQ能干什么

「主要作用:」

  • 流量消峰處理
  • 微服務(wù)下模塊的異步通信
  • 處理高并發(fā)下的訂單
  • 處理第三方平臺(tái)的高并發(fā)
  • 協(xié)助消息表可以完成分布式事務(wù)的最終一致性

8、ActiveMQ的安裝

「ActiveMQ的安裝和配置:」

 
 
 
  1.  1、官網(wǎng)下載Linux版的ActiveMQ(最新版本為5.13.4) 
  2.        https://activemq.apache.org/download.html 
  3.  
  4.        2、解壓安裝 
  5.       tar -zxvf apache-activemq-5.13.4-bin.tar.gz 
  6.  
  7.        3、配置(這里采用默認(rèn)配置,無需修改) 
  8.       vim /usr/lical/activemq-1/conf/activemq.xml 
  9.  
  10.        4、啟動(dòng) 
  11.       cd /usr/local/activemq-1/bin 
  12. ./activemq start 
  13.  
  14.       5、打開管理界面(管理界面可以查看并管理所有隊(duì)列及消息) 
  15.          http://192.168.1.100:8161 
  16.  
  17.        啟動(dòng)成功后,可以瀏覽 http://localhost:8161/admin/ 
  18.        默認(rèn)用戶名、密碼:admin/admin 
  19.        管理界面是用jetty做容器的,如果想修改管理界面的端口,可以編輯../conf/jetty.xml,找到下面這一段: 
  20.        
  21.      
  22.      
  23.      
  24.       
  25.        用戶名/密碼是在 ../conf/jetty-realm.properties 里,比如要增加一個(gè)管理員jimmy/123456,可參考下面修改: 
  26.        1 
  27. 3admin: admin, admin 
  28. jimmy: 123456, admin 
  29. user: user, user 
  30.        注:管理界面有一個(gè)小坑,ActiveMQ 5.13.2與jdk1.8兼容性有點(diǎn)問題,如果使用jdk1.8,管理界面進(jìn)入Queues標(biāo)簽頁時(shí),偶爾會(huì)報(bào)錯(cuò),但是并不影響消息正常收發(fā),只是無法從界面上查看隊(duì)列情況,如果出現(xiàn)該問題,可將jdk版本降至1.7,同時(shí)最好清空data目錄下的所有數(shù)據(jù),再重啟activemq即可。 

9. ActiveMQ的API的使用

「AcatveMQ的API使用:」

 

  • 隊(duì)列的使用(生產(chǎn)者)
 
 
 
  1. package com.qy.mq.queue; 
  2.  
  3. import org.apache.activemq.ActiveMQConnectionFactory; 
  4. import org.apache.activemq.Message; 
  5.  
  6. import javax.jms.*; 
  7.  
  8. /** 
  9.  * @Auther: qianyu 
  10.  * @Date: 2020/11/04 14:12 
  11.  * @Description:生產(chǎn)者 
  12.  */ 
  13. public class Producer { 
  14.  
  15.     //準(zhǔn)備發(fā)布的這個(gè)地址 
  16.     private  static final String PATH="tcp://10.7.182.87:61616"; 
  17.     //ActiveMQ下的用戶名 
  18.     private static final String userName="admin"; 
  19.     //ActiveMQ下的密碼 
  20.     private static final String password="admin"; 
  21.  
  22.     public static void main(String[] args) throws JMSException { 
  23.         //第一步:創(chuàng)建連接的工廠 
  24.         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH); 
  25.         //通過這個(gè)工廠獲取連接 
  26.         Connection connection = activeMQConnectionFactory.createConnection(); 
  27.         //第三步:打開這個(gè)連接 
  28.         connection.start(); 
  29.         //第四步:創(chuàng)建操作MQ的這個(gè)會(huì)話 
  30.         /** 
  31.          * 第一個(gè)參數(shù):是否使用事務(wù) 
  32.          * 第二個(gè)參數(shù):客戶端的應(yīng)答模式 
  33.          *     第一種:自動(dòng)應(yīng)答 
  34.          *     第二種:客戶端手動(dòng)應(yīng)答 
  35.          */ 
  36.         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
  37.         //需要發(fā)送消息的目的地(queue操作的是隊(duì)列) 
  38.         Destination destination=session.createQueue("wqqq"); 
  39.         //生產(chǎn)者來生產(chǎn)這個(gè)消息 
  40.         //要有生產(chǎn)者 
  41.         MessageProducer messageProducer = session.createProducer(destination); 
  42.  
  43.         //發(fā)送很多的消息到消息隊(duì)列中去 
  44. //        for (int i=0;i<100;i++){ 
  45.             //需要準(zhǔn)備發(fā)送的消息 
  46. //            TextMessage textMessage = session.createTextMessage("我是淺羽:"+i); 
  47.             //研究消息的類型 
  48.  
  49.           /*  BytesMessage bytesMessage = session.createBytesMessage(); 
  50.             bytesMessage.setByteProperty("www",new Byte("123")); 
  51.  
  52.             //接下來就可以發(fā)送消息了 
  53.             messageProducer.send(bytesMessage);*/ 
  54.  
  55.         //創(chuàng)建map類型的message 
  56.         /*MapMessage mapMessage = session.createMapMessage(); 
  57.         mapMessage.setInt("www1",123); 
  58.  
  59.         messageProducer.send(mapMessage);*/ 
  60.  
  61.         ObjectMessage objectMessage = session.createObjectMessage(new User(1, "qianyu", "123")); 
  62.         messageProducer.send(objectMessage); 
  63.  
  64. //        } 
  65.     } 
  66.  
  • 隊(duì)列的使用(消費(fèi)者)
 
 
 
  1. package com.qy.mq.queue; 
  2.  
  3. import org.apache.activemq.ActiveMQConnectionFactory; 
  4.  
  5. import javax.jms.*; 
  6. import java.io.Serializable; 
  7.  
  8. /** 
  9.  * @Auther: qianyu 
  10.  * @Date: 2020/11/04 14:13 
  11.  * @Description:消費(fèi)者 
  12.  */ 
  13. public class Consumer { 
  14.  
  15.     //準(zhǔn)備發(fā)布的這個(gè)地址 
  16.     private  static final String PATH="tcp://10.7.182.87:61616"; 
  17.     //ActiveMQ下的用戶名 
  18.     private static final String userName="admin"; 
  19.     //ActiveMQ下的密碼 
  20.     private static final String password="admin"; 
  21.  
  22.     public static void main(String[] args) throws JMSException { 
  23.         //第一步:創(chuàng)建連接的工廠 
  24.         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH); 
  25.         //通過這個(gè)工廠獲取連接 
  26.         Connection connection = activeMQConnectionFactory.createConnection(); 
  27.         //第三步:打開這個(gè)連接 
  28.         connection.start(); 
  29.         //第四步:創(chuàng)建操作MQ的這個(gè)會(huì)話 
  30.         /** 
  31.          * 第一個(gè)參數(shù):是否使用事務(wù) 
  32.          * 第二個(gè)參數(shù):客戶端的應(yīng)答模式 
  33.          *     第一種:自動(dòng)應(yīng)答 
  34.          *     第二種:客戶端手動(dòng)應(yīng)答 
  35.          */ 
  36.         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
  37.         //需要發(fā)送消息的目的地(queue操作的是隊(duì)列) 
  38.         Destination destination=session.createQueue("wqqq"); 
  39.         //創(chuàng)建我們的消費(fèi)者了 
  40.         MessageConsumer messageConsumer = session.createConsumer(destination); 
  41.         //接下來就可以接收我們的消息了 
  42.         //Message message = messageConsumer.receive(); 
  43.  
  44.         //接收消息并指定這個(gè)超時(shí)的時(shí)間 
  45. //      Message message = messageConsumer.receive(5000); 
  46.  
  47.         //接收消息沒有就不等待了 直接over了  不接收了 
  48. //      Message message = messageConsumer.receiveNoWait(); 
  49.  
  50.         //給定當(dāng)前的路徑設(shè)置監(jiān)聽器 
  51.         messageConsumer.setMessageListener(new MessageListener() { 
  52.             @Override 
  53.             public void onMessage(Message message) { 
  54.  
  55.                /* BytesMessage bytesMessage= (BytesMessage) message; 
  56.  
  57.                 try { 
  58.                     System.out.println("獲取到的數(shù)據(jù)是:"+bytesMessage.getByteProperty("www")); 
  59.                 } catch (JMSException e) { 
  60.                     e.printStackTrace(); 
  61.                 }*/ 
  62.  
  63.  
  64.                /*MapMessage mapMessage= (MapMessage) message; 
  65.  
  66.                 try { 
  67.                     System.out.println("獲取到的數(shù)據(jù)是:"+mapMessage.getInt("www1")); 
  68.                 } catch (JMSException e) { 
  69.                     e.printStackTrace(); 
  70.                 }*/ 
  71.  
  72.                //測試對象類型的消息的發(fā)送和接收 
  73.                ObjectMessage objectMessage= (ObjectMessage) message; 
  74.                 try { 
  75.                     User user = (User) objectMessage.getObject(); 
  76.  
  77.                     System.out.println("接收到的數(shù)據(jù)是:"+user); 
  78.                 } catch (JMSException e) { 
  79.                     e.printStackTrace(); 
  80.                 } 
  81.  
  82.  
  83.               /*  //我們知道是一個(gè)字符串類型的消息 
  84.                 TextMessage textMessage= (TextMessage) message; 
  85.                 //接下來就可以打印這個(gè)消息了 
  86.                 try { 
  87.                     System.out.println("消費(fèi)者1---接收到的消息是:"+textMessage.getText()); 
  88.                 } catch (JMSException e) { 
  89.                     e.printStackTrace(); 
  90.                 }*/ 
  91.  
  92.                 try { 
  93.                     //這句話就表示的是客戶端來手動(dòng)的進(jìn)行應(yīng)答 
  94.                     message.acknowledge(); 
  95.                 } catch (JMSException e) { 
  96.                     e.printStackTrace(); 
  97.                 } 
  98.  
  99.             } 
  100.         }); 
  101.     } 
  • 主題模型的生產(chǎn)者
 
 
 
  1. package com.qy.mq.topic; 
  2.  
  3. import org.apache.activemq.ActiveMQConnectionFactory; 
  4.  
  5. import javax.jms.*; 
  6.  
  7. /** 
  8.  * @Auther: qianyu 
  9.  * @Date: 2020/11/04 15:17 
  10.  * @Description: 
  11.  */ 
  12. public class Producer { 
  13.     //準(zhǔn)備發(fā)布的這個(gè)地址 
  14.     private  static final String PATH="tcp://10.7.182.87:61616"; 
  15.     //ActiveMQ下的用戶名 
  16.     private static final String userName="admin"; 
  17.     //ActiveMQ下的密碼 
  18.     private static final String password="admin"; 
  19.  
  20.     public static void main(String[] args) throws JMSException { 
  21.         //第一步:創(chuàng)建連接的工廠 
  22.         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH); 
  23.         //通過這個(gè)工廠獲取連接 
  24.         Connection connection = activeMQConnectionFactory.createConnection(); 
  25.         //第三步:打開這個(gè)連接 
  26.         connection.start(); 
  27.         //第四步:創(chuàng)建操作MQ的這個(gè)會(huì)話 
  28.         /** 
  29.          * 第一個(gè)參數(shù):是否使用事務(wù) 
  30.          * 第二個(gè)參數(shù):客戶端的應(yīng)答模式 
  31.          *     第一種:自動(dòng)應(yīng)答 
  32.          *     第二種:客戶端手動(dòng)應(yīng)答 
  33.          */ 
  34.         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
  35.         //需要發(fā)送消息的目的地(下面創(chuàng)建的就應(yīng)該是主題模型的地址) 
  36.         Destination destination=session.createTopic("topic222"); 
  37.         //生產(chǎn)者來生產(chǎn)這個(gè)消息 
  38.         //要有生產(chǎn)者 
  39.         MessageProducer messageProducer = session.createProducer(destination); 
  40.  
  41.         //發(fā)送很多的消息到消息隊(duì)列中去 
  42.         for (int i=0;i<100;i++){ 
  43.             //需要準(zhǔn)備發(fā)送的消息 
  44.             TextMessage textMessage = session.createTextMessage("我是淺羽:"+i); 
  45.             //接下來就可以發(fā)送消息了 
  46.             messageProducer.send(textMessage); 
  47.         } 
  48.     } 
  • 主題模型的消費(fèi)者
 
 
 
  1. package com.qy.mq.topic; 
  2.  
  3. import org.apache.activemq.ActiveMQConnectionFactory; 
  4.  
  5. import javax.jms.*; 
  6.  
  7. /** 
  8.  * @Auther: qianyu 
  9.  * @Date: 2020/11/04 15:19 
  10.  * @Description: 
  11.  */ 
  12. public class Consumer { 
  13.  
  14.     //準(zhǔn)備發(fā)布的這個(gè)地址 
  15.     private  static final String PATH="tcp://10.7.182.87:61616"; 
  16.     //ActiveMQ下的用戶名 
  17.     private static final String userName="admin"; 
  18.     //ActiveMQ下的密碼 
  19.     private static final String password="admin"; 
  20.  
  21.     public static void main(String[] args) throws JMSException { 
  22.         //第一步:創(chuàng)建連接的工廠 
  23.         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH); 
  24.         //通過這個(gè)工廠獲取連接 
  25.         Connection connection = activeMQConnectionFactory.createConnection(); 
  26.         //第三步:打開這個(gè)連接 
  27.         connection.start(); 
  28.         //第四步:創(chuàng)建操作MQ的這個(gè)會(huì)話 
  29.         /** 
  30.          * 第一個(gè)參數(shù):是否使用事務(wù) 
  31.          * 第二個(gè)參數(shù):客戶端的應(yīng)答模式 
  32.          *     第一種:自動(dòng)應(yīng)答 
  33.          *     第二種:客戶端手動(dòng)應(yīng)答 
  34.          */ 
  35.         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
  36.         //需要發(fā)送消息的目的地(queue操作的是隊(duì)列) 
  37.         Destination destination=session.createTopic("topic222"); 
  38.         //創(chuàng)建我們的消費(fèi)者了 
  39.         MessageConsumer messageConsumer = session.createConsumer(destination); 
  40.         //接下來就可以接收我們的消息了 
  41.  
  42.         //給定當(dāng)前的路徑設(shè)置監(jiān)聽器 
  43.         messageConsumer.setMessageListener(new MessageListener() { 
  44.             @Override 
  45.             public void onMessage(Message message) { 
  46.  
  47.                 //我們知道是一個(gè)字符串類型的消息 
  48.                 TextMessage textMessage= (TextMessage) message; 
  49.                 //接下來就可以打印這個(gè)消息了 
  50.                 try { 
  51.                     System.out.println("消費(fèi)者1---接收到的消息是:"+textMessage.getText()); 
  52.                 } catch (JMSException e) { 
  53.                     e.printStackTrace(); 
  54.                 } 
  55.  
  56.                 try { 
  57.                     message.acknowledge(); 
  58.                 } catch (JMSException e) { 
  59.                     e.printStackTrace(); 
  60.                 } 
  61.  
  62.             } 
  63.         }); 
  64.     } 

本篇關(guān)于ActiveMQ的介紹就先到這里結(jié)束了,后續(xù)會(huì)出更多關(guān)于ActiveMQ系列更多文章,謝謝大家支持!


網(wǎng)站欄目:干貨|SpringBootJMS(ActiveMQ)API實(shí)踐應(yīng)用詳解
網(wǎng)址分享:http://www.5511xx.com/article/codohdj.html