社会焦点

在Apache Spark 2.0中使用DataFrames和SQL

字号+ 作者: 来源: 2017-05-28

在Apache Spark 2.0中使用DataFrames和SQL,r如何退出spark-shell,spark中rdd怎么进行迭代操作,大疆spark什么时候上市,spark rdd中调用外部变量,各种数据源用spark怎么连接

  作者|马小龙(Dr. Christoph Schubert)

  责编|郭芮

  Spark 2.0中使用DataFrames和SQL的第一步

  Spark 2.0开发的一个动机是让它可以触及更广泛的受众,特别是缺乏编程技能但可能非常熟悉SQL的数据分析师或业务分析师。因此,Spark 2.0现在比以往更易使用。在这部分,我将介绍如何使用Apache Spark 2.0。并将重点关注DataFrames作为新Dataset API的无类型版本。

  到Spark 1.3,弹性分布式数据集(Resilient Distributed Dataset,RDD)一直是Spark中的主要抽象。RDD API是在Scala集合框架之后建模的,因此间接提供了Hadoop Map / Reduce熟悉的编程原语以及函数式编程(Map、Filter、Reduce)的常用编程原语。虽然RDD API比Map / Reduce范例更具表达性,但表达复杂查询仍然很繁琐,特别是对于来自典型数据分析背景的用户,他们可能熟悉SQL,或来自R/Python编程语言的数据框架。

  Spark 1.3引入了DataFrames作为RDD顶部的一个新抽象。DataFrame是具有命名列的行集合,在R和Python相应包之后建模。

  Spark 1.6看到了Dataset类作为DataFrame的类型化版本而引入。在Spark 2.0中,DataFrames实际上是Datasets的特殊版本,我们有type DataFrame = Dataset [Row],因此DataFrame和Dataset API是统一的。

  表面上,DataFrame就像SQL表。Spark 2.0将这种关系提升到一个新水平:我们可以使用SQL来修改和查询DataSets和DataFrames。通过限制表达数量,有助于更好地优化。数据集也与Catalyst优化器良好集成,大大提高了Spark代码的执行速度。因此,新的开发应该利用DataFrames。

  在本文中,我将重点介绍Spark 2.0中DataFrames的基本用法。我将尝试强调Dataset API和SQL间的相似性,以及如何使用SQL和Dataset API互换地查询数据。借由整个代码生成和Catalyst优化器,两个版本将编译相同高效的代码。

  代码示例以Scala编程语言给出。我认为这样的代码最清晰,因为Spark本身就是用Scala编写的。

  ?SparkSession

  SparkSession类替换了Apache Spark 2.0中的SparkContext和SQLContext,并为Spark集群提供了唯一的入口点。

  在Apache Spark 2.0中使用DataFrames和SQL

  为了向后兼容,SparkSession对象包含SparkContext和SQLContext对象,见下文。当我们使用交互式Spark shell时,为我们创建一个名为spark的SparkSession对象。

  ?创建DataFrames

  DataFrame是具有命名列的表。最简单的DataFrame是使用SparkSession的range方法来创建:

  使用show给我们一个DataFrame的表格表示,可以使用describe来获得数值属性概述。describe返回一个DataFrame:

  在Apache Spark 2.0中使用DataFrames和SQL

  观察到Spark为数据帧中唯一的列选择了名称id。 对于更有趣的示例,请考虑以下数据集:

  在Apache Spark 2.0中使用DataFrames和SQL

  在这种情况下,customerDF对象将有名为_1、_2、_3、_4的列,它们以某种方式违反了命名列的目的。可以通过重命名列来恢复:

  在Apache Spark 2.0中使用DataFrames和SQL

  使用printSchema和describe提供以下输出:

  在Apache Spark 2.0中使用DataFrames和SQL

  一般来说我们会从文件加载数据。SparkSession类为提供了以下方法:

  在这里我们让Spark从CSV文件的第一行提取头信息(通过设置header选项为true),并使用数字类型(age和total)将数字列转换为相应的数据类型 inferSchema选项。

  其他可能的数据格式包括parquet文件和通过JDBC连接读取数据的可能性。

  ?基本数据操作

  我们现在将访问DataFrame中数据的基本功能,并将其与SQL进行比较。

  沿袭,操作,动作和整个阶段的代码生成

  相同的谱系概念,转换操作和行动操作之间的区别适用于Dataset和RDD。我们下面讨论的大多数DataFrame操作都会产生一个新的DataFrame,但实际上不执行任何计算。要触发计算,必须调用行动操作之一,例如show(将DataFrame的第一行作为表打印),collect(返回一个Row对象的Array),count(返回DataFrame中的行数),foreach(对每一行应用一个函数)。这是惰性求值(lazy evaluation)的常见概念。

  下面Dataset类的所有方法实际上依赖于所有数据集的有向非循环图(Directed Acyclic Graph,DAG),从现有数据集中创建一个新的“数据集”。这被称为数据集的沿袭。仅使用调用操作时,Catalyst优化程序将分析沿袭中的所有转换,并生成实际代码。这被称为整阶段代码生成,并且负责Dataset对RDD的性能改进。

  Row-行对象

  Row类在DataFrame的一行不带类型数据值中充当容器。通常情况下我们不会自己创建Row对象,而是使用下面的语法:

  Row对象元素通过位置(从0开始)或者使用apply进行访问:

  它会产生一个Any的对象类型。或者最好使用get,方法之一:

  因为这样就不会出现原始类型的开销。我们可以使用isNull方法检查行中的一个条目是否为’null’:

  我们现在来看看DataFrame类最常用的转换操作:

  select

  我们将要看的第一个转换是“select”,它允许我们对一个DataFrame的列进行投影和变换。

  引用列

  通过它们的名称有两种方法来访问DataFrame列:可以将其引用为字符串;或者可以使用apply方法,col-方法或$以字符串作为参数并返回一个Column(列)对象。所以customerDF.col(“customer”)和customerDF(“customer”)都是customerDF的第一列。

  选择和转换列

  最简单的select转换形式允许我们将DataFrame投影到包含较少列的DataFrame中。下面的四个表达式返回一个只包含customer和province列的DataFrame:

  在Apache Spark 2.0中使用DataFrames和SQL

  不能在单个select方法中调用混合字符串和列参数:customerDF.select(“customer”, $”province”)导致错误。

  使用Column类定义的运算符,可以构造复杂的列表达式:

  应用show得到以下结果:

  在Apache Spark 2.0中使用DataFrames和SQL

  列别名

  新数据集的列名称从用于创建的表达式中派生而来,我们可以使用alias或as将列名更改为其他助记符:

  产生与前面相同内容的DataFrame,但使用名为name,newAge和isZJ的列。

  Column类包含用于执行基本数据分析任务的各种有效方法。我们将参考读者文档的详细信息。

转载请注明出处。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

相关文章
  • 大疆spark到底有多牛逼?!

    大疆spark到底有多牛逼?!

  • 大疆 Spark 无人机到手,有啥问题冲我来!

    大疆 Spark 无人机到手,有啥问题冲我来!

  • 科幻片成了现实,大疆发布的 Spark 是真的「自拍神器」

    科幻片成了现实,大疆发布的 Spark 是真的「自拍神器」

  • 大疆发布微型无人机“晓”Spark,自拍无人机市场是否会迎来新一轮洗牌?

    大疆发布微型无人机“晓”Spark,自拍无人机市场是否会迎来新一轮洗