博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
HDFS源码分析心跳汇报之数据块汇报
阅读量:4983 次
发布时间:2019-06-12

本文共 6991 字,大约阅读时间需要 23 分钟。

在《HDFS源码分析心跳汇报之数据块增量汇报》一文中,我们详细介绍了数据块增量汇报的内容,了解到它是时间间隔更长的正常数据块汇报周期内一个smaller的数据块汇报,它负责将DataNode上数据块的变化情况及时汇报给NameNode。那么,时间间隔更长的正常数据块汇报都做了些什么呢?本文,我们将开始研究下时间间隔更长的正常数据块汇报。

        首先,看下正常数据块汇报是如何发起的?我们先看下BPServiceActor工作线程的offerService()方法:

 

[java]   
 
 
  1.   /** 
  2.    * Main loop for each BP thread. Run until shutdown, 
  3.    * forever calling remote NameNode functions. 
  4.    */  
  5.   private void offerService() throws Exception {  
  6.   
  7.     //  
  8.     // Now loop for a long time....  
  9.     //  
  10.     while (shouldRun()) {
    // 又是一个利用shouldRun()判断的while循环  
  11.       try {  
  12.         // 省略部分代码  
  13.         ...  
  14.   
  15.         // 调用blockReport()方法,进行数据块汇报,放返回来自名字节点NameNode的相关命令cmds  
  16.         List<DatanodeCommand> cmds = blockReport();  
  17.           
  18.         // 调用processCommand()方法处理来自名字节点NameNode的相关命令cmds  
  19.         processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));  
  20.        
  21.         // 省略部分代码  
  22.   
  23.         //  
  24.         // There is no work to do;  sleep until hearbeat timer elapses,   
  25.         // or work arrives, and then iterate again.  
  26.         // 计算等待时间waitTime:心跳时间间隔减去上次心跳后截至到现在已过去的时间  
  27.         long waitTime = dnConf.heartBeatInterval -   
  28.         (Time.now() - lastHeartbeat);  
  29.           
  30.         synchronized(pendingIncrementalBRperStorage) {  
  31.           if (waitTime > 0 && !sendImmediateIBR) {
    // 如果等待时间大于0,且不是立即发送数据块增量汇报  
  32.             try {  
  33.                   
  34.               // 利用pendingIncrementalBRperStorage进行等待,并加synchronized关键字进行同步  
  35.               pendingIncrementalBRperStorage.wait(waitTime);  
  36.             } catch (InterruptedException ie) {  
  37.               LOG.warn("BPOfferService for " + this + " interrupted");  
  38.             }  
  39.           }  
  40.         } // synchronized  
  41.   
  42.       } catch(RemoteException re) {  
  43. <pre name="code" class="java">       // 省略部分代码  

} catch (IOException e) {

[java]   
 
 
  1. // 省略部分代码  

} } // while (shouldRun())

 

 

         可以看出,在BPServiceActor工作线程offerService()方法的while循环内,数据块汇报blockReport()方法执行时,仅有下面的waitTime的等待时间,其他情况下都是立即执行的。那么等待时间waitTime是如何计算的呢?它就是心跳时间间隔减去上次心跳后截至到现在已过去的时间,并且,如果等待时间waitTime大于0,且不是立即发送数据块增量汇报(标志位sendImmediateIBR为false),那么才会利用pendingIncrementalBRperStorage进行等待,并加synchronized关键字进行同步。在这里,我们就可以大胆猜测,数据块汇报的时间间隔应该是大于心跳时间间隔的,并且两者之间的距离肯定不小。

 

        那么,我们开始研究实现正常数据块汇报的blockReport()方法吧,代码如下:

 

