用大数据和机器学习进行安全防护

零、前言

机器学习和大数据早已经火的一塌糊涂,将它们应用到安全防护也是近几年的热点话题。本文旨在介绍几种机器学习算法在安全上的应用,尽量用通俗的语言解释机器学习是怎么回事,不说数学原理,只将具体应用。代码实现基于Spark,因为Spark是大数据计算引擎,有自带机器学习包(MLlib),可以满足大数据的计算要求。

一、环境搭建

0x00Spark安装

本文的实验环境是windows10,Spark的安装去官网下载即可,

http://spark.apache.org/downloads.html

解压到任意目录,然后将安装目录添加到环境变量,Spark主要由Java、Scale和Python实现,本文使用Python版本即PySpark。想要在Python中调用PySpark,还需将spark目录中的PySpark文件夹spark-2.1.0-bin-hadoop2.7\python\PySpark,复制到Python的安装目录Python27\Lib\site-packages中。如果安装成功你可以在cmd启动PySpark,并看到如下界面:



    在浏览器打开http://127.0.0.1:4040/jobs/,你可以看到Spark的Web-UI界面:

Linux下的安装或集群配置可以参考这两篇文章:

http://www.tuicool.com/articles/QBRJnen

http://blog.csdn.net/oopsoom/article/details/24257981

0x01:代码示例

现在就来测试一个Wordcount代码示例,

from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile('words.txt')
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(lambda x,y:x+y)
    output = counts.collect()
    for (word, count) in output:
        print "%s: %i" % (word, count)

    sc.stop()

运行结果:

bad: 2
spark: 3
mlib: 2
good: 2
hadoop: 1
cool: 2

想要理解这几行代码需要一点点Spark编程基础,这里我就不废话了,可能一时半会也说不清楚,建议去看《Spark快速大数据分析》,入门超棒的一本书。

二、机器学习

首先先来解释一下什么是机器学习,机器学习的宗旨是让机器学会人类识别事物的方法,我们希望机器能够学会人类的思维过程。这其中比较典型的例子就是图片识别,比如下面两张图片,我们一眼就能认出这是一只猫和一只二哈

那么问题来了,为什么人能在一张包含各种颜色的图片中识别出一只动物呢?可能你会说:因为我见过这些动物。没错,这就是经验,经验告诉我们图片中的一些特征组合表示里面有一只二哈,那么我么就认为这是一个包含二哈的图片。可是人类的视觉系统经过了数万年的进化才有了现在的能力,怎么将它传授给机器(代码)呢?于是牛逼的人类开始尝试这件事情,这就是机器学习。

无论是人类还是机器,识别的本质是分类,我们将某个物体成功地归为一类即是成功地识别。识别过程大体可以分为四个步骤:学习、提取特征、识别、分类,学习的本质也是对特征的学习。人类的学习很多时候是无意识的,比如当你第一次见到二哈,有人告诉你了它的品种,当你再见到一只狗的时候你大概就能区分一下这是不是二哈,次数多了变成了你的经验,这就是人类的不断的学习过程。但是机器没有意识,我们必须给机器灌输一些“意识”,告诉他该干什么,该怎么学习,这就是训练。

    机器学习主要分为三种:监督学习、半监督学习和无监督学习。那么什么是监督呢?举个栗子,你从未见过猫和狗,然后给你一组猫和狗的图片,图片的背面写着对应的动物的名称(猫或狗);当你看过数遍以后再给你一组图片,区分一下哪些是猫哪些是狗,这就是监督学习后的分类测试。但是如果你没有第一组带着标签的(Label)图片,你能否直接去识别第二组图片呢?我想也是可以的,你会根据它们的毛发、眼睛、鼻子等特征大体的分成两种不同的图片(虽然你不知道它们叫什么),这个时候我们一般称之为聚类。简单的讲,监督学习就是先用带标签的训练集进行训练后再对测试集分类,无监督学习是直接对测试集分类(聚类)。半监督学习则是两者的结合,通常不带标签的数据远大于带标签的数据。

    最后说一下特征向量,向量这个词最早可能是在数学里见到的,给人的感觉可能是深奥、高大上。在机器学习里我们可以把它看成是一个数组,维度既是数组的长度。放到算法里跑的一定是数值型的数据,比如文本分类,用以分类的向量一定是词频或单词数量等这类数据,而不是字符串。比如一行文本通过N-gram分词后建立的TF-IDF向量为:

三、URL分类

本节主要参考freebuf的一篇文章(见参考文献),以URL分类为例,尝试运行一个真正的机器学习算法。本示例使用监督学习算法–逻辑回归、支持向量机和朴素贝叶斯,通过学习两组已打标的训练集去预测一组已打标的测试集。测试集打标是为了将标签与预测结果比较,以计算预测的准确率。

