读书笔记-HBase in Action-第二部分Advanced concepts-(2)Coprocessor

读书笔记-HBase in Action-第二部分Advanced concepts-(2)Coprocessor

大家好,又见面了,我是全栈君。

Coprocessor是HBase 0.92.0引入的特性。使用Coprocessor。能够将一些计算逻辑下推到HBase节点,HBase由一个单纯的存储系统升级为分布式数据处理平台。

Coprocessor分为两种:Observer和Endpoint。

Observer能改动扩展已有的client操作功能。而Endpoint能引入新的client操作。

Observer

Observer的作用类似于数据库的触发器或者AOP中的advice。下图为Put操作添加Observer,当中1-2-4-6是一次正常的Put操作RPC调用过程,而3和5属于Observer,能够在Put操作之前和之后添加自己定义处理逻辑。

读书笔记-HBase in Action-第二部分Advanced concepts-(2)Coprocessor

Observer包含三种,RegionObserver(针对数据訪问和更新操作,执行在Region上)/WALObserver(针对WAL日志事件,执行在RegionServer上下文)/MasterObserver(针对DDL操作,执行在Master节点)。

Endpoint

Endpoint的作用则类似于数据库存储过程。实现机制是通过扩展HBase RPC协议,给client暴露新的操作接口。

例如以下图,client负责发起调用和收集结果,服务端各节点负责并行计算。

读书笔记-HBase in Action-第二部分Advanced concepts-(2)Coprocessor

实战

以上一章的follows表为例,通过Observer实现followedBy被关注表数据一致性维护。Endpoint实现关注人数量统计。

由于要实如今插入follows表时自己主动插入followedBy表。须要用到关注人/被关注人username信息,所以首先升级schema。

读书笔记-HBase in Action-第二部分Advanced concepts-(2)Coprocessor

实现Observer

代码中有三处凝视值得注意:

  1. postPut方法在put操作之后被调用。
  2. 假设通过hbase-site.xml安装Observer。会应用到全局全部表,所以这里推断put操作的是否follows表。

  3. 这里有点bad smell。Observer执行在服务器端。为了共用代码,又调用client代码,仅为演示作用。

packageHBaseIA.TwitBase.coprocessors;//…publicclass FollowsObserver extends BaseRegionObserver {    private HTablePool pool = null;    @Override    public void start(CoprocessorEnvironment env)throws IOException {        pool = newHTablePool(env.getConfiguration(), Integer.MAX_VALUE);    }    @Override    public void stop(CoprocessorEnvironment env)throws IOException {        pool.close();    }    @Override    public void postPut(//1,在Put操作之后调用            finalObserverContext<RegionCoprocessorEnvironment> e,            final Put put,            final WALEdit edit,            final boolean writeToWAL) throws IOException {        byte[] table=e.getEnvironment().getRegion().getRegionInfo().getTableName();        if (!Bytes.equals(table,FOLLOWS_TABLE_NAME))             return;  //2,推断表名        KeyValue kv =put.get(RELATION_FAM, FROM).get(0);        String from =Bytes.toString(kv.getValue());        kv = put.get(RELATION_FAM,TO).get(0);        String to =Bytes.toString(kv.getValue());        RelationsDAO relations = newRelationsDAO(pool);        relations.addFollowedBy(to,from);//3,插入followedBy表    }}

Observer的安装能够通过改动hbase-site.xml或者使用tableschema改动语句完毕,前者须要重新启动HBase服务,后者仅仅须要又一次上下线相应表。

$ hbase shellHBaseShell; enter 'help<RETURN>' for list of supported commands.Type"exit<RETURN>" to leave the HBase ShellVersion0.92.0, r1231986, Mon Jan 16 13:16:35 UTC 2012hbase(main):001:0>disable 'follows'0 row(s) in 7.0560 secondshbase(main):002:0>alter 'follows', METHOD => 'table_att','coprocessor'=>'file:///Users/ndimiduk/repos/hbaseiatwitbase/target/twitbase-1.0.0.jar|HBaseIA.TwitBase.coprocessors.FollowsObserver|1001|'Updatingall regions with the new schema...1/1regions updated.Done.0 row(s) in 1.0770 secondshbase(main):003:0>enable 'follows'0 row(s) in 2.0760 seconds

当中1001为优先级。当载入多个Observer时。依照优先级次序执行。

实现Endpoint

关注人数量统计能够通过clientScan实现,相比Endpoint方案。有两个待改进点:

  1. 传输全部关注人到client,不必要的网络I/O。

  2. 拿到全部关注人Result结果后。遍历实现计数是单线程的。

实现Endpoint包含三部分

定义PRC接口

publicinterface RelationCountProtocol extends CoprocessorProtocol {    public long followedByCount(String userId) throwsIOException;}

服务端实现

和client不同,InternalScanner执行在特定Region上。返回的是原始的KeyValue对象。

