Deeplearning4j on Spark: How To Build Data Pipelines
This page provides some guides on how to create data pipelines for both training and evaluation when using Deeplearning4j on Spark.
This page assumes some familiarity with Spark (RDDs, master vs. workers, etc.) and Deeplearning4j (networks, DataSets, etc.).
As with training on a single machine, the final step of a data pipeline should be to produce a DataSet (single features array, single label array) or MultiDataSet (one or more feature arrays, one or more label arrays). In the case of DL4J on Spark, the final step of a data pipeline is data in one of the following formats:
(a) an RDD<DataSet>/JavaRDD<DataSet>
(b) an RDD<MultiDataSet>/JavaRDD<MultiDataSet>
(c) a directory of serialized DataSet/MultiDataSet (minibatch) objects on network storage such as HDFS, S3 or Azure blob storage
(d) a directory of minibatches in some other format
Once data is in one of those four formats, it can be used for training or evaluation.
Note: When training multiple models on a single dataset, it is best practice to preprocess your data once and save it to network storage such as HDFS. Then, when training the network, you can call SparkDl4jMultiLayer.fit(String path) or SparkComputationGraph.fit(String path), where path is the directory in which you saved the files.
Spark Data Preparation: How-To Guides
This guide shows how to load data contained in one or more CSV files and produce a JavaRDD<DataSet> for export, training or evaluation on Spark.
The process is fairly straightforward. Note that the DataVecDataSetFunction is very similar to the RecordReaderDataSetIterator that is often used for single-machine training.
For example, suppose the CSV had the following format - 6 total columns: 5 features followed by an integer class index for classification, with 10 possible classes. We could load this data for classification using the following code:
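A minimal sketch of this pipeline is shown below. The HDFS path is a placeholder, and the Spark context setup is assumed to be handled elsewhere (e.g., via spark-submit); CSVRecordReader, StringToWritablesFunction and DataVecDataSetFunction are the same classes used for the equivalent single-machine pipeline.

```java
JavaSparkContext sc = new JavaSparkContext();                  // Or use an existing context
String csvPath = "hdfs:///path/to/my/file.csv";                // Placeholder path

// Load each line of the CSV as a String, then parse it into a DataVec Writable record
JavaRDD<String> rddString = sc.textFile(csvPath);
RecordReader recordReader = new CSVRecordReader();             // Default: comma-delimited, no header lines skipped
JavaRDD<List<Writable>> rddWritables = rddString.map(new StringToWritablesFunction(recordReader));

// Convert to DataSet objects for classification: label is the integer at column index 5, 10 classes
int labelIndex = 5;
int numLabelClasses = 10;
JavaRDD<DataSet> rddDataSetClassification =
        rddWritables.map(new DataVecDataSetFunction(labelIndex, numLabelClasses, false));
```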
However, if this dataset were instead for regression - again with 6 total columns, but with 3 feature columns (positions 0, 1 and 2 in the file rows) and 3 label columns (positions 3, 4 and 5) - we could load it using the same process as above, changing only the final conversion step:
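Continuing the sketch above, the conversion step for the regression case might look like the following (assuming the DataVecDataSetFunction constructor variant that takes a label-column range):

```java
// Regression: features at columns 0-2, labels at columns 3-5 (no class count needed, hence -1)
int firstLabelColumn = 3;
int lastLabelColumn = 5;
JavaRDD<DataSet> rddDataSetRegression =
        rddWritables.map(new DataVecDataSetFunction(firstLabelColumn, lastLabelColumn, -1, true, null, null));
```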
RecordReaderMultiDataSetIterator (RRMDSI) is the most common way to create MultiDataSet instances for single-machine training data pipelines. It is also possible to use RRMDSI in Spark data pipelines, where the data comes from one or more of RDD<List<Writable>> (for 'standard' data) or RDD<List<List<Writable>>> (for sequence data).
Case 1: Single RDD<List<Writable>> to RDD<MultiDataSet>
Consider the following single node (non-Spark) data pipeline for a CSV classification task.
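As an illustration, a single-node RRMDSI pipeline might look like the following sketch. The file path, column indices and class count are assumptions for this example:

```java
// Single-node pipeline: read a CSV with a RecordReader, build MultiDataSets with RRMDSI
RecordReader recordReader = new CSVRecordReader();
recordReader.initialize(new FileSplit(new File("/path/to/my/file.csv")));

int batchSize = 32;
int labelColumn = 5;       // Assumed: integer class index in the last of 6 columns
int numClasses = 10;

MultiDataSetIterator iterator = new RecordReaderMultiDataSetIterator.Builder(batchSize)
        .addReader("data", recordReader)
        .addInput("data", 0, labelColumn - 1)                   // Columns 0..4 as features
        .addOutputOneHot("data", labelColumn, numClasses)       // Column 5 as a one-hot label
        .build();
```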
The equivalent Spark data pipeline is as follows:
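A sketch of the corresponding Spark pipeline, assuming an existing JavaRDD<List<Writable>> (for example, one produced with StringToWritablesFunction as shown earlier). SparkSourceDummyReader stands in for the reader on the Spark side, and IteratorUtils.mapRRMDSI applies the iterator definition across the RDD; the exact signatures here are taken from memory of the DL4J API and should be checked against your version:

```java
JavaRDD<List<Writable>> rddWritables = ...;    // As produced earlier in this guide

int batchSize = 32;
int labelColumn = 5;
int numClasses = 10;

// SparkSourceDummyReader(0) is a placeholder meaning "the first (and only) input RDD"
RecordReaderMultiDataSetIterator rrmdsi = new RecordReaderMultiDataSetIterator.Builder(batchSize)
        .addReader("data", new SparkSourceDummyReader(0))
        .addInput("data", 0, labelColumn - 1)
        .addOutputOneHot("data", labelColumn, numClasses)
        .build();

JavaRDD<MultiDataSet> rddMultiDataSet = IteratorUtils.mapRRMDSI(rddWritables, rrmdsi);
```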
For sequence data (List<List<Writable>>) you can use SparkSourceDummySeqReader instead.
Case 2: Multiple RDD<List<Writable>> or RDD<List<List<Writable>>> to RDD<MultiDataSet>
For this case, the process is much the same. However, internally, a join is used.
As noted at the start of this page, it is considered a best practice to preprocess and export your data once (i.e., save it to network storage such as HDFS and reuse it), rather than fitting from an RDD<DataSet> or RDD<MultiDataSet> directly in each training job.
There are a number of reasons for this:
Better performance (avoid redundant loading/calculation): When fitting multiple models from the same dataset, it is faster to preprocess this data once and save to disk rather than preprocessing it again for every single training run.
Minimizing memory and other resources: By exporting and fitting from disk, we only need to keep the DataSets we are currently using (plus a small async prefetch buffer) in memory, rather than also keeping many unused DataSet objects in memory. Exporting results in lower total memory use and hence we can use larger networks, larger minibatch sizes, or allocate fewer resources to our job.
Avoiding recomputation: When an RDD is too large to fit into memory, some parts of it may need to be recomputed before it can be used (depending on the cache settings). When this occurs, Spark will recompute parts of the data pipeline multiple times, costing us both time and memory. A pre-export step avoids this recomputation entirely.
Step 1: Saving
Saving the DataSet objects once you have an RDD<DataSet> is quite straightforward:
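A sketch using BatchAndExportDataSetsFunction; the export path and minibatch size are examples only:

```java
JavaRDD<DataSet> rddDataSet = ...;                        // Data prepared earlier in the pipeline
int minibatchSize = 32;                                   // Size of the saved DataSet objects
String exportPath = "hdfs:///path/to/export/dir";         // Placeholder network-storage directory

// Each partition is re-batched and written out; the returned RDD contains the paths of the saved files
JavaRDD<String> paths = rddDataSet.mapPartitionsWithIndex(
        new BatchAndExportDataSetsFunction(minibatchSize, exportPath), true);
```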
Keep in mind that this is a map function, so no data will be saved until the paths RDD is executed - i.e., you should follow it with an action such as one of the following:
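For example, any of these standard Spark actions will trigger the export:

```java
paths.count();                                              // Simply force evaluation

// or save the list of exported file paths for later use:
paths.saveAsTextFile("hdfs:///path/to/paths/list");

// or collect the paths to the driver:
List<String> pathList = paths.collect();
```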
Saving an RDD<MultiDataSet> can be done in the same way, using BatchAndExportMultiDataSetsFunction instead, which takes the same arguments.
Step 2: Loading and Fitting
The exported data can be used in a few ways. First, it can be used to fit a network directly:
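For example, a minimal sketch (the directory path is a placeholder, and sparkNet is assumed to be an already-configured SparkDl4jMultiLayer with a TrainingMaster):

```java
SparkDl4jMultiLayer sparkNet = ...;                 // Configured network + TrainingMaster
String dataPath = "hdfs:///path/to/export/dir";     // Directory the DataSets were exported to

sparkNet.fit(dataPath);
```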
Similarly, we can use SparkComputationGraph.fitMultiDataSet(String path) if we saved an RDD<MultiDataSet> instead.
Alternatively, we can load up the paths in a few different ways, depending on how we saved them:
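Two common options, sketched here under the assumption that the SparkUtils.listPaths utility is available in your DL4J version:

```java
// Option 1: list all files in the export directory
JavaRDD<String> paths = SparkUtils.listPaths(sc, "hdfs:///path/to/export/dir");

// Option 2: if the paths were saved earlier with saveAsTextFile, read them back
JavaRDD<String> paths2 = sc.textFile("hdfs:///path/to/paths/list");
```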
Then we can execute training on these paths by using methods such as SparkDl4jMultiLayer.fitPaths(JavaRDD<String>).
Another possible workflow is to start with the data pipeline on a single machine, and export the DataSet or MultiDataSet objects for use on the cluster. This workflow clearly isn't as scalable as preparing data on a cluster (you are using just one machine to prepare data) but it can be an easy option in some cases, especially when you have an existing data pipeline.
This section assumes you have an existing DataSetIterator or MultiDataSetIterator used for single-machine training. There are many different ways to create one; that is outside the scope of this guide.
Step 1: Save the DataSets or MultiDataSets
Saving the contents of a DataSet to a local directory can be done using the following code:
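A sketch: iterate over the DataSetIterator and save each minibatch with DataSet.save(File). The output directory and file naming scheme are examples only:

```java
DataSetIterator iterator = ...;                     // Your existing single-machine iterator
File saveDirectory = new File("/data/exported");    // Local output directory (must exist)

int count = 0;
while (iterator.hasNext()) {
    DataSet ds = iterator.next();
    ds.save(new File(saveDirectory, "dataset_" + (count++) + ".bin"));
}
```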
Note that for the purposes of Spark, the exact file names don't matter. The process for saving MultiDataSets is almost identical.
As an aside: you can read these saved DataSet objects on a single machine (for non-Spark training) using FileDataSetIterator.
An alternative approach is to save directly to the cluster using output streams, to (for example) HDFS. This can only be done if the machine running the code is properly configured with the required libraries and access rights. For example, to save the DataSets directly to HDFS you could use:
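A sketch using the Hadoop FileSystem API together with DataSet.save(OutputStream). The HDFS URI and output directory are placeholders, and the Hadoop configuration is assumed to be available on the classpath:

```java
DataSetIterator iterator = ...;
String hdfsOutputDir = "hdfs:///data/exported/";

Configuration hadoopConfig = new Configuration();   // Picks up HDFS settings from the classpath
FileSystem fileSystem = FileSystem.get(new URI(hdfsOutputDir), hadoopConfig);

int count = 0;
while (iterator.hasNext()) {
    DataSet ds = iterator.next();
    String outPath = hdfsOutputDir + "dataset_" + (count++) + ".bin";
    try (OutputStream os = new BufferedOutputStream(fileSystem.create(new Path(outPath)))) {
        ds.save(os);    // Serialize the minibatch directly to HDFS
    }
}
```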
Step 2: Load and Train on a Cluster
The saved DataSet objects can then be copied to the cluster or network file storage (for example, using Hadoop FS utilities on a Hadoop cluster), and used as follows:
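For example, a minimal sketch (the directory path and network setup are placeholders):

```java
String dataDirPath = "hdfs:///data/exported";       // Directory the saved DataSets were copied to
SparkDl4jMultiLayer sparkNet = ...;                  // Configured network + TrainingMaster

sparkNet.fit(dataDirPath);
```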
or alternatively/equivalently, we can list the paths as an RDD using:
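A sketch, again assuming the SparkUtils.listPaths utility:

```java
JavaRDD<String> paths = SparkUtils.listPaths(sc, dataDirPath);
sparkNet.fitPaths(paths);
```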
An alternative approach is to use Hadoop MapFiles and SequenceFiles, which are efficient binary storage formats. This can be used to convert the output of any DataVec RecordReader or SequenceRecordReader (including a custom record reader) to a format usable on Spark. MapFileRecordWriter and MapFileSequenceRecordWriter require the DataVec Hadoop dependencies (the datavec-hadoop module and the relevant Hadoop libraries) to be on the classpath.
Step 1: Create a MapFile Locally
In the following example, a CSVRecordReader will be used, but any other RecordReader could be used in its place:
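A sketch of the local conversion step. The CSV path and MapFile output directory are placeholders, and the copy loop simply drains the reader into the writer:

```java
// Source data: any RecordReader will do; a CSVRecordReader is used here
RecordReader recordReader = new CSVRecordReader();
recordReader.initialize(new FileSplit(new File("/path/to/my/file.csv")));

// Destination: a local directory that will hold the MapFile(s)
File mapFileDir = new File("/map/file/root/dir");
MapFileRecordWriter writer = new MapFileRecordWriter(mapFileDir);

// Copy every record from the reader to the MapFile writer
while (recordReader.hasNext()) {
    writer.write(recordReader.next());
}
writer.close();
```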
The process for using a SequenceRecordReader combined with a MapFileSequenceRecordWriter is virtually the same.
Note also that MapFileRecordWriter and MapFileSequenceRecordWriter both support splitting - i.e., creating multiple smaller map files instead of one single (potentially multi-GB) map file. Using splitting is recommended when saving data in this manner for use with Spark.
Step 2: Copy to HDFS or other network file storage
The exact process is beyond the scope of this guide. However, it should be sufficient to simply copy the directory ("/map/file/root/dir" in the example above) to a location on HDFS.
Step 3: Read and Convert to RDD<DataSet> for Training
We can load the data for training using the following:
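A sketch using SparkStorageUtils to restore the MapFile contents as an RDD of records, then converting with DataVecDataSetFunction. The HDFS path, label index and class count are assumptions matching the earlier CSV example:

```java
JavaSparkContext sc = new JavaSparkContext();       // Or an existing context
String mapFilePath = "hdfs:///map/file/root/dir";   // Location the MapFile directory was copied to

// Restore the records; the keys are record indices, so we keep only the values
JavaRDD<List<Writable>> rddWritables = SparkStorageUtils.restoreMapFile(mapFilePath, sc).values();

// Convert to DataSets (here: classification, label at column 5, 10 classes)
int labelIndex = 5;
int numLabelClasses = 10;
JavaRDD<DataSet> rddDataSet = rddWritables.map(new DataVecDataSetFunction(labelIndex, numLabelClasses, false));
```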
This guide shows how to load CSV files for training an RNN. The assumption is that the dataset consists of multiple CSV files, where:
each CSV file represents one sequence
each row/line of the CSV contains the values for one time step (one or more columns/values, same number of values in all rows for all files)
each CSV may contain a different number of lines to other CSVs (i.e., variable length sequences are OK here)
header lines either aren't present in any files, or are present in all files
A data pipeline can be created using the following process:
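A sketch of this process, assuming comma-delimited files with no header lines and (for classification) a label column at each time step; the directory path, label index and class count are examples only:

```java
String directoryWithCsvFiles = "hdfs:///path/to/sequences/";
JavaSparkContext sc = new JavaSparkContext();

// Read each CSV file as a whole (one file = one sequence)
JavaPairRDD<String, PortableDataStream> origData = sc.binaryFiles(directoryWithCsvFiles);

// Parse each file into a sequence of Writable lists using a CSVSequenceRecordReader
int numHeaderLinesToSkip = 0;                       // Assumed: no header lines present
String delimiter = ",";
SequenceRecordReader seqRR = new CSVSequenceRecordReader(numHeaderLinesToSkip, delimiter);
JavaRDD<List<List<Writable>>> sequences = origData.map(new SequenceRecordReaderFunction(seqRR));

// Convert to DataSets - here, classification with the label in column 5 of each time step
int labelIndex = 5;
int numClasses = 10;
JavaRDD<DataSet> rddDataSet = sequences.map(new DataVecSequenceDataSetFunction(labelIndex, numClasses, false));
```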
This guide shows how to create an RDD<DataSet> for image classification, starting from images stored either locally or on a network file system such as HDFS.
The approach used here (added in 1.0.0-beta3) is to first preprocess the images into batches of files - FileBatch objects. The motivation for this approach is simple: the original image files typically use efficient compression (JPEG, for example), which is much more space- (and network-) efficient than a bitmap (int8 or 32-bit floating point) representation. However, on a cluster we want to minimize disk reads due to latency issues with remote storage - one file read/transfer is going to be faster than minibatchSize remote file reads.
The TinyImageNet example also shows how this can be done.
Note that one limitation of the implementation is that the set of classes (i.e., the class/category labels when doing classification) needs to be known, provided or collected manually. This differs from using ImageRecordReader for classification on a single machine, which can automatically infer the set of class labels.
First, assume the images are in subdirectories based on their class labels. For example, suppose there are two classes, "cat" and "dog", the directory structure would look like:
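An illustrative layout:

```
imageDir/cat/img0.jpg
imageDir/cat/img1.jpg
...
imageDir/dog/img0.jpg
imageDir/dog/img1.jpg
...
```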
(Note the file names don't matter in this example - however, the parent directory names are the class labels)
Step 1 (option 1 of 2): Preprocess Locally
Local preprocessing can be done as follows:
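A sketch of the local preprocessing step; the source and destination directories and batch size are examples, and the createFileBatchesLocal signature shown here (input directory, allowed extensions, recursive flag, output directory, batch size) should be checked against your DL4J version:

```java
String sourcePath = "/home/user/images";             // Local directory with class subdirectories (cat/, dog/, ...)
String destinationPath = "/home/user/preprocessed";  // Local output directory for the FileBatch files
int batchSize = 32;                                  // Number of source images per FileBatch object
boolean recursive = true;                            // Recurse into the class subdirectories

SparkDataUtils.createFileBatchesLocal(
        new File(sourcePath), NativeImageLoader.ALLOWED_FORMATS, recursive,
        new File(destinationPath), batchSize);
```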
The full import for SparkDataUtils is org.deeplearning4j.spark.util.SparkDataUtils.
After preprocessing has been completed, the directory can be copied to the cluster for use in training (Step 2).
Step 1 (option 2 of 2): Preprocess using Spark
Alternatively, if the original images are on remote file storage (such as HDFS), we can use the following:
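A sketch of the Spark-based preprocessing step. The directories are placeholders, and both the SparkUtils.listPaths overload with an extension filter and the createFileBatchesSpark signature are assumptions to verify against your DL4J version:

```java
String sourceDirectory = "hdfs:///data/images";             // Directory with class subdirectories
String destinationDirectory = "hdfs:///data/preprocessed";  // Output directory for the FileBatch files
int batchSize = 32;

JavaSparkContext sc = new JavaSparkContext();
JavaRDD<String> filePaths = SparkUtils.listPaths(sc, sourceDirectory, true, NativeImageLoader.ALLOWED_FORMATS);
SparkDataUtils.createFileBatchesSpark(filePaths, destinationDirectory, batchSize, sc);
```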
Step 2: Training
The data pipeline for image classification can be constructed as follows. The TinyImageNet example shows a complete, runnable version of this code:
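A sketch of the training-side pipeline, broadly following the TinyImageNet example. The image size, labels, paths and network variable are assumptions for the cat/dog layout above, and the RecordReaderFileBatchLoader constructor arguments (reader, batch size, label index, number of classes) should be checked against your DL4J version:

```java
int imageHeightWidth = 64;     // Assumed input size; must match the network configuration
int imageChannels = 3;
int batchSize = 32;
int numClasses = 2;            // "cat" and "dog"

// The ImageRecordReader parses the images inside each FileBatch; labels come from the parent directory
PathLabelGenerator labelMaker = new ParentPathLabelGenerator();
ImageRecordReader rr = new ImageRecordReader(imageHeightWidth, imageHeightWidth, imageChannels, labelMaker);
rr.setLabels(Arrays.asList("cat", "dog"));          // Class labels must be provided manually (see note above)

// The loader turns each FileBatch file into one DataSet minibatch
RecordReaderFileBatchLoader loader = new RecordReaderFileBatchLoader(rr, batchSize, 1, numClasses);
loader.setPreProcessor(new ImagePreProcessingScaler());   // Scale 0-255 pixel values to the 0-1 range

// List the preprocessed FileBatch files and fit
String trainDataPath = "hdfs:///data/preprocessed";
JavaRDD<String> pathsTrain = SparkUtils.listPaths(sc, trainDataPath);
SparkComputationGraph sparkNet = ...;               // Configured network + TrainingMaster
sparkNet.fitPaths(pathsTrain, loader);
```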
And that's it.
Note: for other label generation cases (such as labels provided from the file name instead of the parent directory), or for tasks such as semantic segmentation, you can substitute a different PathLabelGenerator for the default one. For example, if the label should come from the file name, you can use PatternPathLabelGenerator instead. Suppose images are named in the format "cat_img1234.jpg", "dog_2309.png", etc. We can use the following process:
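A sketch, reusing the variables from the previous example:

```java
// Split the file name on "_" and use the first token ("cat", "dog", ...) as the label
PathLabelGenerator labelMaker = new PatternPathLabelGenerator("_", 0);
ImageRecordReader rr = new ImageRecordReader(imageHeightWidth, imageHeightWidth, imageChannels, labelMaker);
rr.setLabels(Arrays.asList("cat", "dog"));
// The reader is then used with RecordReaderFileBatchLoader exactly as in the previous sketch
```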
Note that PathLabelGenerator returns a Writable object, so for tasks like image segmentation, you can return an INDArray using the NDArrayWritable class in a custom PathLabelGenerator.
DL4J Spark training supports the ability to load data serialized in a custom format. The assumption is that each file on the remote/network storage represents a single minibatch of data in some readable format.
Note that this approach is typically not required or recommended for most users, but is provided as an additional option for advanced users or those with pre-prepared data in a custom format or a format that is not natively supported by DL4J. When files represent a single record/example (instead of a minibatch) in a custom format, a custom RecordReader could be used instead.
The interfaces of note are DataSetLoader and MultiDataSetLoader, both of which extend the single-method Loader interface.
Suppose a HDFS directory contains a number of files, each being a minibatch in some custom format. These can be loaded using the following process:
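A sketch, assuming the SparkUtils.listPaths utility and the fitPaths overload that accepts a DataSetLoader; CustomMinibatchLoader is a hypothetical class sketched below:

```java
JavaSparkContext sc = new JavaSparkContext();
SparkDl4jMultiLayer sparkNet = ...;                          // Configured network + TrainingMaster

String dataDirectory = "hdfs:///path/to/custom/minibatches";
JavaRDD<String> paths = SparkUtils.listPaths(sc, dataDirectory);

// The custom loader (below) deserializes one file into one DataSet
sparkNet.fitPaths(paths, new CustomMinibatchLoader());
```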
Where the custom loader class looks something like:
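A sketch of such a class, assuming the DataSetLoader/Source interfaces described above; the parse methods are placeholders for your format-specific deserialization logic:

```java
public class CustomMinibatchLoader implements DataSetLoader {

    @Override
    public DataSet load(Source source) throws IOException {
        try (InputStream is = source.getInputStream()) {
            // Format-specific parsing goes here; parseFeatures/parseLabels are hypothetical helpers
            INDArray features = parseFeatures(is);
            INDArray labels = parseLabels(is);
            return new DataSet(features, labels);
        }
    }

    private INDArray parseFeatures(InputStream is) throws IOException {
        // Read and decode the feature array from the custom format
        throw new UnsupportedOperationException("Implement for your format");
    }

    private INDArray parseLabels(InputStream is) throws IOException {
        // Read and decode the label array from the custom format
        throw new UnsupportedOperationException("Implement for your format");
    }
}
```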