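# Notes:

# After running the programs in this Notebook, close it via the menu File > Close and Halt. Only that actually stops the running program and releases its resources.

# If you just close the browser tab instead, the program keeps running and its resources are not released, so opening and running another Notebook may fail.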

sc.master

'spark://192.168.10.170:7077'

global Path
if sc.master[0:5] == "local":
    Path = "file:/home/hduser/pythonsparkexample/PythonProject/"
else:
    Path = "hdfs://master:9000/user/hduser/"
# To run in cluster mode (Hadoop YARN or Spark Standalone), first upload the files to the HDFS directory as described in the book.
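
The path selection above depends only on the master URL, so it can be tried without a running cluster. The following is a minimal plain-Python sketch; the helper name `choose_base_path` and the two constants are illustrative and not part of the original notebook.

```python
# Hypothetical helper (not in the original notebook): pick the data root
# from the Spark master URL, mirroring the if/else above.
LOCAL_ROOT = "file:/home/hduser/pythonsparkexample/PythonProject/"
HDFS_ROOT = "hdfs://master:9000/user/hduser/"


def choose_base_path(master):
    """Return the local root for local[...] masters, the HDFS root otherwise."""
    return LOCAL_ROOT if master.startswith("local") else HDFS_ROOT


print(choose_base_path("local[*]"))
print(choose_base_path("spark://192.168.10.170:7077"))
```

With the master shown earlier (`spark://192.168.10.170:7077`) this picks the HDFS root, which is why the data files must already be on HDFS.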

19.3 Creating an RDD, a DataFrame, and a Spark SQL tempTable

# Step 1: read the text file and create an RDD
RawUserRDD = sc.textFile(Path + "data/u.user")
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData.collect()

[1, 2, 3, 4, 5]

RawUserRDD.count()

943

RawUserRDD.take(5)

['1|24|M|technician|85711',
 '2|53|F|other|94043',
 '3|23|M|writer|32067',
 '4|24|M|technician|43537',
 '5|33|F|other|15213']

userRDD = RawUserRDD.map(lambda line: line.split("|"))
userRDD.take(5)

[['1', '24', 'M', 'technician', '85711'],
 ['2', '53', 'F', 'other', '94043'],
 ['3', '23', 'M', 'writer', '32067'],
 ['4', '24', 'M', 'technician', '43537'],
 ['5', '33', 'F', 'other', '15213']]

Creating the DataFrame

from pyspark.sql import SparkSession
sqlContext = SparkSession.builder.getOrCreate()
from pyspark.sql import Row
user_Rows = userRDD.map(lambda p:
    Row(
        userid=int(p[0]),
        age=int(p[1]),
        gender=p[2],
        occupation=p[3],
        zipcode=p[4]
    )
)
user_Rows.take(5)

[Row(age=24, gender='M', occupation='technician', userid=1, zipcode='85711'),
 Row(age=53, gender='F', occupation='other', userid=2, zipcode='94043'),
 Row(age=23, gender='M', occupation='writer', userid=3, zipcode='32067'),
 Row(age=24, gender='M', occupation='technician', userid=4, zipcode='43537'),
 Row(age=33, gender='F', occupation='other', userid=5, zipcode='15213')]
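
The Row mapping above converts `userid` and `age` to integers and leaves the other fields as strings. As a rough illustration that runs without Spark, the same per-record conversion can be sketched in plain Python; the function name `parse_user` is hypothetical.

```python
def parse_user(line):
    """Split one u.user record on '|' and convert the numeric fields."""
    userid, age, gender, occupation, zipcode = line.split("|")
    return {
        "userid": int(userid),
        "age": int(age),
        "gender": gender,
        "occupation": occupation,
        "zipcode": zipcode,  # kept as a string so leading zeros survive
    }


sample = ["1|24|M|technician|85711", "8|36|M|administrator|05201"]
users = [parse_user(line) for line in sample]
print(users[0])
print(users[1]["zipcode"])
```

Keeping `zipcode` as text matters for records such as user 8, whose code `05201` would lose its leading zero if converted to an integer.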

user_df = sqlContext.createDataFrame(user_Rows)
user_df.printSchema()
user_df.show()

root
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- userid: long (nullable = true)
 |-- zipcode: string (nullable = true)

+---+------+-------------+------+-------+
|age|gender|   occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 24|     M|   technician|     1|  85711|
| 53|     F|        other|     2|  94043|
| 23|     M|       writer|     3|  32067|
| 24|     M|   technician|     4|  43537|
| 33|     F|        other|     5|  15213|
| 42|     M|    executive|     6|  98101|
| 57|     M|administrator|     7|  91344|
| 36|     M|administrator|     8|  05201|
| 29|     M|      student|     9|  01002|
| 53|     M|       lawyer|    10|  90703|
| 39|     F|        other|    11|  30329|
| 28|     F|        other|    12|  06405|
| 47|     M|     educator|    13|  29206|
| 45|     M|    scientist|    14|  55106|
| 49|     F|     educator|    15|  97301|
| 21|     M|entertainment|    16|  10309|
| 30|     M|   programmer|    17|  06355|
| 35|     F|        other|    18|  37212|
| 40|     M|    librarian|    19|  02138|
| 42|     F|    homemaker|    20|  95660|
+---+------+-------------+------+-------+
only showing top 20 rows

user_df.show(5)

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 53|     F|     other|     2|  94043|
| 23|     M|    writer|     3|  32067|
| 24|     M|technician|     4|  43537|
| 33|     F|     other|     5|  15213|
+---+------+----------+------+-------+
only showing top 5 rows

df = user_df.alias("df")
df.show(5)

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 53|     F|     other|     2|  94043|
| 23|     M|    writer|     3|  32067|
| 24|     M|technician|     4|  43537|
| 33|     F|     other|     5|  15213|
+---+------+----------+------+-------+
only showing top 5 rows

Creating the Spark SQL tempTable

user_df.registerTempTable("user_table")
sqlContext.sql("SELECT * FROM user_table").show()

+---+------+-------------+------+-------+
|age|gender|   occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 24|     M|   technician|     1|  85711|
| 53|     F|        other|     2|  94043|
| 23|     M|       writer|     3|  32067|
| 24|     M|   technician|     4|  43537|
| 33|     F|        other|     5|  15213|
| 42|     M|    executive|     6|  98101|
| 57|     M|administrator|     7|  91344|
| 36|     M|administrator|     8|  05201|
| 29|     M|      student|     9|  01002|
| 53|     M|       lawyer|    10|  90703|
| 39|     F|        other|    11|  30329|
| 28|     F|        other|    12|  06405|
| 47|     M|     educator|    13|  29206|
| 45|     M|    scientist|    14|  55106|
| 49|     F|     educator|    15|  97301|
| 21|     M|entertainment|    16|  10309|
| 30|     M|   programmer|    17|  06355|
| 35|     F|        other|    18|  37212|
| 40|     M|    librarian|    19|  02138|
| 42|     F|    homemaker|    20|  95660|
+---+------+-------------+------+-------+
only showing top 20 rows

sqlContext.sql("SELECT count(*) counts FROM user_table").show()

+------+
|counts|
+------+
|   943|
+------+

sqlContext.sql("""
SELECT count(*) counts
FROM user_table
""").show()

+------+
|counts|
+------+
|   943|
+------+

sqlContext.sql("SELECT * FROM user_table").show()

+---+------+-------------+------+-------+
|age|gender|   occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 24|     M|   technician|     1|  85711|
| 53|     F|        other|     2|  94043|
| 23|     M|       writer|     3|  32067|
| 24|     M|   technician|     4|  43537|
| 33|     F|        other|     5|  15213|
| 42|     M|    executive|     6|  98101|
| 57|     M|administrator|     7|  91344|
| 36|     M|administrator|     8|  05201|
| 29|     M|      student|     9|  01002|
| 53|     M|       lawyer|    10|  90703|
| 39|     F|        other|    11|  30329|
| 28|     F|        other|    12|  06405|
| 47|     M|     educator|    13|  29206|
| 45|     M|    scientist|    14|  55106|
| 49|     F|     educator|    15|  97301|
| 21|     M|entertainment|    16|  10309|
| 30|     M|   programmer|    17|  06355|
| 35|     F|        other|    18|  37212|
| 40|     M|    librarian|    19|  02138|
| 42|     F|    homemaker|    20|  95660|
+---+------+-------------+------+-------+
only showing top 20 rows

sqlContext.sql("SELECT * FROM user_table").show(5)

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 53|     F|     other|     2|  94043|
| 23|     M|    writer|     3|  32067|
| 24|     M|technician|     4|  43537|
| 33|     F|     other|     5|  15213|
+---+------+----------+------+-------+
only showing top 5 rows

sqlContext.sql("SELECT * FROM user_table LIMIT 5").show()

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 53|     F|     other|     2|  94043|
| 23|     M|    writer|     3|  32067|
| 24|     M|technician|     4|  43537|
| 33|     F|     other|     5|  15213|
+---+------+----------+------+-------+

19.3 Selecting a subset of columns

userRDDnew = userRDD.map(lambda x: (x[0], x[3], x[2], x[1]))
userRDDnew.take(5)

[('1', 'technician', 'M', '24'),
 ('2', 'other', 'F', '53'),
 ('3', 'writer', 'M', '23'),
 ('4', 'technician', 'M', '24'),
 ('5', 'other', 'F', '33')]

user_df.select("userid", "occupation", "gender", "age").show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows

user_df.select(user_df.userid, user_df.occupation, user_df.gender, user_df.age).show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows

df.select(df.userid, df.occupation, df.gender, df.age).show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows

user_df.select(user_df.userid, user_df.occupation, df.gender, df.age).show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows

df[df['userid'], df['occupation'], df['gender'], df['age']].show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows

sqlContext.sql("SELECT userid, occupation, gender, age FROM user_table").show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|     1|technician|     M| 24|
|     2|     other|     F| 53|
|     3|    writer|     M| 23|
|     4|technician|     M| 24|
|     5|     other|     F| 33|
+------+----------+------+---+
only showing top 5 rows

19.4 Adding a computed column

userRDDnew = userRDD.map(lambda x: (x[0], x[3], x[2], x[1], 2016 - int(x[1])))
userRDDnew.take(5)

[('1', 'technician', 'M', '24', 1992),
 ('2', 'other', 'F', '53', 1963),
 ('3', 'writer', 'M', '23', 1993),
 ('4', 'technician', 'M', '24', 1992),
 ('5', 'other', 'F', '33', 1983)]

df.select("userid", "occupation", "gender", "age", 2016 - df.age).show(5)

+------+----------+------+---+------------+
|userid|occupation|gender|age|(2016 - age)|
+------+----------+------+---+------------+
|     1|technician|     M| 24|        1992|
|     2|     other|     F| 53|        1963|
|     3|    writer|     M| 23|        1993|
|     4|technician|     M| 24|        1992|
|     5|     other|     F| 33|        1983|
+------+----------+------+---+------------+
only showing top 5 rows

df.select("userid", "occupation", "gender", "age", (2016 - df.age).alias("birthyear")).show(5)

+------+----------+------+---+---------+
|userid|occupation|gender|age|birthyear|
+------+----------+------+---+---------+
|     1|technician|     M| 24|     1992|
|     2|     other|     F| 53|     1963|
|     3|    writer|     M| 23|     1993|
|     4|technician|     M| 24|     1992|
|     5|     other|     F| 33|     1983|
+------+----------+------+---+---------+
only showing top 5 rows

sqlContext.sql("""
SELECT userid, occupation, gender, age, 2016-age birthyear
FROM user_table""").show(5)

+------+----------+------+---+---------+
|userid|occupation|gender|age|birthyear|
+------+----------+------+---+---------+
|     1|technician|     M| 24|     1992|
|     2|     other|     F| 53|     1963|
|     3|    writer|     M| 23|     1993|
|     4|technician|     M| 24|     1992|
|     5|     other|     F| 33|     1983|
+------+----------+------+---+---------+
only showing top 5 rows
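
All three variants above derive the birth year as a fixed reference year (2016 in this chapter) minus the age. A small plain-Python sketch of that arithmetic follows; the names `REFERENCE_YEAR` and `birth_year` are illustrative, not from the notebook.

```python
REFERENCE_YEAR = 2016  # the year hard-coded in the examples above


def birth_year(age, reference_year=REFERENCE_YEAR):
    """Approximate birth year; accepts the age as int or numeric string."""
    return reference_year - int(age)


print(birth_year("24"))  # RDD fields are strings, hence the int() conversion
print(birth_year(53))    # DataFrame column values are already integers
```

The result is only an approximation, since the data holds an age rather than a date of birth.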

19.5 Filtering data

userRDD.filter(lambda r: r[3] == 'technician' and r[2] == 'M' and r[1] == '24').take(6)

[['1', '24', 'M', 'technician', '85711'],
 ['4', '24', 'M', 'technician', '43537'],
 ['456', '24', 'M', 'technician', '31820'],
 ['717', '24', 'M', 'technician', '84105'],
 ['832', '24', 'M', 'technician', '77042'],
 ['889', '24', 'M', 'technician', '78704']]
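
Note that the RDD filter compares the age with the string '24', because `userRDD` still holds the raw split fields. The plain-Python sketch below, using a few hand-copied sample rows, shows what happens if the integer 24 is used instead.

```python
rows = [
    ["1", "24", "M", "technician", "85711"],
    ["2", "53", "F", "other", "94043"],
    ["4", "24", "M", "technician", "43537"],
]


def is_match(r):
    # Every field is still a string here, so the age is compared with "24".
    return r[3] == "technician" and r[2] == "M" and r[1] == "24"


matches = [r[0] for r in rows if is_match(r)]
wrong = [r[0] for r in rows if r[1] == 24]  # str vs int: never equal

print(matches)
print(wrong)
```

The DataFrame versions can use `age=24` directly because the `age` column was converted to an integer when the Rows were built.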

user_df.filter("occupation='technician'").filter("gender='M'").filter("age=24").show()

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+

user_df.filter("occupation='technician' and gender='M' and age=24").show()

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+

df.filter((df.occupation == 'technician') & (df.gender == 'M') & (df.age == 24)).show()

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+

df.filter((df['occupation'] == 'technician') & (df['gender'] == 'M') & (df['age'] == 24)).show()

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+

# Step 3: filter the data with Spark SQL
sqlContext.sql(
'''SELECT *
FROM user_table
WHERE occupation='technician' AND gender='M' AND age=24''').show(5)

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
+---+------+----------+------+-------+
only showing top 5 rows

19.6 Filtering data

# Step 1: filter the data with several chained filter() calls
user_df.filter("occupation='technician'").filter("gender='M'").filter("age=24").show(5)

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
+---+------+----------+------+-------+
only showing top 5 rows

user_df.filter("occupation='technician' and gender='M' and age=24").show()

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+

user_df.filter(user_df.occupation == 'technician').filter(user_df.gender == 'M').filter(user_df.age == 24).show()

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+

user_df.filter((df.occupation == 'technician') & (df.gender == 'M') & (df.age == 24)).show()

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+

df.filter((df['occupation'] == 'technician') & (df['gender'] == 'M') & (df['age'] == 24)).show()

+---+------+----------+------+-------+
|age|gender|occupation|userid|zipcode|
+---+------+----------+------+-------+
| 24|     M|technician|     1|  85711|
| 24|     M|technician|     4|  43537|
| 24|     M|technician|   456|  31820|
| 24|     M|technician|   717|  84105|
| 24|     M|technician|   832|  77042|
| 24|     M|technician|   889|  78704|
+---+------+----------+------+-------+

19.6 Sorting by a single column

userRDD.takeOrdered(10, key=lambda x: int(x[1]))

[['30', '7', 'M', 'student', '55436'],
 ['471', '10', 'M', 'student', '77459'],
 ['289', '11', 'M', 'none', '94619'],
 ['142', '13', 'M', 'other', '48118'],
 ['609', '13', 'F', 'student', '55106'],
 ['628', '13', 'M', 'none', '94306'],
 ['674', '13', 'F', 'student', '55337'],
 ['880', '13', 'M', 'student', '83702'],
 ['206', '14', 'F', 'student', '53115'],
 ['813', '14', 'F', 'student', '02136']]

Descending order

userRDD.takeOrdered(5, key=lambda x: -1 * int(x[1]))

[['481', '73', 'M', 'retired', '37771'],
 ['767', '70', 'M', 'engineer', '00000'],
 ['803', '70', 'M', 'administrator', '78212'],
 ['860', '70', 'F', 'retired', '48322'],
 ['559', '69', 'M', 'executive', '10022']]
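
`takeOrdered(n, key=...)` returns the n smallest elements according to the key, so negating a numeric key yields the largest ones. The same idea can be tried locally with `heapq.nsmallest` from the standard library; this is a sketch on four hand-copied rows, not a statement about how Spark implements it.

```python
import heapq

rows = [
    ["30", "7", "M", "student", "55436"],
    ["481", "73", "M", "retired", "37771"],
    ["471", "10", "M", "student", "77459"],
    ["559", "69", "M", "executive", "10022"],
]

# Ascending by age: the two youngest users.
youngest = heapq.nsmallest(2, rows, key=lambda x: int(x[1]))

# Descending by age: negate the key, as in the takeOrdered call above.
oldest = heapq.nsmallest(2, rows, key=lambda x: -int(x[1]))

print([r[0] for r in youngest])
print([r[0] for r in oldest])
```

The `int()` conversion matters: sorting the raw strings would put '10' before '7'.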

sqlContext.sql("""
SELECT userid, occupation, gender, age
FROM user_table
ORDER BY age""").show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|    30|   student|     M|  7|
|   471|   student|     M| 10|
|   289|      none|     M| 11|
|   880|   student|     M| 13|
|   628|      none|     M| 13|
+------+----------+------+---+
only showing top 5 rows

sqlContext.sql("""
SELECT userid, occupation, gender, age
FROM user_table
ORDER BY age DESC""").show(5)

+------+-------------+------+---+
|userid|   occupation|gender|age|
+------+-------------+------+---+
|   481|      retired|     M| 73|
|   860|      retired|     F| 70|
|   767|     engineer|     M| 70|
|   803|administrator|     M| 70|
|   559|    executive|     M| 69|
+------+-------------+------+---+
only showing top 5 rows

user_df.select("userid", "occupation", "gender", "age").orderBy("age").show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|    30|   student|     M|  7|
|   471|   student|     M| 10|
|   289|      none|     M| 11|
|   880|   student|     M| 13|
|   628|      none|     M| 13|
+------+----------+------+---+
only showing top 5 rows

df.select("userid", "occupation", "gender", "age").orderBy("age", ascending=0).show(5)

+------+-------------+------+---+
|userid|   occupation|gender|age|
+------+-------------+------+---+
|   481|      retired|     M| 73|
|   860|      retired|     F| 70|
|   767|     engineer|     M| 70|
|   803|administrator|     M| 70|
|   559|    executive|     M| 69|
+------+-------------+------+---+
only showing top 5 rows

df.select("userid", "occupation", "gender", "age").orderBy(df.age).show(5)

+------+----------+------+---+
|userid|occupation|gender|age|
+------+----------+------+---+
|    30|   student|     M|  7|
|   471|   student|     M| 10|
|   289|      none|     M| 11|
|   880|   student|     M| 13|
|   628|      none|     M| 13|
+------+----------+------+---+
only showing top 5 rows

df.select("userid", "occupation", "gender", "age").orderBy(df.age.desc()).show(5)

+------+-------------+------+---+
|userid|   occupation|gender|age|
+------+-------------+------+---+
|   481|      retired|     M| 73|
|   767|     engineer|     M| 70|
|   860|      retired|     F| 70|
|   803|administrator|     M| 70|
|   559|    executive|     M| 69|
+------+-------------+------+---+
only showing top 5 rows

19.7 Sorting by multiple columns

userRDD.takeOrdered(5, key=lambda x: (-int(x[1]), x[2]))

[['481', '73', 'M', 'retired', '37771'],
 ['860', '70', 'F', 'retired', '48322'],
 ['767', '70', 'M', 'engineer', '00000'],
 ['803', '70', 'M', 'administrator', '78212'],
 ['559', '69', 'M', 'executive', '10022']]
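
The key above is a tuple, which Python compares element by element: first the negated age (so older users come first), then the gender string in ascending order. A short local sketch with `sorted`, on a few rows reduced to (userid, age, gender):

```python
rows = [
    ("767", 70, "M"),
    ("860", 70, "F"),
    ("481", 73, "M"),
    ("559", 69, "M"),
]

# Age descending (negated), then gender ascending ('F' sorts before 'M').
ordered = sorted(rows, key=lambda r: (-r[1], r[2]))

print([r[0] for r in ordered])
```

Negation only works for numeric parts of the key; a string column cannot be reversed this way, which is where the DataFrame `desc()` and SQL `DESC` forms are more convenient.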

sqlContext.sql("""
SELECT userid, age, gender, occupation, zipcode
FROM user_table
ORDER BY age DESC, gender""").show(5)

+------+---+------+-------------+-------+
|userid|age|gender|   occupation|zipcode|
+------+---+------+-------------+-------+
|   481| 73|     M|      retired|  37771|
|   860| 70|     F|      retired|  48322|
|   803| 70|     M|administrator|  78212|
|   767| 70|     M|     engineer|  00000|
|   559| 69|     M|    executive|  10022|
+------+---+------+-------------+-------+
only showing top 5 rows

df.orderBy(["age", "gender"], ascending=[0, 1]).show(5)

+---+------+-------------+------+-------+
|age|gender|   occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 73|     M|      retired|   481|  37771|
| 70|     F|      retired|   860|  48322|
| 70|     M|     engineer|   767|  00000|
| 70|     M|administrator|   803|  78212|
| 69|     M|    executive|   559|  10022|
+---+------+-------------+------+-------+
only showing top 5 rows

df.orderBy(df.age.desc(), df.gender).show(5)

+---+------+-------------+------+-------+
|age|gender|   occupation|userid|zipcode|
+---+------+-------------+------+-------+
| 73|     M|      retired|   481|  37771|
| 70|     F|      retired|   860|  48322|
| 70|     M|     engineer|   767|  00000|
| 70|     M|administrator|   803|  78212|
| 69|     M|    executive|   559|  10022|
+---+------+-------------+------+-------+
only showing top 5 rows

19.8 Showing distinct values

userRDD.map(lambda x: x[2]).distinct().collect()

['M', 'F']

userRDD.map(lambda x: (x[1], x[2])).distinct().take(20)

[('23', 'M'),
 ('42', 'M'),
 ('36', 'M'),
 ('39', 'F'),
 ('28', 'F'),
 ('47', 'M'),
 ('49', 'F'),
 ('30', 'M'),
 ('35', 'F'),
 ('42', 'F'),
 ('25', 'M'),
 ('30', 'F'),
 ('39', 'M'),
 ('49', 'M'),
 ('32', 'M'),
 ('41', 'M'),
 ('7', 'M'),
 ('38', 'F'),
 ('38', 'M'),
 ('27', 'F')]

sqlContext.sql("SELECT distinct gender FROM user_table").show()

+------+
|gender|
+------+
|     F|
|     M|
+------+

sqlContext.sql("SELECT distinct age, gender FROM user_table").show()

+---+------+
|age|gender|
+---+------+
| 39|     F|
| 48|     M|
| 26|     M|
| 28|     M|
| 54|     M|
| 60|     M|
| 50|     M|
| 53|     F|
| 30|     M|
| 48|     F|
| 47|     M|
| 46|     M|
| 56|     M|
| 31|     M|
| 32|     M|
| 53|     M|
| 29|     F|
| 20|     F|
| 21|     F|
| 42|     M|
+---+------+
only showing top 20 rows

user_df.select("gender").distinct().show()

+------+
|gender|
+------+
|     F|
|     M|
+------+

user_df.select("age", "gender").distinct().show()

+---+------+
|age|gender|
+---+------+
| 48|     M|
| 39|     F|
| 26|     M|
| 28|     M|
| 54|     M|
| 60|     M|
| 50|     M|
| 30|     M|
| 53|     F|
| 48|     F|
| 47|     M|
| 46|     M|
| 56|     M|
| 31|     M|
| 32|     M|
| 53|     M|
| 29|     F|
| 20|     F|
| 21|     F|
| 42|     M|
+---+------+
only showing top 20 rows

19.9 Grouping and aggregating data

userRDD.map(lambda x: (x[2], 1)) \
    .reduceByKey(lambda x, y: x + y).collect()

[('M', 670), ('F', 273)]
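
The map/reduceByKey pair above first emits a `(gender, 1)` tuple per user and then adds up the values that share a key. A single-machine sketch of that pattern follows; `reduce_by_key` is a hypothetical local stand-in, not the Spark API.

```python
from collections import Counter


def reduce_by_key(pairs, func):
    """Combine the values of equal keys with func, like a local reduceByKey."""
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals


genders = ["M", "F", "M", "M", "F"]      # made-up sample, not the real data
pairs = [(g, 1) for g in genders]        # the "map" step
counts = reduce_by_key(pairs, lambda x, y: x + y)

print(counts)
print(Counter(genders))                  # standard-library shortcut for counting
```

For a composite key such as `(gender, occupation)` the same function works unchanged, because tuples are hashable.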

userRDD.map(lambda x: ((x[2], x[3]), 1)).reduceByKey(lambda x, y: x + y).collect()

[(('M', 'technician'), 26),
 (('M', 'writer'), 26),
 (('M', 'lawyer'), 10),
 (('M', 'scientist'), 28),
 (('M', 'entertainment'), 16),
 (('M', 'librarian'), 22),
 (('F', 'librarian'), 29),
 (('F', 'marketing'), 10),
 (('M', 'marketing'), 16),
 (('M', 'healthcare'), 5),
 (('M', 'salesman'), 9),
 (('F', 'writer'), 19),
 (('F', 'lawyer'), 2),
 (('F', 'healthcare'), 11),
 (('F', 'scientist'), 3),
 (('F', 'salesman'), 3),
 (('F', 'entertainment'), 2),
 (('F', 'technician'), 1),
 (('F', 'other'), 36),
 (('M', 'executive'), 29),
 (('M', 'administrator'), 43),
 (('M', 'student'), 136),
 (('M', 'educator'), 69),
 (('F', 'educator'), 26),
 (('M', 'programmer'), 60),
 (('F', 'homemaker'), 6),
 (('F', 'artist'), 13),
 (('M', 'engineer'), 65),
 (('M', 'artist'), 15),
 (('F', 'student'), 60),
 (('F', 'administrator'), 36),
 (('M', 'none'), 5),
 (('M', 'other'), 69),
 (('F', 'executive'), 3),
 (('M', 'retired'), 13),
 (('M', 'doctor'), 7),
 (('F', 'none'), 4),
 (('F', 'programmer'), 6),
 (('F', 'engineer'), 2),
 (('F', 'retired'), 1),
 (('M', 'homemaker'), 1)]

sqlContext.sql("""
SELECT gender, count(*) counts
FROM user_table
GROUP BY gender""").show()

+------+------+
|gender|counts|
+------+------+
|     F|   273|
|     M|   670|
+------+------+

sqlContext.sql("""
SELECT gender, occupation, count(*) counts
FROM user_table
GROUP BY gender, occupation
""").show(100)

+------+-------------+------+
|gender|   occupation|counts|
+------+-------------+------+
|     M|    executive|    29|
|     M|     educator|    69|
|     F|         none|     4|
|     F|entertainment|     2|
|     F|      retired|     1|
|     F|       artist|    13|
|     F|    librarian|    29|
|     F|     engineer|     2|
|     F|   healthcare|    11|
|     F|administrator|    36|
|     M|        other|    69|
|     M|    homemaker|     1|
|     F|       lawyer|     2|
|     M|   programmer|    60|
|     M|     salesman|     9|
|     M|         none|     5|
|     M|    marketing|    16|
|     M|entertainment|    16|
|     M|   technician|    26|
|     M|administrator|    43|
|     F|    marketing|    10|
|     F|   programmer|     6|
|     F|   technician|     1|
|     F|    executive|     3|
|     M|    scientist|    28|
|     F|     educator|    26|
|     M|      retired|    13|
|     M|   healthcare|     5|
|     M|       writer|    26|
|     M|       lawyer|    10|
|     M|      student|   136|
|     F|     salesman|     3|
|     M|       doctor|     7|
|     M|       artist|    15|
|     F|    homemaker|     6|
|     M|     engineer|    65|
|     F|        other|    36|
|     F|       writer|    19|
|     F|      student|    60|
|     F|    scientist|     3|
|     M|    librarian|    22|
+------+-------------+------+

user_df.select("gender") \
    .groupby("gender") \
    .count().show()

+------+-----+
|gender|count|
+------+-----+
|     F|  273|
|     M|  670|
+------+-----+

user_df.select("gender", "occupation") \
    .groupby("gender", "occupation") \
    .count() \
    .orderBy("gender", "occupation") \
    .show(100)

+------+-------------+-----+
|gender|   occupation|count|
+------+-------------+-----+
|     F|administrator|   36|
|     F|       artist|   13|
|     F|     educator|   26|
|     F|     engineer|    2|
|     F|entertainment|    2|
|     F|    executive|    3|
|     F|   healthcare|   11|
|     F|    homemaker|    6|
|     F|       lawyer|    2|
|     F|    librarian|   29|
|     F|    marketing|   10|
|     F|         none|    4|
|     F|        other|   36|
|     F|   programmer|    6|
|     F|      retired|    1|
|     F|     salesman|    3|
|     F|    scientist|    3|
|     F|      student|   60|
|     F|   technician|    1|
|     F|       writer|   19|
|     M|administrator|   43|
|     M|       artist|   15|
|     M|       doctor|    7|
|     M|     educator|   69|
|     M|     engineer|   65|
|     M|entertainment|   16|
|     M|    executive|   29|
|     M|   healthcare|    5|
|     M|    homemaker|    1|
|     M|       lawyer|   10|
|     M|    librarian|   22|
|     M|    marketing|   16|
|     M|         none|    5|
|     M|        other|   69|
|     M|   programmer|   60|
|     M|      retired|   13|
|     M|     salesman|    9|
|     M|    scientist|   28|
|     M|      student|  136|
|     M|   technician|   26|
|     M|       writer|   26|
+------+-------------+-----+

user_df.stat.crosstab("occupation", "gender").show(30)

+-----------------+---+---+
|occupation_gender|  F|  M|
+-----------------+---+---+
|        scientist|  3| 28|
|          student| 60|136|
|           writer| 19| 26|
|         salesman|  3|  9|
|          retired|  1| 13|
|    administrator| 36| 43|
|       programmer|  6| 60|
|           doctor|  0|  7|
|        homemaker|  6|  1|
|        executive|  3| 29|
|         engineer|  2| 65|
|    entertainment|  2| 16|
|        marketing| 10| 16|
|       technician|  1| 26|
|           artist| 13| 15|
|        librarian| 29| 22|
|           lawyer|  2| 10|
|         educator| 26| 69|
|       healthcare| 11|  5|
|             none|  4|  5|
|            other| 36| 69|
+-----------------+---+---+

user_df.describe().show()

+-------+-----------------+------+-------------+-----------------+------------------+
|summary|              age|gender|   occupation|           userid|           zipcode|
+-------+-----------------+------+-------------+-----------------+------------------+
|  count|              943|   943|          943|              943|               943|
|   mean|34.05196182396607|  null|         null|            472.0| 50868.78810810811|
| stddev|12.19273973305903|  null|         null|272.3649512449549|30891.373254138158|
|    min|                7|     F|administrator|                1|             00000|
|    max|               73|     M|       writer|              943|             Y1A6B|
+-------+-----------------+------+-------------+-----------------+------------------+

19.10 Joining data

ZipCode

#wget http://federalgovernmentzipcodes.us/free-zipcode-database-Primary.csv
#Path="file:/home/hduser/pythonwork/ipynotebook/"
rawDataWithHeader = sc.textFile(Path + "data/free-zipcode-database-Primary.csv")
rawDataWithHeader.take(2)

['"Zipcode","ZipCodeType","City","State","LocationType","Lat","Long","Location","Decommisioned","TaxReturnsFiled","EstimatedPopulation","TotalWages"',
 '"00705","STANDARD","AIBONITO","PR","PRIMARY",18.14,-66.26,"NA-US-PR-AIBONITO","false",,,']

header = rawDataWithHeader.first()
rawData = rawDataWithHeader.filter(lambda x: x != header)
rawData.first()

'"00705","STANDARD","AIBONITO","PR","PRIMARY",18.14,-66.26,"NA-US-PR-AIBONITO","false",,,'

rData = rawData.map(lambda x: x.replace("\"", ""))
rData.first()

'00705,STANDARD,AIBONITO,PR,PRIMARY,18.14,-66.26,NA-US-PR-AIBONITO,false,,,'

ZipRDD = rData.map(lambda x: x.split(","))
ZipRDD.first()

['00705',
 'STANDARD',
 'AIBONITO',
 'PR',
 'PRIMARY',
 '18.14',
 '-66.26',
 'NA-US-PR-AIBONITO',
 'false',
 '',
 '',
 '']
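
Stripping the quotes and then splitting on commas works for the record shown, but it would break on any field that contains a comma inside its quotes. As a hedged alternative, Python's standard `csv` module parses quoted fields directly; the second record below is made up purely to show the difference.

```python
import csv

line = ('"00705","STANDARD","AIBONITO","PR","PRIMARY",18.14,-66.26,'
        '"NA-US-PR-AIBONITO","false",,,')

# csv.reader takes an iterable of lines; next() returns the first parsed row.
fields = next(csv.reader([line]))
print(len(fields), fields[0], fields[2])

# Hypothetical record with a comma inside a quoted field.
tricky_line = '"12345","STANDARD","SAINT JOHN, VI"'
tricky = next(csv.reader([tricky_line]))
naive = tricky_line.replace('"', "").split(",")
print(len(tricky), len(naive))
```

Inside Spark the same parser could be applied per line in a `map`, at some cost in speed compared with a plain `split`.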

19.19.2 Creating zipcode_table

from pyspark.sql import Row
zipcode_data = ZipRDD.map(lambda p:
    Row(
        zipcode=int(p[0]),
        zipCodeType=p[1],
        city=p[2],
        state=p[3]
    )
)
zipcode_data.take(5)

[Row(city='AIBONITO', state='PR', zipCodeType='STANDARD', zipcode=705),
 Row(city='ANASCO', state='PR', zipCodeType='STANDARD', zipcode=610),
 Row(city='ANGELES', state='PR', zipCodeType='PO BOX', zipcode=611),
 Row(city='ARECIBO', state='PR', zipCodeType='STANDARD', zipcode=612),
 Row(city='ADJUNTAS', state='PR', zipCodeType='STANDARD', zipcode=601)]
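
Converting the zipcode with `int()` drops leading zeros ('00705' becomes 705), while `user_table` keeps its zipcode as a string, and some of its values (the maximum is 'Y1A6B') are not numeric at all. The sketch below illustrates both effects in plain Python; `zip_to_int` is a hypothetical helper, not part of the notebook.

```python
def zip_to_int(z):
    """Return the zipcode as an int, or None when it is not purely numeric."""
    return int(z) if z.isdecimal() else None


print(zip_to_int("00705"))    # leading zeros are lost
print(str(705).zfill(5))      # zero-padding restores the five-character form
print(zip_to_int("Y1A6B"))    # a non-numeric code cannot be converted
```

Keeping the zipcode as a string on both sides is one way to avoid relying on implicit type conversion when the two tables are joined later.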

zipcode_df = sqlContext.createDataFrame(zipcode_data)
zipcode_df.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipCodeType: string (nullable = true)
 |-- zipcode: long (nullable = true)

zipcode_df.registerTempTable("zipcode_table")
zipcode_df.show(10)

+---------+-----+-----------+-------+
|     city|state|zipCodeType|zipcode|
+---------+-----+-----------+-------+
| AIBONITO|   PR|   STANDARD|    705|
|   ANASCO|   PR|   STANDARD|    610|
|  ANGELES|   PR|     PO BOX|    611|
|  ARECIBO|   PR|   STANDARD|    612|
| ADJUNTAS|   PR|   STANDARD|    601|
| CASTANER|   PR|     PO BOX|    631|
|   AGUADA|   PR|   STANDARD|    602|
|AGUADILLA|   PR|   STANDARD|    603|
|AGUADILLA|   PR|     PO BOX|    604|
|AGUADILLA|   PR|     PO BOX|    605|
+---------+-----+-----------+-------+
only showing top 10 rows

sqlContext.sql("""
SELECT z.*
FROM zipcode_table z
""").show(10)

+---------+-----+-----------+-------+
|     city|state|zipCodeType|zipcode|
+---------+-----+-----------+-------+
| AIBONITO|   PR|   STANDARD|    705|
|   ANASCO|   PR|   STANDARD|    610|
|  ANGELES|   PR|     PO BOX|    611|
|  ARECIBO|   PR|   STANDARD|    612|
| ADJUNTAS|   PR|   STANDARD|    601|
| CASTANER|   PR|     PO BOX|    631|
|   AGUADA|   PR|   STANDARD|    602|
|AGUADILLA|   PR|   STANDARD|    603|
|AGUADILLA|   PR|     PO BOX|    604|
|AGUADILLA|   PR|     PO BOX|    605|
+---------+-----+-----------+-------+
only showing top 10 rows

sqlContext.sql("""
SELECT u.*, z.city, z.state
FROM user_table u
LEFT JOIN zipcode_table z ON u.zipcode = z.zipcode
WHERE z.state='NY'
""").show(10)

+---+------+-------------+------+-------+----------------+-----+
|age|gender|   occupation|userid|zipcode|            city|state|
+---+------+-------------+------+-------+----------------+-----+
| 29|     M|        other|   478|  10019|        NEW YORK|   NY|
| 22|     F|   healthcare|   405|  10019|        NEW YORK|   NY|
| 22|     M|      student|   327|  11101|LONG ISLAND CITY|   NY|
| 48|     M|     educator|   656|  10314|   STATEN ISLAND|   NY|
| 27|     F|       writer|   617|  11201|        BROOKLYN|   NY|
| 35|     F|        other|   760|  14211|         BUFFALO|   NY|
| 30|     F|       writer|   557|  11217|        BROOKLYN|   NY|
| 27|     M|    marketing|   806|  11217|        BROOKLYN|   NY|
| 32|     F|        other|   155|  11217|        BROOKLYN|   NY|
| 23|     M|administrator|   509|  10011|        NEW YORK|   NY|
+---+------+-------------+------+-------+----------------+-----+
only showing top 10 rows

sqlContext.sql("""
SELECT z.state, count(*)
FROM user_table u
LEFT JOIN zipcode_table z ON u.zipcode = z.zipcode
GROUP BY z.state
""").show(60)

+-----+--------+
|state|count(1)|
+-----+--------+
|   SC|      11|
|   AZ|      14|
|   LA|       6|
|   MN|      78|
|   NJ|      18|
|   DC|      14|
|   OR|      20|
|   VA|      27|
| null|      35|
|   RI|       3|
|   KY|      11|
|   WY|       1|
|   NH|       6|
|   MI|      23|
|   NV|       3|
|   WI|      22|
|   ID|       7|
|   CA|     116|
|   CT|      17|
|   NE|       6|
|   MT|       2|
|   NC|      19|
|   VT|       5|
|   MD|      27|
|   DE|       3|
|   MO|      17|
|   IL|      50|
|   ME|       2|
|   ND|       2|
|   WA|      24|
|   MS|       3|
|   AL|       3|
|   IN|       9|
|   AE|       1|
|   OH|      32|
|   TN|      12|
|   IA|      14|
|   NM|       2|
|   PA|      34|
|   SD|       1|
|   NY|      60|
|   TX|      51|
|   WV|       3|
|   GA|      19|
|   MA|      35|
|   KS|       4|
|   CO|      20|
|   FL|      24|
|   AK|       5|
|   AR|       1|
|   OK|       9|
|   AP|       1|
|   UT|       9|
|   HI|       2|
+-----+--------+
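
The `null` row in the result comes from the LEFT JOIN: users whose zipcode finds no match in `zipcode_table` are kept, with a missing state. The plain-Python sketch below imitates this with a dictionary lookup; the tiny lookup table and the user zipcodes are illustrative samples, not the real data.

```python
from collections import Counter

# Tiny stand-in for zipcode_table, keyed by the integer zipcode.
zip_to_state = {10019: "NY", 53706: "WI"}

# Stand-in for the zipcode column of user_table (strings).
user_zips = ["10019", "53706", "10019", "Y1A6B", "99999"]


def lookup_state(zipcode):
    """Left-join style lookup: returns None when there is no match."""
    key = int(zipcode) if zipcode.isdecimal() else None
    return zip_to_state.get(key)


counts = Counter(lookup_state(z) for z in user_zips)
print(counts)
```

Both the non-numeric code and the unknown numeric code end up under `None`, which corresponds to the `null` group in the SQL output.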

#user_df.leftOuterJoin(zipcode_df )
joined_df = user_df.join(zipcode_df,
                         user_df.zipcode == zipcode_df.zipcode, "left_outer")

joined_df.printSchema()

root
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- userid: long (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipCodeType: string (nullable = true)
 |-- zipcode: long (nullable = true)

joined_df.show(10)

+---+------+-------------+------+-------+-------------+-----+-----------+-------+
|age|gender|   occupation|userid|zipcode|         city|state|zipCodeType|zipcode|
+---+------+-------------+------+-------+-------------+-----+-----------+-------+
| 59|     F|administrator|   131|  15237|   PITTSBURGH|   PA|   STANDARD|  15237|
| 17|     M|      student|   619|  44134|    CLEVELAND|   OH|   STANDARD|  44134|
| 38|     F|entertainment|   839|  90814|   LONG BEACH|   CA|   STANDARD|  90814|
| 48|     M|administrator|   409|  98225|   BELLINGHAM|   WA|   STANDARD|  98225|
| 31|     M|     educator|   791|  20064|   WASHINGTON|   DC|     UNIQUE|  20064|
| 51|     M|     engineer|   271|  22932|       CROZET|   VA|   STANDARD|  22932|
| 17|     M|entertainment|   375|  37777|   LOUISVILLE|   TN|   STANDARD|  37777|
| 27|     M|      student|   758|  53706|      MADISON|   WI|   STANDARD|  53706|
| 33|     M|    scientist|   272|  53706|      MADISON|   WI|   STANDARD|  53706|
| 30|     F|    librarian|   344|  94117|SAN FRANCISCO|   CA|   STANDARD|  94117|
+---+------+-------------+------+-------+-------------+-----+-----------+-------+
only showing top 10 rows

joined_df.filter("state='NY'").show(10)

+---+------+-------------+------+-------+----------------+-----+-----------+-------+
|age|gender|   occupation|userid|zipcode|            city|state|zipCodeType|zipcode|
+---+------+-------------+------+-------+----------------+-----+-----------+-------+
| 29|     M|        other|   478|  10019|        NEW YORK|   NY|   STANDARD|  10019|
| 22|     F|   healthcare|   405|  10019|        NEW YORK|   NY|   STANDARD|  10019|
| 22|     M|      student|   327|  11101|LONG ISLAND CITY|   NY|   STANDARD|  11101|
| 48|     M|     educator|   656|  10314|   STATEN ISLAND|   NY|   STANDARD|  10314|
| 27|     F|       writer|   617|  11201|        BROOKLYN|   NY|   STANDARD|  11201|
| 35|     F|        other|   760|  14211|         BUFFALO|   NY|   STANDARD|  14211|
| 30|     F|       writer|   557|  11217|        BROOKLYN|   NY|   STANDARD|  11217|
| 27|     M|    marketing|   806|  11217|        BROOKLYN|   NY|   STANDARD|  11217|
| 32|     F|        other|   155|  11217|        BROOKLYN|   NY|   STANDARD|  11217|
| 23|     M|administrator|   509|  10011|        NEW YORK|   NY|   STANDARD|  10011|
+---+------+-------------+------+-------+----------------+-----+-----------+-------+
only showing top 10 rows

GroupByState_df = joined_df.groupBy("state").count()
GroupByState_df.show(60)

+-----+-----+
|state|count|
+-----+-----+
|   SC|   11|
|   AZ|   14|
|   LA|    6|
|   MN|   78|
|   NJ|   18|
|   DC|   14|
|   OR|   20|
|   VA|   27|
| null|   35|
|   RI|    3|
|   KY|   11|
|   WY|    1|
|   NH|    6|
|   MI|   23|
|   NV|    3|
|   WI|   22|
|   ID|    7|
|   CA|  116|
|   CT|   17|
|   NE|    6|
|   MT|    2|
|   NC|   19|
|   VT|    5|
|   MD|   27|
|   DE|    3|
|   MO|   17|
|   IL|   50|
|   ME|    2|
|   WA|   24|
|   ND|    2|
|   MS|    3|
|   AL|    3|
|   IN|    9|
|   AE|    1|
|   OH|   32|
|   TN|   12|
|   IA|   14|
|   NM|    2|
|   PA|   34|
|   SD|    1|
|   NY|   60|
|   TX|   51|
|   WV|    3|
|   GA|   19|
|   MA|   35|
|   KS|    4|
|   FL|   24|
|   CO|   20|
|   AK|    5|
|   AR|    1|
|   OK|    9|
|   AP|    1|
|   UT|    9|
|   HI|    2|
+-----+-----+

19.11 Plotting with a Pandas DataFrame

import pandas as pd
GroupByState_pandas_df = GroupByState_df.toPandas().set_index('state')
GroupByState_pandas_df

       count
state
SC        11
AZ        14
LA         6
MN        78
NJ        18
DC        14
OR        20
VA        27
NaN       35
RI         3
KY        11
WY         1
NH         6
MI        23
NV         3
WI        22
ID         7
CA       116
CT        17
NE         6
MT         2
NC        19
VT         5
MD        27
DE         3
MO        17
IL        50
ME         2
WA        24
ND         2
MS         3
AL         3
IN         9
AE         1
OH        32
TN        12
IA        14
NM         2
PA        34
SD         1
NY        60
TX        51
WV         3
GA        19
MA        35
KS         4
CO        20
FL        24
AK         5
AR         1
OK         9
AP         1
UT         9
HI         2

GroupByState_pandas_df.T

state  SC  AZ  LA  MN  NJ  DC  OR  VA  None  RI  ...  MA  KS  CO  FL  AK  AR  OK  AP  UT  HI
count  11  14   6  78  18  14  20  27    35   3  ...  35   4  20  24   5   1   9   1   9   2

1 rows × 54 columns

import matplotlib.pyplot as plt
%matplotlib inline
ax = GroupByState_pandas_df['count'] \
    .plot(kind='bar', title="State ", figsize=(12, 6), legend=True, fontsize=12)
plt.show()

Occupation_df = sqlContext.sql("""
SELECT u.occupation, count(*) counts
FROM user_table u
GROUP BY occupation
""")
Occupation_df.show(30)

+-------------+------+
|   occupation|counts|
+-------------+------+
|    librarian|    51|
|      retired|    14|
|       lawyer|    12|
|         none|     9|
|       writer|    45|
|   programmer|    66|
|    marketing|    26|
|        other|   105|
|    executive|    32|
|    scientist|    31|
|      student|   196|
|     salesman|    12|
|       artist|    28|
|   technician|    27|
|administrator|    79|
|     engineer|    67|
|   healthcare|    16|
|     educator|    95|
|entertainment|    18|
|    homemaker|     7|
|       doctor|     7|
+-------------+------+

Occupation_pandas_df = Occupation_df.toPandas().set_index('occupation')
Occupation_pandas_df

               counts
occupation
librarian          51
retired            14
lawyer             12
none                9
writer             45
programmer         66
marketing          26
other             105
executive          32
scientist          31
student           196
salesman           12
artist             28
technician         27
administrator      79
engineer           67
healthcare         16
educator           95
entertainment      18
homemaker           7
doctor              7

ax = Occupation_pandas_df['counts'].plot(kind='pie',
    title="occupation", figsize=(8, 8), startangle=90, autopct='%1.1f%%')
ax.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
plt.show()
