tair源码分析.docx

上传人:b****2 文档编号:3185761 上传时间:2023-05-05 格式:DOCX 页数:31 大小:76.36KB
下载 相关 举报
tair源码分析.docx_第1页
第1页 / 共31页
tair源码分析.docx_第2页
第2页 / 共31页
tair源码分析.docx_第3页
第3页 / 共31页
tair源码分析.docx_第4页
第4页 / 共31页
tair源码分析.docx_第5页
第5页 / 共31页
tair源码分析.docx_第6页
第6页 / 共31页
tair源码分析.docx_第7页
第7页 / 共31页
tair源码分析.docx_第8页
第8页 / 共31页
tair源码分析.docx_第9页
第9页 / 共31页
tair源码分析.docx_第10页
第10页 / 共31页
tair源码分析.docx_第11页
第11页 / 共31页
tair源码分析.docx_第12页
第12页 / 共31页
tair源码分析.docx_第13页
第13页 / 共31页
tair源码分析.docx_第14页
第14页 / 共31页
tair源码分析.docx_第15页
第15页 / 共31页
tair源码分析.docx_第16页
第16页 / 共31页
tair源码分析.docx_第17页
第17页 / 共31页
tair源码分析.docx_第18页
第18页 / 共31页
tair源码分析.docx_第19页
第19页 / 共31页
tair源码分析.docx_第20页
第20页 / 共31页
亲,该文档总共31页,到这儿已超出免费预览范围,如果喜欢就下载吧!
下载资源
资源描述

tair源码分析.docx

《tair源码分析.docx》由会员分享,可在线阅读,更多相关《tair源码分析.docx(31页珍藏版)》请在冰点文库上搜索。

tair源码分析.docx

tair源码分析

Tair源码分析

网易-忻丁峰

编译连接运行:

Svncheckouthttp:

//code.taobao.org/svn/tb-common-utils/trunk

Svncheckouthttp:

//code.taobao.org/svn/tair/trunk

编译tbsys和tbnet

ExportTBLIB_ROOT=XXXXX

./build.sh

编译tair

./bootstrap.sh

./configure--prefix=XXXXX--with-release=yes

Make

Makeinstall

运行:

调试:

由于tair_server是fork子进程执行初始化服务的,而通过gdbexecutefile,然后run,只能跟踪父进程代码,那怎么跟踪到子进程代码呢?

可以通过设置setfollow-fork-modechild来调试子进程

系统架构

模块

Tbnet

EPollSocketEvent:

注册/移除网络事件,同时获取网络事件。

通过epoll_event中的data.ptr来指向该事件对应的IOComponent

TcpAcceptor:

处理监听端口的读请求,获取新连接组装成TCPComponent并add给tranport。

TcpAcceptor是继承于IOComponent。

ChannelPool:

管理所有的Channel,包括正在使用的channel和空闲的channel

Channel:

主要用于设置网络包对应的处理handler。

典型用法是连接在发送packet时生成Channel,该channel包含channelID,packetHandler,ExpireTime等,等读取响应包后根据解析包头得到channelID去channelPool查找对应的Channel,然后由channel中的packetHandler处理响应packet,如果未设置packetHandler则采用默认的defaultPacketHandler

TransPort:

对应传输层,EPollSocketEvent用于读写socket网络事件,包含读写处理线程和超时检查线程。

listen接口中传入的IPacketStreamer用于packet的创建,组包和解包,IServerAdapter用于服务端处理客户端请求包。

Connection用于连接serverid,并将新的socket网络事件添加到EPollSocketEvent。

EventLoop用于处理读写,如果发现error的连接或者对方关闭连接,通过removeComponent来注销对应的网络事件,同时将对应的component加入del链表中。

TimeoutLoop用于检测所有连接是否超时,同时将del链表中的component移除。

无论是服务端还是客户端,都采用EventLoop(IO复用模型)和TimeoutLoop。

checkTimeout的具体流程是检测IOComponent是否超时,如果超时,那么将socket进行shutdown(rw),此时发生4次握手协议,对方socket也会发finish包给本端,本端因为已经关闭了读一半,所以会触发read返回值为0,此时会调用removeComponent。

QA:

1.eventLoop只有一个线程去轮询所有的注册事件,获取到读事件,该线程会读取对应socket上的数据,然后处理数据。

那么如果系统请求量比较大的话,eventLoop只有一个线程需要处理所有的读数据和处理数据,这会不会成为系统的瓶颈?

答:

tair系统的用法是在处理包函数中并不包含逻辑处理,而是交给PacketQueueThread进行多线程处理数据包

疑问点:

1.eventLoop和timeoutLoop有可能会同时操作同一个socket,这个怎样来保证socket线程安全(read/write和shutdown可能会同时进行)?

