Hbase 实战之谷粒微博

Hbase 实战之谷粒微博Hbase 实战之谷粒微博 1 需求分析 1 微博内容的浏览 数据库表设计 2 用户社交体现 关注用户 取关用户 3 拉取关注的人的微博内容 2 代码实现 2 1 代码设计总览 1 创建命名空间以及表名的定义 2 创建微博内容表 3 创建用户关系表 4 创建用户微博内容接收邮件表 5 发布微博内容 6

Hbase 实战之谷粒微博

 

1 需求分析

1) 微博内容的浏览,数据库表设计
2) 用户社交体现:关注用户,取关用户
3) 拉取关注的人的微博内容
 
 
Hbase 实战之谷粒微博

 

2 代码实现

2.1 代码设计总览:

1) 创建命名空间以及表名的定义
2) 创建微博内容表
3) 创建用户关系表
4) 创建用户微博内容接收邮件表
5) 发布微博内容
6) 添加关注用户
7) 移除(取关)用户
8) 获取关注的人的微博内容
9) 测试
 

 2.2 com.atlxl.weibo.Constant 代码

package com.atlxl.weibo; public class Constant { //命名空间 public static final String NAMESPACE = "weibo"; //内容表 public static final String CONTENT = "weibo:content"; //用户关系表 public static final String RELATIONS = "weibo:relations"; //收件箱表 public static final String INBOX = "weibo:inbox"; }

 

 

2.3 com.atlxl.weibo.WeiBoUtil 代码

package com.atlxl.weibo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class WeiBoUtil { private static Configuration configuration = HBaseConfiguration.create(); static { configuration.set("hbase.zookeeper.quorum", "192.168.192.102" ); } //创建命名空间 public static void createNamespace(String ns) throws IOException { //创建连接 Connection connection = ConnectionFactory.createConnection(configuration); Admin admin = connection.getAdmin(); //创建NS描述器 NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(ns).build(); //创建操作  admin.createNamespace(namespaceDescriptor); //关闭资源  admin.close(); connection.close(); } //创建表 public static void createTable(String tableName, int versions, String... cfs) throws IOException { //创建连接 Connection connection = ConnectionFactory.createConnection(configuration); Admin admin = connection.getAdmin(); //创建表描述器 HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName)); //循环添加列族 for (String cf : cfs) { HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf); hColumnDescriptor.setMaxVersions(versions); hTableDescriptor.addFamily(hColumnDescriptor); } //创建表操作  admin.createTable(hTableDescriptor); //关闭资源  admin.close(); connection.close(); } / * 1.更新微博内容表数据 * 2.更新收件箱表的数据 * --获取当前操作人的fans * --去往收件箱表依次更新数据 * * * @param uid * @param content * @throws IOException */ //发布微博 public static void createDate(String uid, String content) throws IOException { //获取连接 Connection connection = ConnectionFactory.createConnection(configuration); //获取三张操作表的对象 Table contTable = connection.getTable(TableName.valueOf(Constant.CONTENT)); Table relaTable = connection.getTable(TableName.valueOf(Constant.RELATIONS)); Table inboxTable = connection.getTable(TableName.valueOf(Constant.INBOX)); //拼接RK long ts = System.currentTimeMillis(); String rowKey = uid + "_" + ts; //生成Put对象 Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("content"), Bytes.toBytes(content)); //往内容表添加数据  contTable.put(put); //获取关系表中的fans Get get = new Get(Bytes.toBytes(uid)); get.addFamily(Bytes.toBytes("fans")); Result result = relaTable.get(get); Cell[] cells = result.rawCells(); if (cells.length <= 0) { return; } //更新fans收件箱表 List 
   
     puts = 
    new ArrayList 
     
     ();  
     for 
      (Cell cell : cells) {  
     byte[] cloneQualifier = 
      CellUtil.cloneQualifier(cell); Put inboxPut = 
     new 
      Put(cloneQualifier); inboxPut.addColumn(Bytes.toBytes("info" 
     ), Bytes.toBytes(uid), ts, Bytes.toBytes(rowKey)); puts.add(inboxPut); } inboxTable.put(puts);  
     // 
     关闭资源 
      inboxTable.close(); relaTable.close(); contTable.close(); connection.close(); }  
     / 
      * 1.在用户关系表 * --添加操作人的attends * --添加被操作人的fans * 2.在收件箱中 * --在微博内容表中获取被关注者的3条数据(rowkey) * --在收件箱表中添加操作人的关注者信息 * *  
     @param 
      uid *  
     @param 
      uids  
     */ 
     // 
     关注用户 
     public 
     static 
     void addAttend(String uid, String... uids) 
     throws 
      IOException {  
     // 
     获取连接 Connection connection = 
      ConnectionFactory.createConnection(configuration);  
     // 
     获取三张操作表的对象 Table contTable = 
      connection.getTable(TableName.valueOf(Constant.CONTENT)); Table relaTable = 
      connection.getTable(TableName.valueOf(Constant.RELATIONS)); Table inboxTable = 
      connection.getTable(TableName.valueOf(Constant.INBOX));  
     // 
     创建操作者的Put对象 Put relaPut = 
     new 
      Put(Bytes.toBytes(uid)); ArrayList 
     
       puts = 
      new ArrayList 
       
       ();  
       for 
        (String s : uids) { relaPut.addColumn(Bytes.toBytes("attends" 
       ), Bytes.toBytes(s), Bytes.toBytes(s));  
       // 
       创建被关注者的Put对象 Put fansPut = 
       new 
        Put(Bytes.toBytes(s)); fansPut.addColumn(Bytes.toBytes("fans" 
       ), Bytes.toBytes(uid), Bytes.toBytes(uid)); puts.add(fansPut);  
       // 
       添加被操作人的fans 
        } puts.add(relaPut);  
       // 
       添加操作人的attends 
        relaTable.put(puts); Put inboxPut = 
       new 
        Put(Bytes.toBytes(uid));  
       // 
       获取内容表中被关注者的rowkey 
       for 
        (String s : uids) { Scan scan = 
       new Scan(Bytes.toBytes(s),Bytes.toBytes(s + "|" 
       )); ResultScanner results = 
        contTable.getScanner(scan);  
       for 
        (Result result : results) { String rowKey = 
        Bytes.toString(result.getRow()); String[] split = rowKey.split("_" 
       );  
       byte[] row = 
        result.getRow(); inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(s), Long.parseLong(split[1 
       ]), row); } } inboxTable.put(inboxPut); inboxTable.close(); relaTable.close(); contTable.close(); connection.close(); }  
       / 
        * 1.用户关系表 * --删除操作者关注列族的待取关用户 * --删除待取关用户fans列族的操作者 * * 2.收件箱表 * --删除操作者的待取关用户信息 *  
       */ 
       // 
       取关用户 
       public 
       static 
       void delAttend(String uid, String... uids) 
       throws 
        IOException {  
       // 
       获取连接 Connection connection = 
        ConnectionFactory.createConnection(configuration);  
       // 
       获取表对象 Table relaTable = 
        connection.getTable(TableName.valueOf(Constant.RELATIONS)); Table inboxTable = 
        connection.getTable(TableName.valueOf(Constant.INBOX));  
       // 
       创建操作者的删除对象 Delete relaDel = 
       new 
        Delete(Bytes.toBytes(uid)); ArrayList 
       
         deletes = 
        new ArrayList 
         
         ();  
         for 
          (String s : uids) {  
         // 
         创建被取关者删除对象 Delete fansDel = 
         new 
          Delete(Bytes.toBytes(s)); fansDel.addColumns(Bytes.toBytes("fans" 
         ), Bytes.toBytes(uid)); deletes.add(fansDel); relaDel.addColumns(Bytes.toBytes("attends" 
         ), Bytes.toBytes(s)); } deletes.add(relaDel);  
         // 
         执行删除操作 
          relaTable.delete(deletes);  
         // 
         删除收件箱表的相关内容 Delete inboxDel = 
         new 
          Delete(Bytes.toBytes(uid));  
         for 
          (String s : uids) { inboxDel.addColumns(Bytes.toBytes("info" 
         ), Bytes.toBytes(s)); }  
         // 
         执行收件箱表删除操作 
          inboxTable.delete(inboxDel);  
         // 
         关闭资源 
          inboxTable.close(); relaTable.close(); connection.close(); }  
         // 
         获取微博内容(初始化页面) 
         public 
         static 
         void getInit(String uid) 
         throws 
          IOException {  
         // 
         获取连接 Connection connection = 
          ConnectionFactory.createConnection(configuration);  
         // 
         获取表对象(2个) Table inboxTable = 
          connection.getTable(TableName.valueOf(Constant.INBOX)); Table contTable = 
          connection.getTable(TableName.valueOf(Constant.CONTENT));  
         // 
         获取收件箱表数据 Get get = 
         new Get(Bytes.toBytes(uid)); 
         // 
         收件箱表get对象 get.setMaxVersions(); 
         // 
         设置获取最大版本的数据 
          Result result = 
          inboxTable.get(get); ArrayList 
         
           gets = 
          new ArrayList 
           
           (); Cell[] cells = 
            result.rawCells();  
           // 
           遍历返回内容并将其封装成内容的get对象 
           for 
            (Cell cell : cells) { Get contGet = 
           new 
            Get(CellUtil.cloneValue(cell)); gets.add(contGet); }  
           // 
           根据收件箱表获取值去往内容表获取实际微博内容 Result[] results = 
            contTable.get(gets);  
           for 
            (Result result1 : results) { Cell[] cells1 = 
            result1.rawCells();  
           // 
           遍历并打印 
           for 
            (Cell cell : cells1) { System.out.println("RK:" + Bytes.toString(CellUtil.cloneRow(cell)) + ",Content:" + 
            Bytes.toString(CellUtil.cloneValue(cell))); } }  
           // 
           关闭资源 
            inboxTable.close(); contTable.close(); connection.close(); }  
           // 
           获取微博内容(查看某个人所有内容) 
           public 
           static 
           void getData(String uid) 
           throws 
            IOException {  
           // 
           获取连接 Connection connection = 
            ConnectionFactory.createConnection(configuration);  
           // 
           获取表对象 Table table = 
            connection.getTable(TableName.valueOf(Constant.CONTENT));  
           // 
           扫描(过滤器) Scan scan = 
           new 
            Scan();  
           // 
            过滤器集合  
           // 
            FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);  
           // 
            filterList.addFilter(); 
            RowFilter rowFilter = 
           new RowFilter(CompareFilter.CompareOp.EQUAL, 
           new SubstringComparator(uid + "_" 
           )); scan.setFilter(rowFilter); ResultScanner results = 
            table.getScanner(scan);  
           // 
           遍历打印 
           for 
            (Result result : results) { Cell[] cells = 
            result.rawCells();  
           for 
            (Cell cell : cells) { System.out.println("RK:" + Bytes.toString(CellUtil.cloneRow(cell)) + ",Content:" + 
            Bytes.toString(CellUtil.cloneValue(cell))); } }  
           // 
           关闭资源 
            table.close(); connection.close(); } } 
           
          
         
        
       
      
     
   

 

 

 

