Wie speichern Sie benutzerdefinierte Objekte in Dataset?

Laut Introducing Spark Datasets :

Während wir uns auf Spark 2.0 freuen, planen wir einige aufregende Verbesserungen an Datasets, insbesondere: … Benutzerdefinierte Encoder – Während wir derzeit Encoder für eine Vielzahl von Typen automatisch generieren, möchten wir eine API für benutzerdefinierte Objekte öffnen.

und Versuche, benutzerdefinierten Typ in einem Dataset zu speichern, führen zu folgendem Fehler wie:

Der Encoder für den in einem Dataset gespeicherten Typ kann nicht gefunden werden. Primitive Typen (Int, String usw.) und Produkttypen (Fallklassen) werden durch den Import von sqlContext.implicits unterstützt. Unterstützung für die Serialisierung anderer Typen wird in zukünftigen Versionen hinzugefügt

oder:

Java.lang.UnsupportedOperationException: Kein Encoder gefunden für ….

Gibt es Workarounds?


Beachten Sie, dass diese Frage nur als Einstiegspunkt für eine Community-Wiki-Antwort existiert. Fühlen Sie sich frei, sowohl die Frage als auch die Antwort zu verbessern.

   

Aktualisieren

Diese Antwort ist immer noch gültig und informativ, obwohl die Dinge seit 2.2 / 2.3 jetzt besser sind, die integrierte Encoderunterstützung für Set , Seq , Map , Date , Timestamp und BigDecimal hinzufügt. Wenn Sie bei der Erstellung von Typen mit nur SQLImplicits und den üblichen Scala-Typen bleiben, sollten Sie nur mit den implicit in SQLImplicits .


Leider wurde so gut wie nichts hinzugefügt. Die Suche nach @since 2.0.0 in SQLImplicits.scala oder SQLImplicits.scala findet Dinge, die hauptsächlich mit primitiven Typen zu tun haben (und einige SQLImplicits.scala ). Also, zuerst einmal zu sagen: Es gibt derzeit keine wirklich gute Unterstützung für benutzerdefinierte classn-Encoder . Danach sind einige Tricks, die einen so guten Job machen, wie wir es uns je erhoffen können, angesichts dessen, was wir derzeit zur Verfügung haben. Als Vorab-Disclaimer: Das wird nicht perfekt funktionieren und ich werde mein Bestes tun, um alle Einschränkungen klar und im Voraus zu machen.

Was genau ist das Problem?

Wenn Sie ein Dataset erstellen möchten, benötigt Spark einen Encoder (um ein JVM-Objekt vom Typ T in die interne Spark SQL-Repräsentation zu konvertieren), das im Allgemeinen automatisch durch SparkSession von SparkSession wird oder explizit durch den Aufruf von static erstellt werden kann Methoden auf Encoders “(aus den Dokumenten von createDataset ). Ein Encoder hat die Form Encoder[T] wobei T der Typ ist, den Sie codieren. Der erste Vorschlag ist, import spark.implicits._ hinzuzufügen (was Ihnen diese impliziten Encoder gibt) und der zweite Vorschlag besteht darin, den impliziten Encoder explizit unter Verwendung dieser Menge von Encoder-bezogenen functionen zu übergeben.

Für reguläre classn ist kein Encoder verfügbar

 import spark.implicits._ class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) 

gibt Ihnen den folgenden impliziten Kompilierungszeiterrors:

Der Encoder für den in einem Dataset gespeicherten Typ kann nicht gefunden werden. Primitive Typen (Int, String usw.) und Produkttypen (Fallklassen) werden durch den Import von sqlContext.implicits unterstützt. Unterstützung für die Serialisierung anderer Typen wird in zukünftigen Versionen hinzugefügt

Wenn Sie jedoch den von Ihnen verwendeten Typ umbrechen, um den obigen Fehler in einer class zu erhalten, die Product , wird der Fehler verwirrenderweise zur Laufzeit verzögert

 import spark.implicits._ case class Wrap[T](unwrap: T) class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3)))) 

Kompiliert gut, scheitert aber zur Laufzeit mit

java.lang.UnsupportedOperationException: Kein Encoder für MyObj gefunden

