详解SparkSql输出数据的方式
作者:jlting195
在处理数据时,SparkSql提供了多种数据输出方式,包括普通文件输出、保存到数据库和保存到Hive,普通文件输出支持追加模式、覆写模式、报错模式和忽略模式,本文介绍SparkSql输出数据的方式,感兴趣的朋友一起看看吧
一、普通文件输出方式
方式一:给定输出数据源的类型和地址
df.write.format("json").save(path) df.write.format("csv").save(path) df.write.format("parquet").save(path)
方式二:直接调用对应数据源类型的方法
df.write.json(path) df.write.csv(path) df.write.parquet(path)
append: 追加模式,当数据存在时,继续追加
overwrite: 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;
error/errorifexists: 如果目标存在就报错,默认的模式
ignore: 忽略,数据存在时不做任何操作
代码编写模板:
df.write.mode(saveMode="append").format("csv").save(path)
代码演示普通的文件输出格式:
import os from pyspark.sql import SparkSession if __name__ == '__main__': # 配置环境 os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241' # 配置Hadoop的路径,就是前面解压的那个路径 os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1' # 配置base环境Python解析器的路径 os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径 os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' spark = SparkSession.builder.master("local[2]").appName("").config( "spark.sql.shuffle.partitions", 2).getOrCreate() df = spark.read.json("../../datas/person.json") # 获取年龄最大的人的名字 df.createOrReplaceTempView("persons") rsDf = spark.sql(""" select name,age from persons where age = (select max(age) from persons) """) # 将结果打印到控制台 #rsDf.write.format("console").save() #rsDf.write.json("../../datas/result",mode="overwrite") #rsDf.write.mode(saveMode='overwrite').format("json").save("../../datas/result") #rsDf.write.mode(saveMode='overwrite').format("csv").save("../../datas/result1") #rsDf.write.mode(saveMode='overwrite').format("parquet").save("../../datas/result2") #rsDf.write.mode(saveMode='append').format("csv").save("../../datas/result1") # text 保存路径为hdfs 直接报错,不支持 #rsDf.write.mode(saveMode='overwrite').text("hdfs://bigdata01:9820/result") #rsDf.write.orc("hdfs://bigdata01:9820/result",mode="overwrite") rsDf.write.parquet("hdfs://bigdata01:9820/result", mode="overwrite") spark.stop()
二、保存到数据库中
代码演示:
import os # 导入pyspark模块 from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession if __name__ == '__main__': # 配置环境 os.environ['JAVA_HOME'] = 'D:\Download\Java\JDK' # 配置Hadoop的路径,就是前面解压的那个路径 os.environ['HADOOP_HOME'] = 'D:\\bigdata\hadoop-3.3.1\hadoop-3.3.1' # 配置base环境Python解析器的路径 os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径 os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' spark = SparkSession.builder.master('local[*]').appName('').config("spark.sql.shuffle.partitions", 2).getOrCreate() df5 = spark.read.format("csv").option("sep", "\t").load("../../datas/zuoye/emp.tsv")\ .toDF('eid','ename','salary','sal','dept_id') df5.createOrReplaceTempView('emp') rsDf = spark.sql("select * from emp") rsDf.write.format("jdbc") \ .option("driver", "com.mysql.cj.jdbc.Driver") \ .option("url", "jdbc:mysql://bigdata01:3306/mysql") \ .option("user", "root") \ .option("password", "123456") \ .option("dbtable", "emp1") \ .save(mode="overwrite") spark.stop() # 使用完后,记得关闭
三、保存到hive中
代码演示:
import os # 导入pyspark模块 from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession if __name__ == '__main__': # 配置环境 os.environ['JAVA_HOME'] = 'D:\Download\Java\JDK' # 配置Hadoop的路径,就是前面解压的那个路径 os.environ['HADOOP_HOME'] = 'D:\\bigdata\hadoop-3.3.1\hadoop-3.3.1' # 配置base环境Python解析器的路径 os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径 os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' os.environ['HADOOP_USER_NAME'] = 'root' spark = SparkSession \ .builder \ .appName("HiveAPP") \ .master("local[2]") \ .config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \ .config('hive.metastore.uris', 'thrift://bigdata01:9083') \ .config("spark.sql.shuffle.partitions", 2) \ .enableHiveSupport() \ .getOrCreate() df5 = spark.read.format("csv").option("sep", "\t").load("../../datas/zuoye/emp.tsv") \ .toDF('eid', 'ename', 'salary', 'sal', 'dept_id') df5.createOrReplaceTempView('emp') rsDf = spark.sql("select * from emp") rsDf.write.saveAsTable("spark.emp") spark.stop() # 使用完后,记得关闭
到此这篇关于SparkSql输出数据的方式的文章就介绍到这了,更多相关SparkSql输出数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!