Algorithmus für N-Wege-Merge

Ein Zweiwege-Merge wird als Teil des Mergesort-Algorithmus untersucht. Aber ich bin daran interessiert herauszufinden, wie man am besten einen N-Way Merge durchführen kann?

Sagen wir, ich habe N Dateien, die jeweils 1 Million Integer sortiert haben. Ich muss sie in 1 einzelne Datei zusammenführen, die diese 100 Millionen sortierten ganzen Zahlen haben wird.

Bitte beachten Sie, dass Anwendungsfall für dieses Problem eigentlich externe Sortierung ist, die Festplatte basiert. Daher würde es in realen Szenarien auch Speicherbeschränkungen geben. Eine naive Methode, 2 Dateien gleichzeitig (99 mal) zusammenzuführen, funktioniert nicht. Nehmen wir an, wir haben nur ein kleines verschiebbares Speicherfenster für jedes Array zur Verfügung.

Ich bin mir nicht sicher, ob es bereits eine standardisierte Lösung für diese N-Wege-Zusammenführung gibt. (Googling hat mir nicht viel erzählt) .

Aber wenn Sie wissen, ob ein guter n-way Merge-Algorithmus, bitte posten Algo / Link.

Zeitaufwand: Wenn wir die Anzahl der zu mergeden Dateien ( N ) stark erhöhen, wie würde das die zeitliche Komplexität Ihres Algorithmus beeinflussen?

Danke für deine Antworten.

Ich wurde nicht überall gefragt, aber ich denke, das könnte eine interessante Interviewfrage sein. Daher markiert.

Solutions Collecting From Web of "Algorithmus für N-Wege-Merge"

Wie wäre es mit der folgenden Idee:

  1. Erstellen Sie eine Prioritätswarteschlange
  2. Iterate durch jede Datei f
    1. reihe das Paar (nextNumberIn (f), f) unter Verwendung des ersten Wertes als Prioritätsschlüssel ein
  3. Während Warteschlange nicht leer ist
    1. Kopf (m, f) der Warteschlange aus der Warteschleife nehmen
    2. Ausgabe m
    3. wenn f nicht erschöpft ist
      1. Enqueue (nextNumberIn (f), f)

Da das Hinzufügen von Elementen zu einer Prioritätswarteschlange in logarithmischer Zeit erfolgen kann, ist Element 2 O (N × log N) . Da (fast alle) Iterationen der while-Schleife ein Element hinzufügen, ist die gesamte while-Schleife O (M × log N), wobei M die Gesamtzahl der zu sortierenden Zahlen ist.

Unter der Annahme, dass alle Dateien eine nicht leere Zahlenfolge haben, haben wir M> N und somit sollte der ganze Algorithmus O sein (M × log N) .

Suche nach “Polyphase merge”, schau dir Klassiker an – Donald Knuth & EHFriend.

Vielleicht möchten Sie auch einen Blick auf den vorgeschlagenen Smart Block Merging von Seyedafsari & Hasanzadeh casting, der ähnlich wie frühere Vorschläge Prioritätswarteschlangen verwendet.

Ein weiterer interessanter Grund ist In-Place-Merging-Algorithmus von Kim & Kutzner.

Ich empfehle auch dieses Papier von Vitter: Externe Speicheralgorithmen und Datenstrukturen: Umgang mit massiven Daten .

Eine einfache Idee besteht darin, eine Prioritätswarteschlange der Bereiche beizubehalten, die so zusammengeführt werden, dass der Bereich mit dem kleinsten ersten Element zuerst aus der Warteschlange entfernt wird. Sie können dann eine N-way-Zusammenführung wie folgt ausführen:

  1. Fügen Sie alle Bereiche in die Prioritätswarteschlange ein, wobei leere Bereiche ausgeschlossen werden.
  2. Während die Prioritätswarteschlange nicht leer ist:
    1. Entfernen Sie das kleinste Element aus der Warteschlange.
    2. Hängen Sie das erste Element dieses Bereichs an die Ausgabesequenz an.
    3. Wenn es nicht leer ist, fügen Sie den Rest der Sequenz wieder in die Prioritätswarteschlange ein.

