Scala - ArrayIndexOutOfBoundsException with ALS - Flink ML


I'm building a movie recommendation system, using the MovieLens datasets available here: http://grouplens.org/datasets/movielens/

To compute the recommendations, I use the ML library of Flink in Scala, and particularly the ALS algorithm (org.apache.flink.ml.recommendation.ALS).

I first map the ratings of the movies into a DataSet[(Int, Int, Double)] and then create a training set and a test set (see the code below).

My problem is that there is no bug when I use the ALS.fit function with the whole dataset (all the ratings), but if I remove just one rating, the fit function doesn't work anymore, and I don't understand why.

Do you have any ideas? :)

Code used:

Rating.scala

case class Rating(userID: Int, movieID: Int, rating: Double)

Preprocessing.scala

object Preprocessing {
  def getRatings(env: ExecutionEnvironment, ratingsPath: String): DataSet[Rating] = {
    env.readCsvFile[(Int, Int, Double)](
      ratingsPath,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2)).map { r => new Rating(r._1, r._2, r._3) }
  }
}

Processing.scala

object Processing {
  private val ratingsPath: String = "path_to_ratings.csv"

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val ratings: DataSet[Rating] = Preprocessing.getRatings(env, ratingsPath)

    val trainingSet: DataSet[(Int, Int, Double)] = ratings
      .map(r => (r.userID, r.movieID, r.rating))
      .sortPartition(0, Order.ASCENDING)
      .first(ratings.count().toInt)

    val als = ALS()
      .setIterations(10)
      .setNumFactors(10)
      .setBlocks(150)
      .setTemporaryPath("/tmp/tmpALS")

    val parameters = ParameterMap()
      .add(ALS.Lambda, 0.01) // after several tests, this value seems to fit the problem
      .add(ALS.Seed, 42L)

    als.fit(trainingSet, parameters)
  }
}

But if I remove just one rating:

val trainingSet: DataSet[(Int, Int, Double)] = ratings
  .map(r => (r.userID, r.movieID, r.rating))
  .sortPartition(0, Order.ASCENDING)
  .first((ratings.count() - 1).toInt)

The error:

06/19/2015 15:00:24 CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570))(4/4) switched to FAILED

java.lang.ArrayIndexOutOfBoundsException: 5
    at org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358)
    at org.apache.flink.ml.recommendation.ALS$$anon$111.cogroup(ALS.scala:635)
    at org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)

...

The problem is the first operator in combination with the setTemporaryPath parameter of Flink's ALS implementation. In order to understand the problem, let me quickly explain how the blocking ALS algorithm works.

The blocking implementation of alternating least squares first partitions the given ratings matrix into user-wise and item-wise blocks. For these blocks, routing information is calculated. This routing information says which user/item block receives which input from which item/user block, respectively. Afterwards, the ALS iteration is started.
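The blocking and routing idea can be sketched with plain Scala collections. This is an illustration only, not Flink's actual implementation: the modulo partitioner, the `userRouting` helper, and the tiny data set are all assumptions made for the example.

```scala
// Illustrative sketch of block partitioning and routing information.
// A simple modulo partitioner assigns users and items to blocks; the
// routing info then records, per user block, which item blocks contain
// ratings by users of that block (i.e. where its factors must be sent).
case class Rating(userID: Int, movieID: Int, rating: Double)

def userBlock(userID: Int, numBlocks: Int): Int = userID % numBlocks
def itemBlock(movieID: Int, numBlocks: Int): Int = movieID % numBlocks

def userRouting(ratings: Seq[Rating], numBlocks: Int): Map[Int, Set[Int]] =
  ratings
    .groupBy(r => userBlock(r.userID, numBlocks))
    .map { case (ub, rs) =>
      ub -> rs.map(r => itemBlock(r.movieID, numBlocks)).toSet
    }

val ratings = Seq(Rating(0, 1, 4.0), Rating(1, 2, 3.0), Rating(2, 1, 5.0))
val routing = userRouting(ratings, 2)
// User block 0 holds users 0 and 2, who both rated movie 1 (item block 1),
// so routing(0) == Set(1); user block 1 rated movie 2 (item block 0).
```

The key point is that this routing information is derived from the ratings data set itself, which is why it must be computed from one consistent view of that data.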

Since Flink's underlying execution engine is a parallel streaming dataflow engine, it tries to execute as many parts of the dataflow as possible in a pipelined fashion. This requires having all operators of the pipeline online at the same time. The advantage is that Flink avoids materializing intermediate results, which might be prohibitively large. The disadvantage is that the available memory has to be shared among all running operators. In the case of ALS, where the size of the individual DataSet elements (e.g. user/item blocks) is rather large, this is not desired.

In order to solve this problem, not all operators of the implementation are executed at the same time if you have set a temporaryPath. This path defines where the intermediate results can be stored. Thus, if you've defined a temporary path, ALS first calculates the routing information for the user blocks and writes it to disk, then calculates the routing information for the item blocks and writes it to disk, and last but not least starts the ALS iteration, for which it reads the routing information from the temporary path.

The calculations of the routing information for the user and item blocks both depend on the given ratings data set. In your case, when ALS calculates the user routing information, it will first read the ratings data set and apply the first operator on it. The first operator returns n arbitrary elements from the underlying data set. The problem is that Flink currently does not store the result of this first operation for the calculation of the item routing information. Instead, when the calculation of the item routing information starts, Flink will re-execute the dataflow from its sources. This means that it reads the ratings data set from disk and applies the first operator on it again, which will in many cases give a different set of ratings compared to the result of the first first operation. Therefore, the generated routing information is inconsistent and ALS fails.
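The failure mode can be made concrete with a toy example, again using plain collections rather than Flink: taking the "first n" elements of a source whose order is not fixed can yield different subsets on each re-execution. Reversing the sequence stands in for the different element order a parallel re-read may produce.

```scala
// Hedged illustration: first(n) on an unordered source is non-deterministic.
def firstN(source: Seq[Int], n: Int): Set[Int] = source.take(n).toSet

val data = (1 to 10).toVector

// Two "executions" that happen to read the source in different orders:
val run1 = firstN(data, 3)          // the elements 1, 2, 3
val run2 = firstN(data.reverse, 3)  // the elements 10, 9, 8

// Same operation, same logical data set, different results. In ALS this
// means the user and item routing information are derived from two
// inconsistent training sets, and indexing between blocks breaks.
```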

You can circumvent the problem by materializing the result of the first operator and using this result as the input for the ALS algorithm. The object FlinkMLTools contains a method persist which takes a DataSet, writes it to the given path and returns a new DataSet which reads the written data set. This allows you to break up the resulting dataflow graph.

val firstTrainingSet: DataSet[(Int, Int, Double)] = ratings
  .map(r => (r.userID, r.movieID, r.rating))
  .first((ratings.count() - 1).toInt)

val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")

val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(150)
  .setTemporaryPath("/tmp/tmpALS/")

val parameters = ParameterMap()
  .add(ALS.Lambda, 0.01) // after several tests, this value seems to fit the problem
  .add(ALS.Seed, 42L)

als.fit(trainingSet, parameters)
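The effect of persist can be sketched outside of Flink with plain files. The `persist` helper below is a hypothetical stand-in for FlinkMLTools.persist, not its real implementation: it writes the data once and hands back what was read from disk, so every downstream consumer sees the same materialized copy instead of re-executing the upstream computation.

```scala
// Minimal sketch (assumption: a toy file-based stand-in for persist).
import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._

def persist(data: Seq[(Int, Int, Double)], path: String): Seq[(Int, Int, Double)] = {
  // Materialize once: write the ratings as CSV lines.
  val lines = data.map { case (u, m, r) => s"$u,$m,$r" }
  Files.write(Paths.get(path), lines.asJava)
  // Read back: all later consumers share this single on-disk result,
  // so a non-deterministic upstream operator is never re-executed.
  Files.readAllLines(Paths.get(path)).asScala.toSeq.map { line =>
    val Array(u, m, r) = line.split(",")
    (u.toInt, m.toInt, r.toDouble)
  }
}

val tmp = Files.createTempFile("training", ".csv").toString
val persisted = persist(Seq((1, 10, 4.5), (2, 20, 3.0)), tmp)
```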

Alternatively, you can try to leave the temporaryPath unset. Then all steps (routing information calculation and the ALS iteration) are executed in a pipelined fashion. This means that the user and item routing information calculations both use the same input data set, which results from the first operator.

The Flink community is currently working on keeping the intermediate results of operators in memory. This will allow pinning the result of the first operator so that it won't be calculated twice and, thus, won't give differing results due to its non-deterministic nature.

