SpringMVC消費RabbitMQ佇列

語言: CN / TW / HK

1.環境介紹

  • Jdk1.8.0_202
  • Spring 4.3.9.RELEASE
  • spring-webmvc 4.3.9.RELEASE

2.新增Maven依賴

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>5.6.0</version>
</dependency>
<dependency>
   <groupId>org.springframework.amqp</groupId>
   <artifactId>spring-amqp</artifactId>
   <version>2.0.12.RELEASE</version>
</dependency>
<dependency>
   <groupId>org.springframework.amqp</groupId>
   <artifactId>spring-rabbit</artifactId>
   <version>2.0.12.RELEASE</version>
</dependency>
<dependency>
   <groupId>org.springframework.retry</groupId>
   <artifactId>spring-retry</artifactId>
   <version>1.2.4.RELEASE</version>
</dependency>

3.新增RabbitMQ配置

在jdbc.properties檔案中加入RabbitMQ地址埠使用者密碼等資訊
#RabbitMQ地址
mq_host=127.0.0.1
#RabbitMQ埠
mq_port=5672
#RabbitMQ虛擬主機
mq_virtual_host=/
#RabbitMQ使用者名稱
mq_username=admin
#RabbitMQ密碼
mq_password=your rabbitmq password

4.建立RabbitMQ配置檔案

新建 applicationContext-rabbitmq.xml

內容如下
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
 http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 http://www.springframework.org/schema/rabbit
 http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd" >
    <description>RabbitMQ 連線服務配置</description>
    <!-- 連線配置 -->
    <rabbit:connection-factory id="connectionFactory" host="${mq_host}" username="${mq_username}" password="${mq_password}" port="${mq_port}" virtual-host="${mq_virtual_host}"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- rabbit template宣告-->
    <rabbit:template exchange="capture_exchanges" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
    <!-- 訊息物件json轉換類 -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
    <!-- 申明訊息佇列Queue -->
    <!--
     durable:是否持久化
     exclusive: 僅建立者可以使用的私有佇列,斷開後自動刪除
     auto_delete: 當所有消費客戶端連線斷開後,是否自動刪除佇列
     ignore-declaration-exceptions : [非常重要] 當前佇列已經建立,則忽略;消費已有佇列時需增加此配置,否則啟動報錯
     -->
    <!--<rabbit:queue id="mail" name="mail" durable="true" auto-delete="false" exclusive="false" />-->
    <rabbit:queue id="capture-000001" name="capture-000001" durable="true" ignore-declaration-exceptions="true" auto-delete="false" exclusive="false" />
    <!-- 交換機定義 -->
    <!--
     rabbit:direct-exchange:定義exchange模式為direct,意思就是訊息與一個特定的路由鍵完全匹配,才會轉發。
     rabbit:binding:設定訊息queue匹配的key
     -->
    <rabbit:direct-exchange name="capture_exchanges" durable="true" auto-delete="false"  ignore-declaration-exceptions="true" id="capture_exchanges">
        <rabbit:bindings>
            <!--<rabbit:binding queue="mail" key="mail"/>-->
            <rabbit:binding queue="capture-000001" key="000001"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 配置監聽
       acknowledeg = "manual" 設定手動應答 當訊息處理失敗時:會一直重發 直到訊息處理成功,需要自己手動提交;否則不會繼續消費;Auto自動應答,則會繼續往下消費
       prefetch ="1" 設定訊息消費數量,每次從佇列中取1條訊息;多個監聽器公平分發,同一時刻僅有一條訊息處理
       concurrency = "1" 設定每個listener併發的消費者個數
    -->
    <rabbit:listener-container connection-factory="connectionFactory" transaction-size="1" prefetch="1" concurrency="1" acknowledge="auto" message-converter="jsonMessageConverter" monitor-interval="5000">
        <!-- 配置監聽器
          queues:監聽的佇列,多個的話用逗號(,)分隔
          ref:監聽器,需實現監聽器介面,@Component加入Spring管理
        -->
        <!--<rabbit:listener queues="mail" ref="mailListener"/>-->
        <rabbit:listener queues="capture-000001" ref="plateNumRabbitConsumer" />
    </rabbit:listener-container>

</beans>

5.建立監聽器消費者

建立與ref="plateNumRabbitConsumer"對應的消費者類,必須添加註解@Component加入Spring管理,例項名可自定義,但必須與ref關聯的名字一致;消費者類必須實現MessageListener,監聽佇列的變化,實現實時消費;

遇到的問題

  • 同一個專案中若同時存在生產者和消費者,applicationContext-rabbitmq.xml中宣告的工廠,路由,佇列都無特別注意的地方;生產者需要使用到rabbit:template,消費者不需要此配置即可行
  • 若只需消費佇列,在宣告佇列時,若rabbitmq服務中已建立過同名佇列,則必須配置ignore-declaration-exceptions="true",否則專案啟動時將報錯
  • 監聽容器的配置,需注意prefetch="1" 和 acknowledge="auto",prefetch表示單次從佇列中消費的數量,prefetch=”1”,說明單次消費1條訊息,若容器中配置多個監聽器消費,則會公平分發;acknowledge="auto" 表示自動應答,消費完成後回覆確認,改訊息將從rabbitmq服務中刪除,並且會自動消費下一條訊息,acknowledge=" manual"表示手動應答 當訊息處理失敗時:會一直重發 直到訊息處理成功,需要自己手動提交;否則不會繼續消費;手動應答方式需要配置template的confirm-callback回撥方法;