當前位置:網站首頁>Apache ActiveMQ實戰(1)-基本安裝配置與消息類型

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型

2022-01-27 04:24:37 程序員社區

ActiveMQ簡介

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型插圖

ActiveMQ是一種開源的,實現了JMS1.1規範的,面向消息(MOM)的中間件,為應用程序提供高效的、可擴展的、穩定的和安全的企業級消息通信。ActiveMQ使用Apache提供的授權,任何人都可以對其實現代碼進行修改。

ActiveMQ的設計目標是提供標准的,面向消息的,能够跨越多語言和多系統的應用集成消息通信中間件。ActiveMQ實現了JMS標准並提供了很多附加的特性。這些附加的特性包括,JMX管理(java Management Extensions,即java管理擴展),主從管理(master/salve,這是集群模式的一種,主要體現在可靠性方面,當主中介(代理)出現故障,那麼從代理會替代主代理的比特置,不至於使消息系統癱瘓)、消息組通信(同一組的消息,僅會提交給一個客戶進行處理)、有序消息管理(確保消息能够按照發送的次序被接受者接收)。

ActiveMQ 支持JMS規範,ActiveMQ完全實現了JMS1.1規範。

JMS規範提供了同步消息和异步消息投遞方式、有且僅有一次投遞語義(指消息的接收者對一條消息必須接收到一次,並且僅有一次)、訂閱消息持久接收等。如果僅使用JMS規範,錶明無論您使用的是哪家廠商的消息代理,都不會影響到您的程序。

ActiveMQ整體架構

ActiveMQ主要涉及到5個方面:

  • 傳輸協議

消息之間的傳遞,無疑需要協議進行溝通,啟動一個ActiveMQ打開了一個監聽端口,
ActiveMQ提供了廣泛的連接模式,其中主要包括SSL、STOMP、XMPP;ActiveMQ默認的使用
的協議是openWire,端口號:61616;

  • 消息域

ActiveMQ主要包含Point-to-Point (點對點),Publish/Subscribe Model (發布/訂閱者),其中在
Publich/Subscribe 模式下又有Nondurable subscription和      
durable subscription (持久
化訂閱)2種消息處理方式

  • 消息存儲

在消息傳遞過程中,部分重要的消息可能需要存儲到數據庫或文件系統中,當中介崩潰時,信息不
回丟失

  • Cluster  (集群)

最常見到 集群方式包括network of brokers和Master Slave;

  • Monitor (監控)

ActiveMQ一般由jmx來進行監控;

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型插圖1

ActiveMQ的安裝配置

  1. 通過http://activemq.apache.org/download.html 下載:apache-activemq-5.13.3-bin.tar.gz 
  2. 把下載的該文件通過tar –zxvf apache-activemq-5.13.3-bin.tar.gz解壓在當前目錄
  3. 通過修改$ACTIVEMQ_HOME/conf/activemq.xml文件可以修改其配置

一般修改的其實也只有以下幾個段:、

 <destinationPolicy>            <policyMap>              <policyEntries>

我們在此段増加配置如下:

<destinationPolicy>            <policyMap>              <policyEntries>                <policyEntry topic=">" >                <pendingMessageLimitStrategy>                    <constantPendingMessageLimitStrategy limit="50000"/>                  </pendingMessageLimitStrategy>               </policyEntry>               <policyEntry queue=">" producerFlowControl="false" optimizedDispatch="true" memoryLimit=“2mb">               </policyEntry>              </policyEntries>            </policyMap></destinationPolicy>

此處,我們使用的是”>”通配符,上述配置為每個隊列、每個Topic配置了一個最大2mb的隊列,並且使用了”optimizedDispatch=true”這個策略,該策略會啟用優化了的消息分發器,直接减少消息來回時的上下文以加快消息分發速度。

找到下面這一段

 <persistenceAdapter>            <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>

為確保擴展配置既可以處理大量連接也可以處理海量消息隊列,我們可以使用JDBC或更新更快的KahaDB消息存儲。默認情况下ActiveMQ使用KahaDB消息存儲。

