【Spark排序算法系列】主要介绍的是目前推荐系统或者广告点击方面用的比较广的几种算法,和他们在Spark中的应用实现,本篇文章主要介绍LR算法。

本系列还包括(持续更新):

背景

逻辑回归(Logistic Regression,LR)是较早应用在推荐排序上的,其属于线性模型,模型简单,可以引入海量离散特征,这样的好处就是模型可以考虑更加细节或者说针对具体个体的因素。如果想要引入非线性因素需要做特征交叉,这样很容易产生百亿特征,在很早之前ctr就主要靠堆人力搞特征工程工作来持续优化效果。

虽然目前在工业界LR应用的并不多,但是对于初学者,一些中小企业或者应用场景不需要负责排序模型的时候,LR扔不失为一个不错的选择。

关于LR的算法原理,这里不做过多说明,可参考:

LR介绍

LR的数学表达式可以简写为:

对于二分类模型,LR是一个分类算法,模型计算得到预测值后会通过以下函数进转化。

如果L(w,x,y) > 0.5 则是1 否则为0。当然在实际应用过程中,并不是一定取0.5作为界限值,而是根据实际情况进行调整。

二进制回归可以转化为多分类回归问题。关于多分类介绍和基于Spark实现多分类可参考多分类实现方式介绍和在Spark上实现多分类逻辑回归(Multinomial Logistic Regression)

在Spark.mllib包中提供了两种LR分类模型,分别是:

  • mini-batch gradient descent(LogisticRegressionWithLBFGS)
  • L-BFGS(LogisticRegressionWithSGD)

但官方给出的建议是:推荐使用LBFGS,因为基于LBFGS的LR比基于SGD的能更快的收敛。其原话如下:

We implemented two algorithms to solve logistic regression: mini-batch gradient descent and L-BFGS. We recommend L-BFGS over mini-batch gradient descent for faster convergence.

而且LRWithLBFGS不仅支持二分类还支持多分类,但LRWithSGD只支持二分类。所以后续只介绍下Spark mllib中的LogisticRegressionWithLBFGS相关操作。

mllib中的LRWithLBFGS

设置变量和创建spark对象

1
2
3
4
5
6
7
8
9
val file = "data/sample_libsvm_data.txt"
val model_path = "model/lr/"
val model_param = "numInterations:5,regParam:0.1,updater:SquaredL2Updater,gradient:LogisticGradient"

val spark = SparkSession.builder()
.master("local[5]")
.appName("LogisticRegression_Model_Train")
.getOrCreate()
Logger.getRootLogger.setLevel(Level.WARN)

拆分数据集

1
2
3
// 记载数据集 并拆分成训练集和测试集
val data = MLUtils.loadLibSVMFile(spark.sparkContext,file).randomSplit(Array(0.7,0.3))
val (train, test) = (data(0), data(1))

LRWithLBFGS模型设置参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 定义分类的数目,默认为2,是logisticregression的参数
private var numClass: Int = 2
// 定义是否添加截距,默认值为false,是logisticregression的参数
private var isAddIntercept: Option[Boolean] = None
// 定义是否在训练模型前进行验证,是logisticregression的参数
private var isValidateData: Option[Boolean] = None

// 定义迭代的次数,默认值是100,LBFGS的参数
private var numInterations: Option[Int] = None
// 定义正则化系数值,默认值是0.0,LBFGS的参数
private var regParam: Option[Double] = None
// 定义正则化参数,支持:L1Updater[L1]、SquaredL2Updater[L2]、SimpleUpdater[没有正则项],LBFGS的参数
private var updater: Option[String] = None
// 定义计算梯度的方式,支持:LogisticGradient、LeastSquaresGradient、HingeGradient ,LBFGS的参数
private var gradient: Option[String] = None
// 人工定义的收敛阈值
private var threshold:Option[Double]=None
// 定义模型收敛阈值,默认为 10^-6
private var convergenceTol: Double= 1.0e-6

