ActiveMQ 学习笔记.docx

上传人:b****4 文档编号:5047320 上传时间:2023-05-07 格式:DOCX 页数:23 大小:190.55KB
下载 相关 举报
ActiveMQ 学习笔记.docx_第1页
第1页 / 共23页
ActiveMQ 学习笔记.docx_第2页
第2页 / 共23页
ActiveMQ 学习笔记.docx_第3页
第3页 / 共23页
ActiveMQ 学习笔记.docx_第4页
第4页 / 共23页
ActiveMQ 学习笔记.docx_第5页
第5页 / 共23页
ActiveMQ 学习笔记.docx_第6页
第6页 / 共23页
ActiveMQ 学习笔记.docx_第7页
第7页 / 共23页
ActiveMQ 学习笔记.docx_第8页
第8页 / 共23页
ActiveMQ 学习笔记.docx_第9页
第9页 / 共23页
ActiveMQ 学习笔记.docx_第10页
第10页 / 共23页
ActiveMQ 学习笔记.docx_第11页
第11页 / 共23页
ActiveMQ 学习笔记.docx_第12页
第12页 / 共23页
ActiveMQ 学习笔记.docx_第13页
第13页 / 共23页
ActiveMQ 学习笔记.docx_第14页
第14页 / 共23页
ActiveMQ 学习笔记.docx_第15页
第15页 / 共23页
ActiveMQ 学习笔记.docx_第16页
第16页 / 共23页
ActiveMQ 学习笔记.docx_第17页
第17页 / 共23页
ActiveMQ 学习笔记.docx_第18页
第18页 / 共23页
ActiveMQ 学习笔记.docx_第19页
第19页 / 共23页
ActiveMQ 学习笔记.docx_第20页
第20页 / 共23页
亲,该文档总共23页,到这儿已超出免费预览范围,如果喜欢就下载吧!
下载资源
资源描述

ActiveMQ 学习笔记.docx

《ActiveMQ 学习笔记.docx》由会员分享,可在线阅读,更多相关《ActiveMQ 学习笔记.docx(23页珍藏版)》请在冰点文库上搜索。

ActiveMQ 学习笔记.docx

ActiveMQ学习笔记

ActiveMQ学习笔记

ActiveMQ管理

管理控制台webconsole

访问地址:

http:

//localhost:

8161/admin

队列管理

说明:

NumberofPendingMessage:

挂起的消息数量

NumberofConsumers:

消费者的数量

MessageEnqueued:

进栈的消息数量

MessageDequeued:

出栈消息的数量

主题管理

说明:

NumberofConsumers:

消费者的数量

MessageEnqueued:

进栈的消息数量

MessageDequeued:

出栈消息的数量

ActiveSubscribers:

活动的主题订阅者

ActiveProducers:

活动的主题发布者

备注:

JMX与JMS的区别

JMX:

JMX--JavaManagementExtensions,即Java管理扩展,是一个为应用程序、设备、系统等植入管理功能的框架。

JMX可以跨越一系列异构操作系统平台、系统体系结构和网络传输协议,灵活的开发无缝集成的系统、网络和服务管理应用。

JMX体系结构分为以下四个层次:

1)设备层(InstrumentationLevel):

主要定义了信息模型。

在JMX中,各种管理对象以管理构件的形式存在,需要管理时,向MBean服务器进行注册。

该层还定义了通知机制以及一些辅助元数据类。

2)代理层(AgentLevel):

主要定义了各种服务以及通信模型。

该层的核心是一个MBean服务器,所有的管理构件都需要向它注册,才能被管理。

注册在MBean服务器上管理构件并不直接和远程应用程序进行通信,它们通过协议适配器和连接器进行通信。

而协议适配器和连接器也以管理构件的形式向MBean服务器注册才能提供相应的服务。

3)分布服务层(DistributedServiceLevel):

主要定义了能对代理层进行操作的管理接口和构件,这样管理者就可以操作代理。

然而,当前的JMX规范并没有给出这一层的具体规范。

4)附加管理协议API:

定义的API主要用来支持当前已经存在的网络管理协议,如SNMP、TMN、CIM/WBEM等。

JMS:

JMS(JavaMessageService)是访问企业消息系统的标准API,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。

JMS是用于和面向消息的中间件相互通信的应用程序接口。

它既支持点对点(point-to-point)的域,又支持发布/订阅(publish/subscribe)类型的域,并且提供对下列类型的支持:

经认可的消息传递,事务型消息的传递,一致性消息和具有持久性的订阅者支持。

JMS还提供了另一种方式来对您的应用与旧的后台系统相集成。

区别:

从以上的两个概念很容易的就能看出两者区别很大,JMS用于发送与接收消息,JMX用于为应用程序、设备、系统等植入管理功能。

安装包介绍

配置文件详解

在conf目录下有个activemq.xml文件,为ActiveMQ的主要配置文件

详解如下:

数据持久化配置

(1)通过JDBC持久化到数据库

