Hadoop序列化中的Writable接口(附部分源码)

Hadoop序列化中的Writable接口(附部分源码)

序列化是将结构化对象为字节流以便与通过网络进行传输或者写入持久存储。反序列化指的是将字节流转为一系列结构化对象的过程。

序化在分布式数据处理的两大领域经常出现:进程间通信和永久存储

hadoop中,节点直接的进程间通信是用远程过程调用(RPC)实现的。RPC协议将消息序列化成二进制流后发送到运城节点,远程节点接着将二进制流反序列化为原始的消息。

在Hadoop中,Writable接口定义了两个方法:一个用于将其状态写入二进制格式的DataOutput流,另一个用于从二进制格式的DataInput流读取其态。

packageorg.apache.hadoop.io; importjava.io.DataOutput; importjava.io.DataInput; importjava.io.IOException; public interface Writable { void write(DataOutput out)throws IOException; void readFields(DataInput in)throws IOException;

write和readFields分别实现了把对象序列化和反序列化的功能

让我们来看一个特别的Writable,看看可以对它进行哪些操作。我们要使用IntWritable,这是一个Java的int对象的封装。可以使用set()函数来创建和设置它的值:

IntWritable writable =new IntWritable(); writable.set(163);

类似地,我们也可以使用构造函数:

IntWritable writable =newIntWritable(163);

为了检查IntWritable的序列化形式,我们写一个小的辅助方法,它把一个java.io.ByteArrayOutputStream封装到java.io.DataOutputStream中(java.io.DataOutput的一个实现),以此来捕获序列化的数据流中的字节:

public static byte[] serialize(Writable writable)throws IOException { ByteArrayOutputStream out =new ByteArrayOutputStream(); DataOutputStream dataOut =new DataOutputStream(out); writable.write(dataOut); dataOut.close(); returnout.toByteArray(); }

整数用四个字节写入(我们使用JUnit 4断言):

byte[] bytes = serialize(writable); assertThat(bytes.length, is(4));

字节使用大端顺序写入(所以,最重要的字节写在数据流的开始处,这是由java.io.DataOutput接口规定的),我们可以使用Hadoop的StringUtils方法看到它们的十六进制表示:

assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));

让我们再来试试反序列化。我们创建一个帮助方法来从一个字节数组读取一个Writable对象:

public static byte[] deserialize(Writable writable,byte[] bytes) throws IOException { ByteArrayInputStream in =new ByteArrayInputStream(bytes); DataInputStream dataIn =new DataInputStream(in); writable.readFields(dataIn); dataIn.close(); return bytes; }

我们构造一个新的、缺值的IntWritable,然后调用deserialize()方法来读取刚写入的输出流。然后发现它的值(使用get方法检索得到)还是原来的值163:

IntWritable newWritable =new IntWritable(); deserialize(newWritable, bytes); assertThat(newWritable.get(), is(163));

WritableComparable 和comparator

IntWritable实现了WritableComparable接口,后者是Writable和java.lang.Comparable接口的子接口。

packageorg.apache.hadoop.io; public interface WritableComparable<t> extends Writable, Comparable<t> { }

类型的比较对MapReduce而言至关重要的,键和键之间的比较是在排序阶段完成。Hadoop提供的一个优化方法是从Java Comparator的RawComparator扩展

packageorg.apache.hadoop.io; importjava.util.Comparator; public interface RawComparator<t> extends Comparator<t> { public int compare(byte[] b1,ints1,intl1,byte[] b2,ints2,intl2); }
 package java.util; public interface Comparator<T> {     int compare(T o1, T o2);     boolean equals(Object obj); }

这个接口允许执行者比较从流中读取的未被反序列化为对象的记录,从而省去了创建对象的所有开销。例如,IntWritables的comparator使用原始的compare()方法从每个字节数组的指定开始位置(S1和S2)和长度(L1和L2)读取整数b1和b2然后直接进行比较。

WritableComparator是RawComparator对WritableComparable类的一个通用实现。它提供两个主要功能。首先,它提供了一个默认的对原始compare()函数的调用,对从数据流对要比较的对象进行反序列化,然后调用对象的compare()方法其次,它充当的是RawComparator实例的一个工厂方法(Writable方法已经注册)。例如,为获得IntWritable的comparator,我们只需使用:

RawComparator<intwritable> comparator = WritableComparator.get(IntWritable.class);

