成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

一篇帶給你 ActiveMQ 詳細(xì)入門教程

開發(fā) 后端
兩個(gè)系統(tǒng)或兩個(gè)客戶端之間進(jìn)行消息傳送,利用高效可靠的消息傳遞機(jī)制進(jìn)行平臺(tái)無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進(jìn)行分布式系統(tǒng)的集成。

[[428863]]

一、什么是消息中間件

兩個(gè)系統(tǒng)或兩個(gè)客戶端之間進(jìn)行消息傳送,利用高效可靠的消息傳遞機(jī)制進(jìn)行平臺(tái)無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進(jìn)行分布式系統(tǒng)的集成。通過提供消息傳遞和消息排隊(duì)模型,它可以在分布式環(huán)境下擴(kuò)展進(jìn)程間的通信。

消息中間件,總結(jié)起來作用有三個(gè):異步化提升性能、降低耦合度、流量削峰。

系統(tǒng)A發(fā)送消息給中間件后,自己的工作已經(jīng)完成了,不用再去管系統(tǒng)B什么時(shí)候完成操作。而系統(tǒng)B拉去消息后,執(zhí)行自己的操作也不用告訴系統(tǒng)A執(zhí)行結(jié)果,所以整個(gè)的通信過程是異步調(diào)用的。

二、消息中間件的應(yīng)用場(chǎng)景

2.1 異步通信

有些業(yè)務(wù)不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少,然后在需要的時(shí)候再去處理它們。

2.2 緩沖

在任何重要的系統(tǒng)中,都會(huì)有需要不同的處理時(shí)間的元素。消息隊(duì)列通過一個(gè)緩沖層來幫助任務(wù)最高效率的執(zhí)行,該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度。以調(diào)節(jié)系統(tǒng)響應(yīng)時(shí)間。

2.3 解耦

降低工程間的強(qiáng)依賴程度,針對(duì)異構(gòu)系統(tǒng)進(jìn)行適配。在項(xiàng)目啟動(dòng)之初來預(yù)測(cè)將來項(xiàng)目會(huì)碰到什么需求,是極其困難的。通過消息系統(tǒng)在處理過程中間插入了一個(gè)隱含的、基于數(shù)據(jù)的接口層,兩邊的處理過程都要實(shí)現(xiàn)這一接口,當(dāng)應(yīng)用發(fā)生變化時(shí),可以獨(dú)立的擴(kuò)展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

2.4 冗余

有些情況下,處理數(shù)據(jù)的過程會(huì)失敗。除非數(shù)據(jù)被持久化,否則將造成丟失。消息隊(duì)列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)。許多消息隊(duì)列所采用的”插入-獲取-刪除”范式中,在把一個(gè)消息從隊(duì)列中刪除之前,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。

2.5 擴(kuò)展性

因?yàn)橄㈥?duì)列解耦了你的處理過程,所以增大消息入隊(duì)和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調(diào)節(jié)參數(shù)。便于分布式擴(kuò)容。

2.6 可恢復(fù)性

系統(tǒng)的一部分組件失效時(shí),不會(huì)影響到整個(gè)系統(tǒng)。消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。

2.7 順序保證

在大多使用場(chǎng)景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊(duì)列本來就是排序的,并且能保證數(shù)據(jù)會(huì)按照特定的順序來處理。

2.8 過載保護(hù)

在訪問量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量無法提取預(yù)知;如果以為了能處理這類瞬間峰值訪問為標(biāo)準(zhǔn)來投入資源隨時(shí)待命無疑是巨大的浪費(fèi)。使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰。

2.9 數(shù)據(jù)流處理

分布式系統(tǒng)產(chǎn)生的海量數(shù)據(jù)流,如:業(yè)務(wù)日志、監(jiān)控?cái)?shù)據(jù)、用戶行為等,針對(duì)這些數(shù)據(jù)流進(jìn)行實(shí)時(shí)或批量采集匯總,然后進(jìn)行大數(shù)據(jù)分析是當(dāng)前互聯(lián)網(wǎng)的必備技術(shù),通過消息隊(duì)列完成此類數(shù)據(jù)收集是最好的選擇。