Der Grund dafür ist, dass die Encoder, die Spark mit den Implicits erstellt, eigentlich nur zur Laufzeit gemacht werden (via Scala-Relfection). In diesem Fall sind alle Spark-Prüfungen zur Kompilierzeit, dass die äußerste class Product (was alle Case-classn tun) und erst zur Laufzeit realisiert, dass sie immer noch nicht weiß, was sie mit MyObj (das gleiche Problem tritt auf, wenn ich es versucht habe) um ein Dataset[(Int,MyObj)] – Spark wartet bis Laufzeit auf MyObj zu MyObj . Dies sind zentrale Probleme, die dringend behoben werden müssen:

  • Einige classn, die die Product , obwohl sie zur Laufzeit immer abstürzen
  • Es gibt keine Möglichkeit, benutzerdefinierte Encoder für verschachtelte Typen zu übergeben (ich habe keine Möglichkeit, Spark für nur MyObj einen Encoder zu MyObj so dass dieser dann Wrap[MyObj] oder (Int,MyObj) ).

kryo einfach kryo

Die Lösung, die jeder kryo , ist die Verwendung des kryo Encoders.

 import spark.implicits._ class MyObj(val i: Int) implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj] // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) 

Das wird aber ziemlich schnell langweilig. Vor allem, wenn Ihr Code alle Arten von Datensätzen manipuliert, verbindet, gruppiert usw. Sie enden damit, eine Menge zusätzlicher Implikate aufzubauen. Also, warum nicht einfach ein implizites machen, das alles automatisch macht?

 import scala.reflect.ClassTag implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct) 

Und jetzt scheint es, als ob ich fast alles machen kann, was ich will (das folgende Beispiel funktioniert nicht in der spark-shell wo spark.implicits._ automatisch importiert wird)

 class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and .. val d3 = d1.map(d => (di, d)).alias("d3") // .. deals with the new type val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom! 

Oder fast. Das Problem besteht darin, dass die Verwendung von kryo führt, dass Spark jede Zeile im Dataset als flaches binäres Objekt speichert. Für map , filter , foreach das ist genug, aber für Operationen wie join braucht Spark diese in Spalten zu trennen. Wenn Sie das Schema für d2 oder d3 , sehen Sie, dass es nur eine binäre Spalte gibt:

 d2.printSchema // root // |-- value: binary (nullable = true) 

Teillösung für Tupel

Also, mit der Magie der implicits in Scala (mehr in 6.26.3 Overloading Resolution ), kann ich eine Reihe von implicits machen, die zumindest für Tupel einen möglichst guten Job machen und gut mit existierenden implicits funktionieren:

 import org.apache.spark.sql.{Encoder,Encoders} import scala.reflect.ClassTag import spark.implicits._ // we can still take advantage of all the old implicits implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c) implicit def tuple2[A1, A2]( implicit e1: Encoder[A1], e2: Encoder[A2] ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2) implicit def tuple3[A1, A2, A3]( implicit e1: Encoder[A1], e2: Encoder[A2], e3: Encoder[A3] ): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3) // ... you can keep making these 

Dann, mit diesen Implikationen bewaffnet, kann ich mein Beispiel über die Arbeit bringen, wenn auch mit einer Spaltenumbenennung

 class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2") val d3 = d1.map(d => (di ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3") val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") 

Ich habe noch nicht herausgefunden, wie man die erwarteten Tupelnamen ( _1 , _2 , …) standardmäßig erhält, ohne sie umzubenennen – wenn jemand anderes damit spielen will, wird hier der Name "value" eingeführt und Dies ist, wo die Tupelnamen normalerweise hinzugefügt werden. Der entscheidende Punkt ist jedoch, dass ich jetzt ein schönes strukturiertes Schema habe:

 d4.printSchema // root // |-- _1: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true) // |-- _2: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true) 

Also, zusammenfassend, diese Problemumgehung:

  • erlaubt es uns, separate Spalten für Tupel zu erhalten (damit wir wieder Tupel beitreten können, yay!)
  • wir können uns wieder nur auf die implicits verlassen (also keine Notwendigkeit, überall im kryo vorbei zu kryo )
  • ist fast vollständig rückwärtskompatibel mit import spark.implicits._ (mit etwas Umbenennung)
  • Lassen Sie uns nicht zu den kyro serialisierten binären Spalten kommen, geschweige denn zu Feldern, die diese haben können
  • hat den unangenehmen Nebeneffekt, einige .toDF in “value” .toDF (wenn nötig, kann dies rückgängig gemacht werden, indem .toDF konvertiert .toDF , neue Spaltennamen angegeben und zurück in ein Dataset konvertiert werden) und die Schemanamen scheinen erhalten zu bleiben durch Joins, wo sie am meisten gebraucht werden).

