Wie ändert man die Spaltentypen in DatFrame von Spark SQL?

Angenommen, ich mache etwas wie:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true")) df.printSchema() root |-- year: string (nullable = true) |-- make: string (nullable = true) |-- model: string (nullable = true) |-- comment: string (nullable = true) |-- blank: string (nullable = true) df.show() year make model comment blank 2012 Tesla S No comment 1997 Ford E350 Go get one now th... 

aber ich wollte das year wirklich als Int (und vielleicht einige andere Spalten umwandeln).

Das Beste, was ich mir vorstellen kann, ist

 df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank) org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string] 

Das ist ein bisschen verschlungen.

Ich komme aus R, und ich bin es gewohnt zu schreiben, zB

 df2 % mutate(year = year %>% as.integer, make = make %>% toupper) 

Ich vermisse wahrscheinlich etwas, da es einen besseren Weg geben sollte, dies in Spark / Scala zu tun …

Solutions Collecting From Web of "Wie ändert man die Spaltentypen in DatFrame von Spark SQL?"

Seit Spark Version 1.4 können Sie die Cast-Methode mit DataType auf die Spalte anwenden:

 import org.apache.spark.sql.types.IntegerType val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType)) .drop("year") .withColumnRenamed("yearTmp", "year") 

Wenn Sie SQL-Ausdrücke verwenden, können Sie auch Folgendes tun:

 val df2 = df.selectExpr("cast(year as int) year", "make", "model", "comment", "blank") 

Weitere Informationen finden Sie in der Dokumentation: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame

[EDIT: März 2016: Danke für die Stimmen! Obwohl das wirklich nicht die beste Antwort ist, denke ich, dass die Lösungen mit withColumn , withColumnRenamed und cast von msemelman, Martin Senne und anderen einfacher und sauberer sind.

Ich denke, Ihr Ansatz ist in DataFrame , erinnern Sie sich, dass ein Spark DataFrame eine (unveränderliche) RDD von Zeilen ist, also ersetzen wir niemals wirklich eine Spalte, DataFrame jedes Mal einen neuen DataFrame mit einem neuen Schema.

Angenommen, Sie haben ein Original-DF mit dem folgenden Schema:

 scala> df.printSchema root |-- Year: string (nullable = true) |-- Month: string (nullable = true) |-- DayofMonth: string (nullable = true) |-- DayOfWeek: string (nullable = true) |-- DepDelay: string (nullable = true) |-- Distance: string (nullable = true) |-- CRSDepTime: string (nullable = true) 

Und einige UDFs sind in einer oder mehreren Spalten definiert:

 import org.apache.spark.sql.functions._ val toInt = udf[Int, String]( _.toInt) val toDouble = udf[Double, String]( _.toDouble) val toHour = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) val days_since_nearest_holidays = udf( (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12 ) 

Das Ändern von Spaltentypen oder sogar das Erstellen eines neuen Datenrahmens aus einem anderen kann folgendermaßen aussehen:

 val featureDf = df .withColumn("departureDelay", toDouble(df("DepDelay"))) .withColumn("departureHour", toHour(df("CRSDepTime"))) .withColumn("dayOfWeek", toInt(df("DayOfWeek"))) .withColumn("dayOfMonth", toInt(df("DayofMonth"))) .withColumn("month", toInt(df("Month"))) .withColumn("distance", toDouble(df("Distance"))) .withColumn("nearestHoliday", days_since_nearest_holidays( df("Year"), df("Month"), df("DayofMonth")) ) .select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", "month", "distance", "nearestHoliday") 

was ergibt:

 scala> df.printSchema root |-- departureDelay: double (nullable = true) |-- departureHour: integer (nullable = true) |-- dayOfWeek: integer (nullable = true) |-- dayOfMonth: integer (nullable = true) |-- month: integer (nullable = true) |-- distance: double (nullable = true) |-- nearestHoliday: integer (nullable = true) 

Das ist ziemlich nah an Ihrer eigenen Lösung. udf val die udf val und andere Transformationen als separate udf val , wird der Code besser lesbar und wiederverwendbar.

Da der cast Vorgang für die Spark- Column verfügbar ist (und da ich persönlich die von @ Svend vorgeschlagenen udf nicht favorisiere), wie wäre es mit:

 df.select( df("year").cast(IntegerType).as("year"), ... ) 

in den gewünschten Typ umwandeln? Als einen netten Nebeneffekt werden Werte, die in diesem Sinne nicht gießbar / “konvertierbar” sind, null .

Falls Sie dies als Hilfsmethode benötigen, verwenden Sie:

 object DFHelper{ def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = { df.withColumn( cn, df(cn).cast(tpe) ) } } 

welches benutzt wird wie:

 import DFHelper._ val df2 = castColumnTo( df, "year", IntegerType ) 

Zuerst, wenn du Typ gießen willst

 import org.apache.spark.sql df.withColumn("year", $"year".cast(sql.types.IntegerType)) 

Bei gleichem Spaltennamen wird die Spalte durch eine neue ersetzt, Sie müssen sie nicht hinzufügen und löschen.

Zweitens, über Scala gegen R. der Scala-Code, der R am ähnlichsten ist, den ich erreichen kann:

 val df2 = df.select( df.columns.map { case year @ "year" => df(year).cast(IntegerType).as(year) case make @ "make" => functions.upper(df(make)).as(make) case other => df(other) }: _* ) 

Obwohl die Länge ein wenig länger als R ist. Beachten Sie, dass die mutate eine function für den R- mutate ist, daher ist Scala sehr gut in der Ausdruckskraft gegeben, ohne eine spezielle function zu verwenden.

( df.columns ist überraschenderweise ein Array [String] anstelle von Array [Column], vielleicht möchten sie, dass es wie der Datenrahmen von Python Pandas aussieht.)

Sie können selectExpr , um es ein wenig sauberer zu machen:

 df.selectExpr("cast(year as int) as year", "upper(make) as make", "model", "comment", "blank") 

Um das Jahr von string nach int zu konvertieren, können Sie dem csv reader die folgende Option hinzufügen: “inferSchema” -> “true”, siehe DataBricks-Dokumentation

Das funktioniert also nur, wenn Sie Probleme beim Speichern in einem jdbc-Treiber wie sqlserver haben, aber es ist sehr hilfreich für Fehler, die Sie mit Syntax und Typen bekommen.

 import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect} import org.apache.spark.sql.jdbc.JdbcType val SQLServerDialect = new JdbcDialect { override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver") override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR)) case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT)) case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT)) case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE)) case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL)) case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY)) case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE)) case DateType => Some(JdbcType("DATE", java.sql.Types.DATE)) // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC)) case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL)) case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC") } } JdbcDialects.registerDialect(SQLServerDialect) 