三、常用消息隊(duì)列(ActiveMQ、RabbitMQ、RocketMQ、Kafka)比較

四、消息中間件的角色

  • Queue: 隊(duì)列存儲(chǔ),常用與點(diǎn)對(duì)點(diǎn)消息模型 ,默認(rèn)只能由唯一的一個(gè)消費(fèi)者處理。一旦處理消息刪除。
  • Topic: 主題存儲(chǔ),用于訂閱/發(fā)布消息模型,主題中的消息,會(huì)發(fā)送給所有的消費(fèi)者同時(shí)處理。只有在消息可以重復(fù)處 理的業(yè)務(wù)場(chǎng)景中可使用, Queue/Topic都是Destination的子接口ConnectionFactory: 連接工廠,客戶用來創(chuàng)建連接的對(duì)象,例如ActiveMQ提供的ActiveMQConnectionFactory
  • Connection: JMS Connection封裝了客戶與JMS提供者之間的一個(gè)虛擬的連接。
  • Destination: 消息的目的地,目的地是客戶用來指定它生產(chǎn)的消息的目標(biāo)和它消費(fèi)的消息的來源的對(duì)象。JMS1.0.2規(guī)范中定義了兩種消息傳遞域:點(diǎn)對(duì)點(diǎn)(PTP)消息傳遞域和發(fā)布/訂閱消息傳遞域。

點(diǎn)對(duì)點(diǎn)消息傳遞域的特點(diǎn)如下:

每個(gè)消息只能有一個(gè)消費(fèi)者。

消息的生產(chǎn)者和消費(fèi)者之間沒有時(shí)間上的相關(guān)性。無論消費(fèi)者在生產(chǎn)者發(fā)送消息的時(shí)候是否處于運(yùn)行狀態(tài),它都可以提取消息。發(fā)布/訂閱消息傳遞域的特點(diǎn)如下:

每個(gè)消息可以有多個(gè)消費(fèi)者。

生產(chǎn)者和消費(fèi)者之間有時(shí)間上的相關(guān)性。

訂閱一個(gè)主題的消費(fèi)者只能消費(fèi)自它訂閱之后發(fā)布的消息。JMS規(guī)范允許客戶創(chuàng)建持久訂閱,這在一定程度上放松了時(shí)間上的相關(guān)性要求 。持久訂閱允許消費(fèi)者消費(fèi)它在未處于激活狀態(tài)時(shí)發(fā)送的消息。在點(diǎn)對(duì)點(diǎn)消息傳遞域中,目的地被成為隊(duì)列(queue);在發(fā)布/訂閱消息傳遞域中,目的地被成為主題(topic)。

五、JMS的消息格式

JMS消息由以下三部分組成的:

消息頭:

每個(gè)消息頭字段都有相應(yīng)的getter和setter方法。

消息屬性:

如果需要除消息頭字段以外的值,那么可以使用消息屬性。

消息體:

JMS定義的消息類型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。

消息類型:

六、消息可靠性機(jī)制

只有在被確認(rèn)之后,才認(rèn)為已經(jīng)被成功地消費(fèi)了,消息的成功消費(fèi)通常包含三個(gè)階段 :客戶接收消息、客戶處理消息和消息被確認(rèn)。在事務(wù)性會(huì)話中,當(dāng)一個(gè)事務(wù)被提交的時(shí)候,確認(rèn)自動(dòng)發(fā)生。在非事務(wù)性會(huì)話中,消息何時(shí)被確認(rèn)取決于創(chuàng)建會(huì)話時(shí)的應(yīng)答模式(acknowledgement mode)。該參數(shù)有以下三個(gè)可選值:

  • Session.AUTO_ACKNOWLEDGE:當(dāng)客戶成功的從receive方法返回的時(shí)候,或者從MessageListener.onMessage方法成功返回的時(shí)候,會(huì)話自動(dòng)確認(rèn)客戶收到的消息。
  • Session.CLIENT_ACKNOWLEDGE:客戶通過消息的acknowledge方法確認(rèn)消息。需要注意的是,在這種模式中,確認(rèn)是在會(huì)話層上進(jìn)行:確認(rèn)一個(gè)被消費(fèi)的消息將自動(dòng)確認(rèn)所有已被會(huì)話消費(fèi)的消息。例如,如果一個(gè)消息消費(fèi)者消費(fèi)了10個(gè)消息,然后確認(rèn)第5個(gè)消息,那么所有10個(gè)消息都被確認(rèn)。
  • Session.DUPS_ACKNOWLEDGE:該選擇只是會(huì)話遲鈍的確認(rèn)消息的提交。如果JMS Provider失敗,那么可能會(huì)導(dǎo)致一些重復(fù)的消息。如果是重復(fù)的消息,那么JMS Provider必須把消息頭的JMSRedelivered字段設(shè)置為true。