(2)Kaha持久化(默认文件存储的方式)

ActiveMQ特性

EIP企业集成模式

EIP(EnterpriseIntegrationPatterns),在5.0版本起,ActiveMQ将会对EIP进行全面的支持。

主要包括消息的路由router、消息的格式转化transfrom等

JMS到JMS的桥接

ActvieMQ提供了Camel路由的方式和BridgeAPI两种方式,执行JMS到JMS的桥接工作。

集群特性

队列消费者集群

是指同一个队列存在多个消费者,他们在消费同一个类型消息的是一种竞争关系,俗称竞争消费者模式。

这个不同于负载均衡处理,这个比那个更好,负载均衡中,系统要监控每一个消费者的情况,竞争模式就不需要了,停止运转的消费者就不用去竞争了。

这是ActiveMQ出来问题一种模式,不需要配置。

BrokerCluster

大部分普通的构思的集群模型在JMS中应该是这样的,JMSbroker集合以及一个将要连接这些broker之一的JMS客户端。

在其中一个broker宕机的时候,客户端可以自动连接另一个broker。

我们实现是在JMS客户端使用failover:

//协议。

failover:

(tcp:

//primary:

61616,tcp:

//secondary:

61616)?

randomize=false

OptionName

DefaultValue

Description

initialReconnectDelay

10

Howlongtowaitbeforethefirstreconnectattempt(inms)

maxReconnectDelay

30000

Themaximumamountoftimeweeverwaitbetweenreconnectattempts(inms)

useExponentialBackOff

true

Shouldanexponentialbackoffbeusedbtweenreconnectattempts

reconnectDelayExponent

2.0

Theexponentusedintheexponentialbackoffattempts

maxReconnectAttempts

-1|0

Fromversion5.6onwards:

-1isdefaultandmeansretryforever,0meansdon'tretry(onlytryconnectiononcebutnoretry). 

Priortoversion5.6:

0isdefaultandmeansretryforever. 

Allversions:

Ifsetto>0,thenthisisthemaximumnumberofreconnectattemptsbeforeanerrorissentbacktotheclient.

startupMaxReconnectAttempts

-1

Ifnot0,thenthisisthemaximumnumberofreconnectattemptsbeforeanerrorissentbacktotheclientonthefirstattemptbytheclienttostartaconnection,onceconnectedthemaxReconnectAttempts optiontakesprecedence. Avalueof-1signifiesthatthetransportwillhavenolimittothenumberofinitialconnectionattempts.

randomize

true

usearandomalgorithmtochoosethetheURItouseforreconnectfromthelistprovided

backup

false

initializeandholdasecondtransportconnection-toenablefastfailover

timeout

-1

Enablestimeoutonsendoperations(inmiliseconds)withoutinterruptionofreconnectionprocess

trackMessages

false

keepacacheofin-flightmessagesthatwillflushedtoabrokeronreconnect

maxCacheSize

131072

sizeinbytesforthecache,iftrackMessagesisenabled

updateURIsSupported

true

DetermineswhethertheclientshouldacceptupdatestoitslistofknownURIsfromtheconnectedbroker. AddedinActiveMQ5.4

updateURIsURL

null

AURL(orpathtoalocalfile)toatextfilecontainingacommaseparatedlistofURIstouseforreconnectinthecaseoffailure. AddedinActiveMQ5.4

nested.*

null

ExtraoptionstoaddtothenestedURLs. AddedinActiveMQ5.9

warnAfterReconnectAttempts

10

AftereveryNreconnectattemptslogawarningtoindicatethereisnoconnectionbutthatwearestilltrying,setto<=0todisable. AddedinActiveMQ5.10

reconnectSupported

true

DetermineswhethertheclientshouldrespondtobrokerConnectionControleventswithareconnect(see:

 rebalanceClusterClients)

消息咨询服务(AdvisoryMessage)

我个人认为是一种消息队里的监听服务。

开发人员可以通过此API了解消息队列的中运行情况:

●正在创建和停止的消费者、生产者以及连接信息

●正在被创建或者销毁的临时目的地

●在主题或者队列中到期的消息

●Brokers正在给没有消费者的目的地发送消息

●正在创建和停止的连接信息

ActiveMQ协议

OpenWire协议

他是多语言线性协议,允许来自不同语言和平台请求到ActiveMQ的本地入口

REST协议

ActiveMQ实现了RESTfulAPI,允许任何可以发布和消费web服务消息的设备使用规则的POST或者GET消息。

STOMP协议

ActiveMQ支持Stomp协议和stomp与JMS的映射。

STOMP是一个简单原生的文本消息协议。

AMQP协议

AMQP,即AdvancedMessageQueuingProtocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制

XMPP协议

该协议已经过时

TCP协议

基于Javasocket的tcp通信协议

NIO协议

也是基于socket的消息通信协议,底层还是TCP,主要的区别在于处理方式不同,使用了选择器selector。

生产者和消费者那点事

定义队列生产者(Java)

