微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

【Spark】Spark的机器学习算法库——Spark MLilb

文章目录

1 导入

1.1 基本概念

MLlib是Spark的机器学习(Machine Learning)库,旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。具体来说,其主要包括以下几方面的内容

  1. 算法工具:常用的学习算法,如分类、回归、聚类和协同过滤;
  2. 特征化工具:特征提取、转化、降维,和选择的工具;
  3. 管道(Pipeline):用于构建、评估和调整机器学习管道的工具;
  4. 持久性:保存和加载算法,模型和管道;
  5. 实用工具:线性代数,统计,数据处理等工具。

1.2 spark.mlib和spark.ml

Spark机器学习库从1.2版本以后被分为两个包:

  • spark.mllib 包含基于RDD的原始算法API。Spark MLlib历史比较长,在1.0以前的版本即已经包含了,提供的算法实现都是基于原始的RDD。
  • spark.ml 则提供了基于DataFrames 高层次的API,可以用来构建机器学习工作流(PipeLine)。ML Pipeline弥补了原始MLlib库的不足,向用户提供了一个基于DataFrame的机器学习工作流式API套件。

2 机器学习工作流(ML Pipelines)

2.1 基本概念

1、DataFrame:使用Spark sql中的DataFrame作为数据集,它可以容纳各种数据类型。 较之RDD,包含了schema信息,更类似传统数据库中的二维表格。它被ML Pipeline用来存储源数据。例如,DataFrame中的列可以是存储的文本,特征向量,真实标签和预测的标签等。

2、Transformer:翻译成转换器,是一种可以将一个DataFrame转换为另一个DataFrame的算法。比如一个模型就是一个Transformer。它可以把一个不包含预测标签的测试数据集DataFrame打上标签,转化成另一个包含预测标签的DataFrame。技术上,Transformer实现了一个方法transform(),它通过附加一个或多个列将一个DataFrame转换为另一个DataFrame。

3、Estimator:翻译成估计器或评估器,它是学习算法或在训练数据上的训练方法的概念抽象。在Pipeline里通常是被用来操作DataFrame数据,并生产一个Transformer。从技术上讲,Estimator实现了一个方法fit(),它接受一个DataFrame并产生一个转换器。如一个随机森林算法就是一个Estimator,它可以调用fit(),通过训练特征数据而得到一个随机森林模型。

4、Parameter:Parameter被用来设置Transformer或者Estimator的参数。现在所有转换器和估计器可共享用于指定参数的公共API。ParamMap是一组(参数,值)对。

5、PipeLine:翻译为工作流或者管道。工作流将多个工作流阶段(转换器和估计器)连接在一起,形成机器学习的工作流,并获得结果输出

2.2 工作流的构建

下面以逻辑斯蒂回归为例,构建一个典型的机器学习过程,来具体介绍一下工作流是如何应用的。

我们的目的是查找出所有包含“spark”的句子,即将包含“spark”的句子的标签设为1,没有“spark”的句子的标签设为0。

Spark2.0以上版本的spark-shell在启动时会自动创建一个名为spark的SparkSession对象,当需要手工创建时,SparkSession可以由其伴生对象的builder()方法创建出来,如下代码段所示:

构建SparkSession对象

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().
            master("local").
            appName("my App Name").
            getorCreate() 

引入要包含的包

import org.apache.spark.ml.feature._
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline,PipelineModel}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

构建训练数据集

val training = spark.createDataFrame(Seq(
            (0L, "a b c d e spark", 1.0),
            (1L, "b d", 0.0),
            (2L, "spark f g h", 1.0),
            (3L, "hadoop mapreduce", 0.0)
          )).toDF("id", "text", "label")

定义Pipeline中的各个工作流阶段Pipelinestage

包括转换器和评估器,具体的包含tokenizer, hashingTF和lr三个步骤

val tokenizer = new Tokenizer().
            setInputCol("text").
            setoutputCol("words")
 
val hashingTF = new HashingTF().
            setNumFeatures(1000).
            setInputCol(tokenizer.getoutputCol).
            setoutputCol("features")
 
val lr = new LogisticRegression().
            setMaxIter(10).
            setRegParam(0.01)

创建一个Pipeline

val pipeline = new Pipeline().
            setStages(Array(tokenizer, hashingTF, lr))

现在构建的Pipeline本质上是一个Estimator,在它的fit()方法运行之后,它将产生一个PipelineModel,它是一个Transformer

val model = pipeline.fit(training)

构建测试数据

val test = spark.createDataFrame(Seq(
            (4L, "spark i j k"),
            (5L, "l m n"),
            (6L, "spark a"),
            (7L, "apache hadoop")
          )).toDF("id", "text")

预测

model.transform(test).
            select("id", "text", "probability", "prediction").
            collect().
            foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double)
              =>
              println(s"($id, $text) --> prob=$prob, prediction=$prediction")
            }

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