Streaming
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext(appName="myAppName")
有的电脑直接就能找到,就不需要上面这段代码了
调取Streaming
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=pyspark.SparkContext.getorCreate(conf)
ssc = StreamingContext(sc,10)
lines = ssc.socketTextStream('127.0.0.1',8888)
看起来这个SparkConf像构建一个工作区,通过监听这个工作区来得到值。然后绑定这个窗口到8888端口。
由于是靠python来写的,与书上不一致,只有猜一下他的作用。
接下来就要对进来的数据进行筛选了。先测试一下(防止重启太麻烦)
blackForm = sc.textFile("./blackForm.txt") # 读取黑名单
blackForm=blackForm.map(lambda x:x.split(" ")) # 空格切分
rdd1 = sc.parallelize([["1111","hadoop"],["2222",'spark'],["3333","hive"]]) # 测试rdd
rdd1=rdd1.map(lambda x:[x[1],x[0]])
rdd1.leftOuterJoin(blackForm).collect()
由于要求是用左连接。就改一下输入值的顺序
效果:
那就直接开始在启动程序里写
blackForm = sc.textFile("./blackForm.txt")
blackForm=blackForm.map(lambda x:x.split(" "))
data = lines.map(lambda x:x.split(" ")) # 切分
data = data.map(lambda x:[x[1],x[0]]) # 转格式
data = data.transform(lambda x:x.leftOuterJoin(blackForm))
data = data.filter(lambda x:x[1][1]=='false') # 筛选
data = data.map(lambda x:(x[0],x[1][0])) # 转输出格式
data.pprint()
ssc.start()
启动这段代码后在本机的窗口命令行输入nc -l -p 8888
然后输入
Spark GraphFrame
读入数据查看格式。
test=sc.textFile("d:/data/web-Google.txt")
test.take(10)
看了半天,这样应该是可行的
from pyspark.sql import SparkSession
from pyspark.sql import sqlContext
from graphframes import *
from pyspark.sql.types import *
# 启动pyspark需要带参数
spark = SparkSession.builder.appName("testGraph") \
.master('local') \
.getorCreate()
sc = spark.sparkContext
sqlContext = sqlContext(sc)
countId = sc.textFile('D:/data/web-Google.txt').map(lambda x:x.split("\t")).map(lambda x:x[0]).distinct().map(lambda x:(int(x),x))
schema = StructType([
StructField("id", IntegerType(), True),
StructField("from", StringType(), True),
])
df_v = sqlContext.createDataFrame(countId,schema)
这里的countId就是找到有多少不重复的ID,根据ID创建一个一模一样的value这样就可以用来创建DataFrame了,到后面再把这列from去掉
构造边
schema = StructType([
# true代表不为空
StructField("src", IntegerType(), True),#
StructField("dst", IntegerType(), True),#
StructField("relation", StringType(), True),
])
rdd2 = sc.textFile('D:/data/web-Google.txt') \
.map(lambda x:x.split("\t")) \
.map(lambda x:(int(x[0]),int(x[1]),"link"))
df_e = sqlContext.createDataFrame(rdd2, schema)
构造图
g = GraphFrame(df_v,df_e)
最后出来的图效果
查询顶点和边的个数
countId即是我们的顶点。做一个计数则是顶点个数为73945,边就是edges的条数为5105039
from graphframes.lib import AggregateMessages as AM
from pyspark.sql import functions as F
g_new.aggregateMessages(F.count(AM.msg).alias("count"),
sendToSrc="1").show()
向顶点发送1,然后count计数
筛选出id>1的组成子图
# 好像就是这样,有可能有问题
overOne = g_new.filterVertices('id > 1')
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。