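To run Spark locally, start the shell as shown below. In local mode there is no real cluster: everything runs in a single JVM, and each worker is represented by one thread. The * sets the number of threads to the number of cores available on the machine; put a number instead (e.g. local[4]) to force a specific number of threads.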

spark-shell --master local[*]

In local mode the executors run inside the driver process, so to give your Spark jobs more memory add the flag --driver-memory, sized to the RAM you have, for example:

spark-shell --master local[*] --driver-memory 10g
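When the same code is compiled into an application instead of typed into spark-shell, the master can be set on the SparkSession builder. This is a minimal sketch, assuming Spark 2.x with spark-sql on the classpath; the object name LocalExample is hypothetical. Driver memory is the exception: in local and client mode the driver JVM is already running when this code executes, so spark.driver.memory cannot take effect from here and --driver-memory should still be passed on the command line.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper object for illustration only.
object LocalExample {
  // Build (or reuse) a session that runs Spark inside this JVM.
  // "local[*]" uses one worker thread per available core; "4" would force 4 threads.
  def session(threads: String = "*"): SparkSession =
    SparkSession.builder()
      .appName("LocalExample")
      .master(s"local[$threads]")
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = session()
    // In local mode defaultParallelism equals the number of threads,
    // unless spark.default.parallelism is set explicitly.
    println(s"default parallelism: ${spark.sparkContext.defaultParallelism}")
    spark.stop()
  }
}
```

Hard-coding the master is convenient for local tests, but for jobs launched with spark-submit it is usually better to leave it out and pass --master on the command line.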

Broadcast Variables

TBD

Caching RDDs

Spark does not compute transformations right away (for example, loading a file or parsing every row of an RDD from a String into a structure such as a pair or a case class). The computation only happens when an action is called on the dataset, such as count or take. When several actions are run on the same data, recomputing it from scratch each time is wasteful, and this is the problem the cache method solves: it marks the RDD so that, the first time an action computes it, Spark keeps its partitions in memory in the state they had at the point where cache was called, and later actions reuse them.

The trade-off is memory: cached data occupies RAM that is then unavailable to the rest of the job, so knowing when caching pays off and when it is not worth it is something of an art. As a general rule, if you need to run several computations on the same base state, cache it, and call unpersist once you no longer need it.
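A minimal sketch of this pattern, assuming Spark 2.x: the comma-separated input lines and the Visit case class are hypothetical. The parsed RDD is cached because two actions run on it, and unpersist releases the memory afterwards.

```scala
import org.apache.spark.sql.SparkSession

object CachingExample {
  // Hypothetical structured record parsed from each "user,page" line.
  case class Visit(user: String, page: String)

  // Returns (number of valid visits, number of distinct users).
  def run(spark: SparkSession, lines: Seq[String]): (Long, Long) = {
    // Transformations only: nothing is computed at this point.
    val visits = spark.sparkContext
      .parallelize(lines)
      .map(_.split(","))
      .filter(_.length == 2)
      .map(fields => Visit(fields(0), fields(1)))

    // Mark the parsed RDD for caching (cache() is persist(MEMORY_ONLY)).
    // It is actually stored the first time an action computes it.
    visits.cache()

    // First action: parses the input and fills the cache.
    val total = visits.count()
    // Second action: reuses the cached partitions instead of re-parsing.
    val distinctUsers = visits.map(_.user).distinct().count()

    // Free the memory once this base state is no longer needed.
    visits.unpersist()
    (total, distinctUsers)
  }
}
```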

Build a jar

To put our Spark applications into production, we need to package them as a jar that can be executed with spark-submit or by a workflow scheduler such as Oozie. In this case we need what is known as a fat jar: a single jar that bundles the application together with the dependencies it uses (except those marked as provided, such as Spark itself, which the cluster already supplies). I create my projects with IntelliJ IDEA and use sbt as the build tool instead of Maven.

sbt cannot create fat jars out of the box, so we need a plugin for that: sbt-assembly. Using it requires a few changes to our build.sbt:

name := "simpleApp"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0" % "provided"

libraryDependencies += "com.databricks" %% "spark-xml" % "0.4.1"

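// sbt-assembly merge strategy: tells the plugin which copy to keep when several
// dependencies contain files with the same path (otherwise assembly can fail with "deduplicate" errors)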
assemblyMergeStrategy in assembly := {
  case PathList("org","aopalliance", xs @ _*) => MergeStrategy.last
  case PathList("javax", "inject", xs @ _*) => MergeStrategy.last
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
  case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
  case PathList("org", "apache", xs @ _*) => MergeStrategy.last
  case PathList("com", "google", xs @ _*) => MergeStrategy.last
  case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
  case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
  case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
  case "about.html" => MergeStrategy.rename
  case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
  case "META-INF/mailcap" => MergeStrategy.last
  case "META-INF/mimetypes.default" => MergeStrategy.last
  case "plugin.properties" => MergeStrategy.last
  case "log4j.properties" => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

Next, inside the project/ folder (the sbt subdirectory next to build.sbt) create a file called assembly.sbt with the following content, changing the plugin version if needed:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

Now, to build the fat jar, go to the root folder of the project and run sbt assembly. The jar is created automatically under target/scala-2.11/, by default named after the project as name-assembly-version.jar (here simpleApp-assembly-1.0.jar).
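To have something to package, here is a minimal sketch of an entry point for the simpleApp project, using the spark-xml dependency declared in build.sbt. The object name SimpleApp, the row tag "record" and the input path are assumptions for illustration. It does not call master(): when the jar is launched with spark-submit, the master comes from the command line.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical entry point for the simpleApp project.
object SimpleApp {
  // Reads an XML file with the spark-xml data source and counts its rows.
  def countRecords(spark: SparkSession, path: String, rowTag: String = "record"): Long =
    spark.read
      .format("com.databricks.spark.xml") // provided by the bundled spark-xml dependency
      .option("rowTag", rowTag)           // each <record> element becomes one row
      .load(path)
      .count()

  def main(args: Array[String]): Unit = {
    // No .master(...) here: spark-submit decides where the job runs.
    val spark = SparkSession.builder().appName("simpleApp").getOrCreate()
    try {
      println(s"rows: ${countRecords(spark, args(0))}")
    } finally {
      spark.stop()
    }
  }
}
```

After sbt assembly, the jar could then be launched locally with something like:

spark-submit --class SimpleApp --master local[*] --driver-memory 10g target/scala-2.11/simpleApp-assembly-1.0.jar /path/to/input.xml

Because spark-sql is marked provided in build.sbt, it is not bundled into the fat jar; spark-submit supplies the Spark classes at runtime, while spark-xml travels inside the jar.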

Best practices

  • Use the KryoSerializer instead of the default Java serializer
  • Repartition after filters or joins, which can leave partitions small, empty or unbalanced
    • Create an automatic repartition strategy
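The first two practices could be sketched as follows, assuming Spark 2.x. The Visit class and the target of one million rows per partition are hypothetical; targetPartitions is just one naive way to approach an automatic repartition strategy based on row count, not something Spark provides.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame

object TuningSketch {
  // Hypothetical record type; registering it lets Kryo write a small
  // numeric ID instead of the full class name for each object.
  case class Visit(user: String, page: String)

  // Kryo instead of the default Java serializer.
  def kryoConf(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Visit]))

  // Naive "automatic" strategy (an assumption, not a Spark API):
  // about targetRows rows per partition, rounded up, never fewer than 1.
  def targetPartitions(rowCount: Long, targetRows: Long = 1000000L): Int =
    math.max(1L, (rowCount + targetRows - 1) / targetRows).toInt

  // Rebalance a DataFrame after a selective filter or a join.
  // count() runs an extra job, so cache df first if it is reused afterwards.
  def rebalance(df: DataFrame, targetRows: Long = 1000000L): DataFrame =
    df.repartition(targetPartitions(df.count(), targetRows))
}
```

Counting rows costs a full pass over the data, so in practice an estimate (for example from input file sizes) may be preferable. When only reducing the number of partitions, coalesce is a cheaper alternative to repartition because it avoids a full shuffle.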