欧美一区二区三区,国内熟女精品熟女A片视频小说,日本av网,小鲜肉男男GAY做受XXX网站

最近在學習pyspark

李中冰2年前15瀏覽0評論

最近在學習pyspark?

Spark提供了一個Python_Shell,即pyspark,從而可以以交互的方式使用Python編寫Spark程序。

有關Spark的基本架構介紹參考http://blog.csdn.net/cymy001/article/details/78483614;

有關Pyspark的環境配置參考http://blog.csdn.net/cymy001/article/details/78430892。

pyspark里最核心的模塊是SparkContext(簡稱sc),最重要的數據載體是RDD。RDD就像一個NumPy array或者一個Pandas Series,可以視作一個有序的item集合。只不過這些item并不存在driver端的內存里,而是被分割成很多個partitions,每個partition的數據存在集群的executor的內存中。

引入Python中pyspark工作模塊

import pyspark

from pyspark import SparkContext as sc

from pyspark import SparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

#任何Spark程序都是SparkContext開始的,SparkContext的初始化需要一個SparkConf對象,SparkConf包含了Spark集群配置的各種參數(比如主節點的URL)。初始化后,就可以使用SparkContext對象所包含的各種方法來創建和操作RDD和共享變量。Spark shell會自動初始化一個SparkContext(在Scala和Python下可以,但不支持Java)。

#getOrCreate表明可以視情況新建session或利用已有的session

1

2

3

4

5

6

7

SparkSession是Spark 2.0引入的新概念。SparkSession為用戶提供了統一的切入點,來讓用戶學習spark的各項功能。 在spark的早期版本中,SparkContext是spark的主要切入點,由于RDD是主要的API,我們通過sparkcontext來創建和操作RDD。對于每個其他的API,我們需要使用不同的context。例如,對于Streming,我們需要使用StreamingContext;對于sql,使用sqlContext;對于hive,使用hiveContext。但是隨著DataSet和DataFrame的API逐漸成為標準的API,就需要為他們建立接入點。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點。SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了SparkContext,所以計算實際上是由SparkContext完成的。

初始化RDD的方法

(1)本地內存中已經有一份序列數據(比如python的list),可以通過sc.parallelize去初始化一個RDD。當執行這個操作以后,list中的元素將被自動分塊(partitioned),并且把每一塊送到集群上的不同機器上。

import pyspark

from pyspark import SparkContext as sc

from pyspark import SparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

#(a)利用list創建一個RDD;使用sc.parallelize可以把Python list,NumPy array或者Pandas Series,Pandas DataFrame轉成Spark RDD。

rdd = sc.parallelize([1,2,3,4,5])

rdd

#Output:ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480

#(b)getNumPartitions()方法查看list被分成了幾部分

rdd.getNumPartitions()

#Output:4

1

2

3

4

5

6

7

8

9

10

11

12

13

14

#(c)glom().collect()查看分區狀況

rdd.glom().collect()

#Output:[[1], [2], [3], [4, 5]]

1

2

3

在這個例子中,是一個4-core的CPU筆記本;Spark創建了4個executor,然后把數據分成4個塊。colloect()方法很危險,數據量上BT文件讀入會爆掉內存……

(2)創建RDD的另一個方法是直接把文本讀到RDD。文本的每一行都會被當做一個item,不過需要注意的一點是,Spark一般默認給定的路徑是指向HDFS的,如果要從本地讀取文件的話,給一個file://開頭(windows下是以file:\\開頭)的全局路徑。

import pyspark

from pyspark import SparkContext as sc

from pyspark import SparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

#(a)記錄當前pyspark工作環境位置

import os

cwd=os.getcwd()

cwd

#Output:'C:\\Users\\Yu\\0JulyLearn\\5weekhadoopspark'

#(b)要讀入的文件的全路徑

rdd=sc.textFile("file:\\\\\\" + cwd + "\\names\yob1880.txt")

rdd

#Output:file:\\\C:\Users\Yu\0JulyLearn\5weekhadoopspark\names\yob1880.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

#(c)first()方法取讀入的rdd數據第一個item

rdd.first()

#Output:'Mary,F,7065'

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

甚至可以sc.wholeTextFiles讀入整個文件夾的所有文件。但是要特別注意,這種讀法,RDD中的每個item實際上是一個形如(文件名,文件所有內容)的元組。讀入整個文件夾的所有文件。

import pyspark

from pyspark import SparkContext as sc

from pyspark import SparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

#記錄當前pyspark工作環境位置

import os

cwd=os.getcwd()

cwd

#Output:'C:\\Users\\Yu\\0JulyLearn\\5weekhadoopspark'

rdd = sc.wholeTextFiles("file:\\\\\\" + cwd + "\\names\yob1880.txt")

rdd

#Output:org.apache.spark.api.java.JavaPairRDD@12bcc15

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

rdd.first()

Output:

('file:/C:/Users/Yu/0JulyLearn/5weekhadoopspark/names/yob1880.txt',

1

2

3

4

5

其余初始化RDD的方法,包括:HDFS上的文件,Hive中的數據庫與表,Spark SQL得到的結果。這里暫時不做介紹。

RDD Transformation

(1)RDDs可以進行一系列的變換得到新的RDD,有點類似列表推導式的操作,先給出一些RDD上最常用到的transformation:

map() 對RDD的每一個item都執行同一個操作

flatMap() 對RDD中的item執行同一個操作以后得到一個list,然后以平鋪的方式把這些list里所有的結果組成新的list

filter() 篩選出來滿足條件的item

distinct() 對RDD中的item去重

sample() 從RDD中的item中采樣一部分出來,有放回或者無放回

sortBy() 對RDD中的item進行排序

1

2

3

4

5

6

如果想看操作后的結果,可以用一個叫做collect()的action把所有的item轉成一個Python list。數據量大時,collect()很危險……

import pyspark

from pyspark import SparkContext as sc

from pyspark import SparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

numbersRDD = sc.parallelize(range(1,10+1))

print(numbersRDD.collect())

#map()對RDD的每一個item都執行同一個操作

squaresRDD = numbersRDD.map(lambda x: x**2) # Square every number

print(squaresRDD.collect())

#filter()篩選出來滿足條件的item

filteredRDD = numbersRDD.filter(lambda x: x % 2 == 0) # Only the evens

print(filteredRDD.collect())

#Output:

#[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

#[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

#[2, 4, 6, 8, 10]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

import pyspark

from pyspark import SparkContext as sc

from pyspark import SparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

#flatMap() 對RDD中的item執行同一個操作以后得到一個list,然后以平鋪的方式把這些list里所有的結果組成新的list

sentencesRDD=sc.parallelize(['Hello world','My name is Patrick'])

wordsRDD=sentencesRDD.flatMap(lambda sentence: sentence.split(" "))

print(wordsRDD.collect())

print(wordsRDD.count())

#Output:

#['Hello', 'world', 'My', 'name', 'is', 'Patrick']

#6

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

對比一下:

這里如果使用map的結果是[[‘Hello’, ‘world’], [‘My’, ‘name’, ‘is’, ‘Patrick’]],

使用flatmap的結果是全部展開[‘Hello’, ‘world’, ‘My’, ‘name’, ‘is’, ‘Patrick’]。

flatmap即對應Python里的如下操作:

l = ['Hello world', 'My name is Patrick']

ll = []

for sentence in l:

ll = ll + sentence.split(" ") #+號作用,two list拼接

ll

1

2

3

4

5

(2)最開始列出的各個Transformation,可以一個接一個地串聯使用,比如:

import pyspark

from pyspark import SparkContext as sc

from pyspark import SparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

def doubleIfOdd(x):

if x % 2 == 1:

return 2 * x

else:

return x

numbersRDD = sc.parallelize(range(1,10+1))

resultRDD = (numbersRDD

.map(doubleIfOdd) #map,filter,distinct()

.filter(lambda x: x > 6)

.distinct()) #distinct()對RDD中的item去重

resultRDD.collect()

#Output:[8, 10, 18, 14]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

(3)當遇到更復雜的結構,比如被稱作“pair RDDs”的以元組形式組織的k-v對(key, value),Spark中針對這種item結構的數據,定義了一些transform和action:

reduceByKey(): 對所有有著相同key的items執行reduce操作

groupByKey(): 返回類似(key, listOfValues)元組的RDD,后面的value List 是同一個key下面的

sortByKey(): 按照key排序

countByKey(): 按照key去對item個數進行統計

collectAsMap(): 和collect有些類似,但是返回的是k-v的字典

1

2

3

4

5

import pyspark

from pyspark import SparkContext as sc

from pyspark import SparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

rdd=sc.parallelize(["Hello hello", "Hello New York", "York says hello"])

resultRDD=(rdd

.flatMap(lambda sentence:sentence.split(" "))

.map(lambda word:word.lower())

.map(lambda word:(word, 1)) #將word映射成(word,1)

.reduceByKey(lambda x, y: x + y)) #reduceByKey對所有有著相同key的items執行reduce操作

resultRDD.collect()

#Output:[('hello', 4), ('york', 2), ('says', 1), ('new', 1)]

result = resultRDD.collectAsMap() #collectAsMap類似collect,以k-v字典的形式返回

result

#Output:{'hello': 4, 'new': 1, 'says': 1, 'york': 2}

resultRDD.sortByKey(ascending=True).take(2) #sortByKey按鍵排序

#Output:[('hello', 4), ('new', 1)]

#取出現頻次最高的2個詞

print(resultRDD

.sortBy(lambda x: x[1], ascending=False)

.take(2))

#Output:[('hello', 4), ('york', 2)]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

RDD間的操作

(1)如果有2個RDD,可以通過下面這些操作,對它們進行集合運算得到1個新的RDD

rdd1.union(rdd2): 所有rdd1和rdd2中的item組合(并集)

rdd1.intersection(rdd2): rdd1 和 rdd2的交集

rdd1.substract(rdd2): 所有在rdd1中但不在rdd2中的item(差集)

rdd1.cartesian(rdd2): rdd1 和 rdd2中所有的元素笛卡爾乘積(正交和)

1

2

3

4

import pyspark

from pyspark import SparkContext as sc

from pyspark import SparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

#初始化兩個RDD

numbersRDD = sc.parallelize([1,2,3])

moreNumbersRDD = sc.parallelize([2,3,4])

1

2

3

4

5

6

7

8

9

numbersRDD.union(moreNumbersRDD).collect() #union()取并集

#Output:[1, 2, 3, 2, 3, 4]

numbersRDD.intersection(moreNumbersRDD).collect() #intersection()取交集

#Output:[2, 3]

numbersRDD.subtract(moreNumbersRDD).collect() #substract()取差集

#Output:[1]

numbersRDD.cartesian(moreNumbersRDD).collect() #cartesian()取笛卡爾積

#Output:[(1, 2), (1, 3), (1, 4), (2, 2), (2, 3), (2, 4), (3, 2), (3, 3), (3, 4)]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

(2)在給定2個RDD后,可以通過一個類似SQL的方式去join它們

import pyspark

from pyspark import SparkContext as sc

from pyspark import SparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

# Home of different people

homesRDD = sc.parallelize([

('Brussels', 'John'),

('Brussels', 'Jack'),

('Leuven', 'Jane'),

('Antwerp', 'Jill'),

])

# Quality of life index for various cities

lifeQualityRDD = sc.parallelize([

('Brussels', 10),

('Antwerp', 7),

('RestOfFlanders', 5),

])

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

homesRDD.join(lifeQualityRDD).collect() #join

#Output:

#[('Antwerp', ('Jill', 7)),

# ('Brussels', ('John', 10)),

# ('Brussels', ('Jack', 10))]

homesRDD.leftOuterJoin(lifeQualityRDD).collect() #leftOuterJoin

#Output:

#[('Antwerp', ('Jill', 7)),

# ('Leuven', ('Jane', None)),

# ('Brussels', ('John', 10)),

# ('Brussels', ('Jack', 10))]

homesRDD.rightOuterJoin(lifeQualityRDD).collect() #rightOuterJoin

#Output:

#[('Antwerp', ('Jill', 7)),

# ('RestOfFlanders', (None, 5)),

# ('Brussels', ('John', 10)),

# ('Brussels', ('Jack', 10))]

homesRDD.cogroup(lifeQualityRDD).collect() #cogroup

#Output:

#[('Antwerp',

# (<pyspark.resultiterable.ResultIterable at 0x73d2d68>,

# <pyspark.resultiterable.ResultIterable at 0x73d2940>)),

# ('RestOfFlanders',

# (<pyspark.resultiterable.ResultIterable at 0x73d2828>,

# <pyspark.resultiterable.ResultIterable at 0x73d2b70>)),

# ('Leuven',

# (<pyspark.resultiterable.ResultIterable at 0x73d26a0>,

# <pyspark.resultiterable.ResultIterable at 0x7410a58>)),

# ('Brussels',

# (<pyspark.resultiterable.ResultIterable at 0x73d2b38>,

# <pyspark.resultiterable.ResultIterable at 0x74106a0>))]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

# Oops! Those <ResultIterable>s are Spark's way of returning a list

# that we can walk over, without materializing the list.

# Let's materialize the lists to make the above more readable:

(homesRDD

.cogroup(lifeQualityRDD)

.map(lambda x:(x[0], (list(x[1][0]), list(x[1][1]))))

.collect())

#Output:

#[('Antwerp', (['Jill'], [7])),

# ('RestOfFlanders', ([], [5])),

# ('Leuven', (['Jane'], [])),

# ('Brussels', (['John', 'Jack'], [10]))]

1

2

3

4

5

6

7

8

9

10

11

12

13

惰性計算,actions方法

特別注意:Spark的一個核心概念是惰性計算。當你把一個RDD轉換成另一個的時候,這個轉換不會立即生效執行!??!Spark會把它先記在心里,等到真的有actions需要取轉換結果時,才會重新組織transformations(因為可能有一連串的變換)。這樣可以避免不必要的中間結果存儲和通信。

常見的action如下,當它們出現的時候,表明需要執行上面定義過的transform了:

collect(): 計算所有的items并返回所有的結果到driver端,接著 collect()會以Python list的形式返回結果

first(): 和上面是類似的,不過只返回第1個item

take(n): 類似,但是返回n個item

count(): 計算RDD中item的個數

top(n): 返回頭n個items,按照自然結果排序

reduce(): 對RDD中的items做聚合

1

2

3

4

5

6

import pyspark

from pyspark import SparkContext as sc

from pyspark import SparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

rdd = sc.parallelize(range(1,10+1))

rdd.reduce(lambda x, y: x + y) #reduce(): 對RDD中的items做聚合

#Output:55

1

2

3

4

5

6

7

8

9

10

reduce的原理:先在每個分區(partition)里完成reduce操作,然后再全局地進行reduce。

有時候需要重復用到某個transform序列得到的RDD結果。但是一遍遍重復計算顯然是要開銷的,所以我們可以通過一個叫做cache()的操作把它暫時地存儲在內存中。緩存RDD結果對于重復迭代的操作非常有用,比如很多機器學習的算法,訓練過程需要重復迭代。

import pyspark

from pyspark import SparkContext as sc

from pyspark import SparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

import numpy as np

numbersRDD = sc.parallelize(np.linspace(1.0, 10.0, 10))

squaresRDD = numbersRDD.map(lambda x: x**2)

squaresRDD.cache() # Preserve the actual items of this RDD in memory

avg = squaresRDD.reduce(lambda x, y: x + y) / squaresRDD.count()

print(avg)

#Output:38.5

native在java,最近在學習pyspark