在Apache Spark 2.0中使用DataFrames和SQL(3)
2017-05-28 编辑:
我们还在Apache Spark 2.0中直接执行SQL语句。SparkSession的SQL方法返回一个DataFrame。此外,DataFrame的selectExp方法也允许我们为单列指定SQL表达式,如下所示。为了能够引用SQL表达式中的DataFrame,首先有必要将DataFrame注册为临时表,在Spark 2中称为临时视图(temporary view,简称为tempview)。DataFrame为我们提供了以下两种方法:
createTempView创建一个新视图,如果具有该名称的视图已存在,则抛出一个异常;
createOrReplaceTempView创建一个用来替换的临时视图。
两种方法都将视图名称作为唯一参数。
注册表后,可以使用SparkSession的SQL方法来执行SQL语句:
返回具有以下内容的DataFrame:
SparkSession类的catalog字段是Catalog类的一个对象,具有多种处理会话注册表和视图的方法。例如,Catalog的ListTables方法返回一个包含所有已注册表信息的Dataset:
会返回一个包含有关注册表“tableName”中列信息的Dataset,例如:
此外,可以使用DataSet的SelectExpr方法执行某些产生单列的SQL表达式,例如:
这两者都产生DataFrame对象。
?第一步结束语
我们希望让读者相信,Apache Spark 2.0的统一性能够为熟悉SQL的分析师们提供Spark的学习曲线。下一部分将进一步介绍类型化Dataset API的使用、用户定义的函数以及Datasets间的连接。此外,我们将讨论新Dataset API的使用缺陷。
Spark 2.0中使用DataFrames和SQL的第二步
本文第一部分使用了无类型的DataFrame API,其中每行都表示一个Row对象。在下面的内容中,我们将使用更新的DatasetAPI。Dataset是在Apache Spark 1.6中引入的,并已在Spark 2.0中使用DataFrames进行了统一,我们现在有了type DataFrame = Dataset [Row],其中方括号([和] Scala中的泛型类型,因此类似于Java的<和>)。因此,上面讨论的所有诸如select、filter、groupBy、agg、orderBy、limit等方法都以相同的方式使用。
?Datasets:返回类型信息
Spark 2.0以前的DataFrame API本质上是一个无类型的API,这也就意味着在编译期间很可能会因为某些编译器错误,导致无法访问类型信息。
和之前一样,我们将在示例中使用Scala,因为我相信Scala最为简洁。可能涉及的例子:spark将表示SparkSession对象,代表我们的Spark集群。
?例子:分析Apache访问日志
我们将使用Apache访问日志格式数据。先一起回顾Apache日志中的典型行,如下所示:
此行包含以下部分:
127.0.0.1是向服务器发出请求的客户端(远程主机)IP地址(或主机名,如果可用);
输出中的第一个-表示所请求的信息(来自远程机器的用户身份)不可用;
输出中的第二个-表示所请求的信息(来自本地登录的用户身份)不可用;
[01 / Aug / 1995:00:00:01 -0400]表示服务器完成处理请求的时间,格式为:[日/月/年:小时:分:秒 时区],有三个部件:”GET /images/launch-logo.gif HTTP / 1.0”;
请求方法(例如,GET,POST等);
端点(统一资源标识符);
和客户端协议版本(’HTTP / 1.0’)。
1.200这是服务器返回客户端的状态代码。这些信息非常有价值:成功回复(从2开始的代码),重定向(从3开始的代码),客户端导致的错误(以4开头的代码),服务器错误(代码从5开始)。最后一个条目表示返回给客户端的对象大小。如果没有返回任何内容则是-或0。
首要任务是创建适当的类型来保存日志行信息,因此我们使用Scala的case类,具体如下:
默认情况下,case类对象不可变。通过它们的值来比较相等性,而不是通过比较对象引用。
为日志条目定义了合适的数据结构后,现在需要将表示日志条目的String转换为ApacheLog对象。我们将使用正则表达式来达到这一点,参考如下:
可以看到正则表达式包含9个捕获组,用于表示ApacheLog类的字段。
使用正则表达式解析访问日志时,会面临以下问题:
一些日志行的内容大小以-表示,我们想将它转换为0;
一些日志行不符合所选正则表达式给出的格式。
为了克服第二个问题,我们使用Scala的“Option”类型来丢弃不对的格式并进行确认。Option也是一个泛型类型,类型Option[ApacheLog]的对象可以有以下形式:
None,表示不存在一个值(在其他语言中,可能使用null);
Some(log)for a ApacheLog-objectlog。
以下为一行函数解析,并为不可解析的日志条目返回None:
最好的方法是修改正则表达式以捕获所有日志条目,但Option是处理一般错误或不可解析条目的常用技术。
综合起来,现在来剖析一个真正的数据集。我们将使用著名的NASA Apache访问日志数据集,它可以在ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz下载。
下载和解压缩文件后,首先将其打开为String的Dataset,然后使用正则表达式解析:
用spark.read.text方法打开文本文件并返回一个DataFrame,是textfile的行。使用Dataset的as方法将其转换为包含Strings的Dataset对象(而不是Rows包含字符串),并导入spark.implicits._以允许创建一个包含字符串或其他原始类型的Dataset。
现在可以解析数据集:
flatMap将parse_logline函数应用于rawData的每一行,并将Some(ApacheLog)形式的所有结果收集到apacheLogs中,同时丢弃所有不可解析的日志行(所有结果的形式None)。
我们现在可以对“数据集”执行分析,就像在“DataFrame”上一样。Dataset中的列名称只是ApacheLog case类的字段名称。
例如,以下代码打印生成最多404个响应的10个端点:
如前所述,可以将Dataset注册为临时视图,然后使用SQL执行查询:
上面的SQL查询具有与上面的Scala代码相同的结果。
用户定义的函数(user defined function, UDF)
在Spark SQL中,我们可以使用范围广泛的函数,包括处理日期、基本统计和其他数学函数的函数。Spark在函数中的构建是在org.apache.spark.sql.functions对象中定义的。
作为示例,我们使用以下函数提取主机名的顶级域:
如果想在SQL查询中使用这个函数,首先需要注册。这是通过SparkSession的udf对象实现的:
函数名后的最后一个下划线将extractTLD转换为部分应用函数(partially applied function),这是必要的,如果省略它会导致错误。register方法返回一个UserDefinedFunction对象,可以应用于列表达式。
一旦注册,我们可以在SQL查询中使用extractTLD:
要获得注册的用户定义函数概述,可以使用spark.catalog对象的listFunctions方法,该对象返回SparkSession定义的所有函数DataFrame: