REST Java service - Apache Spark communication
I am pretty new to Spark and am looking for some guidance :)
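I have a Java service that acts as the backend of an application. It runs ad hoc queries coming from the UI, and I figured out that some of these queries take a lot of time. So I decided to move these tasks to Spark to get them done, but I am stuck on how to establish communication between Apache Spark and Java.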
I saw other questions, and it seems that Ooyala's Spark Job Server solves this problem. I am curious to know whether there are other options for solving it.
Here is sample code that shows how to work with Spark from Java. You can write your own processing code after the input has been read into Spark. Since I have used the pre-built k-means clustering library here, you can replace that part with your own algorithm.
import java.util.regex.pattern; import org.apache.spark.sparkconf; import org.apache.spark.api.java.javardd; import org.apache.spark.api.java.javasparkcontext; import org.apache.spark.api.java.function.function; import org.apache.spark.mllib.clustering.kmeans; import org.apache.spark.mllib.clustering.kmeansmodel; import org.apache.spark.mllib.linalg.vector; import org.apache.spark.mllib.linalg.vectors; public final class spark_kmeans { private static class parsepoint implements function<string, vector> { private static final pattern space = pattern.compile(" "); @override public vector call(string line) { string[] tok = space.split(line); double[] point = new double[tok.length]; (int = 0; < tok.length; ++i) { point[i] = double.parsedouble(tok[i]); } return vectors.dense(point); } } public static void main(string[] args1) { string[] args = { "/usr/spark_pack/spark_input", "3", "5" }; if (args.length < 3) { system.err .println("usage: javakmeans <input_file> <k> <max_iterations> [<runs>]"); system.exit(1); } string inputfile = args[0]; int k = integer.parseint(args[1]); int iterations = integer.parseint(args[2]); int runs = 1; if (args.length >= 4) { runs = integer.parseint(args[3]); } string sparkhome = "/usr/spark_pack/spark-1.3.0-bin-hadoop2.4/"; string sparkmasterurl = "spark://master:7077"; string jarfile1 = "/usr/spark_pack/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar"; string jarfile2 = "/usr/spark_pack/spark_jar/spark_mlib.jar"; /* * javastreamingcontext ssc = new javastreamingcontext(sparkmasterurl, * "kshitij stream engine", new duration(1000), sparkhome); */ sparkconf conf = new sparkconf().setappname("log analyzer sql") .setmaster(sparkmasterurl).setsparkhome(sparkhome) .setjars(new string[] { jarfile1, jarfile2 }) .setappname("javakmeans"); javasparkcontext sc = new javasparkcontext(conf); /* * sparkconf sparkconf = new sparkconf().setappname("javakmeans"); * javasparkcontext sc = new javasparkcontext(sparkconf); */ javardd<string> 
lines = sc.textfile(inputfile); //****this portion algo part************************* javardd<vector> points = lines.map(new parsepoint()); kmeansmodel model = kmeans.train(points.rdd(), k, iterations, runs, kmeans.k_means_parallel()); system.out.println("cluster centers:"); (vector center : model.clustercenters()) { system.out.println(" " + center); } double cost = model.computecost(points.rdd()); system.out.println("cost: " + cost); sc.stop(); } }
Comments
Post a Comment