Deeplearning4j on Spark: How To Build Data Pipelines
This page provides some guides on how to create data pipelines for both training and evaluation when using Deeplearning4j on Spark.
This page assumes some familiarity with Spark (RDDs, master vs. workers, etc.) and Deeplearning4j (networks, DataSet, etc.).
As with training on a single machine, the final step of a data pipeline should be to produce a DataSet (a single features array and a single label array) or MultiDataSet (one or more feature arrays, one or more label arrays). In the case of DL4J on Spark, the final step of a data pipeline is data in one of the following formats:
- (a) an RDD<DataSet>/JavaRDD<DataSet>
- (b) an RDD<MultiDataSet>/JavaRDD<MultiDataSet>
- (c) a directory of serialized DataSet/MultiDataSet (minibatch) objects on network storage such as HDFS, S3 or Azure blob storage
- (d) a directory of minibatches in some other format
Once data is in one of these four formats, it can be used for training or evaluation.
Note: When training multiple models on a single dataset, it is best practice to preprocess your data once and save it to network storage such as HDFS. Then, when training the network, you can call SparkDl4jMultiLayer.fit(String path) or SparkComputationGraph.fit(String path), where path is the directory where you saved the files.
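As a quick illustration, here is a minimal sketch of that pattern (using the export and fitting steps described in detail later on this page; the exportPath location is just an example):
JavaRDD<DataSet> rddDataSet = ...                        //Preprocessed data, prepared as described below
String exportPath = "hdfs:///path/to/export/data";       //Example network storage location
int minibatchSize = 32;
//Export once - an action such as count() is required to trigger the save:
rddDataSet.mapPartitionsWithIndex(new BatchAndExportDataSetsFunction(minibatchSize, exportPath), true).count();
//Then fit one or more models from the saved files:
SparkDl4jMultiLayer net = ...
net.fit(exportPath);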
Spark Data Preparation: How-To Guides
This guide shows how to load data contained in one or more CSV files and produce a JavaRDD<DataSet> for export, training or evaluation on Spark.
The process is fairly straightforward. Note that the DataVecDataSetFunction is very similar to the RecordReaderDataSetIterator that is often used for single-machine training.
For example, suppose the CSV had the following format - 6 total columns: 5 features followed by an integer class index for classification, and 10 possible classes:
1.0,3.2,4.5,1.1,6.3,0
1.6,2.4,5.9,0.2,2.2,1
...
We could load this data for classification using the following code:
String filePath = "hdfs:///your/path/some_csv_file.csv";
JavaSparkContext sc = new JavaSparkContext();
JavaRDD<String> rddString = sc.textFile(filePath);
RecordReader recordReader = new CSVRecordReader(0, ','); //Skip no lines; comma delimiter
JavaRDD<List<Writable>> rddWritables = rddString.map(new StringToWritablesFunction(recordReader));
int labelIndex = 5; //Labels: a single integer representing the class index in column number 5
int numLabelClasses = 10; //10 classes for the label
JavaRDD<DataSet> rddDataSetClassification = rddWritables.map(new DataVecDataSetFunction(labelIndex, numLabelClasses, false));
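If needed, the resulting RDD can then be split into training and test sets using standard Spark functionality, as in the following sketch (the 80/20 split and seed are arbitrary):
JavaRDD<DataSet>[] split = rddDataSetClassification.randomSplit(new double[]{0.8, 0.2}, 12345);
JavaRDD<DataSet> trainData = split[0];
JavaRDD<DataSet> testData = split[1];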
However, if this dataset were instead for regression, again with 6 total columns but with 3 feature columns (positions 0, 1 and 2 in the file rows) and 3 label columns (positions 3, 4 and 5), we could load it using the same process as above, changing only the last 3 lines to:
int firstLabelColumn = 3; //First column index for label
int lastLabelColumn = 5; //Last column index for label
JavaRDD<DataSet> rddDataSetRegression = rddWritables.map(new DataVecDataSetFunction(firstLabelColumn, lastLabelColumn, true, null, null));
RecordReaderMultiDataSetIterator (RRMDSI) is the most common way to create MultiDataSet instances for single-machine training data pipelines. It is possible to use RRMDSI for Spark data pipelines, where the data comes from one or more RDD<List<Writable>> (for 'standard' data) or RDD<List<List<Writable>>> (for sequence data).
Case 1: Single RDD<List<Writable>> to RDD<MultiDataSet>
Consider the following single-node (non-Spark) data pipeline for a CSV classification task.
RecordReader recordReader = new CSVRecordReader(numLinesToSkip,delimiter);
recordReader.initialize(new FileSplit(new ClassPathResource("iris.txt").getFile()));
int batchSize = 32;
int labelColumn = 4;
int numClasses = 3;
MultiDataSetIterator iter = new RecordReaderMultiDataSetIterator.Builder(batchSize)
.addReader("data", recordReader)
.addInput("data", 0, labelColumn-1)
.addOutputOneHot("data", labelColumn, numClasses)
.build();
The equivalent Spark data pipeline is the following:
JavaRDD<List<Writable>> rdd = sc.textFile(f.getPath()).map(new StringToWritablesFunction(new CSVRecordReader()));
RecordReaderMultiDataSetIterator rrmdsi = new RecordReaderMultiDataSetIterator.Builder(batchSize)
.addReader("data", new SparkSourceDummyReader(0)) //Note the use of the "SparkSourceDummyReader"
.addInput("data", 0, labelColumn-1)
.addOutputOneHot("data", labelColumn, numClasses)
.build();
JavaRDD<MultiDataSet> mdsRdd = IteratorUtils.mapRRMDSI(rdd, rrmdsi);
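The resulting RDD<MultiDataSet> can then be used in the same way as any other RDD<MultiDataSet>, for example by fitting a network directly or by exporting it as described later on this page. A sketch, assuming a SparkComputationGraph has already been configured:
SparkComputationGraph sparkNet = ...
sparkNet.fitMultiDataSet(mdsRdd);    //Fit directly from the RDD<MultiDataSet>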
For sequence data (List<List<Writable>>) you can use SparkSourceDummySeqReader instead.
Case 2: Multiple RDD<List<Writable>> or RDD<List<List<Writable>>> to RDD<MultiDataSet>
For this case, the process is much the same; however, internally, a join is used.
JavaRDD<List<Writable>> rdd1 = ...
JavaRDD<List<Writable>> rdd2 = ...
RecordReaderMultiDataSetIterator rrmdsi = new RecordReaderMultiDataSetIterator.Builder(batchSize)
.addReader("rdd1", new SparkSourceDummyReader(0)) //0 = use first rdd in list
.addReader("rdd2", new SparkSourceDummyReader(1)) //1 = use second rdd in list
.addInput("rdd1", 1, 2) //
.addOutput("rdd2", 1, 2)
.build();
List<JavaRDD<List<Writable>>> list = Arrays.asList(rdd1, rdd2);
int[] keyIdxs = new int[]{0,0}; //Column 0 in rdd1 and rdd2 is the 'key' used for joining
boolean filterMissing = false; //If true: filter out any records that don't have matching keys in all RDDs
JavaRDD<MultiDataSet> mdsRdd = IteratorUtils.mapRRMDSI(list, null, keyIdxs, null, filterMissing, rrmdsi);
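To make the join behaviour concrete, here is a purely illustrative (hypothetical) example of matching rows, built with tiny in-memory RDDs. Column 0 is the shared key, and the remaining columns are the values referenced by addInput/addOutput above:
List<List<Writable>> rows1 = Arrays.asList(
    Arrays.<Writable>asList(new Text("record0"), new DoubleWritable(1.0), new DoubleWritable(2.0)),
    Arrays.<Writable>asList(new Text("record1"), new DoubleWritable(3.0), new DoubleWritable(4.0)));
List<List<Writable>> rows2 = Arrays.asList(
    Arrays.<Writable>asList(new Text("record0"), new DoubleWritable(10.0), new DoubleWritable(20.0)),
    Arrays.<Writable>asList(new Text("record1"), new DoubleWritable(30.0), new DoubleWritable(40.0)));
JavaRDD<List<Writable>> rdd1 = sc.parallelize(rows1);
JavaRDD<List<Writable>> rdd2 = sc.parallelize(rows2);
//After joining on the key "record0", the MultiDataSet input comes from columns 1 and 2 of rows1 (1.0, 2.0)
//and the output comes from columns 1 and 2 of rows2 (10.0, 20.0)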
As noted at the start of this page, it is considered a best practice to preprocess and export your data once (i.e., save it to network storage such as HDFS and reuse it), rather than fitting from an RDD<DataSet> or RDD<MultiDataSet> directly in each training job. There are a number of reasons for this:
- Better performance (avoid redundant loading/calculation): When fitting multiple models from the same dataset, it is faster to preprocess this data once and save to disk rather than preprocessing it again for every single training run.
- Minimizing memory and other resources: By exporting and fitting from disk, we only need to keep the DataSets we are currently using (plus a small async prefetch buffer) in memory, rather than also keeping many unused DataSet objects in memory. Exporting results in lower total memory use and hence we can use larger networks, larger minibatch sizes, or allocate fewer resources to our job.
- Avoiding recomputation: When an RDD is too large to fit into memory, some parts of it may need to be recomputed before it can be used (depending on the cache settings). When this occurs, Spark will recompute parts of the data pipeline multiple times, costing us both time and memory. A pre-export step avoids this recomputation entirely.
Step 1: Saving
Saving the DataSet objects once you have an RDD<DataSet> is quite straightforward:
JavaRDD<DataSet> rddDataSet = ...
int minibatchSize = 32; //Minibatch size of the saved DataSet objects
String exportPath = "hdfs:///path/to/export/data";
JavaRDD<String> paths = rddDataSet.mapPartitionsWithIndex(new BatchAndExportDataSetsFunction(minibatchSize, exportPath), true);
Keep in mind that this is a map function, so no data will be saved until an action is executed on the paths RDD - i.e., you should follow this with an operation such as:
paths.saveAsTextFile("hdfs:///path/to/text/file.txt"); //Specified file will contain paths/URIs of all saved DataSet objects
or
List<String> pathList = paths.collect(); //Collection of paths/URIs of all saved DataSet objects
or
paths.foreach(new VoidFunction<String>() {
@Override
public void call(String path) {
//Some operation on each path
}
});
Saving an RDD<MultiDataSet> can be done in the same way, using BatchAndExportMultiDataSetsFunction instead, which takes the same arguments.
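For example, a minimal sketch mirroring the DataSet export above (the export path is just an example):
JavaRDD<MultiDataSet> rddMultiDataSet = ...
int minibatchSize = 32;                   //Minibatch size of the saved MultiDataSet objects
String exportPath = "hdfs:///path/to/export/mds";
JavaRDD<String> mdsPaths = rddMultiDataSet.mapPartitionsWithIndex(new BatchAndExportMultiDataSetsFunction(minibatchSize, exportPath), true);
mdsPaths.saveAsTextFile("hdfs:///path/to/mds/paths.txt"); //As before, an action is required to trigger the export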
Step 2: Loading and Fitting
The exported data can be used in a few ways. First, it can be used to fit a network directly:
String exportPath = "hdfs:///path/to/export/data";
SparkDl4jMultiLayer net = ...
net.fit(exportPath); //Loads the serialized DataSet objects found in the 'exportPath' directory
Similarly, we can use SparkComputationGraph.fitMultiDataSet(String path) if we saved an RDD<MultiDataSet> instead.
Alternatively, we can load up the paths in a few different ways, depending on how (or whether) we saved them:
JavaSparkContext sc = new JavaSparkContext();
//If we used saveAsTextFile:
String saveTo = "hdfs:///path/to/text/file.txt";
paths.saveAsTextFile(saveTo); //Save
JavaRDD<String> loadedPaths = sc.textFile(saveTo); //Load
//If we used collecting:
List<String> pathList = paths.collect(); //Collect
JavaRDD<String> loadedPaths = sc.parallelize(pathList); //Parallelize
//If we want to list the directory contents:
String exportPath = "hdfs:///path/to/export/data";
JavaRDD<String> loadedPaths = SparkUtils.listPaths(sc, exportPath); //List paths using org.deeplearning4j.spark.util.SparkUtils
Then we can execute training on these paths using methods such as SparkDl4jMultiLayer.fitPaths(JavaRDD<String>).
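For example, a sketch assuming the network and exported data from the earlier steps:
JavaRDD<String> loadedPaths = SparkUtils.listPaths(sc, exportPath); //Or any of the loading approaches above
SparkDl4jMultiLayer net = ...
net.fitPaths(loadedPaths); //Each worker loads and fits the saved DataSet objects it is assigned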