今天就來說下 這個項目中使用ActiveMQ的情況, MQ: message queue, 顧名思義就是消息隊列的意思.?
一: 使用場景:?
消息隊列在大型電子商務(wù)類網(wǎng)站,如京東、淘寶、去哪兒等網(wǎng)站有這深入的應(yīng)用,隊列的主要作用是消除高并發(fā)訪問高峰,加快網(wǎng)站的響應(yīng)速度。在不使用消息隊列的情況下,用戶的請求數(shù)據(jù)直接寫入數(shù)據(jù)庫,在高并發(fā)的情況下,會對數(shù)據(jù)庫造成巨大的壓力,同時也使得系統(tǒng)響應(yīng)延遲加劇。在使用隊列后,用戶的請求發(fā)給隊列后立即返回(當(dāng)然不能直接給用戶提示訂單提交成功,京東上提示:您“您提交了訂單,請等待系統(tǒng)確認(rèn)”),再由消息隊列的消費(fèi)者進(jìn)程從消息隊列中獲取數(shù)據(jù),異步寫入數(shù)據(jù)庫。由于消息隊列的服務(wù)處理速度遠(yuǎn)快于數(shù)據(jù)庫,因此用戶的響應(yīng)延遲可得到有效改善。
那么在babasport這個項目中, 我們可以在上架的時候使用消息隊列的模式:我們之前在點擊一款商品上架的時候, 我們需要分成2步, 第一: 更新商品表中該商品的上架狀態(tài). 第二: 將該商品信息保存到Solr服務(wù)器中. ?那么如果我們使用了消息隊列后, 第二步就可以使用發(fā)送message來異步完成.
消息隊列可以接收消息和?發(fā)送消息
消息隊列類型:
隊列:一對一聊天 ?私聊 ?QQ
主題(訂閱模式):一對多聊天 ?群聊 ?QQ
名詞解釋:?
?
?二, 代碼原型
ActiveMQ需要部署到Linux系統(tǒng)下, 這里就不再做概述.
這里也是tar包, 導(dǎo)入到linux下直接解壓啟動即可, 前面已經(jīng)有過很多博文講Linux下一些常用軟件的安裝步驟.
上架代碼原型:
項目構(gòu)件圖:
未使用ActiveMQ前ProductServiceImpl.cs:
1 //上架
2 public void isShow(Long[] ids){
3 Product product =
new Product();
4 product.setIsShow(
true);
5 for (
final Long id : ids) {
6 //上下架狀態(tài)
7 product.setId(id);
8 productDao.updateByPrimaryKeySelective(product);
9
10 //這個地方的代碼應(yīng)該在babasport-solr中寫, 現(xiàn)在使用ActiveMQ進(jìn)行遷移.
11 //TODO 保存商品信息到Solr服務(wù)器
12 SolrInputDocument doc =
new SolrInputDocument();
13 //ID
14 doc.setField("id"
, id);
15 //名稱
16 Product p =
productDao.selectByPrimaryKey(id);
17 doc.setField("name_ik"
, p.getName());
18 //圖片URL
19 doc.setField("url", p.getImgUrls()[0
]);
20 //品牌 ID
21 doc.setField("brandId"
, p.getBrandId());
22 //價格 sql查詢語句: select price from bbs_sku where product_id = ? order by price asc limit 1
23 SkuQuery skuQuery =
new SkuQuery();
24 skuQuery.createCriteria().andProductIdEqualTo(id);
25 skuQuery.setOrderByClause("price asc"
);
26 skuQuery.setPageNo(1
);
27 skuQuery.setPageSize(1
);
28 List<Sku> skus =
skuDao.selectByExample(skuQuery);
29 doc.setField("price", skus.get(0
).getPrice());
30 //...時間等 剩下的省略
31
32 try {
33 solrServer.add(doc);
34 solrServer.commit();
35 }
catch (Exception e) {
36 // TODO Auto-generated catch block
37 e.printStackTrace();
38 }
39
40
41
42
43 //TODO 靜態(tài)化
44 }
45 }
上面的代碼 除了更改本來就該更改的商品狀態(tài)信息外, 還去見商品信息保存到了Solr服務(wù)器中了. 這里我們使用ActiveMQ進(jìn)行改造:?
使用ActiveMQ后的ProductServiceImpl.cs:
1 //上架
2 public void isShow(Long[] ids){
3 Product product =
new Product();
4 product.setIsShow(
true);
5 for (
final Long id : ids) {
6 //上下架狀態(tài)
7 product.setId(id);
8 productDao.updateByPrimaryKeySelective(product);
9
10 //發(fā)送商品ID到ActiveMQ即可.
11 jmsTemplate.send(
new MessageCreator() {
12
13 @Override
14 public Message createMessage(Session session)
throws JMSException {
15
16 return session.createTextMessage(String.valueOf(id));
17 }
18 });
19
20 //TODO 靜態(tài)化
21 }
22 }
接著就是配置消息發(fā)送方(JMS生產(chǎn)者) mq.xml:
1 <beans xmlns="http://www.springframework.org/schema/beans"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
3 xmlns:context="http://www.springframework.org/schema/context"
4 xmlns:aop="http://www.springframework.org/schema/aop"
5 xmlns:tx="http://www.springframework.org/schema/tx"
6 xmlns:task="http://www.springframework.org/schema/task"
7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
8 xsi:schemaLocation="http://www.springframework.org/schema/beans
9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
10 http://www.springframework.org/schema/mvc
11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
12 http://www.springframework.org/schema/context
13 http://www.springframework.org/schema/context/spring-context-4.0.xsd
14 http://www.springframework.org/schema/aop
15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
16 http://www.springframework.org/schema/tx
17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
18 http://www.springframework.org/schema/task
19 http://www.springframework.org/schema/task/spring-task-4.0.xsd
20 http://code.alibabatech.com/schema/dubbo
21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
22
23
24 <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ-->
25 <!-- 連接工廠, 此工廠由Apache提供 -->
26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
27 <!-- 連接地址
28 在網(wǎng)頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616
29 -->
30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
31 <!-- 設(shè)置用戶名及密碼 -->
32 <property name="userName" value="admin"></property>
33 <property name="password" value="admin"></property>
34 </bean>
35
36 <!-- 配置連接池管理工廠 -->
37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
38 <!-- 注入工廠 -->
39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
40 <!-- 設(shè)置最大連接數(shù) -->
41 <property name="maxConnections" value="5"></property>
42 </bean>
43
44 <!-- 把上面的工廠交給Spring管理 -->
45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
46 <!-- 注入上面的工廠 -->
47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
48 </bean>
49
50 <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ -->
51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
52 <!-- 注入Spring單例工廠 -->
53 <property name="connectionFactory" ref="singleConnectionFactory"></property>
54 <!-- 設(shè)置默認(rèn)的目標(biāo)管道 -->
55 <property name="defaultDestinationName" value="pId"/>
56 </bean>
57 </beans> 配置說明: 這里是首先構(gòu)建一個MQ的連接工廠, 只要ActiveMQ啟動后 就可以這樣構(gòu)建連接了. 配置登錄的用戶名和和密碼.
接著就是配置連接池, 把連接工廠交給連接池去管理. 這些都是Apache廠商提供的.?
接著就是再將連接池交由Spring管理.?
最后我們再來配置一個jmsTemplate模板來操作ActiveMQ, 這個類似于jdbcTemplate模板. 而且我們這個里面注入了一個默認(rèn)的管道, 也就是productId, 因為我們現(xiàn)在是 傳遞消息一一去對應(yīng), 關(guān)于怎么對應(yīng) ?就是依賴于這個管道.
接下來我們就看下消息的接收方(JMS消費(fèi)者)的一些東西:
消費(fèi)者的目錄結(jié)構(gòu):(Solr)
Solr項目中的ActiveMQ配置文件mq.xml:
1 <beans xmlns="http://www.springframework.org/schema/beans"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
3 xmlns:context="http://www.springframework.org/schema/context"
4 xmlns:aop="http://www.springframework.org/schema/aop"
5 xmlns:tx="http://www.springframework.org/schema/tx"
6 xmlns:task="http://www.springframework.org/schema/task"
7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
8 xsi:schemaLocation="http://www.springframework.org/schema/beans
9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
10 http://www.springframework.org/schema/mvc
11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
12 http://www.springframework.org/schema/context
13 http://www.springframework.org/schema/context/spring-context-4.0.xsd
14 http://www.springframework.org/schema/aop
15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
16 http://www.springframework.org/schema/tx
17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
18 http://www.springframework.org/schema/task
19 http://www.springframework.org/schema/task/spring-task-4.0.xsd
20 http://code.alibabatech.com/schema/dubbo
21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
22
23
24 <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ-->
25 <!-- 連接工廠, 此工廠由Apache提供 -->
26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
27 <!-- 連接地址
28 在網(wǎng)頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616
29 -->
30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
31 <!-- 設(shè)置用戶名及密碼 -->
32 <property name="userName" value="admin"></property>
33 <property name="password" value="admin"></property>
34 </bean>
35
36 <!-- 配置連接池管理工廠 ,由Apache提供.-->
37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
38 <!-- 注入工廠 -->
39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
40 <!-- 設(shè)置最大連接數(shù) -->
41 <property name="maxConnections" value="5"></property>
42 </bean>
43
44 <!-- 把上面的工廠交給Spring管理 -->
45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
46 <!-- 注入上面的工廠 -->
47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
48 </bean>
49
50 <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ -->
51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
52 <!-- 注入Spring單例工廠 -->
53 <property name="connectionFactory" ref="singleConnectionFactory"></property>
54 <!-- 設(shè)置默認(rèn)的目標(biāo)管道 -->
55 <property name="defaultDestinationName" value="pId"/>
56 </bean>
57
58 <!-- 實例化一個監(jiān)聽到消息后 處理此消息的類 -->
59 <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>
60
61 <!-- 配置實時監(jiān)聽器 -->
62 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
63 <!-- 配置工廠, 需要配置spirng的工廠 -->
64 <property name="connectionFactory" ref="singleConnectionFactory"/>
65 <!-- 設(shè)置監(jiān)聽的目標(biāo) -->
66 <property name="destinationName" value="pId"/>
67 <!-- 監(jiān)聽后獲取消息的類, 接收監(jiān)聽到的消息 -->
68 <property name="messageListener" ref="customMessageListener"></property>
69 </bean>
70 </beans> 我們來說下 和上面配置不同的地方, 我們在這里配置了一個監(jiān)聽器, 因為接收到 JMS 生產(chǎn)者發(fā)過來的消息后我們需要有個監(jiān)聽器去監(jiān)聽且 將監(jiān)聽到的消息拿過來處理.
接下來看看監(jiān)聽器的處理方法做了些什么事情:?
CustomMessageListener.java:
1 /*
2 * 接收MQ中的消息
3 */
4 public class CustomMessageListener
implements MessageListener{
5 @Autowired
6 private SearchService searchService;
7
8 @Override
9 public void onMessage(Message message) {
10 //先將接收到的消息強(qiáng)轉(zhuǎn)為ActiveMQ類型的消息
11 //因為在消息發(fā)送方那邊傳遞的是Text類型的消息對象, 所以需要轉(zhuǎn)成ActiveMQTextMessage
12 ActiveMQTextMessage amtm =
(ActiveMQTextMessage)message;
13 try {
14 String id =
amtm.getText();
15 System.out.println("接收到的ID:"+
id);
16 searchService.insertProductToSolr(Long.parseLong(id));
17 }
catch (JMSException e) {
18 // TODO Auto-generated catch block
19 e.printStackTrace();
20 }
21 }
因為我們接收到的是string類型的文本, 所以這里我們直接將接收到的消息轉(zhuǎn)換為ActiveMQText類型, 然后通過getText去得到傳遞過來的id, 然后我們就可以通過這個productId去做相應(yīng)的操作了.?
接下來就看保存商品信息到Solr服務(wù)器的邏輯:
SearchServiceImpl.java:
1 //保存商品信息到Solr服務(wù)器中, 通過ActiveMQ
2 public void insertProductToSolr(Long productId){
3 //TODO 保存商品信息到Solr服務(wù)器
4 SolrInputDocument doc =
new SolrInputDocument();
5 //ID
6 doc.setField("id"
, productId);
7 //名稱
8 Product p =
productDao.selectByPrimaryKey(productId);
9 doc.setField("name_ik"
, p.getName());
10 //圖片URL
11 doc.setField("url", p.getImgUrls()[0
]);
12 //品牌 ID
13 doc.setField("brandId"
, p.getBrandId());
14 //價格 sql查詢語句: select price from bbs_sku where product_id = ? order by price asc limit 1
15 SkuQuery skuQuery =
new SkuQuery();
16 skuQuery.createCriteria().andProductIdEqualTo(productId);
17 skuQuery.setOrderByClause("price asc"
);
18 skuQuery.setPageNo(1
);
19 skuQuery.setPageSize(1
);
20 List<Sku> skus =
skuDao.selectByExample(skuQuery);
21 doc.setField("price", skus.get(0
).getPrice());
22 //...時間等 剩下的省略
23
24 try {
25 solrServer.add(doc);
26 solrServer.commit();
27 }
catch (Exception e) {
28 // TODO Auto-generated catch block
29 e.printStackTrace();
30 }
31 }
這樣就比較明朗了, ActiveMQ 隊列就是這樣來實現(xiàn)的.?
====================接下來還會有 ActiveMQ 訂閱者模式的示例, 這里只是生產(chǎn)者發(fā)送消息給單個消費(fèi)者, 下次還會更新生產(chǎn)者發(fā)送消息給多個消費(fèi)者.
?2016/09/04 20:32 更新
上面已經(jīng)說了 消息的隊列模式, 及點對點發(fā)送消息, 那么接下來就來說下 消息的一對多模式, 也就是 發(fā)布/訂閱模式.
項目原型: 當(dāng)商品上架后(babasport-product), 發(fā)送消息id給solr(babasport-solr)來將商品信息保存到solr服務(wù)器和cms(babasport-cms)來對商品詳情頁面做頁面靜態(tài)化.
===================
babasport-product:
結(jié)構(gòu)圖:
babasport-product下的項目結(jié)構(gòu)圖:
ProductServiceImpl.java中的上架:
1 @Autowired
2 private JmsTemplate jmsTemplate;
3
4 //上架
5 public void isShow(Long[] ids){
6 Product product =
new Product();
7 product.setIsShow(
true);
8 for (
final Long id : ids) {
9 //上下架狀態(tài)
10 product.setId(id);
11 productDao.updateByPrimaryKeySelective(product);
12
13 //發(fā)送商品ID到ActiveMQ即可.
14 jmsTemplate.send(
new MessageCreator() {
15
16 @Override
17 public Message createMessage(Session session)
throws JMSException {
18
19 return session.createTextMessage(String.valueOf(id));
20 }
21 });
22 }
23 }
View Code mq.xml:
1 <beans xmlns="http://www.springframework.org/schema/beans"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
3 xmlns:context="http://www.springframework.org/schema/context"
4 xmlns:aop="http://www.springframework.org/schema/aop"
5 xmlns:tx="http://www.springframework.org/schema/tx"
6 xmlns:task="http://www.springframework.org/schema/task"
7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
8 xsi:schemaLocation="http://www.springframework.org/schema/beans
9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
10 http://www.springframework.org/schema/mvc
11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
12 http://www.springframework.org/schema/context
13 http://www.springframework.org/schema/context/spring-context-4.0.xsd
14 http://www.springframework.org/schema/aop
15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
16 http://www.springframework.org/schema/tx
17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
18 http://www.springframework.org/schema/task
19 http://www.springframework.org/schema/task/spring-task-4.0.xsd
20 http://code.alibabatech.com/schema/dubbo
21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
22
23
24 <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ-->
25 <!-- 連接工廠, 此工廠由Apache提供 -->
26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
27 <!-- 連接地址
28 在網(wǎng)頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616
29 -->
30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
31 <!-- 設(shè)置用戶名及密碼 -->
32 <property name="userName" value="admin"></property>
33 <property name="password" value="admin"></property>
34 </bean>
35
36 <!-- 配置連接池管理工廠 -->
37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
38 <!-- 注入工廠 -->
39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
40 <!-- 設(shè)置最大連接數(shù) -->
41 <property name="maxConnections" value="5"></property>
42 </bean>
43
44 <!-- 把上面的工廠交給Spring管理 -->
45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
46 <!-- 注入上面的工廠 -->
47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
48 </bean>
49
50 <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ -->
51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
52 <!-- 注入Spring單例工廠 -->
53 <property name="connectionFactory" ref="singleConnectionFactory"></property>
54 <!-- 設(shè)置默認(rèn)的目標(biāo)管道 -->
55 <property name="defaultDestinationName" value="pId"/>
56 <!-- 默認(rèn)是隊列模式, 可改為主題, 發(fā)布模式 publish subject -->
57 <property name="pubSubDomain" value="true"/>
58 </bean>
59 </beans> View Code 這里面的最大的變化就是將消息發(fā)布模式改為了: publish subject.
============================================
babasport-solr:
mq.xml配置文件:
1 <beans xmlns="http://www.springframework.org/schema/beans"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
3 xmlns:context="http://www.springframework.org/schema/context"
4 xmlns:aop="http://www.springframework.org/schema/aop"
5 xmlns:tx="http://www.springframework.org/schema/tx"
6 xmlns:task="http://www.springframework.org/schema/task"
7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
8 xsi:schemaLocation="http://www.springframework.org/schema/beans
9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
10 http://www.springframework.org/schema/mvc
11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
12 http://www.springframework.org/schema/context
13 http://www.springframework.org/schema/context/spring-context-4.0.xsd
14 http://www.springframework.org/schema/aop
15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
16 http://www.springframework.org/schema/tx
17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
18 http://www.springframework.org/schema/task
19 http://www.springframework.org/schema/task/spring-task-4.0.xsd
20 http://code.alibabatech.com/schema/dubbo
21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
22
23
24 <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ-->
25 <!-- 連接工廠, 此工廠由Apache提供 -->
26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
27 <!-- 連接地址
28 在網(wǎng)頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616
29 -->
30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
31 <!-- 設(shè)置用戶名及密碼 -->
32 <property name="userName" value="admin"></property>
33 <property name="password" value="admin"></property>
34 </bean>
35
36 <!-- 配置連接池管理工廠 ,由Apache提供.-->
37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
38 <!-- 注入工廠 -->
39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
40 <!-- 設(shè)置最大連接數(shù) -->
41 <property name="maxConnections" value="5"></property>
42 </bean>
43
44 <!-- 把上面的工廠交給Spring管理 -->
45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
46 <!-- 注入上面的工廠 -->
47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
48 </bean>
49
50 <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ -->
51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
52 <!-- 注入Spring單例工廠 -->
53 <property name="connectionFactory" ref="singleConnectionFactory"></property>
54 <!-- 設(shè)置默認(rèn)的目標(biāo)管道 -->
55 <property name="defaultDestinationName" value="pId"/>
56 </bean>
57
58 <!-- 實例化一個監(jiān)聽到消息后 處理此消息的類 -->
59 <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>
60
61 <!-- 配置實時監(jiān)聽器 -->
62 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
63 <!-- 配置工廠, 需要配置spirng的工廠 -->
64 <property name="connectionFactory" ref="singleConnectionFactory"/>
65 <!-- 設(shè)置監(jiān)聽的目標(biāo) -->
66 <property name="destinationName" value="pId"/>
67 <!-- 監(jiān)聽后獲取消息的類, 接收監(jiān)聽到的消息 -->
68 <property name="messageListener" ref="customMessageListener"></property>
69 <!-- 默認(rèn)是隊列模式, 可改為主題, 發(fā)布模式 publish subject -->
70 <property name="pubSubDomain" value="true"/>
71 </bean>
72 </beans> View Code SearchServiceImpl.java: 保存商品信息到Solr服務(wù)器中, 通過ActiveMQ
1 //保存商品信息到Solr服務(wù)器中, 通過ActiveMQ
2 public void insertProductToSolr(Long productId){
3 //TODO 保存商品信息到Solr服務(wù)器
4 SolrInputDocument doc =
new SolrInputDocument();
5 //ID
6 doc.setField("id"
, productId);
7 //名稱
8 Product p =
productDao.selectByPrimaryKey(productId);
9 doc.setField("name_ik"
, p.getName());
10 //圖片URL
11 doc.setField("url", p.getImgUrls()[0
]);
12 //品牌 ID
13 doc.setField("brandId"
, p.getBrandId());
14 //價格 sql查詢語句: select price from bbs_sku where product_id = ? order by price asc limit 1
15 SkuQuery skuQuery =
new SkuQuery();
16 skuQuery.createCriteria().andProductIdEqualTo(productId);
17 skuQuery.setOrderByClause("price asc"
);
18 skuQuery.setPageNo(1
);
19 skuQuery.setPageSize(1
);
20 List<Sku> skus =
skuDao.selectByExample(skuQuery);
21 doc.setField("price", skus.get(0
).getPrice());
22 //...時間等 剩下的省略
23
24 try {
25 solrServer.add(doc);
26 solrServer.commit();
27 }
catch (Exception e) {
28 // TODO Auto-generated catch block
29 e.printStackTrace();
30 }
31 }
View Code CustomMessageListener.java: 監(jiān)聽ActiveMQ中傳遞過來的消息, 且對傳遞過來的消息進(jìn)行處理:
1 public class CustomMessageListener
implements MessageListener{
2 @Autowired
3 private SearchService searchService;
4
5 @Override
6 public void onMessage(Message message) {
7 //先將接收到的消息強(qiáng)轉(zhuǎn)為ActiveMQ類型的消息
8 //因為在消息發(fā)送方那邊傳遞的是Text類型的消息對象, 所以需要轉(zhuǎn)成ActiveMQTextMessage
9 ActiveMQTextMessage amtm =
(ActiveMQTextMessage)message;
10 try {
11 String id =
amtm.getText();
12 System.out.println("接收到的ID:"+
id);
13 searchService.insertProductToSolr(Long.parseLong(id));
14 }
catch (JMSException e) {
15 // TODO Auto-generated catch block
16 e.printStackTrace();
17 }
18 }
19 }
View Code
===============================
babasport-cms:
mq.xml:
1 <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ-->
2 <!-- 連接工廠, 此工廠由Apache提供 -->
3 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
4 <!-- 連接地址
5 在網(wǎng)頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616
6 -->
7 <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
8 <!-- 設(shè)置用戶名及密碼 -->
9 <property name="userName" value="admin"></property>
10 <property name="password" value="admin"></property>
11 </bean>
12
13 <!-- 配置連接池管理工廠 ,由Apache提供.-->
14 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
15 <!-- 注入工廠 -->
16 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
17 <!-- 設(shè)置最大連接數(shù) -->
18 <property name="maxConnections" value="5"></property>
19 </bean>
20
21 <!-- 把上面的工廠交給Spring管理 -->
22 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
23 <!-- 注入上面的工廠 -->
24 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
25 </bean>
26
27 <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ -->
28 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
29 <!-- 注入Spring單例工廠 -->
30 <property name="connectionFactory" ref="singleConnectionFactory"></property>
31 <!-- 設(shè)置默認(rèn)的目標(biāo)管道 -->
32 <property name="defaultDestinationName" value="pId"/>
33 </bean>
34
35 <!-- 實例化一個監(jiān)聽到消息后 處理此消息的類 -->
36 <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>
37
38 <!-- 配置實時監(jiān)聽器 -->
39 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
40 <!-- 配置工廠, 需要配置spirng的工廠 -->
41 <property name="connectionFactory" ref="singleConnectionFactory"/>
42 <!-- 設(shè)置監(jiān)聽的目標(biāo) -->
43 <property name="destinationName" value="pId"/>
44 <!-- 監(jiān)聽后獲取消息的類, 接收監(jiān)聽到的消息 -->
45 <property name="messageListener" ref="customMessageListener"></property>
46 <!-- 默認(rèn)是隊列模式, 可改為主題, 發(fā)布模式 publish subject -->
47 <property name="pubSubDomain" value="true"/>
48 </bean> View Code CustomMessageListener.java: 監(jiān)聽ActiveMQ中傳遞過來的消息, 且對傳遞過來的消息進(jìn)行處理:
1 public class CustomMessageListener
implements MessageListener{
2 @Autowired
3 private StaticPageService staticPageService;
4 @Autowired
5 private CMSService cmsService;
6
7 @Override
8 public void onMessage(Message message) {
9 //先將接收到的消息強(qiáng)轉(zhuǎn)為ActiveMQ類型的消息
10 //因為在消息發(fā)送方那邊傳遞的是Text類型的消息對象, 所以需要轉(zhuǎn)成ActiveMQTextMessage
11 ActiveMQTextMessage amtm =
(ActiveMQTextMessage)message;
12 try {
13 String id =
amtm.getText();
14 System.out.println("CMS接收到的ID:"+
id);
15 Map<String, Object> root =
new HashMap<String, Object>
();
16
17 Product product =
cmsService.selectProductById(Long.parseLong(id));
18 List<Sku> skus =
cmsService.selectSkuListByProductIdWithStock(Long.parseLong(id));
19 //去掉重復(fù)的顏色
20 Set<Color> colors =
new HashSet<Color>
();
21 for (Sku sku : skus) {
22 colors.add(sku.getColor());
23 }
24 root.put("colors"
, colors);
25 root.put("product"
, product);
26 root.put("skus"
, skus);
27
28 staticPageService.index(root, id);
29 }
catch (JMSException e) {
30 // TODO Auto-generated catch block
31 e.printStackTrace();
32 }
33 }
34 }
View Code StaticPageServiceImpl.java: 靜態(tài)化頁面的核心類:
1 public class StaticPageServiceImpl
implements StaticPageService, ServletContextAware{
2 //SpringMvc 管理 conf
3 private Configuration conf;
4 public void setFreeMarkerConfig(FreeMarkerConfig freeMarkerConfig) {
5 this.conf =
freeMarkerConfig.getConfiguration();
6 }
7
8 //靜態(tài)化頁面的方法
9 public void index(Map<String, Object>
root, String id){
10 //輸出目錄: 通過getPath方法獲取的是絕對路徑
11 String path = getPath("/html/product/" + id +".html"
);
12 File f =
new File(path);
13 File parentFile =
f.getParentFile();
14 if(!
parentFile.exists()){
15 parentFile.mkdirs();
16 }
17
18 //spring中已經(jīng)設(shè)置了模板路徑:<property name="templateLoaderPath" value="/WEB-INF/ftl/" />
19 Writer out =
null;
20
21 try {
22 //讀
23 Template template = conf.getTemplate("product.html"
);
24
25 //設(shè)置輸出的位置
26 //寫
27 out =
new OutputStreamWriter(
new FileOutputStream(f), "UTF-8"
);
28 template.process(root, out);
29 }
catch (Exception e) {
30 // TODO Auto-generated catch block
31 e.printStackTrace();
32 }
finally {
33 if (out !=
null)
34 {
35 try {
36 out.close();
37 }
catch (IOException e) {
38 // TODO Auto-generated catch block
39 e.printStackTrace();
40 }
41 }
42
43 }
44
45 }
46
47 //獲取webapp下的html文件夾所在的位置
48 //將相對路徑轉(zhuǎn)換為絕對路徑
49 public String getPath(String path){
50 return servletContext.getRealPath(path);
51 }
52
53 private ServletContext servletContext;
54 @Override
55 public void setServletContext(ServletContext servletContext) {
56 this.servletContext =
servletContext;
57 }
58 }
View Code Spring管理 freemarkerConfig配置類:
1 <!-- 配置freemarker 實現(xiàn)類 -->
2 <bean class="cn.itcast.core.service.StaticPageServiceImpl">
3 <property name="freeMarkerConfig">
4 <bean class="org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer">
5 <!-- 設(shè)置模板所在目錄或文件夾的位置, 相對路徑 -->
6 <property name="templateLoaderPath" value="/WEB-INF/ftl/" />
7 <!-- 設(shè)置默認(rèn)編碼集 -->
8 <property name="defaultEncoding" value="UTF-8"></property>
9 </bean>
10 </property>
11 </bean> View Code 更多關(guān)于freemarker的講解請關(guān)注我以后的博客...
關(guān)于ActiveMQ的內(nèi)容就更新到這么多.
轉(zhuǎn)載于:https://www.cnblogs.com/wang-meng/p/5831538.html
總結(jié)
以上是生活随笔為你收集整理的ActiveMQ的介绍及使用实例.的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。