社会焦点

在Apache Spark 2.0中使用DataFrames和SQL(4)

字号+ 作者: 来源: 2017-05-28

注意Spark SQL遵循通常的SQL约定,即不区分大小写。也就是说,以下SQL表达式都是有效的并且彼此等价:select extractTLD(host)from apacheLogs,select extracttld(host)from apacheLogs,”select EXTRACTTLD(

  注意Spark SQL遵循通常的SQL约定,即不区分大小写。也就是说,以下SQL表达式都是有效的并且彼此等价:select extractTLD(host)from apacheLogs,select extracttld(host)from apacheLogs,”select EXTRACTTLD(host) from apacheLogs”。spark.catalog.listFunctions返回的函数名将总是小写字母。

  除了在SQL查询中使用UDF,我们还可以直接将它们应用到列表达式。以下表达式返回.net域中的所有请求:

  值得注意的是,与Spark在诸如filter,select等方法中的构建相反,用户定义的函数只采用列表达式作为参数。写extractTLD_UDF(“host”)会导致错误。

  除了在目录中注册UDF并用于Column表达式和SQL中,我们还可以使用org.apache.spark.sql.functions对象中的udf函数注册一个UDF:

  注册UDF后,可以将它应用到Column表达式(例如filter里面),如下所示:

  但是不能在SQL查询中使用它,因为还没有通过名称注册它。

  ?UDF和Catalyst优化器

  Spark中用Catalyst优化器来优化所有涉及数据集的查询,会将用户定义的函数视作黑盒。值得注意的是,当过滤器操作涉及UDF时,在连接之前可能不会“下推”过滤器操作。我们通过下面的例子来说明。

  通常来说,不依赖UDF而是从内置的“Column”表达式进行组合操作可能效果更好。

  ?加盟

  最后,我们将讨论如何使用以下两个Dataset方法连接数据集:

  • join返回一个DataFrame

  • joinWith返回一对Datasets

  •   以下示例连接两个表1、表2(来自维基百科):

      表1 员工(Employee)

      在Apache Spark 2.0中使用DataFrames和SQL

      表2 部门(Department)

      在Apache Spark 2.0中使用DataFrames和SQL

      定义两个case类,将两个表编码为case类对象的序列(由于空间原因不显示),最后创建两个Dataset对象:

      在Apache Spark 2.0中使用DataFrames和SQL

      为了执行内部等连接,只需提供要作为“String”连接的列名称:

      Spark会自动删除双列,joined.show给出以下输出:

      表3 输出

      在Apache Spark 2.0中使用DataFrames和SQL

      在上面,joined是一个DataFrame,不再是Dataset。连接数据集的行可以作为Seq列名称给出,或者可以指定要执行的equi-join(inner,outer,left_outer,right_outer或leftsemi)类型。想要指定连接类型的话,需要使用Seq表示法来指定要连接的列。请注意,如果执行内部联接(例如,获取在同一部门中工作的所有员工的对):employees.join(employees,Seq(“depID”)),我们没有办法访问连接的DataFrame列:employees.join(employees, Seq(“depID”)).select(“lastname”)会因为重复的列名而失败。处理这种情况的方法是重命名部分列:

      除了等连接之外,我们还可以给出更复杂的连接表达式,例如以下查询,它将所有部门连接到不知道部门ID且不在本部门工作的员工:

      然后可以不指定任何连接条件,在两个Datasets间执行笛卡尔联接: departments.join(employees).show。

      ?与joinWith类型保存连接

      最后,Dataset的joinWith方法返回一个Dataset,包含原始数据集中匹配行的Scala元组。

      在Apache Spark 2.0中使用DataFrames和SQL

      表4 返回Dataset

      在Apache Spark 2.0中使用DataFrames和SQL

      这可以用于自连接后想要规避上述不可访问列的问题情况。

      ?加入和优化器

      Catalyst优化器尝试通过将“过滤器”操作向“下推”,以尽可能多地优化连接,因此它们在实际连接之前执行。

      为了这个工作,用户定义的函数(UDF),不应该在连接条件内使用用因为这些被Catalyst处理为黑盒子。

      ?结论

      我们已经讨论了在Apache Spark 2.0中使用类型化的DatasetAPI,如何在Apache Spark中定义和使用用户定义的函数,以及这样做的危险。使用UDF可能产生的主要困难是它们会被Catalyst优化器视作黑盒。

      作者:马小龙(Dr. Christoph Schubert),浙江财经大学数据分析和大数据计算客座教授。2006年在德国不来梅大学获得数学博士学位后,在多特蒙德大学软件工程研究所从事研究和教学工作直到2011年来到中国。他的研究方向重点在大数据技术和NoSQL数据库以及功能规划和随机计算模型与模态逻辑。他还是国际大数据分析大会主席。

      PS:另有CSDN Spark用户微信群,请添加微信guorui_1118并备注公司+实名+职位申请入群。

    转载请注明出处。


    1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

    相关文章
    • 大疆spark到底有多牛逼?!

      大疆spark到底有多牛逼?!

    • 大疆 Spark 无人机到手,有啥问题冲我来!

      大疆 Spark 无人机到手,有啥问题冲我来!

    • 科幻片成了现实,大疆发布的 Spark 是真的「自拍神器」

      科幻片成了现实,大疆发布的 Spark 是真的「自拍神器」

    • 大疆发布微型无人机“晓”Spark,自拍无人机市场是否会迎来新一轮洗牌?

      大疆发布微型无人机“晓”Spark,自拍无人机市场是否会迎来新一轮洗