详解Spark Sql在UDF中如何引用外部数据
作者:KYs_Daddy
前言
Spark Sql可以通过UDF来对DataFrame的Column进行自定义操作。在特定场景下定义UDF可能需要用到Spark Context以外的资源或数据。比如从List或Map中取值,或是通过连接池从外部的数据源中读取数据,然后再参与Column的运算。
Excutor中每个task的工作线程都会对UDF的call进行调用,外部资源的使用发生在Excutor端,而资源加载既能发生在Driver端,也可以发生在Excutor端。如果外部资源对象能序列化,我们可以在Driver端进行初始化,然后广播(broadcast)到Excutor端参与运算。对于不能进行序列化的对象,如JedisPool(redis连接池),只能在Excutor端进行初始化。
因此,在UDF中引用外部资源有以下两类方法:
- 能序列化:在Driver端进行初始化,然后通过spark的broadcast方法广播到Excutor上进行使用;
- 不能序列化:在Excutor端进行初始化然后使用。
下面我们将用一个实际例子对上述两种方法进行详细介绍。
本文使用环境:Spark-2.3.0,Java 8。
场景介绍
我们以一个DataFrame(两个字段node_1、node_2)作为原始数据;一棵二叉搜索树(BST)作为Spark外部被引用数据;目标是定义一个UDF来判断:BST中是否刚好存在一个父节点,它的左右子节点值与node_1、node_2两个字段值相同。然后将判断结果输出到新列is_bro。其中DataFrame:
BST:
输出DataFrame:
二叉树的定义与判断是否为父节点的左右子节点的逻辑如下:
import java.io.Serializable; /** * @author wangjiahui * @create 2021-03-14-10:57 */ public class TreeNode implements Serializable{ private Integer val; private TreeNode left; private TreeNode right; public TreeNode() { } public TreeNode(Integer val) { this.val = val; } public TreeNode(Integer val, TreeNode left, TreeNode right) { this.val = val; this.left = left; this.right = right; } public Integer getVal() { return val; } public void setVal(Integer val) { this.val = val; } public TreeNode getLeft() { return left; } public void setLeft(TreeNode left) { this.left = left; } public TreeNode getRight() { return right; } public void setRight(TreeNode right) { this.right = right; } /** * 判断是否刚好有一个父节点的左、右子节点值与num1、num2相同 * @param num1 * @param num2 * @return */ public Boolean isBro( Integer num1, Integer num2) { if (null == getLeft()||null == getRight()) { return false; } if (getLeft().getVal().compareTo(num1)==0 && getRight().getVal().compareTo(num2)==0) { return true; } return getLeft().isBro(num1, num2) || getRight().isBro(num1, num2); } }
生成上图所示BST的方法createTree()如下:
public static TreeNode createTree(){ TreeNode[] treeNodes = new TreeNode[8]; for(int i=1; i<=7; i++){ treeNodes[i] = new TreeNode(i); } treeNodes[2].setLeft(treeNodes[1]); treeNodes[2].setRight(treeNodes[3]); treeNodes[6].setLeft(treeNodes[5]); treeNodes[6].setRight(treeNodes[7]); treeNodes[4].setLeft(treeNodes[2]); treeNodes[4].setRight(treeNodes[6]); return treeNodes[4]; }
方法一 Driver端加载
在Driver端完成初始化并定义UDF
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext()); // 初始化树 TreeNode tree = createTree(); // broadcast Broadcast<TreeNode> broadcastTree = javaSparkContext.broadcast(tree); // lambda表达式定义udf UserDefinedFunction udf = functions.udf((Integer num1, Integer num2) -> { return broadcastTree.getValue().isBro(num1,num2); }, BooleanType); // 注册udf spark.udf().register("isBro",udf); // 使用udf df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));
方法二 Excutor端加载
如果我们直接在call中进行初始化会存在问题:由于多个task的线程会在同一时刻对UDF中的call进行调用,导致资源对象在同一时刻被初始化多次,造成Excutor内存资源浪费。此外,如果外部资源为连接池对象,在同一时刻初始化多次会建立多个连接,增加外部数据源的访问压力。
为此,我们可以借助单例模式中的懒汉式实现,让资源在每个Excutor中只被初始化一次。懒汉式的实现需要新建一个类(命名为IsBroUDF2)并实现UDF2<Integer, Integer, Boolean>接口,重写UDF2的call方法:
import org.apache.spark.sql.api.java.UDF2; /** * @author wangjiahui * @create 2021-03-14-14:25 */ public class IsBroUDF2 implements UDF2<Integer,Integer,Boolean> { // 定义静态的TreeNode成员变量 private static volatile TreeNode treeNode; public IsBroUDF2() { } @Override public Boolean call(Integer num1, Integer num2) throws Exception { // 懒汉式 二次判定 if(null==treeNode){ synchronized (IsBroUDF2.class){ if(null==treeNode){ treeNode=createTree(); } } } return treeNode.isBro(num1,num2); } // 辅助方法 public static TreeNode createTree(){ TreeNode[] treeNodes = new TreeNode[8]; for(int i=1; i<=7; i++){ treeNodes[i] = new TreeNode(i); } treeNodes[2].setLeft(treeNodes[1]); treeNodes[2].setRight(treeNodes[3]); treeNodes[6].setLeft(treeNodes[5]); treeNodes[6].setRight(treeNodes[7]); treeNodes[4].setLeft(treeNodes[2]); treeNodes[4].setRight(treeNodes[6]); return treeNodes[4]; } }
然后注册和使用UDF
// 注册udf spark.udf().register("isBro",new IsBroUDF2(), BooleanType); // 使用udf df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));
在call方法中通过加锁可以实现TreeNode资源在同一个Excutor中只被初始化一次。除了上面介绍的这种懒汉式的写法之外,还可以通过静态内部类懒加载、枚举等方式实现TreeNode资源在Excutor端只被初始化一次。
小结
想要在Spark Sql的UDF中使用Spark外的资源和数据进行运算,我们既可以在Driver端预先进行初始化然后广播到各Excutor上(要求对象能序列化),也可以直接在Excutor端进行加载;如果在Excutor端加载要保证外部资源对象只被初始化一次。
以上就是详解Spark Sql在UDF中如何引用外部数据的详细内容,更多关于Spark Sql UDF引用外部数据的资料请关注脚本之家其它相关文章!