排序大规模的数据常用到外部并归排序。此算法分为两个步骤:首先把要处理的数据分成若干个区块,对每个区块载入内存进行排序;第二步则是把排序好的区块进行并归融合。我用此文章来介绍下一个优化的并归算法,称为K通道并归算法。该算法可以使用一次并归处理,就可以有效的融合K个区块。

  对于比较少的数据进行排序,一般是直接载入主内存,进行快速排序。大多数主流语言都提供了API库支持:比如.NET运行库的Array.Sort函数,Java运行库的Arrays.sort函数注1;简单的调用API库函数即可排序好数据。

  但是在实际情况中,有时我们需要对大规模的数据进行排序。比如处理一个搜索引擎爬虫产生的数据,可能一天有几个GB的数据量,甚至上TB的数据。把这些数据全部加载入内存进行快速排序是不现实的。所以才有了外部并归排序算法。

 

  假设我们要处理的数据是存储在一个外部文件中。文件的每一行都保存两列数据。第一列是ID,是一个32 bit int,第二列是Frequency,是一个64 bit int。列之间以跳格符 \t 分隔开,行之间以换行符 \n 分隔开。这个文件的头几行数据大概类似:

53	523
35	35
153	2510
58	9134
803	7091

  要求是:1、必须把此文件的每一行数据按照第一列进行排序;2、把排序好的数据存到另外一个不同的文件。

 

  简单介绍下第一个步骤:把要处理的数据分成若干个区块,对每个区块载入内存进行排序。我用C#来实现代码。

  假设我们用如下的类来表示每一行的数据:

class Entry : IComparable<Entry>
{
    public int Id { get; set; }
    public long Freq { get; set; }

    public int CompareTo(Entry other) {
        return this.Id - other.Id;
    }

    public Entry(string line) {
        string[] tokens = line.Split('\t');
        this.Id = Int32.Parse(tokens[0]);
        this.Freq = Int32.Parse(tokens[1]);
    }

    public override string ToString() {
        return this.Id + "\t" + this.Freq;
    }
}

   每次载入100万行数据,进行排序,然后把排序好的数据写入一个临时文件里面。代码如下:

private const int INPUT_BUFFER_SIZE = 1024 * 1024 * 100;
private const int OUTPUT_BUFFER_SIZE = 1024 * 1024 * 100;
private const int SORT_BUFFER_SIZE = 1000000;

private void Split(string inputFile, string tmpFilePattern) {
    using (FileStream fsin = new FileStream(inputFile, FileMode.Open))
    using (BufferedStream bfsin = new BufferedStream(fsin, INPUT_BUFFER_SIZE))
    using (StreamReader sr = new StreamReader(bfsin)) {
        List<Entry> sortBuffer = new List<Entry>(SORT_BUFFER_SIZE);
        int fcounter = 0;
        string tmpFile = null;

        while (ReadBlock(sr, sortBuffer) > 0) {
            tmpFile = String.Format(tmpFilePattern, fcounter++);
            SaveBlock(sortBuffer, tmpFile);

            sortBuffer.Clear();
        }
    }
}

private int ReadBlock(StreamReader reader, List<Entry> buff) {
    string line = null;
    int counter = 0;

    while ((line = reader.ReadLine()) != null) {
        buff.Add(new Entry(line));
        counter++;
    }

    return counter;
}

private void SaveBlock(List<Entry> buffer, string outputFile) {
    using (FileStream fsout = new FileStream(outputFile, FileMode.Create))
    using (BufferedStream bfsout = new BufferedStream(fsout, OUTPUT_BUFFER_SIZE))
    using (StreamWriter sw = new StreamWriter(bfsout)) {
        string line = null;

        foreach (Entry entry in buffer) {
            line = entry.ToString();
            sw.WriteLine(line);
        }

        sw.Flush();
    }
}

 

  接下来是第二个步骤:并归产生的若干个区块。我们已知每个区块都已经是排序好的。我们会同时打开K个文件,进行并归操作。所以这个算法叫做K通道并归算法。

  以如下的3个输入流做例子。每一个输入流都是已经排序好的整数。首先我们打开3个指针,指向每一个流的第一个元素。

  找到指针指向最小的值,把它写入输出流。然后把最小值所在的流的指针前进一位。例子中最小的值是1,所在的流是第一个流。这一步操作之后,3个流变成了如下所示:

  重复前一个步骤:找到指针指向最小的值,把它写入输出流。然后把最小值所在的流的指针前进一位。现在中最小的值是2,所在的流是第一个流。这一步操作之后,3个流变成了如下所示:

  重复前一个步骤。最小的值是2,所在的流是第二个流。这一步操作之后,3个流变成了如下所示:

  重复前一个步骤。最小的值是3,所在的流是第三个流。这一步操作之后,3个流变成了如下所示:

  如此反复,一直到读取完所有的输入流。这个时候,并归的输出流的数据就是已经完全排序好了。

  现在有个问题:并归的每一步操作都需要从K个输入流中找出最小的值。最简单的实现,就是把每一个输入流给扫描一遍以找到最小值。这样每一步骤会消耗O(K),假设并归之后总共有N个值,那么整个并归会消耗O(KN)。显然比较慢,不可取。可以有更优的解法么?

  答案是使用优先队列(Priority Queue)这个数据结构。给定K个数,最小优先队列能够:在O(1)的时间里面找到最小值,在O(logK)的时间里面移除最小值,在O(logK)的时间里插入一个新的值。这样,每一步会消耗O(logK),对于总共N个值的并归操作,总的消耗是O(NlogK)。.NET运行库现在并没有提供优先队列的实现注2

  K通道并归实现的代码如下:

