在Apache Spark 2.0中使用DataFrames和SQL(2)
2017-05-28 编辑:
最后,我们可以使用lit函数添加一个具有常量值的列,并使用when和otherwise重新编码列值。 例如,我们添加一个新列“ageGroup”,如果“age <20”,则为1,如果“age <30”则为2,否则为3,以及总是为“false”的列“trusted”:
给出以下DataFrame:
drop是select相对的转换操作;它返回一个DataFrame,其中删除了原始DataFrame的某些列。
最后可使用distinct方法返回原始DataFrame中唯一值的DataFrame:
返回一个包含单个列的DataFrame和包含值的三行:“北京”、“江苏”、“浙江”。
filter
第二个DataFrame转换是Filter方法,它在DataFrame行中进行选择。有两个重载方法:一个接受一个Column,另一个接受一个SQL表达式(一个String)。例如,有以下两种等效方式来过滤年龄大于30岁的所有客户:
Filter转换接受一般的布尔连接符and(和)和or(或):
我们在SQL版本中使用单个等号,或者使用三等式“===”(Column类的一个方法)。在==运算符中使用Scala的等于符号会导致错误。我们再次引用Column类文档中的有用方法。
聚合(aggregation)
执行聚合是进行数据分析的最基本任务之一。例如,我们可能对每个订单的总金额感兴趣,或者更具体地,对每个省或年龄组的总金额或平均金额感兴趣。可能还有兴趣了解哪个客户的年龄组具有高于平均水平的总数。借用SQL,我们可以使用GROUP BY表达式来解决这些问题。DataFrames提供了类似的功能。可以根据一些列的值进行分组,同样,还可以使用字符串或“Column”对象来指定。
我们将使用以下DataFrame:
withColumn方法添加一个新的列或替换一个现有的列。
聚合数据分两步进行:一个调用GroupBy方法将特定列中相等值的行组合在一起,然后调用聚合函数,如sum(求和值),max(最大值)或为原始DataFrame中每组行计算的“avg”(平均值)。从技术上来说,GroupBy会返回一个RelationalGroupedDataFrame类的对象。RelationalGroupedDataFrame包含max、min、avg、mean和sum方法,所有这些方法都对DataFrame的数字列执行指定操作,并且可以接受一个String-参数来限制所操作的数字列。此外,我们有一个count方法计算每个组中的行数,还有一个通用的agg方法允许我们指定更一般的聚合函数。所有这些方法都会返回一个DataFrame。
例如:
输出以下内容:
customerAgeGroupDF.groupBy(“agegroup”).max().show()输出:
最后,customerAgeGroupDF.groupBy(“agegroup”).min(“age”, “total”).show()输出:
还有一个通用的agg方法,接受复杂的列表达式。agg在RelationalGroupedDataFrame和Dataset中都可用。后一种方法对整个数据集执行聚合。这两种方法都允许我们给出列表达式的列表:
输出:
可用的聚合函数在org.apache.spark.sql.functions中定义。类RelationalGroupedDataset在Apache Spark 1.x中被称为“GroupedData”。 RelationalGroupedDataset的另一个特点是可以对某些列值进行透视。例如,以下内容允许我们列出每个年龄组的总数:
给出以下输出:
其中null值表示没有省/年龄组的组合。Pivot的重载版本接受一个值列表以进行透视。这一方面允许我们限制列数,另一方面更加有效,因为Spark不需要计算枢轴列中的所有值。例如:
给出以下输出:
最后,使用枢纽数据也可以进行复杂聚合:
输出:
这里=!=是Column类的“不等于”方法。
排序和限制
OrderBy方法允许我们根据一些列对数据集的内容进行排序。和以前一样,我们可以使用Strings或Column对象来指定列:customerDF.orderBy(”age”)和 customerDF.orderBy($”age”)给出相同的结果。默认排序顺序为升序。如果要降序排序,可以使用Column类的desc方法或者desc函数:
观察到desc函数返回了一个Column-object,任何其他列也需要被指定为Column-对象。
最后,limit方法返回一个包含原始DataFrame中第一个n行的DataFrame。
?DataFrame方法与SQL对比
我们已经发现,DataFrame类的基本方法与SQLselect语句的部分密切相关。下表总结了这一对应关系:
到目前为止连接(join)在我们的讨论中已经缺失。Spark的DataFrame支持连接,我们将在文章的下一部分讨论它们。
下面将讨论完全类型化的DataSets API,连接和用户定义的函数(UDF)。
?使用SQL来处理DataFrames