Spark: NullPointerException when RDD isn't collected before map
I am trying to write a function in Spark/Scala that takes two RDDs and, for each item in the first, finds the items in the second that fit within its date range. This is the code I wrote to express the problem (I've added type annotations for clarity):
    def buildRelationship(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): RDD[(PerfLog, RDD[PerfLog])] = {
      durationLog.map((duration: PerfLog) => {
        // For each duration entry, find the size entries whose time range falls inside it
        val sizes = sizeLogs.filter((size: PerfLog) =>
          size.startTime >= duration.startTime && size.endTime <= duration.endTime)
        (duration, sizes)
      })
    }
If I call .collect() on the map expression at the end of the function, it throws an exception:
    15/06/19 15:57:05 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
    java.lang.NullPointerException
        at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
I've found that if I modify the code so that both parameters are collected at the start and treated as arrays for the rest of the function, it operates fine.
    def buildRelationship(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): Array[(PerfLog, Array[PerfLog])] = {
      val durationData = durationLog.collect()
      val sizeData = sizeLogs.collect()
      durationData.map((duration: PerfLog) => {
        val sizes = sizeData.filter((size: PerfLog) =>
          size.startTime >= duration.startTime && size.endTime <= duration.endTime)
        (duration, sizes)
      })
    }
While this works, it does not seem like the correct answer, since the parameters could grow to become quite large.
Why does this work when the data is treated as an array, but not as an RDD?
You can't reference one RDD from inside a transformation on another: an RDD is a driver-side handle, so when the nested sizeLogs.filter call runs inside the map closure on an executor, the RDD reference is no longer valid, which is what produces the NullPointerException. To overcome this problem you don't need to collect both RDDs. The better solution is to collect only one RDD (the smaller one, for better performance) and use the two data structures, an RDD and an Array, for the N^2 operation:
    def buildRelationship(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): RDD[(PerfLog, Array[PerfLog])] = {
      // Collect only the smaller RDD; the resulting array is shipped to the
      // executors inside the closure below.
      val sizeData = sizeLogs.collect()
      durationLog.map((duration: PerfLog) => {
        val sizes = sizeData.filter((size: PerfLog) =>
          size.startTime >= duration.startTime && size.endTime <= duration.endTime)
        (duration, sizes)
      })
    }
For even better performance, use a Spark broadcast variable. Broadcasting ships the collected array to every node once, instead of serializing it into each task's closure:
    def buildRelationship(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): RDD[(PerfLog, Array[PerfLog])] = {
      // Broadcast the collected array once so every executor gets a read-only copy.
      // Using sizeLogs.sparkContext avoids relying on an external `sc` variable.
      val sizeData = sizeLogs.sparkContext.broadcast(sizeLogs.collect())
      durationLog.map((duration: PerfLog) => {
        val sizes = sizeData.value.filter((size: PerfLog) =>
          size.startTime >= duration.startTime && size.endTime <= duration.endTime)
        (duration, sizes)
      })
    }
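As a usage sketch: the question never shows the PerfLog definition, so the case class and sample data below are hypothetical, assuming startTime and endTime are plain epoch timestamps. Run locally, this should print each duration paired with the size entries it contains.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    // Hypothetical definition: the question only shows that PerfLog has
    // comparable startTime/endTime fields, so plain Longs are assumed here.
    case class PerfLog(startTime: Long, endTime: Long)

    object BuildRelationshipExample {

      // The broadcast version from the answer above.
      def buildRelationship(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): RDD[(PerfLog, Array[PerfLog])] = {
        val sizeData = sizeLogs.sparkContext.broadcast(sizeLogs.collect())
        durationLog.map((duration: PerfLog) => {
          val sizes = sizeData.value.filter((size: PerfLog) =>
            size.startTime >= duration.startTime && size.endTime <= duration.endTime)
          (duration, sizes)
        })
      }

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("example").setMaster("local[2]"))

        // Two duration windows; the size entries fall inside one or both of them.
        val durations = sc.parallelize(Seq(PerfLog(0L, 100L), PerfLog(50L, 60L)))
        val sizes     = sc.parallelize(Seq(PerfLog(10L, 20L), PerfLog(55L, 58L)))

        buildRelationship(sizes, durations).collect().foreach {
          case (duration, matched) =>
            println(s"$duration -> ${matched.mkString(", ")}")
        }

        sc.stop()
      }
    }

Note that the whole approach, with or without the broadcast, assumes the collected sizeLogs array fits in driver and executor memory; that is why the answer recommends collecting the smaller of the two RDDs.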
Hope this helps.