IOComponent/TCPComponent:

包含一个连接的组件,主要用于处理读写事件和设置对应连接的属性。

继承类TCPComponent中含有一个Connection成员,而Connection可以用来处理socket读写

Connection/TCPConnection:

对应一个连接,主要用于执行真正的读写包,同时读包后需要调用处理包逻辑,服务端连接调用IServerAdapter中的handlePacket/handleBatchPacket接口,客户端通过packet中的channelId查找对应的channel中的packHandler来处理网络包。

发送包都是通过调用Connection中postPacket方法来实现的,该方法将要发送的packet放入outputQueue中,然后等Transport处理写事件时将outPutQueue中的数据报写入socket

ConnectionManager:

用于管理所有的客户端连接服务端的connection,用connectionMap维护了一个连接池

Packet:

网络包接口。

包头的结构为

classPacketHeader{

public:

uint32_t_chid;//通道ID

int_pcode;//数据包类型

int_dataLen;//数据包body长度(除头信息外)

};

Tair

dataServer

具体功能

1)存储引擎

2)接受client的put/get/remove。

等操作

3)执行数据迁移,复制等

4)插件:

在接受请求的时候处理一些自定义功能

5)访问统计

代码分析

tair_server:

监听服务,并处理相关请求。

主要成员变量是transport(提供了监听服务,接受请求,IO复用管理),task_queue_thread(多线程处理put/get/remove等请求包),duplicate_task_queue_thread(多线程处理复制请求),request_processor是处理put/get/remove等请求,stat_processor是处理flow相关请求,heartbeat用来实现心跳。

QA:

1.因为Transport中eventLoop是单线程轮询,而且还是单线程读取所有可读socket的数据,还需要写数据,会不会成为性能瓶颈

答:

在tair_server端Transport是单线程读取所有可读socket中的数据,但处理数据是多线程的。

这是因为tair_server将读取到的数据根据请求类型放入task_queue_thread/duplicate_task_queue_thread,而上述2个queue_thread是多线程处理请求队列

2.flow请求主要起什么作用

淘宝答复:

与客户端配合实现流控机制。

request_processor:

处理put/get等请求。

put流程:

1.判断request中key值对应的bucket_number是否处于迁移完成状态。

针对迁移完成的bucket,本节点不能处理读写请求(如果处理了写请求,那么还需要将写同步到迁移目的端),原先的设计是针对迁移已经完成的bucket,本节点做proxy去连接目的dataserver;但现在已经不再这样做了,直接返回TAIR_RETURN_SHOULD_PROXY值。

不过这段时间比较短暂,原因如下:

该bucket迁移完成后,会通知configsever,configserver接收到通知后,会修改对应的数据分布表,client通过version值比较会向configserver获取最新数据分布表,当client获取到最新的数据分布表后就不会再向原先的dataserver发请求了。

2.Plugin调用do_request_plugins处理对应的request请求。

3.Tair_manager处理request请求。

4.Plugin调用do_response_plugins处理步骤3的返回结果。

tair_manager:

主要实现了put/get/remove/duplicate等功能。

具体列举put,remove和get流程

put流程:

1.解析packet,提取area,key,value等信息,同时将areamerge到key的前2个byte组成mkey

2.根据key计算出bucket_number,同时判断该bucket_number是否处于锁定状态。

处于锁定状态的bucket,说明该bucket正在做migrate最后一次读取回放记录。

对于写操作,还需要检查本节点是否属于该bucket的masterdataserver,写操作永远只能在masterbucketdataserver上执行。

3.向存储引擎中put数据

4.对于正常的clientput请求(非dataservermaster发出的duplicateput请求)获取该bucket_number对应的slavedataserver集合,根据dup_sync的设置,同步或者异步发duplicateput请求

5.如果设置slave集群(即双机房主备集群),需要把put请求发送slave集群,tair_manager需要做的是将put请求写入logger,remote_sync_manager后台线程会读取logger并发送给slave集群

6.如果此时dataServer正在对该bucket_number执行迁移操作,则需要写migrateputlog。

目的是为了migrate不遗漏数据

7.增加对应统计信息

Remove流程跟put流程基本一致

get流程:

1.解析packet,提取area,key等信息,同时将areamerge到key的前2个byte组成mkey

2.根据key计算出bucket_number,并从存储引擎中获取数据

3.增加对应统计信息

QA:

1.流程陈述中key和keymergenamespace组成的mkey,它们各自有什么作用?

答:

key是用来计算hash值,从而得到存储该k-v值的bucket_no。

而merge了namespace的mkey是传给引擎层的,用来保证不同namespace相同的key在引擎层是隔离的。

