In Spark, cache, persist, and checkpoint can all save an RDD so it can be reused later or recovered for fault tolerance, but the three behave differently. The two examples below demonstrate RDD.cache() and RDD.checkpoint().
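The examples that follow only call cache() and checkpoint() directly. For reference, persist() is the more general form of cache(): cache() is simply persist() with the default storage level, while persist() accepts an explicit StorageLevel. Below is a minimal sketch reusing mapRDD from the examples; the chosen storage level is only illustrative:

import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist() with the default storage level (MEMORY_ONLY for RDDs):
//   mapRDD.cache()
// persist() takes an explicit StorageLevel; only one level can be assigned to an RDD,
// for example spilling to disk when the data does not fit in memory:
mapRDD.persist(StorageLevel.MEMORY_AND_DISK)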
package com.zsz.spark.core.rdd.operator.persist

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark_RDD_Persist {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("persist")
    val sc = new SparkContext(conf)

    val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
    val flatMapRDD: RDD[String] = rdd.flatMap(_.split(" "))
    val mapRDD: RDD[(String, Int)] = flatMapRDD.map((_, 1))
    val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)

    mapRDD.cache()

    // print the lineage before the action
    println(mapRDD.toDebugString)

    reduceRDD.collect().foreach(println)

    println("*****************************")

    // print the lineage again after the action
    println(mapRDD.toDebugString)

    sc.stop()
  }
}
Output:
As the output shows, cache (and persist) does not break the lineage; it adds a new dependency to it, so the data can still be recomputed from the original lineage if the cached blocks are lost.
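Data cached or persisted this way only lives for the duration of the application and can be released manually once it is no longer needed. A small sketch of the related calls; the examples in this post do not actually release the cache, so this is only illustrative:

// inspect the storage level currently assigned to the RDD
println(mapRDD.getStorageLevel)

// release the cached blocks; the lineage is still intact,
// so the data can be recomputed later if it is needed again
mapRDD.unpersist()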
package com.zsz.spark.core.rdd.operator.persist

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark_RDD_Persist {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("persist")
    val sc = new SparkContext(conf)
    // checkpoint requires a checkpoint directory
    sc.setCheckpointDir("cp")

    val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
    val flatMapRDD: RDD[String] = rdd.flatMap(_.split(" "))
    val mapRDD: RDD[(String, Int)] = flatMapRDD.map((_, 1))
    val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)

    // mapRDD.cache()
    mapRDD.checkpoint()

    // print the lineage before the action
    println(mapRDD.toDebugString)

    reduceRDD.collect().foreach(println)

    println("*****************************")

    // print the lineage again after the action
    println(mapRDD.toDebugString)

    sc.stop()
  }
}
Output:
As the output shows, checkpoint cuts the existing lineage and establishes a new one, which is effectively the same as changing the data source: subsequent computations read the checkpointed data from storage instead of recomputing it.
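Note that checkpoint() is carried out by a separate job that runs after the action finishes, so without caching the RDD would be computed twice. A common pattern is therefore to cache the RDD before checkpointing it. A minimal sketch reusing the names from the example above (in production the checkpoint directory would normally be on reliable storage such as HDFS, which is an assumption here):

// a checkpoint directory is required before checkpoint() can be used
sc.setCheckpointDir("cp")

// cache first so the extra checkpoint job reads the in-memory data
// instead of recomputing the whole lineage from the source
mapRDD.cache()
mapRDD.checkpoint()

// the action triggers the normal job; the checkpoint job runs right after it
reduceRDD.collect().foreach(println)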