欧美一区二区三区,国内熟女精品熟女A片视频小说,日本av网,小鲜肉男男GAY做受XXX网站

mysql查詢逗號分割,SQL之外部數據源如何成為在企業開發中的一把利器

李中冰2年前22瀏覽0評論
mysql查詢逗號分割,SQL之外部數據源如何成為在企業開發中的一把利器?

一、簡介#

1.1 多數據源支持#

Spark 支持以下六個核心數據源,同時 Spark 社區還提供了多達上百種數據源的讀取方式,能夠滿足絕大部分使用場景。

CSV

JSON

Parquet

ORC

JDBC/ODBC connections

Plain-text files

注:以下所有測試文件均可從本倉庫的resources 目錄進行下載

1.2 讀數據格式#

所有讀取 API 遵循以下調用格式:

Copy

// 格式

DataFrameReader.format(...).option("key", "value").schema(...).load()

// 示例

spark.read.format("csv")

.option("mode", "FAILFAST") // 讀取模式

.option("inferSchema", "true") // 是否自動推斷 schema

.option("path", "path/to/file(s)") // 文件路徑

.schema(someSchema) // 使用預定義的 schema

.load()

讀取模式有以下三種可選項:

讀模式 描述

permissive 當遇到損壞的記錄時,將其所有字段設置為 null,并將所有損壞的記錄放在名為 _corruption t_record 的字符串列中

dropMalformed 刪除格式不正確的行

failFast 遇到格式不正確的數據時立即失敗

1.3 寫數據格式#

Copy

// 格式

DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

//示例

dataframe.write.format("csv")

.option("mode", "OVERWRITE") //寫模式

.option("dateFormat", "yyyy-MM-dd") //日期格式

.option("path", "path/to/file(s)")

.save()

寫數據模式有以下四種可選項:

Scala/Java 描述

SaveMode.ErrorIfExists 如果給定的路徑已經存在文件,則拋出異常,這是寫數據默認的模式

SaveMode.Append 數據以追加的方式寫入

SaveMode.Overwrite 數據以覆蓋的方式寫入

SaveMode.Ignore 如果給定的路徑已經存在文件,則不做任何操作

二、CSV#

CSV 是一種常見的文本文件格式,其中每一行表示一條記錄,記錄中的每個字段用逗號分隔。

2.1 讀取CSV文件#

自動推斷類型讀取讀取示例:

Copy

spark.read.format("csv")

.option("header", "false") // 文件中的第一行是否為列的名稱

.option("mode", "FAILFAST") // 是否快速失敗

.option("inferSchema", "true") // 是否自動推斷 schema

.load("/usr/file/csv/dept.csv")

.show()

使用預定義類型:

Copy

import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}

//預定義數據格式

val myManualSchema = new StructType(Array(

StructField("deptno", LongType, nullable = false),

StructField("dname", StringType,nullable = true),

StructField("loc", StringType,nullable = true)

))

spark.read.format("csv")

.option("mode", "FAILFAST")

.schema(myManualSchema)

.load("/usr/file/csv/dept.csv")

.show()

2.2 寫入CSV文件#

Copy

df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")

也可以指定具體的分隔符:

Copy

df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")

2.3 可選配置#

為節省主文篇幅,所有讀寫配置項見文末 9.1 小節。三、JSON#

3.1 讀取JSON文件#

Copy

spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)

需要注意的是:默認不支持一條數據記錄跨越多行 (如下),可以通過配置 multiLine 為 true 來進行更改,其默認值為 false。

Copy

// 默認支持單行

{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}

//默認不支持多行

{

"DEPTNO": 10,

"DNAME": "ACCOUNTING",

"LOC": "NEW YORK"

}

3.2 寫入JSON文件#

Copy

df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")

3.3 可選配置#

為節省主文篇幅,所有讀寫配置項見文末 9.2 小節。

四、Parquet#

Parquet 是一個開源的面向列的數據存儲,它提供了多種存儲優化,允許讀取單獨的列非整個文件,這不僅節省了存儲空間而且提升了讀取效率,它是 Spark 是默認的文件格式。

4.1 讀取Parquet文件#

Copy

spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)