private const int MEGRE_FILE_READ_BUFFER = 1024 * 1024 * 10;
private const int MEGRE_FILE_WRITE_BUFFER = 1024 * 1024 * 50;

public void KwayMerge(string[] tmpFiles, string outputFile) {
    EntryEnumerator[] ee = null;
    StreamWriter fout = null;
            
    try {
        ee = OpenEnumerators(tmpFiles);
        fout = OpenOutputFile(outputFile);

        Merge(ee, fout);

    } finally {
        foreach (EntryEnumerator e in ee) {
            if (e != null)
                e.Dispose();
        }

        if (fout != null)
            fout.Dispose();
    }
}

private void Merge(EntryEnumerator[] ee, StreamWriter fout) {
    PriorityQueue<EntryEnumerator> mergeBuffer = new PriorityQueue<EntryEnumerator>();

    foreach (EntryEnumerator e in ee) {
        e.MoveNext();
        mergeBuffer.Add(e);
    }

    while (mergeBuffer.Count >= 1) {
        EntryEnumerator e = mergeBuffer.Remove();
        Entry key = e.Current;

        fout.WriteLine(key.ToString());

        if (e.MoveNext())
            mergeBuffer.Add(e);
    }

    if (mergeBuffer.Count > 0) {
        EntryEnumerator lastOne = mergeBuffer.Min;

        do {
            Entry key = lastOne.Current;
            fout.WriteLine(key.ToString());

        } while (lastOne.MoveNext());
    }

    fout.Flush();
}

  OpenEnumerators和OpenOutputFile的代码如下:

private EntryEnumerator[] OpenEnumerators(string[] tmpFiles) {
    EntryEnumerator[] ee = new EntryEnumerator[tmpFiles.Length];

    for (int i = 0; i < tmpFiles.Length; i++) {
        FileStream fs = new FileStream(tmpFiles[i], FileMode.Open);
        BufferedStream bfs = new BufferedStream(fs, MEGRE_FILE_READ_BUFFER);
        StreamReader sr = new StreamReader(bfs);

        ee[i] = new EntryEnumerator(sr, i);
    }

    return ee;
}

private StreamWriter OpenOutputFile(string file) {
    FileStream fs = new FileStream(file, FileMode.CreateNew);
    BufferedStream bfs = new BufferedStream(fs, MEGRE_FILE_WRITE_BUFFER);
    return new StreamWriter(bfs);
}

  EntryEnumerator和Entry的代码如下:

class EntryEnumerator : IEnumerator<Entry>, IComparable<EntryEnumerator>, IEquatable<EntryEnumerator>
{
    private StreamReader sr;
    private int srId;
    private Entry current;

    public EntryEnumerator(StreamReader sr, int srId) {
        this.sr = sr;
        this.srId = srId;
    }

    public bool MoveNext() {
        string line = sr.ReadLine();

        if (line == null) {
            current = null;
            return false;
        } else {
            current = new Entry(line);
            return true;
        }
    }

    public Entry Current {
        get {
            return this.current;
        }
    }

    public void Dispose() {
        sr.Dispose();
    }

    object System.Collections.IEnumerator.Current {
        get { return this.current; }
    }

    public void Reset() {
        throw new NotImplementedException();
    }

    public int CompareTo(EntryEnumerator other) {
        int comp = this.current.CompareTo(other.current);
        if (comp != 0)
            return comp;

        return this.srId - other.srId;
    }

    public bool Equals(EntryEnumerator other) {
        if (other == null)
            return false;

        return this.current.Equals(other.current) && 
            this.srId == other.srId;
    }

    public override int GetHashCode() {
        return this.srId;
    }

    public override bool Equals(object obj) {
        return this.Equals(obj as EntryEnumerator);
    }
}
class Entry : IComparable<Entry>, IEquatable<Entry>
{
    public int Id { get; set; }
    public long Freq { get; set; }

    public Entry(string line) {
        string[] tokens = line.Split('\t');
        this.Id = Int32.Parse(tokens[0]);
        this.Freq = Int32.Parse(tokens[1]);
    }

    public int CompareTo(Entry other) {
        return this.Id - other.Id;
    }

    public bool Equals(Entry other) {
        if (other == null)
            return false;

        return this.Id == other.Id &&
            this.Freq == other.Freq;
    }

    public override bool Equals(object obj) {
        return this.Equals(obj as Entry);
    }

    public override int GetHashCode() {
        return this.Id;
    }

    public override string ToString() {
        return this.Id + "\t" + this.Freq;
    }
}

 

_______________________

注1:Java的排序实现有点意思,对原始型别(比如int,float,byte)是执行的快速排序,而对象型别则是进行并归排序。

注2:可能.NET运行库里面最像优先队列的是SortedSet了。我查看了微软的源代码,它的取得最小值操作的时候,对整个Tree做一个InOrder的遍历,时间消耗是O(n)。有意思的是,Mono运行库里面对SortedSet的实现,却是优先队列;获取Min的操作是O(logn)。

作者: 隐约有歌 发表于 2010-11-29 23:14 原文链接

推荐.NET配套的通用数据层ORM框架:CYQ.Data 通用数据层框架
新浪微博粉丝精灵,刷粉丝、刷评论、刷转发、企业商家微博营销必备工具"