scala - Spark: NullPointerException when RDD isn't collected before map


I'm trying to write a function in Spark/Scala that takes two RDDs and, for each item in the first, finds the items in the second that fall within the first item's date range. Here is the code I wrote to express the problem (I've added annotations for clarity):

def buildRelationship(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): RDD[(PerfLog, RDD[PerfLog])] = {
  durationLog.map((duration: PerfLog) => {
    val sizes = sizeLogs.filter((size: PerfLog) =>
      size.startTime >= duration.startTime && size.endTime <= duration.endTime)
    (duration, sizes)
  })
}
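(The post never shows PerfLog itself; for the snippets to compile, a minimal hypothetical definition would be something like the following.)

// Hypothetical shape of PerfLog assumed by the snippets in this post;
// the original question does not include its definition.
case class PerfLog(startTime: Long, endTime: Long)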

If I call .collect() on the map expression at the end of the function, I get an exception.

15/06/19 15:57:05 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282)

I've found that if I modify the above code so that both parameters are collected at the start and treated as Arrays for the rest of the function, it works fine.

def buildRelationship(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): Array[(PerfLog, Array[PerfLog])] = {
  val durationData = durationLog.collect()
  val sizeData = sizeLogs.collect()

  durationData.map((duration: PerfLog) => {
    val sizes = sizeData.filter((size: PerfLog) =>
      size.startTime >= duration.startTime && size.endTime <= duration.endTime)
    (duration, sizes)
  })
}

While this works, it does not seem like the correct answer, since the parameters could grow and become quite large.

Why does this work when the data is treated as an Array, but not as an RDD?

You can't iterate over one RDD while iterating over another: RDDs cannot be nested inside a transformation, because RDD operations can only be invoked from the driver, not from code running on the executors. To overcome the problem you don't need to collect both RDDs. The better solution is to collect just one of them (the smaller one, for better performance) and use the two data structures (an RDD and an Array) for the N^2 operation.

def buildRelationship(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): RDD[(PerfLog, Array[PerfLog])] = {
  val sizeData = sizeLogs.collect()

  durationLog.map((duration: PerfLog) => {
    val sizes = sizeData.filter((size: PerfLog) =>
      size.startTime >= duration.startTime && size.endTime <= duration.endTime)
    (duration, sizes)
  })
}

For even better performance, use a Spark broadcast variable. Broadcasting ships the collected array to each node once, instead of serializing it into every task's closure:

def buildRelationship(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): RDD[(PerfLog, Array[PerfLog])] = {
  // sc is the SparkContext; the broadcast distributes sizeData to every node once
  val sizeData = sc.broadcast(sizeLogs.collect())

  durationLog.map((duration: PerfLog) => {
    val sizes = sizeData.value.filter((size: PerfLog) =>
      size.startTime >= duration.startTime && size.endTime <= duration.endTime)
    (duration, sizes)
  })
}
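For context, here is a minimal usage sketch, assuming an in-scope SparkContext named sc and the hypothetical PerfLog(startTime, endTime) case class shown earlier (neither appears in the original post):

// Minimal usage sketch under the assumptions stated above.
val durations = sc.parallelize(Seq(PerfLog(0L, 100L), PerfLog(200L, 300L)))
val sizes = sc.parallelize(Seq(PerfLog(10L, 90L), PerfLog(210L, 290L), PerfLog(50L, 250L)))

val related = buildRelationship(sizes, durations)

// Each duration is paired with the size entries whose interval it contains.
related.collect().foreach { case (duration, matches) =>
  println(s"$duration -> ${matches.mkString(", ")}")
}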

Hopefully this helps you.

