I am new to Spark and am trying to write some example code based on Spark and Spark Streaming.
So far, I have implemented a sorting function in Spark; here is the code:
    import org.apache.spark.{SparkConf, SparkContext}

    // genRandom and printlnArray are helper functions defined elsewhere (not shown).
    def sort(listSize: Int, slice: Int): Unit = {
      val conf = new SparkConf().setAppName(getClass.getName)
      val sc = new SparkContext(conf)

      val step1 = System.currentTimeMillis()
      val data = genRandom(listSize)
      val step2 = System.currentTimeMillis()
      println(">>>>>>>>>> genRandom : " + (step2 - step1))

      // parallelize is lazy: no job runs here, so this timing is close to zero.
      val distData = sc.parallelize(data, slice)
      val step3 = System.currentTimeMillis()
      println(">>>>>>>>>> parallelize : " + (step3 - step2))

      // sortBy runs a sampling job to compute range boundaries; the actual
      // shuffle and sort only happen when an action (take below) is called.
      val result = distData.sortBy(x => x, ascending = true)
      val step4 = System.currentTimeMillis()
      println(">>>>>>>>>> sortBy : " + (step4 - step3))

      // result is already sorted, so take returns the first 10000 elements in order.
      val finalResult = result.take(10000)
      val step5 = System.currentTimeMillis()
      println(">>>>>>>>>> take : " + (step5 - step4))
      println(">>>>>>>>>> total time : " + (step5 - step1))

      printlnArray(finalResult, 0, 10)
      sc.stop()
    }
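For comparison, here is a minimal self-contained sketch of the same batch sort that runs in local mode. The names BatchSortSketch and smallestN, and the seeded genRandom stand-in, are hypothetical and are not the helpers used in the code above. It contrasts two ways of getting the n smallest values: a full sortBy followed by take, and takeOrdered, which keeps a bounded top-n per partition and merges the results on the driver without shuffling the whole data set.

```scala
import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}

object BatchSortSketch {
  // Hypothetical stand-in for the genRandom helper: seeded so results are repeatable.
  def genRandom(size: Int, seed: Long = 42L): Seq[Int] = {
    val rnd = new Random(seed)
    Seq.fill(size)(rnd.nextInt(1000000))
  }

  // Returns the n smallest values computed two ways: (full sort + take, takeOrdered).
  def smallestN(data: Seq[Int], slices: Int, n: Int): (Array[Int], Array[Int]) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("BatchSortSketch")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(data, slices)
      // Option 1: range-partitioned global sort; take scans partitions in index
      // order, so it returns the first n elements in sorted order.
      val viaSort = rdd.sortBy(x => x, ascending = true).take(n)
      // Option 2: per-partition bounded top-n merged on the driver; no full shuffle.
      val viaTakeOrdered = rdd.takeOrdered(n)
      (viaSort, viaTakeOrdered)
    } finally {
      sc.stop()
    }
  }
}
```

When only the first few thousand elements are needed, takeOrdered alone is usually the cheaper choice; sortBy is the one to use when the whole sorted RDD is needed downstream.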
I am having trouble implementing a sort function in Spark Streaming. As far as I know, Spark core provides a sort API on RDDs (sortBy), but there is no such API on DStreams in Spark Streaming. Does anyone know how to do it? Thanks.
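One possible approach, sketched under assumptions: DStream.transform hands each micro-batch to a function as a plain RDD, so the RDD sortBy used in the batch code can be reused per batch. The sketch assumes the spark-streaming (DStream) module is on the classpath and feeds hypothetical test batches through queueStream instead of a real source; StreamingSortSketch, sortEachBatch and run are illustrative names only. Note that this sorts each batch independently, not the stream as a whole.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object StreamingSortSketch {
  // Sort every micro-batch by reusing the RDD sortBy API inside transform.
  // Empty batches are passed through unchanged to skip sortBy's sampling job.
  def sortEachBatch(stream: DStream[Int]): DStream[Int] =
    stream.transform { (rdd: RDD[Int]) =>
      if (rdd.isEmpty()) rdd else rdd.sortBy(x => x, ascending = true)
    }

  // Feeds the given (non-empty) batches through a queueStream, one RDD per
  // batch interval, and returns each sorted batch as collected on the driver.
  def run(batches: Seq[Seq[Int]]): List[Seq[Int]] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingSortSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    val results = mutable.ArrayBuffer[Seq[Int]]()
    try {
      val queue = mutable.Queue[RDD[Int]]()
      batches.foreach(b => queue += ssc.sparkContext.parallelize(b, 2))

      sortEachBatch(ssc.queueStream(queue)).foreachRDD { (rdd: RDD[Int]) =>
        // This function runs on the driver; collect() returns the batch in sorted order.
        val batch = rdd.collect().toSeq
        if (batch.nonEmpty) results.synchronized { results += batch }
      }

      ssc.start()
      // Poll until every input batch has been processed, or give up after 60 s.
      val deadline = System.currentTimeMillis() + 60000L
      while (results.synchronized(results.size) < batches.size &&
             System.currentTimeMillis() < deadline) {
        ssc.awaitTerminationOrTimeout(200L)
      }
    } finally {
      // stop(stopSparkContext = true, stopGracefully = false)
      ssc.stop(true, false)
    }
    results.synchronized(results.toList)
  }
}
```

To sort everything received over a time range rather than per batch, one option is to window the stream first, e.g. stream.window(Seconds(30), Seconds(10)), and then apply sortEachBatch to the windowed DStream; a complete ordering of an unbounded stream cannot be produced batch by batch.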