Spark SQL Using Python
For SQL users, Spark SQL provides state-of-the-art SQL performance and maintains compatibility with Shark/Hive. In particular, like Shark, Spark SQL supports all existing Hive data formats, user-defined functions (UDFs), and the Hive metastore. With features introduced in Apache Spark 1.1.0, Spark SQL beat Shark in TPC-DS performance by almost an order of magnitude.
Used Versions
- Spark version: 1.5.0
- Python version: 2.6.6
Load Data
$ pyspark --packages com.databricks:spark-csv_2.10:1.5.0
pyspark then starts and prepares your Spark environment. You can import the libraries you need and load your data as follows.
>>> from pyspark.sql import SQLContext, DataFrameWriter
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)
>>> df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('remoteFolder/input/qacct_2011.csv')
If you want to check the data you have loaded, you can use the following commands.
Inferred Schema
>>> df.printSchema()
root
|-- Q_SVR: string (nullable = true)
|-- T_DATE: string (nullable = true)
|-- LOGIN_NAME: string (nullable = true)
|-- Q_NAME: string (nullable = true)
|-- JOB_NAME: string (nullable = true)
|-- JOB_ID: string (nullable = true)
|-- JOB_TYPE: string (nullable = true)
|-- MIN_CPU: string (nullable = true)
|-- MAX_CPU: string (nullable = true)
|-- REAL_CPU: string (nullable = true)
|-- Q_TIME: string (nullable = true)
|-- ELP_TIME: string (nullable = true)
|-- CPU_TIME: string (nullable = true)
|-- SU: string (nullable = true)
|-- YM: string (nullable = true)
|-- SUBMIT_DATE: string (nullable = true)
|-- SAT_NO: string (nullable = true)
|-- HOG_NO: string (nullable = true)
|-- CPU_NO: string (nullable = true)
|-- WALL_NO: string (nullable = true)
|-- AFT_SU: string (nullable = true)
|-- AFT_CPU: string (nullable = true)
|-- AFT_TIME: string (nullable = true)
|-- ELP_TIME_R: string (nullable = true)
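Note that every column is loaded as a string by default. If you would rather have spark-csv guess the column types for you, it also accepts an inferSchema option. The following is a minimal sketch reusing the path from above; type inference requires one extra pass over the data.
>>> df_typed = sqlContext.read.format('com.databricks.spark.csv') \
...     .options(header='true', inferSchema='true') \
...     .load('remoteFolder/input/qacct_2011.csv')
>>> df_typed.printSchema()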
View Rows
>>> df.show(10)
+-----+-------------------+----------+-----------+--------+------+--------+-------+-------+--------+------+--------+--------+-----+-------+-------------------+------+------+------+-------+------+-------+--------+----------+
|Q_SVR| T_DATE|LOGIN_NAME| Q_NAME|JOB_NAME|JOB_ID|JOB_TYPE|MIN_CPU|MAX_CPU|REAL_CPU|Q_TIME|ELP_TIME|CPU_TIME| SU| YM| SUBMIT_DATE|SAT_NO|HOG_NO|CPU_NO|WALL_NO|AFT_SU|AFT_CPU|AFT_TIME|ELP_TIME_R|
+-----+-------------------+----------+-----------+--------+------+--------+-------+-------+--------+------+--------+--------+-----+-------+-------------------+------+------+------+-------+------+-------+--------+----------+
|axxx5|2011-08-01_00:00:00| cxd|interactive|ls60_cmd| axxi3| S| 1| 1| 1| 0| 19| 19.45| .004|2011/08|2011-07-31_23:59:41| 04| 04| 00| 00| 0| 0| 0| 19|
|axxx5|2011-08-01_00:00:00| sxxxxxx4|interactive|ls60_cmd| axxi1| S| 1| 1| 1| 0| 82| 82.69|.0171|2011/08|2011-07-31_23:58:38| 04| 04| 00| 00| 0| 0| 0| 82|
|axxx5|2011-08-01_00:00:00| man|interactive|ls60_cmd| axxi1| S| 1| 1| 1| 0| 36| 36.13|.0075|2011/08|2011-07-31_23:59:24| 04| 04| 00| 00| 0| 0| 0| 36|
|axxx5|2011-08-01_00:00:00| root|interactive|ls60_cmd| axxi5| S| 1| 1| 1| 0| 86| 86.51|.0179|2011/08|2011-07-31_23:58:34| 04| 04| 00| 00| 0| 0| 0| 86|
|axxx5|2011-08-01_00:00:00| chem|interactive|ls60_cmd| axxi5| S| 1| 1| 1| 0| 10| 10.39|.0021|2011/08|2011-07-31_23:59:50| 04| 04| 00| 00| 0| 0| 0| 10|
|axxx5|2011-08-01_00:00:00| gxm|interactive|ls60_cmd| axxi2| S| 1| 1| 1| 0| 40| 40.85|.0083|2011/08|2011-07-31_23:59:20| 04| 04| 00| 00| 0| 0| 0| 40|
|axxx5|2011-08-01_00:00:00| man|interactive|ls60_cmd| axxi2| S| 1| 1| 1| 0| 25| 25.75|.0052|2011/08|2011-07-31_23:59:35| 04| 04| 00| 00| 0| 0| 0| 25|
|axxx5|2011-08-01_00:00:00| sxxxxxx4|interactive|ls60_cmd| axxi2| S| 1| 1| 1| 0| 87| 87.56|.0181|2011/08|2011-07-31_23:58:33| 04| 04| 00| 00| 0| 0| 0| 87|
|axxx5|2011-08-01_00:00:00| root|interactive|ls60_cmd| axxi2| S| 1| 1| 1| 0| 68| 68.72|.0142|2011/08|2011-07-31_23:58:52| 04| 04| 00| 00| 0| 0| 0| 68|
|axxx5|2011-08-01_00:00:00| man|interactive|ls60_cmd| axxi5| S| 1| 1| 1| 0| 29| 29.83| .006|2011/08|2011-07-31_23:59:31| 04| 04| 00| 00| 0| 0| 0| 29|
+-----+-------------------+----------+-----------+--------+------+--------+-------+-------+--------+------+--------+--------+-----+-------+-------------------+------+------+------+-------+------+-------+--------+----------+
only showing top 10 rows
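Besides show(), a couple of other quick checks are available on the DataFrame (optional, not part of the original walkthrough):
>>> df.count()                                  # number of rows loaded
>>> df.select('LOGIN_NAME', 'Q_NAME').show(5)   # inspect a subset of columns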
Create Table
Create Table Using DataFrame
This command registers the DataFrame as a temporary table, using the inferred schema.
>>> df.registerTempTable("qacctall")
Create Empty Table Using Specified Schema
Given a SQL statement such as
CREATE TABLE qacctdate (subT_DATE STRING, COUNT_USER FLOAT, COUNT_NUM FLOAT, SUMQ_TIME FLOAT, SUMELP_TIME FLOAT, SUMCPU_TIME FLOAT)
We must decompose the whole statement into several steps:
1. Specify Your Schema
>>> schema = StructType([
StructField("subT_DATE", StringType(), True),
StructField("COUNT_USER", FloatType(), True),
StructField("COUNT_NUM", FloatType(), True),
StructField("SUMQ_TIME", FloatType(), True),
StructField("SUMELP_TIME", FloatType(), True),
StructField("SUMCPU_TIME", FloatType(), True)
])
2. Create Empty Table
>>> df2 = sqlContext.createDataFrame([], schema)
>>> df2.registerTempTable("qacctdate")
- pyspark.sql.types.StructType(fields=None)
- Struct type, consisting of a list of StructField.
- This is the data type representing a Row.
- pyspark.sql.types.StructField(name, dataType, nullable=True, metadata=None)
- A field in StructType.
- Parameters:
- name – string, name of the field.
- dataType – DataType of the field.
- nullable – boolean, whether the field can be null (None) or not.
- metadata – a dict from string to simple type that can be serialized to JSON automatically
If you want to use any other data types in Spark, please refer to the Spark SQL and DataFrame Guide.
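As an illustrative sketch, a schema mixing a few of those other types could look like this; the field names here are hypothetical and unrelated to the qacct data. IntegerType and TimestampType are already available through the earlier "from pyspark.sql.types import *".
>>> other_schema = StructType([
...     StructField("NAME", StringType(), False),        # not nullable
...     StructField("CORE_COUNT", IntegerType(), True),
...     StructField("SUBMIT_TS", TimestampType(), True)
... ])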
Create Table Using Another Table
Given a SQL statement such as
CREATE TABLE new_table_name AS
SELECT column1, column2,...
FROM existing_table_name
WHERE ....;
For example,
CREATE TABLE qacctdateorder AS
SELECT *
FROM qacctdate
ORDER BY subT_DATE;
We can execute it on Spark with the following steps.
Step 1. Select rows
>>> df_ordered = sqlContext.sql("SELECT * FROM qacctdate ORDER BY subT_DATE")
>>> df_ordered.show(5)
+----------+----------+---------+---------+-----------+--------------------+
| subT_DATE|COUNT_USER|COUNT_NUM|SUMQ_TIME|SUMELP_TIME| SUMCPU_TIME|
+----------+----------+---------+---------+-----------+--------------------+
|2011-08-01| 33.0| 1996.0|5296074.0| 4873626.0|1.0732683480000006E7|
|2011-08-01| 33.0| 1996.0|5296074.0| 4873626.0|1.0732683480000006E7|
|2011-08-02| 32.0| 4928.0|3163158.0|1.3153414E7|2.6667715799999997E7|
|2011-08-02| 32.0| 4928.0|3163158.0|1.3153414E7|2.6667715799999997E7|
|2011-08-03| 24.0| 5014.0|9473140.0|1.8178348E7| 7.8896286476E8|
+----------+----------+---------+---------+-----------+--------------------+
only showing top 5 rows
Step 2. Create Another Table
>>> df_ordered.registerTempTable("qacctdateorder")
Execute SQL Statements
For the full list of supported SQL statements, please refer to Supported syntax of Spark SQL.
Select
Given a SQL statement such as
FROM qacctall
SELECT substr(T_DATE, 1, 10), count(Distinct LOGIN_NAME), count(*), sum(Q_TIME), sum(ELP_TIME), sum(CPU_TIME)
GROUP BY substr(T_DATE, 1, 10);
We can simply use the following command to execute it on Spark.
>>> df_select = sqlContext.sql("SELECT substr(T_DATE, 1, 10), count(Distinct LOGIN_NAME), count(*), sum(Q_TIME), sum(ELP_TIME), sum(CPU_TIME) FROM qacctall GROUP BY substr(T_DATE, 1, 10)")
>>> df_select.show(10)
+----------+---+----+---------+-----------+--------------------+
| _c0|_c1| _c2| _c3| _c4| _c5|
+----------+---+----+---------+-----------+--------------------+
|2011-11-30| 58|7682|1793154.0|5.8867728E7| 4.248628650000001E7|
|2011-09-30| 59| 636| 2376.0|2.5517608E7|3.1475379480000004E7|
|2011-10-30| 44| 432| 94204.0|6.4102362E7| 4.377035656000001E7|
|2011-10-31| 63|1300| 57584.0|4.7405606E7|1.5582149653999996E9|
|2011-08-30| 38| 454|3106736.0|1.1878624E7| 1.06372894726E9|
|2011-08-31| 40|1860|1533592.0|1.5382988E7|3.1321356601000004E9|
|2011-12-01| 57|8070|2161710.0|5.7086352E7|3.5904708620000005E7|
|2011-12-02| 56|7552| 812038.0|2.3611752E7|1.2171202900000008E7|
|2011-12-03| 49|1772|1550372.0| 7.316791E7| 5.957460229999995E7|
|2011-12-04| 39| 444| 601936.0| 2.24238E7|1.2607418140000004E7|
+----------+---+----+---------+-----------+--------------------+
only showing top 10 rows
New column names can be specified with AS.
>>> df_select = sqlContext.sql("SELECT substr(T_DATE, 1, 10) AS subT_DATE, count(Distinct LOGIN_NAME) AS COUNT_USER, count(*) AS COUNT_NUM, sum(Q_TIME) AS SUMQ_TIME, sum(ELP_TIME) AS SUMELP_TIME, sum(CPU_TIME) AS SUMCPU_TIME FROM qacctall GROUP BY substr(T_DATE, 1, 10)")
>>> df_select.show(10)
+----------+----------+---------+---------+-----------+--------------------+
| subT_DATE|COUNT_USER|COUNT_NUM|SUMQ_TIME|SUMELP_TIME| SUMCPU_TIME|
+----------+----------+---------+---------+-----------+--------------------+
|2011-11-30| 58| 7682|1793154.0|5.8867728E7| 4.248628650000001E7|
|2011-09-30| 59| 636| 2376.0|2.5517608E7|3.1475379480000004E7|
|2011-10-30| 44| 432| 94204.0|6.4102362E7| 4.377035656000001E7|
|2011-10-31| 63| 1300| 57584.0|4.7405606E7|1.5582149653999996E9|
|2011-08-30| 38| 454|3106736.0|1.1878624E7| 1.06372894726E9|
|2011-08-31| 40| 1860|1533592.0|1.5382988E7|3.1321356601000004E9|
|2011-12-01| 57| 8070|2161710.0|5.7086352E7|3.5904708620000005E7|
|2011-12-02| 56| 7552| 812038.0|2.3611752E7|1.2171202900000008E7|
|2011-12-03| 49| 1772|1550372.0| 7.316791E7| 5.957460229999995E7|
|2011-12-04| 39| 444| 601936.0| 2.24238E7|1.2607418140000004E7|
+----------+----------+---------+---------+-----------+--------------------+
only showing top 10 rows
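The same aggregation can also be written with the DataFrame API instead of SQL. The sketch below assumes the df loaded in the Load Data section and Spark 1.5 (substring was added to pyspark.sql.functions in 1.5); count(lit(1)) is used here as a count(*) equivalent.
>>> from pyspark.sql import functions as F
>>> df_select = df.groupBy(F.substring("T_DATE", 1, 10).alias("subT_DATE")).agg(
...     F.countDistinct("LOGIN_NAME").alias("COUNT_USER"),
...     F.count(F.lit(1)).alias("COUNT_NUM"),
...     F.sum("Q_TIME").alias("SUMQ_TIME"),
...     F.sum("ELP_TIME").alias("SUMELP_TIME"),
...     F.sum("CPU_TIME").alias("SUMCPU_TIME"))
>>> df_select.show(10)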
INSERT INTO SELECT
Given a SQL statement such as
INSERT INTO table2 (column1, column2, column3, ...)
SELECT column1, column2, column3, ...
FROM table1
WHERE condition;
For example,
FROM qacctall
INSERT OVERWRITE TABLE qacctdate
SELECT substr(T_DATE, 1, 10), count(Distinct LOGIN_NAME), count(*), sum(Q_TIME), sum(ELP_TIME), sum(CPU_TIME)
GROUP BY substr(T_DATE, 1, 10);
We must decompose the whole statement into several steps:
1. SELECT rows FROM table “qacctall”
>>> df_rows = sqlContext.sql("SELECT substr(T_DATE, 1, 10) AS subT_DATE, count(Distinct LOGIN_NAME) AS COUNT_USER, count(*) AS COUNT_NUM, sum(Q_TIME) AS SUMQ_TIME, sum(ELP_TIME) AS SUMELP_TIME, sum(CPU_TIME) AS SUMCPU_TIME FROM qacctall GROUP BY substr(T_DATE, 1, 10)")
2. Make sure the schema of the selected rows is the same as that of the table to be inserted.
Simple check:
>>> df_table = sqlContext.sql("SELECT * FROM qacctdate")
>>> df_rows.schema == df_table.schema
If False is shown, modify the schema of the selected rows to match that of the table.
>>> df_rows = df_rows.select(
df_rows.subT_DATE,
df_rows.COUNT_USER.cast("float"),
df_rows.COUNT_NUM.cast("float"),
df_rows.SUMQ_TIME.cast("float"),
df_rows.SUMELP_TIME.cast("float"),
df_rows.SUMCPU_TIME.cast("float")
)
>>> df_rows = sqlContext.createDataFrame(df_rows.collect(), df_table.schema)
For more detail, please refer to How to Change Schema of a Spark SQL DataFrame?
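As an alternative to the select-with-cast above, the columns can also be cast one at a time with withColumn (a sketch using the same column names; the resulting schema still has to match the target table):
>>> from pyspark.sql.functions import col
>>> for c in ["COUNT_USER", "COUNT_NUM", "SUMQ_TIME", "SUMELP_TIME", "SUMCPU_TIME"]:
...     df_rows = df_rows.withColumn(c, col(c).cast("float"))
...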
3. INSERT into table “qacctdate”
>>> df_writer = DataFrameWriter(df_rows)
>>> df_writer.insertInto("qacctdate")
- pyspark.sql.DataFrameWriter.insertInto(tableName, overwrite=False)[source]
- Inserts the content of the DataFrame to the specified table.
- It requires that the schema of the DataFrame is the same as the schema of the table.
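After the insert, qacctdate should contain the aggregated rows; a quick look confirms it (an optional check, not part of the original steps):
>>> sqlContext.sql("SELECT * FROM qacctdate ORDER BY subT_DATE").show(3)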
Union
Generally, Spark SQL cannot INSERT or UPDATE directly with a plain SQL statement unless you use HiveContext. Alternatively, we can use unionAll to achieve the same goal as INSERT.
For example,
FROM qacctall
INSERT OVERWRITE TABLE qacctdate
SELECT substr(T_DATE, 1, 10), count(Distinct LOGIN_NAME), count(*), sum(Q_TIME), sum(ELP_TIME), sum(CPU_TIME)
GROUP BY substr(T_DATE, 1, 10);
We can simply use the following commands to execute it on Spark.
>>> df_rows = sqlContext.sql("SELECT substr(T_DATE, 1, 10) AS subT_DATE, count(Distinct LOGIN_NAME) AS COUNT_USER, count(*) AS COUNT_NUM, sum(Q_TIME) AS SUMQ_TIME, sum(ELP_TIME) AS SUMELP_TIME, sum(CPU_TIME) AS SUMCPU_TIME FROM qacctall GROUP BY substr(T_DATE, 1, 10)")
>>> sqlContext.sql("SELECT * FROM qacctdate").unionAll(df_rows).registerTempTable("qacctdate")
Export Table
For details about the different types of data sources available in Spark SQL, please refer to Spark SQL - Data Sources.
Save to Parquet File
Parquet is a columnar format, supported by many data processing systems.
>>> df = sqlContext.sql("SELECT * FROM qacctall")
>>> df.write.format('parquet').save('./qacctall')
INFO ParquetRelation: Listing hdfs://YOUR_ACCOUNT_HOME/qacctall on driver
The next time you want to load this table, you can simply run:
>>> df = sqlContext.load("./qacctall", "parquet")
>>> df.registerTempTable("qacctall");
Save to JSON Datasets
>>> df.write.format('json').save('./qacctall')
INFO JSONRelation: Listing hdfs://YOUR_ACCOUNT_HOME/qacctall on driver
The next time you want to load this table, you can simply run:
>>> df = sqlContext.load("./qacctall", "json")
>>> df.registerTempTable("qacctall");