1、DataBlockScannerDataBlockScanner 由于每一个磁盘或者是网络上的I/O操作可能会对正在读写的数据处理不慎而出现错误,所以HDFS提供了下面两种数据检验方式,以此来保证数据的完整性,而且这两种检验方式在DataNode节点上是同时工作的:一.校验和 检测损坏数据的常用方法是在第一次进行系统时计算数据的校验和,在通道传输过程中,如果新生成的校验和不完全匹配原始的校验和,那么数据就会被认为是被损坏的。二.数据块检测程序(DataBlockScanner) 在DataNode节点上开启一个后台线程,来定期验证存储在它上所有块,这个是防止物理介质出现损减情况而造成的数据损坏
2、。 关于校验和,HDFS以透明的方式检验所有写入它的数据,并在默认设置下,会在读取数据时验证校验和。正对数据的每一个校验块,都会创建一个单独的校验和,默认校 验块大小是512字节,对应的校验和是4字节。DataNode节点负载在存储数据(当然包括数据的校验和)之前验证它们收到的数据,如果此 DataNode节点检测到错误,客户端会收到一个CheckSumException。客户端读取DataNode节点上的数据时,会验证校验和,即将 其与DataNode上存储的校验和进行比较。每一个DataNode节点都会维护着一个连续的校验和和验证日志,里面有着每一个Block的最后验证时 间。客户端成功验
3、证Block之后,便会告诉DataNode节点,Datanode节点随之更新日志。这一点也就涉及到前面说的DataBlockScanner了,所以接下来我将主要讨论DataBlockScanner。 还是先来看看与DataBlockScanner相关联的类吧!dataset:数据块管理器;blockInfoSet:数据块扫描信息集合,按照上一次扫描时间和数据块id升序排序,以便快速获取验证到期的数据块;blockMap:数据块和数据块扫描信息的映射,以便能够根据数据块快速获取对应的扫描信息;totalBytesToScan:一个扫描周期中需要扫描的总数据量;bytesLeft:一个扫描周期中还
4、剩下需要扫描的数据量;throttler:扫描时I/O速度控制器,需要根据totalBytesToScan和bytesLeft信息来衡量;verificationLog:数据块的扫描验证日志记录器;scanPeriod:一个扫描周期,可以由Datanode的配置文件来设置,配置项是:dfs.datanode.scan.period.hours,单位是小时,默认的值是21*24*60*60*1000 ms。 DataBlockScanner是作为DataNode的一个后台线程工作的,跟着DataNode一块启动,它的工作流程如下: DataBlockScanner被DataNode节点用来检测它
5、所管理的所有Block数据块的一致性,因此,对已DataNode节点上的每一个Block,它都会每隔scanPeriod ms利用Block对应的校验和文件来检测该Block一次,看看这个Block的数据是否已经损坏。由于scanPeriod 的值一般比较大,因为对DataNode节点上的每一个Block扫描一遍要消耗不少系统资源,这就可能带来另外一个问题就是在一个扫描周 期内可能会出现DataNode节点重启的情况,所以为了提供系统性能,避免DataNode节点在启动之后对还没有过期的Block又扫描一遍,DataBlockScanner在其内部使用了日志记录器来持久化保存每一个Block上
6、一次扫描的时间,这样的话, DataNode节点在启动之后通过日志文件来恢复之前所有Block的有效时间。另外,DataNode为了节约系统资源,它对Block的验证不仅仅只依赖于DataBlockScanner后台线程(VERIFICATION_SCAN方式),他还会在向某一个客户端传送Block的时候来更行该Block的扫描时间(REMOTE_READ方式),这是因为DataNode向客户端传送一个Block的时候要必须校验该数据块。那么这个时候日志记录器并不会马上把该数据块的扫描信息写到日志,毕竟频繁的磁盘I/O会导致性能下降,至于何时对该Block的最新扫描时间写日志有一个判断条件:
7、1.如果是VERIFICATION_SCAN方式的Block验证,必须记日志; 2.如果是REMOTE_READ方式,那么该Block上一次的记录日志到现在的时间间隔超过24小时或者超过scanPeriod/3 ms 的话,记日志。 下面来结合源码详细讨论这个过程:1.初始化 在整个扫描验证过程中都一个速度控制器, private void init() Block arr = dataset.getBlockReport();/从“磁盘”上获取所有的数据块基本信息 Collections.shuffle(Arrays.asList(arr); blockInfoSet = new TreeS
8、et(); blockMap = new HashMap(); long scanTime = -1; for (Block block : arr) /为每一个Block建立扫描验证信息 BlockScanInfo info = new BlockScanInfo( block ); info.lastScanTime = scanTime-; addBlockInfo(info); /* 寻找一个合适的扫描验证日志文件 */ File dir = null; FSDataset.FSVolume volumes = dataset.volumes.volumes; for(FSDatase
9、t.FSVolume vol : volumes) if (LogFileHandler.isFilePresent(vol.getDir(), verificationLogFile) dir = vol.getDir(); break; if (dir = null) dir = volumes0.getDir(); try / 创建一个日志记录器 verificationLog = new LogFileHandler(dir, verificationLogFile, 100); catch (IOException e) LOG.warn(Could not open verfica
10、tion log. + Verification times are not stored.); synchronized (this) /创建一个扫描速度控制器 throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE); private void updateBytesToScan(long len, long lastScanTime) / len could be negative when a block is deleted. totalBytesToScan += len; /新添加的Block需要在需要在此次中扫描验证
11、if ( lastScanTime 0 & info.lastScanTime 0 ) BlockScanInfo info; while (info = blockInfoSet.first().lastScanTime 0) delBlockInfo(info); info.lastScanTime = lastScanTime; lastScanTime += verifyInterval; addBlockInfo(info); return true; 3.调整扫描速度 在一次Blocks扫描验证周期中,DataBlockScanner需要进行大量的磁盘I/O,为了不影响DataNo
12、de节点上其它线程的工作资源,同时也为了自身工作的有效性,所以DataBlockScanner采用了扫描验证速度控制器,根据当前的工作量来控制当前数据块的验证速度。java view plaincopy1. privatesynchronizedvoidadjustThrottler()2. /本次扫描验证还剩余的时间3. longtimeLeft=currentPeriodStart+scanPeriod-System.currentTimeMillis();4. /根据本次验证扫描剩余的工作量和时间来计算速度5. longbw=Math.max(bytesLeft*1000/timeLef
13、t,MIN_SCAN_RATE);6. throttler.setBandwidth(Math.min(bw,MAX_SCAN_RATE);7. 4.数据块的扫描验证 DataNode节点在向客户端或者其它DataNode节点传输数据时,客户端或者其它DataNode节点会根据接收的数据校验和来验证接收到的数据,当验证出错时,它们会通知传送节点。DataBlockScanner通过自己扮演传输者又扮演接受者来实现数据块的验证的;同时为了防止本地磁盘的I/O的错误,DataBlockScanner采用了两次传输-接收来确保验证的Block的数据是出错了(损坏了)。当发现有出错的Block是,就需
14、要向NameNode节点报告,由NameNode来决定如何处理这个数据块,而不是由DataNode节点擅自作主清除该Block数据信息。 private void verifyBlock(Block block) BlockSender blockSender = null; for (int i=0; i 0); try adjustThrottler(); blockSender = new BlockSender(block, 0, -1, false, false, true, datanode); DataOutputStream out = new DataOutputStream
15、(new IOUtils.NullOutputStream(); blockSender.sendBlock(out, null, throttler); LOG.info(second ? Second : ) + Verification succeeded for + block); if ( second ) totalTransientErrors+; updateScanStatus(block, ScanType.VERIFICATION_SCAN, true); return; catch (IOException e) totalScanErrors+; updateScan
16、Status(block, ScanType.VERIFICATION_SCAN, false); /在“磁盘”上没有该Block对应的文件 if ( dataset.getFile(block) = null ) LOG.info(Verification failed for + block + . Its ok since + it not in datanode dataset anymore.); deleteBlock(block); return; LOG.warn(second ? Second : First ) + Verification failed for + blo
17、ck + . Exception : + StringUtils.stringifyException(e); /两次验证都出错 if (second) datanode.getMetrics().blockVerificationFailures.inc(); handleScanFailure(block); return; finally IOUtils.closeStream(blockSender); datanode.getMetrics().blocksVerified.inc(); totalScans+; totalVerifications+; private synchr
18、onized void updateScanStatus(Block block, ScanType type, boolean scanOk) BlockScanInfo info = blockMap.get(block); if ( info != null ) delBlockInfo(info); else / It might already be removed. Thats ok, it will be caught next time. info = new BlockScanInfo(block); /更新该Block的验证信息 long now = System.curr
19、entTimeMillis(); info.lastScanType = type; info.lastScanTime = now; info.lastScanOk = scanOk; addBlockInfo(info); if (type = ScanType.REMOTE_READ) totalVerifications+; / Dont update meta data too often in case of REMOTE_READ / of if the verification failed. long diff = now - info.lastLogTime; if (!s
20、canOk | (type = ScanType.REMOTE_READ & diff scanPeriod/3 & diff ONE_DAY) return; info.lastLogTime = now; LogFileHandler log = verificationLog; if (log != null) log.appendLine(LogEntry.newEnry(block, now);/记录通过验证的Block验证信息 /处理发生错误的Block private void handleScanFailure(Block block) try DatanodeInfo dnA
21、rr = new DatanodeInfo(datanode.dnRegistration) ; LocatedBlock blocks = new LocatedBlock(block, dnArr) ; /向NameNode节点发送出错的Block datanode.namenode.reportBadBlocks(blocks); catch (IOException e) /* One common reason is that NameNode could be in safe mode. * Should we keep on retrying in that case? */ LOG.warn(Failed to report bad block + block + to namenode : + Exception : + StringUtils.stringifyException(e);
copyright@ 2008-2023 冰点文库 网站版权所有
经营许可证编号:鄂ICP备19020893号-2