解道Vertx线程模型.docx

上传人:b****6 文档编号:11918620 上传时间:2023-06-03 格式:DOCX 页数:13 大小:42.51KB
下载 相关 举报
解道Vertx线程模型.docx_第1页
第1页 / 共13页
解道Vertx线程模型.docx_第2页
第2页 / 共13页
解道Vertx线程模型.docx_第3页
第3页 / 共13页
解道Vertx线程模型.docx_第4页
第4页 / 共13页
解道Vertx线程模型.docx_第5页
第5页 / 共13页
解道Vertx线程模型.docx_第6页
第6页 / 共13页
解道Vertx线程模型.docx_第7页
第7页 / 共13页
解道Vertx线程模型.docx_第8页
第8页 / 共13页
解道Vertx线程模型.docx_第9页
第9页 / 共13页
解道Vertx线程模型.docx_第10页
第10页 / 共13页
解道Vertx线程模型.docx_第11页
第11页 / 共13页
解道Vertx线程模型.docx_第12页
第12页 / 共13页
解道Vertx线程模型.docx_第13页
第13页 / 共13页
亲,该文档总共13页,全部预览完了,如果喜欢就下载吧!
下载资源
资源描述

解道Vertx线程模型.docx

《解道Vertx线程模型.docx》由会员分享,可在线阅读,更多相关《解道Vertx线程模型.docx(13页珍藏版)》请在冰点文库上搜索。

解道Vertx线程模型.docx

解道Vertx线程模型

解道Vert.x线程模型

1.线程模型概述

Vert.x的线程模型设计的非常巧妙。

总的来说,Vert.x中主要有两种线程:

EventLoop线程和Worker线程。

其中,EventLoop线程结合了Netty的EventLoop,用于处理事件。

每一个EventLoop都与唯一的线程相绑定,这个线程就叫EventLoop线程。

EventLoop线程不能被阻塞,否则事件将无法被处理。

Worker线程用于执行阻塞任务,这样既可以执行阻塞任务而又不阻塞EventLoop线程。

如果像Node.js一样只有单个EventLoop的话就不能充分利用多核CPU的性能了。

为了充分利用多核CPU的性能,Vert.x中提供了一组EventLoop线程。

每个EventLoop线程都可以处理事件。

为了保证线程安全,防止资源争用,Vert.x保证了某一个Handler总是被同一个EventLoop线程执行,这样不仅可以保证线程安全,而且还可以在底层对锁进行优化提升性能。

所以,只要开发者遵循Vert.x的线程模型,开发者就不需要再担心线程安全的问题,这是非常方便的。

本篇文章将底层的角度来解析Vert.x的线程模型。

对应的Vert.x版本为3.3.3。

2.EventLoop线程

首先回顾一下EventLoop线程,它会不断地轮询获取事件,并将获取到的事件分发到对应的事件处理器中进行处理:

Vert.x线程模型中最重要的一点就是:

永远不要阻塞EventLoop线程。

因为一旦处理事件的线程被阻塞了,事件就会一直积压着不能被处理,整个应用也就不能正常工作了。

Vert.x中内置一种用于检测EventLoop是否阻塞的线程:

vertx-blocked-thread-checker。

一旦EventLoop处理某个事件的时间超过一定阈值(默认为2000ms)就会警告,如果阻塞的时间过长就会抛出异常。

BlockChecker的实现原理比较简单,底层借助了JUC的TimerTask,定时计算每个EventLoop线程的处理事件消耗的时间,如果超时就进行相应的警告。

3.Vert.xThread

Vert.x中的EventLoop线程及Worker线程都用VertxThread类表示,并通过VertxThreadFactory线程工厂来创建。

VertxThreadFactory创建Vert.x线程的过程非常简单:

@Override

publicThreadnewThread(Runnablerunnable){

VertxThreadt=newVertxThread(runnable,prefix+threadCount.getAndIncrement(),worker,maxExecTime);

if(checker!

=null){

checker.registerThread(t);

}

addToMap(t);

t.setDaemon(false);

returnt;

}

除了创建VertxThread线程之外,VertxThreadFactory还会将此线程注册至BlockChecker线程中以监视线程的阻塞情况,并且将此线程添加至内部的weakMap中。

