首页 > 社会焦点 > 正文

在Apache Spark 2.0中使用DataFrames和SQL(3)

2017-05-28 编辑:

  我们还在Apache Spark 2.0中直接执行SQL语句。SparkSession的SQL方法返回一个DataFrame。此外,DataFrame的selectExp方法也允许我们为单列指定SQL表达式,如下所示。为了能够引用SQL表达式中的DataFrame,首先有必要将DataFrame注册为临时表,在Spark 2中称为临时视图(temporary view,简称为tempview)。DataFrame为我们提供了以下两种方法:

  • createTempView创建一个新视图,如果具有该名称的视图已存在,则抛出一个异常;

  • createOrReplaceTempView创建一个用来替换的临时视图。

  •   两种方法都将视图名称作为唯一参数。

      注册表后,可以使用SparkSession的SQL方法来执行SQL语句:

      返回具有以下内容的DataFrame:

      在Apache Spark 2.0中使用DataFrames和SQL

      SparkSession类的catalog字段是Catalog类的一个对象,具有多种处理会话注册表和视图的方法。例如,Catalog的ListTables方法返回一个包含所有已注册表信息的Dataset:

      在Apache Spark 2.0中使用DataFrames和SQL

      会返回一个包含有关注册表“tableName”中列信息的Dataset,例如:

      此外,可以使用DataSet的SelectExpr方法执行某些产生单列的SQL表达式,例如:

      这两者都产生DataFrame对象。

      ?第一步结束语

      我们希望让读者相信,Apache Spark 2.0的统一性能够为熟悉SQL的分析师们提供Spark的学习曲线。下一部分将进一步介绍类型化Dataset API的使用、用户定义的函数以及Datasets间的连接。此外,我们将讨论新Dataset API的使用缺陷。

      Spark 2.0中使用DataFrames和SQL的第二步

      本文第一部分使用了无类型的DataFrame API,其中每行都表示一个Row对象。在下面的内容中,我们将使用更新的DatasetAPI。Dataset是在Apache Spark 1.6中引入的,并已在Spark 2.0中使用DataFrames进行了统一,我们现在有了type DataFrame = Dataset [Row],其中方括号([和] Scala中的泛型类型,因此类似于Java的<和>)。因此,上面讨论的所有诸如select、filter、groupBy、agg、orderBy、limit等方法都以相同的方式使用。

      ?Datasets:返回类型信息

      Spark 2.0以前的DataFrame API本质上是一个无类型的API,这也就意味着在编译期间很可能会因为某些编译器错误,导致无法访问类型信息。

      和之前一样,我们将在示例中使用Scala,因为我相信Scala最为简洁。可能涉及的例子:spark将表示SparkSession对象,代表我们的Spark集群。

      ?例子:分析Apache访问日志

      我们将使用Apache访问日志格式数据。先一起回顾Apache日志中的典型行,如下所示:

      此行包含以下部分:

    1. 127.0.0.1是向服务器发出请求的客户端(远程主机)IP地址(或主机名,如果可用);

    2. 输出中的第一个-表示所请求的信息(来自远程机器的用户身份)不可用;

    3. 输出中的第二个-表示所请求的信息(来自本地登录的用户身份)不可用;

    4. [01 / Aug / 1995:00:00:01 -0400]表示服务器完成处理请求的时间,格式为:[日/月/年:小时:分:秒 时区],有三个部件:”GET /images/launch-logo.gif HTTP / 1.0”;

    5. 请求方法(例如,GET,POST等);

    6. 端点(统一资源标识符);

    7. 和客户端协议版本(’HTTP / 1.0’)。

      1.200这是服务器返回客户端的状态代码。这些信息非常有价值:成功回复(从2开始的代码),重定向(从3开始的代码),客户端导致的错误(以4开头的代码),服务器错误(代码从5开始)。最后一个条目表示返回给客户端的对象大小。如果没有返回任何内容则是-或0。

      首要任务是创建适当的类型来保存日志行信息,因此我们使用Scala的case类,具体如下:

      在Apache Spark 2.0中使用DataFrames和SQL

      默认情况下,case类对象不可变。通过它们的值来比较相等性,而不是通过比较对象引用。

      为日志条目定义了合适的数据结构后,现在需要将表示日志条目的String转换为ApacheLog对象。我们将使用正则表达式来达到这一点,参考如下:

      可以看到正则表达式包含9个捕获组,用于表示ApacheLog类的字段。

      使用正则表达式解析访问日志时,会面临以下问题:

  • 一些日志行的内容大小以-表示,我们想将它转换为0;

  • 一些日志行不符合所选正则表达式给出的格式。

  •   为了克服第二个问题,我们使用Scala的“Option”类型来丢弃不对的格式并进行确认。Option也是一个泛型类型,类型Option[ApacheLog]的对象可以有以下形式:

  • None,表示不存在一个值(在其他语言中,可能使用null);

  • Some(log)for a ApacheLog-objectlog。

  •   以下为一行函数解析,并为不可解析的日志条目返回None:

      在Apache Spark 2.0中使用DataFrames和SQL

      最好的方法是修改正则表达式以捕获所有日志条目,但Option是处理一般错误或不可解析条目的常用技术。

      综合起来,现在来剖析一个真正的数据集。我们将使用著名的NASA Apache访问日志数据集,它可以在ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz下载。

      下载和解压缩文件后,首先将其打开为String的Dataset,然后使用正则表达式解析:

      用spark.read.text方法打开文本文件并返回一个DataFrame,是textfile的行。使用Dataset的as方法将其转换为包含Strings的Dataset对象(而不是Rows包含字符串),并导入spark.implicits._以允许创建一个包含字符串或其他原始类型的Dataset。

      现在可以解析数据集:

      flatMap将parse_logline函数应用于rawData的每一行,并将Some(ApacheLog)形式的所有结果收集到apacheLogs中,同时丢弃所有不可解析的日志行(所有结果的形式None)。

      我们现在可以对“数据集”执行分析,就像在“DataFrame”上一样。Dataset中的列名称只是ApacheLog case类的字段名称。

      例如,以下代码打印生成最多404个响应的10个端点:

      在Apache Spark 2.0中使用DataFrames和SQL

      如前所述,可以将Dataset注册为临时视图,然后使用SQL执行查询:

      在Apache Spark 2.0中使用DataFrames和SQL

      上面的SQL查询具有与上面的Scala代码相同的结果。

      用户定义的函数(user defined function, UDF)

      在Spark SQL中,我们可以使用范围广泛的函数,包括处理日期、基本统计和其他数学函数的函数。Spark在函数中的构建是在org.apache.spark.sql.functions对象中定义的。

      作为示例,我们使用以下函数提取主机名的顶级域:

      如果想在SQL查询中使用这个函数,首先需要注册。这是通过SparkSession的udf对象实现的:

      函数名后的最后一个下划线将extractTLD转换为部分应用函数(partially applied function),这是必要的,如果省略它会导致错误。register方法返回一个UserDefinedFunction对象,可以应用于列表达式。

      一旦注册,我们可以在SQL查询中使用extractTLD:

      要获得注册的用户定义函数概述,可以使用spark.catalog对象的listFunctions方法,该对象返回SparkSession定义的所有函数DataFrame:


    大家都爱看
    大疆spark到底有多牛逼?!大疆spark到底有多牛逼?! 大疆 Spark 无人机到手,有啥问题冲我来!大疆 Spark 无人机到手,有啥问题冲我来!
    查看更多热点新闻