Spark中的RDD到底是什么意思?
1.RDD是彈性分布式數據集,是一個分布式對象的集合。對個RDD可分為多個片,分片可以在集群環境下的不同節點上計算
2.可以通過兩種方式創建RDD:
a.加載外部數據集
b.在驅動程序中部署對象集合。
c.創建RDD最簡單的方法就是采用現有的內存集合并把它傳遞給sc的并行化方法。適合測試,不適合生產
優勢在于可以快速創建自己的RDD并對其執行相關的操作。
val line = sc.parallelize(List("pandas","i like pandas"))
d.可以加載外部存儲數據用sc.textFile("file:///home/ubuntu/simple.txt")來加載一個將文本文件作為字符串的RDD.
val r = sc.textFile("file:///home/ubuntu/simple.txt")
r: org.apache.spark.rdd.RDD[String] = file:///home/ubuntu/simple.txt MapPartitionsRDD[5] at textFile at <console>:24
4.RDD兩種類型的操作:裝換和動作
a.轉換就是將原來的RDD構建成新的RDD,例如:map,filter
val r1 = r.filter(line => line.contains("20"))
r1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at filter at <console>:27
val r2 = r.filter(line => line.contains("10"))
val r3 = r1.union(r2)
r3: org.apache.spark.rdd.RDD[String] = UnionRDD[11] at union at <console>:32
a_1.當相互轉換得到并使用新的RDD時,Spark跟蹤記錄且設定不同的RDD之間的依賴關系,這種關系稱為血統圖(圖1-7)
它使用這個信息來按照需求計算每個RDD,以及恢復持續化的RDD丟失的那一部分數據。我們每次調用一個新的動作,
整個RDD必須從頭開始計算,可以使用持久化來提高效率。
b.動作是通過RDD來計算的結果,并且將結果返回給驅動程序或者保存到外部存儲系統(HDFS),
如:count()它返回計數,
first()
take(n)包含了前n行記錄。
collect()用來獲取整個RDD,不應該用在大型數據集上。
大多數情況下,RDD不能僅僅被collect()到驅動,原因是數據量太大,一般是把數據寫到HDFS或S3
RDD的內容可以使用saveAsTextFile()或者savaAsSequenceFile()以及其他動作來保存。
scala> r1.first
res12: String = 1201 wang 20
c.惰性評估(Lazy Evaluation)
c1.它意味著,當我們調用RDD的轉換時,不立即執行該操作,相反,Spark在內部記錄元數據以表明該操作已被請求,而不是考慮RDD包含的具體的數據。
c2.Spark通過使用惰性評估,以減少其在各種轉換操作中所需要存儲的中間數據。
5.RDD只有在第一次使用它們中的動作時才計算,可以避免浪費大量的存儲空間。因為我們隨后會立即過濾掉一部分不需要的行
一旦Spark看到整個變換鏈,他可以計算僅需其結果的數據,對于first()動作,Spark只掃描文件,直到他找到第一個匹配的行,不讀整個文件。
6.RDDS在默認的情況下每次運行它們都要進行重新計算。如果重用多個動作,可以使用持久化的方法:RDD.persist(),計算第一次后,Spark將RDD
內容存儲在內存中(整個機器的集群分區),默認不適用持久化的意義在于:如果不重用大數據集,可以避免浪費空間。
7.一般會經常使用持久化去加載數據集到內存中,方面重復的查詢和使用