这个weakMap作用只有一个,就是在注销对应的Verticle的时候可以将每个VertxThread中的Context实例清除(unset)。

为了保证资源不被一直占用,这里使用了WeakHashMap来存储每一个VertxThread。

当里面的VertxThread的引用不被其他实例持有的时候,它就会被标记为可清除的对象,等待GC。

至于VertxThread,它其实就是在普通线程的基础上存储了额外的数据(如对应的Vert.xContext,最大执行时长,当前执行时间,是否为Worker线程等),这里就不多讲了。

4.Vert.xContext

Vert.x底层中一个重要的概念就是Context,每个Context都会绑定着一个EventLoop线程(而一个EventLoop线程可以对应多个Context)。

我们可以把Context看作是控制一系列的Handler的执行作用域及顺序的上下文对象。

每当Vert.x底层将事件分发至Handler的时候,Vert.x都会给此Handler钦点一个Context用于处理任务:

∙如果当前线程是Vert.x线程(VertxThread),那么Vert.x就会复用此线程上绑定的Context;如果没有对应的Context就创建新的

∙如果当前线程是普通线程,就创建新的Context

Vert.x中存在三种Context,与之前的线程种类相对应:

∙EventLoopContext

∙WorkerContext

∙MultiThreadedWorkerContext

4.1Eventloopcontext

每个EventLoopContext都会对应着唯一的一个EventLoop,即一个EventLoopContext只会在同一个EventLoop线程上执行任务。

在创建Context的时候,Vert.x会自动根据轮询策略选择对应的EventLoop:

protectedContextImpl(VertxInternalvertx,WorkerPoolinternalBlockingPool,WorkerPoolworkerPool,StringdeploymentID,JsonObjectconfig,

ClassLoadertccl){

//...

EventLoopGroupgroup=vertx.getEventLoopGroup();

if(group!

=null){

this.eventLoop=group.next();

}else{

this.eventLoop=null;

}

//...

}

在Netty中,EventLoopGroup代表一组EventLoop,而从中获取EventLoop的方法则是next方法。

EventLoopGroup中EventLoop的数量由CPU内核数目所确定。

Vert.x这里使用了NettyNIO对应的NioEventLoop:

eventLoopGroup=newNioEventLoopGroup(options.getEventLoopPoolSize(),eventLoopThreadFactory);

eventLoopGroup.setIoRatio(NETTY_IO_RATIO);

对应的轮询算法:

@Override

publicEventExecutorChoosernewChooser(EventExecutor[]executors){

if(isPowerOfTwo(executors.length)){

returnnewPowerOfTowEventExecutorChooser(executors);

}else{

returnnewGenericEventExecutorChooser(executors);

}

}

可以看到,正常情况下Netty会用轮询策略选择EventLoop。

特别地,如果EventLoop的个数是2的倍数的话,选择的会快一些:

privatestaticfinalclassGenericEventExecutorChooserimplementsEventExecutorChooser{

//...

@Override

publicEventExecutornext(){

returnexecutors[Math.abs(idx.getAndIncrement()%executors.length)];

}

}

privatestaticfinalclassPowerOfTowEventExecutorChooserimplementsEventExecutorChooser{

//...

@Override

publicEventExecutornext(){

returnexecutors[idx.getAndIncrement()&executors.length-1];

}

}

我们可以在Embedded模式下测试一下EventLoop线程的分配:

System.out.println(Thread.currentThread());

Vertxvertx=Vertx.vertx();

