show() method: print rows of the DataFrame to the console
printSchema() method: print the DataFrame's schema (column names, types, nullability)
select() method: choose columns
filter() and where() methods: filter rows (the two are equivalent)
groupBy() method: group rows for subsequent aggregation
# coding : utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    ss = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()
    sc = ss.sparkContext

    df = ss.read.format("csv") \
        .option("sep", ",") \
        .schema("id INT, subject STRING, score INT") \
        .load("../Data/input/stu_score.txt")

    # DSL-style demonstration
    # Grab Column objects
    id_col = df["id"]
    subject_col = df["subject"]
    score_col = df["score"]

    # select: list form
    df.select(["id", "subject"]).limit(10).show()
    # select: varargs form
    df.select("id", "score").limit(10).show()
    # select: Column-object form
    df.select(id_col, subject_col).limit(10).show()

    # filter API
    df.filter("score < 99").show()
    df.filter(df["score"] < 99).show()

    # where API
    df.where("score < 99").show()
    df.where(df["score"] < 99).show()

    # groupBy API
    df.groupBy("subject").count().show()
    df.groupBy(subject_col).count().show()

    # groupBy returns GroupedData, not a DataFrame: it is a grouped data set,
    # so it can only be followed by aggregation functions
    print(type(df.groupBy(df["subject"])))
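The DSL example above exercises select(), filter()/where(), and groupBy(), but never calls printSchema() or passes arguments to show(). A minimal sketch of both, reusing the df (id, subject, score) defined above:

df.printSchema()              # print the schema tree: column names, types, nullability
df.show()                     # default: first 20 rows, long cell values truncated
df.show(5, truncate=False)    # first 5 rows, cell values printed in full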
A powerful aspect of DataFrame is that it can be treated as a relational table and queried with SQL statements via spark.sql(), with the result returned as a new DataFrame. To use SQL-style syntax, the DataFrame must first be registered as a view.
df.createTempView(): registers a temporary view.
df.createOrReplaceTempView(): registers a temporary view, replacing any existing view of the same name.
df.createGlobalTempView(): registers a global temporary view.
Global vs. temporary views: a temporary view is visible only within the SparkSession that created it, while a global temporary view is shared by all SparkSessions in the same application and must be queried through the global_temp database (e.g. global_temp.score3); see the short sketch after the example code below.
To run a SQL query, call SparkSession.sql("SQL statement"); the return value is a new DataFrame.
# coding : utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    ss = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()
    sc = ss.sparkContext

    df = ss.read.format("csv") \
        .option("sep", ",") \
        .schema("id INT, subject STRING, score INT") \
        .load("../Data/input/stu_score.txt")

    # Register as views
    df.createTempView("score1")              # temporary view
    df.createOrReplaceTempView("score2")     # create or replace a temporary view
    df.createGlobalTempView("score3")        # global temporary view

    ss.sql("SELECT subject, AVG(score) AS avg_score FROM score1 GROUP BY subject").show()
    ss.sql("SELECT subject, COUNT(*) AS cnt FROM score2 GROUP BY subject").show()
    ss.sql("SELECT subject, MAX(score) AS max_score, MIN(score) AS min_score FROM global_temp.score3 GROUP BY subject").show()
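As mentioned above, a plain temporary view is visible only inside the SparkSession that registered it, while a global temporary view is shared by every SparkSession in the same application and is addressed through the global_temp database. A minimal sketch of that difference, assuming score1 and score3 were registered as in the code above:

ss2 = ss.newSession()   # a second SparkSession in the same application
# Global temporary views are visible from the new session via the global_temp database
ss2.sql("SELECT subject, COUNT(*) AS cnt FROM global_temp.score3 GROUP BY subject").show()
# A plain temporary view is not shared: querying score1 from ss2 would raise an AnalysisException
# ss2.sql("SELECT * FROM score1").show()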
The pyspark.sql.functions package provides a series of column-computation functions for Spark SQL; the most familiar of these are split and explode. Import it as:
from pyspark.sql import functions as F
# coding : utf8
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == '__main__':
    ss = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()
    sc = ss.sparkContext

    # TODO: 1 SQL-style processing
    rdd = sc.textFile("hdfs://node1:8020/Test/WordCount.txt") \
        .flatMap(lambda line: line.split(" ")) \
        .map(lambda x: [x])
    df1 = rdd.toDF(["words"])
    df1.createOrReplaceTempView("words")
    ss.sql("SELECT words, COUNT(*) AS cnt FROM words GROUP BY words ORDER BY cnt DESC").show()

    # TODO: 2 DSL-style processing
    df2 = ss.read.format("text") \
        .load("hdfs://node1:8020/Test/WordCount.txt")

    # withColumn: operates on an existing column and returns a new column;
    # if the name matches an existing column it replaces it, otherwise it is added as a new column
    df3 = df2.withColumn("value", F.explode(F.split(df2["value"], " ")))
    df3.groupBy("value").count() \
        .withColumnRenamed("value", "words") \
        .withColumnRenamed("count", "cnt") \
        .orderBy("cnt", ascending=False) \
        .show()
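Besides split and explode, pyspark.sql.functions also provides the usual aggregate and column-expression functions, which combine naturally with the DSL style. A short sketch, assuming the stu_score DataFrame df (id, subject, score) from the earlier examples and the F alias imported above:

df.groupBy("subject") \
    .agg(F.round(F.avg("score"), 2).alias("avg_score"),  # average score, rounded to 2 decimals
         F.max("score").alias("max_score"),
         F.min("score").alias("min_score"),
         F.count("*").alias("cnt")) \
    .orderBy("avg_score", ascending=False) \
    .show()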
