python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > PySpark与GraphFrames使用

PySpark与GraphFrames的安装与使用环境搭建过程

作者:小小明-代码实体

这篇文章主要介绍了PySpark与GraphFrames的安装与使用教程,本文通过图文并茂实例代码相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

PySpark环境搭建

配置hadoop

spark访问本地文件并执行运算时,可能会遇到权限问题或是dll错误。这是因为spark需要使用到Hadoop的winutils和hadoop.dll,首先我们必须配置好Hadoop相关的环境。可以到github下载:https://github.com/4ttty/winutils

gitcode提供了镜像加速:https://gitcode.net/mirrors/4ttty/winutils

我选择了使用这个仓库提供的最高的Hadoop版本3.0.0将其解压到D:\deploy\hadoop-3.0.0目录下,然后配置环境变量:

image-20220217132732434

我们还需要将对应的hadoop.dll复制到系统中,用命令表达就是:

copy D:\deploy\hadoop-3.0.0\bin\hadoop.dll C:\Windows\System32

不过这步需要拥有管理员权限才可以操作。

为了能够在任何地方使用winutils命令工具,将%HADOOP_HOME%\bin目录加入环境变量中:

image-20220217133520475

安装pyspark与Java

首先,我们安装spark当前(2022-2-17)的最新版本:

pip install pyspark==3.2.1

需要注意pyspark的版本决定了jdk的最高版本,例如假如安装2.4.5版本的pyspark就只能安装1.8版本的jdk,否则会报出java.lang.IllegalArgumentException: Unsupported class file major version 55的错误。

这是因为pyspark内置了Scala,而Scala是基于jvm的编程语言,Scala与jdk的版本存在兼容性问题,JDK与scala的版本兼容性表:

JDK versionMinimum Scala versionsRecommended Scala versions
172.13.6, 2.12.15 (forthcoming)2.13.6, 2.12.15 (forthcoming)
162.13.5, 2.12.142.13.6, 2.12.14
13, 14, 152.13.2, 2.12.112.13.6, 2.12.14
122.13.1, 2.12.92.13.6, 2.12.14
112.13.0, 2.12.4, 2.11.122.13.6, 2.12.14, 2.11.12
82.13.0, 2.12.0, 2.11.0, 2.10.22.13.6, 2.12.14, 2.11.12, 2.10.7
6, 72.11.0, 2.10.02.11.12, 2.10.7

当前3.2.1版本的pyspark内置的Scala版本为2.12.15,意味着jdk17与其以下的所有版本都支持。

这里我依然选择安装jdk8的版本:

image-20220217143447453

测试一下:

>java -version
java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)

jdk11的详细安装教程(jdk1.8在官网只有安装包,无zip绿化压缩包):

绿化版Java11的环境配置与Python调用Java
https://xxmdmst.blog.csdn.net/article/details/118366166

graphframes安装

pip安装当前最新的graphframes:

pip install graphframes==0.6

然后在官网下载graphframes的jar包。

下载地址:https://spark-packages.org/package/graphframes/graphframes

由于安装的pyspark版本是3.2,所以这里我选择了这个jar包:

image-20220217144829403

然后将该jar包放入pyspark安装目录的jars目录下:

image-20220217145105414

pyspark安装位置可以通过pip查看:

C:\Users\ASUS>pip show pyspark
Name: pyspark
Version: 3.2.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: d:\miniconda3\lib\site-packages
Requires: py4j
Required-by:

使用方法

学习pyspark的最佳路径是官网:https://spark.apache.org/docs/latest/quick-start.html

在下面的网页,官方提供了在线jupyter:

https://spark.apache.org/docs/latest/api/python/getting_started/index.html

image-20220218174237283

启动spark并读取数据

本地模式启动spark:

from pyspark.sql import SparkSession, Row

spark = SparkSession \
    .builder \
    .appName("Python Spark") \
    .master("local[*]") \
    .getOrCreate()
sc = spark.sparkContext
spark

image-20220217153008268

SparkSession输出的内容中包含了spark的web页面,新标签页打开页面后大致效果如上。

点击Environment选项卡可以查看当前环境中的变量:

image-20220217153531616

