Scala - ArrayIndexOutOfBoundsException with ALS - Flink ML
I'm building a movie recommendation system using the MovieLens datasets available here: http://grouplens.org/datasets/movielens/

To compute the recommendations I use Flink's ML library in Scala, in particular the ALS algorithm (org.apache.flink.ml.recommendation.ALS).

I first map the movie ratings to a DataSet[(Int, Int, Double)], then create a trainingSet and a testSet (see code below).

My problem is that there is no bug when I call the als.fit function on the whole dataset (all ratings), but if I remove just one rating, the fit function doesn't work anymore, and I don't understand why.

Do you have any ideas? :)

The code used:
Rating.scala

case class Rating(userId: Int, movieId: Int, rating: Double)
Preprocessing.scala

object Preprocessing {
  def getRatings(env: ExecutionEnvironment, ratingsPath: String): DataSet[Rating] = {
    env.readCsvFile[(Int, Int, Double)](
      ratingsPath,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2)
    ).map { r => new Rating(r._1, r._2, r._3) }
  }
}
Processing.scala

object Processing {
  private val ratingsPath: String = "path_to_ratings.csv"

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val ratings: DataSet[Rating] = Preprocessing.getRatings(env, ratingsPath)

    val trainingSet: DataSet[(Int, Int, Double)] = ratings
      .map(r => (r.userId, r.movieId, r.rating))
      .sortPartition(0, Order.ASCENDING)
      .first(ratings.count().toInt)

    val als = ALS()
      .setIterations(10)
      .setNumFactors(10)
      .setBlocks(150)
      .setTemporaryPath("/tmp/tmpALS")

    val parameters = ParameterMap()
      .add(ALS.Lambda, 0.01) // after some tests, this value seems to fit the problem
      .add(ALS.Seed, 42L)

    als.fit(trainingSet, parameters)
  }
}
But if I remove just one rating:

val trainingSet: DataSet[(Int, Int, Double)] = ratings
  .map(r => (r.userId, r.movieId, r.rating))
  .sortPartition(0, Order.ASCENDING)
  .first((ratings.count() - 1).toInt)
The error:

06/19/2015 15:00:24  CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570))(4/4) switched to FAILED
java.lang.ArrayIndexOutOfBoundsException: 5
    at org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358)
    at org.apache.flink.ml.recommendation.ALS$$anon$111.coGroup(ALS.scala:635)
    at org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)
    ...
The problem is the first operator in combination with the setTemporaryPath parameter of Flink's ALS implementation. In order to understand the problem, let me explain how the blocking ALS algorithm works.
The blocking implementation of alternating least squares first partitions the given ratings matrix user-wise and item-wise into blocks. For these blocks, routing information is calculated. This routing information says which user/item block receives which input from which item/user block, respectively. Afterwards, the ALS iteration is started.
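To make the idea concrete, here is a rough sketch in plain Scala (this is not Flink's actual implementation; the modulo block assignment and all names are illustrative assumptions). It partitions ratings into user and item blocks and derives the "routing information": for each user block, the set of item blocks it must exchange data with.

```scala
// Plain-Scala sketch of ALS blocking and routing (illustrative only, not Flink code).
object BlockingSketch {
  val numUserBlocks = 2
  val numItemBlocks = 2

  // (userId, itemId, rating)
  val ratings = Seq((1, 10, 4.0), (2, 11, 3.0), (3, 10, 5.0), (4, 11, 2.0))

  // Assumed block assignment: simple modulo hashing.
  def userBlock(userId: Int): Int = userId % numUserBlocks
  def itemBlock(itemId: Int): Int = itemId % numItemBlocks

  // Routing info: for each user block, which item blocks it exchanges data with.
  val routing: Map[Int, Set[Int]] = ratings
    .groupBy { case (u, _, _) => userBlock(u) }
    .map { case (ub, rs) => ub -> rs.map { case (_, i, _) => itemBlock(i) }.toSet }

  def main(args: Array[String]): Unit = {
    println(routing)
  }
}
```

The key point is that this routing map is derived from the ratings data set itself, so it is only valid for the exact set of ratings it was computed from.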
Since Flink's underlying execution engine is a parallel streaming dataflow engine, it tries to execute as many parts of the dataflow as possible in a pipelined fashion. This requires having all operators of the pipeline online at the same time. This has the advantage that Flink avoids materializing intermediate results, which might be prohibitively large. The disadvantage is that the available memory has to be shared among all running operators. In the case of ALS, where the size of the individual DataSet elements (e.g. user/item blocks) is rather large, this is not desired.
In order to solve this problem, not all operators of the implementation are executed at the same time if you have set a temporaryPath. The path defines where intermediate results can be stored. Thus, if you've defined a temporary path, ALS first calculates the routing information for the user blocks and writes it to disk, then calculates the routing information for the item blocks and writes it to disk, and last but not least starts the ALS iteration, for which it reads the routing information from the temporary path.
The calculations of the routing information for the user and for the item blocks both depend on the given ratings data set. In your case, when the user routing information is calculated, Flink will first read the ratings data set and apply the first operator on it. The first operator returns n arbitrary elements from the underlying data set. The problem is that Flink currently does not store the result of this first operation for the calculation of the item routing information. Instead, when you start the calculation of the item routing information, Flink will re-execute the dataflow starting from its sources. This means that it reads the ratings data set from disk and applies the first operator on it again. This will, in many cases, give you a different set of ratings compared to the result of the first first operation. Therefore, the generated routing information is inconsistent and ALS fails.
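The non-determinism can be illustrated without Flink at all. In the sketch below (plain Scala, with made-up data), two "executions" consume the same partitioned data set, but the runtime happens to deliver the partitions in a different order each time, so taking the first three elements yields different subsets of the same data:

```scala
// Plain-Scala illustration of why first(n) on an unordered, partitioned data
// set is non-deterministic across re-executions (not Flink code).
object FirstNonDeterminism {
  // Two partitions of the same ratings data set: (userId, movieId, rating).
  val partitionA = Seq((1, 10, 4.0), (2, 20, 3.5))
  val partitionB = Seq((3, 30, 5.0), (4, 40, 2.0))

  // Run 1: the runtime happens to consume partition A first.
  val run1 = (partitionA ++ partitionB).take(3)

  // Run 2: on re-execution, partition B is consumed first.
  val run2 = (partitionB ++ partitionA).take(3)

  def main(args: Array[String]): Unit = {
    // Same data set, same n, but the two runs see different "first 3" elements.
    println(run1 == run2) // false
  }
}
```

If the user routing information is built from run1's elements and the item routing information from run2's, the two are inconsistent, which is exactly what makes the cogroup in ALS fail.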
You can circumvent the problem by materializing the result of the first operator and using this result as the input for the ALS algorithm. The object FlinkMLTools contains a method persist which takes a DataSet, writes it to the given path, and returns a new DataSet which reads the just-written DataSet. This allows you to break up the resulting dataflow graph.
val firstTrainingSet: DataSet[(Int, Int, Double)] = ratings
  .map(r => (r.userId, r.movieId, r.rating))
  .first((ratings.count() - 1).toInt)

val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")

val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(150)
  .setTemporaryPath("/tmp/tmpALS/")

val parameters = ParameterMap()
  .add(ALS.Lambda, 0.01) // after some tests, this value seems to fit the problem
  .add(ALS.Seed, 42L)

als.fit(trainingSet, parameters)
Alternatively, you can try to leave the temporaryPath unset. Then all steps (routing information calculation and ALS iteration) are executed in a pipelined fashion. This means that both the user and item routing information calculations use the same input data set which results from the first operator.
The Flink community is currently working on keeping intermediate results of operators in memory. This will allow you to pin the result of the first operator so that it won't be calculated twice and, thus, won't give differing results due to its non-deterministic nature.