Die Korrektheit dieses Algorithmus ist im Wesentlichen eine Verallgemeinerung des Beweises, dass eine 2-way merge korrekt funktioniert – wenn Sie immer das kleinste Element aus einem Bereich hinzufügen und alle Bereiche sortiert sind, wird die Sequenz als Ganzes sortiert.

Die Laufzeitkomplexität dieses Algorithmus kann wie folgt gefunden werden. Sei M die Gesamtzahl der Elemente in allen Sequenzen. Wenn wir einen binären Heap verwenden, dann tun wir höchstens O (M) -Einfügungen und O (M) -Deletionen aus der Prioritätswarteschlange, da für jedes in die Ausgabesequenz geschriebene Element eine Warteschlange herausgezogen wird, gefolgt von einem Warteschlange, um den Rest der Sequenz zurück in die Warteschlange zu stellen. Jeder dieser Schritte benötigt O (lg N) -Operationen, da das Einfügen oder Löschen von einem binären Haufen mit N Elementen darin die Zeit O (lg N) benötigt. Dies ergibt eine Nettolaufzeit von O (M lg N), die weniger als linear mit der Anzahl der Eingabesequenzen wächst.

Es mag einen Weg geben, dies noch schneller zu machen, aber das scheint eine ziemlich gute Lösung zu sein. Die Speicherbelegung ist O (N), da wir O (N) Overhead für den binären Heap benötigen. Wenn wir den binären Heap implementieren, indem wir pointers auf die Sequenzen anstatt auf die Sequenzen selbst speichern, sollte das kein allzu großes Problem darstellen, es sei denn, Sie haben eine wirklich lächerliche Anzahl von Sequenzen, die zusammengeführt werden müssen. In diesem Fall fügen Sie sie einfach in Gruppen zusammen, die in den Speicher passen, und führen dann alle Ergebnisse zusammen.

Hoffe das hilft!

Ein einfacher Ansatz zum Zusammenführen k sortierter Felder (jeweils mit der Länge n) erfordert O (nk ^ 2) Zeit und nicht O (nk) Zeit. Wenn Sie die ersten 2 Arrays zusammenführen, dauert es 2n, und wenn Sie die dritte mit der Ausgabe merge, dauert es 3n, da wir nun zwei Arrays der Länge 2n und n zusammenführen. Wenn wir nun diese Ausgabe mit der vierten merge, benötigt diese Zusammenführung 4n Zeit. So erfordert die letzte Zusammenführung (wenn wir das k-te Array zu unserem bereits sortierten Array hinzufügen) k * n Zeit. Somit wird eine Gesamtzeit von 2n + 3n + 4n benötigt + … k * n was O ist (nk ^ 2).

Es sieht so aus, als könnten wir es in O (kn) Zeit machen, aber es ist nicht so, weil unser Array, das wir zusammenführen, jedes Mal größer wird.
Wir können zwar eine bessere Grenze erreichen, indem wir Teile und herrsche. Ich arbeite immer noch daran und posten eine Lösung, wenn ich eine finde.

