问题描述 将k个已经排序的数组归并成一个大的排序的结果数组。这些数组可能数量比较大,以至于不能直接装载到内存中。
这个问题比较有意思。后面一截的描述是我个人加上去的。实际上一个简单的模型就是将k个已经排好序的数组或者序列合并成一个排好序的结果数组。那么我们该怎么来考虑这个问题呢?
初步分析 其实这个问题可以说是一个问题的更加普遍形式。回顾一下我们在讨论归并排序的时候。那时候我们是将一个数组分解成两个子序列进行排序,然后再进行归并。这个过程一直递归下去。而其中归并结果的过程正好就是将两个已经排好序的序列合并成一个排好序的数组。我们可以看下面这一部分代码来回顾一下当时的实现:
Java代码 [url=] [/url]
- public
static
void merge(int[] a, int start, int middle, int end) - {
- int[] array = new
int[end - start + 1]; - int i = 0, j = start, k = middle + 1;
- while(j <= middle && k <= end)
- {
- if(a[j] <= a[k])
- array[i++] = a[j++];
- else
- {
- count += middle - j + 1;
- array[i++] = a[k++];
- }
- }
- while(j <= middle)
- array[i++] = a[j++];
- while(k <= end)
- array[i++] = a[k++];
- for(i = 0; i < array.length; i++)
- a[start + i] = array;
- }
我们可以看到对于两个序列来说,我们可以通过遍历两个序列,一直比较两个序列中最前面的元素,然后取更加小的那个,这样直到一个序列已经取完了。我们再把还有剩余元素的序列遍历取出来。
这是两个序列的归并,可是如果扩展到多个序列的情况,我们该怎么来调整呢?
思路 k个数组的元素都已经排好序了,假设所有数组的元素总和为n。我们只要不停的遍历所有数组,从每个数组里面从头到尾的去取,然后把每次得到的最小的元素取出来就可以。这样,我们就很容易得到一个解决办法。
方法一:循环遍历 这个办法的思路就比较直接,首先,我们比较所有k个数组的头一个元素,找到最小的那一个,然后取出来。我们在该最小元素所在的数组取下一个元素,然后重复前面的过程去找最小的那个。这样依次循环直到找到所有的元素。
一个简单的实现伪代码如下:
Java代码 [url=] [/url]
- public
void process(ItemList[] lists) { - Item min = Item.MAX_VAL;
- boolean notEmpty = true;
- while(notEmpty) {
- for(Items list : lists) {
- if(list.hasNext() && list.getItem() < min)
- min = list.getItem();
- }
- if(min != Item.MAX_VAL) {
- int index = min.getIndex();
- // process min here
- lists.next();
- } else {
- notEmpty = false;
- }
- }
- }
这里写了一个大致的思路。我们用一个notEmpty来标志所有序列是否已经遍历完了。我们每次遍历所有序列的当前元素,找到最小的。这样每次我们找一个元素都要比较k次,假设所有n个元素,其总体时间复杂度就达到了O(nk)。
这个方法看起来简单直观,对于O(nk)的时间复杂度来说,看起来还是可以接受的。只是如果k也比较大的话,对性能也还是有一点影响。那么有没有其他方法可以改进呢?
方法二:最小堆k路归并排序 还有一个典型的方法就是利用最小堆的特性。我们在前面讲述堆排序和优先队列的时候对于最小堆都有过一定程度的讨论。其本质上就是我们首先从k路序列中都取一个元素出来。因为所有的都是已经按照从小到大排序的,我们不需要考虑其他的。每个序列里取出来的肯定是他们这个序列里最小的。那么我们所要做的就是在这些最小元素里找到全局最小的那个。
到了这里,我们发现和前面那个方法比起来,它没有多少特殊的,主要是用建堆和调整的方式来比较元素。其他的过程基本上一样。因为我们要在取出当前最小元素后要接着取这个元素所在序列的后面一个元素。这个时候,我们就需要考虑,如果这个序列后面没有元素了,我们该怎么办呢?如果还有的话,我们该怎么调整呢?这两个问题在前面的方法里并不存在,因为它是始终要遍历每个。只要有存在元素的序列就不怕。这里要解决这个问题,我们可以有两种方法。
1. 假定在处理元素的过程中,某个序列的元素取光了。我们可以在开始的时候针对所有序列的最后都加一个表示无穷大的数值。这样如果取完这个序列之后可以保证它后续肯定不会被选择到。
2. 我们将该元素用堆最后的元素替换,然后调整堆的属性并将堆的大小减1。这样我们这个大小为k的堆慢慢会变成k-1, k-2...1这些个长度的堆。一直到我们把这些堆里序列的元素处理完。
针对第2种方法,我定义了几个特定的实现方法:
Java代码 [url=] [/url]
- public
int heapExtractMin() throws Exception { - if(heapSize < 1)
- throw
new Exception("heap underflow"); - int min = a[0];
- a[0] = a[heapSize - 1];
- heapSize--;
- minHeapify(0);
- return min;
- }
这个方法就是用末尾的元素替换堆顶元素,然后用minHeapify来调整它。
而建立这么一个大小为k的堆,我们可以预先取出每个序列里的第一个元素拷贝到一个数组里来构造堆,也可以使用逐步插入元素构建堆的方式。关于这两种建堆的方式我在这篇文章里有专门讨论过。这里我们用插入元素建堆的方式:
Java代码 [url=] [/url]
- public
void minHeapInsert(int key) throws Exception { - heapSize++;
- a[heapSize - 1] = Integer.MAX_VALUE;
- heapDecreaseKey(heapSize - 1, key);
- }
- public
void heapDecreaseKey(int i, int key) throws Exception { - if(key > a)
- throw
new Exception("new key is bigger than current key"); - a = key;
- while(i > 0 && a[parent(i) > a]) {
- swap(i, parent(i));
- i = parent(i);
- }
- }
这里代码的意思无非就是将插入到最后的元素向上调整以保证最小堆的特性。
再结合前面讨论的结果,如果我们需要调整和处理所有序列,需要一个是判断序列是否为空,另外一个就是动态的调整堆大小。这部分的代码如下:
Java代码 [url=] [/url]
- public
static
void main(String[] args) { - ItemGenerator[] list = new ItemGenerator[10];
- Item[] itemList = new Item[10];
- for(int i = 0; i < 10; i++) {
- itemList = new ItemGenerator(i).getItem();
- }
- // Insertion to build min heap
- MinPriorityQueue queue = new MinPriorityQueue(itemList, 0);
- for(int i = 0; i < 10; i++) {
- queue.minHeapInsert(list.getItem());
- }
- // process min value in k sequences
- k = itemList.length;
- while(k > 0) {
- int min = queue.heapMinimum();
- process(min);
- int i = min.getIndex();
- if(list.hasNext()) {
- queue.updateQueueHead(list.getItem());
- } else {
- queue.heapExtractMin();
- k--;
- }
- }
- }
我们是首先建立一个最小堆,然后每次都取最小的元素并处理。有一个process(min)方法。这里没有给出定义。只是标注一下可以处理元素的地方。我们可以根据实际情况调整。在处理完元素后我们要根据当前的min元素取得它所在的序列。这里我们也是假定min有getIndex方法。我们就可以知道是哪个序列的元素被取了要接着补上。如果没有了则调用queue.heapExtractMin()来调整堆,表示这个堆有一个序列已经空了。k堆要缩减为k-1堆。因为详细代码比较繁琐,这里就没有补充进来了。知道了这么个思路也比较容易写出来。
方法三:胜者树k路归并排序 在我前面一篇讨论求第二小元素的文章里就专门提到了胜者树。在那篇文章里我们对胜者树的定义和特性做了一个详细的讨论。这里我们来看看怎么解决这个问题。
首先,胜者树会是一个这样的形式:

和我们前面讨论的有点不同,这里几乎堆的每个叶节点对应一个输入的序列。我们让他们构成一个完全二叉树。以上图为例,我们进行一轮胜者的选择之后,堆结构则如下:

我们可以看到,最终在堆顶的那个元素是最小的,而且有一条路径从叶节点到堆的根。如果我们把最小的这个元素处理完之后该怎么调整呢?下图可以说明这个问题:

我们发现这个问题是通过在原来序列里取后续的元素,然后像胜者树调整一样向上,符合条件的元素放上面,然后一直比较到根。这样就找到了下一个最小的元素。这样一直循环下去。如果一个序列处理完了我们可以采用在末尾增加一个无穷大的值。
总的来说,这个方法和普通的最小堆调整差不多,就是调整的方式不一样而已。我们也可以很容易得出对象的实现,这里就不再贴详细的实现代码了。
总结 k路归并算法其实整体的思路并不复杂。我们常用的两种方法一种是建立一个最小k堆,然后每次取最小的元素,也就是堆顶的元素。然后再调整。还有一种就是建立一棵胜者树。其实思路也类似,每次取最顶端的元素,这个元素就是胜者,也就是最小的那个元素。然后从胜者树所在的叶节点对应的序列里取下一个元素。然后再进行比较向上调整。这两种方法都有一个需要注意的地方就是需要根据当前的操作节点来确定该节点所处的序列。从某种角度来说,k路归并算法对于处理大规模的数据有非常重要的意义。我们在一些面试关于大量数据处理的讨论上都会用到这个方法和相关的思路。 |