注意Spark SQL遵循通常的SQL约定,即不区分大小写。也就是说,以下SQL表达式都是有效的并且彼此等价:select extractTLD(host)from apacheLogs,select extracttld(host)from apacheLogs,”select EXTRACTTLD(host) from apacheLogs”。spark.catalog.listFunctions返回的函数名将总是小写字母。
除了在SQL查询中使用UDF,我们还可以直接将它们应用到列表达式。以下表达式返回.net域中的所有请求:
值得注意的是,与Spark在诸如filter,select等方法中的构建相反,用户定义的函数只采用列表达式作为参数。写extractTLD_UDF(“host”)会导致错误。
除了在目录中注册UDF并用于Column表达式和SQL中,我们还可以使用org.apache.spark.sql.functions对象中的udf函数注册一个UDF:
注册UDF后,可以将它应用到Column表达式(例如filter里面),如下所示:
但是不能在SQL查询中使用它,因为还没有通过名称注册它。
?UDF和Catalyst优化器
Spark中用Catalyst优化器来优化所有涉及数据集的查询,会将用户定义的函数视作黑盒。值得注意的是,当过滤器操作涉及UDF时,在连接之前可能不会“下推”过滤器操作。我们通过下面的例子来说明。
通常来说,不依赖UDF而是从内置的“Column”表达式进行组合操作可能效果更好。
?加盟
最后,我们将讨论如何使用以下两个Dataset方法连接数据集:
join返回一个DataFrame
joinWith返回一对Datasets
以下示例连接两个表1、表2(来自维基百科):
表1 员工(Employee)
表2 部门(Department)
定义两个case类,将两个表编码为case类对象的序列(由于空间原因不显示),最后创建两个Dataset对象:
为了执行内部等连接,只需提供要作为“String”连接的列名称:
Spark会自动删除双列,joined.show给出以下输出:
表3 输出
在上面,joined是一个DataFrame,不再是Dataset。连接数据集的行可以作为Seq列名称给出,或者可以指定要执行的equi-join(inner,outer,left_outer,right_outer或leftsemi)类型。想要指定连接类型的话,需要使用Seq表示法来指定要连接的列。请注意,如果执行内部联接(例如,获取在同一部门中工作的所有员工的对):employees.join(employees,Seq(“depID”)),我们没有办法访问连接的DataFrame列:employees.join(employees, Seq(“depID”)).select(“lastname”)会因为重复的列名而失败。处理这种情况的方法是重命名部分列:
除了等连接之外,我们还可以给出更复杂的连接表达式,例如以下查询,它将所有部门连接到不知道部门ID且不在本部门工作的员工:
然后可以不指定任何连接条件,在两个Datasets间执行笛卡尔联接: departments.join(employees).show。
?与joinWith类型保存连接
最后,Dataset的joinWith方法返回一个Dataset,包含原始数据集中匹配行的Scala元组。
表4 返回Dataset
这可以用于自连接后想要规避上述不可访问列的问题情况。
?加入和优化器
Catalyst优化器尝试通过将“过滤器”操作向“下推”,以尽可能多地优化连接,因此它们在实际连接之前执行。
为了这个工作,用户定义的函数(UDF),不应该在连接条件内使用用因为这些被Catalyst处理为黑盒子。
?结论
我们已经讨论了在Apache Spark 2.0中使用类型化的DatasetAPI,如何在Apache Spark中定义和使用用户定义的函数,以及这样做的危险。使用UDF可能产生的主要困难是它们会被Catalyst优化器视作黑盒。
作者:马小龙(Dr. Christoph Schubert),浙江财经大学数据分析和大数据计算客座教授。2006年在德国不来梅大学获得数学博士学位后,在多特蒙德大学软件工程研究所从事研究和教学工作直到2011年来到中国。他的研究方向重点在大数据技术和NoSQL数据库以及功能规划和随机计算模型与模态逻辑。他还是国际大数据分析大会主席。
PS:另有CSDN Spark用户微信群,请添加微信guorui_1118并备注公司+实名+职位申请入群。
转载请注明出处。