for(inti=0;i<20;i++){

intindex=i;

vertx.setTimer(1,t->{

System.out.println(index+":

"+Thread.currentThread());

});

运行结果(不同机器运行顺序、EventLoop线程数可能不同):

Thread[main,5,main]

0:

Thread[vert.x-eventloop-thread-0,5,main]

1:

Thread[vert.x-eventloop-thread-1,5,main]

2:

Thread[vert.x-eventloop-thread-2,5,main]

3:

Thread[vert.x-eventloop-thread-3,5,main]

5:

Thread[vert.x-eventloop-thread-5,5,main]

6:

Thread[vert.x-eventloop-thread-6,5,main]

8:

Thread[vert.x-eventloop-thread-8,5,main]

7:

Thread[vert.x-eventloop-thread-7,5,main]

10:

Thread[vert.x-eventloop-thread-10,5,main]

9:

Thread[vert.x-eventloop-thread-9,5,main]

4:

Thread[vert.x-eventloop-thread-4,5,main]

11:

Thread[vert.x-eventloop-thread-11,5,main]

12:

Thread[vert.x-eventloop-thread-12,5,main]

13:

Thread[vert.x-eventloop-thread-13,5,main]

14:

Thread[vert.x-eventloop-thread-14,5,main]

16:

Thread[vert.x-eventloop-thread-0,5,main]

17:

Thread[vert.x-eventloop-thread-1,5,main]

15:

Thread[vert.x-eventloop-thread-15,5,main]

18:

Thread[vert.x-eventloop-thread-2,5,main]

19:

Thread[vert.x-eventloop-thread-3,5,main]

可以看到尽管每个Context对应唯一的EventLoop线程,而每个EventLoop线程却可能对应多个Context。

EventLoopContext会在对应的EventLoop中执行Handler进行事件的处理(IO事件,非阻塞)。

Vert.x会保证同一个Handler会一直在同一个EventLoop线程中执行,这样可以简化线程模型,让开发者在写Handler的时候不需要考虑并发的问题,非常方便。

我们来粗略地看一下Handler是如何在EventLoop上执行的。

EventLoopContext中实现了executeAsync方法用于包装Handler中事件处理的逻辑并将其提交至对应的EventLoop中进行执行:

publicvoidexecuteAsync(Handlertask){

//Nometrics,weareontheeventloop.

nettyEventLoop().execute(wrapTask(null,task,true,null));

}

这里Vert.x使用了wrapTask方法将Handler封装成了一个Runnable用于向EventLoop中提交。

代码比较直观,大致就是检查当前线程是否为Vert.x线程,然后记录事件处理开始的时间,给当前的Vert.x线程设置Context,并且调用Handler里面的事件处理方法。

具体请参考源码,这里就不贴出来了。

那么把封装好的task提交到EventLoop以后,EventLoop是怎么处理的呢?

这就需要更多的Netty相关的知识了。

根据Netty的模型,EventLoop线程需要处理IO事件,普通事件(即我们的Handler)以及定时事件(比如Vert.x的setTimer)。

Vert.x会提供一个NETTY_IO_RATIO给Netty代表EventLoop处理IO事件时间占用的百分比(默认为50,即IO事件时间占用:

非IO事件时间占用=1:

1)。

当EventLoop启动的时候,它会不断轮询IO时间及其它事件并进行处理:

@Override

protectedvoidrun(){

for(;;){

try{

switch(selectStrategy.calculateStrategy(selectNowSupplier,hasTasks())){

caseSelectStrategy.CONTINUE:

continue;

caseSelectStrategy.SELECT:

select(wakenUp.getAndSet(false));

if(wakenUp.get()){

selector.wakeup();

}

default:

//fallthrough

}

cancelledKeys=0;

needsToSelectAgain=false;

finalintioRatio=this.ioRatio;

if(ioRatio==100){

processSelectedKeys();

runAllTasks();

}else{

finallongioStartTime=System.nanoTime();

processSelectedKeys();

finallongioTime=System.nanoTime()-ioStartTime;

runAllTasks(ioTime*(100-ioRatio)/ioRatio);

}

if(isShuttingDown()){

closeAll();

if(confirmShutdown()){

break;

}

}

}catch(Throwablet){

//processtheerror

//...

}

}

}

这里面Netty会调用processSelectedKeys方法进行IO事件的处理,并且会计算出处理IO时间所用的事件然后计算出给非IO事件处理分配的时间,然后调用runAllTasks方法执行所有的非IO任务(这里面就有我们的各个Handler)。

runAllTasks会按顺序从内部的任务队列中取出任务(Runnable)然后进行安全执行。

而我们刚才调用的NioEventLoop的execute方法其实就是将包装好的Handler置入NioEventLoop内部的任务队列中等待执行。

4.2Workercontext

顾名思义,WorkerContext用于跑阻塞任务。

与EventLoopContext相似,每一个Handler都只会跑在固定的Worker线程下。

Vert.x还提供一种Multi-threadedworkercontext可以在多个Worker线程下并发执行任务,这样就会出现并发问题,需要开发者自行解决并发问题。

因此一般情况下我们用不到Multi-threadedworkercontext。

4.3Verticle

我们再来讨论一下Verticle中的Context。

在部署Verticle的时候,Vert.x会根据配置来创建Context并绑定到Verticle上,此后此Verticle上所有绑定的Handler都会在此Context上执行。

相关实现位于doDeploy方法,这里摘取核心部分:

for(Verticleverticle:

verticles){

WorkerExecutorImplworkerExec=poolName!

=null?

vertx.createSharedWorkerExecutor(poolName,options.getWorkerPoolSize()):

null;

WorkerPoolpool=workerExec!

=null?

workerExec.getPool():

null;

//根据配置创建Context

ContextImplcontext=options.isWorker()?

vertx.createWorkerContext(options.isMultiThreaded(),deploymentID,pool,conf,tccl):

vertx.createEventLoopContext(deploymentID,pool,conf,tccl);

if(workerExec!

=null){

context.addCloseHook(workerExec);

}

context.setDeployment(deployment);

deployment.addVerticle(newVerticleHolder(verticle,context));

//此Verticle上的Handler都会在创建的context作用域内执行

context.runOnContext(v->{

try{

verticle.init(vertx,context);

FuturestartFuture=Future.future();

//大家熟悉的start方法的执行点

verticle.start(startFuture);

startFuture.setHandler(ar->{

if(ar.succeeded()){

if(parent!

=null){

parent.addChild(deployment);

deployment.child=true;

}

vertx.metricsSPI().verticleDeployed(verticle);

deployments.put(deploymentID,deployment);

if(deployCount.incrementAndGet()==verticles.length){

reportSuccess(deploymentID,callingContext,completionHandler);

}

}elseif(!

failureReported.get()){

reportFailure(ar.cause(),callingContext,completionHandler);

}

});

}catch(Throwablet){

reportFailure(t,callingContext,completionHandler);

}

});

}

通过这样一种方式,Vert.x保证了Verticle的线程安全——即某个Verticle上的所有Handler都会在同一个Vert.x线程上执行,这样也保证了Verticle内部成员的安全(没有racecondition问题)。

比如下面Verticle中处理IO及事件的处理都一直是在同一个Vert.x线程下执行的,每次打印出的线程名称应该是一样的:

publicclassTcpClientVerticleextendsAbstractVerticle{

inti=0;

@Override

publicvoidstart()throwsException{

vertx.createNetClient().connect(6666,"localhost",ar->{

if(ar.succeeded()){

NetSocketsocket=ar.result();

System.out.println(Thread.currentThread().getName());

socket.handler(buffer->{

i++;

System.out.println(Thread.currentThread().getName());

System.out.println("Netclientreceiving:

"+buffer.toString("UTF-8"));

});

socket.write("+1s\n");

}else{

ar.cause().printStackTrace();

}

});

}

}

5.线程池

5.1EventLoop线程池

之前我们已经提到过,EventLoop线程池的类型为Netty中的NioEventLoopGroup,里面的线程通过Vert.x自己的线程工厂VertxThreadFactory进行创建:

eventLoopThreadFactory=newVertxThreadFactory("vert.x-eventloop-thread-",checker,false,options.getMaxEventLoopExecuteTime());

eventLoopGroup=newNioEventLoopGroup(options.getEventLoopPoolSize(),eventLoopThreadFactory);

eventLoopGroup.setIoRatio(NETTY_IO_RATIO);

其中EventLoop线程的数目可以在配置中指定。

5.2Worker线程池

在之前讲executeBlocking底层实现的文章中我们已经提到过Worker线程池,它其实就是一种FixedThreadPool:

ExecutorServiceworkerExec=Executors.newFixedThreadPool(options.getWorkerPoolSize(),

newVertxThreadFactory("vert.x-worker-thread-",checker,true,options.getMaxWorkerExecuteTime()));

PoolMetricsworkerPoolMetrics=isMetricsEnabled()?

metrics.createMetrics(worke

展开阅读全文
相关资源
猜你喜欢
相关搜索
资源标签

当前位置:首页 > 工程科技 > 交通运输

copyright@ 2008-2023 冰点文库 网站版权所有

经营许可证编号:鄂ICP备19020893号-2