启动hive支持

找到pyspark的安装位置,例如我的电脑在D:\Miniconda3\Lib\site-packages\pyspark

手动创建conf目录并将hive-site.xml配置文件复制到其中。如果hive使用了MySQL作为原数据库,则还需要将MySQL对应的驱动jar包放入spark的jars目录下。

创建spark会话对象时可通过enableHiveSupport()开启hive支持:

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext
spark

spark访问hive自己创建的表有可能会出现如下的权限报错:

Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS s
hould be writable. Current permissions are: rwx------

是因为当前用户不具备对\tmp\hive的操作权限:

>winutils ls \tmp\hive
drwx------ 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May  4 2020 \tmp\hive

把\tmp\hive目录的权限改为777即可顺利访问:

>winutils chmod 777 \tmp\hive

>winutils ls \tmp\hive
drwxrwxrwx 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May  4 2020 \tmp\hive

Spark的DataFrame与RDD

从spark2.x开始将RDD和DataFrame的API统一抽象成dataset,DataFrame就是Dataset[Row],RDD则是Dataset.rdd。可以将DataFrame理解为包含结构化信息的RDD。

将含row的RDD转换为DataFrame只需要调用toDF方法或SparkSession的createDataFrame方法即可,也可以传入schema覆盖类型或名称设置。

DataFrame的基础api

DataFrame默认支持DSL风格语法,例如:

//查看DataFrame中的内容
df.show()

//查看DataFrame部分列中的内容
df.select(df['name'], df['age'] + 1).show()
df.select("name").show()
//打印DataFrame的Schema信息
df.printSchema()
//过滤age大于等于 21 的
df.filter(df['age'] > 21).show()
//按年龄进行分组并统计相同年龄的人数
personDF.groupBy("age").count().show()

将DataFrame注册成表或视图之后即可进行纯SQL操作:

df.createOrReplaceTempView("people")
//df.createTempView("t_person")

//查询年龄最大的前两名
spark.sql("select * from t_person order by age desc limit 2").show()
//显示表的Schema信息
spark.sql("desc t_person").show()

Pyspark可以直接很方便的注册udf并直接使用:

strlen = spark.udf.register("len", lambda x: len(x))
print(spark.sql("SELECT len('test') length").collect())
print(spark.sql("SELECT 'foo' AS text").select(strlen("text").alias('length')).collect())

执行结果:

[Row(length='4')]
[Row(length='3')]

RDD的简介

DataFrame的本质是对RDD的包装,可以理解为DataFrame=RDD[Row]+schema。

RDD(A Resilient Distributed Dataset)叫做弹性可伸缩分布式数据集,是Spark中最基本的数据抽象。它代表一个不可变、自动容错、可伸缩性、可分区、里面的元素可并行计算的集合。

在每一个RDD内部具有五大属性:

一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。

**一个计算每个分区的函数。**Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

**RDD之间的依赖关系。**RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

**一个Partitioner,即RDD的分片函数。**当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

**一个列表,存储存取每个Partition的优先位置(preferred location)。**对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

RDD的API概览

RDD包含Transformation APIAction API,Transformation API都是延迟加载的只是记住这些应用到基础数据集上的转换动作,只有当执行Action API时这些转换才会真正运行。

Transformation API产生的两类RDD最重要,分别是MapPartitionsRDDShuffledRDD

产生MapPartitionsRDD的算子有map、keyBy、keys、values、flatMap、mapValues 、flatMapValues、mapPartitions、mapPartitionsWithIndex、glom、filter和filterByRange 。其中用的最多的是mapflatMap,但任何产生MapPartitionsRDD的算子都可以直接使用mapPartitionsmapPartitionsWithIndex实现。

产生ShuffledRDD的算子有combineByKeyWithClassTag、combineByKey、aggregateByKey、foldByKey 、reduceByKey 、distinct、groupByKey、groupBy、partitionBy、sortByKey 和 repartitionAndSortWithinPartitions。

combineByKey到groupByKey 底层均是调用combineByKeyWithClassTag方法:

@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner,mergeValue,mergeCombiners
       ,defaultPartitioner(self))
}
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

三个重要参数的含义:

