Simple Examples with Spark Streaming
Types of queries one wants to answer on a data stream:
- Sampling data from a stream - Construct a random sample (a reservoir-sampling sketch follows this list)
- Queries over sliding windows - Number of items of type x in the last k elements of the stream
- Filtering a data stream - Select elements with property x from the stream
- Counting distinct elements - Number of distinct elements in the last k elements of the stream
- Estimating moments - Estimate average/std deviation of last k elements
- Finding frequent elements
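As a taste of the first query type, here is a minimal reservoir-sampling sketch (Algorithm R) in plain Java. It is independent of the Spark examples below, and the class and method names are illustrative. It maintains a uniform random sample of size k over a stream of unknown length:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Minimal reservoir sampling: keeps a uniform random sample of size k. */
public class ReservoirSample<T> {
    private final int k;
    private final List<T> reservoir;
    private final Random rng = new Random();
    private long seen = 0;

    public ReservoirSample(int k) {
        this.k = k;
        this.reservoir = new ArrayList<>(k);
    }

    /** Feed one stream element; every element seen so far stays in the sample with equal probability. */
    public void offer(T item) {
        seen++;
        if (reservoir.size() < k) {
            reservoir.add(item);                       // fill the reservoir first
        } else {
            long j = (long) (rng.nextDouble() * seen); // uniform index in [0, seen)
            if (j < k) {
                reservoir.set((int) j, item);          // replace a random slot
            }
        }
    }

    public List<T> sample() {
        return reservoir;
    }
}
Each arriving element replaces a random slot with probability k/seen, which keeps every element seen so far equally likely to be in the sample.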
Quick Example: WordCount
In this example we read files written to a directory as a stream of data.
Step 1. Create A Java Project
Canonical Maven directory structure
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/JavaNetworkWordCount.java
The contents of ./pom.xml and ./src/main/java/JavaNetworkWordCount.java are provided below.
Main Class
/* JavaNetworkWordCount.java */
import java.util.Arrays;
import java.util.regex.Pattern;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Counts words in UTF-8 encoded, '\n' delimited text files that appear in the
 * monitored directory, one batch per second.
 */
public final class JavaNetworkWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
        // Create the streaming context with a 1 second batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
        // Monitor this directory for newly created files
        String dataDirectory = "remoteFolder/streaming";
        JavaDStream<String> lines = ssc.textFileStream(dataDirectory);
        // Split each line into words, then count each word within the batch
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)));
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
            .reduceByKey((i1, i2) -> i1 + i2);
        // Print the first ten elements of each batch to stdout
        wordCounts.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
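As an aside, the sliding-window queries listed at the top of this section map directly onto this pipeline. A minimal sketch, reusing the words DStream inside main() above (the window and slide durations are illustrative; both must be multiples of the 1 second batch interval):
// Count words over the last 30 seconds, recomputed every 10 seconds
JavaPairDStream<String, Integer> windowedCounts = words
    .mapToPair(s -> new Tuple2<>(s, 1))
    .reduceByKeyAndWindow(
        (i1, i2) -> i1 + i2,    // merge counts within the window
        Durations.seconds(30),  // window length
        Durations.seconds(10)); // slide interval
windowedCounts.print();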
Spark Streaming will monitor the directory dataDirectory (= "remoteFolder/streaming") and process any files created in that directory (files written to nested directories are not supported).
Note that:
- The files must all have the same data format.
- The files must be created in dataDirectory by atomically moving or renaming them into it (see the staging-directory example below).
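For example, one safe pattern (directory names illustrative) is to upload into a staging directory on the same filesystem and then rename, since hadoop fs -mv within a single HDFS filesystem is an atomic metadata-only rename:
$ hadoop fs -mkdir -p remoteFolder/staging
$ hadoop fs -copyFromLocal hello remoteFolder/staging/hello
$ hadoop fs -mv remoteFolder/staging/hello remoteFolder/streaming/hello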
Project Dependencies
<!--pom.xml-->
<project>
<groupId>org.nchc.spark</groupId>
<artifactId>spark-sample</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>JavaNetworkWordCount</name>
<packaging>jar</packaging>
<version>0.0.1</version>
<properties>
<java.version>1.8</java.version>
<spark.version>1.5.0</spark.version>
</properties>
<dependencies>
<!-- Spark core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Note that:
- The name passed to SparkConf().setAppName in the main Java class matches project->name in pom.xml (a convention followed here).
- project->dependencies in pom.xml must contain all libraries imported in our Java classes.
Step 2. Package the Project Using Maven
# Package a JAR containing your application
$ mvn clean package
...
[INFO] Building jar: {..}/{..}/target/spark-sample-0.0.1.jar
Step 3. Prepare Streaming Data
Create the directory matching the dataDirectory variable set in your project.
$ hadoop fs -mkdir remoteFolder/streaming
Then create a local file named hello containing the data you want to receive as a stream.
$ vi hello
hello world
Step 4. Run The Project
Open a terminal, named “Terminal A”, and execute the following command.
$ spark-submit --class "JavaNetworkWordCount" JavaNetworkWordCount/target/spark-sample-0.0.1.jar
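If you want to try the job on a single machine instead of a cluster, spark-submit also accepts an explicit master URL; a hedged variant (assuming a local Spark installation; Spark recommends at least two local threads for streaming jobs):
$ spark-submit --class "JavaNetworkWordCount" --master "local[2]" JavaNetworkWordCount/target/spark-sample-0.0.1.jar
Note that in local mode the relative dataDirectory path may resolve against your local filesystem rather than HDFS, depending on your Hadoop configuration.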
Step 5. Move Files into Streaming Directory
Open another terminal, named “Terminal B”, and execute the following commands.
$ hadoop fs -rm remoteFolder/streaming/hello
$ hadoop fs -copyFromLocal hello remoteFolder/streaming
$ hadoop fs -cat remoteFolder/streaming/*
Then in “Terminal A” you should see output like the following on screen.
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
If you don’t see the output, execute the same commands in “Terminal B” repeatedly until you do; each delete-and-copy creates a fresh file for the stream to pick up.
$ hadoop fs -rm remoteFolder/streaming/hello
$ hadoop fs -copyFromLocal hello remoteFolder/streaming
$ hadoop fs -cat remoteFolder/streaming/*
DataFrame and SQL Operations
You can easily use DataFrames and SQL operations on streaming data.
Step 1. Create A Java Project
Canonical Maven directory structure
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/JavaRow.java
./src/main/java/JavaSQLNetworkWordCount.java
The contents of ./pom.xml, ./src/main/java/JavaSQLNetworkWordCount.java, and ./src/main/java/JavaRow.java are provided below.
Classes
/* JavaRow.java */
/** Java Bean class for converting an RDD to a DataFrame */
public class JavaRow implements java.io.Serializable {
    private String word;

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }
}
/* JavaSQLNetworkWordCount.java */
import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Counts words in UTF-8 encoded, '\n' delimited text files that appear in the
 * monitored directory, using DataFrames and SQL.
 */
public final class JavaSQLNetworkWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        // Create the context with a 5 second batch size
        SparkConf sparkConf = new SparkConf().setAppName("JavaSQLNetworkWordCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
        // Monitor this directory for newly created files
        String dataDirectory = "remoteFolder/streaming";
        JavaDStream<String> lines = ssc.textFileStream(dataDirectory);
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)));
        words.foreachRDD((rdd, time) -> {
            // Get the singleton instance of SQLContext
            SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
            // Convert RDD[String] to RDD[JavaRow] to DataFrame
            JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
                JavaRow record = new JavaRow();
                record.setWord(word);
                return record;
            });
            DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);
            // Register as table
            wordsDataFrame.registerTempTable("words");
            // Do word count on table using SQL and print it
            DataFrame wordCountsDataFrame =
                sqlContext.sql("select word, count(*) as total from words group by word");
            wordCountsDataFrame.show();
            return null;
        });
        ssc.start();
        ssc.awaitTermination();
    }
}
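The SQL string above can equally be written with the DataFrame API. A minimal sketch of the same aggregation inside foreachRDD (note that groupBy(...).count() names the count column "count" rather than "total"):
// Equivalent aggregation using the DataFrame API instead of SQL
DataFrame wordCountsDataFrame = wordsDataFrame
    .groupBy("word")  // group rows by the "word" column
    .count();         // adds a "count" column per group
wordCountsDataFrame.show();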
Spark Streaming will monitor the directory dataDirectory (= "remoteFolder/streaming") and process any files created in that directory (files written to nested directories are not supported).
Note that:
- The files must all have the same data format.
- The files must be created in dataDirectory by atomically moving or renaming them into it (see the staging-directory example shown earlier).
Project Dependencies (add spark-sql)
<!--pom.xml-->
<project>
<groupId>org.nchc.spark</groupId>
<artifactId>spark-sample</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>JavaSQLNetworkWordCount</name>
<packaging>jar</packaging>
<version>0.0.1</version>
<properties>
<java.version>1.8</java.version>
<spark.version>1.5.0</spark.version>
</properties>
<dependencies>
<!-- Spark core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Note that:
- The name passed to SparkConf().setAppName in the main Java class matches project->name in pom.xml (a convention followed here).
- project->dependencies in pom.xml must contain all libraries imported in our Java classes.
Step 2. Package the Project Using Maven
# Package a JAR containing your application
$ mvn clean package
...
[INFO] Building jar: {..}/{..}/target/spark-sample-0.0.1.jar
Step 3. Prepare Streaming Data
Create the directory matching the dataDirectory variable set in your project.
$ hadoop fs -mkdir remoteFolder/streaming
Then create a local file named hello containing the data you want to receive as a stream.
$ vi hello
hello world
Step 4. Run The Project
Open a terminal, named “Terminal A”, and execute the following command.
$ spark-submit --class "JavaSQLNetworkWordCount" JavaSQLNetworkWordCount/target/spark-sample-0.0.1.jar
Step 5. Move Files into Streaming Directory
Open another terminal, named “Terminal B”, and execute the following commands.
$ hadoop fs -rm remoteFolder/streaming/hello
$ hadoop fs -copyFromLocal hello remoteFolder/streaming
$ hadoop fs -cat remoteFolder/streaming/*
Then in “Terminal A” you should see output like the following on screen.
+-----+-----+
| word|total|
+-----+-----+
|hello| 1|
|world| 1|
+-----+-----+
If you don’t see the output, execute the same commands in “Terminal B” repeatedly until you do; each delete-and-copy creates a fresh file for the stream to pick up.
$ hadoop fs -rm remoteFolder/streaming/hello
$ hadoop fs -copyFromLocal hello remoteFolder/streaming
$ hadoop fs -cat remoteFolder/streaming/*