I have been digging into HBase lately and keep finding traces of merge sort everywhere. It is interesting that an algorithm our data-structure textbooks dismiss in a sentence or two shows up constantly in distributed computing. One more sign, perhaps, that those textbooks are badly out of date...
Why does merge sort come up? Sorting is fundamental in distributed systems: it is easy to keep data sorted on a single machine (or node, or module), but a distributed environment needs the data to be ordered across the whole cluster. That naturally involves merging multiple sorted sequences into a single sorted sequence, and that is exactly where merge sort makes its entrance!
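Before the full code, the two-way case shows the core idea of merging sorted sequences; here is a minimal sketch (the class and method names are my own, not from HBase):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TwoWayMerge {
    // Merge two already-sorted lists into one sorted list.
    public static List<Integer> merge(List<Integer> a, List<Integer> b) {
        List<Integer> out = new ArrayList<Integer>(a.size() + b.size());
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            // Always take the smaller head; ties go to the first list.
            if (a.get(i) <= b.get(j)) {
                out.add(a.get(i++));
            } else {
                out.add(b.get(j++));
            }
        }
        // Drain whichever list still has elements left.
        while (i < a.size()) out.add(a.get(i++));
        while (j < b.size()) out.add(b.get(j++));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(Arrays.asList(1, 3, 5),
                                 Arrays.asList(2, 4, 6)));
        // prints [1, 2, 3, 4, 5, 6]
    }
}
```

The k-way version below generalizes this by comparing the heads of all k lists at once instead of just two.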
Following HBase's compact algorithm, I wrote a small merge sort of my own. The code is below.
The Item class: each Item holds a sorted list of ints.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class Item {

    public List<Integer> valueList = new ArrayList<Integer>(10);
    public static final int RANGE = 1000;

    // Fill the list with 10 random ints and sort it.
    public void initItem() {
        Random r = new Random();
        for (int i = 0; i < 10; i++) {
            int num = r.nextInt(RANGE);
            valueList.add(num);
        }
        Collections.sort(valueList);
    }

    // Remove and return the smallest (first) value.
    public int pop() throws Exception {
        if (valueList.size() <= 0) {
            throw new Exception("no values left!");
        }
        return valueList.remove(0);
    }

    public boolean isEmpty() {
        return this.valueList.isEmpty();
    }

    public boolean hasValues() {
        return !this.valueList.isEmpty();
    }
}
The MergeSort class provides a sort method that merges the individually sorted int lists of multiple Items into one big sorted list:
import java.util.LinkedList;
import java.util.List;

public class MergeSort {

    public static List<Integer> sort(Item[] itemList) throws Exception {
        int itemNum = itemList.length;
        List<Integer> sortedList = new LinkedList<Integer>();

        // buttonValues holds the current head value of each item's list.
        int[] buttonValues = new int[itemNum];
        for (int i = 0; i < itemNum; i++) {
            buttonValues[i] = itemList[i].pop();
        }

        boolean hasValueLeft = true;
        while (hasValueLeft) {
            // Find the smallest head among all items.
            int currentValue = buttonValues[0];
            int currentIndex = 0;
            for (int i = 1; i < itemNum; i++) {
                if (currentValue > buttonValues[i]) {
                    currentValue = buttonValues[i];
                    currentIndex = i;
                }
            }
            // Refill the slot we just consumed; an exhausted item is
            // marked with Integer.MAX_VALUE so it always loses the
            // comparison (a shortcut, not recommended in general).
            if (itemList[currentIndex].hasValues()) {
                buttonValues[currentIndex] = itemList[currentIndex].pop();
            } else {
                buttonValues[currentIndex] = Integer.MAX_VALUE;
            }
            sortedList.add(currentValue);

            // Keep going while any slot still holds a real value.
            hasValueLeft = false;
            for (int i = 0; i < itemNum; i++) {
                if (buttonValues[i] != Integer.MAX_VALUE) {
                    hasValueLeft = true;
                }
            }
        }
        return sortedList;
    }
}
The test class, MergeSortTest, creates 10 Items, sorts them, and prints the result:
import java.util.List;

public class MergeSortTest {

    public static Item[] itemList = new Item[10];

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) {
            Item item = new Item();
            item.initItem();
            itemList[i] = item;
        }
        List<Integer> sortedList = MergeSort.sort(itemList);
        for (int i : sortedList) {
            System.out.println(i);
        }
    }
}
The algorithm is simple:
1. Take the head value of each list to be sorted.
2. Compare all the head values and pick the smallest.
3. Append the value from step 2 to the result list.
4. If the list that supplied the smallest value still has values left, pop its next one; if not, set its slot to Integer.MAX_VALUE (a shortcut, not recommended).
5. If any list still has values waiting to be sorted, go back to step 1; otherwise the sort is finished.
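A cleaner way to avoid the Integer.MAX_VALUE trick in step 4 is to keep the current heads in a priority queue and simply drop a list from the queue once it is drained. A sketch of that variant (class and field names are mine, not from HBase):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class HeapMerge {

    // One entry per input list: its iterator plus the current head value.
    static class Head implements Comparable<Head> {
        final Iterator<Integer> it;
        int value;

        Head(Iterator<Integer> it) {
            this.it = it;
            this.value = it.next();
        }

        public int compareTo(Head other) {
            return Integer.compare(this.value, other.value);
        }
    }

    public static List<Integer> sort(List<List<Integer>> lists) {
        PriorityQueue<Head> heap = new PriorityQueue<Head>();
        for (List<Integer> l : lists) {
            if (!l.isEmpty()) {
                heap.add(new Head(l.iterator()));
            }
        }
        List<Integer> out = new ArrayList<Integer>();
        while (!heap.isEmpty()) {
            Head h = heap.poll();      // smallest current head
            out.add(h.value);
            if (h.it.hasNext()) {      // advance this list and re-insert,
                h.value = h.it.next(); // or drop it once it is drained
                heap.add(h);
            }
        }
        return out;
    }
}
```

With a heap the cost per output element drops from O(k) comparisons (scanning all k heads) to O(log k), and no sentinel value is needed.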
I modeled this on the compact code in HBase 0.1.0. Here is that compact code; you can see the algorithm is basically the same, except that compact also has to handle deletes, and it uses three arrays (keys, vals, done):
int size = toCompactFiles.size();
CompactionReader[] rdrs = new CompactionReader[size];
int index = 0;
for (HStoreFile hsf: toCompactFiles) {
  try {
    rdrs[index++] =
      new MapFileCompactionReader(hsf.getReader(fs, bloomFilter));
  } catch (IOException e) {
    // Add info about which file threw exception. It may not be in the
    // exception message so output a message here where we know the
    // culprit.
    LOG.warn("Failed with " + e.toString() + ": HStoreFile=" +
      hsf.toString() +
      (hsf.isReference()? ", Reference=" + hsf.getReference().toString() : "") +
      " for Store=" + this.storeName);
    closeCompactionReaders(rdrs);
    throw e;
  }
}
try {
  HStoreKey[] keys = new HStoreKey[rdrs.length];
  ImmutableBytesWritable[] vals = new ImmutableBytesWritable[rdrs.length];
  boolean[] done = new boolean[rdrs.length];
  for(int i = 0; i < rdrs.length; i++) {
    keys[i] = new HStoreKey();
    vals[i] = new ImmutableBytesWritable();
    done[i] = false;
  }

  // Now, advance through the readers in order. This will have the
  // effect of a run-time sort of the entire dataset.
  int numDone = 0;
  for(int i = 0; i < rdrs.length; i++) {
    rdrs[i].reset();
    done[i] = ! rdrs[i].next(keys[i], vals[i]);
    if(done[i]) {
      numDone++;
    }
  }

  int timesSeen = 0;
  Text lastRow = new Text();
  Text lastColumn = new Text();
  // Map of a row deletes keyed by column with a list of timestamps for value
  Map<Text, List<Long>> deletes = null;
  while (numDone < done.length) {
    // Find the reader with the smallest key. If two files have same key
    // but different values -- i.e. one is delete and other is non-delete
    // value -- we will find the first, the one that was written later and
    // therefore the one whose value should make it out to the compacted
    // store file.
    int smallestKey = -1;
    for(int i = 0; i < rdrs.length; i++) {
      if(done[i]) {
        continue;
      }
      if(smallestKey < 0) {
        smallestKey = i;
      } else {
        if(keys[i].compareTo(keys[smallestKey]) < 0) {
          smallestKey = i;
        }
      }
    }

    // Reflect the current key/val in the output
    HStoreKey sk = keys[smallestKey];
    if(lastRow.equals(sk.getRow()) && lastColumn.equals(sk.getColumn())) {
      timesSeen++;
    } else {
      timesSeen = 1;
      // We are on to a new row. Create a new deletes list.
      deletes = new HashMap<Text, List<Long>>();
    }

    byte [] value = (vals[smallestKey] == null)?
      null: vals[smallestKey].get();
    if (!isDeleted(sk, value, false, deletes) &&
        timesSeen <= family.getMaxVersions()) {
      // Keep old versions until we have maxVersions worth.
      // Then just skip them.
      if (sk.getRow().getLength() != 0 && sk.getColumn().getLength() != 0) {
        // Only write out objects which have a non-zero length key and
        // value
        compactedOut.append(sk, vals[smallestKey]);
      }
    }

    // Update last-seen items
    lastRow.set(sk.getRow());
    lastColumn.set(sk.getColumn());

    // Advance the smallest key. If that reader's all finished, then
    // mark it as done.
    if(!rdrs[smallestKey].next(keys[smallestKey], vals[smallestKey])) {
      done[smallestKey] = true;
      rdrs[smallestKey].close();
      rdrs[smallestKey] = null;
      numDone++;
    }
  }
} finally {
  closeCompactionReaders(rdrs);
}