groupByKey的partitioner未指定时会传入默认的defaultPartitioner。例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2).keyBy(_.length)
a.groupByKey.collect

res9: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))

aggregateByKey:每个分区使用zeroValue作为初始值,迭代每一个元素用seqOp进行合并,对所有分区的结果用combOp进行合并。例如:

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res7: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))

reduceByKey :每个分区迭代每一个元素用func进行合并,对所有分区的结果用func再进行合并,例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect

Action API有:

动作含义
reduce(func)通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
collect()在驱动程序中,以数组的形式返回数据集的所有元素
count()返回RDD的元素个数
first()返回RDD的第一个元素(类似于take(1))
take(n)返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement*,*num, [seed])返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering])排序并取前N个元素
saveAsTextFile(path)将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path)将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)将RDD中的元素用NullWritable作为key,实际元素作为value保存为sequencefile格式
countByKey()针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func)在数据集的每一个元素上,运行函数func进行更新。

spark模拟实现mapreduce版wordcount:

object MapreduceWordcount {
  def main(args: Array[String]): Unit = {
    import org.apache.spark._
    val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]"))
    sc.setLogLevel("WARN")
    
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.rdd.HadoopRDD

    import scala.collection.mutable.ArrayBuffer
    
    def map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = {
      for (word <- v.toString.split("\\s+"))
        collect += ((word, 1))
    }
    def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = {
      collect += ((key, value.sum))
    }
    val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((split, it) =>{
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        it.foreach(kv => map(kv._1, kv._2, collect))
        collect.toIterator
      })
      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
      .mapPartitions(it => {
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        var lastKey: String = ""
        var values: ArrayBuffer[Int] = ArrayBuffer[Int]()
        for ((currKey, value) <- it) {
          if (!currKey.equals(lastKey)) {
            if (values.length != 0)
              reduce(lastKey, values.toIterator, collect)
            values.clear()
          }
          values += value
          lastKey = currKey
        }
        if (values.length != 0) reduce(lastKey, values.toIterator, collect)
        collect.toIterator
      })
    rdd.foreach(println)
  }
}

各类RDD

image-20220218181635289

当我们需要给Datafream添加自增列时,可以使用zipWithUniqueId方法:

from pyspark.sql.types import StructType, LongType

schema = data.schema.add(StructField("id", LongType()))
rowRDD = data.rdd.zipWithUniqueId().map(lambda t: t[0]+Row(t[1]))
data = rowRDD.toDF(schema)
data.show()

API用法详情可参考:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

cache&checkpoint

RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

rdd.persist()

checkpoint的源码注释可以看到:

从中我们得知,在执行checkpoint方法时,最好同时,将该RDD缓存起来,否则,checkpoint也会产生一个计算任务。

sc.setCheckpointDir("checkpoint")
rdd.cache()
rdd.checkpoint()

graphframes 的用法

GraphFrame是将Spark中的Graph算法统一到DataFrame接口的Graph操作接口,为Scala、Java和Python提供了统一的图处理API。

Graphframes是开源项目,源码工程如下:https://github.com/graphframes/graphframes

可以参考:

在GraphFrames中图的顶点(Vertex)和边(edge)都是以DataFrame形式存储的:

创建图的示例:

from graphframes import GraphFrame
vertices = spark.createDataFrame([
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 30),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 36),
    ("g", "Gabby", 60)], ["id", "name", "age"])
edges = spark.createDataFrame([
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
    ("f", "c", "follow"),
    ("e", "f", "follow"),
    ("e", "d", "friend"),
    ("d", "a", "friend"),
    ("a", "e", "friend")
], ["src", "dst", "relationship"])
# 生成图
g = GraphFrame(vertices, edges)

GraphFrame提供三种视图:

print("顶点表视图:")
graph.vertices.show() # graph.vertices 就是原始的vertices
print("边表视图:")
graph.edges.show() # graph.edges 就是原始的 edges
print("三元组视图:")
graph.triplets.show()

获取顶点的度、入度和出度:

# 顶点的度
graph.degrees.show()
# 顶点的入度
graph.inDegrees.show()
# 顶点的出度
graph.outDegrees.show()