2.怎样来保证不同namespace,相同的key值之间的隔离性

答:

在put/get等操作时需要提供area,key等值,area代表namespace的int值,然后area值merge到key的前2个byte,这样实现了不同namespace相同key相互不影响

duplicate_sender_manager:

实现了异步复制。

主要的数据结构为packets_mgr(一个key为bucket_number,value为bucket_waiting_queue的map),bucket_waiting_queue中最主要的结构是packets_queue(一个key为serverid,value为需要发送packet队列的map,serverid表示要发送的对方的id,value是要发送给对方的packet队列)。

异步复制的实现在duplicate_data函数中,具体流程如下:

1.判断该bucket_no对应的消息队列是否还有空闲空间

2.将area,key,value,expire_time,bucket_number构成packet,然后和dest_dataserverids(即slavedataserverid集合)一起放入对应的packets_mgr->bucket_waiting_queue中

后台线程负责从packets_mgr中获取并发送数据,流程如下:

1.遍历需要发送的packets_mgr,获取需要发送request_duplicate_packet的bucket_number。

2.获取bucket_number对应的发送队列,将遍历到符合发送要求(主要是expiretime)的packet发送给对应的serverid(connectionManager维护了serverid和connection的对应关系)。

duplicate_sender_manager:

:

handlePacket实现处理响应包,将成功发送的request_duplicate包从packets_mgr->bucket_waiting_queue中移除。

dup_sync_sender_manager:

实现了同步复制。

最主要的数据结构是CPacket_wait_manager.同步复制通过duplicate_data接口,主要流程如下:

1.根据要发送的packet_id在CPacket_wait_manager设置等待点。

2.将area,key,value,expire_time,bucket_number构成request_duplicate_packet发送给对应所有的bucketslavedataserver。

3.等待所有slave的response,清除对应等待点,通过tair_packet_factory:

:

set_return_packet向client发succ包

注:

同步的duplicate的返回值是tair_dup_wait_rsp,tair_server在处理该值时不发送response,所以同步duplicate需要自己发送response。

而异步duplicate返回值是tair_return_success,tair_server在处理该值时是发送response的

QA:

1.同步复制失败,是否会造成该bucket的masterdataserver和slavedataserver数据不一致

答:

会的。

同步失败,不会对masterbucketdataserver进行回滚处理,只是给客户端的返回值为fail

migrate_manager:

实现迁移数据。

主要数据结构是migrate_server(一个key是bucket_number,value是serverid集的map),此结构记录了需要迁移的bucket_number和destserverids;volatileintcurrent_migrating_bucket记录了正在迁移的bucket_number,主要的作用是向正在迁移桶写(put等操作)数据时需要记log;volatileintcurrent_locked_bucket用于记录最后同步日志的桶号,在最后同步日志阶段,需要阻塞所有的向该桶写数据的操作,写操作发现需要向current_locked_bucket写数据就抛异常。

迁移数据的驱动是dataserver接收到configserver发送的heartbeatresponse,从中解析出需要将bucket_number的数据迁移到destserverids结构。

迁移具体流程如下:

1.遍历迁移任务,即将bucket_number对应数据遍历发送给其对应destserverids。

迁移数据的源永远是masterbucketdataserver。

2.记录log的当前lsn作为起始lsn(记startLsn)

3.Masterbucketdataserver把存储引擎上所有属于bucket_number的data数据同步到dest_server_ids

4.同步log,先判断需要同步的log是否超过MIGRATE_LOCK_LOG_LEN,如果不超过则通过设置current_locket_bucket来锁对应的bucket_number,如果超过了,先不设置current_locket_bucket来同步日志,但最后一次同步日志必须通过设current_locket_bucket来锁对应bucket_number。

同步日志方式是先遍历log找到属于bucket_number的日志,然后组成packet发送给对应的destserverids

5.同步完日志后,发送flush命令给所有的destserverids。

这条命令主要是针对需要序列化至外存的存储引擎,如果不将内存的数据序列化至外存,那么dataserver宕机后仅靠log是不能恢复全部数据的。

6.如果这个bucket_number在执行复制操作,等待复制完成。

等待复制完成,是为了迁移后dest_serverids上的数据与原有的slavedataserver一致。

7.通知configServer,针对bucket_number的迁移已经完成。

同时设置current_migrating_bucket和current_locked_bucket为-1。

QA:

1.通过设置current_locked_bucket来阻挡put等写请求,那么正在写的请求,该怎么等待它们的结束呢?

答:

通过current_locked_bucket锁bucket来阻挡put等写请求的进入,通过sleepMISECONDS_WAITED_FOR_WRITE来等待正在执行的写操作结束

remote_sync_manager:

