Spring AOP监控0229.docx
《Spring AOP监控0229.docx》由会员分享,可在线阅读,更多相关《Spring AOP监控0229.docx(35页珍藏版)》请在冰点文库上搜索。
SpringAOP监控0229
深入浅出ActiveMQ
1.案例简述
1.1概述与介绍
ActiveMQ是Apache出品,最流行的、功能强大的即时通讯和集成模式的开源服务器,ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMSProvider实现,提供客户端支持跨语言和协议,带有易于在充分支持JMS1.1和1.4使用J2EE企业集成模式和许多先进的功能。
1.2 特性
1、多种语言和协议编写客户端。
语言:
Java、C、C++、C#、Ruby、Perl、Python、PHP。
应用协议:
OpenWire、StompREST、WSNotification、XMPP、AMQP
2、完全支持JMS1.1和J2EE1.4规范(持久化,XA消息,事务)
3、对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4、通过了常见J2EE服务器(如Geronimo、JBoss4、GlassFish、WebLogic)的测试,其中通过JCA1.5resourceadaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4商业服务器上
5、支持多种传送协议:
in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA
6、支持通过JDBC和journal提供高速的消息持久化
7、从设计上保证了高性能的集群,客户端-服务器,点对点
8、支持Ajax
9、支持与Axis的整合
10、可以很容易得调用内嵌JMSprovider,进行测试
1.3案例背景
在开发过程中,我们经常会遇到消息同步不及时、消息延迟等问题,ActiveMQ的机制可以很好的解决消息同步和消息延时问题,本案例详细介绍了ActiveMQ的环境搭建、消息示例、代码示例等操作过程。
2.案例解决过程
2.1环境搭建
开发环境:
System:
Windows
JDK:
1.6+
IDE:
eclipse
apacheActiveMQ5.8
1、下载ActiveMQ,下载地址:
http:
//www.apache.org/dyn/closer.cgi?
path=/activemq/apache-activemq/5.8.0/apache-activemq-5.8.0-bin.zip
2、解压apache-activemq-5.8.0.zip即可完成ActiveMQ的安装
3、解压后目录结构如下
+bin(windows下面的bat和unix/linux下面的sh)启动ActiveMQ的启动服务就在这里
+conf(activeMQ配置目录,包含最基本的activeMQ配置文件)
+data(默认是空的)
+docs(index,replease版本里面没有文档)
+example(几个例子)
+lib(activeMQ使用到的lib)
+webapps(系统管理员控制台代码)
+webapps-demo(系统示例代码)
-activemq-all-5.8.0.jar(ActiveMQ的binary)
-user-guide.html(部署指引)
-LICENSE.txt
-NOTICE.txt
-README.txt
其他文件就不详细介绍了,搞Java的应该都知道干什么用的。
你可以进入bin目录,使用activemq.bat双击启动(windows用户可以选择系统位数,如果你是linux的话,就用命令行的发送去启动),如果一切顺利,你就会看见类似下面的信息:
如果你看到这个,那么恭喜你成功了。
如果你启动看到了异常信息:
Causedby:
java.io.IOException:
Failedtobindtoserversocket:
tcp:
//0.0.0.0:
61616?
maximumConnections=1000&wireformat.maxFrameSize=104857600dueto:
.SocketException:
UnrecognizedWindowsSocketserror:
0:
JVM_Bind
那么我告诉你,很不幸,你的端口被占用了。
接下来你大概想知道是哪个程序占用了你的端口,并kill掉该进程或服务。
或者你要尝试修改ActiveMQ的默认端口61616(ActiveMQ使用的默认端口是61616),在大多数情况下,占用61616端口的是InternetConnectionSharing(ICS)这个Windows服务,你只需停止它就可以启动ActiveMQ了。
4、启动成功就可以访问管理员界面:
http:
//localhost:
8161/admin,默认用户名和密码admin/admin。
如果你想修改用户名和密码的话,在conf/jetty-realm.properties中修改即可。
其中在导航菜单中,Queues是队列方式消息。
Topics是主题方式消息。
Subscribers消息订阅监控查询。
Connections可以查看链接数,分别可以查看xmpp、ssl、stomp、openwire、ws和网络链接。
Network是网络链接数监控。
Send可以发送消息数据。
5、运行demo示例,在dos控制台输入activemq.batxbean:
../conf/activemq-demo.xml 即可启动demo示例。
官方提供的user-guide.html中的accessthewebconsole中是提示输入:
activemq.batconsolexbean:
conf/activemq-demo.xml,我用这种方式不成功。
当然你还可以用绝对的文件目录方式:
activemq.batxbean:
file:
D:
/mq/conf/activemq-demo.xml
如果提示conf/activemq-demo.xml没有找到,你可以尝试改变下路径,也就是去掉上面的“..”。
通过http:
//localhost:
8161/demo/ 就可以访问示例了。
2.2 消息示例
2.2.1ActiviteMQ消息有3中形式
JMS 公共
点对点域
发布/订阅域
ConnectionFactory
QueueConnectionFactory
TopicConnectionFactory
Connection
QueueConnection
TopicConnection
Destination
Queue
Topic
Session
QueueSession
TopicSession
MessageProducer
QueueSender
TopicPublisher
MessageConsumer
QueueReceiver
TopicSubscriber
(1)、点对点方式(point-to-point)
点对点的消息发送方式主要建立在MessageQueue,Sender,reciever上,MessageQueue存贮消息,Sneder发送消息,receive接收消息.具体点就是SenderClient发送MessageQueue,而receiverCliernt从Queue中接收消息和"发送消息已接受"到Queue,确认消息接收。
消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行
(2)、发布/订阅方式(publish/subscriberMessaging)
发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。
一个接收端只能接收他创建以后发送客户端发送的信息。
作为subscriber,在接收消息时有两种方法,destination的receive方法,和实现messagelistener接口的onMessage方法。
2.2.2ActiviteMQ接收和发送消息基本流程
发送消息的基本步骤:
(1)、创建连接使用的工厂类JMSConnectionFactory
(2)、使用管理对象JMSConnectionFactory建立连接Connection,并启动
(3)、使用连接Connection建立会话Session
(4)、使用会话Session和管理对象Destination创建消息生产者MessageSender
(5)、使用消息生产者MessageSender发送消息
消息接收者从JMS接受消息的步骤
(1)、创建连接使用的工厂类JMSConnectionFactory
(2)、使用管理对象JMSConnectionFactory建立连接Connection,并启动
(3)、使用连接Connection建立会话Session
(4)、使用会话Session和管理对象Destination创建消息接收者MessageReceiver
(5)、使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。
2.3 代码示例
在代码开始,我们先建一个project,在这个project中添加如下jar包
添加完jar包后就可以开始实际的代码工作了。
2.3.1使用JMS方式发送接收消息
消息发送者
packagecom.hoo.mq.jms;
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.DeliveryMode;
importjavax.jms.Destination;
importjavax.jms.MessageProducer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnection;
importorg.apache.activemq.ActiveMQConnectionFactory;
/**
*function:
消息发送者
*@authorliuyl
*@createDate2013-6-19上午11:
26:
43
*@fileMessageSender.java
*@packagecom.hoo.mq.jms
*@projectActiveMQ-5.8
*@version1.0
*/
publicclassMessageSender{
//发送次数
publicstaticfinalintSEND_NUM=5;
//tcp地址
publicstaticfinalStringBROKER_URL="tcp:
//localhost:
61616";
//目标,在ActiveMQ管理员控制台创建http:
//localhost:
8161/admin/queues.jsp
publicstaticfinalStringDESTINATION="hoo.mq.queue";
/**
*function:
发送消息
*@authorliuyl
*@createDate2013-6-19下午12:
05:
42
*@paramsession
*@paramproducer
*@throwsException
*/
publicstaticvoidsendMessage(Sessionsession,MessageProducerproducer)throwsException{
for(inti=0;iStringmessage="发送消息第"+(i+1)+"条";
TextMessagetext=session.createTextMessage(message);
System.out.println(message);
producer.send(text);
}
}
publicstaticvoidrun()throwsException{
Connectionconnection=null;
Sessionsession=null;
try{
//创建链接工厂
ConnectionFactoryfactory=newActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,BROKER_URL);
//通过工厂创建一个连接
connection=factory.createConnection();
//启动连接
connection.start();
//创建一个session会话
session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//创建一个消息队列
Destinationdestination=session.createQueue(DESTINATION);
//创建消息制作者
MessageProducerproducer=session.createProducer(destination);
//设置持久化模式
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session,producer);
//提交会话
mit();
}catch(Exceptione){
throwe;
}finally{
//关闭释放资源
if(session!
=null){
session.close();
}
if(connection!
=null){
connection.close();
}
}
}
publicstaticvoidmain(String[]args)throwsException{
MessageSender.run();
}
}
接受者
packagecom.hoo.mq.jms;
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.Message;
importjavax.jms.MessageConsumer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnection;
importorg.apache.activemq.ActiveMQConnectionFactory;
/**
*function:
消息接收者
*@authorliuyl
*@createDate2013-6-19下午01:
34:
27
*@fileMessageReceiver.java
*@packagecom.hoo.mq.jms
*@projectActiveMQ-5.8
*@version1.0
*/
publicclassMessageReceiver{
//tcp地址
publicstaticfinalStringBROKER_URL="tcp:
//localhost:
61616";
//目标,在ActiveMQ管理员控制台创建http:
//localhost:
8161/admin/queues.jsp
publicstaticfinalStringDESTINATION="hoo.mq.queue";
publicstaticvoidrun()throwsException{
Connectionconnection=null;
Sessionsession=null;
try{
//创建链接工厂
ConnectionFactoryfactory=newActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,BROKER_URL);
//通过工厂创建一个连接
connection=factory.createConnection();
//启动连接
connection.start();
//创建一个session会话
session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//创建一个消息队列
Destinationdestination=session.createQueue(DESTINATION);
//创建消息制作者
MessageConsumerconsumer=session.createConsumer(destination);
while(true){
//接收数据的时间(等待)100ms
Messagemessage=consumer.receive(1000*100);
TextMessagetext=(TextMessage)message;
if(text!
=null){
System.out.println("接收:
"+text.getText());
}else{
break;
}
}
//提交会话
mit();
}catch(Exceptione){
throwe;
}finally{
//关闭释放资源
if(session!
=null){
session.close();
}
if(connection!
=null){
connection.close();
}
}
}
publicstaticvoidmain(String[]args)throwsException{
MessageReceiver.run();
}
}
2.3.2Queue队列方式发送点对点消息数据
发送方
packagecom.hoo.mq.queue;
importjavax.jms.DeliveryMode;
importjavax.jms.MapMessage;
importjavax.jms.Queue;
importjavax.jms.QueueConnection;
importjavax.jms.QueueConnectionFactory;
importjavax.jms.QueueSession;
importjavax.jms.Session;
importorg.apache.activemq.ActiveMQConnection;
importorg.apache.activemq.ActiveMQConnectionFactory;
/**
*function:
Queue方式消息发送者
*@authorliuyl
*@createDate2013-6-19下午04:
34:
36
*@fileQueueSender.java
*@packagecom.hoo.mq.queue
*@projectActiveMQ-5.8
*@version1.0
*/
publicclassQueueSender{
//发送次数
publicstaticfinalintSEND_NUM=5;
//tcp地址
publicstaticfinalStringBROKER_URL="tcp:
//localhost:
61616";
//目标,在ActiveMQ管理员控制台创建http:
//localhost:
8161/admin/queues.jsp
publicstaticfinalStringDESTINATION="hoo.mq.queue";
/**
*function:
发送消息
*@authorliuyl
*@createDate2013-6-19下午12:
05:
42
*@paramsession
*@paramsender
*@throwsException
*/
publicstaticvoidsendMessage(QueueSessionsession,javax.jms.QueueSendersender)throwsException{
for(inti=0;iStringmessage="发送消息第"+(i+1)+"条";
MapMessagemap=session.createMapMessage();
map.setString("text",message);
map.setLong("time",System.currentTimeMillis());
System.out.println(map);
sender.send(map);
}
}
publicstaticvoidrun()throwsException{
QueueConnectionconnection=null;
QueueSessionsession=null;
try{
//创建链接工厂
QueueConnectionFactoryfactory=