在Apache Spark 2.0中使用DataFrames和SQL
2017-05-28 编辑:
作者|马小龙(Dr. Christoph Schubert)
责编|郭芮
Spark 2.0中使用DataFrames和SQL的第一步
Spark 2.0开发的一个动机是让它可以触及更广泛的受众,特别是缺乏编程技能但可能非常熟悉SQL的数据分析师或业务分析师。因此,Spark 2.0现在比以往更易使用。在这部分,我将介绍如何使用Apache Spark 2.0。并将重点关注DataFrames作为新Dataset API的无类型版本。
到Spark 1.3,弹性分布式数据集(Resilient Distributed Dataset,RDD)一直是Spark中的主要抽象。RDD API是在Scala集合框架之后建模的,因此间接提供了Hadoop Map / Reduce熟悉的编程原语以及函数式编程(Map、Filter、Reduce)的常用编程原语。虽然RDD API比Map / Reduce范例更具表达性,但表达复杂查询仍然很繁琐,特别是对于来自典型数据分析背景的用户,他们可能熟悉SQL,或来自R/Python编程语言的数据框架。
Spark 1.3引入了DataFrames作为RDD顶部的一个新抽象。DataFrame是具有命名列的行集合,在R和Python相应包之后建模。
Spark 1.6看到了Dataset类作为DataFrame的类型化版本而引入。在Spark 2.0中,DataFrames实际上是Datasets的特殊版本,我们有type DataFrame = Dataset [Row],因此DataFrame和Dataset API是统一的。
表面上,DataFrame就像SQL表。Spark 2.0将这种关系提升到一个新水平:我们可以使用SQL来修改和查询DataSets和DataFrames。通过限制表达数量,有助于更好地优化。数据集也与Catalyst优化器良好集成,大大提高了Spark代码的执行速度。因此,新的开发应该利用DataFrames。
在本文中,我将重点介绍Spark 2.0中DataFrames的基本用法。我将尝试强调Dataset API和SQL间的相似性,以及如何使用SQL和Dataset API互换地查询数据。借由整个代码生成和Catalyst优化器,两个版本将编译相同高效的代码。
代码示例以Scala编程语言给出。我认为这样的代码最清晰,因为Spark本身就是用Scala编写的。
?SparkSession
SparkSession类替换了Apache Spark 2.0中的SparkContext和SQLContext,并为Spark集群提供了唯一的入口点。
为了向后兼容,SparkSession对象包含SparkContext和SQLContext对象,见下文。当我们使用交互式Spark shell时,为我们创建一个名为spark的SparkSession对象。
?创建DataFrames
DataFrame是具有命名列的表。最简单的DataFrame是使用SparkSession的range方法来创建:
使用show给我们一个DataFrame的表格表示,可以使用describe来获得数值属性概述。describe返回一个DataFrame:
观察到Spark为数据帧中唯一的列选择了名称id。 对于更有趣的示例,请考虑以下数据集:
在这种情况下,customerDF对象将有名为_1、_2、_3、_4的列,它们以某种方式违反了命名列的目的。可以通过重命名列来恢复:
使用printSchema和describe提供以下输出:
一般来说我们会从文件加载数据。SparkSession类为提供了以下方法:
在这里我们让Spark从CSV文件的第一行提取头信息(通过设置header选项为true),并使用数字类型(age和total)将数字列转换为相应的数据类型 inferSchema选项。
其他可能的数据格式包括parquet文件和通过JDBC连接读取数据的可能性。
?基本数据操作
我们现在将访问DataFrame中数据的基本功能,并将其与SQL进行比较。
沿袭,操作,动作和整个阶段的代码生成
相同的谱系概念,转换操作和行动操作之间的区别适用于Dataset和RDD。我们下面讨论的大多数DataFrame操作都会产生一个新的DataFrame,但实际上不执行任何计算。要触发计算,必须调用行动操作之一,例如show(将DataFrame的第一行作为表打印),collect(返回一个Row对象的Array),count(返回DataFrame中的行数),foreach(对每一行应用一个函数)。这是惰性求值(lazy evaluation)的常见概念。
下面Dataset类的所有方法实际上依赖于所有数据集的有向非循环图(Directed Acyclic Graph,DAG),从现有数据集中创建一个新的“数据集”。这被称为数据集的沿袭。仅使用调用操作时,Catalyst优化程序将分析沿袭中的所有转换,并生成实际代码。这被称为整阶段代码生成,并且负责Dataset对RDD的性能改进。
Row-行对象
Row类在DataFrame的一行不带类型数据值中充当容器。通常情况下我们不会自己创建Row对象,而是使用下面的语法:
Row对象元素通过位置(从0开始)或者使用apply进行访问:
它会产生一个Any的对象类型。或者最好使用get,方法之一:
因为这样就不会出现原始类型的开销。我们可以使用isNull方法检查行中的一个条目是否为’null’:
我们现在来看看DataFrame类最常用的转换操作:
select
我们将要看的第一个转换是“select”,它允许我们对一个DataFrame的列进行投影和变换。
引用列
通过它们的名称有两种方法来访问DataFrame列:可以将其引用为字符串;或者可以使用apply方法,col-方法或$以字符串作为参数并返回一个Column(列)对象。所以customerDF.col(“customer”)和customerDF(“customer”)都是customerDF的第一列。
选择和转换列
最简单的select转换形式允许我们将DataFrame投影到包含较少列的DataFrame中。下面的四个表达式返回一个只包含customer和province列的DataFrame:
不能在单个select方法中调用混合字符串和列参数:customerDF.select(“customer”, $”province”)导致错误。
使用Column类定义的运算符,可以构造复杂的列表达式:
应用show得到以下结果:
列别名
新数据集的列名称从用于创建的表达式中派生而来,我们可以使用alias或as将列名更改为其他助记符:
产生与前面相同内容的DataFrame,但使用名为name,newAge和isZJ的列。
Column类包含用于执行基本数据分析任务的各种有效方法。我们将参考读者文档的详细信息。