General Notes

  • By default RDDs are recompiled each time an action is run on them.
  • Action involce communication btwn executor and driver
  • To cache use either cache() or persist(), if the same data will be used more than once.
  • Use toDebugString() to see execution plan.
  • PairRDD: prefer reduceByKey than groupByKey and …
  • PariRDD: on join() the first collection can be persisted => only the second will be shuffled
  • ! RuleOfThumb: a shuffle can occur when the result RDD depends on other elements
  • ! RuleOfThumb: first do filters than joins
  • DataFrames: prefer several simple filters than one complex, as it is easier to be optimized.
  • DataFrames: utility methods while developing - show() to get the first 20th elements, printSchema()
  • DataSets: relational filter operations are optimize-able by Catalyst, functional ones are not.
  • DataSets: ds.filter($"city".as[String] === "Paris") vs ds.filter(p => p.city == "Paris")
  • DataSets: with functional filter operations the whole object is serialized, instead of the needed columns only => more work

Partition properties (PairRDDs)

  • Partitioning by a) cache or b) range.
  • Tuples on the same partition are guaranteed to be on the same machine.
  • Customizing a partitioner is only possible on PairRDD.
  • The result of partitionBy should be persisted. Otherwise partitioning is repeatedly applied 9causeing shuffling) each time the partitioned RDD is used.
  • Some operations of RDDs automatically result in and RDD with known partitioner (e.g. sortbyKey -> RangePartitioner, groupByKey -> HashPartitioner).
  • Some key based operations hold to (and propagate) partitioner // may cause shuffling ??

Caching strategies

TYPE SPACE CPU IN MEMORY ON DISK
memory (default) High Low Yes No
memory_ser Low High Yes No
memory_disk High Medium Some Some
memory_disk_ser* Low High Some Some
disk_only Low High No Yes
  • stores serialized representations

RDD Evaluations

Transformation Eager action
map collection
flatMap count
filter take
distinct reduce
union foreach
intersect takeSample
subtract takeOrdered
cartesian saveAsTextFile

PairRDD Evaluations

Transformation Eager action
groupByKey countByKey
reduceByKey  
mapValues  
key  
join  
outerLeftJoin  
outerRightJoin  

Logistic regression example

\[w <- w * \alpha \sum_{i=1}^n (w; x_i; y_i)\]
  • Note: upon each iteration the points value is recalculated, therefore the persist() stops this recalculation
val points = sc.textFile("mytextFile.txt").map(parsePoints).persist()
var w = Vector.zeros(d)
for(i <- 1 to numIterations){
  val gradient = points.map{
    p => (1 / (1 + exp (-p.y * w.dot(p.x))) - 1) * p.y * p.y}
    .reduce(_+_)
    w -= alpha * gradient
}