6.1 優(yōu)先級(jí)

可以使用消息優(yōu)先級(jí)來指示JMS Provider首先提交緊急的消息。優(yōu)先級(jí)分10個(gè)級(jí)別,從0(最低)到9(最高)。如果不指定優(yōu)先級(jí),默認(rèn)級(jí)別是4。需要注意的是,JMS Provider并不一定保證按照優(yōu)先級(jí)的順序提交消息。

6.2 消息過期

可以設(shè)置消息在一定時(shí)間后過期,默認(rèn)是永不過期。

6.3 臨時(shí)目的地

可以通過會(huì)話上的createTemporaryQueue方法和createTemporaryTopic方法來創(chuàng)建臨時(shí)目的地。它們的存在時(shí)間只限于創(chuàng)建它們的連接所保持的時(shí)間。只有創(chuàng)建該臨時(shí)目的地的連接上的消息消費(fèi)者才能夠從臨時(shí)目的地中提取消息。

七、什么是ActiveMQ

ActiveMQ是一種開源的基于JMS(Java Message Servie)規(guī)范的一種消息中間件的實(shí)現(xiàn),ActiveMQ的設(shè)計(jì)目標(biāo)是提供標(biāo)準(zhǔn)的,面向消息的,能夠跨越多語言和多系統(tǒng)的應(yīng)用集成消息通信中間件。

官網(wǎng)地址:http://activemq.apache.org/

7.1 存儲(chǔ)方式

1. KahaDB存儲(chǔ): KahaDB是默認(rèn)的持久化策略,所有消息順序添加到一個(gè)日志文件中,同時(shí)另外有一個(gè)索引文件記錄指向這些日志的存儲(chǔ)地址,還有一個(gè)事務(wù)日志用于消息回復(fù)操作。是一個(gè)專門針對(duì)消息持久化的解決方案,它對(duì)典型的消息使用模式進(jìn)行了優(yōu)化

特性:

  • 日志形式存儲(chǔ)消息;
  • 消息索引以 B-Tree 結(jié)構(gòu)存儲(chǔ),可以快速更新;
  • 完全支持 JMS 事務(wù);
  • 支持多種恢復(fù)機(jī)制kahadb 可以限制每個(gè)數(shù)據(jù)文件的大小。不代表總計(jì)數(shù)據(jù)容量。

2. AMQ 方式: 只適用于 5.3 版本之前。AMQ 也是一個(gè)文件型數(shù)據(jù)庫(kù),消息信息最終是存儲(chǔ)在文件中。內(nèi)存中也會(huì)有緩存數(shù)據(jù)。

3. JDBC存儲(chǔ) : 使用JDBC持久化方式,數(shù)據(jù)庫(kù)默認(rèn)會(huì)創(chuàng)建3個(gè)表,每個(gè)表的作用如下:

activemqmsgs:queue和topic的消息都存在這個(gè)表中activemqacks:存儲(chǔ)持久訂閱的信息和最后一個(gè)持久訂閱接收的消息IDactivemq_lock:跟kahadb的lock文件類似,確保數(shù)據(jù)庫(kù)在某一時(shí)刻只有一個(gè)broker在訪問

4. LevelDB存儲(chǔ) : LevelDB持久化性能高于KahaDB,但是在ActiveMQ官網(wǎng)對(duì)LevelDB的表述:LevelDB官方建議使用以及不再支持,推薦使用的是KahaDB

