DataBlockScanner.docx

上传人:b****4 文档编号:6797471 上传时间:2023-05-10 格式:DOCX 页数:12 大小:299.75KB
下载 相关 举报
DataBlockScanner.docx_第1页
第1页 / 共12页
DataBlockScanner.docx_第2页
第2页 / 共12页
DataBlockScanner.docx_第3页
第3页 / 共12页
DataBlockScanner.docx_第4页
第4页 / 共12页
DataBlockScanner.docx_第5页
第5页 / 共12页
DataBlockScanner.docx_第6页
第6页 / 共12页
DataBlockScanner.docx_第7页
第7页 / 共12页
DataBlockScanner.docx_第8页
第8页 / 共12页
DataBlockScanner.docx_第9页
第9页 / 共12页
DataBlockScanner.docx_第10页
第10页 / 共12页
DataBlockScanner.docx_第11页
第11页 / 共12页
DataBlockScanner.docx_第12页
第12页 / 共12页
亲,该文档总共12页,全部预览完了,如果喜欢就下载吧!
下载资源
资源描述

DataBlockScanner.docx

《DataBlockScanner.docx》由会员分享,可在线阅读,更多相关《DataBlockScanner.docx(12页珍藏版)》请在冰点文库上搜索。

DataBlockScanner.docx

DataBlockScanner

DataBlockScanner

由于每一个磁盘或者是网络上的I/O操作可能会对正在读写的数据处理不慎而出现错误,所以HDFS提供了下面两种数据检验方式,以此来保证数据的完整性,而且这两种检验方式在DataNode节点上是同时工作的:

一.校验和

      检测损坏数据的常用方法是在第一次进行系统时计算数据的校验和,在通道传输过程中,如果新生成的校验和不完全匹配原始的校验和,那么数据就会被认为是被损坏的。

二.数据块检测程序(DataBlockScanner)

    在DataNode节点上开启一个后台线程,来定期验证存储在它上所有块,这个是防止物理介质出现损减情况而造成的数据损坏。

       关于校验和,HDFS以透明的方式检验所有写入它的数据,并在默认设置下,会在读取数据时验证校验和。

正对数据的每一个校验块,都会创建一个单独的校验和,默认校验块大小是512字节,对应的校验和是4字节。

DataNode节点负载在存储数据(当然包括数据的校验和)之前验证它们收到的数据,如果此DataNode节点检测到错误,客户端会收到一个CheckSumException。

客户端读取DataNode节点上的数据时,会验证校验和,即将其与DataNode上存储的校验和进行比较。

每一个DataNode节点都会维护着一个连续的校验和和验证日志,里面有着每一个Block的最后验证时间。

客户端成功验证Block之后,便会告诉DataNode节点,Datanode节点随之更新日志。

这一点也就涉及到前面说的DataBlockScanner了,所以接下来我将主要讨论DataBlockScanner。

    还是先来看看与DataBlockScanner相关联的类吧!

dataset:

数据块管理器;

blockInfoSet:

数据块扫描信息集合,按照上一次扫描时间和数据块id升序排序,以便快速获取验证到期的数据块;

blockMap:

数据块和数据块扫描信息的映射,以便能够根据数据块快速获取对应的扫描信息;

totalBytesToScan:

一个扫描周期中需要扫描的总数据量;

bytesLeft:

一个扫描周期中还剩下需要扫描的数据量;

throttler:

扫描时I/O速度控制器,需要根据totalBytesToScan和bytesLeft信息来衡量;

verificationLog:

数据块的扫描验证日志记录器;

scanPeriod:

一个扫描周期,可以由Datanode的配置文件来设置,配置项是:

dfs.datanode.scan.period.hours,单位是小时,默认的值是21*24*60*60*1000ms。

   DataBlockScanner是作为DataNode的一个后台线程工作的,跟着DataNode一块启动,它的工作流程如下:

  DataBlockScanner被DataNode节点用来检测它所管理的所有Block数据块的一致性,因此,对已DataNode节点上的每一个Block,它都会每隔scanPeriodms利用Block对应的校验和文件来检测该Block一次,看看这个Block的数据是否已经损坏。

由于scanPeriod的值一般比较大,因为对DataNode节点上的每一个Block扫描一遍要消耗不少系统资源,这就可能带来另外一个问题就是在一个扫描周期内可能会出现DataNode节点重启的情况,所以为了提供系统性能,避免DataNode节点在启动之后对还没有过期的Block又扫描一遍,DataBlockScanner在其内部使用了日志记录器来持久化保存每一个Block上一次扫描的时间,这样的话,DataNode节点在启动之后通过日志文件来恢复之前所有Block的有效时间。

另外,DataNode为了节约系统资源,它对Block的验证不仅仅只依赖于DataBlockScanner后台线程(VERIFICATION_SCAN方式),他还会在向某一个客户端传送Block的时候来更行该Block的扫描时间(REMOTE_READ方式),这是因为DataNode向客户端传送一个Block的时候要必须校验该数据块。