用于实现集群之间的同步(典型用例为双机房主备集群)。

主要的数据结构有ClusterHandlerlocal_cluster_handler(表示主集群信息);CLUSTER_HANDLER_MAPremote_cluster_handlers(一个以configserver中master地址+slave地址+groupname为key,ClusterHandler为value的map,用于维护所有slave集群);logger表示本dataServer的binlog;retry_logger表示dataServer需要重做的logger;fail_logger表示dataServer集群间复制失败的logger。

主要流程:

1.dataServer在执行put或者remove操作时,如果配置了do_rsync,那么将在logger上写remoteput或者remoteremove日志

2.后台线程根据序号读取logger或者retryLogger上的日志记录

3.对key进行过滤,如果符合要求的item进行同步操作

4.用tair_client模拟客户端去连接所有的slave集群,根据日志发送对应的put或者remove请求给slave集群。

5.请求失败根据配置写入failLogger或者retryLogger

注1:

remote_sync_manager的后台线程数为logger和retryLogger支持最大读者数量,然后后台线程根据线程的序号决定同步logger还是retryLogger上的数据

注2:

dup_sync用于设置同一集群中bucketdataServer的主从复制方式(复制方式有同步和异步2种模式,即dup_sync_sender_manager和duplicate_sender_manager);do_rsync是用于设置主备集群间是否需要同步。

dup_sync和do_rsync都是在dataserver.conf中设置的

上图所示,local表示主集群中configServer的master和slave地址,remote表示从集群中configServer的master和slave地址

table_manager:

管理本data_server所负责的bucket_numbers,决定需要迁移的bucket。

主要的数据结构为u64*server_table数据分布表,其中包括m_hash_table和d_hash_table;table_version记录版本号;std:

:

vectorholding_buckets表示现在本dataserver持有的bucket_no集合;std:

:

vectorpadding_buckets表示本dataserver最终需要负责的bucket_no;std:

:

vectorrelease_buckets表示需要释放的bucket_no集合,因为这些bucket数据已经被迁走了;bucket_server_mapmigrates(一个bucket_number为key,target_server_ids为value的map)表示需要迁移的任务。

计算需要迁移任务的流程如下:

1.遍历m_hash_table中masterbucketdataserver,如果该masterbucketdataserver属于本机,那么执行一下流程,否则跳过遍历下一个bucket(数据迁移的数据源永远是masterbucketdataserver)。

2.计算该bucket_no对应的数据是否需要迁移,通过计算m_hash_table和d_hash_table中负责该bucket的所有data_server_id是否一致(顺序可以不同),找到d_hash_table存在bucketdataserver而在m_hash_table中不存在的dataserver,这些dataserver构成了迁移的目的dataserver

3.通过现在持有的bucket_no集合和迁移后持有的bucket_no集合,计算需要释放的bucket集合。

然后调用storage_mgr->close_buckets(release_buckets)来关闭对应bucket数据

4.向migrate_manager设置迁移任务。

QA:

1.table_manager:

:

calculate_release_bucket在计算需要释放的bucket时,采用了set_difference方法,那么release_buckets表示现在持有的bucket_no,而在迁移后不再持有的bucket_no。

那么如果该bucket的masterdataserver是本机,而在迁移后本机不负责该bucket数据,然后在步骤三调用storage_mgr->close_buckets(release_buckets)来关闭对应bucket数据,此后才是migrate_manager后台线程做迁移任务,而该bucket对应的数据已经被关闭,此时引擎还能访问到该bucket对应的数据吗?

答:

这里计算holdingbucket与releasebucket的时候,使用的是对照表的当前表。

所以如果此处进入了releasebucket,那么必然这个bucket已经不在这个服务器上了(因为在calculate_release_bucket时,使用的是当前dataserver上m_hash_table和configserver发送给dataserver的最新m_hash_table,所以中间的差值就是数据迁移已经完成的bucket)。

所以不会对迁移有影响。

heartbeat_thread:

监听TAIR_HEARTBEAT_PORT端口,同时还作为客户端定期给configServer发心跳包,configServer响应心跳包,该心跳响应包主要包括更新client_version,server_version,获取有效的serverid集合来过滤无效的slavedataserver,通过plugin_version的比较来启动后台线程加载plugin动态库,启动后台线程更新namespace的配额.

QA:

1.dataserver起心跳监听端口起什么用?

哪种情况需要主动连接dataserver的心跳监听端口?

答:

目前已无作用。

server_table

展开阅读全文
相关资源
猜你喜欢
相关搜索
资源标签

当前位置:首页 > 解决方案 > 学习计划

copyright@ 2008-2023 冰点文库 网站版权所有

经营许可证编号:鄂ICP备19020893号-2