Spark存储体系底层架构剖析-Spark商业环境实战

Spark存储体系底层架构剖析-Spark商业环境实战

本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客。版权声明:本套Spark源码解读及商业实战归作者(秦凯新)所有,禁止转载,欢迎学习。

Spark商业环境实战及调优进阶系列

1. Spark存储体系组件关系解释

BlockInfoManger 主要提供读写锁控制,层级仅仅位于BlockManger之下,通常Spark读写操作都先调用BlockManger,然后咨询BlockInfoManger是否存在锁竞争,然后才会调用DiskStore和MemStore,进而调用DiskBlockManger来确定数据与位置映射,或者调用 MemoryManger来确定内存池的软边界和内存使用申请。

1.1 Driver 与 Executor 与 SparkEnv 与 BlockManger 组件关系:

Driver与 Executor 组件各自拥有任务执行的SparkEnv环境,而每一个SparkEnv 中都有一个BlockManger负责存储服务,作为高层抽象,BlockManger 之间需要通过 RPCEnv,ShuffleClient,及BlocakTransferService相互通讯。

1.1 BlockInfoManger 与 BlockInfo 共享锁和排它锁读写控制关系:

BlockInfo中具有读写锁的标志,通过标志可以判断是否进行写控制

  val NO_WRITER: Long = -1
  val NON_TASK_WRITER: Long = -1024
  
 * The task attempt id of the task which currently holds the write lock for this block, or
 * [[BlockInfo.NON_TASK_WRITER]] if the write lock is held by non-task code, or
 * [[BlockInfo.NO_WRITER]] if this block is not locked for writing.
 
 def writerTask: Long = _writerTask
 def writerTask_=(t: Long): Unit = {
 _writerTask = t
    checkInvariants()
复制代码

BlockInfoManager具有BlockId与BlockInfo的映射关系以及任务id与BlockId的锁映射:

 private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]  
 
 *Tracks the set of blocks that each task has locked for writing.
 private[this] val writeLocksByTask = new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
                                       with mutable.MultiMap[TaskAttemptId, BlockId]
 
 *Tracks the set of blocks that each task has locked for reading, along with the number of times
 *that a block has been locked (since our read locks are re-entrant).
 private[this] val readLocksByTask =
 new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]
复制代码

1.3 DiskBlockManager 与 DiskStore 组件关系:

可以看到DiskStore内部会调用DiskBlockManager来确定Block的读写位置:

  • 以下是DiskStore的抽象写操作,需要传入FileOutputStream => Unit高阶函数:

      def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
      if (contains(blockId)) {
      throw new IllegalStateException(s"Block $blockId is already present in the disk store")
      }
      logDebug(s"Attempting to put block $blockId")
      val startTime = System.currentTimeMillis
      
      val file = diskManager.getFile(blockId)
      
      val fileOutputStream = new FileOutputStream(file)
      var threwException: Boolean = true
      try {
          writeFunc(fileOutputStream)
          threwException = false
      } finally {
       try {
          Closeables.close(fileOutputStream, threwException)
       } finally {
       if (threwException) {
        remove(blockId)
              }
          }
      }
      val finishTime = System.currentTimeMillis
      logDebug("Block %s stored as %s file on disk in %d ms".format(
      file.getName,
      Utils.bytesToString(file.length()),
      finishTime - startTime))
      }
    复制代码
  • 以下是DiskStore的读操作,调用DiskBlockManager来获取数据位置:

      def getBytes(blockId: BlockId): ChunkedByteBuffer = {
      
      val file = diskManager.getFile(blockId.name)
     
      val channel = new RandomAccessFile(file, "r").getChannel
      Utils.tryWithSafeFinally {
    * For small files, directly read rather than memory map
      if (file.length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(file.length.toInt)
      channel.position(0)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException("Reached EOF before filling buffer\n" +
            s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
        }
      }
      buf.flip()
      new ChunkedByteBuffer(buf)
      } else {
      new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
          }
      } {
      channel.close()
       }
      }
    复制代码

1.3 MemManager 与 MemStore 与 MemoryPool 组件关系:

