snappy流式编解码总结

snappy流式编解码总结介绍 snappy 是谷歌开源的用于数据快速压缩和解压的程序库 它的目标并非实现最大压缩率 而是同时实现非常高的压缩速度和合理的压缩率 snappy 被广泛应用于 google 内部和开源的项目中 例如 Hadoop LevelDB Spark 官方库地址 https github com google snappy 由于最近的工作需要 发现 snappy 还实现了两个变种 这两个变种分别实现了 snappy 算法在本地文件系统和 hadoop 中的流式编解码 以下分别称之为 snappystream 和 hadoop

介绍

snappy是谷歌开源的用于数据快速压缩和解压的程序库,它的目标并非实现最大压缩率,而是同时实现非常高的压缩速度和合理的压缩率。snappy被广泛应用于google内部和开源的项目中,例如Hadoop, LevelDB, Spark。官方库地址:https://github.com/google/snappy

由于最近的工作需要,发现snappy还实现了两个变种,这两个变种分别实现了snappy算法在本地文件系统和hadoop中的流式编解码。以下分别称之为snappy stream code和hadoop snappy stream codec, 以区分于raw snappy codec.

  • raw snappy codec: 用于对整个文件或整块数据进行编解码.
  • stream snappy codec: 流式编解码。在raw snappy codec上增加了chunk的概念。编解码的最小粒度是chunk。
  • hadoop stream snappy codec: 流式编解码。在raw snappy codec的基础上增加了block的概念。编解码最小粒度是block

那么为什么我们需要snappy流式编解码呢?

因为如果没有流式编解码算法,要对snappy数据进行解压,就必须将数据全部读入到内存,如果文件特别大的话内存可能不够用。因此们需要流式编解码算法,每次从文件流中读取一个frame的数据,解压,再读取,再解压,这样内存占用不至于特别大。

snappy的google官方库并没有实现上述的流式codec, 而python snappy实现了:https://github.com/andrix/python-snappy

以下是个范例

#/usr/bin/env python # coding: utf-8     import snappy   text_file = "1.txt" snappy_file = "1.snappy"   with open (text_file, "r") as input_file:     uncompressed_data = input_file.read().encode('utf-8')       # raw snappy codec     compressed_data = snappy.compress(uncompressed_data)     assert uncompressed_data == snappy.uncompress(compressed_data)       # stream snappy codec     c = snappy.StreamCompressor()     d = snappy.StreamDecompressor()          compressed_data = c.compress(uncompressed_data)     assert uncompressed_data == d.decompress(compressed_data)       # hadoop stream snappy codec     c = snappy.hadoop_snappy.StreamCompressor()     d = snappy.hadoop_snappy.StreamDecompressor()     compressed_data = c.compress(uncompressed_data)     assert uncompressed_data == d.decompress(compressed_data) 

raw snappy codec

算法比较复杂,可参考:https://zzjw.cc/post/snappy/

stream snappy codec

使用stream snappy codec算法压缩的文件由一连串的chunk组成: chunk | chunk | chunk | … | chunk

每一个chunk的组成:header | body

chunk header的组成: chunk_type(1B) | chunk_size(3B)    其中chunk_size是chunk中body部分的长度,以byte为单位

chunk_type取值如下

  • stream identifier(chunk_type = 0xff). 压缩文件中, 第一个chunk必须是该类型。chunk size = 6, body = “sNaPpY”
  • compressed data(chunk_type = 0x00). 包含实际数据。body组成:crc32校验码(4B) | compressed_data. 其中crc32校验码为使用raw snappy codec解压compressed_data之后得到的uncompressed_data的crc校验码。注意:size(compressed_data) <= 2^24-1, size(uncompressed_data) <= 65535
  • uncompressed_data(chunk_type = 0x01). 包含实际数据。body组成: crc32校验码(4B) | uncompressed_data. 其中crc32校验码为之后的uncompressed_data的crc校验码。注意:size(uncompressed_data) <= 65535
  • padding(0xfe). 主要用于补零对齐。body = 00000…
  • Reserved unskippable chunks (chunk_types介于0x02-0x7f) 预留给未来扩展的的chunk type。解码器遇到这种chunk type应该立即报错
  • Reserved skippable chunks (chunk types介于0x80-0xfd) 预留给未来扩展的chunk type, 解码器遇到这种chunk应该立即跳过,

crc32校验算法如下:

def _masked_crc32c(data):     # see the framing format specification     crc = _crc32c(data)     return (((crc >> 15) | (crc << 17)) + 0xa282ead8) & 0xffffffff 

这种编码格式保证了两个经过stream snappy编码的文件通过linux cat命令合并之后,仍然是合法的压缩文件。

更详细的说明文档见:https://github.com/google/snappy/blob/master/framing_format.txt

snappy python库中解压算法实现:

def decompress(self, data):     """Decompress 'data', returning a string containing the uncompressed     data corresponding to at least part of the data in string. This data     should be concatenated to the output produced by any preceding calls to     the decompress() method. Some of the input data may be preserved in     internal buffers for later processing.     """     self._buf.extend(data)     uncompressed = bytearray()     while True:         if len(self._buf) < 4:             return bytes(uncompressed)         chunk_type = struct.unpack(" 
  
    > 8)         chunk_type &= 0xff         if not self._header_found:             if (chunk_type != _IDENTIFIER_CHUNK or                     size != len(_STREAM_IDENTIFIER)):                 raise UncompressError("stream missing snappy identifier")             self._header_found = True         if (_RESERVED_UNSKIPPABLE[0] <= chunk_type and                 chunk_type < _RESERVED_UNSKIPPABLE[1]):             raise UncompressError(                 "stream received unskippable but unknown chunk")         if len(self._buf) < 4 + size:             return bytes(uncompressed)         chunk, self._buf = self._buf[4:4 + size], self._buf[4 + size:]         if chunk_type == _IDENTIFIER_CHUNK:             if chunk != _STREAM_IDENTIFIER:                 raise UncompressError(                     "stream has invalid snappy identifier")             continue         if (_RESERVED_SKIPPABLE[0] <= chunk_type and                 chunk_type < _RESERVED_SKIPPABLE[1]):             continue         assert chunk_type in (_COMPRESSED_CHUNK, _UNCOMPRESSED_CHUNK)         crc, chunk = chunk[:4], chunk[4:]         if chunk_type == _COMPRESSED_CHUNK:             chunk = _uncompress(chunk)         if struct.pack(" 
    
  

执行流程如下:

  • 首先判断第一个chunk的类型是否为stream identifier, 如果不是,说明格式非法,返回错误。如果是,校验chunk body是否为“sNaPpY”
  • 判断chunk类型是否为reserved unskippable, 如果是,说明格式非法,返回错误
  • 判断chunk类型是否为reserved skippable, 如果是,则跳过当前chunk
  • 对chunk body进行crc校验
  • 如果chunk类型是uncompressed_data, 则将uncompressed_data追加到解压结果当中
  • 如果chunk类型是compressed_data, 则使用raw snappy codec对compressed_data解压,得到uncompressed_data追加到解压结果中
  • 当前chunk处理完成后,接着下一个,直到文件结束

hadoop stream snappy codec

hadoop stream snappy codec主要用于hdfs file的snappy编解码场景。其格式相比stream snappy codec更为简单

它由多个block组成:block | … |block

每个block的格式:total_len(4字节)|compressed_len(4字节)|compressed_data(compressed_len长度,变长)| compressed_len | compressed_data

其中

  • total_len: 当前block中解压结果的总长度
  • compressed_len: 对应的compressed data长度
  • compressed_data: 压缩数据

snappy python库中解压算法如下:

    def decompress(self, data):         """Decompress 'data', returning a string containing the uncompressed         data corresponding to at least part of the data in string. This data         should be concatenated to the output produced by any preceding calls to         the decompress() method. Some of the input data may be preserved in         internal buffers for later processing.         """         int_size = _INT_SIZE         self._buf += data         uncompressed = []         while True:             if len(self._buf) < int_size:                 return b"".join(uncompressed)             next_start = 0             if not self._block_length:                 self._block_length = unpack_int(self._buf[:int_size])                 self._buf = self._buf[int_size:]                 if len(self._buf) < int_size:                     return b"".join(uncompressed)             compressed_length = unpack_int(                 self._buf[next_start:next_start + int_size]             )             next_start += int_size             if len(self._buf) < compressed_length + next_start:                 return b"".join(uncompressed)             chunk = self._buf[                 next_start:next_start + compressed_length             ]             self._buf = self._buf[next_start + compressed_length:]             uncompressed_chunk = _uncompress(chunk)             self._uncompressed_length += len(uncompressed_chunk)             uncompressed.append(uncompressed_chunk)             if self._uncompressed_length == self._block_length:                 # Here we have uncompressed all subblocks of the current block                 self._uncompressed_length = 0                 self._block_length = 0                 continue 

代码逻辑相对简单,流程如下:

  • 读取block_length, 并初始化uncompressed_length, 表示当前解压结果的长度
  • 读取compressed_length和compressed_data, 使用raw snappy codec解压,得到uncompressed_data, append到输出中,并更新uncompressed_length.
  • 当uncompressed_length达到block_length时,表示当前block处理完毕,重置内部状态,准备处理下一个block,  如此循环往复,直到文件结束

参考

c++ snappy lib: https://github.com/google/snappy

python snappy lib: https://github.com/andrix/python-snappy

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/203765.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 下午9:38
下一篇 2026年3月19日 下午9:38


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号