2.4 com.atlxl.weibo.Weibo 代码

package com.atlxl.weibo; import java.io.IOException; public class Weibo { public static void init() throws IOException { //创建命名空间  WeiBoUtil.createNamespace(Constant.NAMESPACE); //创建内容表 WeiBoUtil.createTable(Constant.CONTENT, 1, "info"); //创建用户关系表 WeiBoUtil.createTable(Constant.RELATIONS, 1, "attends","fans"); //创建收件箱表(多版本) WeiBoUtil.createTable(Constant.INBOX, 2,"info"); } public static void main(String[] args) throws IOException { // //测试 // init(); //1001,1002发布微博 // WeiBoUtil.createDate("1001", "今天天气晴朗!"); // WeiBoUtil.createDate("1002", "今天天气下雨!"); //1001关注1002和1003 // WeiBoUtil.addAttend("1001", "1002","1003"); //获取1001初始化页面信息 // WeiBoUtil.getInit("1001"); //1003发布微博 // WeiBoUtil.createDate("1003", "今天你过的怎么样!"); WeiBoUtil.createDate("1003", "好好学习!"); WeiBoUtil.createDate("1003", "天天向上!"); // System.out.println("*"); //获取1001初始化页面信息 WeiBoUtil.getInit("1001"); //取关 WeiBoUtil.delAttend("1001", "1002"); System.out.println("*"); WeiBoUtil.getInit("1001"); } }

 

转载于:https://www.cnblogs.com/LXL616/p/11028856.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/212138.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月18日 下午8:44
下一篇 2026年3月18日 下午8:44


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号