packageorg.hengtian.shch;

importjavax.jms.Connection;

importjavax.jms.ConnectionFactory;

importjavax.jms.DeliveryMode;

importjavax.jms.Destination;

importjavax.jms.JMSException;

importjavax.jms.MessageProducer;

importjavax.jms.Session;

importjavax.jms.TextMessage;

importorg.apache.activemq.ActiveMQConnectionFactory;

 

publicclassJMSProducer{

publicstaticvoidmain(String[]args){

//ConnectionFactoryJMS用它创建工厂

ConnectionFactoryfactory;

Connectionconnection=null;//建立JMS到JMS的连接

Sessionsession;//一个接收和发送消息的线程

Destinationdestination;//消息的目的地,消息发送给谁

MessageProducerproducer;//TextMessageproducer

try{

factory=newActiveMQConnectionFactory("tcp:

//0.0.0.0:

61616");

connection=factory.createConnection();

connection.start();

session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

destination=session.createQueue("Fistqueue");

producer=session.createProducer(destination);

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

inti=0;

while(true){

TextMessagemessage=session.createTextMessage("测试内容"+i);

producer.send(message);

System.out.println("sendmessage..."+message.getText());

Thread.sleep(10000);//10秒发送一次消息

mit();//没有commit操作,生产者的消息就发送不出去的

i++;

}

}catch(Exceptione){

e.printStackTrace();

}finally{

if(connection!

=null){

try{

connection.close();

}catch(JMSExceptione){

e.printStackTrace();

}

}

}

}

}

定义队列单消费者(Java)

packageorg.hengtian.shch;

importjavax.jms.Connection;

importjavax.jms.ConnectionFactory;

importjavax.jms.Destination;

importjavax.jms.JMSException;

importjavax.jms.MessageConsumer;

importjavax.jms.Session;

importjavax.jms.TextMessage;

importorg.apache.activemq.ActiveMQConnectionFactory;

 

publicclassJMSConsumer{

publicstaticvoidmain(String[]args){

//1定义一个连接工厂

ConnectionFactoryfactory;

//2定义连接

Connectionconnection=null;

//3定义session回话

Sessionsession;

//4.定义目的地,这里目的地是一个队列

Destinationdestination;

//5.定义消费者

MessageConsumerconsumer;

try{

factory=newActiveMQConnectionFactory("tcp:

//0.0.0.0:

61616");

connection=factory.createConnection();

connection.start();

session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

destination=session.createQueue("Fistqueue");

consumer=session.createConsumer(destination);

while(true){

TextMessagemessage=(TextMessage)consumer.receive(1000);

if(message!

=null){

System.out.println("message"+message.getText());

mit();

}

}

}catch(Exceptione){

e.printStackTrace();

}finally{

if(connection!

=null){

try{

connection.close();

}catch(JMSExceptione){

e.printStackTrace();

}

}

}

}

}

注意问题(队列)

1.建立的connection必须执行start方法进行启动,不然不会回去消息

2.生产者不提交session会话,ActiveMQ中队列中是没有消息的。

消费者不提交session会话,则该消息在ActiveMQ中处于挂起状态,即每次消费者重新连接的时候都会再次回去所有的消息。

这些消息没有真正的出栈,或者时候没有真正的消费掉。

定义队列多消费者(Java)

packageorg.hengtian.shch;

importjavax.jms.Connection;

importjavax.jms.ConnectionFactory;

importjavax.jms.Destination;

importjavax.jms.JMSException;

importjavax.jms.MessageConsumer;

importjavax.jms.Session;

importjavax.jms.TextMessage;

importorg.apache.activemq.ActiveMQConnectionFactory;

 

publicclassJMSConsumerextendsThread{

privateStringthreadName;

publicJMSConsumer(StringthreadName){

this.threadName=threadName;

}

publicvoidrun(){

System.out.println("===============消费者"+threadName+"已经启动=============");

//1定义一个连接工厂

ConnectionFactoryfactory;

//2定义连接

Connectionconnection=null;

//3定义session回话

Sessionsession;

//4.定义目的地,这里目的地是一个队列

Destinationdestination;

//5.定义消费者

MessageConsumerconsumer;

try{

factory=newActiveMQConnectionFactory("tcp:

//0.0.0.0:

61616");

connection=factory.createConnection();

connection.start();

session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

destination=session.createQueue("Fistqueue");

consumer=session.createConsumer(destination);

while(true){

TextMessagemessage=(TextMessage)consumer.receive(1000);

if(message!

=null){

System.out.println(Thread.currentThread().getName()+"getmessage.."+message.getText());

mit();

}

}

}catch(Exceptione){

e.printStackTrace();

}finally{

if(connection!

=null){

try{

connection.close();

}cat

展开阅读全文
相关资源
猜你喜欢
相关搜索
资源标签

当前位置:首页 > 人文社科 > 法律资料

copyright@ 2008-2023 冰点文库 网站版权所有

经营许可证编号:鄂ICP备19020893号-2