Code does not terminate on second iteration in Scala Spark
On the second iteration, the task appears to hang at this line:

val wordCountWithLabelsCollect = wordCountWithLabels.collect
This is the Scala output:
15/06/19 15:49:33 INFO DAGScheduler: Submitting Stage 1 (MappedValuesRDD[3] at mapValues at Ques.scala:33), which has no missing parents
15/06/19 15:49:33 INFO MemoryStore: ensureFreeSpace(2480) called with curMem=2219, maxMem=1030823608
15/06/19 15:49:33 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.4 KB, free 983.1 MB)
15/06/19 15:49:33 INFO MemoryStore: ensureFreeSpace(1812) called with curMem=4699, maxMem=1030823608
15/06/19 15:49:33 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1812.0 B, free 983.1 MB)
15/06/19 15:49:33 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:54590 (size: 1812.0 B, free: 983.1 MB)
15/06/19 15:49:33 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/06/19 15:49:33 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/06/19 15:49:33 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedValuesRDD[3] at mapValues at Ques.scala:33)
15/06/19 15:49:33 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
Scala code:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object Ques extends App {

  val data = getUncategorisedData

  data.foreach(document => {
    runner
  })

  case class Document(label: String, text: String)

  def reduceList(list: List[(String, Int)]) =
    list.groupBy(_._1).mapValues(_.aggregate(0)(_ + _._2, _ + _))

  def runner = {

    val trainingData = getSc.parallelize(
      List(
        Document("sport", "this text document spor a"),
        Document("sport", "this spor text document spor b"),
        Document("news", "this such new categorise data")))

    val counts: org.apache.spark.rdd.RDD[(String, List[(String, Int)])] =
      trainingData.map(doc => (doc.label, doc.text.split(" ").toList.map(w => (w, 1))))

    val mergedList = counts.mapValues((list: List[(String, Int)]) => reduceList(list).toList)

    val wordCountWithLabels: org.apache.spark.rdd.RDD[(String, List[(String, Int)])] =
      mergedList.reduceByKey((accum: List[(String, Int)], value: List[(String, Int)]) => {
        val valueMap = value.toMap
        val accumMap = accum.toMap
        val mergedMap = accumMap ++ valueMap.map { case (k, v) => k -> (v + accumMap.getOrElse(k, 0)) }
        mergedMap.toList
      })

    val wordCountWithLabelsCollect = wordCountWithLabels.collect
    wordCountWithLabels.collect
  }

  def getUncategorisedData: RDD[Document] = {
    lazy val trainingData = getSc.parallelize(
      List(
        Document("", "this text document a"),
        Document("", "this text document b"),
        Document("", "this text for document c")))
    trainingData
  }

  lazy val getSc = {
    val conf = new org.apache.spark.SparkConf()
      .setMaster("local")
      .setAppName("process")
      .setSparkHome("C:\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")
      .set("spark.executor.memory", "3g")
      .set("deploy-mode", "standalone")
      .set("SPARK_CONF_DIR", "C:\\data\\sparkconfig")
    val sc = new SparkContext(conf)
    sc
  }
}
What is the issue here?
Invoking collect multiple times on the same collection should not be an issue, should it?
If I invoke runner twice in succession:

runner
runner

then the code terminates.
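For comparison, a minimal driver-side sketch (names are illustrative, assuming a local SparkContext called sc as in the code above) showing that repeated collects are fine when they run on the driver:

val rdd = sc.parallelize(List("a", "b", "c"))
val first = rdd.collect   // first job; returns Array("a", "b", "c") to the driver
val second = rdd.collect  // second job on the same RDD; completes normally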
Update:

Here is a simpler example of the same behavior:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.rdd.PairRDDFunctions

object Ques extends App {

  val conf = new org.apache.spark.SparkConf()
    .setMaster("local")
    .setAppName("process")
    .setSparkHome("C:\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")
    .set("spark.executor.memory", "3g")

  val sc = new SparkContext(conf)

  val data = sc.parallelize(List(""))
  val counts: org.apache.spark.rdd.RDD[String] = sc.parallelize(List(""))

  data.foreach(document => {
    counts.collect
  })
}
This code never terminates. It appears that collect cannot be invoked from within a foreach function?
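One way to sidestep this, sketched under the assumption that counts is small enough to fit in driver memory, is to materialize the inner RDD on the driver first, so the foreach closure only captures a plain local array rather than an RDD:

val countsLocal = counts.collect  // action runs on the driver, returns Array[String]
data.foreach(document => {
  countsLocal.foreach(println)    // ordinary Scala Array.foreach on the worker; no nested Spark job
})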
Update 2:

I'm not sure why collecting the data back to the driver before running the foreach causes the code to terminate:

data.collect.foreach(document => { counts.collect })
I think the short answer here is that I was trying to invoke an action inside the closure of another RDD operation. Actions return a value to the driver, so invoking an action inside a closure doesn't make sense: the closure is executed on the workers, not on the driver, and RDDs can only be operated on from the driver. (This also explains update 2: after data.collect, the foreach is the plain Scala method on a local array, running on the driver, so the nested counts.collect is an ordinary driver-side action.) I can't find anywhere in the documentation that is more explicit about this than this section of the programming guide.
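For completeness, a sketch of the usual workaround when one dataset is needed inside another RDD's closure (assuming it is small enough to collect): bring it back to the driver and broadcast it, so the closure captures a plain local value rather than an RDD:

val countsBroadcast = sc.broadcast(counts.collect)  // Array[String], shipped once to each executor
data.foreach(document => {
  countsBroadcast.value.foreach(println)            // local array access on the worker; no nested Spark job
})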