Deeplearning4j on Spark: How To Build Data Pipelines
This page provides some guides on how to create data pipelines for both training and evaluation when using Deeplearning4j on Spark.
This page assumes some familiarity with Spark (RDDs, master vs. workers, etc) and Deeplearning4j (networks, DataSet etc).
As with training on a single machine, the final step of a data pipeline should be to produce a DataSet (single features arrays, single label array) or MultiDataSet (one or more feature arrays, one or more label arrays). In the case of DL4J on Spark, the final step of a data pipeline is data in one of the following formats: (a) an RDD<DataSet>
/JavaRDD<DataSet>
(b) an RDD<MultiDataSet>
/JavaRDD<MultiDataSet>
(c) a directory of serialized DataSet/MultiDataSet (minibatch) objects on network storage such as HDFS, S3 or Azure blob storage (d) a directory of minibatches in some other format
Once data is in one of those four formats, it can be used for training or evaluation.
Note: When training multiple models on a single dataset, it is best practice to preprocess your data once, and save it to network storage such as HDFS. Then, when training the network you can call SparkDl4jMultiLayer.fit(String path)
or SparkComputationGraph.fit(String path)
where path
is the directory where you saved the files.
Spark Data Prepration: How-To Guides
This guide shows how to load data contained in one or more CSV files and produce a JavaRDD<DataSet>
for export, training or evaluation on Spark.
The process is fairly straightforward. Note that the DataVecDataSetFunction
is very similar to the RecordReaderDataSetIterator
that is often used for single machine training.
For example, suppose the CSV had the following format - 6 total columns: 5 features followed by an integer class index for classification, and 10 possible classes
we could load this data for classification using the following code:
However, if this dataset was for regression instead, with again 6 total columns, 3 feature columns (positions 0, 1 and 2 in the file rows) and 3 label columns (positions 3, 4 and 5) we could load it using the same process as above, but changing the last 3 lines to:
RecordReaderMultiDataSetIterator (RRMDSI) is the most common way to create MultiDataSet instances for single-machine training data pipelines. It is possible to use RRMDSI for Spark data pipelines, where data is coming from one or more of RDD<List<Writable>>
(for 'standard' data) or RDD<List<List<Writable>>
(for sequence data).
Case 1: Single RDD<List<Writable>>
to RDD<MultiDataSet>
Consider the following single node (non-Spark) data pipeline for a CSV classification task.
The equivalent to the following Spark data pipeline:
For Sequence data (List<List<Writable>>
) you can use SparkSourceDummySeqReader instead.
Case 2: Multiple RDD<List<Writable>>
or RDD<List<List<Writable>>
to RDD<MultiDataSet>
For this case, the process is much the same. However, internaly, a join is used.
As noted at the start of this page, it is considered a best practice to preprocess and export your data once (i.e., save to network storage such as HDFS and reuse), rather than fitting from an RDD<DataSet>
or RDD<MultiDataSet>
directly in each training job.
There are a number of reasons for this:
Better performance (avoid redundant loading/calculation): When fitting multiple models from the same dataset, it is faster to preprocess this data once and save to disk rather than preprocessing it again for every single training run.
Minimizing memory and other resources: By exporting and fitting from disk, we only need to keep the DataSets we are currently using (plus a small async prefetch buffer) in memory, rather than also keeping many unused DataSet objects in memory. Exporting results in lower total memory use and hence we can use larger networks, larger minibatch sizes, or allocate fewer resources to our job.
Avoiding recomputation: When an RDD is too large to fit into memory, some parts of it may need to be recomputed before it can be used (depending on the cache settings). When this occurs, Spark will recompute parts of the data pipeline multiple times, costing us both time and memory. A pre-export step avoids this recomputation entirely.
Step 1: Saving
Saving the DataSet objects once you have an RDD<DataSet>
is quite straightforward:
Keep in mind that this is a map function, so no data will be saved until the paths RDD is executed - i.e., you should follow this with an operation such as:
or
or
Saving an RDD<MultiDataSet>
can be done in the same way using BatchAndExportMultiDataSetsFunction
instead, which takes the same arguments.
Step 2: Loading and Fitting
The exported data can be used in a few ways. First, it can be used to fit a network directly:
Similarly, we can use SparkComputationGraph.fitMultiDataSet(String path)
if we saved an RDD<MultiDataSet>
instead.
Alternatively, we can load up the paths in a few different ways, depending on if or how we saved them:
Then we can execute training on these paths by using methods such as SparkDl4jMultiLayer.fitPaths(JavaRDD<String>)
Another possible workflow is to start with the data pipeline on a single machine, and export the DataSet or MultiDataSet objects for use on the cluster. This workflow clearly isn't as scalable as preparing data on a cluster (you are using just one machine to prepare data) but it can be an easy option in some cases, especially when you have an existing data pipeline.
This section assumes you have an existing DataSetIterator
or MultiDataSetIterator
used for single-machine training. There are many different ways to create one, which is outside of the scope of this guide.
Step 1: Save the DataSets or MultiDataSets
Saving the contents of a DataSet to a local directory can be done using the following code:
Note that for the purposes of Spark, the exact file names don't matter. The process for saving MultiDataSets is almost identical.
As an aside: you can read these saved DataSet objects on a single machine (for non-Spark training) using FileDataSetIterator).
An alternative approach is to save directly to the cluster using output streams, to (for example) HDFS. This can only be done if the machine running the code is properly configured with the required libraries and access rights. For example, to save the DataSets directly to HDFS you could use:
Step 2: Load and Train on a Cluster The saved DataSet objects can then be copied to the cluster or network file storage (for example, using Hadoop FS utilities on a Hadoop cluster), and used as follows:
or alternatively/equivalently, we can list the paths as an RDD using:
An alternative approach is to use Hadoop MapFile and SequenceFiles, which are efficient binary storage formats. This can be used to convert the output of any DataVec RecordReader
or SequenceRecordReader
(including a custom record reader) to a format usable for use on Spark. MapFileRecordWriter and MapFileSequenceRecordWriter require the following dependencies:
Step 1: Create a MapFile Locally In the following example, a CSVRecordReader will be used, but any other RecordReader could be used in its place:
The process for using a SequenceRecordReader
combined with a MapFileSequenceRecordWriter
is virtually the same.
Note also that MapFileRecordWriter
and MapFileSequenceRecordWriter
both support splitting - i.e., creating multiple smaller map files instead of creating one single (potentially multi-GB) map file. Using splitting is recommended when saving data in this manner for use with Spark.
Step 2: Copy to HDFS or other network file storage
The exact process is beyond the scope of this guide. However, it should be sufficient to simply copy the directory ("/map/file/root/dir" in the example above) to a location on HDFS.
Step 3: Read and Convert to RDD<DataSet>
for Training
We can load the data for training using the following:
This guide shows how load CSV files for training an RNN. The assumption is that the dataset is comprised of multiple CSV files, where:
each CSV file represents one sequence
each row/line of the CSV contains the values for one time step (one or more columns/values, same number of values in all rows for all files)
each CSV may contain a different number of lines to other CSVs (i.e., variable length sequences are OK here)
header lines either aren't present in any files, or are present in all files
A data pipeline can be created using the following process:
This guide shows how to create an RDD<DataSet>
for image classification, starting from images stored either locally, or on a network file system such as HDFS.
The approach here used (added in 1.0.0-beta3) is to first preprocess the images into batches of files - FileBatch objects. The motivation for this approach is simple: the original image files typically use efficient compresion (JPEG for example) which is much more space (and network) efficient than a bitmap (int8 or 32-bit floating point) representation. However, on a cluster we want to minimize disk reads due to latency issues with remote storage - one file read/transfer is going to be faster than minibatchSize
remote file reads.
The TinyImageNet example also shows how this can be done.
Note that one limitation of the implementation is that the set of classes (i.e., the class/category labels when doing classification) needs to be known, provided or collected manually. This differs from using ImageRecordReader for classification on a single machine, which can automatically infer the set of class labels.
First, assume the images are in subdirectories based on their class labels. For example, suppose there are two classes, "cat" and "dog", the directory structure would look like:
(Note the file names don't matter in this example - however, the parent directory names are the class labels)
Step 1 (option 1 of 2): Preprocess Locally
Local preprocessing can be done as follows:
The full import for SparkDataUtils is org.deeplearning4j.spark.util.SparkDataUtils
.
After preprocessing is has been completed, the directory can be copied to the cluster for use in training (Step 2).
Step 1 (option 2 of 2): Preprocess using Spark
Alternatively, if the original images are on remote file storage (such as HDFS), we can use the following:
Step 2: Training The data pipeline for image classification can be constructed as follows. This code is taken from the TinyImageNet example:
And that's it.
Note: for other label generation cases (such as labels provided from the filename instead of parent directory), or for tasks such as semantic segmentation, you can substitute a different PathLabelGenerator instead of the default. For example, if the label should come from the file name, you can use PatternPathLabelGenerator
instead. Let's say images are in the format "cat_img1234.jpg", "dog_2309.png" etc. We can use the following process:
Note that PathLabelGenerator returns a Writable object, so for tasks like image segmentation, you can return an INDArray using the NDArrayWritable class in a custom PathLabelGenerator.
DL4J Spark training supports the ability to load data serialized in a custom format. The assumption is that each file on the remote/network storage represents a single minibatch of data in some readable format.
Note that this approach is typically not required or recommended for most users, but is provided as an additional option for advanced users or those with pre-prepared data in a custom format or a format that is not natively supported by DL4J. When files represent a single record/example (instead of a minibatch) in a custom format, a custom RecordReader could be used instead.
The interfaces of note are:
Both of which extend the single-method Loader interface.
Suppose a HDFS directory contains a number of files, each being a minibatch in some custom format. These can be loaded using the following process:
Where the custom loader class looks something like:
Deeplearning4j on Spark: How To Guides
This page contains a number of how-to guides for common distributed training tasks. Note that for guides on building data pipelines, see here.
Before going through these guides, make sure you have read the introduction guide for deeplearning4j Spark training here.
When submitting a training job to a cluster, a typical workflow is to build an "uber-jar" that is submitted to Spark submit. An uber-jar is single JAR file containing all of the dependencies (libraries, class files, etc) required to run a job. Note that Spark submit is a script that comes with a Spark distribution that users submit their job (in the form of a JAR file) to, in order to begin execution of their Spark job.
This guide assumes you already have code set up to train a network on Spark.
Step 1: Decide on the required dependencies.
There is a lot of overlap with single machine training with DL4J and ND4J. For example, for both single machine and Spark training you should include the standard set of deeplearning4j dependencies, such as:
deeplearning4j-core
deeplearning4j-spark
nd4j-native-platform (for CPU-only training)
In addition, you will need to include the Deeplearning4j's Spark module, dl4j-spark_2.10
or dl4j-spark_2.11
. This module is required for both development and execution of Deeplearning4j Spark jobs. Be careful to use the spark version that matches your cluster - for both the Spark version (Spark 1 vs. Spark 2) and the Scala version (2.10 vs. 2.11). If these are mismatched, your job will likely fail at runtime.
Dependency example: Spark 2, Scala 2.11:
Depedency example, Spark 1, Scala 2.10:
Note that if you add a Spark dependency such as spark-core_2.11, this can be set to provided
scope in your pom.xml (see Maven docs for more details), as Spark submit will add Spark to the classpath. Adding this dependency is not required for execution on a cluster, but may be needed if you want to test or debug a Spark-based job on your local machine.
When training on CUDA GPUs, there are a couple of possible cases when adding CUDA dependencies:
Case 1: Cluster nodes have CUDA toolkit installed on the master and worker nodes
When the CUDA toolkit and CuDNN are available on the cluster nodes, we can use a smaller dependency:
If the OS building the uber-jar is the same OS as the cluster: include nd4j-cuda-x.x
If the OS building the uber-jar is different to the cluster OS (i.e., build on Windows, execute Spark on Linux cluster): include nd4j-cuda-x.x-platform
In both cases, include
where x.x is the CUDA version - for example, x.x=9.2 for CUDA 9.2.
Case 2: Cluster nodes do NOT have the CUDA toolkit installed on the master and worker nodes
When CUDA/CuDNN are NOT installed on the cluster nodes, we can do the following:
First, include the dependencies as per 'Case 1' above
Then include the "redist" javacpp-presets for the cluster operating system, as described here: DL4J CuDNN Docs
Step 2: Configure your pom.xml file for building an uber-jar
When using Spark submit, you will need an uber-jar to submit to start and run your job. After configuring the relevant dependencies in step 1, we need to configure the pom.xml file to properly build the uber-jar.
We recommend that you use the maven shade plugin for building an uber-jar. There are alternative tools/plugins for this purpose, but these do not always include all relevant files from the source jars, such as those required for Java's ServiceLoader mechanism to function correctly. (The ServiceLoader mechanism is used by ND4J and a lot of other software libraries).
A Maven shade configuration suitable for this purpose is provided in the example standalone sample project pom.xml file:
Step 3: Build the uber jar
Finally, open up a command line window (bash on Linux, cmd on Windows, etc) simply run mvn package -DskipTests
to build the uber-jar for your project. Note that the uber-jar should be present under <project_root>/target/<project_name>-bin.jar
. Be sure to use the large ...-bin.jar
file as this is the shaded jar with all of the dependencies.
That's is - you should now have an uber-jar that is suitable for submitting to spark-submit for training networks on Spark with CPUs or NVIDA (CUDA) GPUs.
Deeplearning4j and ND4J support GPU acceleration using NVIDA GPUs. DL4J Spark training can also be performed using GPUs.
DL4J and ND4J are designed in such a way that the code (neural network configuration, data pipeline code) is "backend independent". That is, you can write the code once, and execute it on either a CPU or GPU, simply by including the appropriate backend (nd4j-native backend for CPUs, or nd4j-cuda-x.x for GPUs). Executing on Spark is no different from executing on a single node in this respect: you need to simply include the appropriate ND4J backend, and make sure your machines (master/worker nodes in the case) are appropriately set with the CUDA libraries (see the uber-jar guide for running on CUDA without needing to install CUDA/cuDNN on each node).
When running on GPUs, there are a few components: (a) The ND4J CUDA backend (nd4j-cuda-x.x dependency) (b) The CUDA toolkit (c) The Deeplearning4j CUDA dependency to gain cuDNN support (deeplearning4j-cuda-x.x) (d) The cuDNN library files
Both (a) and (b) must be available for ND4J/DL4J to run using an available CUDA GPU run. (c) and (d) are optional, though are recommended to get optimal performance - NVIDIA's cuDNN library is able to significantly speed up training for many layers, such as convolutional layers (ConvolutionLayer, SubsamplingLayer, BatchNormalization, etc) and LSTM RNN layers.
For configuring dependencies for Spark jobs, see the uber-jar section above. For configuring cuDNN on a single node, see Using Deeplearning4j with CuDNN
In some cases, it may make sense to run the master using CPUs only, and the workers using GPUs. If resources (i.e., the number of available GPU machines) are not constrained, it may simply be easier to have a homogeneous cluster: i.e., set up the cluster so that the master is using a GPU for execution also.
Assuming the master/driver is executing on a CPU machine, and the workers are executing on GPU machines, you can simply include both backends (i.e., both the nd4j-cuda-x.x
and nd4j-native
dependencies as described in the uber-jar section).
When multiple backends are present on the classpath, by default the CUDA backend will be tried first. If this cannot be loaded, the CPU (nd4j-native) backend will be loaded second. Thus, if the driver does not have a GPU, it should fall back to using a CPU. However, this default behaviour can be changed by setting the BACKEND_PRIORITY_CPU
or BACKEND_PRIORITY_GPU
environment variables on the master/driver, as described here. The exact process for setting environment variables may depend on the cluster manager - Spark standalone vs. YARN vs. Mesos. Please consult the documentation for each on how to set the environment variables for Spark jobs for the driver/master.
For important background on how memory and memory configuration works for DL4J and ND4J, start by reading Memory management for ND4J/DL4J.
The memory management on Spark is similar to memory management for single node training:
On-heap memory is configured using the standard Java Xms and Xmx memory configuration settings
Off-heap memory is configured using the javacpp system properties
However, memory configuration in the context of Spark adds some additional complications: 1. Often, memory configuration has to be done separately (sometimes using different mechanisms) for the driver/master vs. the workers 2. The approach for configuring memory can depend on the cluster resource manager - Spark standalone vs. YARN vs. Mesos, etc 3. Cluster resource manager default memory settings are often not appropriate for libraries (such as DL4J/ND4J) that rely heavily on off-heap memory
See the Spark documentation for your cluster manager:
You should set 4 things: 1. The worker on-heap memory (Xmx) - usually set as an argument for Spark submit (for example, --executor-memory 4g
for YARN) 2. The worker off-heap memory (javacpp system properties options) (for example, --conf "spark.executor.extraJavaOptions=-Dorg.bytedeco.javacpp.maxbytes=8G"
) 3. The driver on-heap memory - usually set as an 4. The driver off-heap memory
Some notes:
On YARN, it is generally necessary to set the spark.yarn.driver.memoryOverhead
and spark.yarn.executor.memoryOverhead
properties. The default settings are much too small for DL4J training.
On Spark standalone, you can also configure memory by modifying the conf/spark-env.sh
file on each node, as described in the Spark configuration docs. For example, you could add the following lines to set 8GB heap for the driver, 12 GB off-heap for the driver, 12GB heap for the workers, and 18GB off-heap for the workers:
SPARK_DRIVER_OPTS=-Dorg.bytedeco.javacpp.maxbytes=12G
SPARK_DRIVER_MEMORY=8G
SPARK_WORKER_OPTS=-Dorg.bytedeco.javacpp.maxbytes=18G
SPARK_WORKER_MEMORY=12G
All up, this might look like (for YARN, with 4GB on-heap, 5GB off-heap, 6GB YARN off-heap overhead):
One determinant of the performance of training is the frequency of garbage collection. When using Workspaces (see also this), which are enabled by default, it can be helpful to reduce the frequency of garbage collection. For simple machine training (and on the driver) this is easy:
However, setting this on the driver will not change the settings on the workers. Instead, it can be set for the workers as follows:
The default (as of 1.0.0-beta3) is to perform periodic garbage collection every 5 seconds on the workers.
Deeplearning4j and ND4J can utilize Kryo serialization, with appropriate configuration. Note that due to the off-heap memory of INDArrays, Kryo will offer less of a performance benefit compared to using Kryo in other contexts.
To enable Kryo serialization, first add the nd4j-kryo dependency:
where ${dl4j-version}
is the version used for DL4J and ND4J.
Then, at the start of your training job, add the following code:
Note that when using Deeplearning4j's SparkDl4jMultiLayer or SparkComputationGraph classes, a warning will be logged if the Kryo configuration is incorrect.
For DL4J, the only requirement for CUDA GPUs is to use the appropriate backend, with the appropriate NVIDIA libraries either installed on each node, or provided in the uber-JAR (see Spark how-to guide for more details). For recent versions of YARN, some additional configuration may be required in some cases - see the YARN GPU documentation for more details.
Earlier version of YARN (for example, 2.7.x and similar) did not support GPUs natively. For these versions, it is possible to utilize node labels to ensure that jobs are scheduled onto GPU-only nodes. For more details, see the Hadoop Yarn documentation
Note that YARN-specific memory configuration (see memory how-to) is also required.
Configuring Spark locality settings is an optional configuration option that can improve training performance.
The summary: adding --conf spark.locality.wait=0
to your Spark submit configuration may marginally reduce training times, by scheduling the network fit operations to be started sooner.
For more details, see link 1 and link 2.
Deeplearning4j's Spark implementation uses a threshold encoding scheme for sending parameter updates between nodes. This encoding scheme results in a small quantized message, which significantly reduces the network cost of communicating updates. See the technical explanation page for more details on this encoding process.
This threshold encoding process introduces a "distributed training specific" hyperparameter - the encoding threshold. Both too large thresholds and too small thresholds can result in sub-optimal performance:
Large thresholds mean infrequent communication - too infrequent and convergence can suffer
Small thresholds mean more frequent communication - but smaller changes are communicated at each step
The encoding threshold to be used is controlled by the ThresholdAlgorithm. The specific implementation of the ThresholdAlgorithm determines what threshold should be used.
The default behaviour for DL4J is to use AdaptiveThresholdAlgorithm which tries to keep the sparsity ratio in a certain range.
The sparsity ratio is defined as numValues(encodedUpdate)/numParameters - 1.0 means fully dense (all values communicated), 0.0 means fully sparse (no values communicated)
Larger thresholds mean more sparse values (less network communication), and a smaller threshold means less sparse values (more network communication)
The AdaptiveThresholdAlgorithm tries to keep the sparsity ratio between 0.01 and 0.0001 by default. If the sparsity of the updates falls outside of this range, the threshold is either increased or decreased until it is within this range.
An initial threshold value still needs to be set - we have found the
In practice, we have seen that this adaptive threshold process to work well. The built-in implementations for threshold algorithms include:
AdaptiveThresholdAlgorithm
FixedThresholdAlgorithm: a fixed, non-adaptive threshold using the specified encoding threshold.
TargetSparsityThresholdAlgorithm: an adaptive threshold algorithm that targets a specific sparsity, and increases or decreases the threshold to try to match the target.
In addition, DL4J has a ResidualPostProcessor interface, with the default implementation being ResidualClippingPostProcessor which clips the residual vector to a maximum of 5x the current threshold, every 5 steps. The motivation for this is that the "left over" parts of the updates (i.e., those parts not communicated) are store in the residual vector. If the updates are much larger than the threshold, we can have a phenomenon we have termed "residual explosion" - that is, the residual values can continue to grow to many times the threshold (hence would take many steps to communicate the gradient). The residual post processor is used to avoid this phenomenon.
The threshold algorithm (and initial threshold) and the residual post processor can be set as follows:
Finally, DL4J's SharedTrainingMaster also has an encoding debug mode, enabled by setting .encodingDebugMode(true)
in the SharedTrainingmaster builder. When this is enabled, each of the workers will log the current threshold, sparsity, and various other statistics about the encoding. These statistics can be used to determine if the threshold is appropriately set: for example, many updates that are tens or hundreds of times the threshold may indicate the threshold is too low and should be increased; at the other end of the spectrum, very sparse updates (less than one in 10000 values being communicated) may indicate that the threshold should be decreased.
Deeplearning4j supports most standard evaluation metrics for neural networks. For basic information on evaluation, see the Deeplearning4j Evaluation Page
All of the evaluation metrics that Deeplearning4j supports can be calculated in a distributed manner using Spark.
Step 1: Prepare Your Data
Evaluation data for Deeplearinng4j on Spark is very similar to training data. That is, you can use:
RDD<DataSet>
or JavaRDD<DataSet>
for evaluating single input/output networks
RDD<MultiDataSet>
or JavaRDD<MultiDataSet>
for evaluating multi input/output networks
RDD<String>
or JavaRDD<String>
where each String is a path that points to serialized DataSet/MultiDataSet (or other minibatch file-based formats) on network storage such as HDFS.
See the data page (TODO: LINK) for details on how to prepare your data into one of these formats.
Step 2: Prepare Your Network
Creating your network is straightforward. First, load your network (MultiLayerNetwork or ComputationGraph) into memory on the driver using the information from the following guide: How to save (and load) neural networks trained on Spark
Then, simply create your network using:
Note that you don't need to configure a TrainingMaster (i.e., the 3rd argument is null above), as evaluation does not use it.
Step 3: Call the appropriate evaluation method
For common cases, you can call one of the standard evalutation methods on SparkDl4jMultiLayer or SparkComputationGraph:
For performing multiple evaluations simultaneously (more efficient than performing them sequentially) you can use something like:
Note that some of the evaluation methods have overloads with extra parameters, including:
int evalNumWorkers
- the number of evaluation workers - i.e., the number of copies of a network used for evaluation on each node (up to the maximum number of Spark threads per worker). For large networks (or limited cluster memory), you might want to reduce this to avoid running into memory problems.
int evalBatchSize
- the minibatch size to use when performing evaluation. This needs to be large enough to efficiently use the hardware resources, but small enough to not run out of memory. Values of 32-128 is unsually a good starting point; increase when more memory is available and for smaller networks; decrease if memory is a problem.
DataSetLoader loader
and MultiDataSetLoader loader
- these are available when evaluating on a RDD<String>
or JavaRDD<String>
. They are interfaces to load a path into a DataSet or MultiDataSet using a custom user-defined function. Most users will not need to use these, however the functionality is provided for greater flexibility. They would be used for example if the saved minibatch file format is not a DataSet/MultiDataSet but some other (possibly custom) format.
Finally, if you want to save the results of evaluation (of any type) you can save it to JSON format directly to remote storage such as HDFS as follows:
The import for SparkUtils
is org.datavec.spark.transform.utils.SparkUtils
The evaluation can be loaded using:
Deeplearning4j's Spark functionality is built around the idea of wrapper classes - i.e., SparkDl4jMultiLayer
and SparkComputationGraph
internally use the standard MultiLayerNetwork
and ComputationGraph
classes. You can access the internal MultiLayerNetwork/ComputationGraph classes using SparkDl4jMultiLayer.getNetwork()
and SparkComputationGraph.getNetwork()
respectively.
To save on the master/driver's local file system, get the network as described above and simply use the ModelSerializer
class or MultiLayerNetwork.save(File)/.load(File)
and ComputationGraph.save(File)/.load(File)
methods.
To save to (or load from) a remote location or distributed file system such as HDFS, you can use input and output streams.
For example,
Reading is a similar process:
Deeplearning4j's Spark implementation supports distributed inference. That is, we can easily generate predictions on an RDD of inputs using a cluster of machines. This distributed inference can also be used for networks trained on a single machine and loaded for Spark (see the saving/loading section for details on how to load a saved network for use with Spark).
Note: If you want to perform evaluation (i.e., calculate accuracy, F1, MSE, etc), refer to the evaluation how-to instead.
The method signatures for performing distributed inference are as follows:
There are also overloads that accept an input mask array, when required
Note the parameter K
- this is a generic type to signify the unique 'key' used to identify each example. The key values are not used as part of the inference process. This key is required as Spark's RDDs are unordered - without this, we would have no way to know which element in the predictions RDD corresponds to which element in the input RDD. The batch size parameter is used to specify the minibatch size when performing inference. It does not impact the values returned, but instead is used to balance memory use vs. computational efficiency: large batches might compute a little quicker overall, but require more memory. In many cases, a batch size of 64 is a good starting point to try if you are unsure of what to use.
Unfortunately, dependency problems at runtime can occur on a cluster if your project is not configured correctly. These problems can occur with any Spark jobs, not just those using DL4J - and they may be caused by other dependencies or libraries on the classpath, not by Deeplearning4j dependencies.
When dependency problems occur, they typically produce exceptions like:
NoSuchMethodException
ClassNotFoundException
AbstractMethodError
For example, mismatched Spark versions (trying to use Spark 1 on a Spark 2 cluster) can look like:
Another class of errors is the UnsupportedClassVersionError
for example java.lang.UnsupportedClassVersionError: XYZ : Unsupported major.minor version 52.0
- this can result from trying to run (for example) Java 8 code on a cluster that is set up with only a Java 7 JRE/JDK.
How to debug dependency problems:
Step 1: Collect Dependency Information
The first step (when using Maven) is to produce a dependency tree that you can refer to. Open a command line window (for example, bash on Linux, cmd on Windows), navigate to the root directory of your Maven project and run mvn dependency:tree
This will give you a list of dependencies (direct and transient) that can be helpful to understand exactly what is on the classpath, and why.
Note also that mvn dependency:tree -Dverbose
will provide extra information, and can be useful when debugging problems related to mismatched library versions.
Step 2: Check your Spark Versions
When running into dependency issues, check the following.
First: check the Spark versions If your cluster is running Spark 2, you should be using a version of deeplearning4j-spark_2.10/2.11 (and DataVec) that ends with _spark_2
Look through
If you find a problem, you should change your project dependencies as follows: On a Spark 2 (Scala 2.11) cluster, use:
whereas on a Spark 1 (Scala 2.11) cluster, you should use:
Step 3: Check the Scala Versions
Apache Spark is distributed with versions that support both Scala 2.10 and Scala 2.11.
To avoid problems with Scala versions, you need to do two things: (a) Ensure you don't have a mix of Scala 2.10 and Scala 2.11 (or 2.12) dependencies on your project classpath. Check your dependency tree for entries ending in _2.10
or _2.11
: for example, org.apache.spark:spark-core_2.11:jar:1.6.3:compile
is a Spark 1 (1.6.3) dependency using Scala 2.11 (b) Ensure that your project matches what the cluster is using. For example, if you cluster is running Spark 2 with Scala 2.11, all of your Scala dependencies should use 2.11 also. Note that Scala 2.11 is more common for Spark clusters.
If you find mismatched Scala versions, you will need to align them by changing the dependency versions in your pom.xml (or similar configuration file for other dependency management systems). Many libraries (including Spark and DL4J) release dependencies with both Scala 2.10 and 2.11 versions.
Step 4: Check for Mismatched Library Versions
A number of common utility libraries that are widely used across the Java ecosystem are not compatible across versions. For example, Spark might rely on library X version Y and will fail to run when library X version Z is on the classpath. Furthermore, many of these libraries are split into multiple modules (i.e., multiple separate modular dependencies) that won't work correctly when mixing different versions.
Some that can commonly cause problems include:
Jackson
Guava
DL4J and ND4J use versions of these libraries that should avoid dependency conflicts with Spark. However, it is possible that other (3rd party libraries) can pull in versions of these dependencies.
Often, the exception will give a hint of where to look - i.e., the stack trace might include a specific class, which can be used to identify the problematic library.
Step 5: Once Identified, Fix the Dependency Conflict
To debug these sorts of problems, check the dependency tree (the output of mvn dependency:tree -Dverbose
) carefully. Where necessary, you can use exclusions or add the problematic dependency as a direct dependency to force it's version in your probelm. To do this, you would add the dependency of the version you want directly to your project. Often, this is enough to solve the problem.
Keep in mind that when using Spark submit, Spark will add a copy of Spark and it's dependent libraries to the driver and worker classpaths. This means that for dependencies that are added by Spark, you can't simply exclude them in your project - Spark submit will add them at runtime whether you exclude them or not in your project.
One additional setting that is worth knowing about is the (experimental) Spark configuration options, spark.driver.userClassPathFirst
and spark.executor.userClassPathFirst
(See the Spark configuration docs for more details). In some cases, these options may be a fix for dependency issues.
Spark has some issues regarding how it handles Java objects with large off-heap components, such as the DataSet and INDArray objects used in Deeplearning4j. This section explains the issues related to caching/persisting these objects.
The key points to know about are:
MEMORY_ONLY and MEMORY_AND_DISK persistence can be problematic with off-heap memory, due to Spark not properly estimating the size of objects in the RDD. This can lead to out of (off-heap) memory issues.
When persisting a RDD<DataSet>
or RDD<INDArray>
for re-use, use MEMORY_ONLY_SER or MEMORY_AND_DISK_SER
Why MEMORY_ONLY_SER or MEMORY_AND_DISK_SER Are Recommended
One of the way that Apache Spark improves performance is by allowing users to cache data in memory. This can be done using the RDD.cache()
or RDD.persist(StorageLevel.MEMORY_ONLY())
to store the contents in-memory, in deserialized (i.e., standard Java object) form. The basic idea is simple: if you persist a RDD, you can re-use it from memory (or disk, depending on configuration) without having to recalculate it. However, large RDDs may not entirely fit into memory. In this case, some parts of the RDD have to be recomputed or loaded from disk, depending on the storage level used. Furthermore, to avoid using too much memory, Spark will drop parts (blocks) of an RDD when required.
The main storage levels available in Spark are listed below. For an explanation of these, see the Spark Programming Guide.
MEMORY_ONLY
MEMORY_AND_DISK
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
DISK_ONLY
The problem with Spark is how it handles memory. In particular, Spark will drop part of an RDD (a block) based on the estimated size of that block. The way Spark estimates the size of a block depends on the persistence level. For MEMORY_ONLY
and MEMORY_AND_DISK
persistence, this is done by walking the Java object graph - i.e., look at the fields in an object and recursively estimate the size of those objects. This process does not however take into account the off-heap memory used by Deeplearning4j or ND4J. For objects like DataSets and INDArrays (which are stored almost entirely off-heap), Spark significantly under-estimates the true size of the objects using this process. Furthermore, Spark considers only the amount of on-heap memory use when deciding whether to keep or drop blocks. Because DataSet and INDArray objects have a very small on-heap size, Spark will keep too many of them around with MEMORY_ONLY
and MEMORY_AND_DISK
persistence, resulting in off-heap memory being exhausted, causing out of memory issues.
However, for MEMORY_ONLY_SER
and MEMORY_AND_DISK_SER
Spark stores blocks in serialized form, on the Java heap. The size of objects stored in serialized form can be estimated accurately by Spark (there is no off-heap memory component for the serialized objects) and consequently Spark will drop blocks when required - avoiding any out of memory issues.
DL4J's parameter averaging implementation has the option to collect training stats, by using SparkDl4jMultiLayer.setCollectTrainingStats(true)
. When this is enabled, internet access is required to connect to the NTP (network time protocal) server.
It is possible to get errors like NTPTimeSource: Error querying NTP server, attempt 1 of 10
. Sometimes these failures are transient (later retries will work) and can be ignored. However, if the Spark cluster is configured such that one or more of the workers cannot access the internet (or specifically, the NTP server), all retries can fail.
Two solutions are available:
Don't use sparkNet.setCollectTrainingStats(true)
- this functionality is optional (not required for training), and is disabled by default
Set the system to use the local machine clock instead of the NTP server, as the time source (note however that the timeline information may be very inaccurate as a result)
To use the system clock time source, add the following to Spark submit:
When running a Spark on YARN cluster on Ubuntu 16.04 machines, chances are that after finishing a job, all processes owned by the user running Hadoop/YARN are killed. This is related to a bug in Ubuntu, which is documented at https://bugs.launchpad.net/ubuntu/+source/procps/+bug/1610499. There's also a Stackoverflow discussion about it at https://stackoverflow.com/questions/38419078/logouts-while-running-hadoop-under-ubuntu-16-04.
Some workarounds are suggested.
Option 1
Add
to /etc/systemd/logind.conf, and reboot.
Option 2
Copy the /bin/kill binary from Ubuntu 14.04 and use that one instead.
Option 3
Downgrade to Ubuntu 14.04
Option 4
Spark has some issues regarding how it handles Java objects with large off-heap components, such as the DataSet and INDArray objects used in Deeplearning4j. This section explains the issues related to caching/persisting these objects.
The key points to know about are:
MEMORY_ONLY and MEMORY_AND_DISK persistence can be problematic with off-heap memory, due to Spark not properly estimating the size of objects in the RDD. This can lead to out of (off-heap) memory issues.
When persisting a RDD<DataSet>
or RDD<INDArray>
for re-use, use MEMORY_ONLY_SER or MEMORY_AND_DISK_SER
Why MEMORY_ONLY_SER or MEMORY_AND_DISK_SER Are Recommended
One of the way that Apache Spark improves performance is by allowing users to cache data in memory. This can be done using the RDD.cache()
or RDD.persist(StorageLevel.MEMORY_ONLY())
to store the contents in-memory, in deserialized (i.e., standard Java object) form. The basic idea is simple: if you persist a RDD, you can re-use it from memory (or disk, depending on configuration) without having to recalculate it. However, large RDDs may not entirely fit into memory. In this case, some parts of the RDD have to be recomputed or loaded from disk, depending on the storage level used. Furthermore, to avoid using too much memory, Spark will drop parts (blocks) of an RDD when required.
The main storage levels available in Spark are listed below. For an explanation of these, see the Spark Programming Guide.
MEMORY_ONLY
MEMORY_AND_DISK
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
DISK_ONLY
The problem with Spark is how it handles memory. In particular, Spark will drop part of an RDD (a block) based on the estimated size of that block. The way Spark estimates the size of a block depends on the persistence level. For MEMORY_ONLY
and MEMORY_AND_DISK
persistence, this is done by walking the Java object graph - i.e., look at the fields in an object and recursively estimate the size of those objects. This process does not however take into account the off-heap memory used by Deeplearning4j or ND4J. For objects like DataSets and INDArrays (which are stored almost entirely off-heap), Spark significantly under-estimates the true size of the objects using this process. Furthermore, Spark considers only the amount of on-heap memory use when deciding whether to keep or drop blocks. Because DataSet and INDArray objects have a very small on-heap size, Spark will keep too many of them around with MEMORY_ONLY
and MEMORY_AND_DISK
persistence, resulting in off-heap memory being exhausted, causing out of memory issues.
However, for MEMORY_ONLY_SER
and MEMORY_AND_DISK_SER
Spark stores blocks in serialized form, on the Java heap. The size of objects stored in serialized form can be estimated accurately by Spark (there is no off-heap memory component for the serialized objects) and consequently Spark will drop blocks when required - avoiding any out of memory issues.
DL4J's parameter averaging implementation has the option to collect training stats, by using SparkDl4jMultiLayer.setCollectTrainingStats(true)
. When this is enabled, internet access is required to connect to the NTP (network time protocal) server.
It is possible to get errors like NTPTimeSource: Error querying NTP server, attempt 1 of 10
. Sometimes these failures are transient (later retries will work) and can be ignored. However, if the Spark cluster is configured such that one or more of the workers cannot access the internet (or specifically, the NTP server), all retries can fail.
Two solutions are available:
Don't use sparkNet.setCollectTrainingStats(true)
- this functionality is optional (not required for training), and is disabled by default
Set the system to use the local machine clock instead of the NTP server, as the time source (note however that the timeline information may be very inaccurate as a result)
To use the system clock time source, add the following to Spark submit:
When running a Spark on YARN cluster on Ubuntu 16.04 machines, chances are that after finishing a job, all processes owned by the user running Hadoop/YARN are killed. This is related to a bug in Ubuntu, which is documented at https://bugs.launchpad.net/ubuntu/+source/procps/+bug/1610499. There's also a Stackoverflow discussion about it at https://stackoverflow.com/questions/38419078/logouts-while-running-hadoop-under-ubuntu-16-04.
Some workarounds are suggested.
Option 1
Add
to /etc/systemd/logind.conf, and reboot.
Option 2
Copy the /bin/kill binary from Ubuntu 14.04 and use that one instead.
Option 3
Downgrade to Ubuntu 14.04
Option 4
run sudo loginctl enable-linger hadoop_user_name
on cluster nodes