ActiveMQ支持三種持久化策略:AMQ、KAHADB、JDBC

  • AMQ 它是一種文件存儲形式,具有寫入快、容易恢複的特點,采用這種方式持久化消息會被存儲在一個個文件中,每個文件默認大小為32MB,如果一條消息超過32MB,那麼這個值就需要設大。當一個文件中所有的消息被“消費”掉了,那麼這文件會被置成“删除”標志,並且在下一個清除開始時被删除掉。
  • KAHADB,相比AMQ來說,KAHADB速度沒有AMQ快,可是KAHADB具有極强的垂直和橫向擴展能力,恢複時間比AMQ還要短,因此從5.4版後ActiveMQ默認使用KAHADB作為其持久化存儲。而且在作MQ的集群時使用KAHADB可以做到Cluster+Master Slave的這樣的完美高可用集群方案。
  • JDBC,即ActiveMQ默認可以支持把數據持久化到DB中,如:MYSQL、ORACLE等。
找到下面這一段
<systemUsage>            <systemUsage>                <memoryUsage>                    <memoryUsage percentOfJvmHeap="90" />                </memoryUsage>                <storeUsage>                    <storeUsage limit="100 gb"/>                </storeUsage>                <tempUsage>                    <tempUsage limit="50 gb"/>                </tempUsage>            </systemUsage></systemUsage>

此處為ActiveMQ的內存配置,從5.10版後ActiveMQ在<memoryUsage>中引入了一個percentOfJvmHeap的配置,該百分比為:

$ACTIVEMQ_HOME/bin/env中配置的JVM堆大小的百分比,如$ACTIVEMQ_HOME/bin/env 中:

# Set jvm memory configuration (minimal/maximum amount of memory)ACTIVEMQ_OPTS_MEMORY="-Xms2048M -Xmx2048M"

那麼此處的percentOfJvmHeap=90即錶示:MQ消息隊列一共會用到2048M*0.9的內存。

全部配完後我們可以通過以下命令啟動ActiveMQ
$cd $ACTIVEMQ_HOME$ ./activemq console  

這種方式為前臺啟動activemq,用於開發模式便於調試時的信息輸出。

你也可以使用:

以後臺進程的方式啟動activemq。

啟動後在瀏覽器內輸入http://192.168.0.101:8161/admin/ 輸入管理員帳號(默認為admin/admin)即可登錄activemq的console界面

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型插圖2

  • 啟動後的ActiveMQ的數據比特於:$ACTIVEMQ_HOME/data/目錄內
  • 啟動後的ActiveMQ運行日志比特於:$ACTIVEMQ_HOME/data/目錄內的activemq.log文件
  • 如果需要改ActiveMQ的日志配置可以通過修改$ACTIVEMQ_HOME/conf/log4j.properties

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型插圖3

ActiveMQ與Spring集成

在Spring中建立一個activemq.xml文件,使其內容如下:
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">	<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">		<property name="brokerURL" value="tcp://192.168.0.101:61616" />		<property name="useAsyncSend" value="true" />		<property name="alwaysSessionAsync" value="true" />		<property name="useDedicatedTaskRunner" value="true" />	</bean>	<!-- 發送消息的目的地(一個隊列) -->	<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">		<!-- 設置消息隊列的名字 -->		<constructor-arg value="ymk.queue?consumer.prefetchSize=100" />	</bean></beans>
其中:

<property name="alwaysSessionAsync" value=“true" />

對於一個connection如果只有一個session,該值有效,否則該值無效,默認這個參數的值為true。

<property name="useAsyncSend" value="true" />

將該值開啟官方說法是可以取得更高的發送速度(5倍)。

<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">

<!-- 設置消息隊列的名字 -->

<constructor-arg value="ymk.queue?consumer.prefetchSize=100" />

</bean>

在此我們申明了一個隊列,並用它用於後面的實驗代碼。

consumer.prefetchSize則代錶我們在此使用“消費者”預分配協議,在消費者內在足够時可以使這個值更大以獲得更好的吞吐性能。

工程中的pom.xml文件主要內容如下:
。。。。。。<properties>	<activemq_version>5.13.3</activemq_version></properties>。。。。。。<dependency>	<groupId>org.apache.activemq</groupId>	<artifactId>activemq-all</artifactId>	<version>${activemq_version}</version></dependency><dependency>	<groupId>org.apache.activemq</groupId>	<artifactId>activemq-pool</artifactId>	<version>${activemq_version}</version></dependency>

ActiveMQ與Spring集成-發送端代碼

