General Notes
- By default RDDs are recomputed each time an action is run on them.
- Actions involve communication between the executors and the driver.
- To cache, use either cache() or persist(), if the same data will be used more than once.
- Use toDebugString() to see the execution plan.
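A minimal sketch of the two notes above (the file name `data.txt` is hypothetical; `sc` is an existing SparkContext):

```scala
// Hypothetical input file; sc is an existing SparkContext.
val words = sc.textFile("data.txt").flatMap(_.split(" ")).cache() // cache() == persist(MEMORY_ONLY)

words.count()            // first action: computes the RDD and caches it
words.distinct().count() // reuses the cached data instead of re-reading the file

println(words.toDebugString) // prints the lineage / execution plan
```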
- PairRDD: prefer reduceByKey over groupByKey and …
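A word-count sketch of the preference above (the sample pairs are hypothetical; `sc` is an existing SparkContext):

```scala
// Hypothetical PairRDD of (word, 1) pairs.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

// Preferred: reduceByKey combines values per partition BEFORE the shuffle.
val counts = pairs.reduceByKey(_ + _)

// Avoid: groupByKey ships every value across the network, then sums afterwards.
val countsViaGroup = pairs.groupByKey().mapValues(_.sum)
```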
- PairRDD: on join() the first collection can be partitioned and persisted => only the second one will be shuffled.
- ! RuleOfThumb: a shuffle can occur when the resulting RDD depends on elements from other partitions of the input.
- ! RuleOfThumb: first do filters, then joins.
- DataFrames: prefer several simple filters over one complex one, as they are easier to optimize.
- DataFrames: utility methods while developing - show() to print the first 20 rows, printSchema().
- DataSets: relational filter operations are optimizable by Catalyst; functional ones are not.
- DataSets: ds.filter($"city".as[String] === "Paris") vs ds.filter(p => p.city == "Paris")
- DataSets: with functional filter operations the whole object is deserialized, instead of only the needed columns => more work.
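The two filter styles from the notes above, side by side (a sketch; the case class `Person` and the sample data are hypothetical, and `spark` is an existing SparkSession):

```scala
import spark.implicits._

case class Person(name: String, city: String) // hypothetical schema
val ds = Seq(Person("Ann", "Paris"), Person("Bob", "Berlin")).toDS()

// Relational filter: Catalyst sees the predicate, can push it down
// and read only the `city` column.
val relational = ds.filter($"city".as[String] === "Paris")

// Functional filter: an opaque lambda — every Person is deserialized
// in full before the predicate runs.
val functional = ds.filter(p => p.city == "Paris")
```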
Partition properties (PairRDDs)
- Partitioning by a) hash or b) range.
- Tuples on the same partition are guaranteed to be on the same machine.
- Customizing a partitioner is only possible on PairRDD.
- The result of partitionBy should be persisted. Otherwise partitioning is reapplied (causing shuffling) each time the partitioned RDD is used.
- Some operations on RDDs automatically result in an RDD with a known partitioner (e.g. sortByKey -> RangePartitioner, groupByKey -> HashPartitioner).
- Some key-based operations hold on to (and propagate) the partitioner // may cause shuffling ??
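A sketch of the partitionBy + persist advice above (the sample data is hypothetical; `sc` is an existing SparkContext):

```scala
import org.apache.spark.HashPartitioner

// Hypothetical PairRDDs.
val userData = sc.parallelize(Seq((1, "Ann"), (2, "Bob")))
val events   = sc.parallelize(Seq((1, "login"), (2, "click")))

// Partition once and persist the result — otherwise the shuffle implied
// by partitionBy would be re-executed on every use.
val partitioned = userData.partitionBy(new HashPartitioner(8)).persist()

// Joining against the persisted, partitioned side shuffles only `events`.
val joined = partitioned.join(events)
```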
Caching strategies
TYPE             | SPACE | CPU    | IN MEMORY | ON DISK
memory (default) | High  | Low    | Yes       | No
memory_ser       | Low   | High   | Yes       | No
memory_disk      | High  | Medium | Some      | Some
memory_disk_ser* | Low   | High   | Some      | Some
disk_only        | Low   | High   | No        | Yes
* stores serialized representations
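The rows of the table correspond to the constants in `org.apache.spark.storage.StorageLevel`; a sketch of selecting a level explicitly (the file name is hypothetical; `sc` is an existing SparkContext):

```scala
import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("data.txt") // hypothetical input file

rdd.cache()     // shorthand for persist(StorageLevel.MEMORY_ONLY)
rdd.unpersist() // a level can only be changed after unpersisting

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) // serialized, spills to disk
```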
RDD Evaluations
Transformation | Eager action
map            | collect
flatMap        | count
filter         | take
distinct       | reduce
union          | foreach
intersect      | takeSample
subtract       | takeOrdered
cartesian      | saveAsTextFile
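A sketch illustrating the lazy/eager split in the table above (`sc` is an existing SparkContext):

```scala
// Transformations are lazy: nothing runs here, only the lineage is recorded.
val evens = sc.parallelize(1 to 10).map(_ * 2).filter(_ > 5)

// Actions are eager: each of these triggers a job on the cluster.
val n          = evens.count()
val firstThree = evens.take(3)
```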
PairRDD Evaluations
Transformation | Eager action
groupByKey     | countByKey
reduceByKey    |
mapValues      |
keys           |
join           |
leftOuterJoin  |
rightOuterJoin |
Logistic regression example
\[w \leftarrow w - \alpha \sum_{i=1}^{n} g(w; x_i, y_i)\]
- Note: on each iteration the points value would be recomputed; persist() prevents this recomputation.
val points = sc.textFile("mytextFile.txt").map(parsePoints).persist()
var w = Vector.zeros(d)
for (i <- 1 to numIterations) {
  val gradient = points.map { p =>
    (1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.y * p.x // scales the feature vector p.x, not p.y
  }.reduce(_ + _)
  w -= alpha * gradient
}