欧美一区二区三区,国内熟女精品熟女A片视频小说,日本av网,小鲜肉男男GAY做受XXX网站

SparkSQL之外部數(shù)據(jù)源如何成為在企業(yè)開發(fā)中的一把利器?

江奕云2年前20瀏覽0評論

一、簡介#

1.1多數(shù)據(jù)源支持#

Spark支持以下六個核心數(shù)據(jù)源,同時Spark社區(qū)還提供了多達上百種數(shù)據(jù)源的讀取方式,能夠滿足絕大部分使用場景。

CSV

JSON

Parquet

ORC

JDBC/ODBCconnections

Plain-textfiles

注:以下所有測試文件均可從本倉庫的resources目錄進行下載

1.2讀數(shù)據(jù)格式#

所有讀取API遵循以下調(diào)用格式:

Copy

//格式

DataFrameReader.format(...).option("key","value").schema(...).load()

//示例

spark.read.format("csv")

.option("mode","FAILFAST")//讀取模式

.option("inferSchema","true")//是否自動推斷schema

.option("path","path/to/file(s)")//文件路徑

.schema(someSchema)//使用預定義的schema

.load()

讀取模式有以下三種可選項:

讀模式描述

permissive當遇到損壞的記錄時,將其所有字段設(shè)置為null,并將所有損壞的記錄放在名為_corruptiont_record的字符串列中

dropMalformed刪除格式不正確的行

failFast遇到格式不正確的數(shù)據(jù)時立即失敗

1.3寫數(shù)據(jù)格式#

Copy

//格式

DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

//示例

dataframe.write.format("csv")

.option("mode","OVERWRITE")//寫模式

.option("dateFormat","yyyy-MM-dd")//日期格式

.option("path","path/to/file(s)")

.save()

寫數(shù)據(jù)模式有以下四種可選項:

Scala/Java描述

SaveMode.ErrorIfExists如果給定的路徑已經(jīng)存在文件,則拋出異常,這是寫數(shù)據(jù)默認的模式

SaveMode.Append數(shù)據(jù)以追加的方式寫入

SaveMode.Overwrite數(shù)據(jù)以覆蓋的方式寫入

SaveMode.Ignore如果給定的路徑已經(jīng)存在文件,則不做任何操作

二、CSV#

CSV是一種常見的文本文件格式,其中每一行表示一條記錄,記錄中的每個字段用逗號分隔。

2.1讀取CSV文件#

自動推斷類型讀取讀取示例:

Copy

spark.read.format("csv")

.option("header","false")//文件中的第一行是否為列的名稱

.option("mode","FAILFAST")//是否快速失敗

.option("inferSchema","true")//是否自動推斷schema

.load("/usr/file/csv/dept.csv")

.show()

使用預定義類型:

Copy

importorg.apache.spark.sql.types.{StructField,StructType,StringType,LongType}

//預定義數(shù)據(jù)格式

valmyManualSchema=newStructType(Array(

StructField("deptno",LongType,nullable=false),

StructField("dname",StringType,nullable=true),

StructField("loc",StringType,nullable=true)

))

spark.read.format("csv")

.option("mode","FAILFAST")

.schema(myManualSchema)

.load("/usr/file/csv/dept.csv")

.show()

2.2寫入CSV文件#

Copy

df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")

也可以指定具體的分隔符:

Copy

df.write.format("csv").mode("overwrite").option("sep","\t").save("/tmp/csv/dept2")

2.3可選配置#

為節(jié)省主文篇幅,所有讀寫配置項見文末9.1小節(jié)。三、JSON#

3.1讀取JSON文件#

Copy

spark.read.format("json").option("mode","FAILFAST").load("/usr/file/json/dept.json").show(5)

需要注意的是:默認不支持一條數(shù)據(jù)記錄跨越多行(如下),可以通過配置multiLine為true來進行更改,其默認值為false。

Copy

//默認支持單行

{"DEPTNO":10,"DNAME":"ACCOUNTING","LOC":"NEWYORK"}

//默認不支持多行

{

"DEPTNO":10,

"DNAME":"ACCOUNTING",

"LOC":"NEWYORK"

}

3.2寫入JSON文件#

Copy

df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")

3.3可選配置#

為節(jié)省主文篇幅,所有讀寫配置項見文末9.2小節(jié)。

四、Parquet#

Parquet是一個開源的面向列的數(shù)據(jù)存儲,它提供了多種存儲優(yōu)化,允許讀取單獨的列非整個文件,這不僅節(jié)省了存儲空間而且提升了讀取效率,它是Spark是默認的文件格式。

4.1讀取Parquet文件#

Copy

spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)

2.2寫入Parquet文件#

Copy

df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")

2.3可選配置#

Parquet文件有著自己的存儲規(guī)則,因此其可選配置項比較少,常用的有如下兩個:

讀寫操作配置項可選值默認值描述

WritecompressionorcodecNone,

uncompressed,

bzip2,

deflate,gzip,

lz4,orsnappyNone壓縮文件格式

ReadmergeSchematrue,false取決于配置項spark.sql.parquet.mergeSchema

五、ORC#

ORC是一種自描述的、類型感知的列文件格式,它針對大型數(shù)據(jù)的讀寫進行了優(yōu)化,也是大數(shù)據(jù)中常用的文件格式。

5.1讀取ORC文件#

Copy

spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)

4.2寫入ORC文件#

Copy

csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")

六、SQLDatabases#

Spark同樣支持與傳統(tǒng)的關(guān)系型數(shù)據(jù)庫進行數(shù)據(jù)讀寫。但是Spark程序默認是沒有提供數(shù)據(jù)庫驅(qū)動的,所以在使用前需要將對應的數(shù)據(jù)庫驅(qū)動上傳到安裝目錄下的jars目錄中。下面示例使用的是Mysql數(shù)據(jù)庫,使用前需要將對應的mysql-connector-java-x.x.x.jar上傳到j(luò)ars目錄下。

6.1讀取數(shù)據(jù)#

讀取全表數(shù)據(jù)示例如下,這里的help_keyword是mysql內(nèi)置的字典表,只有help_keyword_id和name兩個字段。

Copy

spark.read

.format("jdbc")

.option("driver","com.mysql.jdbc.Driver")//驅(qū)動

.option("url","jdbc:mysql://127.0.0.1:3306/mysql")//數(shù)據(jù)庫地址

.option("dbtable","help_keyword")//表名

.option("user","root").option("password","root").load().show(10)

從查詢結(jié)果讀取數(shù)據(jù):

valpushDownQuery="""(SELECT*FROMhelp_keywordWHEREhelp_keyword_id<20)AShelp_keywords"""

spark.read.format("jdbc")

.option("url","jdbc:mysql://127.0.0.1:3306/mysql")

.option("driver","com.mysql.jdbc.Driver")

.option("user","root").option("password","root")

.option("dbtable",pushDownQuery)

.load().show()

//輸出

+---------------+-----------+

help_keyword_id|name

+---------------+-----------+

0|<>

1|ACTION

2|ADD

3|AES_DECRYPT

4|AES_ENCRYPT

5|AFTER

6|AGAINST

7|AGGREGATE

8|ALGORITHM

9|ALL

10|ALTER

11|ANALYSE

12|ANALYZE

13|AND

14|ARCHIVE

15|AREA

16|AS

17|ASBINARY

18|ASC

1

七、Text#

Text文件在讀寫性能方面并沒有任何優(yōu)勢,且不能表達明確的數(shù)據(jù)結(jié)構(gòu),所以其使用的比較少,讀寫操作如下:

7.1讀取Text數(shù)據(jù)#

Copy

spark.read.textFile("/usr/file/txt/dept.txt").show()

7.2寫入Text數(shù)據(jù)#

Copy

df.write.text("/tmp/spark/txt/dept")

八、數(shù)據(jù)讀寫高級特性#

8.1并行讀#

多個Executors不能同時讀取同一個文件,但它們可以同時讀取不同的文件。這意味著當您從一個包含多個文件的文件夾中讀取數(shù)據(jù)時,這些文件中的每一個都將成為DataFrame中的一個分區(qū),并由可用的Executors并行讀取。

8.2并行寫#

寫入的文件或數(shù)據(jù)的數(shù)量取決于寫入數(shù)據(jù)時DataFrame擁有的分區(qū)數(shù)量。默認情況下,每個數(shù)據(jù)分區(qū)寫一個文件。

8.3分區(qū)寫入#

分區(qū)和分桶這兩個概念和Hive中分區(qū)表和分桶表是一致的。都是將數(shù)據(jù)按照一定規(guī)則進行拆分存儲。需要注意的是partitionBy指定的分區(qū)和RDD中分區(qū)不是一個概念:這里的分區(qū)表現(xiàn)為輸出目錄的子目錄,數(shù)據(jù)分別存儲在對應的子目錄中。

Copy

valdf=spark.read.format("json").load("/usr/file/jsonp.json")

df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")

輸出結(jié)果如下:可以看到輸出被按照部門編號分為三個子目錄,子目錄中才是對應的輸出文件。

8.3分桶寫入#

分桶寫入就是將數(shù)據(jù)按照指定的列和桶數(shù)進行散列,目前分桶寫入只支持保存為表,實際上這就是Hive的分桶表。

valnumberBuckets=10

valcolumnToBucketBy="empno"

df.write.format("parquet").mode("overwrite")

.bucketBy(numberBuckets,columnToBucketBy).saveAsTable("bucketedFiles")

.......

具體介紹來源于https://www.cnblogs.com/heibaiying/p/11347390.html