机器学习算法的实现首先是要构造好的特征向量,在类似的分类算法中比较好的是首先通过N-Gram将文本数据向量化,比如对于下面的例子,如果N取3,步长为1,则:

http://www.abc.com

[htt,ttp,tp:,p:/,://,//w,/ww,www,ww.,w.a,.ab,abc,bc.,c.c,.co,com]

然后去计算TF-IDF,词频—逆文档频率(简称TF-IDF)是一种用来从文本文档(例如URL)中生成特征向量的简单方法。它为文档中的每个词计算两个统计值:一个是词频(TF),也就是每个词在文档中出现的次数,另一个是逆文档频率(IDF),用来衡量一个词在整个文档语料库中出现的(逆)频繁程度。这些值的积,也就是TF×IDF,展示了一个词与特定文档的相关程度(比如这个词在某文档中很常见,但在整个语料库中却很少见)。如上图所示。

N-Gram的分词方法乍一看好像没什么道理,因为一般的特征向量的构造是提取的特征关键词。比如如果我们将向量定义为script、select、union、eval等词出现的词频,那就很好理解,因为那些词都是恶意关键词,在正则匹配中一般也会拦截。但其实N-Gram也是一样的效果,一个特定的关键词会被切分成特定的序列,比如select被分成[sel,ele,lec,ect],而其他的正常的词一般不会出现这样的序列。

算法代码如下:

#-*-coding:utf-8-*-
#Author:Xman21
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import HashingTF,IDF
from pyspark.mllib.classification import LogisticRegressionWithSGD,SVMWithSGD,NaiveBayes
import sys
reload(sys)
sys.setdefaultencoding('utf8')

def split2(line,distance,step):
    '''分词函数,按照一定的步长和距离分词,生成N-gram序列'''
    result = []
    length = len(line)
    for i in xrange(0,length,step):
        result.append(line[i:i+distance])
    return result

def check(test,model):
    '''训练检测函数'''
    countAll = 16447
    lines = test.collect()
    correctCount = 0
    for line in lines:
        temp = line[0:line.rfind(" ")]
        testExample = tf.transform(split2(temp,3,1))
        try:
            if int(line.split(" ")[-1]) == model.predict(testExample):
                correctCount = correctCount+1
        except:
            print line
    C = float(correctCount)/countAll
    print "正确率:"+str(C)

def TFIDF(badData,goodData,distance,step):
    '''IT-IDF函数,根据不同的分词方法生成TF-IDF向量'''
    badFeatures = badData.map(lambda line: tf.transform(split2(line,distance,step)))
    goodFeatures = goodData.map(lambda line: tf.transform(split2(line,distance,step)))
    badFeatures.cache()
    goodFeatures.cache()
    idf = IDF()
    idfModel = idf.fit(badFeatures)
    badVectors = idfModel.transform(badFeatures)
    idfModel = idf.fit(goodFeatures)
    goodVectors = idfModel.transform(goodFeatures)
    # 这里对训练集打标,bad标记为1,good标记为0
    badExamples = badVectors.map(lambda features: LabeledPoint(1, features))
    goodExamples = goodVectors.map(lambda features: LabeledPoint(0, features))
    dataAll = badExamples.union(goodExamples)
    return dataAll

if __name__=="__main__":
    sc = SparkContext(appName="webshellCheck")
    # bad.txt是恶意URL
    bad = sc.textFile("bad.txt")
    # good.txt是正常URL
    good = sc.textFile("good.txt")
    # test.txt是测试集
    test = sc.textFile("test.txt")

    # 创建一个HashingTF实例来把http文本映射为包含10000个特征的向量
    tf = HashingTF(numFeatures = 10000)

    # 这里有三种算法,使用某种算法去掉相应的注释即可
    # 生成Logistic算法数据
    dataLogistic = TFIDF(bad,good,3,1)
    # #生成SVMWithSGD算法数据
    # dataSVMWithSGD = TFIDF(bad,good,5,5)
    # #生成NaiveBayes算法数据
    # dataNaiveBayes = TFIDF(bad,good,6,6)
    
    # 使用分类算法进行训练,iterations位迭代次数,step为迭代步长
    modelLogistic = LogisticRegressionWithSGD.train(data=dataLogistic,iterations=10000,step=6) 
    print "LogisticRegressionWithSGD train success"
    # modelSVMWithSGD = SVMWithSGD.train(data=dataSVMWithSGD,iterations=100000,step=5) 
    # print "SVMWithSGD train success"
    # modelNaiveBayes = NaiveBayes.train(data=dataNaiveBayes,lambda_=0.1) 
    # print "NaiveBayes train success"
    check(test,modelLogistic)
    sc.stop()

运行结果:

LogisticRegressionWithSGD train success
正确率:0.990697391622

我们换一个只有10条数据的测试集test2.txt并修改下测试函数,输出一下测试效果

def check(test,model):
    '''训练检测函数'''
    countAll = 16447
    lines = test.collect()
    correctCount = 0
    for line in lines:
        temp = line[0:line.rfind(" ")]
        testExample = tf.transform(split2(temp,3,1))
        #try:
        if model.predict(testExample)==1:
            print line+" ====> "+"恶意URL"
        else:
            print line+" ====> "+"正常URL"
        # except:
        #     print line
    # C = float(correctCount)/countAll
    # print "正确率:"+str(C)

运行结果:


可以看到识别效果非常完美。

四、日志处理

这一节主要写一写Spark的威力,如何用Spark快速的数据统计并进行机器学习分类,以Apache默认日志为例(Apache详细日志格式大家可以百度一下),如下:

115.28.44.151 - - [28/Mar/2014:00:26:10 +0800] "GET /manager/html HTTP/1.1" 404 162 "-" "Mozilla/3.0 (compatible; Indy Library)"

第一步要对原始日志进行预处理,将其分割成一个列表。使用如下正则

log_Pattern = r'^(?P<remote_addr>.*?) - (?P<remote_user>.*) \[(?P<time_local>.*?)\] "(?P<request>.*?)" '\
        '(?P<status>.*?) (?P<body_bytes_sent>.*?) "(?P<http_referer>.*?)" "(?P<http_user_agent>.*?)"$'

    可以将日志分割为这样的列表

[u'115.28.44.151', u'-', u'28/03/2014:00:26:10', u'GET', u'/manager/html', u'404', u'162', u'-', u'Mozilla/3.0 (compatible; Indy Library)']

然后就可以很方便的运用RDD操作求出访问最多的前10个IP、前10个URL之类的信息,也可以根据日志中的request部分利用机器学算法进行日志分类,根据上文URL分类的代码,再封装两个类,全部代码如下:

#-*-coding:utf-8-*-
#Author:Xman21

from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import HashingTF,IDF
from pyspark.mllib.classification import LogisticRegressionWithSGD,SVMWithSGD,NaiveBayes
import re
import time
import sys
reload(sys)
sys.setdefaultencoding('utf8')


class log_Apache():
    """Apache日志切分类,切分后格式为
    [ip,user,time,method,request,status,body_bytes,referer,user_agent]"""
    def __init__(self):
        pass
    def re_Split(self,line):
        month = {"Jan":"01","Feb":"02","Mar":"03","Apr":"04","May":"05","Jun":"06","Jul":"07","Aug":"08","Sep":"09"
            ,"Oct":"10","Nov":"11","Dec":"12"}
        line_Result = []
        log_Pattern = r'^(?P<remote_addr>.*?) - (?P<remote_user>.*) \[(?P<time_local>.*?)\] "(?P<request>.*?)" '\
        '(?P<status>.*?) (?P<body_bytes_sent>.*?) "(?P<http_referer>.*?)" "(?P<http_user_agent>.*?)"$'
        log_Pattern_Obj = re.compile(log_Pattern).search(line)
        try:
            remote_addr = log_Pattern_Obj.group('remote_addr')
        except:
            remote_addr = ''
        try:
            remote_user = log_Pattern_Obj.group('remote_user')
        except:
            remote_user = ''
        try:
            time_local = log_Pattern_Obj.group('time_local').split(" ")[0]
            for i in month:
                if i in time_local:
                    time_local = time_local.replace(i,month[i])
                    break
        except:
            time_local = ''
        
        try:
            temp = log_Pattern_Obj.group('request').split(" ")
            if len(temp)>1:
                request_mothed = temp[0]
                request_url = temp[1]
            elif len(temp) == 1:
                request_mothed = ''
                request_url = temp[0]
            else:
                request_mothed = ''
                request_url = ''
        except:
            request_mothed = ''
            request_url = ''
        try:
            status = log_Pattern_Obj.group('status')
        except:
            status = ''
        try:
            body_bytes_sent = log_Pattern_Obj.group('body_bytes_sent')
        except:
            body_bytes_sent = ''
        try:
            http_referer = log_Pattern_Obj.group('http_referer')
        except:
            http_referer = ''
        try:
            http_user_agent = log_Pattern_Obj.group('http_user_agent')
        except:
            http_user_agent = ''
        line_Result += [remote_addr,remote_user,time_local,request_mothed,request_url,
            status,body_bytes_sent,http_referer,http_user_agent]
        return line_Result

    def log_Split(self,sc,log_Lines):
        re_Split = self.re_Split
        log_Result = log_Lines.map(lambda line:re_Split(line))
        return log_Result

class Statistics():
    """统计类,获取日志的一些信息"""
    def __init__(self):
        pass
    def IP(self,dataRDD):
        """访问前10的IP统计"""
        temp = dataRDD.map(lambda line: (line[0],1)).reduceByKey(lambda x,y:x+y).map(lambda line:(line[1],line[0])).\
            sortByKey(False).map(lambda line:(line[1],line[0])).take(10)
        return temp
    def URL(self,dataRDD):
        """访问前10的页面统计"""
        temp = dataRDD.map(lambda line: (line[4],1)).reduceByKey(lambda x,y:x+y).map(lambda line:(line[1],line[0])).\
            sortByKey(False).map(lambda line:(line[1],line[0])).take(10)
        return temp

class model_Check(): 
    """机器学习算法类,用以日志分类"""
    def __init__(self):
        pass
    def split2(self,line,distance,step):
        '''分词函数,按照一定的步长和距离分词,生成N-gram序列'''
        result = []
        length = len(line)
        for i in xrange(0,length,step):
            result.append(line[i:i+distance])
        return result

    def check(self,tf,test,test_list,model):
        '''训练检测函数'''
        count = 0
        lines_test = test.collect()
        lines_test_list = test_list.collect()
        correctCount = 0
        for line1,line2 in zip(lines_test,lines_test_list):
            temp = line2[4]
            testExample = tf.transform(self.split2(temp,3,1))
            #try:
            if model.predict(testExample)==1:
                count += 1
                print line1+" ====> "+"恶意log"
        print count 

    def TFIDF(self,tf,badData,goodData,distance,step):
        '''IT-IDF函数,根据不同的分词方法生成TF-IDF向量'''
        split2 = self.split2
        badFeatures = badData.map(lambda line: tf.transform(split2(line,distance,step)))
        goodFeatures = goodData.map(lambda line: tf.transform(split2(line,distance,step)))
        badFeatures.cache()
        goodFeatures.cache()
        idf = IDF()
        idfModel = idf.fit(badFeatures)
        badVectors = idfModel.transform(badFeatures)
        idfModel = idf.fit(goodFeatures)
        goodVectors = idfModel.transform(goodFeatures)
        badExamples = badVectors.map(lambda features: LabeledPoint(1, features))
        goodExamples = goodVectors.map(lambda features: LabeledPoint(0, features))
        dataAll = badExamples.union(goodExamples)
        return dataAll

    def model_check(self,sc,bad,good,test,test_list):
        # 创建一个HashingTF实例来把http文本映射为包含10000个特征的向量
        tf = HashingTF(numFeatures = 10000)

        # 生成Logistic算法数据
        dataLogistic = self.TFIDF(tf,bad,good,3,1)
        # 使用分类算法进行训练,iterations位迭代次数,step为迭代步长
        modelLogistic = LogisticRegressionWithSGD.train(data=dataLogistic,iterations=10000,step=6) 
        print "LogisticRegressionWithSGD train success"
        self.check(tf,test,test_list,modelLogistic)
        sc.stop()

if __name__ == '__main__':
    sc = SparkContext(appName = "logCheck")
    bad = sc.textFile("bad.txt")
    good = sc.textFile("good.txt")
    test = sc.textFile("Apache.log")
    #日志预处理
    step1 = log_Apache()
    result1 = step1.log_Split(sc,test)
    step2 = Statistics()
    print "访问次数最高的前十个IP及其次数:"
    print step2.IP(result1)
    print "访问次数最高的前十个URL及其次数:"
    print step2.URL(result1)
    step3 = model_Check()
    step3.model_check(sc,bad,good,test,result1)

可以看到程序能将日志中访问最高的前十个IP和URL统计了出来

    也能将日志中的恶意记录识别出来

但同时也有较多的误报,这也是现在机器学习比较棘手的一个问题。

五、总结

说了这么多,那机器学习到底有什么好的呢?这不也存在误报么。误报这个问题我觉得任何入侵检测系统都会存在,但是个人认为机器学习的最大好处之一就是能够识别未知攻击。最常见的例子,当我们去尝试绕过一个WAF去进行SQL注入的时候,只需要找到那么一个字符串组合使其正则无法匹配就好了,比如将select变为sel/**/ect。人的肉眼能直接看出这是个变形的注入语句,机器学习同样具备这种能力,虽然没有见过新型的注入语句,但是在训练集里训练过类似的,那就能识别出来。

当然,机器学习是有弊端的,最大的一点就是速度慢,往往很难达到实时预警的效果。另外就是模型不好训练,因为训练集的构造是很困难的。

六、参考文献

http://www.cnblogs.com/shishanyuan/p/4699644.html

http://www.freebuf.com/articles/web/134334.html

http://www.freebuf.com/articles/web/126543.html

http://www.freebuf.com/articles/network/131279.html

http://blog.csdn.net/v_july_v/article/details/7624837


用大数据和机器学习进行安全防护》上有 1 条评论

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注