创建模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def createLRModel(model_param: String): LogisticRegressionWithLBFGS={
// 设置模型参数
val optimizer = new LROptimizer()
optimizer.parseString(model_param)
println(s"模型训练参数为:${optimizer.toString}")

// 创建模型并指定相关参数
val LRModel = new LogisticRegressionWithLBFGS()
// 设置分类数目
LRModel.setNumClasses(optimizer.getNumClass)
// 设置是否添加截距
if(optimizer.getIsAddIntercept.nonEmpty) {LRModel.setIntercept(optimizer.getIsAddIntercept.get)}
// 设置是否进行验证模型
if(optimizer.getIsValidateData.nonEmpty){LRModel.setValidateData(optimizer.getIsValidateData.get)}
// 设置迭代次数
if(optimizer.getNumInterations.nonEmpty){LRModel.optimizer.setNumIterations((optimizer.getNumInterations.get))}
// 设置正则项参数
if(optimizer.getRegParam.nonEmpty) { LRModel.optimizer.setRegParam(optimizer.getRegParam.get) }
// 设置正则化参数
if(optimizer.getUpdater.nonEmpty){
optimizer.getUpdater match {
case Some("L1Updater") => LRModel.optimizer.setUpdater( new L1Updater())
case Some("SquaredL2Updater") => LRModel.optimizer.setUpdater(new SquaredL2Updater())
case Some("SimpleUpdater") => LRModel.optimizer.setUpdater(new SimpleUpdater())
case _ => LRModel.optimizer.setUpdater(new SquaredL2Updater())
}
}
// 设置梯度计算方式
if(optimizer.getGradient.nonEmpty){
optimizer.getGradient match {
case Some("LogisticGradient") => LRModel.optimizer.setGradient(new LogisticGradient())
case Some("LeastSquaresGradient") => LRModel.optimizer.setGradient(new LeastSquaresGradient())
case Some("HingeGradient") => LRModel.optimizer.setGradient(new HingeGradient())
case _ => LRModel.optimizer.setGradient(new LogisticGradient())
}
}
// 设置收敛阈值
if(optimizer.getThreshold.nonEmpty){ LRModel.optimizer.setConvergenceTol(optimizer.getThreshold.get)}
else {LRModel.optimizer.setConvergenceTol(optimizer.getConvergenceTol)}

LRModel
}

模型效果评估

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def evaluteResult(result: RDD[(Double,Double,Double)]) :Unit = {
// MSE
val testMSE = result.map{ case(real, pre, _) => math.pow((real - pre), 2)}.mean()
println(s"Test Mean Squared Error = $testMSE")
// AUC
val metrics = new BinaryClassificationMetrics(result.map(x => (x._2,x._1)).sortByKey(ascending = true),numBins = 2)
println(s"0-1 label AUC is = ${metrics.areaUnderROC}")
val metrics1 = new BinaryClassificationMetrics(result.map(x => (x._3,x._1)).sortByKey(ascending = true),numBins = 2)
println(s"score-label AUC is = ${metrics1.areaUnderROC}")
// 错误率
val error = result.filter(x => x._1!=x._2).count().toDouble / result.count()
println(s"error is = $error")
// 准确率
val accuracy = result.filter(x => x._1==x._2).count().toDouble / result.count()
println(s"accuracy is = $accuracy")
}

保存模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def saveModel(model: LogisticRegressionModel, model_path: String): Unit = {
// 保存模型文件 obj
val out_obj = new ObjectOutputStream(new FileOutputStream(model_path+"model.obj"))
out_obj.writeObject(model)

// 保存模型信息
val model_info=new BufferedWriter(new FileWriter(model_path+"model_info.txt"))
model_info.write(model.toString())
model_info.flush()
model_info.close()

// 保存模型权重
val model_weights=new BufferedWriter(new FileWriter(model_path+"model_weights.txt"))
model_weights.write(model.weights.toString)
model_weights.flush()
model_weights.close()

println(s"模型信息写入文件完成,路径为:$model_path")
}

加载模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def loadModel(model_path: String): Option[LogisticRegressionModel] = {
try{
val in = new ObjectInputStream( new FileInputStream(model_path) )
val model = Option( in.readObject().asInstanceOf[LogisticRegressionModel] )
in.close()
println("Model Load Success")
model
}
catch {
case ex: ClassNotFoundException => {
println(ex.printStackTrace())
None
}
case ex: IOException => {
println(ex.printStackTrace())
println(ex)
None
}
case _: Throwable => throw new Exception
}
}

使用加载的模型进行分值计算

1
2
3
4
5
6
7
8
9
10
11
// 加载obj文件进行预测
val model_new = loadModel(s"$model_path/model.obj")
// 使用加载的模型进行样例预测
val result_new = test.map(line =>{
val pre_label = model_new.get.predict(line.features)
// blas.ddot(x.length, x,1,y,1) (向量x的长度,向量x,向量x的索引递增间隔,向量y,向量y的索引递增间隔)
val pre_score = blas.ddot(model.numFeatures, line.features.toArray, 1, model.weights.toArray, 1)
val score = Math.pow(1+Math.pow(Math.E, -2 * pre_score), -1)
(line.label, pre_label,score)
} )
result_new.take(2).foreach(println)