那么这个时候日志记录器并不会马上把该数据块的扫描信息写到日志,毕竟频繁的磁盘I/O会导致性能下降,至于何时对该Block的最新扫描时间写日志有一个判断条件:

   1.如果是VERIFICATION_SCAN方式的Block验证,必须记日志;

   2.如果是REMOTE_READ方式,那么该Block上一次的记录日志到现在的时间间隔超过24小时或者超过scanPeriod/3ms的话,记日志。

  下面来结合源码详细讨论这个过程:

1.初始化

  在整个扫描验证过程中都一个速度控制器,

privatevoidinit(){

Blockarr[]=dataset.getBlockReport();//从“磁盘”上获取所有的数据块基本信息

Collections.shuffle(Arrays.asList(arr));

blockInfoSet=newTreeSet();

blockMap=newHashMap();

longscanTime=-1;

for(Blockblock:

arr){

//为每一个Block建立扫描验证信息

BlockScanInfoinfo=newBlockScanInfo(block);

info.lastScanTime=scanTime--;

addBlockInfo(info);

}

/*寻找一个合适的扫描验证日志文件

*/

Filedir=null;

FSDataset.FSVolume[]volumes=dataset.volumes.volumes;

for(FSDataset.FSVolumevol:

volumes){

if(LogFileHandler.isFilePresent(vol.getDir(),verificationLogFile)){

dir=vol.getDir();

break;

}

}

if(dir==null){

dir=volumes[0].getDir();

}

try{

//创建一个日志记录器

verificationLog=newLogFileHandler(dir,verificationLogFile,100);

}catch(IOExceptione){

LOG.warn("Couldnotopenverficationlog."+"Verificationtimesarenotstored.");

}

synchronized(this){

//创建一个扫描速度控制器

throttler=newBlockTransferThrottler(200,MAX_SCAN_RATE);

}

}

privatevoidupdateBytesToScan(longlen,longlastScanTime){

//lencouldbenegativewhenablockisdeleted.

totalBytesToScan+=len;

//新添加的Block需要在需要在此次中扫描验证

if(lastScanTime

bytesLeft+=len;

}

}

privatesynchronizedvoidaddBlockInfo(BlockScanInfoinfo){

booleanadded=blockInfoSet.add(info);

blockMap.put(info.block,info);

if(added){

LogFileHandlerlog=verificationLog;

if(log!

=null){

log.setMaxNumLines(blockMap.size()*verficationLogLimit);

}

//用新添加的Block扫描信息来更新此次扫描的任务量

updateBytesToScan(info.block.getNumBytes(),info.lastScanTime);

}

}

  这个日志文件dncp_block_verification.log.curr保存在DataNode节点的一个存储目录中,并放在current/目录下,如:

2.初始化上一次验证时间

privatesynchronizedvoiddelBlockInfo(BlockScanInfoinfo){

booleanexists=blockInfoSet.remove(info);

blockMap.remove(info.block);

if(exists){

LogFileHandlerlog=verificationLog;

if(log!

=null){

log.setMaxNumLines(blockMap.size()*verficationLogLimit);

}

//更新此次扫描验证的工作量

updateBytesToScan(-info.block.getNumBytes(),info.lastScanTime);

}

}

privatesynchronizedvoidupdateBlockInfo(LogEntrye){

BlockScanInfoinfo=blockMap.get(newBlock(e.blockId,0,e.genStamp));

if(info!

=null&&e.verificationTime>0&&info.lastScanTime

delBlockInfo(info);

info.lastScanTime=e.verificationTime;

info.lastScanType=ScanType.VERIFICATION_SCAN;

addBlockInfo(info);

}

}

//为每一个Block分配上一次验证的时间

privatebooleanassignInitialVerificationTimes(){

intnumBlocks=1;

synchronized(this){

numBlocks=Math.max(blockMap.size(),1);

}

//读取数据块的验证日志文件

LogFileHandler.ReaderlogReader=null;

try{

if(verificationLog!

=null){

logReader=verificationLog.newReader(false);

}

}catch(IOExceptione){

LOG.warn("Couldnotreadpreviousverificationtimes:

"+StringUtils.stringifyException(e));

}

if(verificationLog!

=null){

verificationLog.updateCurNumLines();

}

try{

//用日志信息来更新记录的Block上一次验证时间

while(logReader!

=null&&logReader.hasNext()){

if(!

datanode.shouldRun||Thread.interrupted()){

returnfalse;

}

LogEntryentry=LogEntry.parseEntry(logReader.next());

if(entry!

=null){

updateBlockInfo(entry);

}

}

}finally{

IOUtils.closeStream(logReader);

}

/*计算Blocks之间验证的间隔时间

*/

longverifyInterval=(long)(Math.min(scanPeriod/2.0/numBlocks,10*60*1000));

longlastScanTime=System.currentTimeMillis()-scanPeriod;

/*初始化剩余Blocks的上一次验证时间

*/

synchronized(this){

if(blockInfoSet.size()>0){

BlockScanInfoinfo;

while((info=blockInfoSet.first()).lastScanTime<0){

delBlockInfo(info);

info.lastScanTime=lastScanTime;

lastScanTime+=verifyInterval;

addBlockInfo(info);

}

}

}

returntrue;

}

3.调整扫描速度

  在一次Blocks扫描验证周期中,DataBlockScanner需要进行大量的磁盘I/O,为了不影响DataNode节点上其它线程的工作资源,同时也为了自身工作的有效性,所以DataBlockScanner采用了扫描验证速度控制器,根据当前的工作量来控制当前数据块的验证速度。

[java]viewplaincopy

1.private synchronized void adjustThrottler() {  

2.   //本次扫描验证还剩余的时间  

3.   long timeLeft = currentPeriodStart+scanPeriod - System.currentTimeMillis();  

4.   //根据本次验证扫描剩余的工作量和时间来计算速度  

5.   long bw = Math.max(bytesLeft*1000/timeLeft, MIN_SCAN_RATE);  

6.   throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));  

