tcp长链接Spring ActiveMQ配置Word文档下载推荐.docx
《tcp长链接Spring ActiveMQ配置Word文档下载推荐.docx》由会员分享,可在线阅读,更多相关《tcp长链接Spring ActiveMQ配置Word文档下载推荐.docx(11页珍藏版)》请在冰点文库上搜索。
![tcp长链接Spring ActiveMQ配置Word文档下载推荐.docx](https://file1.bingdoc.com/fileroot1/2023-5/1/f29db3de-29a5-4bdb-b45c-88e66bc235ae/f29db3de-29a5-4bdb-b45c-88e66bc235ae1.gif)
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <list>
            <value>classpath:com/demo/broker.properties</value>
        </list>
    </property>
</bean>
<bean id="tcpConnector" class="org.apache.activemq.broker.TransportConnector">
    <property name="uri" value="${broker.tcp.local.url}"></property>
</bean>
<bean id="tcpQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <property name="physicalName" value="${broker.tcp.local.queue}"></property>
</bean>
kahaPersistenceAdapter"
org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter"
persistentIndex"
true"
maxDataFileLength"
1048576"
kahaDBPersistenceAdapter"
org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"
checkForCorruptJournalFiles"
checkpointInterval"
5000"
checksumJournalFiles"
cleanupInterval"
30000"
directory"
activemq-data"
enableIndexWriteAsync"
enableJournalDiskSyncs"
ignoreMissingJournalfiles"
false"
indexCacheSize"
100"
journalMaxFileLength"
journalMaxWriteBatchSize"
1000"
brokerService"
org.apache.activemq.broker.BrokerService"
init-method="
start"
destroy-method="
stop"
scope="
singleton"
brokerName"
${broker.tcp.local.name}"
useJmx"
persistenceAdapter"
ref="
transportConnectors"
reflocal="
/&
destinations"
set&
/set&
plugins"
loggingBrokerPlugin"
destinationDotFilePlugin"
statisticsBrokerPlugin"
org.apache.activemq.broker.util.LoggingBrokerPlugin"
logAll"
org.apache.activemq.broker.view.DestinationDotFilePlugin"
file"
ActiveMQDestinations.dot.txt"
org.apache.activemq.plugin.StatisticsBrokerPlugin"
/beans&
二、消息生产者配置
1、beans-tcp-producer.xml文件内容:
connectionFactory"
org.apache.activemq.ActiveMQConnectionFactory"
brokerURL"
<!-- 采用TCP长连接方式,避免每次建立短连接需要的额外工作时间 -->
pooledConnectionFactory"
org.apache.activemq.pool.PooledConnectionFactory"
constructor-argref="
/constructor-arg&
jmsTemplate"
org.springframework.jms.core.JmsTemplate"
beanclass="
org.springframework.jms.connection.SingleConnectionFactory"
targetConnectionFactory"
messageConverter"
sessionTransacted"
<!-- 消息转换 -->
com.demo.client.MyMessageConverter"
<!-- 消息生产 -->
messageProducer"
com.demo.client.MyMessageProducer"
template"
destination"
2、消息转换类:
public class MyMessageConverter implements MessageConverter {
    private static Logger log = Logger.getLogger(MyMessageConverter.class);

    @SuppressWarnings("unchecked")
    public Object fromMessage(Message msg) throws JMSException,
            MessageConversionException {
        if (msg instanceof ObjectMessage) {
            HashMap<String, byte[]> map = (HashMap<String, byte[]>) ((ObjectMessage) msg)
                    .getObjectProperty("Map");
            try {
                ByteArrayInputStream bis = new ByteArrayInputStream(map
                        .get("MSG_ID"));
                ObjectInputStream ois = new ObjectInputStream(bis);
                Object o = ois.readObject();
                ois.close();
                bis.close();
                return o;
            } catch (IOException e) {
                log.error("failed to read object message:"
                        + e.getMessage());
            } catch (ClassNotFoundException e) {
            }
        } else {
            throw new JMSException("Message:[" + msg + "] is not a Map!");
        }
        return null;
    }

    public Message toMessage(Object obj, Session session) throws JMSException,
            MessageConversionException {
        if (obj instanceof MyMessage) {
            ActiveMQObjectMessage o = (ActiveMQObjectMessage) session
                    .createObjectMessage();
            Map<String, byte[]> map = new HashMap<String, byte[]>();
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos);
                oos.writeObject(obj);
                map.put("MSG_ID", bos.toByteArray());
                oos.close();
                bos.close();
            } catch (IOException e) {
                log.error("failed to write object message:"
                        + e.getMessage());
            }
            o.setObjectProperty("Map", map);
            return o;
        } else {
            throw new JMSException("Object:[" + obj + "] is not a Message!");
        }
    }
}
3、消息生产者类:
public class MyMessageProducer {
    private static Logger log = Logger.getLogger(MyMessageProducer.class);
    private JmsTemplate template;
    private Queue destination;

    public void setTemplate(JmsTemplate template) {
        this.template = template;
    }

    public void setDestination(Queue destination) {
        this.destination = destination;
    }

    public void send(GrccMessage message) {
        this.template.convertAndSend(this.destination, message);
        log.info("生产消息==>\n" + message);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
    }
}

三、消息消费者配置:
1、beans-tcp-consumer.xml文件内容:
/bean