Spark存储体系底层架构剖析-Spark商业环境实战

Spark存储体系底层架构剖析-Spark商业环境实战

本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客。版权声明:本套Spark源码解读及商业实战归作者(秦凯新)所有,禁止转载,欢迎学习。

Spark商业环境实战及调优进阶系列

1. Spark存储体系组件关系解释

BlockInfoManger 主要提供读写锁控制,层级仅仅位于BlockManger之下,通常Spark读写操作都先调用BlockManger,然后咨询BlockInfoManger是否存在锁竞争,然后才会调用DiskStore和MemStore,进而调用DiskBlockManger来确定数据与位置映射,或者调用 MemoryManger来确定内存池的软边界和内存使用申请。

1.1 Driver 与 Executor 与 SparkEnv 与 BlockManger 组件关系:

Driver与 Executor 组件各自拥有任务执行的SparkEnv环境,而每一个SparkEnv 中都有一个BlockManger负责存储服务,作为高层抽象,BlockManger 之间需要通过 RPCEnv,ShuffleClient,及BlocakTransferService相互通讯。

1.1 BlockInfoManger 与 BlockInfo 共享锁和排它锁读写控制关系:

BlockInfo中具有读写锁的标志,通过标志可以判断是否进行写控制

  val NO_WRITER: Long = -1
  val NON_TASK_WRITER: Long = -1024
  
 * The task attempt id of the task which currently holds the write lock for this block, or
 * [[BlockInfo.NON_TASK_WRITER]] if the write lock is held by non-task code, or
 * [[BlockInfo.NO_WRITER]] if this block is not locked for writing.
 
 def writerTask: Long = _writerTask
 def writerTask_=(t: Long): Unit = {
 _writerTask = t
    checkInvariants()
复制代码

BlockInfoManager具有BlockId与BlockInfo的映射关系以及任务id与BlockId的锁映射:

 private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]  
 
 *Tracks the set of blocks that each task has locked for writing.
 private[this] val writeLocksByTask = new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
                                       with mutable.MultiMap[TaskAttemptId, BlockId]
 
 *Tracks the set of blocks that each task has locked for reading, along with the number of times
 *that a block has been locked (since our read locks are re-entrant).
 private[this] val readLocksByTask =
 new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]
复制代码

1.3 DiskBlockManager 与 DiskStore 组件关系:

可以看到DiskStore内部会调用DiskBlockManager来确定Block的读写位置:

  • 以下是DiskStore的抽象写操作,需要传入FileOutputStream => Unit高阶函数:

      def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
      if (contains(blockId)) {
      throw new IllegalStateException(s"Block $blockId is already present in the disk store")
      }
      logDebug(s"Attempting to put block $blockId")
      val startTime = System.currentTimeMillis
      
      val file = diskManager.getFile(blockId)
      
      val fileOutputStream = new FileOutputStream(file)
      var threwException: Boolean = true
      try {
          writeFunc(fileOutputStream)
          threwException = false
      } finally {
       try {
          Closeables.close(fileOutputStream, threwException)
       } finally {
       if (threwException) {
        remove(blockId)
              }
          }
      }
      val finishTime = System.currentTimeMillis
      logDebug("Block %s stored as %s file on disk in %d ms".format(
      file.getName,
      Utils.bytesToString(file.length()),
      finishTime - startTime))
      }
    复制代码
  • 以下是DiskStore的读操作,调用DiskBlockManager来获取数据位置:

      def getBytes(blockId: BlockId): ChunkedByteBuffer = {
      
      val file = diskManager.getFile(blockId.name)
     
      val channel = new RandomAccessFile(file, "r").getChannel
      Utils.tryWithSafeFinally {
    * For small files, directly read rather than memory map
      if (file.length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(file.length.toInt)
      channel.position(0)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException("Reached EOF before filling buffer\n" +
            s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
        }
      }
      buf.flip()
      new ChunkedByteBuffer(buf)
      } else {
      new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
          }
      } {
      channel.close()
       }
      }
    复制代码

1.3 MemManager 与 MemStore 与 MemoryPool 组件关系:

在这里要强调的是:第一代大数据框架hadoop只将内存作为计算资源,而Spark不仅将内存作为计算资源外,还将内存的一部分纳入存储体系:

  • 内存池模型 :逻辑上分为堆内存和堆外内存,然后堆内存(或堆外内存)内部又分为StorageMemoryPool和ExecutionMemoryPool。
  • MemManager是抽象的,定义了内存管理器的接口规范,方便以后扩展,比如:老版的StaticMemoryManager和新版的UnifiedMemoryManager.
  • MemStore 依赖于UnifiedMemoryManager进行内存的申请和软边界变化或内存释放。
  • MemStore 内部同时负责存储真实的对象,比如内部成员变量:entries ,建立了内存中的BlockId与MemoryEntry(Block的内存的形式)之间的映射。
  • MemStore 内部的“占座”行为,如:内部变量offHeapUnrollMemoryMap 和onHeapUnrollMemoryMap。

1.4 BlockManagerMaster 与 BlockManager 组件关系:

  • BlockManagerMaster的作用就是对存在于Dirver或Executor上的BlockManger进行统一管理,这简直是代理行为,因为他持有BlockManagerMasterEndpointREf,进而和BlockManagerMasterEndpoint进行通讯。

2. Spark存储体系组件BlockTransferServic传输服务

未完待续

3. 总结

存储体系是Spark的基石,我争取把每一块细微的知识点进行剖析,和大部分博客不同的是,我会尽量采用最平实的语言,毕竟技术就是一层窗户纸。

秦凯新 20181031 凌晨

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/101341.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • matplotlib绘图基础[通俗易懂]

    matplotlib绘图基础[通俗易懂]http://blog.csdn.net/pipisorry/article/details/37742423matplotlib介绍matplotlib是python最著名的绘图库,它提供了一整套和matlab相似的命令API,十分适合交互式地进行制图。而且也可以方便地将它作为绘图控件,嵌入GUI应用程序中。它的文档相当完备,并且Gallery页面中有上百幅缩略图,打开之后都

    2022年5月24日
    44
  • httpclient Accept-Encoding 乱码[通俗易懂]

    httpclient Accept-Encoding 乱码[通俗易懂]解决方法1HttpEntityhttpEntity=httpResponse.getEntity();2if(httpEntity!=null){3if(httpEntity.getContentEncoding()!=null){4if("g…

    2022年7月15日
    15
  • win10电脑设置提醒任务_win10添加计划任务

    win10电脑设置提醒任务_win10添加计划任务博主公司周报漏交一次要缴纳50RMB部门经费,另外博主每天上午下午都需要活动10分钟(好像放风。。),防止职业病+让自己的工作状态更好。步骤:1、打开Win10控制面板—>点选管理工

    2022年8月2日
    10
  • STM32的优先级NVIC_PriorityGroupConfig的理解及其使用[通俗易懂]

    STM32的优先级NVIC_PriorityGroupConfig的理解及其使用[通俗易懂]写作原由:因为之前有对stm32优先级做过研究,但是没时间把整理的东西发表,最近项目需要2个串口,但是不是两个串口同时使用,只是随机使用其中一个,程序对2个串口的优先级需要配置;此文思路:“中断优先级”思维导图–&gt;关键要点—&gt;结合图和要点相关程序应用例程讲解;我们先来看ST公司的一张图:我自己依据此图理解,应用思维导图画了一张方便理解:(如果看不清可通过ctrl+鼠标滑轮   …

    2022年5月28日
    41
  • 一款好用的Linux系统服务器性能监控分析工具介绍「建议收藏」

    一款好用的Linux系统服务器性能监控分析工具介绍「建议收藏」软件性能测试过程中经常要对服务器性能指标(比如CPU、内存、磁盘IO及网络IO等等)进行监控以分析出软件在此服务器上的性能瓶颈以便进行后续的服务器调优及软件性能优化。下面为大家介绍一款小编认为比较好用的Linux系统服务器性能监控分析工具:nmonforLinux。从nmon工具包中选择监控服务器匹配的nmon监控可执行文件(如下图所示:小编使用的是nmon_linux_x86_64)将…

    2022年5月30日
    33
  • mt4历史数据下载位置_头榜土豪数据中心

    mt4历史数据下载位置_头榜土豪数据中心    打开MT4,按F2,会出现一个历史数据中心对话框。之前,我直接按下载按钮时,往往下载数据会出错。因此百度了很久,也查看了很多的处理方式,觉得都不尽如人意。不是数据找不到,就是即使找到了下载时也出现问题。    近日又捣弄了一番,跑到MT4中的history文件夹,发现里面有各个我以前申请的模拟帐户,而且是不同公司下的帐户。这突然让我意识到,我在历史数据中心对话框中点击下载时出现的警

    2022年8月15日
    7

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号