packageHBaseIA.TwitBase.coprocessors;//…publicclass RelationCountImpl extends BaseEndpointCoprocessor implementsRelationCountProtocol {    @Override    public longfollowedByCount(String userId) throws IOException {        byte[]startkey = Md5Utils.md5sum(userId);        Scan scan = newScan(startkey);        scan.setFilter(newPrefixFilter(startkey));        scan.addColumn(RELATION_FAM,FROM);        scan.setMaxVersions(1);        RegionCoprocessorEnvironmentenv= (RegionCoprocessorEnvironment)getEnvironment();        InternalScanner scanner =env.getRegion().getScanner(scan);//1,server端        long sum = 0;        List<KeyValue> results= new ArrayList<KeyValue>();        boolean hasMore = false;        do {            hasMore =scanner.next(results);            sum += results.size();            results.clear();        } while (hasMore);        scanner.close();        return sum;    }}

client代码

參考凝视:

  1. 定义Call实例 
  2. 调用服务端Endpoint。
  3. 聚合全部RegionServer得到的结果

public long followedByCount (final String userId) throws Throwable {    HTableInterface followed =pool.getTable(FOLLOWED_TABLE_NAME);    final byte[] startKey = Md5Utils.md5sum(userId);    final byte[] endKey =Arrays.copyOf(startKey, startKey.length);    endKey[endKey.length-1]++;    Batch.Call<RelationCountProtocol,Long> callable =        newBatch.Call<RelationCountProtocol, Long>() {        @Override        public Longcall(RelationCountProtocol instance) throws IOException {            returninstance.followedByCount(userId);        }    };//1 call instance    Map<byte[], Long>results = followed.coprocessorExec(                                   RelationCountProtocol.class,                                   startKey,                                   endKey,                                   callable);//2 invoke endpoint    long sum = 0;    for(Map.Entry<byte[],Long> e : results.entrySet()) {        sum +=e.getValue().longValue();    }//3 aggreagte results    return sum;}

Endpoint仅仅能通过配置文件部署,还须要将相关jar包增加到HBase classpath。

<property>    <name>hbase.coprocessor.region.classes</name>    <value>HBaseIA.TwitBase.coprocessors.RelationCountImpl</value></property>

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/115767.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • mac phpstorm 激活码【2021.8最新】

    (mac phpstorm 激活码)好多小伙伴总是说激活码老是失效,太麻烦,关注/收藏全栈君太难教程,2021永久激活的方法等着你。IntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,下面是详细链接哦~https://javaforall.net/100143.htmlS32PGH0SQB-eyJsaWNlbnNlSW…

    2022年3月26日
    69
  • 初中python培训机构

    初中python培训机构都知道现在Python这门编程语言很火,那它究竟火到什么程度?可能互联网上铺天盖地的Python学习贴不够直观,求职平台上Python相关工资水涨船高,也离我们普通人太远,但——Python被纳入基础教育体系呢?浙江省八年级将新增Python编程课程风变编程得到最新消息,在2020年9月开始的新学期中,浙江省三年级到九年级信息技术课将同步替换新教材,而其中最大的变化是,八年级将新增Python课程内容。同时,新高一信息技术编程语言由VB替换为Python,大数据、人工智能、程序设计与算法按照教材规划

    2022年5月16日
    42
  • zencart模板列表下载地址

    zencart模板列表下载地址下载index.html文件后用浏览器打开,里面有一百多个zencart模板示例下载地址:zencart模板示例下载地址或者复制下面网址,用浏览器打开即可下载:http://bcs.duapp.com

    2022年7月2日
    25
  • 基于麦克风阵列的声源定位_python播放声音模块

    基于麦克风阵列的声源定位_python播放声音模块上一篇文章说到odas_web界面非常难安装,并且运行也很卡。所以我自己用python写了一个界面程序,用来接收odas处理完的结果。这个界面程序与odas之间是通过socket连接的,界面作为服务器,odas作为客户端,由于有两路数据,所以各有两个服务器和客户端。但是实际绘制在界面上的是SSL的结果,不是SST的结果。其实我也试过SST的结果,从直观的感受而言,效果会比SSL差一些,实时性不是很高,我的理解SST的好处是可以跟踪音源是否有活动。

    2022年9月2日
    5
  • C语言数组练习题目

    C语言数组练习题目C语言数组练习题目1、编写程序,输入10个整数存入一维数组,统计输出其中的正数、负数和零的个数。#include<stdio.h>main(){ inta[10],i,j=0,k=0,l=0; printf(“请输入10个整数:”); for(i=0;i<10;i++) { scanf(“%d”,&a[i]); } for(i=0;i<10;i++) { if(a[i]>0) ++j; elseif(a[i]==0) ++k

    2022年7月11日
    12
  • crontab使用方法[通俗易懂]

    crontab使用方法[通俗易懂]一、crontab基本用法1.1cron服务cron是一个linux下的定时执行工具,可以在无需人工干预的情况下运行作业。servicecrondstart//启动服务servicecrondstop//关闭服务servicecrondrestart//重启服务servicecrondreload//重新载入配置servicecrondstatus//查看服务状态1.2Crontab存放路径/var/spool/c…

    2022年8月24日
    4

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号