General Notes
- By default RDDs are recomputed each time an action is run on them.
- Actions involve communication between the executors and the driver.
- To cache, use either cache() or persist() if the same data will be used more than once.
- Use toDebugString() to see the execution plan.
- PairRDD: prefer reduceByKey over groupByKey and …
- PairRDD: on join() the first collection can be persisted => only the second will be shuffled.
- ! RuleOfThumb: a shuffle can occur when the resulting RDD depends on elements from other partitions.
- ! RuleOfThumb: first do filters, then joins.
- DataFrames: prefer several simple filters over one complex filter, as they are easier to optimize.
- DataFrames: utility methods while developing - show() to print the first 20 rows, printSchema().
- DataSets: relational filter operations are optimizable by Catalyst, functional ones are not.
- DataSets: ds.filter($"city".as[String] === "Paris") vs ds.filter(p => p.city == "Paris")
- DataSets: with functional filter operations the whole object is serialized, instead of only the needed columns => more work.
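A minimal sketch contrasting the two filter styles, assuming a hypothetical case class Person(name: String, city: String) and a local SparkSession:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical schema for illustration.
case class Person(name: String, city: String)

val spark = SparkSession.builder().appName("filters").master("local[*]").getOrCreate()
import spark.implicits._

val ds = Seq(Person("Ana", "Paris"), Person("Bo", "Sofia")).toDS()

// Relational filter: Catalyst sees the predicate, so it can push it down
// and read only the columns it needs.
val relational = ds.filter($"city" === "Paris")

// Functional filter: an opaque Scala closure, so every Person object must
// be deserialized before the predicate can run.
val functional = ds.filter(p => p.city == "Paris")
```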
Partition properties (PairRDDs)
- Partitioning by a) hash or b) range.
- Tuples on the same partition are guaranteed to be on the same machine.
- Customizing a partitioner is only possible on PairRDD.
- The result of partitionBy should be persisted. Otherwise the partitioning is re-applied (causing shuffling) each time the partitioned RDD is used.
- Some operations on RDDs automatically result in an RDD with a known partitioner (e.g. sortByKey -> RangePartitioner, groupByKey -> HashPartitioner).
- Some key-based operations hold on to (and propagate) the partitioner (e.g. mapValues); operations that may change the key (e.g. map) drop it, which can cause shuffling later.
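A short sketch of the partitionBy-then-persist pattern, assuming an existing SparkContext `sc`:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Assuming `sc` is an existing SparkContext.
val pairs: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// Repartition once and persist the result; without persist() the shuffle
// implied by partitionBy would run again on every action.
val partitioned = pairs
  .partitionBy(new HashPartitioner(8))
  .persist()

// Key-preserving operations such as mapValues keep the partitioner...
val doubled = partitioned.mapValues(_ * 2)                    // partitioner retained

// ...while map may change the key, so the partitioner is dropped.
val remapped = partitioned.map { case (k, v) => (k, v * 2) }  // partitioner lost
```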
Caching strategies
| TYPE             | SPACE | CPU    | IN MEMORY | ON DISK |
|------------------|-------|--------|-----------|---------|
| memory (default) | High  | Low    | Yes       | No      |
| memory_ser       | Low   | High   | Yes       | No      |
| memory_disk      | High  | Medium | Some      | Some    |
| memory_disk_ser* | Low   | High   | Some      | Some    |
| disk_only        | Low   | High   | No        | Yes     |

\* stores serialized representations
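The rows of the table map onto Spark's StorageLevel constants; a sketch, assuming an existing SparkContext `sc` (note an RDD's storage level can be set only once, so each line below uses a fresh RDD):

```scala
import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY) - the "memory" row.
sc.textFile("data.txt").cache()

sc.textFile("data.txt").persist(StorageLevel.MEMORY_ONLY_SER)     // memory_ser
sc.textFile("data.txt").persist(StorageLevel.MEMORY_AND_DISK)     // memory_disk
sc.textFile("data.txt").persist(StorageLevel.MEMORY_AND_DISK_SER) // memory_disk_ser
sc.textFile("data.txt").persist(StorageLevel.DISK_ONLY)           // disk_only
```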
RDD Evaluations
| Transformation | Eager action   |
|----------------|----------------|
| map            | collect        |
| flatMap        | count          |
| filter         | take           |
| distinct       | reduce         |
| union          | foreach        |
| intersection   | takeSample     |
| subtract       | takeOrdered    |
| cartesian      | saveAsTextFile |
PairRDD Evaluations
| Transformation | Eager action |
|----------------|--------------|
| groupByKey     | countByKey   |
| reduceByKey    |              |
| mapValues      |              |
| keys           |              |
| join           |              |
| leftOuterJoin  |              |
| rightOuterJoin |              |
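A small sketch of why reduceByKey is preferred over groupByKey, assuming an existing SparkContext `sc`:

```scala
import org.apache.spark.rdd.RDD

// Assuming `sc` is an existing SparkContext.
val words: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

// reduceByKey combines values on each partition before the shuffle,
// so only partial sums cross the network.
val counts = words.reduceByKey(_ + _)

// groupByKey ships every (key, value) pair across the network first,
// then sums on the receiving side => more traffic for the same result.
val countsViaGroup = words.groupByKey().mapValues(_.sum)
```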
Logistic regression example
\[w \leftarrow w - \alpha \sum_{i=1}^n \left(\frac{1}{1 + e^{-y_i\, w \cdot x_i}} - 1\right) y_i\, x_i\]
- Note: upon each iteration the points value would be recomputed; persist() prevents this recomputation.
// Parse once and cache across iterations; without persist() the text file
// would be re-read and re-parsed on every pass of the loop.
val points = sc.textFile("mytextFile.txt").map(parsePoints).persist()
var w = Vector.zeros(d)
for (i <- 1 to numIterations) {
  val gradient = points
    .map { p => (1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.y * p.x }
    .reduce(_ + _)
  w -= alpha * gradient
}