Siehe http://en.wikipedia.org/wiki/External_sorting . Hier ist mein Beispiel für die heapbasierte k-way-Zusammenführung, die einen gepufferten Lesevorgang aus den Quellen verwendet, um die E / A-Reduzierung zu emulieren:

 public class KWayMerger { private readonly IList _sources; private readonly int _bufferSize; private readonly MinHeap> _mergeHeap; private readonly int[] _indices; public KWayMerger(IList sources, int bufferSize, Comparer comparer = null) { if (sources == null) throw new ArgumentNullException("sources"); _sources = sources; _bufferSize = bufferSize; _mergeHeap = new MinHeap>( new MergeComparer(comparer ?? Comparer.Default)); _indices = new int[sources.Count]; } public T[] Merge() { for (int i = 0; i < = _sources.Count - 1; i++) AddToMergeHeap(i); var merged = new T[_sources.Sum(s => s.Length)]; int mergeIndex = 0; while (_mergeHeap.Count > 0) { var min = _mergeHeap.ExtractDominating(); merged[mergeIndex++] = min.Value; if (min.Source != -1) //the last item of the source was extracted AddToMergeHeap(min.Source); } return merged; } private void AddToMergeHeap(int sourceIndex) { var source = _sources[sourceIndex]; var start = _indices[sourceIndex]; var end = Math.Min(start + _bufferSize - 1, source.Length - 1); if (start > source.Length - 1) return; //we're done with this source for (int i = start; i < = end - 1; i++) _mergeHeap.Add(new MergeValue(-1, source[i])); //only the last item should trigger the next buffered read _mergeHeap.Add(new MergeValue(sourceIndex, source[end])); _indices[sourceIndex] += _bufferSize; //we may have added less items, //but if we did we've reached the end of the source so it doesn't matter } } internal class MergeValue { public int Source { get; private set; } public T Value { get; private set; } public MergeValue(int source, T value) { Value = value; Source = source; } } internal class MergeComparer : IComparer> { public Comparer Comparer { get; private set; } public MergeComparer(Comparer comparer) { if (comparer == null) throw new ArgumentNullException("comparer"); Comparer = comparer; } public int Compare(MergeValue x, MergeValue y) { Debug.Assert(x != null && y != null); return Comparer.Compare(x.Value, y.Value); } } 

Hier ist eine mögliche Implementierung von MinHeap . Einige Tests:

 [TestMethod] public void TestKWaySort() { var rand = new Random(); for (int i = 0; i < 10; i++) AssertKwayMerge(rand); } private static void AssertKwayMerge(Random rand) { var sources = new[] { GenerateRandomCollection(rand, 10, 30, 0, 30).OrderBy(i => i).ToArray(), GenerateRandomCollection(rand, 10, 30, 0, 30).OrderBy(i => i).ToArray(), GenerateRandomCollection(rand, 10, 30, 0, 30).OrderBy(i => i).ToArray(), GenerateRandomCollection(rand, 10, 30, 0, 30).OrderBy(i => i).ToArray(), }; Assert.IsTrue(new KWayMerger(sources, 20).Merge().SequenceEqual(sources.SelectMany(s => s).OrderBy(i => i))); } public static IEnumerable GenerateRandomCollection(Random rand, int minLength, int maxLength, int min = 0, int max = int.MaxValue) { return Enumerable.Repeat(0, rand.Next(minLength, maxLength)).Select(i => rand.Next(min, max)); } 

Ich habe diesen Code im STL-Stil geschrieben, der N-way merge und dachte, ich würde ihn hier posten, um zu verhindern, dass andere das Rad neu erfinden. 🙂

Warnung: Es ist nur leicht getestet. Vor dem Gebrauch testen. 🙂

Du kannst es so benutzen:

 #include  int main() { std::vector > v; std::vector::iterator> vout; std::vector v1; std::vector v2; v1.push_back(1); v1.push_back(2); v1.push_back(3); v2.push_back(0); v2.push_back(1); v2.push_back(2); v.push_back(v1); v.push_back(v2); multiway_merge(v.begin(), v.end(), std::back_inserter(vout), false); } 

Es ermöglicht auch die Verwendung von Iteratorpaaren anstelle der Container selbst.

Wenn Sie Boost.Range verwenden, können Sie einen Teil des Standardcodes entfernen.

Der Code:

 #include  #include  // std::less #include  #include  // std::priority_queue #include  // std::pair #include  template struct multiway_merge_value_insert_iterator : public std::iterator< std::output_iterator_tag, OutIt, ptrdiff_t > { OutIt it; multiway_merge_value_insert_iterator(OutIt const it = OutIt()) : it(it) { } multiway_merge_value_insert_iterator &operator++(int) { return *this; } multiway_merge_value_insert_iterator &operator++() { return *this; } multiway_merge_value_insert_iterator &operator *() { return *this; } template multiway_merge_value_insert_iterator &operator =(It const i) { *this->it = *i; ++this->it; return *this; } }; template multiway_merge_value_insert_iterator multiway_merge_value_inserter(OutIt const it) { return multiway_merge_value_insert_iterator(it); }; template struct multiway_merge_value_less : private Less { multiway_merge_value_less(Less const &less) : Less(less) { } template bool operator()( std::pair const &b /* inverted */, std::pair const &a) const { return b.first != b.second && ( a.first == a.second || this->Less::operator()(*a.first, *b.first)); } }; struct multiway_merge_default_less { template bool operator()(T const &a, T const &b) const { return std::less()(a, b); } }; template struct multiway_merge_range_iterator { typedef typename R::iterator type; }; template struct multiway_merge_range_iterator { typedef typename R::const_iterator type; }; template struct multiway_merge_range_iterator > { typedef It type; }; template typename R::iterator multiway_merge_range_begin(R &r) { return r.begin(); } template typename R::iterator multiway_merge_range_end(R &r) { return r.end(); } template typename R::const_iterator multiway_merge_range_begin(R const &r) { return r.begin(); } template typename R::const_iterator multiway_merge_range_end(R const &r) { return r.end(); } template It multiway_merge_range_begin(std::pair const &r) { return r.first; } template It multiway_merge_range_end(std::pair const &r) { return r.second; } template OutIt multiway_merge( It begin, It const end, OutIt out, Less const &less, PQ &pq, bool const distinct = false) { while (begin != end) { pq.push(typename PQ::value_type( multiway_merge_range_begin(*begin), multiway_merge_range_end(*begin))); ++begin; } while (!pq.empty()) { typename PQ::value_type top = pq.top(); pq.pop(); if (top.first != top.second) { while (!pq.empty() && pq.top().first == pq.top().second) { pq.pop(); } if (!distinct || pq.empty() || less(*pq.top().first, *top.first) || less(*top.first, *pq.top().first)) { *out = top.first; ++out; } ++top.first; pq.push(top); } } return out; } template OutIt multiway_merge( It const begin, It const end, OutIt out, Less const &less, bool const distinct = false) { typedef typename multiway_merge_range_iterator< typename std::iterator_traits::value_type >::type SubIt; if (std::distance(begin, end) < 16) { typedef std::vector > Remaining; Remaining remaining; remaining.reserve( static_cast(std::distance(begin, end))); for (It i = begin; i != end; ++i) { if (multiway_merge_range_begin(*i) != multiway_merge_range_end(*i)) { remaining.push_back(std::make_pair( multiway_merge_range_begin(*i), multiway_merge_range_end(*i))); } } while (!remaining.empty()) { typename Remaining::iterator smallest = remaining.begin(); for (typename Remaining::iterator i = remaining.begin(); i != remaining.end(); ) { if (less(*i->first, *smallest->first)) { smallest = i; ++i; } else if (distinct && i != smallest && !less( *smallest->first, *i->first)) { i = remaining.erase(i); } else { ++i; } } *out = smallest->first; ++out; ++smallest->first; if (smallest->first == smallest->second) { smallest = remaining.erase(smallest); } } return out; } else { std::priority_queue< std::pair, std::vector >, multiway_merge_value_less > q((multiway_merge_value_less(less))); return multiway_merge(begin, end, out, less, q, distinct); } } template OutIt multiway_merge( It const begin, It const end, OutIt const out, bool const distinct = false) { return multiway_merge( begin, end, out, multiway_merge_default_less(), distinct); } 
 Here is my implementation using MinHeap... package merging; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; public class N_Way_Merge { int No_of_files=0; String[] listString; int[] listIndex; PrintWriter pw; private String fileDir = "D:\\XMLParsing_Files\\Extracted_Data"; private File[] fileList; private BufferedReader[] readers; public static void main(String[] args) throws IOException { N_Way_Merge nwm=new N_Way_Merge(); long start= System.currentTimeMillis(); try { nwm.createFileList(); nwm.createReaders(); nwm.createMinHeap(); } finally { nwm.pw.flush(); nwm.pw.close(); for (BufferedReader readers : nwm.readers) { readers.close(); } } long end = System.currentTimeMillis(); System.out.println("Files merged into a single file.\nTime taken: "+((end-start)/1000)+"secs"); } public void createFileList() throws IOException { //creates a list of sorted files present in a particular directory File folder = new File(fileDir); fileList = folder.listFiles(); No_of_files=fileList.length; assign(); System.out.println("No. of files - "+ No_of_files); } public void assign() throws IOException { listString = new String[No_of_files]; listIndex = new int[No_of_files]; pw = new PrintWriter(new BufferedWriter(new FileWriter("D:\\XMLParsing_Files\\Final.txt", true))); } public void createReaders() throws IOException { //creates array of BufferedReaders to read the files readers = new BufferedReader[No_of_files]; for(int i=0;i=0;--i) MinHeapify(listString,listIndex,i); } public void MinHeapify(String[] listString,int[] listIndex,int index){ int left=index*2 + 1; int right=left + 1; int smallest=index; int HeapSize=No_of_files; if(left < = HeapSize-1 && listString[left]!=null && (listString[left].compareTo(listString[index])) < 0) smallest = left; if(right <= HeapSize-1 && listString[right]!=null && (listString[right].compareTo(listString[smallest])) < 0) smallest=right; if(smallest!=index) { String temp=listString[index]; listString[index]=listString[smallest]; listString[smallest]=temp; listIndex[smallest]^=listIndex[index]; listIndex[index]^=listIndex[smallest]; listIndex[smallest]^=listIndex[index]; MinHeapify(listString,listIndex,smallest); } } 

}

Java-Implementierung des Min-Heap-Algorithmus zum Zusammenführen von k-sortierten Arrays:

 public class MergeKSorted { /** * helper object to store min value of each array in a priority queue, * the kth array and the index into kth array * */ static class PQNode implements Comparable{ int value; int kth = 0; int indexKth = 0; public PQNode(int value, int kth, int indexKth) { this.value = value; this.kth = kth; this.indexKth = indexKth; } @Override public int compareTo(PQNode o) { if(o != null) { return Integer.valueOf(value).compareTo(Integer.valueOf(o.value)); } else return 0; } @Override public String toString() { return value+" "+kth+" "+indexKth; } } public static void mergeKSorted(int[][] sortedArrays) { int k = sortedArrays.length; int resultCtr = 0; int totalSize = 0; PriorityQueue pq = new PriorityQueue<>(); for(int i=0; i 0) { PQNode temp = new PQNode(kthArray[0], i, 0); pq.add(temp); } } int[] result = new int[totalSize]; while(!pq.isEmpty()) { PQNode temp = pq.poll(); int[] kthArray = sortedArrays[temp.kth]; result[resultCtr] = temp.value; resultCtr++; temp.indexKth++; if(temp.indexKth < kthArray.length) { temp = new PQNode(kthArray[temp.indexKth], temp.kth, temp.indexKth); pq.add(temp); } } print(result); } public static void print(int[] a) { StringBuilder sb = new StringBuilder(); for(int v : a) { sb.append(v).append(" "); } System.out.println(sb); } public static void main(String[] args) { int[][] sortedA = { {3,4,6,9}, {4,6,8,9,12}, {3,4,9}, {1,4,9} }; mergeKSorted(sortedA); } }