import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

/**
 * Spark MLlib exercise: K-Means clustering over a tab-separated numeric data
 * set whose first line is a header beginning with "Channel", e.g.:
 *
 *   Channel Region Fresh Milk Grocery Frozen Detergents_Paper Delicassen
 *   2       3      12669 9656 7561    214    2674             1338
 *   2       3      7057  9810 9568    1762   3293             1776
 *
 * Usage: KMeansClustering trainingDataFilePath testDataFilePath numClusters numIterations runTimes
 */
object KMeansClustering {

  def main(args: Array[String]): Unit = {
    if (args.length < 5) {
      println("Usage:KMeansClustering trainingDataFilePath testDataFilePath numClusters numIterations runTimes")
      sys.exit(1)
    }

    val conf = new SparkConf().setAppName("Spark MLlib Exercise:K-Means Clustering")
    val sc = new SparkContext(conf)
    try {
      // Cached because K-Means makes repeated passes over the training RDD.
      val parsedTrainingData =
        sc.textFile(args(0))
          .filter(!isColumnNameLine(_))
          .map(parseVector)
          .cache()

      // Cluster the data using KMeans.
      val numClusters = args(2).toInt
      val numIterations = args(3).toInt
      val runTimes = args(4).toInt

      // NOTE(review): the 4-argument `train` overload (with `runs`) is deprecated
      // since Spark 2.1 and removed in 3.x; kept here for compatibility with the
      // original example — confirm the target Spark version before upgrading.
      val clusters: KMeansModel =
        KMeans.train(parsedTrainingData, numClusters, numIterations, runTimes)

      println("Cluster Number:" + clusters.clusterCenters.length)
      println("Cluster Centers Information Overview:")
      // FIX: replaced the external mutable `var clusterIndex` counter with
      // zipWithIndex — same output, no shared mutable state.
      clusters.clusterCenters.zipWithIndex.foreach { case (center, clusterIndex) =>
        println("Center Point of Cluster " + clusterIndex + ":")
        println(center)
      }

      // Check which cluster each test data point belongs to, based on the model.
      // FIX: the header line is now filtered here as well — the original parsed
      // the test file unfiltered and would throw NumberFormatException if the
      // test file carried the same "Channel ..." header as the training file.
      val parsedTestData =
        sc.textFile(args(1))
          .filter(!isColumnNameLine(_))
          .map(parseVector)

      parsedTestData.collect().foreach { testDataLine =>
        val predictedClusterIndex: Int = clusters.predict(testDataLine)
        println("The data " + testDataLine.toString + " belongs to cluster " + predictedClusterIndex)
      }
      println("Spark MLlib K-means clustering test finished.")
    } finally {
      // FIX: the original never stopped the SparkContext, leaking the
      // application's cluster resources; always release them on exit.
      sc.stop()
    }
  }

  /** Parses one tab-separated line of numeric columns into a dense vector. */
  private def parseVector(line: String): org.apache.spark.mllib.linalg.Vector =
    Vectors.dense(line.split("\t").map(_.trim).filter(_.nonEmpty).map(_.toDouble))

  /** True iff the line is the column-name header (it contains "Channel"). */
  private def isColumnNameLine(line: String): Boolean =
    line != null && line.contains("Channel")
}
欢迎光临 电子技术论坛_中国专业的电子工程师学习交流社区-中电网技术论坛 (http://bbs.eccn.com/) | Powered by Discuz! 7.0.0 |