[java]   
 
 
  1.  /** 
  2.   * Report the list blocks to the Namenode 
  3.   * @return DatanodeCommands returned by the NN. May be null. 
  4.   * @throws IOException 
  5.   */  
  6.  List<DatanodeCommand> blockReport() throws IOException {  
  7.    // send block report if timer has expired.  
  8. // 到期就发送数据块汇报  
  9.     
  10. // 取当前开始时间startTime  
  11.    final long startTime = now();  
  12.      
  13.    // 如果当前时间startTime减去上次数据块汇报时间小于数据节点配置的数据块汇报时间间隔的话,直接返回null,  
  14.    // 数据节点配置的数据块汇报时间间隔取参数dfs.blockreport.intervalMsec,参数未配置的话默认为6小时  
  15.    if (startTime - lastBlockReport <= dnConf.blockReportInterval) {  
  16.      return null;  
  17.    }  
  18.   
  19.    // 构造数据节点命令ArrayList列表cmds,存储数据块汇报返回的命令DatanodeCommand  
  20.    ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();  
  21.   
  22.    // Flush any block information that precedes the block report. Otherwise  
  23.    // we have a chance that we will miss the delHint information  
  24.    // or we will report an RBW replica after the BlockReport already reports  
  25.    // a FINALIZED one.  
  26.      
  27.    // 调用reportReceivedDeletedBlocks()方法发送数据块增量汇报  
  28.    reportReceivedDeletedBlocks();  
  29.      
  30.    // 记录上次数据块增量汇报时间lastDeletedReport  
  31.    lastDeletedReport = startTime;  
  32.   
  33.    // 设置数据块汇报起始时间brCreateStartTime为当前时间  
  34.    long brCreateStartTime = now();  
  35.      
  36.    // 从数据节点DataNode根据线程对应块池ID获取数据块汇报集合perVolumeBlockLists,  
  37.    // key为数据节点存储DatanodeStorage,value为数据节点存储所包含的Long类数据块数组BlockListAsLongs  
  38.    Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =  
  39.        dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());  
  40.   
  41.    // Convert the reports to the format expected by the NN.  
  42.    int i = 0;  
  43.    int totalBlockCount = 0;  
  44.      
  45.    // 创建数据块汇报数组StorageBlockReport,大小为上述perVolumeBlockLists的大小  
  46.    StorageBlockReport reports[] =  
  47.        new StorageBlockReport[perVolumeBlockLists.size()];  
  48.   
  49.    // 遍历perVolumeBlockLists  
  50.    for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {  
  51.        
  52.      // 取出value:BlockListAsLongs  
  53.      BlockListAsLongs blockList = kvPair.getValue();  
  54.        
  55.      // 将BlockListAsLongs封装成StorageBlockReport加入数据块汇报数组reports,  
  56.      // StorageBlockReport包含数据节点存储DatanodeStorage和其上数据块数组  
  57.      reports[i++] = new StorageBlockReport(  
  58.          kvPair.getKey(), blockList.getBlockListAsLongs());  
  59.        
  60.      // 累加数据块数目totalBlockCount  
  61.      totalBlockCount += blockList.getNumberOfBlocks();  
  62.    }  
  63.   
  64.    // Send the reports to the NN.  
  65.    int numReportsSent;  
  66.    long brSendStartTime = now();  
  67.      
  68.    // 根据数据块总数目判断是否需要多次发送消息  
  69.    if (totalBlockCount < dnConf.blockReportSplitThreshold) {
    // 如果数据块总数目在split阈值之下,则将所有的数据块汇报信息放在一个消息中发送  
  70.      // split阈值取参数dfs.blockreport.split.threshold,参数未配置的话默认为1000*1000  
  71.      // Below split threshold, send all reports in a single message.  
  72.        
  73.      // 发送的数据块汇报消息数numReportsSent设置为1  
  74.      numReportsSent = 1;  
  75.        
  76.      // 通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息  
  77.      DatanodeCommand cmd =  
  78.          bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);  
  79.        
  80.      // 将数据块汇报后返回的命令cmd加入到命令列表cmds  
  81.      if (cmd != null) {  
  82.        cmds.add(cmd);  
  83.      }  
  84.    } else {  
  85.      // Send one block report per message.  
  86.       
  87.      // 发送的数据块汇报消息数numReportsSent设置为1  
  88.      numReportsSent = i;  
  89.        
  90.      // 遍历reports,取出每个StorageBlockReport  
  91.      for (StorageBlockReport report : reports) {  
  92.        StorageBlockReport singleReport[] = { report };  
  93.          
  94.        // 通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息  
  95.        DatanodeCommand cmd = bpNamenode.blockReport(  
  96.            bpRegistration, bpos.getBlockPoolId(), singleReport);  
  97.          
  98.        // 将数据块汇报后返回的命令cmd加入到命令列表cmds  
  99.        if (cmd != null) {  
  100.          cmds.add(cmd);  
  101.        }  
  102.      }  
  103.    }  
  104.   
  105.    // Log the block report processing stats from Datanode perspective  
  106.      
  107.    // 计算数据块汇报耗时并记录在日志Log、数据节点Metrics指标体系中  
  108.    long brSendCost = now() - brSendStartTime;  
  109.    long brCreateCost = brSendStartTime - brCreateStartTime;  
  110.    dn.getMetrics().addBlockReport(brSendCost);  
  111.    LOG.info("Sent " + numReportsSent + " blockreports " + totalBlockCount +  
  112.        " blocks total. Took " + brCreateCost +  
  113.        " msec to generate and " + brSendCost +  
  114.        " msecs for RPC and NN processing. " +  
  115.        " Got back commands " +  
  116.            (cmds.size() == 0 ? "none" : Joiner.on("; ").join(cmds)));  
  117.   
  118.    // 调用scheduleNextBlockReport()方法,调度下一次数据块汇报  
  119.    scheduleNextBlockReport(startTime);  
  120.      
  121.    // 返回命令cmds  
  122.    return cmds.size() == 0 ? null : cmds;  
  123.  }  

        数据块汇报的blockReport()方法处理流程大体如下:

 

        1、取当前开始时间startTime;

        2、如果当前时间startTime减去上次数据块汇报时间小于数据节点配置的数据块汇报时间间隔的话,直接返回null:

              数据节点配置的数据块汇报时间间隔取参数dfs.blockreport.intervalMsec,参数未配置的话默认为6小时;

        3、构造数据节点命令ArrayList列表cmds,存储数据块汇报返回的命令DatanodeCommand;

        4、调用reportReceivedDeletedBlocks()方法发送数据块增量汇报;

        5、记录上次数据块增量汇报时间lastDeletedReport;

        6、设置数据块汇报起始时间brCreateStartTime为当前时间;

        7、从数据节点DataNode根据线程对应块池ID获取数据块汇报集合perVolumeBlockLists:

              key为数据节点存储DatanodeStorage,value为数据节点存储所包含的Long类数据块数组BlockListAsLongs;

        8、创建数据块汇报数组StorageBlockReport,大小为上述perVolumeBlockLists的大小;

        9、取出value:BlockListAsLongs:

              9.1、取出value:BlockListAsLongs;

              9.2、将BlockListAsLongs封装成StorageBlockReport加入数据块汇报数组reports,StorageBlockReport包含数据节点存储DatanodeStorage和其上数据块数组;

              9.3、累加数据块数目totalBlockCount;

        10、根据数据块总数目判断是否需要多次发送消息:

                10.1、如果数据块总数目在split阈值之下,则将所有的数据块汇报信息放在一个消息中发送(split阈值取参数dfs.blockreport.split.threshold,参数未配置的话默认为1000*1000):

                           10.1.1、发送的数据块汇报消息数numReportsSent设置为1;

                           10.1.2、通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息;

                           10.1.3、将数据块汇报后返回的命令cmd加入到命令列表cmds;

                 10.2、如果数据块总数目在split阈值之上,将数据块汇报按照DatanodeStorage分多个消息来发送:

                            10.2.1、发送的数据块汇报消息数numReportsSent设置为i,即DatanodeStorage数目;

                            10.2.2、遍历reports,取出每个StorageBlockReport:

                                           10.2.2.1、通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息;

                                           10.2.2.2、将数据块汇报后返回的命令cmd加入到命令列表cmds;

        11、计算数据块汇报耗时并记录在日志Log、数据节点Metrics指标体系中;

        12、调用scheduleNextBlockReport()方法,调度下一次数据块汇报;

        13、返回命令cmds。

转载于:https://www.cnblogs.com/jirimutu01/p/5556217.html

你可能感兴趣的文章
[Gatsby] Install Gatsby and Scaffold a Blog
查看>>
[Recompose] Add Local State to a Functional Stateless Component using Recompose
查看>>
Spring Boot + Spring Data + Elasticsearch实例
查看>>
我的机器学习之旅(一):认识机器学习
查看>>
util包下Timer类的延迟执行
查看>>
缓冲区溢出漏洞实验
查看>>
失业的程序员(十):分歧的产生
查看>>
[FZU2261]浪里个浪
查看>>
四则运算*2
查看>>
《Linux就该这么学》 - 必读的红帽系统与红帽linux认证自学手册
查看>>
名句名篇
查看>>
图像的基本运算——scale, rotation, translation
查看>>
OpenCV——PS滤镜, 碎片特效
查看>>
python-字典相关函数认识
查看>>
Java之IO流
查看>>
Lua学习笔记-C API
查看>>
浅析:Android 嵌套滑动机制(NestedScrolling)
查看>>
Python+Selenium练习篇之18-获取元素上面的文字
查看>>
php状态模式
查看>>
Asp.net C# 图像处理
查看>>