一、机器学习分类算法
0x01:逻辑回归
逻辑回归是一个概率模型,也就是说该模型的预测结果的值域为[0,1]。对于二分类来说,逻辑回归的输出等价于模型预测某个数据点属于正类的概率估计。逻辑回归是线性分类模型中使用最广泛的一个。上面提到过,逻辑回归使用的连接函数为逻辑连接:
逻辑回归的损失函数是逻辑损失:
其中y是实际的输出值(正类为1,负类为-1)。
0x02:线性支持向量机
SVM在回归和分类方面是一个强大且流行的技术。和逻辑回归不同, SVM并不是概率模型,但是可以基于模型对正负的估计预测类别。SVM的连接函数是一个对等连接函数,因此预测的输出表示为:
因此,当 w x T 的估计值大于等于阈值0时, SVM对数据点标记为1,否则标记为0(其中阈值是SVM可以自适应的模型参数)。SVM的损失函数被称为合页损失,定义为:
SVM是一个最大间隔分类器,它试图训练一个使得类别尽可能分开的权重向量。在很多分类任务中, SVM不仅表现得性能突出,而且对大数据集的扩展是线性变化的。
0x03:朴素贝叶斯
朴素贝叶斯是一个概率模型,通过计算给定数据点在某个类别的概率来进行预测。朴素贝叶斯模型假定每个特征分配到某个类别的概率是独立分布的(假定各个特征之间条件独立)。基于这个假设,属于某个类别的概率表示为若干概率乘积的函数,其中这些概率包括某个特征在给定某个类别的条件下出现的概率(条件概率),以及该类别的概率(先验概率)。这样使得模型训练非常直接且易于处理。类别的先验概率和特征的条件概率可以通过数据的频率估计得到。分类过程就是在给定特征和类别概率的情况下选择最可能的类别。另外还有一个关于特征分布的假设,即参数的估计来自数据。 MLlib实现了多项朴素贝叶斯(multinomial naïve Bayes),其中假设特征分布是多项分布,用以表示特征的非负频率统计。
0x04:决策树
决策树是一个强大的非概率模型,它可以表达复杂的非线性模式和特征相互关系。决策树在很多任务上表现出的性能很好,相对容易理解和解释,可以处理类属或者数值特征,同时不要求输入数据归一化或者标准化。决策树非常适合应用集成方法(ensemble method),比如多个决策树的集成,称为决策树森林。 决策树算法是一种自上而下始于根节点(或特征)的方法,在每一个步骤中通过评估特征分裂的信息增益,最后选出分割数据集最优的特征。
二、代码实现
这几种模型在pyspark上的实现其实并不难,只要调用相应的函数即可,这里我以逻辑回归为例,把其他的代码和解析都写到注释里,在使用的时候去掉和添加相应注释即可。
#coding:utf-8 from pyspark import SparkContext from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.feature import HashingTF,IDF,StandardScaler from pyspark.mllib.classification import LogisticRegressionWithSGD,SVMWithSGD,NaiveBayes from pyspark.mllib.tree import DecisionTree def split2(line): '''自定义的字符串分割函数,本例已步长3分割字符串''' step = 3 result = [] length = len(line) for i in xrange(0,length,step): result.append(line[i:i+step]) return result def check(test,model): '''模型检测函数,输出模型的正确率、准确率和召回率,本例省略''' if __name__=="__main__": sc = SparkContext(appName="test") #分别读取正文件和负文件的训练集,然后读取测试集 spam = sc.textFile("hdfs://ubuntu:9000/xxx/bad.txt") normal = sc.textFile("hdfs://ubuntu:9000/xxx/good.txt") test = sc.textFile("hdfs://ubuntu:9000/xxx/test.txt") # 创建一个HashingTF实例来把文本映射为包含10000个特征的向量 tf = HashingTF(numFeatures = 10000) # 各http请求都被切分为单词,每个单词被映射为一个特征 spamFeatures = spam.map(lambda line: tf.transform(split2(line))) normalFeatures = normal.map(lambda line: tf.transform(split2(line))) # =========使用词频统计构建向量========= # positiveExamples = spamFeatures.map(lambda features: LabeledPoint(0, features)) # negativeExamples = normalFeatures.map(lambda features: LabeledPoint(1, features)) # trainingData = positiveExamples.union(negativeExamples) # trainingData.cache() # 因为逻辑回归是迭代算法,所以缓存训练数据RDD #print trainingData.take(1) # =========使用TF-IDF构建向量========= spamFeatures.cache() # 因为逻辑回归是迭代算法,所以缓存训练数据RDD idf = IDF() idfModel = idf.fit(spamFeatures) spamVectors = idfModel.transform(spamFeatures) normalFeatures.cache() # 因为逻辑回归是迭代算法,所以缓存训练数据RDD idfModel = idf.fit(normalFeatures) normalVectors = idfModel.transform(normalFeatures) positiveExamples = normalVectors.map(lambda features: LabeledPoint(1, features)) negativeExamples = spamVectors.map(lambda features: LabeledPoint(0, features)) dataAll = positiveExamples.union(negativeExamples) # =========特征向量压缩========= # scaler = StandardScaler(withMean=True, withStd=True) # normalScaler = scaler.fit(normalVectors) # normalResult = normalScaler.transform(normalVectors) # spamScaler = scaler.fit(spamVectors) # spamResult = spamScaler.transform(spamVectors) # 使用分类算法进行训练,iterations位迭代次数,step为迭代步长 # LogisticRegressionWithSGD可以替换为SVMWithSGD和NaiveBayes # 其中train函数的参数可以根据模型效果自定义 model = LogisticRegressionWithSGD.train(data=dataAll,iterations=10000,step=1) # 决策树的分类类别为2,映射表为空,不纯净度测量为gini,树的深度为5,数据箱子为32 # model = DecisionTree.trainClassifier(dataAll, numClasses=2, categoricalFeaturesInfo={}, # impurity='gini', maxDepth=5, maxBins=32) check(test,model) sc.stop()