Spark对于统计量中的最大值,最小值,平均值和方差(均值)的计算都提供了封装,这里小编知道两种计算方法,整理一下分享给大家

DataFrame形式

加载Json数据源

example.json文件格式如下

1
2
3
{"name":"thinkgamer","age":23,"math":78,"chinese":78,"english":95}
{"name":"think","age":25,"math":95,"chinese":88,"english":93}
{"name":"gamer","age":24,"math":93,"chinese":68,"english":88}

1
2
3
4
// persist(StorageLevel.MEMORY_AND_DISK) 当内存不够时cache到磁盘里
val df = spark.read.json("/path/to/example.json").persist(StorageLevel.MEMORY_AND_DISK)
df.show()
df.describe()

我们便可以看到如下的形式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
+---+-------+-------+----+----------+
|age|chinese|english|math| name|
+---+-------+-------+----+----------+
| 23| 78| 95| 78|thinkgamer|
| 25| 88| 93| 95| think|
| 24| 68| 88| 93| gamer|
+---+-------+-------+----+----------+

+-------+----+-------+-----------------+-----------------+----------+
|summary| age|chinese| english| math| name|
+-------+----+-------+-----------------+-----------------+----------+
| count| 3| 3| 3| 3| 3|
| mean|24.0| 78.0| 92.0|88.66666666666667| null|
| stddev| 1.0| 10.0|3.605551275463989| 9.29157324317757| null|
| min| 23| 68| 88| 78| gamer|
| max| 25| 88| 95| 95|thinkgamer|
+-------+----+-------+-----------------+-----------------+----------+

如果是想看某列的通知值的话,可以用下面的方式

1
df.select("age").describe().show()

1
2
3
4
5
6
7
8
9
+-------+----+
|summary| age|
+-------+----+
| count| 3|
| mean|24.0|
| stddev| 1.0|
| min| 23|
| max| 25|
+-------+----+

RDD形式

假设同样还是上边的数据,只不过现在变成按\t分割的普通文本

1
2
3
thinkgamer  23  78  78  95
think 25 95 88 93
gamer 24 93 68 88

这里可以将rdd转换成dataframe洗形式,也可以使用rdd计算,转化为df的样例如下

1
2
3
4
val new_data = data_txt
.map(_.split("\\s+"))
.map(one => Person(one(0),one(1).toInt,one(2).toDouble,one(3).toDouble,one(4).toDouble))
.toDF()

接下来就是进行和上边df一样的操作了。

那么对于rdd形式的文件如何操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

val data_txt = SparkSC.spark.sparkContext.textFile(input_txt).persist(StorageLevel.MEMORY_AND_DISK)
val new_data = data_txt
.map(_.split("\\s+"))
.map(one => Vectors.dense(one(1).toInt,one(2).toDouble,one(3).toDouble,one(4).toDouble))
val summary: MultivariateStatisticalSummary = Statistics.colStats(new_data)

println("Max:"+summary.max)
println("Min:"+summary.min)
println("Count:"+summary.count)
println("Variance:"+summary.variance)
println("Mean:"+summary.mean)
println("NormL1:"+summary.normL1)
println("Norml2:"+summary.normL2)

输出结果为:

1
2
3
4
5
6
7
Max:[25.0,95.0,88.0,95.0]
Min:[23.0,78.0,68.0,88.0]
Count:3
Variance:[1.0,86.33333333333331,100.0,13.0]
Mean:[24.0,88.66666666666667,78.0,92.0]
NormL1:[72.0,266.0,234.0,276.0]
Norml2:[41.593268686170845,154.1363033162532,135.83813897429545,159.43023552638942]

这里可以得到相关的统计信息,主要区别在于dataframe得到的是标准差,而使用mllib得到的统计值中是方差,但这并不矛盾,两者可以相互转化得到。

当然如果要求四分位数,可以转化成df,使用sql语句进行查询

1
Select PERCENTILE(col,<0.25,0.75>) from tableName;

自己实现

下面是我自己实现的一个方法,传入的参数是一个rdd,返回的是一个字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 计算最大值,最小值,平均值,方差,标准差,四分位数
def getStat(data: RDD[String]):String= {
val sort_data = data
.filter(one => Verify.istoDouble(one))
.map(_.toDouble)
.sortBy(line=>line)
.persist(StorageLevel.MEMORY_AND_DISK) // 默认是true 升序,false为降序

val data_list = sort_data.collect()
val len = data_list.length
val min = data_list(0)
val max = data_list(len-1)
val mean = sort_data.reduce((a,b) => a+b) / len
val variance = sort_data.map(one => math.pow(one-mean,2)).reduce((a,b)=>a+b)/len
val stdder = math.sqrt(variance)
var quant = ""
if(len<4){
val q1 = min
val q2 = min
val q3 = max
quant = q1+"\t"+q2+"\t"+q3
}else {
val q1 = data_list((len * 0.25).toInt - 1)
val q2 = data_list((len * 0.5).toInt - 1)
val q3 = data_list((len * 0.75).toInt - 1)
quant = q1+"\t"+q2+"\t"+q3
}
max+"\t"+min+"\t"+mean+"\t"+variance+"\t"+stdder+"\t"+quant
}

本地碰见的一个错误

1:错误1

1
scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/Array

原因是Spark中spark-sql_2.11-2.2.1 ,是用scala 2.11版本上编译的,而我的本地的scala版本为2.12.4,所以就错了,可以在
里边把相应的scala版本就行修改就行了

2:错误2

1
java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V

原因也是因为我下载安装的scala2.12版本,换成scala2.11版本就可以了



打开微信扫一扫,关注公众号【搜索与推荐Wiki】