Teillösung für classn im Allgemeinen

Dieser ist weniger angenehm und hat keine gute Lösung. Nun, da wir die Tupel-Lösung oben haben, habe ich eine Ahnung, dass die implizite Konvertierungslösung aus einer anderen Antwort ein wenig weniger schmerzhaft sein wird, da Sie Ihre komplexeren classn in Tupel konvertieren können. Nach dem Erstellen des Datasets würden Sie die Spalten wahrscheinlich mithilfe des Dataframe-Ansatzes umbenennen. Wenn alles gut geht, ist das wirklich eine Verbesserung, da ich jetzt Joins auf den Feldern meiner classn durchführen kann. Wenn ich gerade einen flachen binären kryo Serialisierer benutzt hätte, wäre das nicht möglich gewesen.

Hier ist ein Beispiel, das ein bisschen von allem macht: Ich habe eine class MyObj die Felder der Typen Int , java.util.UUID und Set[String] . Der erste kümmert sich um sich selbst. Die zweite, obwohl ich mit kryo serialisieren kryo wäre nützlicher, wenn sie als String gespeichert würde (da UUID normalerweise etwas sind, an dem ich mich beteiligen möchte). Die dritte gehört wirklich nur in eine binäre Spalte.

 class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String]) // alias for the type to convert to and from type MyObjEncoded = (Int, String, Set[String]) // implicit conversions implicit def toEncoded(o: MyObj): MyObjEncoded = (oi, outoString, os) implicit def fromEncoded(e: MyObjEncoded): MyObj = new MyObj(e._1, java.util.UUID.fromString(e._2), e._3) 

Jetzt kann ich mit dieser Maschine ein Dataset mit einem schönen Schema erstellen:

 val d = spark.createDataset(Seq[MyObjEncoded]( new MyObj(1, java.util.UUID.randomUUID, Set("foo")), new MyObj(2, java.util.UUID.randomUUID, Set("bar")) )).toDF("i","u","s").as[MyObjEncoded] 

Und das Schema zeigt mir Spalten mit den richtigen Namen und mit den beiden ersten beiden Dingen, denen ich mich anschließen kann.

 d.printSchema // root // |-- i: integer (nullable = false) // |-- u: string (nullable = true) // |-- s: binary (nullable = true) 
  1. Verwenden von generischen Encodern.

    Es gibt zwei generische Encoder, die jetzt für kryo und javaSerialization wobei letzterer explizit wie folgt beschrieben wird:

    extrem ineffizient und sollte nur als letztes Mittel verwendet werden.

    Angenommen, man folgt der class

     class Bar(i: Int) { override def toString = s"bar $i" def bar = i } 

    Sie können diese Encoder verwenden, indem Sie einen impliziten Encoder hinzufügen:

     object BarEncoders { implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = org.apache.spark.sql.Encoders.kryo[Bar] } 

    die zusammen wie folgt verwendet werden können:

     object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarEncoders._ val ds = Seq(new Bar(1)).toDS ds.show sc.stop() } } 

    Es speichert Objekte als binary Spalte. DataFrame Sie also nach DataFrame konvertiert werden, DataFrame Sie folgendes Schema:

     root |-- value: binary (nullable = true) 

    Es ist auch möglich, Tupel mit einem kryo Encoder für ein bestimmtes Feld zu codieren:

     val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar]) spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder) // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary] 

    Bitte beachten Sie, dass wir hier nicht auf implizite Encoder angewiesen sind, sondern den Encoder explizit übergeben, so dass dies höchstwahrscheinlich nicht mit der toDS Methode toDS .

  2. Implizite Konvertierungen verwenden:

    Stellen Sie implizite Konvertierungen zwischen der darstellbaren Repräsentation und der benutzerdefinierten class bereit, zum Beispiel:

     object BarConversions { implicit def toInt(bar: Bar): Int = bar.bar implicit def toBar(i: Int): Bar = new Bar(i) } object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarConversions._ type EncodedBar = Int val bars: RDD[EncodedBar] = sc.parallelize(Seq(new Bar(1))) val barsDS = bars.toDS barsDS.show barsDS.map(_.bar).show sc.stop() } } 

