We mostly use Spark to process large batches of data quickly, so what data sources actually come up in development and production? My summary:
- text
- csv
- json
- parquet
- jdbc
- hive
- kafka
- elasticsearch
All of the tests below run in Spark local mode, because local mode is convenient for testing and does not depend on a Spark cluster. One thing to note: when running the code on a Spark cluster, remove the .master("local[*]") line, and adjust the paths so that files can still be found. Taking /tmp/people.txt as an example:
local mode: /tmp/people.txt
cluster mode: file:///tmp/people.txt, a local file on the node, equivalent to /tmp/people.txt in local mode
hdfs://master:8020/tmp/people.txt, a file on the distributed filesystem (HDFS)
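To make the difference concrete, here is a minimal sketch of reading the same file through each path form (the HDFS host and port are just the placeholders from the example above):
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("pathDemo")\
        .master("local[*]")\
        .getOrCreate()
    # local mode: a bare path points at the local filesystem
    lines = spark.sparkContext.textFile("/tmp/people.txt")
    # with an explicit scheme, typically used when running on a cluster:
    # lines = spark.sparkContext.textFile("file:///tmp/people.txt")             # local file
    # lines = spark.sparkContext.textFile("hdfs://master:8020/tmp/people.txt")  # file on HDFS
    print(lines.count())
    spark.stop()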
Before going through the various data sources, let's look at the most basic one: an in-memory dataset, i.e. data we construct ourselves as development requires, often used for developing and testing simple features.
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("loadDatas")\
        .master("local[*]")\
        .enableHiveSupport()\
        .getOrCreate()
    datas = [('Jack', 27), ('Rose', 24), ('Andy', 32)]
    df = spark.createDataFrame(datas, ['name', 'age'])
    df.show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()
text
The content of the data source file people.txt is:
Jack 27
Rose 24
Andy 32
from pyspark.sql import SparkSession
from pyspark.sql import Row

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("loadTextData")\
        .master("local[*]")\
        .getOrCreate()
    lines = spark.sparkContext.textFile("/home/llh/data/people.txt")
    parts = lines.map(lambda line: line.split(" "))
    # cast age to int so the column is numeric rather than string
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
    peopledf = spark.createDataFrame(people)
    peopledf.show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 27|Jack|
    # | 24|Rose|
    # | 32|Andy|
    # +---+----+
    peopledf.createOrReplaceTempView("people")
    namedf = spark.sql("select name from people where age < 30")
    namedf.show()
    # +----+
    # |name|
    # +----+
    # |Jack|
    # |Rose|
    # +----+
    spark.stop()
csv
The content of the data source file people.csv is:
Jack,27
Rose,24
Andy,32
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import pandas as pd

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("loadCsvData")\
        .master("local[*]")\
        .getOrCreate()
    # Method 1: define the schema explicitly, another way of getting the header seen in the text example
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ])
    peopledf = spark.read.csv("/home/llh/data/people.csv", schema=schema)
    peopledf.show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    # Method 2: read with pandas; this one does not use Spark at all
    data = pd.read_csv("/home/llh/data/people.csv", names=['name', 'age'])
    print(data.head())
    #    name  age
    # 0  Jack   27
    # 1  Rose   24
    # 2  Andy   32
    spark.stop()
json
The content of the data source file people.json is:
{"name":"Jack", "age":27}
{"name":"Rose", "age":24}
{"name":"Andy"}
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("loadJsonData")\
        .master("local[*]")\
        .getOrCreate()
    peopledf = spark.read.json("/home/llh/data/people.json")
    peopledf.show()
    # +----+----+
    # | age|name|
    # +----+----+
    # |  27|Jack|
    # |  24|Rose|
    # |null|Andy|
    # +----+----+
    peopledf.printSchema()
    # root
    #  |-- age: long (nullable = true)
    #  |-- name: string (nullable = true)
    peopledf.select('name').show()
    # +----+
    # |name|
    # +----+
    # |Jack|
    # |Rose|
    # |Andy|
    # +----+
    peopledf.select(peopledf['name'], peopledf['age'] + 1).show()
    # +----+---------+
    # |name|(age + 1)|
    # +----+---------+
    # |Jack|       28|
    # |Rose|       25|
    # |Andy|     null|
    # +----+---------+
    peopledf.filter(peopledf['age'] > 25).show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 27|Jack|
    # +---+----+
    peopledf.groupBy("age").count().show()
    # +----+-----+
    # | age|count|
    # +----+-----+
    # |null|    1|
    # |  27|    1|
    # |  24|    1|
    # +----+-----+
    spark.stop()
parquet
Parquet data is usually stored on HDFS; it is a binary format, so opening such a file in an ordinary editor shows unreadable garbage.
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("loadParquetData")\
        .master("local[*]")\
        .getOrCreate()
    peopledf = spark.read.parquet("/home/llh/data/people.parquet")
    peopledf.createOrReplaceTempView("people")
    namedf = spark.sql("select name from people where age < 30")
    namedf.show()
    # +----+
    # |name|
    # +----+
    # |Jack|
    # |Rose|
    # +----+
    spark.stop()
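If you don't already have a parquet file lying around to test with, one can be produced from any DataFrame; a minimal sketch (the output path simply mirrors the read path used above):
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("writeParquetData")\
        .master("local[*]")\
        .getOrCreate()
    df = spark.createDataFrame([('Jack', 27), ('Rose', 24), ('Andy', 32)], ['name', 'age'])
    # Spark writes parquet as a directory of part files, which spark.read.parquet can read back
    df.write.mode("overwrite").parquet("/home/llh/data/people.parquet")
    spark.stop()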
jdbc
JDBC covers MySQL, Oracle, TiDB and so on. Here we take MySQL as the example; the database is test and the table is people.
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("loadJdbcData")\
        .master("local[*]")\
        .getOrCreate()
    peopledf = spark.read\
        .format("jdbc")\
        .option("url", "jdbc:mysql://localhost:3306/test")\
        .option("driver", "com.mysql.jdbc.Driver")\
        .option("dbtable", "(select * from people) tmp")\
        .option("user", "root")\
        .option("password", "1")\
        .load()
    peopledf.show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()
At runtime you may get an error saying the MySQL driver cannot be found: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver. The fix is to download the MySQL JDBC driver and drop the jar into the jars directory of the pyspark installation, by default /usr/local/lib/python3.7/site-packages/pyspark/jars/.
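Alternatively, the driver jar can be handed to Spark through the spark.jars configuration when building the session; a minimal sketch, where the jar path is a placeholder for wherever you downloaded the connector:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("loadJdbcData")\
        .master("local[*]")\
        .config("spark.jars", "/path/to/mysql-connector-java.jar")\
        .getOrCreate()
    # ...then read via format("jdbc") exactly as in the example above
    spark.stop()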
hive
Hive stores its data files with a special field delimiter, Ctrl-A ("^A"), and Spark is usually configured with the Hive metastore information, so it can read Hive tables directly.
The code below loads people.hive into the people table and displays it:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("loadHiveData")\
        .master("local[*]")\
        .enableHiveSupport()\
        .getOrCreate()
    spark.sql("create table if not exists people (name string, age int) using hive")
    spark.sql("load data local inpath '/home/llh/data/people.hive' into table people")
    spark.sql("select * from people").show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()
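For reference, the people.hive file loaded above is just a plain text file whose fields are separated by the Ctrl-A character (\x01) mentioned earlier; a minimal sketch of generating it, reusing the path and rows from the examples in this article:
# generate a Hive-style text file with fields separated by Ctrl-A (\x01)
rows = [('Jack', 27), ('Rose', 24), ('Andy', 32)]
with open('/home/llh/data/people.hive', 'w') as f:
    for name, age in rows:
        f.write(name + '\x01' + str(age) + '\n')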
kafka
Kafka combined with Spark is typically used in real-time projects, i.e. Spark Streaming; that will be covered separately later.
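As a small preview, a minimal Structured Streaming sketch of reading a Kafka topic and printing records to the console; the broker address and topic name are placeholders, and the spark-sql-kafka connector has to be on the classpath:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("loadKafkaData")\
        .master("local[*]")\
        .getOrCreate()
    # subscribe to a topic as a streaming DataFrame (broker and topic are placeholders)
    kafkadf = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", "localhost:9092")\
        .option("subscribe", "people")\
        .load()
    # Kafka delivers key/value as binary, so cast to string before printing to the console
    query = kafkadf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
        .writeStream\
        .format("console")\
        .start()
    query.awaitTermination()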
elasticsearch
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("loadEsData")\
        .master("local[*]")\
        .enableHiveSupport()\
        .getOrCreate()
    peopledf = spark.read\
        .format("org.elasticsearch.spark.sql")\
        .option("es.nodes", "localhost")\
        .option("es.port", "9200")\
        .option("es.resource", "people/data")\
        .load()
    peopledf.createOrReplaceTempView("people")
    spark.sql("select * from people").show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()
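One thing the example glosses over: as with the MySQL case, Spark needs the elasticsearch-hadoop (elasticsearch-spark) connector jar to provide the org.elasticsearch.spark.sql format. A minimal sketch of supplying it, with the jar path as a placeholder:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("loadEsData")\
        .master("local[*]")\
        .config("spark.jars", "/path/to/elasticsearch-spark.jar")\
        .getOrCreate()
    # ...then read via format("org.elasticsearch.spark.sql") as shown above
    spark.stop()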
Those are the most commonly used data sources. There are of course others, such as HBase, Phoenix and so on; once you have the ones above down, handling the rest by analogy should not be a problem.