5.Memory 消息存儲(chǔ): 顧名思義,基于內(nèi)存的消息存儲(chǔ),就是消息存儲(chǔ)在內(nèi)存中。persistent=”false”,表示不設(shè)置持 久化存儲(chǔ),直接存儲(chǔ)到內(nèi)存中,在broker標(biāo)簽處設(shè)置。

7.2 協(xié)議

協(xié)議官網(wǎng)API:http://activemq.apache.org/configuring-version-5-transports.html

Transmission Control Protocol (TCP):

  1. 這是默認(rèn)的Broker配置,TCP的Client監(jiān)聽端口是61616。
  2. 在網(wǎng)絡(luò)傳輸數(shù)據(jù)前,必須要序列化數(shù)據(jù),消息是通過一個(gè)叫wire protocol的來序列化成字節(jié)流。默認(rèn)情況下,ActiveMQ把wire protocol叫做OpenWire,它的目的是促使網(wǎng)絡(luò)上的效率和數(shù)據(jù)快速交互。
  3. TCP連接的URI形式:tcp://hostname:port?key=value&key=value
  4. TCP傳輸?shù)膬?yōu)點(diǎn):>(1)TCP協(xié)議傳輸可靠性高,穩(wěn)定性強(qiáng) (2)高效性:字節(jié)流方式傳遞,效率很高 (3)有效性、可用性:應(yīng)用廣泛,支持任何平臺(tái)

New I/O API Protocol(NIO)

1.NIO協(xié)議和TCP協(xié)議類似,但NIO更側(cè)重于底層的訪問操作。它允許開發(fā)人員對(duì)同一資源可有更多的client調(diào)用和服務(wù)端有更多的負(fù)載。

2.適合使用NIO協(xié)議的場(chǎng)景:

(1)可能有大量的Client去鏈接到Broker上一般情況下,大量的Client去鏈接Broker是被操作系統(tǒng)的線程數(shù)所限制的。因此,NIO的實(shí)現(xiàn)比TCP需要更少的線程去運(yùn)行,所以建議使用NIO協(xié)議(2)可能對(duì)于Broker有一個(gè)很遲鈍的網(wǎng)絡(luò)傳輸NIO比TCP提供更好的性能

3.NIO連接的URI形式:nio://hostname:port?key=value

4.Transport Connector配置示例:

  1. <transportConnectors> 
  2.   <transportConnector 
  3.     name=<span data-raw-text="" "="" data-textnode-index="217" data-index="4734" class="character">"tcp<span data-raw-text="" "="" data-textnode-index="217" data-index="4738" class="character">" 
  4.     uri=<span data-raw-text="" "="" data-textnode-index="221" data-index="4747" class="character">"tcp://localhost:61616?trace=true<span data-raw-text="" "="" data-textnode-index="221" data-index="4780" class="character">" /> 
  5.   <transportConnector 
  6.     name=<span data-raw-text="" "="" data-textnode-index="229" data-index="4814" class="character">"nio<span data-raw-text="" "="" data-textnode-index="229" data-index="4818" class="character">" 
  7.     uri=<span data-raw-text="" "="" data-textnode-index="233" data-index="4827" class="character">"nio://localhost:61618?trace=true<span data-raw-text="" "="" data-textnode-index="233" data-index="4860" class="character">" /> 
  8. </transportConnectors> 

User Datagram Protocol(UDP)

1.UDP和TCP的區(qū)別

(1)TCP是一個(gè)原始流的傳遞協(xié)議,意味著數(shù)據(jù)包是有保證的,換句話說,數(shù)據(jù)包是不會(huì)被復(fù)制和丟失的。UDP,另一方面,它是不會(huì)保證數(shù)據(jù)包的傳遞的

( 2)TCP也是一個(gè)穩(wěn)定可靠的數(shù)據(jù)包傳遞協(xié)議,意味著數(shù)據(jù)在傳遞的過程中不會(huì)被丟 失。這樣確保了在發(fā)送和接收之間能夠可靠的傳遞。相反,UDP僅僅是一個(gè)鏈接協(xié)議,所以它沒有可靠性之說

2.從上面可以得出:TCP是被用在穩(wěn)定可靠的場(chǎng)景中使用的;UDP通常用在快速數(shù)據(jù)傳遞和不怕數(shù)據(jù)丟失的場(chǎng)景中,還有ActiveMQ通過防火墻時(shí),只能用UDP