Verwandte Fragen:

  • Wie erstelle ich einen Encoder für den Optionstyp Konstruktor, zB Option [Int]?

Encoder funktionieren in Spark2.0 mehr oder weniger gleich. Und Kryo ist immer noch die empfohlene serialization .

Sie können folgendes Beispiel mit Spark-Shell betrachten

 scala> import spark.implicits._ import spark.implicits._ scala> import org.apache.spark.sql.Encoders import org.apache.spark.sql.Encoders scala> case class NormalPerson(name: String, age: Int) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class NormalPerson scala> case class ReversePerson(name: Int, age: String) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class ReversePerson scala> val normalPersons = Seq( | NormalPerson("Superman", 25), | NormalPerson("Spiderman", 17), | NormalPerson("Ironman", 29) | ) normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29)) scala> val ds1 = sc.parallelize(normalPersons).toDS ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int] scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds1.show() +---------+---+ | name|age| +---------+---+ | Superman| 25| |Spiderman| 17| | Ironman| 29| +---------+---+ scala> ds2.show() +----+---------+ |name| age| +----+---------+ | 25| Superman| | 17|Spiderman| | 29| Ironman| +----+---------+ scala> ds1.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Superman. I am 25 years old. I am Spiderman. I am 17 years old. scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds2.foreach(p => println(p.aboutMe)) I am 17. I am Spiderman years old. I am 25. I am Superman years old. I am 29. I am Ironman years old. 

Bis jetzt gab es keine appropriate encoders im gegenwärtigen scope, so dass unsere Personen nicht als binary Werte codiert waren. Aber das wird sich ändern, wenn wir einige implicit Encoder mit Kryo Serialisierung bereitstellen.

 // Provide Encoders scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson] normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary] scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson] reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary] // Ecoders will be used since they are now present in Scope scala> val ds3 = sc.parallelize(normalPersons).toDS ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary] scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name)) ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary] // now all our persons show up as binary values scala> ds3.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ scala> ds4.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ // Our instances still work as expected scala> ds3.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Spiderman. I am 17 years old. I am Superman. I am 25 years old. scala> ds4.foreach(p => println(p.aboutMe)) I am 25. I am Superman years old. I am 29. I am Ironman years old. I am 17. I am Spiderman years old. 

Im Falle einer Java Bean-class kann dies nützlich sein

 import spark.sqlContext.implicits._ import org.apache.spark.sql.Encoders implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass]) 

Jetzt können Sie den Datenrahmen einfach als benutzerdefinierten Datenrahmen lesen

 dataFrame.as[MyClass] 

Dadurch wird ein benutzerdefinierter classncodierer und kein Binärcode erstellt.

Sie können UDTRegistration verwenden und dann Case Classes, Tuples, etc … funktionieren alle korrekt mit Ihrem benutzerdefinierten Typ!

Angenommen, Sie möchten eine benutzerdefinierte Enumeration verwenden:

 trait CustomEnum { def value:String } case object Foo extends CustomEnum { val value = "F" } case object Bar extends CustomEnum { val value = "B" } object CustomEnum { def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get } 

Registrieren Sie es so:

 // First define a UDT class for it: class CustomEnumUDT extends UserDefinedType[CustomEnum] { override def sqlType: DataType = org.apache.spark.sql.types.StringType override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value) // Note that this will be a UTF8String type override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString) override def userClass: Class[CustomEnum] = classOf[CustomEnum] } // Then Register the UDT Class! // NOTE: you have to put this file into the org.apache.spark package! UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName) 

Dann benutze es!

 case class UsingCustomEnum(id:Int, en:CustomEnum) val seq = Seq( UsingCustomEnum(1, Foo), UsingCustomEnum(2, Bar), UsingCustomEnum(3, Foo) ).toDS() seq.filter(_.en == Foo).show() println(seq.collect()) 