Motif finding (模式发现)

示例:

# 多个路径条件
motif = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
# 在搜索的结果上进行过滤
motif.filter("b.age > 30")
# 不需要返回路径中的元素时,可以使用匿名顶点和边
motif = graph.find("(start)-[]->()")
# 设置路径不存在的条件
motif = graph.find("(a)-[]->(b); !(b)-[]->(a)")

假设我们要想给用户推荐关注的人,可以找出这样的关系:A关注B,B关注C,但是A未关注C。找出这样的关系就可以把C推荐给A:

# Motif: A->B->C but not A->C
results = graph.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)")
# 排除自己
results = results.filter("A.id != C.id")
# 选择需要的列
results = results.select(results.A.id.alias("A"), results.C.id.alias("C"))
results.show()

结果:

+---+---+
|  A|  C|
+---+---+
|  e|  c|
|  e|  a|
|  d|  b|
|  a|  d|
|  f|  b|
|  d|  e|
|  a|  f|
|  a|  c|
+---+---+

Motif在查找路径过程的过程中,还可以沿着路径携带状态。例如我们想要找出关系链有4个顶点,而且其中3条边全部都是"friend"关系:

from pyspark.sql.functions import col, lit, when
from functools import reduce
chain4 = graph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
def sumFriends(cnt, relationship):
    "定义下一个顶点更新状态的条件:如果关系为friend则cnt+1"
    return when(relationship == "friend", cnt+1).otherwise(cnt)
# 将更新方法应用到整个链的,链上每有一个关系是 friend 就加一,链上共三个关系。
condition = reduce(lambda cnt, e: sumFriends(
    cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))
chainWith2Friends2 = chain4.where(condition >= 3)
chainWith2Friends2.show()

结果:

+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
|              a|            ab|              b|            bc|              c|            cd|              d|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}|   {b, Bob, 36}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+

Subgraphs 子图

可以直接过滤其顶点或边,dropIsolatedVertices()方法用于删除孤立没有连接的点:

graph.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()

还可以基于模式发现获取到的边创建Subgraphs :

paths = graph.find("(a)-[e]->(b)")\
    .filter("e.relationship = 'follow'")\
    .filter("a.age < b.age")
# 抽取边信息e2 = paths.select("e.src", "e.dst", "e.relationship")
e2 = paths.select("e.*")
# 创建Subgraphs
g2 = GraphFrame(graph.vertices, e2)

GraphFrames支持的GraphX算法

pageRank算法:

results = graph.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.sort("pagerank", ascending=False).show()

结果:

+---+-------+---+-------------------+
| id|   name|age|           pagerank|
+---+-------+---+-------------------+
|  b|    Bob| 36| 2.7025217677349773|
|  c|Charlie| 30| 2.6667877057849627|
|  a|  Alice| 34| 0.4485115093698443|
|  e| Esther| 32| 0.3613490987992571|
|  f|  Fanny| 36|0.32504910549694244|
|  d|  David| 29|0.32504910549694244|
|  g|  Gabby| 60|0.17073170731707318|
+---+-------+---+-------------------+

可以设置起始顶点:

graph.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")
graph.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10)

广度优先搜索BFS:

搜索从姓名叫Esther到年龄小于32的最小路径:

paths = graph.bfs("name = 'Esther'", "age < 32")
paths.show()
+--------------+--------------+---------------+
|          from|            e0|             to|
+--------------+--------------+---------------+
|{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+--------------+--------------+---------------+

可以指定只能在指定的边搜索:

graph.bfs("name = 'Esther'",
          "age < 32",
          edgeFilter="relationship != 'friend'",
          maxPathLength=4
          ).show()
+---------------+--------------+--------------+--------------+----------------+
|           from|            e0|            v1|            e1|              to|
+---------------+--------------+--------------+--------------+----------------+
|{e, Esther, 32}|{e, f, follow}|{f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}|
+---------------+--------------+--------------+--------------+----------------+

Connected components 连通组件:

必须先设置检查点:

sc.setCheckpointDir("checkpoint")

graph.connectedComponents().show()

结果:

+---+-------+---+------------+
| id|   name|age|   component|
+---+-------+---+------------+
|  a|  Alice| 34|412316860416|
|  b|    Bob| 36|412316860416|
|  c|Charlie| 30|412316860416|
|  d|  David| 29|412316860416|
|  e| Esther| 32|412316860416|
|  f|  Fanny| 36|412316860416|
|  g|  Gabby| 60|146028888064|
+---+-------+---+------------+

可以看到仅g点在一个连通区域内,可以调用dropIsolatedVertices()方法,删除这种孤立的没有连接的点:

graph.dropIsolatedVertices().connectedComponents().show()

结果:

+---+-------+---+------------+
| id|   name|age|   component|
+---+-------+---+------------+
|  a|  Alice| 34|412316860416|
|  b|    Bob| 36|412316860416|
|  c|Charlie| 30|412316860416|
|  d|  David| 29|412316860416|
|  e| Esther| 32|412316860416|
|  f|  Fanny| 36|412316860416|
+---+-------+---+------------+

Strongly connected components 强连通组件:

graph.stronglyConnectedComponents(maxIter=10).show()

Shortest paths 最短路径:

每个顶点到a或d的最短路径:

graph.shortestPaths(landmarks=["a", "d"]).show()
+---+-------+---+----------------+
| id|   name|age|       distances|
+---+-------+---+----------------+
|  g|  Gabby| 60|              {}|
|  f|  Fanny| 36|              {}|
|  e| Esther| 32|{a -> 2, d -> 1}|
|  d|  David| 29|{a -> 1, d -> 0}|
|  c|Charlie| 30|              {}|
|  b|    Bob| 36|              {}|
|  a|  Alice| 34|{a -> 0, d -> 2}|
+---+-------+---+----------------+

Triangle count 三角形计数:

graph.triangleCount().show()
+-----+---+-------+---+
|count| id|   name|age|
+-----+---+-------+---+
|    1|  a|  Alice| 34|
|    0|  b|    Bob| 36|
|    0|  c|Charlie| 30|
|    1|  d|  David| 29|
|    1|  e| Esther| 32|
|    0|  g|  Gabby| 60|
|    0|  f|  Fanny| 36|
+-----+---+-------+---+

说明顶点a/e/d构成三角形。

标签传播算法(LPA):

graph.labelPropagation(maxIter=5).orderBy("label").show()
+---+-------+---+-------------+
| id|   name|age|        label|
+---+-------+---+-------------+
|  g|  Gabby| 60| 146028888064|
|  f|  Fanny| 36|1047972020224|
|  b|    Bob| 36|1047972020224|
|  a|  Alice| 34|1382979469312|
|  c|Charlie| 30|1382979469312|
|  e| Esther| 32|1460288880640|
|  d|  David| 29|1460288880640|
+---+-------+---+-------------+

PySpark3.X与pandas融合

Pyspark从3.0版本开始出现了pandas_udf装饰器、applyInPandas和mapInPandas,基于这些方法,我们就可以使用熟悉的pandas的语法处理spark对象的数据。

首先创建几条测试数据,并启动 Apache Arrow

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
df.show()

自定义UDF和UDAF

pyspark暂不支持自定义UDTF。

使用pandas_udf装饰器我们可以创建出基于pandas的udf自定义函数,在DSL的语法中可以被直接使用:

from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("double")
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b
df.select(multiply_func("id", "v").alias("product")).show()

注册函数和视图后,可以直接在SQL中使用:

df.createOrReplaceTempView("t")
spark.udf.register("multiply", multiply_func)
spark.sql('select multiply(id, v) product from t').show()

结果均为:

+-------+
|product|
+-------+
|    1.0|
|    2.0|
|    6.0|
|   10.0|
|   20.0|
+-------+

还支持聚合函数和窗口函数:

from pyspark.sql import Window

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()
# 对字段'v'进行求均值
df.select(mean_udf('v').alias("mean_v")).show()
# 按照‘id'分组,求'v'的均值
df.groupby("id").agg(mean_udf('v').alias("mean_v")).show()
# 按照‘id'分组,求'v'的均值,并赋值给新的一列
df.withColumn('mean_v', mean_udf("v").over(Window.partitionBy('id'))).show()

注册到udf之后同样可以直接使用SQL实现:

spark.udf.register("mean2", mean_udf)
spark.sql('select mean2(v) mean_v from t').show()
spark.sql('select id,mean2(v) mean_v from t group by id').show()
spark.sql('select id,v,mean2(v) over(partition by id) mean_v from t').show()

结果均为:

+--------+
| mean_v |
+--------+
|     4.2|
+--------+

+---+--------+
| id| mean_v |
+---+--------+
|  1|     1.5|
|  2|     6.0|
+---+--------+

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+

分组聚合与JOIN

applyInPandas需要在datafream调用groupby之后才能使用:

def subtract_mean(pdf):
    v = pdf.v
    pdf['v1'] = v - v.mean()
    pdf['v2'] = v + v.mean()
    return pdf
t = df.groupby("id")
t.applyInPandas(
    subtract_mean, schema="id long, v double, v1 double, v2 double").show()

结果:

+---+----+----+----+
| id|   v|  v1|  v2|
+---+----+----+----+
|  1| 1.0|-0.5| 2.5|
|  1| 2.0| 0.5| 3.5|
|  2| 3.0|-3.0| 9.0|
|  2| 5.0|-1.0|11.0|
|  2|10.0| 4.0|16.0|
+---+----+----+----+

subtract_mean函数接收的是对应id的dataframe数据,schema指定了返回值的名称和类型列表。

通过以下代码我们可以知道,applyInPandas可以借助cogroup进行表连接:

val a = sc.parallelize(List(1, 2, 1, 3))
val b = a.map((_, "b"))
val c = a.map((_, "c"))
val d = a.map((_, "d"))
val e = a.map((_, "e"))
scala> b.cogroup(c).foreach(println)
(3,(CompactBuffer(b),CompactBuffer(c)))
(1,(CompactBuffer(b, b),CompactBuffer(c, c)))
(2,(CompactBuffer(b),CompactBuffer(c)))

示例:

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
  	# l、r is a pandas.DataFrame
  	# 这里是按照id分组
  	# 那么,l和r分别是对应id的df1和df2数据
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

map迭代

执行以下代码:

def filter_func(iterator):
    for i, pdf in enumerate(iterator):
        print(i, pdf.values.tolist())
        yield pdf


df.mapInPandas(filter_func, schema=df.schema).show()

后台看到执行结果为:

0 [[2.0, 5.0]]                                                     
0 [[2.0, 3.0]]
0 [[1.0, 1.0]]
0 [[1.0, 2.0]]
0 [[2.0, 10.0]]

前台结果几乎保持原样。可以知道iterator是一个分区迭代器,迭代出当前分区的每一行数据都被封装成一个pandas对象。

Pyspark与Pandas的交互

将spark的Datafream对象转换为原生的pandas对象只需调用toPandas()方法即可:

sdf.toPandas()

将原生的pandas对象转换为spark对象可以使用spark的顶级方法:

spark.createDataFrame(pdf)

习惯使用pandas的童鞋,还可以直接使用pandas-on-Spark,在spark3.2.0版本时已经匹配到pandas 1.3版本的API。通过pandas-on-Spark,我们可以完全用pandas的api操作数据,而底层执行却是spark的并行化。

使用pandas-on-Spark最好设置一下环境变量:

import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

将spark对象转换为pandas-on-Spark对象:

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

pdf = df.to_pandas_on_spark()
print(type(pdf))
pdf

image-20220218090829123

pandas-on-Spark对象也可以还原成spark对象:

pdf.to_spark()

另外spark提供直接将文件读取成pandas-on-Spark对象的api,例如:

import pyspark.pandas as ps

pdf = ps.read_csv("example_csv.csv")

ps对象与原生pandas对象的API几乎完全一致。

ps对象相对于原生pandas对象的API几乎一致,同时还支持一些强悍的功能,例如直接以SQL形式访问:

ps.sql("SELECT count(*) as num FROM {pdf}")

{pdf}访问了变量名为pdf的pandas-on-Spark对象。

到此这篇关于PySpark与GraphFrames的安装与使用的文章就介绍到这了,更多相关PySpark与GraphFrames使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文