package webpoc;public class AMQSender {	public static void sendWithAuto(ApplicationContext context) {		ActiveMQConnectionFactory factory = null;		Connection conn = null;		Destination destination = null;		Session session = null;		MessageProducer producer = null;		try {			destination = (Destination) context.getBean("destination");			factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory");			conn = factory.createConnection();			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);			producer = session.createProducer(destination);			Message message = session.createTextMessage("...Hello JMS!");			producer.send(message);		} catch (Exception e) {			e.printStackTrace();		} finally {			try {				producer.close();				producer = null;			} catch (Exception e) {			}			try {				session.close();				session = null;			}			} catch (Exception e) {			}			try {				conn.stop();			} catch (Exception e) {			}			try {				conn.close();			} catch (Exception e) {			}		}	}	public static void main(String[] args) {		final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq.xml");		sendWithAuto(context);	}}

ActiveMQ與Spring集成-接收端代碼

package webpoc;public class TranQConsumer extends Thread implements MessageListener {	private Connection conn = null;	private Destination destination = null;	private Session session = null;	public void run() {		receive();	}	public void receive() {		ConnectionFactory factory = null;		Connection conn = null;		try {			final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq.xml");			factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory");			conn = factory.createConnection();			conn.start();			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);			destination = (Destination) context.getBean("destination");			MessageConsumer consumer = session.createConsumer(destination);			consumer.setMessageListener(this);		} catch (Exception e) {			e.printStackTrace();		}	}	public void onMessage(Message message) {		try {			TextMessage tm = (TextMessage) message;			System.out.println("TranQConsumer receive message: " + tm.getText());		} catch (Exception e) {			e.printStackTrace();		}	}	public static void main(String[] args) {		TranQConsumer tranConsumer = new TranQConsumer();		tranConsumer.start();	}}

ActiveMQ與Spring集成-示例講解

上述例子非常的簡單。

它其實是啟動了一個Message Listener用來監聽ymk.queue中的消息,如果有消息到達,接收端代碼就會把消息“消費”掉。

而發送端代碼也很簡單,它每次向ymk.queue隊列發送一個文本消息。

這邊所謂的MQ消費大家可以這樣理解:

用戶sender向MQ的KAHADB中插入一條數據。

用戶receiver把這條數據select後,再delete,這個select一下後再delete就是一個“消費”動作。

簡單消息與事務型消息

我們可以注意到上述的例子中我們的代碼中有這樣的一段: 
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

它代錶的是我們的MQ消費端消費模式為“自動”,即一旦消費端從MQ中取到一條消息,這條消息會自動從隊列中删除。

ActiveMQ是一個分布式消息隊列,它自然支持“事務”型消息,我們可以舉一個例子

系統A和系統B是有一個事務的系統間“服務集成”,我們可以把它想成如下場景:

系統A先會do sth…然後發送消息給系統B,系統B拿到消息後do sth,如果在其中任意一個環節發生了Exception,那麼代錶系統A與系統B之間的消息調用這一過程為“失敗”。

失敗要重發,重發的話那原來那條消息必須還能重新拿得到。

此時我們就需要使用事務性的消息了。而事務性的消息是在:

生產端和消費端在創建session時,需要:

session = conn.createSession(
true,
Session.AUTO_ACKNOWLEDGE);

下面來看一個實際例子。

事務型消息發送端(生產端)

此處其它代碼與普通式消息發送代碼相似,只在以下幾處有不同,首先在取得session時會聲明事務開啟“true”。
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

然後在發送時會有一個動作:

producer.send(message);System.out.println("send......" + Thread.currentThread().getId());session.commit();

相應的在catch(Exception)時需要

catch (Exception e) {						e.printStackTrace();	try {		session.rollback();	} catch (Exception ex) {	}} 

事務型消息接收端(消費端)

在我們的接收端的createSession時也需要把它設為“事務開啟”,此時請注意,生產和消費是在一個事務邊界中的。
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

然後在接收時會有一個動作:

try {	TextMessage tm = (TextMessage) message;	System.out.println("TranQConsumer receive message: " + tm.getText());	session.commit();} catch (Exception e) {	e.printStackTrace();	try {		session.rollback();	} catch (Exception ex) {	}}

注意:

  1. 如果在消費端的onMessage中沒有session.commit(),那麼這條消息可以正常被接收,但不會被消費,換句話說客戶端只要不commit這條消息,這條消息可以被客戶端無限消費下去,直到commit(從MQ所persistent的DB中被删除)。
  2. 如果在消費斷遇到任何Exception時session.rollback()了,ActiveMQ會按照默認策略每隔1s會重發一次,重發6次如果還是失敗,則進入ActiveMQ的ActiveMQ.DLQ隊列,重發策略這個值可以設(稍後會給出)。
  3. 如果在生產端的try{}塊裏發生錯誤,導致回滾(沒有commit),會怎麼樣?消費隊列永遠拿不到這條被rollback的消息,因為這條數據還沒被插入KAHADB中呢。
  4. 再如果,消費端拿到了消息不commit也不rollback呢?那消費端重啟後會再次拿到這條消息(因為始終取where status=‘未消費’取不到的原因,對吧?)