Angenommen, Sie möchten einen polymorphen Datensatz verwenden:

 trait CustomPoly case class FooPoly(id:Int) extends CustomPoly case class BarPoly(value:String, secondValue:Long) extends CustomPoly 

… und das nutzt es so:

 case class UsingPoly(id:Int, poly:CustomPoly) Seq( UsingPoly(1, new FooPoly(1)), UsingPoly(2, new BarPoly("Blah", 123)), UsingPoly(3, new FooPoly(1)) ).toDS polySeq.filter(_.poly match { case FooPoly(value) => value == 1 case _ => false }).show() 

Sie können einen benutzerdefinierten UDT schreiben, der alles in Bytes codiert (ich benutze hier die Java-Serialisierung, aber es ist wahrscheinlich besser, Sparks Kryo-Kontext zu instrumentieren).

Definieren Sie zuerst die UDT-class:

 class CustomPolyUDT extends UserDefinedType[CustomPoly] { val kryo = new Kryo() override def sqlType: DataType = org.apache.spark.sql.types.BinaryType override def serialize(obj: CustomPoly): Any = { val bos = new ByteArrayOutputStream() val oos = new ObjectOutputStream(bos) oos.writeObject(obj) bos.toByteArray } override def deserialize(datum: Any): CustomPoly = { val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]]) val ois = new ObjectInputStream(bis) val obj = ois.readObject() obj.asInstanceOf[CustomPoly] } override def userClass: Class[CustomPoly] = classOf[CustomPoly] } 

Dann registriere es:

 // NOTE: The file you do this in has to be inside of the org.apache.spark package! UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName) 

Dann kannst du es benutzen!

 // As shown above: case class UsingPoly(id:Int, poly:CustomPoly) Seq( UsingPoly(1, new FooPoly(1)), UsingPoly(2, new BarPoly("Blah", 123)), UsingPoly(3, new FooPoly(1)) ).toDS polySeq.filter(_.poly match { case FooPoly(value) => value == 1 case _ => false }).show() 

Meine Beispiele werden in Java sein, aber ich kann mir nicht vorstellen, dass es schwierig ist, sich an Scala anzupassen.

Ich war recht erfolgreich, indem ich RDD Verwendung von spark.createDataset und Encoders.bean in Dataset umwandelte, solange Fruit eine einfache Java-Bean ist .

Schritt 1: Erstellen Sie die einfache Java Bean.

 public class Fruit implements Serializable { private String name = "default-fruit"; private String color = "default-color"; // AllArgsConstructor public Fruit(String name, String color) { this.name = name; this.color = color; } // NoArgsConstructor public Fruit() { this("default-fruit", "default-color"); } // ...create getters and setters for above fields // you figure it out } 

Ich würde mich an classn mit primitiven Typen und String als Felder halten, bevor die DataBricks-Leute ihre Encoder verstärken. Wenn Sie eine class mit verschachteltem Objekt haben, erstellen Sie ein weiteres einfaches Java-Bean mit allen abgeflachten Feldern, so dass Sie RDD-Transformationen verwenden können, um den komplexen Typ dem einfacheren zuzuordnen. Sicher, es ist ein bisschen mehr Arbeit, aber ich denke, es wird sehr hilfreich sein, wenn man mit einem flachen Schema arbeitet.

Schritt 2: Holen Sie sich Ihr Dataset von der RDD

 SparkSession spark = SparkSession.builder().getOrCreate(); JavaSparkContext jsc = new JavaSparkContext(); List fruitList = ImmutableList.of( new Fruit("apple", "red"), new Fruit("orange", "orange"), new Fruit("grape", "purple")); JavaRDD fruitJavaRDD = jsc.parallelize(fruitList); RDD fruitRDD = fruitJavaRDD.rdd(); Encoder fruitBean = Encoders.bean(Fruit.class); Dataset fruitDataset = spark.createDataset(rdd, bean); 

Und voila! Aufschäumen, abspülen, wiederholen.

Für diejenigen, die in meiner Situation vielleicht auch meine Antwort geben.

Um genau zu sein,

  1. Ich habe ‘Set typed data’ von SQLContext gelesen. Das ursprüngliche Datenformat ist DataFrame.

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

    +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

  2. Dann konvertieren Sie es in RDD mit rdd.map () mit mutable.WrappedArray Typ.

    sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

    Ergebnis:

    (1,Set(1))