3.UDP連接的URI形式:udp://hostname:port?key=value

4.Transport Connector配置示例:

  1. <transportConnectors> 
  2.     <transportConnector 
  3.         name=<span data-raw-text="" "="" data-textnode-index="255" data-index="5300" class="character">"udp<span data-raw-text="" "="" data-textnode-index="255" data-index="5304" class="character">" 
  4.         uri=<span data-raw-text="" "="" data-textnode-index="259" data-index="5317" class="character">"udp://localhost:61618?trace=true<span data-raw-text="" "="" data-textnode-index="259" data-index="5350" class="character">" /> 
  5. </transportConnectors> 

Secure Sockets Layer Protocol (SSL)

1.連接的URI形式:ssl://hostname:port?key=value

2.Transport Connector配置示例:

  1. <transportConnectors> 
  2.     <transportConnector name=<span data-raw-text="" "="" data-textnode-index="277" data-index="5528" class="character">"ssl<span data-raw-text="" "="" data-textnode-index="277" data-index="5532" class="character">" uri=<span data-raw-text="" "="" data-textnode-index="281" data-index="5538" class="character">"ssl://localhost:61617?trace=true<span data-raw-text="" "="" data-textnode-index="281" data-index="5571" class="character">"/> 
  3. </transportConnectors> 

八、案例(Hello World)

這里以windows為案例演示

下載地址:http://activemq.apache.org/components/classic/download/

8.1 安裝啟動(dòng)

解壓后直接執(zhí)行 bin/win64/activemq.bat

8.2 web控制臺(tái)

http://localhost:8161/賬號(hào)密碼:admin/admin

8.3 web控制臺(tái)

修改 ActiveMQ 配置文件 activemq/conf/jetty.xml

jettyport節(jié)點(diǎn): 配置文件修改完畢,保存并重新啟動(dòng) ActiveMQ 服務(wù)

  1. <bean id=<span data-raw-text="" "="" data-textnode-index="310" data-index="5875" class="character">"jettyPort<span data-raw-text="" "="" data-textnode-index="310" data-index="5885" class="character">" class=<span data-raw-text="" "="" data-textnode-index="314" data-index="5893" class="character">"org.apache.activemq.web.WebConsolePort<span data-raw-text="" "="" data-textnode-index="314" data-index="5932" class="character">" init-method=<span data-raw-text="" "="" data-textnode-index="318" data-index="5946" class="character">"start<span data-raw-text="" "="" data-textnode-index="318" data-index="5952" class="character">"
  2.             <!-- the default port number for the web console --> 
  3.        <property name=<span data-raw-text="" "="" data-textnode-index="328" data-index="6042" class="character">"host<span data-raw-text="" "="" data-textnode-index="328" data-index="6047" class="character">" value=<span data-raw-text="" "="" data-textnode-index="332" data-index="6055" class="character">"127.0.0.1<span data-raw-text="" "="" data-textnode-index="332" data-index="6065" class="character">"/> 
  4.        <property name=<span data-raw-text="" "="" data-textnode-index="340" data-index="6091" class="character">"port<span data-raw-text="" "="" data-textnode-index="340" data-index="6096" class="character">" value=<span data-raw-text="" "="" data-textnode-index="344" data-index="6104" class="character">"8161<span data-raw-text="" "="" data-textnode-index="344" data-index="6109" class="character">"/> 
  5.    </bean> 

8.4 開發(fā)

1. jar引入:

  1. <dependency> 
  2.     <groupId>org.springframework.boot</groupId> 
  3.     <artifactId>spring-boot-starter-activemq</artifactId> 
  4. </dependency> 

