Spark – repartition () vs coalesce ()

Laut Lernfunken

Beachten Sie, dass die Neupartitionierung Ihrer Daten eine ziemlich teure Operation ist. Spark hat auch eine optimierte Version von repartition (), die coalesce () genannt wird, die das Vermeiden von Datenverschiebungen ermöglicht, jedoch nur, wenn Sie die Anzahl der RDD-Partitionen verringern.

Ein Unterschied, den ich erhalte, ist, dass man mit partition () die Anzahl der Partitionen erhöhen / verringern kann, aber mit coalesce () kann die Anzahl der Partitionen nur verringert werden.

Wenn die Partitionen auf mehrere Maschinen verteilt sind und coalesce () ausgeführt wird, wie kann sie Datenbewegungen vermeiden?

Solutions Collecting From Web of "Spark – repartition () vs coalesce ()"

Es vermeidet einen vollständigen Shuffle. Wenn bekannt ist, dass die Anzahl abnimmt, kann der Executor Daten auf der minimalen Anzahl von Partitionen speichern, wobei nur die Daten von den zusätzlichen Knoten auf die Knoten verschoben werden, die wir behalten haben.

Also würde es so etwas gehen:

Node 1 = 1,2,3 Node 2 = 4,5,6 Node 3 = 7,8,9 Node 4 = 10,11,12 

Dann coalesce Sie zu 2 Partitionen:

 Node 1 = 1,2,3 + (10,11,12) Node 3 = 7,8,9 + (4,5,6) 

Beachten Sie, dass für Knoten 1 und Knoten 3 keine Verschiebung der ursprünglichen Daten erforderlich war.

Justins Antwort ist großartig und diese Antwort geht tiefer.

Der repartition führt einen vollständigen Shuffle durch und erstellt neue Partitionen mit gleichmäßig verteilten Daten. Erstellen wir einen Datenrahmen mit den Zahlen von 1 bis 12.

 val x = (1 to 12).toList val numbersDf = x.toDF("number") 

numbersDf enthält 4 Partitionen auf meinem Computer.

 numbersDf.rdd.partitions.size // => 4 

Hier ist, wie die Daten auf den Partitionen aufgeteilt sind:

 Partition 00000: 1, 2, 3 Partition 00001: 4, 5, 6 Partition 00002: 7, 8, 9 Partition 00003: 10, 11, 12 

Lassen Sie uns mit der repartition ein Full-Shuffle machen und diese Daten auf zwei Knoten bekommen.

 val numbersDfR = numbersDf.repartition(2) 

Hier ist, wie die numbersDfR Daten auf meinem Rechner partitioniert sind:

 Partition A: 1, 3, 4, 6, 7, 9, 10, 12 Partition B: 2, 5, 8, 11 

Die repartition neue Partitionen und verteilt die Daten gleichmäßig in den neuen Partitionen (die Datenverteilung ist bei größeren Datensätzen gleichmäßiger).

Unterschied zwischen coalesce und repartition

coalesce verwendet vorhandene Partitionen, um die Menge der Daten zu minimieren, die gemischt werden. repartition erstellt neue Partitionen und führt einen vollständigen Shuffle durch. coalesce führt zu Partitionen mit unterschiedlichen Datenmengen (manchmal Partitionen mit sehr unterschiedlichen Größen), und die Partitionierung führt zu ungefähr gleich großen Partitionen.

Ist coalesce oder repartition schneller?

coalesce kann schneller als repartition , aber ungleiche Partitionen sind in der Regel langsamer als gleich große Partitionen. In der Regel müssen Sie die Datasets nach dem Filtern eines großen Datensatzes neu partitionieren. Ich habe herausgefunden, dass die Aufteilung insgesamt schneller ist, da Spark für die Arbeit mit gleich großen Partitionen entwickelt wurde.

Lesen Sie diesen Blogeintrag, wenn Sie noch mehr Details wünschen.

Ein zusätzlicher Punkt ist hier, dass das Grundprinzip von Spark RDD Unveränderlichkeit ist. Die Neupartitionierung oder Koaleszenz erstellt eine neue RDD. Die Basis-RDD wird weiterhin mit ihrer ursprünglichen Anzahl von Partitionen existieren. Für den Fall, dass der Anwendungsfall RDD im Cache beibehalten soll, muss das Gleiche für die neu erstellte RDD getan werden.

 scala> pairMrkt.repartition(10) res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at :26 scala> res16.partitions.length res17: Int = 10 scala> pairMrkt.partitions.length res20: Int = 2 

Alle Antworten fügen ein großes Wissen in diese sehr oft gestellte Frage hinzu.

Also, nach der Tradition der Zeitleiste dieser Frage, hier sind meine 2 Cent.

Ich fand die Aufteilung schneller als in einem sehr speziellen Fall.

In meiner Anwendung, wenn die Anzahl der Dateien, die wir schätzen, niedriger als der bestimmte Schwellenwert ist, funktioniert die Neupartitionierung schneller.

Hier ist was ich meine

 if(numFiles > 20) df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest) else df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest) 

Wenn meine Dateien im obigen Snippet kleiner als 20 waren, dauerte die Koaleszenz für immer, um fertig zu stellen, während die Verteilung viel schneller war und daher der obige Code.

Natürlich hängt diese Anzahl (20) von der Anzahl der Arbeiter und der Datenmenge ab.

Ich hoffe, das hilft.

Auf einfache Weise COALESCE: – ist nur für die Verringerung der Anzahl der Partitionen, Kein Mischen von Daten, es komprimiert nur die Partitionen

REPARTITION: – ist sowohl für die Erhöhung als auch für die Verringerung der Anzahl der Partitionen, aber das Mischen findet statt

Beispiel:-

 val rdd = sc.textFile("path",7) rdd.repartition(10) rdd.repartition(2) 

Beides funktioniert gut

Aber wir gehen generell für diese zwei Dinge, wenn wir die Ausgabe in einem Cluster sehen müssen, wir gehen damit.

Aber auch Sie sollten sicherstellen, dass die Daten, die Knoten zusammenführen, hoch konfiguriert sein sollten, wenn Sie mit riesigen Datenmengen arbeiten. Da alle Daten in diese Knoten geladen werden, kann eine Speicherausnahme führen. Obwohl Reparatur teuer ist, bevorzuge ich es. Da es die Daten gleichmäßig mischt und verteilt.

Wählen Sie zwischen Koaleszieren und Neuverteilung.

Neupartitionierung – Es wird empfohlen, die Neupartitionierung zu verwenden, während die Anzahl der Partitionen erhöht wird, da alle Daten gemischt werden müssen.

Koaleszieren – es wird empfohlen, Koaleszenz zu verwenden, während die Anzahl der Partitionen reduziert wird. Wenn Sie beispielsweise 3 Partitionen haben und diese auf 2 Partitionen reduzieren möchten, verschiebt Coalesce die Daten der 3. Partition auf Partition 1 und 2. Die Partitionen 1 und 2 verbleiben im selben Container. Bei der Neupartitionierung werden jedoch Daten in allen Partitionen gemischt zwischen Executor wird hoch sein und es beeinflusst die performance.

Performance-weise koaleszieren die Performance besser als die Neupartitionierung und reduziert gleichzeitig die Anzahl der Partitionen.