To use Deeplearning4j on Spark, include the dl4j-spark_2.10 or dl4j-spark_2.11 module. This module is required for both development and execution of Deeplearning4j Spark jobs. Be careful to use the version that matches your cluster, for both the Spark version (Spark 1 vs. Spark 2) and the Scala version (2.10 vs. 2.11); if these are mismatched, your job will likely fail at runtime.

The Spark dependencies themselves should typically use the provided scope in your pom.xml (see the Maven docs for more details), as Spark submit will add Spark to the classpath. Adding the Spark dependency is not required for execution on a cluster, but it may be needed if you want to test or debug a Spark-based job on your local machine.
Run mvn package -DskipTests to build the uber-jar for your project. The uber-jar should be present under <project_root>/target/<project_name>-bin.jar. Be sure to use the large ...-bin.jar file, as this is the shaded jar with all of the dependencies.
Include both the nd4j-cuda-x.x and nd4j-native backend dependencies (as described in the uber-jar section). Set the BACKEND_PRIORITY_CPU or BACKEND_PRIORITY_GPU environment variables on the master/driver, as described here. The exact process for setting environment variables may depend on the cluster manager (Spark standalone vs. YARN vs. Mesos); consult the documentation for each on how to set environment variables for the driver/master of Spark jobs.
There are four types of memory to configure for Spark training:
1. The worker on-heap memory (for example, --executor-memory 4g for YARN)
2. The worker off-heap memory (the javacpp system property options); for example, --conf "spark.executor.extraJavaOptions=-Dorg.bytedeco.javacpp.maxbytes=8G"
3. The driver on-heap memory, usually set as an argument to Spark submit (for example, --driver-memory 4g)
4. The driver off-heap memory

When running on YARN, it is usually also necessary to set the spark.yarn.driver.memoryOverhead and spark.yarn.executor.memoryOverhead properties; the default settings are much too small for DL4J training.
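As a minimal sketch, the executor-side settings above can also be set programmatically on the SparkConf before the SparkContext is created; the values and the application name are illustrative only, and the driver memory should still be passed to Spark submit:

import org.apache.spark.SparkConf;

public class SparkMemoryConfigSketch {
    // Illustrative values only - tune these for your cluster and network size.
    // Driver on-heap memory cannot reliably be set here; pass --driver-memory to Spark submit instead.
    public static SparkConf executorMemoryConf() {
        return new SparkConf()
                .setAppName("dl4j-spark-training")   // hypothetical application name
                // Worker (executor) on-heap memory, equivalent to --executor-memory 4g
                .set("spark.executor.memory", "4g")
                // Worker off-heap memory for ND4J, set via the javacpp system property
                .set("spark.executor.extraJavaOptions", "-Dorg.bytedeco.javacpp.maxbytes=8G")
                // YARN container memory overhead, in MB - the defaults are too small for DL4J
                .set("spark.yarn.executor.memoryOverhead", "8192");
    }
}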
Alternatively, memory can be configured for all jobs via the conf/spark-env.sh file on each node, as described in the Spark configuration docs. For example, you could add the following lines to set an 8GB heap for the driver, 12GB off-heap for the driver, a 12GB heap for the workers, and 18GB off-heap for the workers:

SPARK_DRIVER_OPTS=-Dorg.bytedeco.javacpp.maxbytes=12G
SPARK_DRIVER_MEMORY=8G
SPARK_WORKER_OPTS=-Dorg.bytedeco.javacpp.maxbytes=18G
SPARK_WORKER_MEMORY=12G
In the dependency listings, ${dl4j-version} is the version used for DL4J and ND4J.

Adding --conf spark.locality.wait=0 to your Spark submit configuration may marginally reduce training times, by scheduling the network fit operations to be started sooner.
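The same setting can be applied programmatically when building the SparkConf; a sketch, with a hypothetical application name:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class LocalityConfigSketch {
    public static JavaSparkContext createContext() {
        SparkConf conf = new SparkConf()
                .setAppName("dl4j-spark-training")   // hypothetical application name
                // Equivalent to passing --conf spark.locality.wait=0 to Spark submit:
                // schedule fit tasks immediately rather than waiting for data-local executors
                .set("spark.locality.wait", "0");
        return new JavaSparkContext(conf);
    }
}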
To debug the encoding threshold, set .encodingDebugMode(true) in the SharedTrainingMaster builder. When this is enabled, each of the workers will log the current threshold, sparsity, and various other statistics about the encoding. These statistics can be used to determine if the threshold is appropriately set: for example, many updates that are tens or hundreds of times the threshold may indicate the threshold is too low and should be increased; at the other end of the spectrum, very sparse updates (fewer than one in 10,000 values being communicated) may indicate that the threshold should be decreased.
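A minimal sketch of enabling this in the SharedTrainingMaster builder follows. Apart from encodingDebugMode(true) (mentioned above), the builder methods, constructor arguments, and port value are assumptions that should be checked against your DL4J version:

import org.deeplearning4j.spark.parameterserver.training.SharedTrainingMaster;
import org.nd4j.parameterserver.distributed.conf.VoidConfiguration;

public class EncodingDebugSketch {
    public static SharedTrainingMaster buildTrainingMaster(int batchSizePerWorker) {
        // Network configuration for the gradient-sharing implementation (port is an arbitrary example)
        VoidConfiguration voidConfig = VoidConfiguration.builder()
                .unicastPort(40123)
                .build();

        return new SharedTrainingMaster.Builder(voidConfig, batchSizePerWorker)
                .batchSizePerWorker(batchSizePerWorker)
                .workersPerNode(1)
                .encodingDebugMode(true)   // each worker logs threshold, sparsity and other encoding statistics
                .build();
    }
}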
Evaluation can be performed on data of the following types:
- RDD<DataSet> or JavaRDD<DataSet>, for evaluating single input/output networks
- RDD<MultiDataSet> or JavaRDD<MultiDataSet>, for evaluating multi input/output networks
- RDD<String> or JavaRDD<String>, where each String is a path that points to a serialized DataSet/MultiDataSet (or another minibatch file-based format) on network storage such as HDFS
The evaluation methods take the following parameters:
- int evalNumWorkers: the number of evaluation workers, i.e., the number of copies of the network used for evaluation on each node (up to the maximum number of Spark threads per worker). For large networks (or limited cluster memory), you may want to reduce this to avoid running into memory problems.
- int evalBatchSize: the minibatch size to use when performing evaluation. This needs to be large enough to use the hardware resources efficiently, but small enough to avoid running out of memory. Values of 32-128 are usually a good starting point; increase it when more memory is available and for smaller networks, and decrease it if memory is a problem.
- DataSetLoader loader and MultiDataSetLoader loader: these are available when evaluating on an RDD<String> or JavaRDD<String>. They are interfaces for loading a path into a DataSet or MultiDataSet using a custom user-defined function. Most users will not need these, but the functionality is provided for greater flexibility; it would be used, for example, if the saved minibatch file format is not a DataSet/MultiDataSet but some other (possibly custom) format.
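As a sketch, evaluation on a JavaRDD<DataSet> might look like the following. The simple evaluate(data) call is shown; the commented doEvaluation call using evalNumWorkers and evalBatchSize is an assumption that should be checked against your DL4J version:

import org.apache.spark.api.java.JavaRDD;
import org.deeplearning4j.eval.Evaluation;
import org.deeplearning4j.spark.impl.multilayer.SparkDl4jMultiLayer;
import org.nd4j.linalg.dataset.DataSet;

public class SparkEvaluationSketch {
    public static Evaluation evaluate(SparkDl4jMultiLayer sparkNet, JavaRDD<DataSet> testData) {
        // Simplest form: evaluate a single input/output network on an RDD of DataSets
        return sparkNet.evaluate(testData);
        // Overloads taking evalNumWorkers and evalBatchSize (as described above) can also be used, e.g.:
        // sparkNet.doEvaluation(testData, evalNumWorkers, evalBatchSize, new Evaluation());
    }
}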
Note: where SparkUtils is used, it is org.datavec.spark.transform.utils.SparkUtils.
SparkDl4jMultiLayer and SparkComputationGraph internally use the standard MultiLayerNetwork and ComputationGraph classes. You can access the internal MultiLayerNetwork/ComputationGraph using SparkDl4jMultiLayer.getNetwork() and SparkComputationGraph.getNetwork() respectively.
To save (or load) the network, use the ModelSerializer class or the MultiLayerNetwork.save(File)/.load(File) and ComputationGraph.save(File)/.load(File) methods.
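For example, a trained network can be pulled out of the Spark wrapper and saved locally as follows (a sketch; the output file location is up to you):

import java.io.File;
import java.io.IOException;
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
import org.deeplearning4j.spark.impl.multilayer.SparkDl4jMultiLayer;
import org.deeplearning4j.util.ModelSerializer;

public class SaveNetworkSketch {
    public static void saveTrained(SparkDl4jMultiLayer sparkNet, File outputFile) throws IOException {
        // Get the underlying (local) network from the Spark wrapper
        MultiLayerNetwork net = sparkNet.getNetwork();
        // Save configuration, parameters and (optionally) updater state
        ModelSerializer.writeModel(net, outputFile, true);
    }
}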
K is a generic type that signifies the unique 'key' used to identify each example. The key values are not used as part of the inference process; they are required because Spark's RDDs are unordered, and without them we would have no way to know which element in the predictions RDD corresponds to which element in the input RDD. The batch size parameter specifies the minibatch size to use when performing inference. It does not affect the values returned, but balances memory use against computational efficiency: large batches might compute a little quicker overall, but require more memory. In many cases, a batch size of 64 is a good starting point if you are unsure of what to use.
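A sketch of keyed distributed inference, assuming String keys and the feedForwardWithKey method on SparkDl4jMultiLayer:

import org.apache.spark.api.java.JavaPairRDD;
import org.deeplearning4j.spark.impl.multilayer.SparkDl4jMultiLayer;
import org.nd4j.linalg.api.ndarray.INDArray;

public class DistributedInferenceSketch {
    public static JavaPairRDD<String, INDArray> predict(SparkDl4jMultiLayer sparkNet,
                                                        JavaPairRDD<String, INDArray> features,
                                                        int batchSize) {
        // Each prediction is returned paired with its key, since Spark RDDs are unordered
        return sparkNet.feedForwardWithKey(features, batchSize);
    }
}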
An UnsupportedClassVersionError such as java.lang.UnsupportedClassVersionError: XYZ : Unsupported major.minor version 52.0 can result from trying to run (for example) Java 8 code on a cluster that is set up with only a Java 7 JRE/JDK.
Run mvn dependency:tree to get a list of dependencies (direct and transitive); this can be helpful to understand exactly what is on the classpath, and why. mvn dependency:tree -Dverbose provides extra information, and can be useful when debugging problems related to mismatched library versions.
The Spark 2 versions of the DL4J Spark modules have _spark_2 in the version string, and the Scala version appears as a _2.10 or _2.11 suffix on the artifact ID: for example, org.apache.spark:spark-core_2.11:jar:1.6.3:compile is a Spark 1 (1.6.3) dependency using Scala 2.11. Ensure that your project matches what the cluster is using: if your cluster is running Spark 2 with Scala 2.11, all of your Scala dependencies should also use 2.11. Note that Scala 2.11 is more common for Spark clusters.
Inspect the dependency tree (mvn dependency:tree -Dverbose) carefully. Where necessary, you can use exclusions, or add the problematic dependency as a direct dependency to force its version in your project: add the dependency, at the version you want, directly to your project. Often, this is enough to solve the problem. The spark.driver.userClassPathFirst and spark.executor.userClassPathFirst options (see the Spark configuration docs for more details) may also be a fix for dependency issues in some cases.
When persisting an RDD<DataSet> or RDD<INDArray> for re-use, use MEMORY_ONLY_SER or MEMORY_AND_DISK_SER. RDD.cache() and RDD.persist(StorageLevel.MEMORY_ONLY()) store the contents in memory, in deserialized (i.e., standard Java object) form. The basic idea is simple: if you persist an RDD, you can re-use it from memory (or disk, depending on configuration) without having to recalculate it. However, large RDDs may not entirely fit into memory; in this case, some parts of the RDD have to be recomputed or loaded from disk, depending on the storage level used. Furthermore, to avoid using too much memory, Spark will drop parts (blocks) of an RDD when required.

To decide which blocks to keep or drop, Spark has to estimate their size. For MEMORY_ONLY and MEMORY_AND_DISK persistence, this is done by walking the Java object graph, i.e., looking at the fields in an object and recursively estimating the size of those objects. This process does not, however, take into account the off-heap memory used by Deeplearning4j or ND4J. For objects like DataSets and INDArrays (which are stored almost entirely off-heap), Spark significantly under-estimates their true size. Furthermore, Spark considers only the amount of on-heap memory used when deciding whether to keep or drop blocks. Because DataSet and INDArray objects have a very small on-heap size, Spark will keep too many of them around with MEMORY_ONLY and MEMORY_AND_DISK persistence, resulting in off-heap memory being exhausted and out-of-memory issues.

With MEMORY_ONLY_SER and MEMORY_AND_DISK_SER, Spark stores blocks in serialized form, on the Java heap. The size of objects stored in serialized form can be estimated accurately by Spark (there is no off-heap memory component for the serialized objects), and consequently Spark will drop blocks when required, avoiding out-of-memory issues.
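For example (a sketch, with hypothetical variable names):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;
import org.nd4j.linalg.dataset.DataSet;

public class PersistSketch {
    public static JavaRDD<DataSet> persistForReuse(JavaRDD<DataSet> trainingData) {
        // Serialized storage lets Spark estimate block sizes accurately and drop blocks when needed
        // Use StorageLevel.MEMORY_AND_DISK_SER() instead if the data may not fit in memory
        return trainingData.persist(StorageLevel.MEMORY_ONLY_SER());
    }
}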
Collecting training stats with SparkDl4jMultiLayer.setCollectTrainingStats(true) requires internet access to connect to an NTP (Network Time Protocol) server. When it cannot, you may see errors such as NTPTimeSource: Error querying NTP server, attempt 1 of 10. Sometimes these failures are transient (later retries will work) and can be ignored; however, if the Spark cluster is configured such that one or more of the workers cannot access the internet (or, specifically, the NTP server), all retries can fail.
One workaround is simply not to use sparkNet.setCollectTrainingStats(true); this functionality is optional (not required for training) and is disabled by default.
Separately (and unrelated to the NTP issue above), if cluster nodes are affected by the Ubuntu 16.04 issue where a user's processes are killed when a YARN container exits, the workaround is to run sudo loginctl enable-linger hadoop_user_name on the cluster nodes.