7. }  

4.数据块的扫描验证

   DataNode节点在向客户端或者其它DataNode节点传输数据时,客户端或者其它DataNode节点会根据接收的数据校验和来验证接收到的数据,当验证出错时,它们会通知传送节点。

DataBlockScanner通过自己扮演传输者又扮演接受者来实现数据块的验证的;同时为了防止本地磁盘的I/O的错误,DataBlockScanner采用了两次传输-接收来确保验证的Block的数据是出错了(损坏了)。

当发现有出错的Block是,就需要向NameNode节点报告,由NameNode来决定如何处理这个数据块,而不是由DataNode节点擅自作主清除该Block数据信息。

privatevoidverifyBlock(Blockblock){

BlockSenderblockSender=null;

for(inti=0;i<2;i++){

booleansecond=(i>0);

try{

adjustThrottler();

blockSender=newBlockSender(block,0,-1,false,false,true,datanode);

DataOutputStreamout=newDataOutputStream(newIOUtils.NullOutputStream());

blockSender.sendBlock(out,null,throttler);

LOG.info((second?

"Second":

"")+"Verificationsucceededfor"+block);

if(second){

totalTransientErrors++;

}

updateScanStatus(block,ScanType.VERIFICATION_SCAN,true);

return;

}catch(IOExceptione){

totalScanErrors++;

updateScanStatus(block,ScanType.VERIFICATION_SCAN,false);

//在“磁盘”上没有该Block对应的文件

if(dataset.getFile(block)==null){

LOG.info("Verificationfailedfor"+block+".Itsoksince"+"itnotindatanodedatasetanymore.");

deleteBlock(block);

return;

}

LOG.warn((second?

"Second":

"First")+"Verificationfailedfor"+block+".Exception:

"+StringUtils.stringifyException(e));

//两次验证都出错

if(second){

datanode.getMetrics().blockVerificationFailures.inc();

handleScanFailure(block);

return;

}

}finally{

IOUtils.closeStream(blockSender);

datanode.getMetrics().blocksVerified.inc();

totalScans++;

totalVerifications++;

}

}

privatesynchronizedvoidupdateScanStatus(Blockblock,ScanTypetype,booleanscanOk){

BlockScanInfoinfo=blockMap.get(block);

if(info!

=null){

delBlockInfo(info);

}else{

//Itmightalreadyberemoved.Thatsok,itwillbecaughtnexttime.

info=newBlockScanInfo(block);

}

//更新该Block的验证信息

longnow=System.currentTimeMillis();

info.lastScanType=type;

info.lastScanTime=now;

info.lastScanOk=scanOk;

addBlockInfo(info);

if(type==ScanType.REMOTE_READ){

totalVerifications++;

}

//Don'tupdatemetadatatoooftenincaseofREMOTE_READ

//ofiftheverificationfailed.

longdiff=now-info.lastLogTime;

if(!

scanOk||(type==ScanType.REMOTE_READ&&diff

return;

}

info.lastLogTime=now;

LogFileHandlerlog=verificationLog;

if(log!

=null){

log.appendLine(LogEntry.newEnry(block,now));//记录通过验证的Block验证信息

}

}

//处理发生错误的Block

privatevoidhandleScanFailure(Blockblock){

try{

DatanodeInfo[]dnArr={newDatanodeInfo(datanode.dnRegistration)};

LocatedBlock[]blocks={newLocatedBlock(block,dnArr)};

//向NameNode节点发送出错的Block

datanode.namenode.reportBadBlocks(blocks);

}catch(IOExceptione){

/*OnecommonreasonisthatNameNodecouldbeinsafemode.

*Shouldwekeeponretryinginthatcase?

*/

LOG.warn("Failedtoreportbadblock"+block+"tonamenode:

"+"Exception:

"+StringUtils.stringifyException(e));

}

 

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 解决方案 > 学习计划

copyright@ 2008-2023 冰点文库 网站版权所有

经营许可证编号:鄂ICP备19020893号-2