Limiting an RDD size
I have an RDD as follows:
rdd
.filter { case (_, record) => predicates.forall(_.accept(record)) }
.toDS()
.cache()
It filters an RDD down to the records that every predicate accepts.
The issue is that some of my RDDs are massive and the list of predicates may be empty, in which case we attempt to cache the entire data set.
Instead, I'd like to always limit the size of the data set before I cache it.
I've tried placing a limit as follows:
dataSet
.filter { case (_, record) => predicates.forall(_.accept(record)) }
.limit(10000)
.toDS()
.cache()
but I get OOM errors. It looks to me like the partitions are being overloaded before the limit is applied. Therefore I'm wondering if there is some way for the limit to be applied to the partitions, so that filtering effectively stops once we reach the limit.
Scaling out further isn't an option, as these data sets are too big.
Solution 1:[1]
You should likely look into sampling the RDD. If you provide a consistent seed, you will get a consistent result. You likely don't want "withReplacement". This will run faster than using limit. Sample still works through the entire data set, but it filters as it goes, reducing the data set.
RDD.sample(withReplacement, fraction, seed=None)
Parameters:
withReplacement - bool; whether elements can be sampled multiple times (replaced when sampled out)
fraction - float; expected size of the sample as a fraction of this RDD’s size. Without replacement: probability that each element is chosen; fraction must be in [0, 1]. With replacement: expected number of times each element is chosen; fraction must be >= 0
seed - int, optional; seed for the random number generation
Relevant code links: rdd.sample, and the subclass that does the actual work.
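As a rough illustration of the sampling approach in Scala, the sketch below samples the filtered RDD before converting and caching it. The parameter list quoted above is the Python signature; the Scala method is `sample(withReplacement, fraction, seed)` with a `Long` seed. The stand-in data, the placeholder predicate, the seed value and the helper `fractionFor` are all assumptions made for this example, and `fractionFor` needs an estimate of the RDD's size that you would have to supply yourself. Note that sampling without replacement keeps each element with the given probability, so the result is only approximately the target size, not a hard cap like `limit`.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object SampleBeforeCache {

  // Hypothetical helper (not part of Spark): turns a target row count and an
  // estimated total into a sampling fraction, capped at 1.0.
  def fractionFor(target: Long, estimatedTotal: Long): Double =
    if (estimatedTotal <= 0L) 1.0
    else math.min(1.0, target.toDouble / estimatedTotal)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("sample-before-cache")
      .getOrCreate()
    import spark.implicits._

    // Stand-in data; the real RDD and predicates come from the question.
    val total = 100000
    val rdd: RDD[(Int, String)] =
      spark.sparkContext.parallelize(1 to total).map(i => (i, s"record-$i"))

    // Aim for roughly 10000 rows, the limit used in the question.
    val frac = fractionFor(target = 10000L, estimatedTotal = total.toLong)

    val ds = rdd
      .filter { case (_, record) => record.nonEmpty } // placeholder predicate
      .sample(withReplacement = false, fraction = frac, seed = 42L)
      .toDS()
      .cache()

    // The count should be close to the target but is not guaranteed to equal it.
    println(s"sampled rows: ${ds.count()}")

    spark.stop()
  }
}
```

With a fixed seed and the same partitioning, repeated runs should select the same elements, which is what makes the cached result reproducible.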
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Matt Andruff |
