微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

3. Spark常见数据源

*以下内容由《Spark快速大数据分析》整理所得。

读书笔记的第三部分是讲的是Spark有哪些常见数据源?怎么读取它们的数据并保存。

Spark有三类常见的数据源:

  • 文件格式与文件系统:它们是存储在本地文件系统或分布式文件系统(比如 NFS、HDFS、Amazon S3 等)中的 数据,例如:文本文件、JSON、SequenceFile, 以及 protocol buffer。
  • Spark sql中的结构化数据源:它针对包括JSON和Apache Hive在内的结构化数据源。
  • 数据库与键值存储:Spark 自带的库和一些第三方库,它们可以用来连接Cassandra、HBase、Elasticsearch以及JDBC源。

 

一、文件格式与文件系统

1. 文本文件

2. JSON

3. CSV

4. SequenceFile

二、Spark sql中的结构化数据源

三、数据库与键值存储

 


一、文件格式与文件系统

1. 文本文件
文本文件读取:

# 方法1:文本文件读取
input = sc.textFile("file://home/holden/repos/sparks/README.md")
# 方法2:如果文件足够小,同时读取整个文件,从而返回一个pair RDD,其中键时输入文件文件名
input = sc.wholeTextFiles("file://home/holden/salesFiles")

文本文件保存:

result.saveAsTextFile(outputFile)

2. JSON
JSON读取:

# JSON读取
import json
data = input.map(lambda x: json.loads(x))

JSON保存:

# JSON保存 - 举例选出喜爱熊猫的人
(data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile))
# 保存文本文件
result.SaveAsTextFile(outputFilePath)

3. CSV
CSV读取:

import csv
import StringIO

# CSV读取 - 如果数据字段均没有包括换行符,只能一行行读取 def loadRecord(line): """解析一行CSV记录""" input = StringIO.StringIO(line) reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"]) return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
# CSV读取 - 如果数据字段嵌有换行符,需要完整读入每个文件 def loadRecords(fileNameContents): """读取给定文件中的所有记录""" input = StringIO.StringIO(fileNameContents[1]) reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"]) return reader
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)

CSV保存:

# CSV保存
def writeRecords(records):
    """写出一些CSV记录"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["names", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]

pandalovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

4. SequenceFile

SequenceFile读取:

# sc.sequenceFile(path, keyClass, valueClass)
data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")

SequenceFile保存(用Scala):

val data = sc.parallelize(List(("Pandas", 3), ("Kay", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)

 

二、Spark sql中的结构化数据源

 

 


 

三、数据库与键值存储

关于Cassandra、HBase、Elasticsearch以及JDBC源的数据库连接,详情请参考书本81-86页内容


 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