事務型消息的重發機制

<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">	<property name="brokerURL" value="tcp://192.168.0.101:61616" />	<property name="useAsyncSend" value="true" />	<property name="alwaysSessionAsync" value="true" />	<property name="useDedicatedTaskRunner" value="true" />	<property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" /></bean><!-- 發送消息的目的地(一個隊列) --><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">	<!-- 設置消息隊列的名字 -->	<constructor-arg value="ymk.queue?consumer.prefetchSize=100" /></bean><amq:redeliveryPolicy id="activeMQRedeliveryPolicy"	destination="#destination" redeliveryDelay="100"maximumRedeliveries="1" />
以上例子申明了對於destination這個隊列的重發機制為間隔100毫秒重發一次。

事務型消息的演示

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型插圖4

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型插圖5

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型插圖6

點對點,應答式消息

所謂點對點應答式消息和事務無關,它主要實現的是如:

生產端:我發給你一個消息了,在你收到並處理後請回複!因為我要根據你的回複內容再做處理

消費端:我收到你的消息了,我處理完了請查收我給你的回複

生產端:收到你的消息,88

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型插圖7

點對點,應答式消息核心代碼-配置部分

<!-- 發送消息的目的地(一個隊列) --><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"><!-- 設置消息隊列的名字 --><constructor-arg value="ymk.queue?consumer.prefetchSize=100" /></bean><bean id="replyDestination" class="org.apache.activemq.command.ActiveMQQueue"><!-- 設置消息隊列的名字 --><constructor-arg value="ymk.reply.queue" /></bean>

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型插圖8

其實也沒啥花頭,就是多了一個隊列(不要打我)

。。。。。。

關鍵在於代碼,代碼,不要只重視錶面嗎。。。要看內含的LA。。。

這兩個隊列其實:

一個Request

一個應答(也可以使用temp隊列來做應答隊列)

點對點,應答式消息核心代碼-設計部分

我們設立兩個程序:

  • 發送端(生產端)內含一個MessageListener,用來收消費端的返回消息
  • 服務端(消費端)內含一個MessageListener,用來收生產端發過來的消息然後再异步返回

而溝通生產端和消費端的這根“消息鏈”是兩個東西:

  • JMSCorrelationID
  • JMSReplyTo

JMSCorrelationID:

它就是一個隨機不可重複的數字,以String型傳入API,也可以是GUID,它主要是被用來標示MQ
中每一條不同的消息用的一個唯一ID

JMSReplyTo

它就是一個生產端用來接收消費端返回消息的地址

點對點,應答式消息核心代碼-生產端部分代碼

String correlationId = RandomStringUtils.randomNumeric(5);consumer = session.createConsumer(replyDest);message.setJMSReplyTo(replyDest);message.setJMSCorrelationID(correlationId);consumer.setMessageListener(this);
  • RandomStringUtils

import org.apache.commons.lang.RandomStringUtils;

  • replyDest

replyDest = (Destination) context.getBean("replyDestination");

來看比特於客戶端(生產端)的messageListener吧

public void onMessage(Message message) {	TextMessage tm = (TextMessage) message;	try {		System.out.println("Client接收Server端消息:" + tm.getText());	} catch (Exception e) {		e.printStackTrace();	}}
其餘部分代碼(沒啥花頭,就是sender裏帶了一個messageListener):
producer.send(message);

點對點,應答式消息核心代碼-生產端所有代碼