在这里要强调的是:第一代大数据框架hadoop只将内存作为计算资源,而Spark不仅将内存作为计算资源外,还将内存的一部分纳入存储体系:

  • 内存池模型 :逻辑上分为堆内存和堆外内存,然后堆内存(或堆外内存)内部又分为StorageMemoryPool和ExecutionMemoryPool。
  • MemManager是抽象的,定义了内存管理器的接口规范,方便以后扩展,比如:老版的StaticMemoryManager和新版的UnifiedMemoryManager.
  • MemStore 依赖于UnifiedMemoryManager进行内存的申请和软边界变化或内存释放。
  • MemStore 内部同时负责存储真实的对象,比如内部成员变量:entries ,建立了内存中的BlockId与MemoryEntry(Block的内存的形式)之间的映射。
  • MemStore 内部的“占座”行为,如:内部变量offHeapUnrollMemoryMap 和onHeapUnrollMemoryMap。

1.4 BlockManagerMaster 与 BlockManager 组件关系:

  • BlockManagerMaster的作用就是对存在于Dirver或Executor上的BlockManger进行统一管理,这简直是代理行为,因为他持有BlockManagerMasterEndpointREf,进而和BlockManagerMasterEndpoint进行通讯。

2. Spark存储体系组件BlockTransferServic传输服务

未完待续

3. 总结

存储体系是Spark的基石,我争取把每一块细微的知识点进行剖析,和大部分博客不同的是,我会尽量采用最平实的语言,毕竟技术就是一层窗户纸。

秦凯新 20181031 凌晨

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/101341.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 华硕老毛子Padavan使用IPV6+Aliddns远程管理路由

    华硕老毛子Padavan使用IPV6+Aliddns远程管理路由华硕老毛子Padavan使用IPV6远程管理路由前言一、pandas是什么?二、使用步骤1.引入库2.读入数据总结前言提示:以下是本篇文章正文内容,下面案例可供参考一、pandas是什么?示例:pandas是基于NumPy的一种工具,该工具是为了解决数据分析任务而创建的。二、使用步骤1.引入库代码如下(示例):importnumpyasnpimportpandasaspdimportmatplotlib.pyplotaspltimportseaborna

    2022年6月5日
    520
  • java学习——Java 动态代理机制分析

    越学越觉得自己不懂的越多!java的动态代理学习资料整理:Java 动态代理机制分析及扩展,第 1 部分: https://www.ibm.com/developerworks/cn/java/j-lo-proxy1/Java 动态代理作用是什么? https://www.zhihu.com/question/20794107Java动态代理机制详解(JDK 和CGLIB,Javassist,A

    2022年2月26日
    37
  • v4l2应用编程(labstillalive攻略)

    转自:http://www.cnblogs.com/ronnydm/p/5787182.html通过上两篇文章,我们已经成功的建立了/dev/video0这个字符设备,此时,在UserSpace就可以打开该设备,完成相应的调用。  总结如何使用V4L2架构建立我们自己的设备驱动,其实就是以下3个结构体的设置及注册:    1.structv4l2_device

    2022年4月14日
    879
  • mysql和mysql数据库的区别_sql数据库怎么用

    mysql和mysql数据库的区别_sql数据库怎么用什么是SQL?SQL是一种用于操作数据库的语言。SQL是用于所有数据库的基本语言。不同数据库之间存在较小的语法更改,但基本的SQL语法基本保持不变。SQL是StructuredQueryLanguage的简短缩写。根据ANSI(美国国家标准协会),SQL是操作关系数据库管理系统的标准语言。SQL用于访问,更新和操作数据库中的数据。它的设计允许管理RDBMS中的数据,例如MYSQL…

    2022年10月2日
    5
  • 漫画网站服务器,建立家庭漫画服务器,从iPad上看漫画

    漫画网站服务器,建立家庭漫画服务器,从iPad上看漫画好,故事开始。以前就在想,在retina屏幕的iPad上看漫画该多爽。可是现在捧着iPad却发现看漫画很困难,自己电脑上下载了一堆漫画都是jpg图片,导入iBook也很麻烦。现在通过家里的iMac建立一个家庭漫画服务器,直接在线浏览速度快、体验好,而且还很方便。以后下载了漫画往目录里一扔,就可以拿起iPad看了。需要工具:MAMP是Mac下出名的Apache-MySQL-PHP服务器套件。…

    2022年6月17日
    26
  • android开发之Notification_通知栏消息「建议收藏」

    Notification简介  Notification看名字就知道,是一个和提醒有关的东西,它通常和NotificationManager一块使用。具体来说,其主要功能如下。  1.NotificationManager和Notification用来设置通知  通知的设置等操作相对比较简单,基本的使用方式就是新建一个Notification对象,设置好通知的各项参数,然后使用系统后台

    2022年3月10日
    40

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号