2.2 寫入Parquet文件#

Copy

df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")

2.3 可選配置#

Parquet 文件有著自己的存儲規則,因此其可選配置項比較少,常用的有如下兩個:

讀寫操作 配置項 可選值 默認值 描述

Write compression or codec None,

uncompressed,

bzip2,

deflate, gzip,

lz4, or snappy None 壓縮文件格式

Read mergeSchema true, false 取決于配置項 spark.sql.parquet.mergeSchema

五、ORC#

ORC 是一種自描述的、類型感知的列文件格式,它針對大型數據的讀寫進行了優化,也是大數據中常用的文件格式。

5.1 讀取ORC文件#

Copy

spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)

4.2 寫入ORC文件#

Copy

csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")

六、SQL Databases#

Spark 同樣支持與傳統的關系型數據庫進行數據讀寫。但是 Spark 程序默認是沒有提供數據庫驅動的,所以在使用前需要將對應的數據庫驅動上傳到安裝目錄下的 jars 目錄中。下面示例使用的是 Mysql 數據庫,使用前需要將對應的 mysql-connector-java-x.x.x.jar 上傳到 jars 目錄下。

6.1 讀取數據#

讀取全表數據示例如下,這里的 help_keyword 是 mysql 內置的字典表,只有 help_keyword_id 和 name 兩個字段。

Copy

spark.read

.format("jdbc")

.option("driver", "com.mysql.jdbc.Driver") //驅動

.option("url", "jdbc:mysql://127.0.0.1:3306/mysql") //數據庫地址

.option("dbtable", "help_keyword") //表名

.option("user", "root").option("password","root").load().show(10)

從查詢結果讀取數據:

val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""

spark.read.format("jdbc")

.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")

.option("driver", "com.mysql.jdbc.Driver")

.option("user", "root").option("password", "root")

.option("dbtable", pushDownQuery)

.load().show()

//輸出

+---------------+-----------+

help_keyword_id| name

+---------------+-----------+

0| <>

1| ACTION

2| ADD

3|AES_DECRYPT

4|AES_ENCRYPT

5| AFTER

6| AGAINST

7| AGGREGATE

8| ALGORITHM

9| ALL

10| ALTER

11| ANALYSE

12| ANALYZE

13| AND

14| ARCHIVE

15| AREA

16| AS

17| ASBINARY

18| ASC

1

七、Text#

Text 文件在讀寫性能方面并沒有任何優勢,且不能表達明確的數據結構,所以其使用的比較少,讀寫操作如下:

7.1 讀取Text數據#

Copy

spark.read.textFile("/usr/file/txt/dept.txt").show()

7.2 寫入Text數據#

Copy

df.write.text("/tmp/spark/txt/dept")

八、數據讀寫高級特性#

8.1 并行讀#

多個 Executors 不能同時讀取同一個文件,但它們可以同時讀取不同的文件。這意味著當您從一個包含多個文件的文件夾中讀取數據時,這些文件中的每一個都將成為 DataFrame 中的一個分區,并由可用的 Executors 并行讀取。

8.2 并行寫#

寫入的文件或數據的數量取決于寫入數據時 DataFrame 擁有的分區數量。默認情況下,每個數據分區寫一個文件。

8.3 分區寫入#

分區和分桶這兩個概念和 Hive 中分區表和分桶表是一致的。都是將數據按照一定規則進行拆分存儲。需要注意的是 partitionBy 指定的分區和 RDD 中分區不是一個概念:這里的分區表現為輸出目錄的子目錄,數據分別存儲在對應的子目錄中。

Copy

val df = spark.read.format("json").load("/usr/file/json/emp.json")

df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")

輸出結果如下:可以看到輸出被按照部門編號分為三個子目錄,子目錄中才是對應的輸出文件。

8.3 分桶寫入#

分桶寫入就是將數據按照指定的列和桶數進行散列,目前分桶寫入只支持保存為表,實際上這就是 Hive 的分桶表。

val numberBuckets = 10

val columnToBucketBy = "empno"

df.write.format("parquet").mode("overwrite")

.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")

.......

具體介紹來源于https://www.cnblogs.com/heibaiying/p/11347390.html