2. Sender :

  1. import org.apache.activemq.ActiveMQConnectionFactory; 
  2. import javax.jms.*; 
  3.  
  4. /** 
  5.  * @program: activemq_01 
  6.  * @ClassName Sender 
  7.  * @description: 消息發(fā)送 
  8.  * @author: muxiaonong 
  9.  * @create: 2020-10-02 13:01 
  10.  * @Version 1.0 
  11.  **/ 
  12. public class Sender { 
  13.  
  14.     public static void main(String[] args) throws Exception{ 
  15.         // 1. 獲取連接工廠 
  16.         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( 
  17.                 ActiveMQConnectionFactory.DEFAULT_USER, 
  18.                 ActiveMQConnectionFactory.DEFAULT_PASSWORD, 
  19.                 <span data-raw-text="" "="" data-textnode-index="429" data-index="6804" class="character">"tcp://localhost:61616<span data-raw-text="" "="" data-textnode-index="429" data-index="6826" class="character">" 
  20.         ); 
  21.  
  22.         // 2. 獲取一個(gè)向activeMq的連接 
  23.         Connection connection = factory.createConnection(); 
  24.         // 3. 獲取session 
  25.         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
  26.  
  27.         // 4.找目的地,獲取destination,消費(fèi)端,也會(huì)從這個(gè)目的地取消息 
  28.         Queue queue = session.createQueue(<span data-raw-text="" "="" data-textnode-index="442" data-index="7122" class="character">"user<span data-raw-text="" "="" data-textnode-index="442" data-index="7127" class="character">"); 
  29.  
  30.         // 5.1 消息創(chuàng)建者 
  31.         MessageProducer producer = session.createProducer(queue); 
  32.  
  33.         // consumer --> 消費(fèi)者 
  34.         // producer --> 創(chuàng)建者 
  35.         // 5.2. 創(chuàng)建消息 
  36.         for (int i = 0; i < 100; i++) { 
  37.             TextMessage textMessage = session.createTextMessage(<span data-raw-text="" "="" data-textnode-index="463" data-index="7392" class="character">"hi:<span data-raw-text="" "="" data-textnode-index="463" data-index="7396" class="character">"+i); 
  38.             // 5.3 向目的地寫入消息 
  39.             producer.send(textMessage); 
  40.             Thread.sleep(1000); 
  41.         } 
  42.  
  43.         // 6.關(guān)閉連接 
  44.         connection.close(); 
  45.  
  46.         System.out.println(<span data-raw-text="" "="" data-textnode-index="476" data-index="7578" class="character">"結(jié)束。。。。。<span data-raw-text="" "="" data-textnode-index="476" data-index="7586" class="character">"); 
  47.  
  48.     } 

3. Receiver :

  1. import org.apache.activemq.ActiveMQConnectionFactory; 
  2.  
  3. import javax.jms.*; 
  4.  
  5.  
  6. /** 
  7.  * @program: activemq_01 
  8.  * @ClassName Receiver 
  9.  * @description: 消息接收 
  10.  * @author: muxiaonong 
  11.  * @create: 2020-10-02 13:01 
  12.  * @Version 1.0 
  13.  **/ 
  14. public class Receiver { 
  15.  
  16.     public static void main(String[] args) throws Exception{ 
  17.         // 1. 獲取連接工廠 
  18.         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( 
  19.                 ActiveMQConnectionFactory.DEFAULT_USER, 
  20.                 ActiveMQConnectionFactory.DEFAULT_PASSWORD, 
  21.                 <span data-raw-text="" "="" data-textnode-index="533" data-index="8126" class="character">"tcp://localhost:61616<span data-raw-text="" "="" data-textnode-index="533" data-index="8148" class="character">" 
  22.         ); 
  23.  
  24.         // 2. 獲取一個(gè)向activeMq的連接 
  25.         Connection connection = factory.createConnection(); 
  26.         connection.start(); 
  27.  
  28.         // 3. 獲取session 
  29.         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
  30.  
  31.         // 4.找目的地,獲取destination,消費(fèi)端,也會(huì)從這個(gè)目的地取消息 
  32.         Destination queue = session.createQueue(<span data-raw-text="" "="" data-textnode-index="547" data-index="8477" class="character">"user<span data-raw-text="" "="" data-textnode-index="547" data-index="8482" class="character">"); 
  33.  
  34.         // 5 獲取消息 
  35.         MessageConsumer consumer = session.createConsumer(queue); 
  36.  
  37.         while(true){ 
  38.             TextMessage message = (TextMessage)consumer.receive(); 
  39.             System.out.println(<span data-raw-text="" "="" data-textnode-index="559" data-index="8684" class="character">"message:<span data-raw-text="" "="" data-textnode-index="559" data-index="8693" class="character">"+message.getText()); 
  40.         } 
  41.  
  42.     } 

測(cè)試結(jié)果:

  1. message:hi:38 
  2. message:hi:39 
  3. message:hi:40 
  4. message:hi:41 
  5. message:hi:42 
  6. message:hi:43 
  7. message:hi:44 
  8. message:hi:45 

web后臺(tái)顯示有一個(gè)消費(fèi)者處于連接狀態(tài),且已消費(fèi)了68個(gè)message,而該條隊(duì)列已沒有message待消費(fèi)了。

九、總結(jié)

今天的MQ入門教程系列就這里了,感興趣的小伙伴可以試試,MQ作為一個(gè)消息中間件,不管是面試還是工作中都會(huì)經(jīng)常用到,所以是很有必要去了解和學(xué)習(xí)的一個(gè)技術(shù)點(diǎn),今天的分享就到這里了,謝謝各位小伙伴的觀看,我們下篇文章見,大家加油!

本文轉(zhuǎn)載自微信公眾號(hào)「牧小農(nóng)」

責(zé)任編輯:姜華 來源: 牧小農(nóng)
相關(guān)推薦

2022-07-06 07:57:37

Zookeeper分布式服務(wù)框架

2021-09-18 07:43:33

ApolloJava配置中心

2021-01-28 08:55:48

Elasticsear數(shù)據(jù)庫(kù)數(shù)據(jù)存儲(chǔ)

2021-07-21 09:48:20

etcd-wal模塊解析數(shù)據(jù)庫(kù)

2020-12-24 08:07:18

SpringBootSpring SecuWeb

2021-06-28 10:04:12

SpringCloudSleuth微服務(wù)

2021-05-11 09:31:31

kustomizeoperator kubernetes

2021-05-08 09:02:48

KubeBuilderOperatork8s

2021-07-12 06:11:14

SkyWalking 儀表板UI篇

2021-06-07 12:06:19

SpringCloud Sleuth微服務(wù)

2021-08-18 10:28:09

MySQL SQL 語句數(shù)據(jù)庫(kù)

2021-06-21 14:36:46

Vite 前端工程化工具

2023-03-29 07:45:58

VS編輯區(qū)編程工具

2021-04-14 14:16:58

HttpHttp協(xié)議網(wǎng)絡(luò)協(xié)議

2021-04-08 11:00:56

CountDownLaJava進(jìn)階開發(fā)

2024-06-13 08:34:48

2022-03-22 09:09:17

HookReact前端

2022-04-29 14:38:49

class文件結(jié)構(gòu)分析

2021-04-01 10:51:55

MySQL鎖機(jī)制數(shù)據(jù)庫(kù)

2021-03-12 09:21:31

MySQL數(shù)據(jù)庫(kù)邏輯架構(gòu)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 91精品国产麻豆 | 丁香久久 | 欧美日韩综合精品 | 久久久一二三 | 国产精品99久久久久久久vr | 国产精品成人在线播放 | 午夜久久久久久久久久一区二区 | 日韩国产欧美一区 | 日韩精品在线观看一区二区三区 | 黄色毛片一级 | 99成人免费视频 | 亚洲精品9999久久久久 | 在线a视频网站 | 亚洲午夜精品一区二区三区他趣 | 自拍偷拍精品 | 男女免费视频网站 | 看片国产| 欧美一区二区三区在线观看 | 国产人成在线观看 | 国产精品一区二区在线观看 | 天天爱爱网 | 精品久久久久久久人人人人传媒 | 91精品国产91久久久久久 | 91免费小视频 | 欧美aaaaaaaa| 日韩视频一区在线观看 | 99精品免费在线观看 | 一区二区三区四区在线视频 | 久久国产精品无码网站 | 久久免费香蕉视频 | 国产农村妇女精品一区 | 久久精品一区二区三区四区 | 伊人春色成人 | dy天堂 | 国产精品夜色一区二区三区 | 亚洲精品视频免费观看 | 欧美精品一区二区免费 | 欧美日韩成人网 | 欧美影院| 久久不卡视频 | 亚洲精品国产精品国自产在线 |