Preparing Data

  • Notice 1. that the target class should be at the last column.
  • Notice 2. the header column should be removed from the input dataset.
Year Month DayofMonth DayOfWeek DepTime CRSDepTime ArrTime CRSArrTime UniqueCarrier FlightNum ... TaxiIn TaxiOut Cancelled CancellationCode Diverted CarrierDelay WeatherDelay NASDelay SecurityDelay LateAircraftDelay
0 2000 1 28 5 1647.0 1647 1906.0 1859 HP 154 ... 15 11 0 NaN 0 NaN NaN NaN NaN NaN
1 2000 1 29 6 1648.0 1647 1939.0 1859 HP 154 ... 5 47 0 NaN 0 NaN NaN NaN NaN NaN
2 2000 1 30 7 NaN 1647 NaN 1859 HP 154 ... 0 0 1 NaN 0 NaN NaN NaN NaN NaN
3 2000 1 31 1 1645.0 1647 1852.0 1859 HP 154 ... 7 14 0 NaN 0 NaN NaN NaN NaN NaN
4 2000 1 1 6 842.0 846 1057.0 1101 HP 609 ... 3 8 0 NaN 0 NaN NaN NaN NaN NaN

5 rows × 29 columns

Application Infrastructure

Canonical Maven directory structure

$ find .

Main Program

public class LogisticRegression {
  public static void main(String[] args) {
    String inputFile = args[0];
    String outputPath = args[1];
    SparkConf conf = new SparkConf().setAppName("LogisticRegression");
    SparkContext sc = new SparkContext(conf);
     // Load and parse the data
    int minPartition = 1;
    RDD<String> input = sc.textFile(inputFile, minPartition);
    JavaRDD<String> data = input.toJavaRDD(); 
    JavaRDD<LabeledPoint> parsedData = -> {
      String[] features = line.split(",");
      double[] v = new double[features.length-1];
      for (int i = 0; i < features.length - 1; i++) {
        v[i] = Double.parseDouble(features[i]);
      return new LabeledPoint(Double.parseDouble(features[features.length-1]), Vectors.dense(v));

    // Split initial RDD into two... [60% training data, 40% testing data].
    JavaRDD<LabeledPoint>[] splits = parsedData.randomSplit(new double[] {0.6, 0.4}, 11L);
    JavaRDD<LabeledPoint> training = splits[0].cache();
    JavaRDD<LabeledPoint> test = splits[1];

    // Run training algorithm to build the model.
    LogisticRegressionModel model = new LogisticRegressionWithLBFGS()

    // Compute raw scores on the test set.
    JavaRDD<Tuple2<Object, Object>> predictionAndLabels =
      new Function<LabeledPoint, Tuple2<Object, Object>>() {
        public Tuple2<Object, Object> call(LabeledPoint p) {
          Double prediction = model.predict(p.features());
          return new Tuple2<Object, Object>(prediction, p.label());

    // Save Prediction result

    // Get evaluation metrics.
    MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
    double precision = metrics.precision();
    System.out.println("Precision = " + precision);

    // Save and load model, outputPath+"/model/LogisticRegressionModel");



Deploy Application To Train The Model

cd LogisticRegression
mvn package
hadoop fs -rm -r test/output
spark-submit  --class "LogisticRegression"  target/spark-sample-0.0.1.jar test/input test/output
hadoop fs -copyToLocal test/output/model/LogisticRegressionModel ./Model

Package the application using Maven.

# Package a JAR containing your application
$ mvn package
[INFO] Building jar: {..}/{..}/target/spark-sample-0.0.1.jar

Prepare input file and output environment

# Make input directory
$ hadoop fs -mkdir test/input
# Copy input file(s) from local to remote
$ hadoop fs -copyFromLocal ./input/logistic_input test/input
# Remove output directory to prevent conflicts 
$ hadoop fs -rm -r test/output

Execute it with spark-submit

# Use spark-submit to run your application
$ spark-submit \
  --class "LogisticRegression" \

Check Precision

The application would split the input data into 2 parts (60% train, 40% test) and print out the precision of the trained model.

INFO Precision = 0.81475

Fetched The Trained Model

If you want to use this model in your next step, you can simply load it.

LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc, "test/output/model/LogisticRegressionModel");

Or you can also copy the trained model from remote to local.

hadoop fs -copyToLocal test/output/model/LogisticRegressionModel ./Model