ml中的二分类LR

ml包中的LR既可以用来做二分类,也可以用来做多分类。

  • 二分类对应:Binomial logistic regression
  • 多分类对应:multinomial logistic regression

其中二分类可以通过Binomial logistic regression 和 multinomial logistic regression实现。

基于Binomial logistic regression的LR实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def BinaryModel(train: Dataset[Row], model_path: String, spark: SparkSession) = {
// 创建模型
val LRModel = new LogisticRegression()
.setMaxIter(20)
.setRegParam(0.3)
.setElasticNetParam(0.8)
// 训练评估模型
val model = LRModel.fit(train)
evalute(model, train, spark)
}

def evalute(model: LogisticRegressionModel, train: Dataset[Row], spark: SparkSession):Unit = {
// 打印模型参数
println(s"模型参数信息如下:\n ${model.parent.explainParams()} \n")
println(s"Coefficients(系数): ${model.coefficients}")
println(s"Intercept(截距): ${model.intercept}")
// 查看训练集的预测结果 rawPrediction:row 计算的分值,probability:经过sigmoid转换后的概率
val result = model.evaluate(train)
result.predictions.show(10)
// 将 label,0 值概率,predict label提取出来
result.predictions.select("label","probability","prediction").rdd
.map(row => (row.getDouble(0),row.get(1).asInstanceOf[DenseVector].toArray(0),row.getDouble(2)))
.take(10).foreach(println)
// 模型评估
val trainSummary = model.summary
val objectiveHistory = trainSummary.objectiveHistory
println("objectiveHistoryLoss:")
objectiveHistory.foreach(loss => println(loss))

val binarySummary = trainSummary.asInstanceOf[BinaryLogisticRegressionSummary]

val roc = binarySummary.roc
roc.show()
println(s"areaUnderROC: ${binarySummary.areaUnderROC}")

// Set the model threshold to maximize F-Measure
val fMeasure = binarySummary.fMeasureByThreshold
fMeasure.show(10)
val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
import spark.implicits ._
val bestThreshold = fMeasure.where($"F-Measure"===maxFMeasure).select("threshold").head().getDouble(0)
model.setThreshold(bestThreshold)
}

基于Multimial logistic regression的LR实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def BinaryModelWithMulti(train: Dataset[Row], model_path: String, spark: SparkSession) = {
// 创建模型
val LRModel = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
.setFamily("multinomial")
// 训练模型
val model = LRModel.fit(train)
// 打印模型参数
println(s"模型参数信息如下:\n ${model.parent.explainParams()} \n")
println(s"Coefficients(系数): ${model.coefficientMatrix}")
println(s"Intercept(截距): ${model.interceptVector}")
}

ml中的多分类LR

某条样本属于类别k的概率计算为:

其中K表示类别,J表示特征个数

权重最小化使用的是最大似然函数,其更新公式如下:

使用的数据集形式为:

1
2
3
4
5
6
7
8
1 1:-0.222222 2:0.5 3:-0.762712 4:-0.833333
1 1:-0.555556 2:0.25 3:-0.864407 4:-0.916667
1 1:-0.722222 2:-0.166667 3:-0.864407 4:-0.833333
1 1:-0.722222 2:0.166667 3:-0.694915 4:-0.916667
0 1:0.166667 2:-0.416667 3:0.457627 4:0.5
1 1:-0.833333 3:-0.864407 4:-0.916667
2 1:-1.32455e-07 2:-0.166667 3:0.220339 4:0.0833333
2 1:-1.32455e-07 2:-0.333333 3:0.0169491 4:-4.03573e-08

多分类LR模型实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def MultiModel(file_multi: String, spark: SparkSession, model_path: String): Unit = {
val training = spark.read.format("libsvm").load(file_multi)
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)

// Fit the model
val lrModel = lr.fit(training)

// Print the coefficients and intercept for multinomial logistic regression
println(s"Coefficients: \n${lrModel.coefficientMatrix}")
println(s"Intercepts: ${lrModel.interceptVector}")
}


参考资料

https://spark.apache.org/docs/2.1.0/mllib-linear-methods.html#classification

https://spark.apache.org/docs/2.1.0/ml-classification-regression.html#logistic-regression

https://blog.csdn.net/pupilxmk/article/details/80735599


打开微信扫一扫,关注微信公众号【搜索与推荐Wiki】