首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

使用 ML Pipeline 构建机器学习工作流(2)

使用 ML Pipeline 构建机器学习工作流(2)

目标数据集预览本文所使用的测试数据集来自 UCI 的 ,这是一个从纸币鉴别过程中的图片里提取的数据集,总共包含五个列,前 4 列是指标值 (连续型),最后一列是真假标识。
图 3. 测试数据集格式四列依次是小波变换图像的方差,小波变换图像的偏态,小波变换图像的峰度,图像熵,类别标签。其实读者并不需要知道什么是小波变换及其相关改变,只需要知道这是四个特征指标的值,我们将根据这些指标训练模型使用模型预测类别。对于该数据集的更多信息,读者可以参考 UCI 官网的描述。
案例分析与编码实现前面提到,本文的目的是使用 Spark ML Pipeline 构建一个对目标数据集进行分类预测的机器学习工作流,案例背景已经相当清晰,在了解了数据集本身和 ML Pipeline 的相关知识后,接下来就是编程实现了。关于实现基本思路和关键的 11 个步骤笔者已经在代码中做了详细解释,为了方便读者理解,这里特别的把该实例的 Pipeline 里包含的 4 个 Stage 重点介绍下。
这四个 Stage 分别对应代码注释里的步骤 2-5,作用如下:
第一个,使用 StringIndexer 去把源数据里的字符 Label,按照 Label 出现的频次对其进行序列编码, 如,0,1,2,…。在本例的数据中,可能这个步骤的作用不甚明显,因为我们的数据格式良好,Label 本身也只有两种,并且已经是类序列编码的”0”和”1”格式。但是对于多分类问题或者是 Label 本身是字符串的编码方式,如”High”,”Low”,”Medium”等,那么这个步骤就很有用,转换后的格式,才能被 Spark 更好的处理。
第二个,使用 VectorAssembler 从源数据中提取特征指标数据,这是一个比较典型且通用的步骤,因为我们的原始数据集里,经常会包含一些非指标数据,如 ID,Description 等。
第三个,创建一个随机森林分类器 RandomForestClassifier 实例,并设定相关参数,主要是告诉随机森林算法输入 DataFrame 数据里哪个列是特征向量,哪个是类别标识,并告诉随机森林分类器训练 5 棵独立的子树。
第四个,我们使用 IndexToString Transformer 去把之前的序列编码后的 Label 转化成原始的 Label,恢复之前的可读性比较高的 Label,这样不论是存储还是显示模型的测试结果,可读性都会比较高。
这几个 Stage 都会被用来构建 Pipeline 实例,并且会按照顺序执行,最终我们根据得到的 PipelineModel 实例,进一步调用其 transform 方法,去用训练好的模型预测测试数据集的分类。
清单 1. 示例程序源代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification._
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorAssembler}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}


object ClassificationPipeline {
def main(args: Array[String]) {
if (args.length < 1){
println("Usage:ClassificationPipeline inputDataFile")
sys.exit(1)
}
val conf = new SparkConf().setAppName("Classification with ML Pipeline")
val sc = new SparkContext(conf)
val sqlCtx = new SQLContext(sc)

/** Step 1
* Read the source data file and convert it to be a dataframe with columns named.
* 3.6216,8.6661,-2.8073,-0.44699,0
* 4.5459,8.1674,-2.4586,-1.4621,0
* 3.866,-2.6383,1.9242,0.10645,0
* 3.4566,9.5228,-4.0112,-3.5944,0
* 0.32924,-4.4552,4.5718,-0.9888,0
* ... ...
*/
val parsedRDD = sc.textFile(args(0)).map(_.split(",")).map(eachRow => {
val a = eachRow.map(x => x.toDouble)
(a(0),a(1),a(2),a(3),a(4))
})
val df = sqlCtx.createDataFrame(parsedRDD).toDF(
"f0","f1","f2","f3","label").cache()

/** *
* Step 2
* StringIndexer encodes a string column of labels
* to a column of label indices. The indices are in [0, numLabels),
* ordered by label frequencies.
* This can help detect label in raw data and give it an index automatically.
* So that it can be easily processed by existing spark machine learning algorithms.
* */
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(df)

/**
* Step 3
* Define a VectorAssembler transformer to transform source features data to be a vector
* This is helpful when raw input data contains non-feature columns, and it is common for
* such a input data file to contain columns such as "ID", "Date", etc.
*/
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("f0","f1","f2","f3"))
.setOutputCol("featureVector")

/**
* Step 4
* Create RandomForestClassifier instance and set the input parameters.
* Here we will use 5 trees Random Forest to train on input data.
*/
val rfClassifier = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("featureVector")
.setNumTrees(5)

/**
* Step 5
* Convert indexed class labels back to original one so that it can be easily understood when we
* need to display or save the prediction result to a file.
*/
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labels)

//Step 6
//Randomly split the input data by 8:2, while 80% is for training, the rest is for testing.
val Array(trainingData, testData) = df.randomSplit(Array(0.8, 0.2))

/**
* Step 7
* Create a ML pipeline which is constructed by for 4 PipelineStage objects.
* and then call fit method to perform defined operations on training data.
*/
val pipeline = new Pipeline().setStages(Array(labelIndexer,vectorAssembler,rfClassifier,labelConverter))
val model = pipeline.fit(trainingData)

/**
*Step 8
*Perform predictions about testing data. This transform method will return a result DataFrame
*with new prediction column appended towards previous DataFrame.
*
* */
val predictionResultDF = model.transform(testData)

/**
* Step 9
* Select features,label,and predicted label from the DataFrame to display.
* We only show 20 rows, it is just for reference.
*/
predictionResultDF.select("f0","f1","f2","f3","label","predictedLabel").show(20)

/**
* Step 10
* The evaluator code is used to compute the prediction accuracy, this is
* usually a valuable feature to estimate prediction accuracy the trained model.
*/
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("precision")
val predictionAccuracy = evaluator.evaluate(predictionResultDF)
println("Testing Error = " + (1.0 - predictionAccuracy))
/**
* Step 11(Optional)
* You can choose to print or save the the model structure.
*/
val randomForestModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
println("Trained Random Forest Model is:\n" + randomForestModel.toDebugString)
}
}

返回列表