package webpoc.mq.dual;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.commons.lang.RandomStringUtils;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class Client implements MessageListener {	public void onMessage(Message message) {		TextMessage tm = (TextMessage) message;		try {			System.out.println("Client接收Server端消息:" + tm.getText());		} catch (Exception e) {			e.printStackTrace();		}	}public void start(ApplicationContext context) {		ConnectionFactory factory = null;		Connection conn = null;		Destination destination = null;		Destination replyDest = null;		Session session = null;		MessageProducer producer = null;		MessageConsumer consumer = null;		try {			destination = (Destination) context.getBean("destination");			replyDest = (Destination) context.getBean("replyDestination");			factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory");			conn = factory.createConnection();			conn.start();			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);			producer = session.createProducer(destination);			TextMessage message = session.createTextMessage("...Hello JMS!");			String correlationId = RandomStringUtils.randomNumeric(5);			consumer = session.createConsumer(replyDest);			message.setJMSReplyTo(replyDest);			message.setJMSCorrelationID(correlationId);			consumer.setMessageListener(this);		} catch (Exception e) {			String errorMessage = "JMSException while queueing HTTP JMS Message";			e.printStackTrace();		}}	public void send(ApplicationContext context) {		ConnectionFactory factory = null;		Connection conn = null;		Destination destination = null;		Destination replyDest = null;		Session session = null;		MessageProducer producer = null;		try {			destination = (Destination) context.getBean("destination");			replyDest = (Destination) context.getBean("replyDestination");			factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory");			conn = factory.createConnection();			conn.start();			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);			producer = session.createProducer(destination);			TextMessage message = session.createTextMessage("...Hello JMS!");			String correlationId = RandomStringUtils.randomNumeric(5);			message.setJMSReplyTo(replyDest);			message.setJMSCorrelationID(correlationId);			producer.send(message);			System.out.println("send 1 message");		} catch (Exception e) {			String errorMessage = "JMSException while queueing HTTP JMS Message";			e.printStackTrace();		}	}	public static void main(String[] args) {		final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq_dual.xml");		//sendWithAuto(context);		Client c = new Client();		c.start(context);		c.send(context);}

點對點,應答式消息核心代碼-消費端部分代碼

public void onMessage(Message message) {		System.out.println("on message");		try {			TextMessage response = this.session.createTextMessage();			if (message instanceof TextMessage) {				TextMessage txtMsg = (TextMessage) message;				String messageText = txtMsg.getText();				response.setText("服務器收到消息:" + messageText);				System.out.println(response.getText());			}			response.setJMSCorrelationID(message.getJMSCorrelationID());			producer.send(message.getJMSReplyTo(), response);		} catch (Exception e) {			e.printStackTrace();		}	}}

  1. 此處的send()方法內有兩個參數,注意其用法
  2. 然後為這個消費端也加一個messageListener如:
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);producer = session.createProducer(replyDest);consumer = session.createConsumer(destination);consumer.setMessageListener(this);

點對點,應答式消息核心代碼-全部代碼

package webpoc.mq.dual;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.commons.lang.RandomStringUtils;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class Server implements MessageListener {	private ConnectionFactory factory = null;	private Connection conn = null;	private Destination destination = null;	Destination replyDest = null;	private Session session = null;	private MessageProducer producer = null;	private MessageConsumer consumer = null;	@Override	public void onMessage(Message message) {		System.out.println("on message");		try {			// 若有消息傳送到服務時,先創建一個文本消息  			TextMessage response = this.session.createTextMessage();			// 若從客戶端傳送到服務端的消息為文本消息  			if (message instanceof TextMessage) {				// 先將傳送到服務端的消息轉化為文本消息  				TextMessage txtMsg = (TextMessage) message;				// 取得文本消息的內容  				String messageText = txtMsg.getText();				// 將客戶端傳送過來的文本消息進行處理後,設置到回應消息裏面  				response.setText("服務器收到消息:" + messageText);				System.out.println(response.getText());			}			// 設置回應消息的關聯ID,關聯ID來自於客戶端傳送過來的關聯ID  			response.setJMSCorrelationID(message.getJMSCorrelationID());			System.out.println("replyto===" + message.getJMSReplyTo());			// 生產者發送回應消息,目的由客戶端的JMSReplyTo定義,內容即剛剛定義的回應消息  			producer.send(message.getJMSReplyTo(), response);		} catch (Exception e) {			e.printStackTrace();		}	}public void receive(ApplicationContext context) {		try {			destination = (Destination) context.getBean("destination");			replyDest = (Destination) context.getBean("replyDestination");			factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory");			conn = factory.createConnection();			conn.start();			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);			producer = session.createProducer(replyDest);			consumer = session.createConsumer(destination);			consumer.setMessageListener(this);		} catch (Exception e) {			String errorMessage = "JMSException while queueing HTTP JMS Message";			e.printStackTrace();		}	}	public static void main(String[] args) {		final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq_dual.xml");		Server s = new Server();		s.receive(context);	}}

點對點,應答式消息核心代碼-演示

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型插圖9

版權聲明
本文為[程序員社區]所創,轉載請帶上原文鏈接,感謝
https://cht.chowdera.com/2022/01/202201270424366332.html

隨機推薦