在PySpark上运行常见的机器学习分类算法

一、机器学习分类算法

0x01:逻辑回归

逻辑回归是一个概率模型,也就是说该模型的预测结果的值域为[0,1]。对于二分类来说,逻辑回归的输出等价于模型预测某个数据点属于正类的概率估计。逻辑回归是线性分类模型中使用最广泛的一个。上面提到过,逻辑回归使用的连接函数为逻辑连接:


逻辑回归的损失函数是逻辑损失:

其中y是实际的输出值(正类为1,负类为-1)。

0x02:线性支持向量机

SVM在回归和分类方面是一个强大且流行的技术。和逻辑回归不同, SVM并不是概率模型,但是可以基于模型对正负的估计预测类别。SVM的连接函数是一个对等连接函数,因此预测的输出表示为:

因此,当 w x T 的估计值大于等于阈值0时, SVM对数据点标记为1,否则标记为0(其中阈值是SVM可以自适应的模型参数)。SVM的损失函数被称为合页损失,定义为:

SVM是一个最大间隔分类器,它试图训练一个使得类别尽可能分开的权重向量。在很多分类任务中, SVM不仅表现得性能突出,而且对大数据集的扩展是线性变化的。

0x03:朴素贝叶斯

朴素贝叶斯是一个概率模型,通过计算给定数据点在某个类别的概率来进行预测。朴素贝叶斯模型假定每个特征分配到某个类别的概率是独立分布的(假定各个特征之间条件独立)。基于这个假设,属于某个类别的概率表示为若干概率乘积的函数,其中这些概率包括某个特征在给定某个类别的条件下出现的概率(条件概率),以及该类别的概率(先验概率)。这样使得模型训练非常直接且易于处理。类别的先验概率和特征的条件概率可以通过数据的频率估计得到。分类过程就是在给定特征和类别概率的情况下选择最可能的类别。另外还有一个关于特征分布的假设,即参数的估计来自数据。 MLlib实现了多项朴素贝叶斯(multinomial naïve Bayes),其中假设特征分布是多项分布,用以表示特征的非负频率统计。

0x04:决策树

决策树是一个强大的非概率模型,它可以表达复杂的非线性模式和特征相互关系。决策树在很多任务上表现出的性能很好,相对容易理解和解释,可以处理类属或者数值特征,同时不要求输入数据归一化或者标准化。决策树非常适合应用集成方法(ensemble method),比如多个决策树的集成,称为决策树森林。 决策树算法是一种自上而下始于根节点(或特征)的方法,在每一个步骤中通过评估特征分裂的信息增益,最后选出分割数据集最优的特征。

二、代码实现

       这几种模型在pyspark上的实现其实并不难,只要调用相应的函数即可,这里我以逻辑回归为例,把其他的代码和解析都写到注释里,在使用的时候去掉和添加相应注释即可。

#coding:utf-8
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import HashingTF,IDF,StandardScaler
from pyspark.mllib.classification import LogisticRegressionWithSGD,SVMWithSGD,NaiveBayes
from pyspark.mllib.tree import DecisionTree


def split2(line):
    '''自定义的字符串分割函数,本例已步长3分割字符串'''
    step = 3
    result = []
    length = len(line)
    for i in xrange(0,length,step):
        result.append(line[i:i+step])
    return result

def check(test,model):
    '''模型检测函数,输出模型的正确率、准确率和召回率,本例省略'''
    

if __name__=="__main__":
    sc = SparkContext(appName="test")
    #分别读取正文件和负文件的训练集,然后读取测试集
    spam = sc.textFile("hdfs://ubuntu:9000/xxx/bad.txt")
    normal = sc.textFile("hdfs://ubuntu:9000/xxx/good.txt")
    test = sc.textFile("hdfs://ubuntu:9000/xxx/test.txt")
    # 创建一个HashingTF实例来把文本映射为包含10000个特征的向量
    tf = HashingTF(numFeatures = 10000)
    # 各http请求都被切分为单词,每个单词被映射为一个特征
    spamFeatures = spam.map(lambda line: tf.transform(split2(line)))
    normalFeatures = normal.map(lambda line: tf.transform(split2(line)))

    # =========使用词频统计构建向量=========
    # positiveExamples = spamFeatures.map(lambda features: LabeledPoint(0, features))
    # negativeExamples = normalFeatures.map(lambda features: LabeledPoint(1, features))
    # trainingData = positiveExamples.union(negativeExamples)
    # trainingData.cache() # 因为逻辑回归是迭代算法,所以缓存训练数据RDD
    #print trainingData.take(1)
    
    # =========使用TF-IDF构建向量=========
    spamFeatures.cache() # 因为逻辑回归是迭代算法,所以缓存训练数据RDD
    idf = IDF()
    idfModel = idf.fit(spamFeatures)
    spamVectors = idfModel.transform(spamFeatures)
    normalFeatures.cache() # 因为逻辑回归是迭代算法,所以缓存训练数据RDD
    idfModel = idf.fit(normalFeatures)
    normalVectors = idfModel.transform(normalFeatures)
    positiveExamples = normalVectors.map(lambda features: LabeledPoint(1, features))
    negativeExamples = spamVectors.map(lambda features: LabeledPoint(0, features))
    dataAll = positiveExamples.union(negativeExamples)

    # =========特征向量压缩=========
    # scaler = StandardScaler(withMean=True, withStd=True)
    # normalScaler = scaler.fit(normalVectors)
    # normalResult = normalScaler.transform(normalVectors)
    # spamScaler = scaler.fit(spamVectors)
    # spamResult = spamScaler.transform(spamVectors)
    
    # 使用分类算法进行训练,iterations位迭代次数,step为迭代步长
    # LogisticRegressionWithSGD可以替换为SVMWithSGD和NaiveBayes
    # 其中train函数的参数可以根据模型效果自定义
    model = LogisticRegressionWithSGD.train(data=dataAll,iterations=10000,step=1) 
    # 决策树的分类类别为2,映射表为空,不纯净度测量为gini,树的深度为5,数据箱子为32
    # model = DecisionTree.trainClassifier(dataAll, numClasses=2, categoricalFeaturesInfo={},
 #                                     impurity='gini', maxDepth=5, maxBins=32) 
    check(test,model)
    sc.stop()

发表评论

电子邮件地址不会被公开。 必填项已用*标注