K通道并归算法的优化实现
排序大规模的数据常用到外部并归排序。此算法分为两个步骤:首先把要处理的数据分成若干个区块,对每个区块载入内存进行排序;第二步则是把排序好的区块进行并归融合。我用此文章来介绍下一个优化的并归算法,称为K通道并归算法。该算法可以使用一次并归处理,就可以有效的融合K个区块。
对于比较少的数据进行排序,一般是直接载入主内存,进行快速排序。大多数主流语言都提供了API库支持:比如.NET运行库的Array.Sort函数,Java运行库的Arrays.sort函数注1;简单的调用API库函数即可排序好数据。
但是在实际情况中,有时我们需要对大规模的数据进行排序。比如处理一个搜索引擎爬虫产生的数据,可能一天有几个GB的数据量,甚至上TB的数据。把这些数据全部加载入内存进行快速排序是不现实的。所以才有了外部并归排序算法。
假设我们要处理的数据是存储在一个外部文件中。文件的每一行都保存两列数据。第一列是ID,是一个32 bit int,第二列是Frequency,是一个64 bit int。列之间以跳格符 \t 分隔开,行之间以换行符 \n 分隔开。这个文件的头几行数据大概类似:
53 523 35 35 153 2510 58 9134 803 7091
要求是:1、必须把此文件的每一行数据按照第一列进行排序;2、把排序好的数据存到另外一个不同的文件。
简单介绍下第一个步骤:把要处理的数据分成若干个区块,对每个区块载入内存进行排序。我用C#来实现代码。
假设我们用如下的类来表示每一行的数据:
class Entry : IComparable<Entry> { public int Id { get; set; } public long Freq { get; set; } public int CompareTo(Entry other) { return this.Id - other.Id; } public Entry(string line) { string[] tokens = line.Split('\t'); this.Id = Int32.Parse(tokens[0]); this.Freq = Int32.Parse(tokens[1]); } public override string ToString() { return this.Id + "\t" + this.Freq; } }
每次载入100万行数据,进行排序,然后把排序好的数据写入一个临时文件里面。代码如下:
private const int INPUT_BUFFER_SIZE = 1024 * 1024 * 100; private const int OUTPUT_BUFFER_SIZE = 1024 * 1024 * 100; private const int SORT_BUFFER_SIZE = 1000000; private void Split(string inputFile, string tmpFilePattern) { using (FileStream fsin = new FileStream(inputFile, FileMode.Open)) using (BufferedStream bfsin = new BufferedStream(fsin, INPUT_BUFFER_SIZE)) using (StreamReader sr = new StreamReader(bfsin)) { List<Entry> sortBuffer = new List<Entry>(SORT_BUFFER_SIZE); int fcounter = 0; string tmpFile = null; while (ReadBlock(sr, sortBuffer) > 0) { tmpFile = String.Format(tmpFilePattern, fcounter++); SaveBlock(sortBuffer, tmpFile); sortBuffer.Clear(); } } } private int ReadBlock(StreamReader reader, List<Entry> buff) { string line = null; int counter = 0; while ((line = reader.ReadLine()) != null) { buff.Add(new Entry(line)); counter++; } return counter; } private void SaveBlock(List<Entry> buffer, string outputFile) { using (FileStream fsout = new FileStream(outputFile, FileMode.Create)) using (BufferedStream bfsout = new BufferedStream(fsout, OUTPUT_BUFFER_SIZE)) using (StreamWriter sw = new StreamWriter(bfsout)) { string line = null; foreach (Entry entry in buffer) { line = entry.ToString(); sw.WriteLine(line); } sw.Flush(); } }
接下来是第二个步骤:并归产生的若干个区块。我们已知每个区块都已经是排序好的。我们会同时打开K个文件,进行并归操作。所以这个算法叫做K通道并归算法。
以如下的3个输入流做例子。每一个输入流都是已经排序好的整数。首先我们打开3个指针,指向每一个流的第一个元素。
找到指针指向最小的值,把它写入输出流。然后把最小值所在的流的指针前进一位。例子中最小的值是1,所在的流是第一个流。这一步操作之后,3个流变成了如下所示:
重复前一个步骤:找到指针指向最小的值,把它写入输出流。然后把最小值所在的流的指针前进一位。现在中最小的值是2,所在的流是第一个流。这一步操作之后,3个流变成了如下所示:
重复前一个步骤。最小的值是2,所在的流是第二个流。这一步操作之后,3个流变成了如下所示:
重复前一个步骤。最小的值是3,所在的流是第三个流。这一步操作之后,3个流变成了如下所示:
如此反复,一直到读取完所有的输入流。这个时候,并归的输出流的数据就是已经完全排序好了。
现在有个问题:并归的每一步操作都需要从K个输入流中找出最小的值。最简单的实现,就是把每一个输入流给扫描一遍以找到最小值。这样每一步骤会消耗O(K),假设并归之后总共有N个值,那么整个并归会消耗O(KN)。显然比较慢,不可取。可以有更优的解法么?
答案是使用优先队列(Priority Queue)这个数据结构。给定K个数,最小优先队列能够:在O(1)的时间里面找到最小值,在O(logK)的时间里面移除最小值,在O(logK)的时间里插入一个新的值。这样,每一步会消耗O(logK),对于总共N个值的并归操作,总的消耗是O(NlogK)。.NET运行库现在并没有提供优先队列的实现注2。
K通道并归实现的代码如下:
private const int MEGRE_FILE_READ_BUFFER = 1024 * 1024 * 10; private const int MEGRE_FILE_WRITE_BUFFER = 1024 * 1024 * 50; public void KwayMerge(string[] tmpFiles, string outputFile) { EntryEnumerator[] ee = null; StreamWriter fout = null; try { ee = OpenEnumerators(tmpFiles); fout = OpenOutputFile(outputFile); Merge(ee, fout); } finally { foreach (EntryEnumerator e in ee) { if (e != null) e.Dispose(); } if (fout != null) fout.Dispose(); } } private void Merge(EntryEnumerator[] ee, StreamWriter fout) { PriorityQueue<EntryEnumerator> mergeBuffer = new PriorityQueue<EntryEnumerator>(); foreach (EntryEnumerator e in ee) { e.MoveNext(); mergeBuffer.Add(e); } while (mergeBuffer.Count >= 1) { EntryEnumerator e = mergeBuffer.Remove(); Entry key = e.Current; fout.WriteLine(key.ToString()); if (e.MoveNext()) mergeBuffer.Add(e); } if (mergeBuffer.Count > 0) { EntryEnumerator lastOne = mergeBuffer.Min; do { Entry key = lastOne.Current; fout.WriteLine(key.ToString()); } while (lastOne.MoveNext()); } fout.Flush(); }
OpenEnumerators和OpenOutputFile的代码如下:
private EntryEnumerator[] OpenEnumerators(string[] tmpFiles) { EntryEnumerator[] ee = new EntryEnumerator[tmpFiles.Length]; for (int i = 0; i < tmpFiles.Length; i++) { FileStream fs = new FileStream(tmpFiles[i], FileMode.Open); BufferedStream bfs = new BufferedStream(fs, MEGRE_FILE_READ_BUFFER); StreamReader sr = new StreamReader(bfs); ee[i] = new EntryEnumerator(sr, i); } return ee; } private StreamWriter OpenOutputFile(string file) { FileStream fs = new FileStream(file, FileMode.CreateNew); BufferedStream bfs = new BufferedStream(fs, MEGRE_FILE_WRITE_BUFFER); return new StreamWriter(bfs); }
EntryEnumerator和Entry的代码如下:
class EntryEnumerator : IEnumerator<Entry>, IComparable<EntryEnumerator>, IEquatable<EntryEnumerator> { private StreamReader sr; private int srId; private Entry current; public EntryEnumerator(StreamReader sr, int srId) { this.sr = sr; this.srId = srId; } public bool MoveNext() { string line = sr.ReadLine(); if (line == null) { current = null; return false; } else { current = new Entry(line); return true; } } public Entry Current { get { return this.current; } } public void Dispose() { sr.Dispose(); } object System.Collections.IEnumerator.Current { get { return this.current; } } public void Reset() { throw new NotImplementedException(); } public int CompareTo(EntryEnumerator other) { int comp = this.current.CompareTo(other.current); if (comp != 0) return comp; return this.srId - other.srId; } public bool Equals(EntryEnumerator other) { if (other == null) return false; return this.current.Equals(other.current) && this.srId == other.srId; } public override int GetHashCode() { return this.srId; } public override bool Equals(object obj) { return this.Equals(obj as EntryEnumerator); } }
class Entry : IComparable<Entry>, IEquatable<Entry> { public int Id { get; set; } public long Freq { get; set; } public Entry(string line) { string[] tokens = line.Split('\t'); this.Id = Int32.Parse(tokens[0]); this.Freq = Int32.Parse(tokens[1]); } public int CompareTo(Entry other) { return this.Id - other.Id; } public bool Equals(Entry other) { if (other == null) return false; return this.Id == other.Id && this.Freq == other.Freq; } public override bool Equals(object obj) { return this.Equals(obj as Entry); } public override int GetHashCode() { return this.Id; } public override string ToString() { return this.Id + "\t" + this.Freq; } }
_______________________
注1:Java的排序实现有点意思,对原始型别(比如int,float,byte)是执行的快速排序,而对象型别则是进行并归排序。
注2:可能.NET运行库里面最像优先队列的是SortedSet了。我查看了微软的源代码,它的取得最小值操作的时候,对整个Tree做一个InOrder的遍历,时间消耗是O(n)。有意思的是,Mono运行库里面对SortedSet的实现,却是优先队列;获取Min的操作是O(logn)。