社会焦点

在Apache Spark 2.0中使用DataFrames和SQL(2)

字号+ 作者: 来源: 2017-05-28

最后,我们可以使用lit函数添加一个具有常量值的列,并使用when和otherwise重新编码列值。 例如,我们添加一个新列“ageGroup”,如果“age 20”,则为1,如果“age 30”则为2,否则为3,以及总是为“false”的列“

  最后,我们可以使用lit函数添加一个具有常量值的列,并使用when和otherwise重新编码列值。 例如,我们添加一个新列“ageGroup”,如果“age <20”,则为1,如果“age <30”则为2,否则为3,以及总是为“false”的列“trusted”:

  给出以下DataFrame:

  在Apache Spark 2.0中使用DataFrames和SQL

  drop是select相对的转换操作;它返回一个DataFrame,其中删除了原始DataFrame的某些列。

  最后可使用distinct方法返回原始DataFrame中唯一值的DataFrame:

  返回一个包含单个列的DataFrame和包含值的三行:“北京”、“江苏”、“浙江”。

  filter

  第二个DataFrame转换是Filter方法,它在DataFrame行中进行选择。有两个重载方法:一个接受一个Column,另一个接受一个SQL表达式(一个String)。例如,有以下两种等效方式来过滤年龄大于30岁的所有客户:

  Filter转换接受一般的布尔连接符and(和)和or(或):

  我们在SQL版本中使用单个等号,或者使用三等式“===”(Column类的一个方法)。在==运算符中使用Scala的等于符号会导致错误。我们再次引用Column类文档中的有用方法。

  聚合(aggregation)

  执行聚合是进行数据分析的最基本任务之一。例如,我们可能对每个订单的总金额感兴趣,或者更具体地,对每个省或年龄组的总金额或平均金额感兴趣。可能还有兴趣了解哪个客户的年龄组具有高于平均水平的总数。借用SQL,我们可以使用GROUP BY表达式来解决这些问题。DataFrames提供了类似的功能。可以根据一些列的值进行分组,同样,还可以使用字符串或“Column”对象来指定。

  我们将使用以下DataFrame:

  withColumn方法添加一个新的列或替换一个现有的列。

  聚合数据分两步进行:一个调用GroupBy方法将特定列中相等值的行组合在一起,然后调用聚合函数,如sum(求和值),max(最大值)或为原始DataFrame中每组行计算的“avg”(平均值)。从技术上来说,GroupBy会返回一个RelationalGroupedDataFrame类的对象。RelationalGroupedDataFrame包含max、min、avg、mean和sum方法,所有这些方法都对DataFrame的数字列执行指定操作,并且可以接受一个String-参数来限制所操作的数字列。此外,我们有一个count方法计算每个组中的行数,还有一个通用的agg方法允许我们指定更一般的聚合函数。所有这些方法都会返回一个DataFrame。

  例如:

  输出以下内容:

  在Apache Spark 2.0中使用DataFrames和SQL

  customerAgeGroupDF.groupBy(“agegroup”).max().show()输出:

  在Apache Spark 2.0中使用DataFrames和SQL

  最后,customerAgeGroupDF.groupBy(“agegroup”).min(“age”, “total”).show()输出:

  在Apache Spark 2.0中使用DataFrames和SQL

  还有一个通用的agg方法,接受复杂的列表达式。agg在RelationalGroupedDataFrame和Dataset中都可用。后一种方法对整个数据集执行聚合。这两种方法都允许我们给出列表达式的列表:

  在Apache Spark 2.0中使用DataFrames和SQL

  输出:

  在Apache Spark 2.0中使用DataFrames和SQL

  可用的聚合函数在org.apache.spark.sql.functions中定义。类RelationalGroupedDataset在Apache Spark 1.x中被称为“GroupedData”。 RelationalGroupedDataset的另一个特点是可以对某些列值进行透视。例如,以下内容允许我们列出每个年龄组的总数:

  在Apache Spark 2.0中使用DataFrames和SQL

  给出以下输出:

  在Apache Spark 2.0中使用DataFrames和SQL

  其中null值表示没有省/年龄组的组合。Pivot的重载版本接受一个值列表以进行透视。这一方面允许我们限制列数,另一方面更加有效,因为Spark不需要计算枢轴列中的所有值。例如:

  在Apache Spark 2.0中使用DataFrames和SQL

  给出以下输出:

  在Apache Spark 2.0中使用DataFrames和SQL

  最后,使用枢纽数据也可以进行复杂聚合:

  在Apache Spark 2.0中使用DataFrames和SQL

  输出:

  在Apache Spark 2.0中使用DataFrames和SQL

  这里=!=是Column类的“不等于”方法。

  排序和限制

  OrderBy方法允许我们根据一些列对数据集的内容进行排序。和以前一样,我们可以使用Strings或Column对象来指定列:customerDF.orderBy(”age”)和 customerDF.orderBy($”age”)给出相同的结果。默认排序顺序为升序。如果要降序排序,可以使用Column类的desc方法或者desc函数:

  在Apache Spark 2.0中使用DataFrames和SQL

  观察到desc函数返回了一个Column-object,任何其他列也需要被指定为Column-对象。

  最后,limit方法返回一个包含原始DataFrame中第一个n行的DataFrame。

  ?DataFrame方法与SQL对比

  我们已经发现,DataFrame类的基本方法与SQLselect语句的部分密切相关。下表总结了这一对应关系:

  在Apache Spark 2.0中使用DataFrames和SQL

  到目前为止连接(join)在我们的讨论中已经缺失。Spark的DataFrame支持连接,我们将在文章的下一部分讨论它们。

  下面将讨论完全类型化的DataSets API,连接和用户定义的函数(UDF)。

  ?使用SQL来处理DataFrames

转载请注明出处。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

相关文章
  • 大疆spark到底有多牛逼?!

    大疆spark到底有多牛逼?!

  • 大疆 Spark 无人机到手,有啥问题冲我来!

    大疆 Spark 无人机到手,有啥问题冲我来!

  • 科幻片成了现实,大疆发布的 Spark 是真的「自拍神器」

    科幻片成了现实,大疆发布的 Spark 是真的「自拍神器」

  • 大疆发布微型无人机“晓”Spark,自拍无人机市场是否会迎来新一轮洗牌?

    大疆发布微型无人机“晓”Spark,自拍无人机市场是否会迎来新一轮洗