在实际应用场景中,我们对于Spark往往有各式各样的需求,比如说想MR中的二次排序,Top N,多路劲输出等。那么这篇文章我们就来看下这几个问题。

二次排序

假设我们的数据是这样的:

1
2
3
4
5
6
7
8
1   2
1 3
1 1
1 6
1 4
2 5
2 8
2 3

我们想要实现第一列按降序排列,当第一列相同时,第二列按降序排列

定义一个SecondSortKey类:

1
2
3
4
5
6
7
8
9
10
class SecondSortKey(val first: Int, val second: Int)
extends Ordered[SecondSortKey] with Serializable {
override def compare(that: SecondSortKey): Int = {
if (this.first - that.first == 0) {
this.second - that.second
} else {
this.first - that.first
}
}
}

然后这样去使用

1
2
3
4
5
6
7
val lines = sc.textFile("test.txt")
val pairs = lines.map { x =>
(new SecondSortKey(x.split("\\s+")(0).toInt,
x.split("\\s+")(1).toInt), x)
}
val sortedPairs = pairs.sortByKey(false);
sortedPairs.map(_._2).foreach(println)

当然这里如果想按第一列升序,当第一列相同时,第二列升序的顺序排列,只需要对SecondSoryKey做如下修改即可

1
2
3
4
5
6
7
8
9
10
class SecondSortKey(val first: Int, val second: Int)
extends Ordered[SecondSortKey] with Serializable {
override def compare(that: SecondSortKey): Int = {
if (this.first - that.first !== 0) {
this.second - that.second
} else {
this.first - that.first
}
}
}

当时使用的使用去掉

1
pairs.sortByKey(false)

中的false

Top N

同样还是上边的数据,假设我们要得到第一列中的前五位

1
2
3
4
5
6
val lines = sc.textFile("test.txt")
val rdd = lines
.map(x => x.split("\\s+"))
.map(x => (x(0),x(1)))
.sortByKey()
rdd.take(N).foreach(println)

多路径输出

自己在使用的过程中,通过搜索发现了两种方法
1:调用saveAsHadoopFile函数并自定义一个OutputFormat类

自定义RDDMultipleTextOutputFormat类

RDDMultipleTextOutputFormat类中的generateFileNameForKeyValue函数有三个参数,key和value就是我们RDD的Key和Value,而name参数是每个Reduce的编号。本例中没有使用该参数,而是直接将同一个Key的数据输出到同一个文件中。

1
2
3
4
5
6
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat  

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
key.asInstanceOf[String]
}

调用

1
2
3
4
sc.parallelize(List(("w", "www"), ("b", "blog"), ("c", "com"), ("w", "bt")))  
.map(value => (value._1, value._2 + "Test"))
.partitionBy(new HashPartitioner(3))
.saveAsHadoopFile("/iteblog", classOf[String],classOf[String],classOf[RDDMultipleTextOutputFormat])

这里的

1
new HashPartitioner(3)

中的3是有key的种类决定的,当然在实际应用场景中,我们可能并不知道有多少k,这个时候就可以通过一个rdd 的 distinct操作来得到唯一key的数目。

2:使用dataframe

1
2
3
people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2,"charlie")])
people_df = people_rdd.toDF(["number", "name"])
people_df.write.partitionBy("number").format("text").save(path )

当然这两种方法都有一个缺陷,就是当数据量特别大的时候,数据在repartition的过程中特别耗费资源,也会容易出现任务failed的情况,小编采用的解决办法是,适当的对原rdd进行split,然后遍历每个rdd,进行multioutput操作

形似如下:

1
2
3
4
5
6
val rdd = sc.textFile(input)
var split_rdd = rdd.randomSplit(Array(1.0,1.0,1.0,1.0))
for (one <- Array(1,2,3,4))
{
split_rdd(one)XXXX
}


参考:



打开微信扫一扫,关注微信公众号【搜索与推荐Wiki】