Java-Code zum Ändern des Datentyps des Datenrahmens von String zu Integer

 df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType)) 

Es wird einfach den vorhandenen (String-Datentyp) in Integer umwandeln.

die Antworten, die vorschlagen, Cast zu verwenden, FYI, die Cast-Methode in Spark 1.4.1 ist gebrochen.

zum Beispiel hat ein Datenrahmen mit einer Zeichenkettenspalte, die den Wert “8182175552014127960” hat, wenn er in bigint geworfen wird, den Wert “8182175552014128100”.

  df.show +-------------------+ | a| +-------------------+ |8182175552014127960| +-------------------+ df.selectExpr("cast(a as bigint) a").show +-------------------+ | a| +-------------------+ |8182175552014128100| +-------------------+ 

Bevor wir diesen Fehler gefunden haben, mussten wir uns mit vielen Problemen herumschlagen, da wir Bigint-Spalten in der Produktion hatten.

 df.select($"long_col".cast(IntegerType).as("int_col")) 

Sie können den folgenden Code verwenden.

 df.withColumn("year", df("year").cast(IntegerType)) 

IntegerType konvertiert die IntegerType Spalte IntegerType .

Diese Methode löscht die alte Spalte und erstellt neue Spalten mit denselben Werten und einem neuen Datentyp. Meine ursprünglichen Datentypen beim Erstellen des DataFrames waren:

 root |-- id: integer (nullable = true) |-- flag1: string (nullable = true) |-- flag2: string (nullable = true) |-- name: string (nullable = true) |-- flag3: string (nullable = true) 

Danach habe ich folgenden Code ausgeführt, um den Datentyp zu ändern:

 df=df.withColumnRenamed(,) // This was done for both flag1 and flag3 df=df.withColumn(,df.col().cast()).drop() 

Danach kam mein Ergebnis heraus:

 root |-- id: integer (nullable = true) |-- flag2: string (nullable = true) |-- name: string (nullable = true) |-- flag1: boolean (nullable = true) |-- flag3: boolean (nullable = true) 

Generieren Sie ein einfaches Dataset mit fünf Werten und konvertieren Sie int in einen string Typ:

 val df = spark.range(5).select( col("id").cast("string") ) 
  val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd //Schema to be applied to the table val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType) val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates() 

Mit cast in spark sql kann der Datentyp einer Spalte geändert werden. Tabellenname ist Tabelle und es hat nur zwei Spalten Spalte1 und Spalte2 und Spalte1 Datentyp geändert werden soll. ex-spark.sql (“select cast (column1 als Double) column1NewName, column2 from table”) An die Stelle des double schreiben Sie Ihren Datentyp.

Ein anderer Weg:

 // Generate a simple dataset containing five values and convert int to string type val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")