DataFrame API入门操作及代码展示
创始人
2024-04-15 12:18:58
0

文章目录

    • DataFrame风格编程
    • DSL风格编程代码示例
      • 相关API
      • 相关代码示例
    • SQL风格编程代码示例
      • 相关API
      • 相关代码
    • Fucntions包
    • 基于SparkSQL的WordCount代码编写

DataFrame风格编程

  • DataFrame支持两种风格进行编程
    • DSL风格
    • SQL风格
  • DSL称之为领域特定语言,其实就是指DataFrame特有的API,DSL风格意思就是以调用API的方式来处理Data。
  • SQL风格就是使用SQL语句处理DataFrame的数据。

DSL风格编程代码示例

相关API

  • show()方法

    • 功能:展示DataFrame中的数据。
    • 语法:df.show(参数1, 参数2)
      • 参数1: 默认是20, 控制展示多少条。
      • 参数2: 是否阶段列, 默认只输出20个字符的长度, 过长不显示, 显示请用truncate = True。
  • printSchema()方法

    • 功能:打印输出df的schema信息。
    • 语法:df.printSchema()
  • select()方法

    • 功能:选择DataFrame中的指定列(通过传入参数进行指定)。
    • 语法:df.select()
      • 参数传递1:可变参数的cols对象,cols对象可以是Column对象来指定列或者字符串列名来指定列。
      • 参数传递2:List[Column]对象或者List[str]对象, 用来选择多个列。
  • filter()与where()方法

    • 功能:过滤DataFrame内的数据,返回一个过滤后的DataFrame,两者方法是等价的。
    • 语法:df.filter()、df.where()
  • groupBy()方法

    • 功能:按照指定的列进行数据的分组, 返回值是GroupedData对象。
    • 语法:df.groupBy()

相关代码示例

# coding : utf8
from pyspark.sql import SparkSessionif __name__ == '__main__':ss = SparkSession.builder \.appName("test") \.master("local[*]") \.getOrCreate()sc = ss.sparkContextdf = ss.read.format("csv") \.option("sep", ",") \.schema("id INT, subject STRING, score INT") \.load("../Data/input/stu_score.txt")# DSL风格展示# Column对象获取id_col = df["id"]subject_col = df["subject"]score_col = df["score"]# list形式df.select(["id", "subject"]).limit(10).show()# 可变参数形式df.select("id", "score").limit(10).show()# Column对象形式df.select(id_col, subject_col).limit(10).show()# filter APIdf.filter("score < 99").show()df.filter(df["score"] < 99).show()# where APIdf.where("score < 99").show()df.where(df["score"] < 99).show()# groupby APIdf.groupBy("subject").count().show()df.groupBy(subject_col).count().show()# groupby返回值是GroupedData,不是DataFrame,是一个有分组的数据集合,后面只能是聚合函数print(type(df.groupBy(df["subject"])))

SQL风格编程代码示例

相关API

  • DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。如果想使用SQL风格的语法,需要将DataFrame注册成表。

  • df.createTempView():注册临时的视图表。

  • df.createOrReplaceTempView():注册或者替换临时的视图表。

  • df.createGlobalTempView():注册全局临时的视图表。

  • 全局表与临时表

    • 全局表:跨SparkSession对象使用,在一个程序的多个SparkSession中均可调用,查询时需要添加前缀:global_temp。
    • 临时表:只在当前的SparkSession中可用。
  • 使用SQL查询我们需要调用SparkSession.sql(“SQL语句”)执行查询,返回值是一个新的DataFrame。

相关代码

# coding : utf8
from pyspark.sql import SparkSessionif __name__ == '__main__':ss = SparkSession.builder \.appName("test") \.master("local[*]") \.getOrCreate()sc = ss.sparkContextdf = ss.read.format("csv") \.option("sep", ",") \.schema("id INT, subject STRING, score INT") \.load("../Data/input/stu_score.txt")# 注册成临时表df.createTempView("score1")df.createOrReplaceTempView("score2") # 创建或替换临时视图表df.createGlobalTempView("score3")  # 创建全局临时视图表ss.sql("SELECT subject, AVG(score) AS  avg_score FROM score1 GROUP BY subject").show()ss.sql("SELECT subject, COUNT(*) AS  cnt FROM score2 GROUP BY subject").show()ss.sql("SELECT subject, MAX(score) AS max_score , MIN(score) AS  min_score FROM global_temp.score3 GROUP BY subject").show()

Fucntions包

  • PySpark提供了一个函数包:pyspark.sql.functions,这个包里提供了一些列的计算函数供SparkSQL使用最常见的有我们所熟悉的split和explode方法。
  • 导入这个包我们可以通过以下代码来实现:
  from pyspark.sql import functions as F
  • 这些功能函数的返回值多数都是column对象。

基于SparkSQL的WordCount代码编写

# coding : utf8
from pyspark.sql import SparkSession
from pyspark.sql import functions as Fif __name__ == '__main__':ss = SparkSession.builder \.appName("test") \.master("local[*]") \.getOrCreate()sc = ss.sparkContext# TODO: 1 SQL风格处理rdd = sc.textFile("hdfs://node1:8020/Test/WordCount.txt") \.flatMap(lambda line: line.split(" ")) \.map(lambda x: [x])df1 = rdd.toDF(["words"])df1.createOrReplaceTempView("words")ss.sql("SELECT words, COUNT(*) AS cnt FROM words GROUP  BY words ORDER  BY cnt DESC").show()# TODO: 2 DSL风格处理df2 = ss.read.format("text") \.load("hdfs://node1:8020/Test/WordCount.txt")# withColumn方法# 方法功能: 对已存在的列进行操作, 返回一个新的列, 如果名字和老列相同, 那么替换, 否则作为新列存在df3 = df2.withColumn("value", F.explode(F.split(df2["value"], " ")))df3.groupBy("value").count() \.withColumnRenamed("value", "words") \.withColumnRenamed("count", "cnt") \.orderBy("cnt", ascending=False).show()
  • 结果展示:
    在这里插入图片描述

相关内容

热门资讯

【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
修复 爱普生 EPSON L4... L4151 L4153 L4156 L4158 L4163 L4165 L4166 L4168 L4...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
ChatGPT 怎么用最新详细... ChatGPT 以其强大的信息整合和对话能力惊艳了全球,在自然语言处理上面表现出了惊人...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...