How to Change Schema of a Spark SQL DataFrame?
Because I want to insert rows selected from a table (df_rows) into another table, I need to make sure that the schema of the selected rows is the same as the schema of the target table. This is required by the function pyspark.sql.DataFrameWriter.insertInto, which inserts the content of a DataFrame into the specified table and expects the schema of the DataFrame to match the schema of the table.
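For reference, this is roughly how insertInto is used once the schemas match (a minimal sketch; "target_table" is a placeholder name, not a table from this post):
>>> # Sketch only: append the rows of df_rows to an existing table.
>>> # "target_table" is a placeholder; per the requirement above, the
>>> # schema of df_rows must match the table's schema.
>>> df_rows.write.insertInto("target_table")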
Simple check
>>> df_table = sqlContext.sql("SELECT * FROM qacctdate")
>>> df_rows.schema == df_table.schema
If False is shown, we need to modify the schema of the selected rows so that it matches the schema of the table.
- Schema of selected rows
>>> df_rows.printSchema()
root
|-- subT_DATE: string (nullable = true)
|-- COUNT_USER: long (nullable = false)
|-- COUNT_NUM: long (nullable = false)
|-- SUMQ_TIME: double (nullable = true)
|-- SUMELP_TIME: double (nullable = true)
|-- SUMCPU_TIME: double (nullable = true)
- Schema of the table to be inserted into
>>> df_table.printSchema()
root
|-- subT_DATE: string (nullable = true)
|-- COUNT_USER: float (nullable = true)
|-- COUNT_NUM: float (nullable = true)
|-- SUMQ_TIME: float (nullable = true)
|-- SUMELP_TIME: float (nullable = true)
|-- SUMCPU_TIME: float (nullable = true)
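The mismatch is easy to spot here, but with wider tables it can help to list the differing fields programmatically. A minimal sketch (my own addition, assuming both schemas have the same fields in the same order):
>>> # Sketch: print each pair of fields whose name, type, or nullability differs.
>>> for actual, expected in zip(df_rows.schema.fields, df_table.schema.fields):
...     if actual != expected:
...         print("%s != %s" % (actual, expected))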
Cast Type of Values If Needed
To modify the schema of the selected rows, the following command is used:
>>> df_rows = sqlContext.createDataFrame(df_rows.collect(), df_table.schema)
However, a TypeError occurs:
TypeError: FloatType can not accept object in type <type 'int'>
createDataFrame verifies each value against the provided schema, and FloatType does not accept Python int objects, so I need to manually cast the type of the values.
For example, the following command raises an error:
>>> df_test = sqlContext.createDataFrame([('2011-11-30', 58, 7682, 1793154.0, 58867728, 42486286500)], df_table.schema)
But this would pass:
>>> df_test = sqlContext.createDataFrame([('2011-11-30', float(58), float(7682), 1793154.0, float(58867728), float(42486286500))], df_table.schema)
In conclusion, I need to cast the types of multiple columns of df_rows manually:
>>> df_rows = df_rows.select(
...     df_rows.subT_DATE,
...     df_rows.COUNT_USER.cast("float"),
...     df_rows.COUNT_NUM.cast("float"),
...     df_rows.SUMQ_TIME.cast("float"),
...     df_rows.SUMELP_TIME.cast("float"),
...     df_rows.SUMCPU_TIME.cast("float")
... )
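Listing every column by hand becomes tedious with wider tables. As an alternative sketch (my own addition, not from the original post), the casts can be derived directly from the target table's schema, assuming both DataFrames share the same column names and order:
>>> from pyspark.sql.functions import col
>>> # Sketch: cast each column of df_rows to the type of the corresponding
>>> # field in df_table's schema.
>>> df_rows = df_rows.select(
...     [col(f.name).cast(f.dataType) for f in df_table.schema.fields]
... )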
Change The Schema
In order to change the schema, I try to create a new DataFrame based on the content of the original DataFrame using the following script.
>>> # This is not an efficient way to change the schema.
>>> df_rows = sqlContext.createDataFrame(df_rows.collect(), df_table.schema)
However, thanks to a comment from Anthony Hsu, this script turns out to be catastrophic: collect() pulls all rows to the driver, so it may crash the driver program when the data is large.
Although DataFrames no longer inherit from RDD directly since Spark SQL 1.3, they can still be converted to RDDs by calling the .rdd method, which is why we can use .rdd instead of collect():
>>> # This is a better way to change the schema
>>> df_rows = sqlContext.createDataFrame(df_rows.rdd, df_table.schema)
Check Result
>>> df_rows.schema == df_table.schema
True