Code does not terminate on second iteration in Scala Spark


On the second iteration, the task appears to hang at this line:

val wordCountWithLabelsCollect = wordCountWithLabels.collect

This is the log output:

15/06/19 15:49:33 INFO DAGScheduler: Submitting Stage 1 (MappedValuesRDD[3] at mapValues at Ques.scala:33), which has no missing parents
15/06/19 15:49:33 INFO MemoryStore: ensureFreeSpace(2480) called with curMem=2219, maxMem=1030823608
15/06/19 15:49:33 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.4 KB, free 983.1 MB)
15/06/19 15:49:33 INFO MemoryStore: ensureFreeSpace(1812) called with curMem=4699, maxMem=1030823608
15/06/19 15:49:33 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1812.0 B, free 983.1 MB)
15/06/19 15:49:33 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:54590 (size: 1812.0 B, free: 983.1 MB)
15/06/19 15:49:33 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/06/19 15:49:33 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/06/19 15:49:33 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedValuesRDD[3] at mapValues at Ques.scala:33)
15/06/19 15:49:33 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks

The Scala code:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object Ques extends App {

  val data = getUncategorisedData

  data.foreach(document => {
    runner
  })

  case class Document(label: String, text: String)

  def reduceList(list: List[(String, Int)]) =
    list.groupBy(_._1).mapValues(_.aggregate(0)(_ + _._2, _ + _))

  def runner = {

    val trainingData = getSc.parallelize(
      List(
        Document("sport", "this text document spor a"),
        Document("sport", "this spor text document spor b"),
        Document("news", "this such new categorise data")))

    val counts: org.apache.spark.rdd.RDD[(String, List[(String, Int)])] =
      trainingData.map(doc => ((doc.label, doc.text.split(" ").toList.map(w => (w, 1)))))

    val mergedList = counts.mapValues((list: List[(String, Int)]) => reduceList(list).toList)

    val wordCountWithLabels: org.apache.spark.rdd.RDD[(String, List[(String, Int)])] =
      mergedList.reduceByKey((accum: List[(String, Int)], value: List[(String, Int)]) => {
        val valueMap = value.toMap
        val accumMap = accum.toMap
        val mergedMap = accumMap ++ valueMap.map { case (k, v) => k -> (v + accumMap.getOrElse(k, 0)) }
        mergedMap.toList
      })

    val wordCountWithLabelsCollect = wordCountWithLabels.collect
    wordCountWithLabels.collect
  }

  def getUncategorisedData: RDD[Document] = {

    lazy val trainingData = getSc.parallelize(
      List(
        Document("", "this text document a"),
        Document("", "this text document b"),
        Document("", "this text for document c")))

    trainingData
  }

  lazy val getSc = {

    val conf = new org.apache.spark.SparkConf()
      .setMaster("local")
      .setAppName("process")
      .setSparkHome("c:\\spark-1.1.0-bin-hadoop2.4\\spddark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")
      .set("spark.executor.memory", "3g")
      .set("deploy-mode", "standalone")
      .set("SPARK_CONF_DIR", "c:\\data\\sparkconfig")

    val sc = new SparkContext(conf)

    sc
  }
}

What is the issue here?

Invoking collect multiple times on the same RDD should not be an issue?

If I invoke runner in succession:

runner
runner

then it terminates.
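For comparison, repeated collects issued directly from the driver do complete. A minimal sketch (assuming a local master; the object name CollectTwice and its contents are mine, not from the code above):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object CollectTwice extends App {

  // Driver-only program: both collects below are ordinary actions issued from the driver.
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("collect-twice"))

  val words = sc.parallelize(List("a", "b", "a")).map(w => (w, 1)).reduceByKey(_ + _)

  println(words.collect.toList)  // first action completes
  println(words.collect.toList)  // second action on the same RDD also completes

  sc.stop()
}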

Update:

A simpler example of the same behaviour:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.rdd.PairRDDFunctions

object Ques extends App {

  val conf = new org.apache.spark.SparkConf()
    .setMaster("local")
    .setAppName("process")
    .setSparkHome("c:\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")
    .set("spark.executor.memory", "3g")

  val sc = new SparkContext(conf)

  val data = sc.parallelize(List(""))

  val counts: org.apache.spark.rdd.RDD[(String)] = sc.parallelize(List(("")))

  data.foreach(document => {
    counts.collect
  })

}

This code never terminates. It appears that collect cannot be invoked from within a foreach function?

Update 2:

I'm not sure why returning to the driver prior to running the transformation causes it to terminate:

data.collect.foreach(document => {
  counts.collect
})
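Presumably this variant completes because data.collect brings the results back to the driver first, so the foreach is a plain Scala loop on the driver and the inner collect is an ordinary driver-side action. A sketch of the same three lines with explicit types (the annotations and the localDocs name are mine):

val localDocs: Array[String] = data.collect   // action: results come back to the driver
localDocs.foreach(document => {               // plain Scala loop running on the driver
  val c: Array[String] = counts.collect       // ordinary driver-side action, so it can run
})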

I think the short answer here is that the code is trying to invoke an action inside a transformation. Actions return a value to the driver, so invoking an action inside a transformation doesn't make sense: the transformation is executed on the workers, not on the driver. I can't find anywhere in the documentation that is more explicit than this section of the programming guide.
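If the contents of one RDD are genuinely needed inside operations on another, one way to restructure the simpler example above is to collect the small RDD on the driver and broadcast it, so that no action ever runs inside a closure shipped to the executors. A sketch only (the countsLocal and countsBroadcast names are placeholders of mine; data, counts and sc refer to the simpler example):

// Collect once on the driver, then broadcast the local result to the executors.
val countsLocal: Array[String] = counts.collect     // action executed from the driver
val countsBroadcast = sc.broadcast(countsLocal)     // read-only copy available on the executors

data.foreach(document => {
  // The closure only reads the broadcast value; it never calls collect or touches an RDD.
  val c: Array[String] = countsBroadcast.value
})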