WritableComparator get方法源码:

private static HashMap<Class, WritableComparator> comparators = new HashMap<Class, WritableComparator>(); // registry /** Get a comparator for a {@link WritableComparable} implementation. */ public static synchronized WritableComparator get(Class<? extends WritableComparable> c) { WritableComparator comparator = comparators.get(c); if (comparator == null) comparator = new WritableComparator(c, true); return comparator; }

comparator可以用来比较两个IntWritable:

IntWritable w1 =newIntWritable(163); IntWritable w2 =newIntWritable(67); assertThat(comparator.compare(w1, w2), greaterThan(0));

或者它们的序列化描述: 

byte[] b1 = serialize(w1); byte[] b2 = serialize(w2); assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));

WritableComparator的compare()方法的源码:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { buffer.reset(b1, s1, l1); // parse key1 key1.readFields(buffer); buffer.reset(b2, s2, l2); // parse key2 key2.readFields(buffer); } catch (IOException e) { throw new RuntimeException(e); } return compare(key1, key2); // compare them } @SuppressWarnings("unchecked") public int compare(WritableComparable a, WritableComparable b) { return a.compareTo(b); }

参考:《hadoop权威指南》

转载于:https://my.oschina.net/winHerson/blog/130145

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/110135.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • java mediatype属性_SpringMVC 及常用MediaType

    java mediatype属性_SpringMVC 及常用MediaTypeSpringMVC简介在WEB开发中,SpringMVC实现了较为经典的MVC(Model,View,Controller)模式,组成:1.Model层(模型层):管理App中每个功能模块所用到的值和数据.(实体类entity).2.View层(视图层):将模型层的数据展示给用户.(页面jsp,html,thymeleaf等..)3.Controller层(控制层/控制器):管理页面跳转…

    2022年5月9日
    214
  • gbk编码表_河北十一选五五码基本分布图

    gbk编码表_河北十一选五五码基本分布图1.GBK码位分布图2.GBK码位说明GBK亦採用双字节表示,整体编码范围为8140-FEFE,首字节在81-FE之间,尾字节在40-FE之间,剔除xx7F一条线。总计23940

    2022年8月1日
    4
  • 玩转跨境电商_个人如何做跨境电商

    玩转跨境电商_个人如何做跨境电商近日日本最大二手交易平台Mercari日前对外宣布,将与阿里巴巴合作启动跨境销售。不久后,Mercari将登陆淘宝和闲鱼,消费者下单后,由电商服务平台BEENOS负责代购,随后将货物发往国内。Mercari作为日本最大二手交易平台,一直以来在亚洲范围内都久负盛名,而闲鱼作为国内首屈一指的二手电商平台,二者的联合碰撞起的全新火花,能打开跨境二手电商的新链路吗?二手与跨境,电商后意识形态的新鸾凤自流量为尊的互联网商业态势席卷之后,电商在不停演变,从最初的图书到衣物综合,再到各垂直电商、社交电商…

    2022年10月4日
    3
  • 非线性最小二乘问题例题_非线性自适应控制算法

    非线性最小二乘问题例题_非线性自适应控制算法摘录的一篇有关求解非线性最小二乘问题的算法–LM算法的文章,当中也加入了一些我个人在求解高精度最小二乘问题时候的一些感触:LM算法,全称为Levenberg-Marquard算法,它可用于解决非线性最小二乘问题,多用于曲线拟合等场合。LM算法的实现并不算难,它的关键是用模型函数 f 对待估参数向量p在其邻域内做线性近似,忽略掉二阶以上的导数项,从而转化为线性最小二乘问题,它具有收敛速度快

    2022年9月26日
    2
  • BeanUtils.populate方法使用

    BeanUtils.populate方法使用BeanUtils.populate方法使用1.在执行BeanUtils.populate之后,会把map封装成User对象。要注意的是,UserBean类中的字段名必须和html中的name属性值相同,不然在BeanUtils.populate执行之后,User对象的字段中会出现NULL数据Map<String,String[]>map=req.getParameterMap();//创建User对象UserloginUser=newUser

    2022年7月26日
    7
  • (tkinter)撩妹弹窗(3)之不要越过三八线,canvas的使用方法

    (tkinter)撩妹弹窗(3)之不要越过三八线,canvas的使用方法

    2022年2月21日
    42

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号