
Spark GraphX Graph Computation Structures Explained [Graph Builders, Vertices, Edges]

1. Graph Builders

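GraphX provides several ways to build a graph from a collection of vertices and edges, either held in an RDD or stored on disk. By default, none of the graph builders repartitions the graph's edges; instead, edges stay in their default partitions. Graph.groupEdges does require the graph to be repartitioned, because it assumes that identical edges are colocated on the same partition. You must therefore call Graph.partitionBy before calling groupEdges.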
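The colocation requirement is easier to see in a small Spark-free Scala model. This is a simplified assumption, not GraphX's implementation. Each partition is a local `Seq` of `(srcId, dstId, attr)` tuples, and `groupEdgesLocal` merges duplicate edges only within a single partition, matching the assumption described above. The hypothetical `partitionByPair` helper picks a partition by hashing the `(srcId, dstId)` pair. In spirit, this resembles GraphX's `PartitionStrategy.RandomVertexCut`.

```scala
// Hypothetical, simplified model of groupEdges semantics (NOT Spark code).
// A "graph" is a sequence of partitions; each partition is a local Seq of edges.
object GroupEdgesModel {
  type E = (Long, Long, Int) // (srcId, dstId, attr)

  // Merge edges with the same (srcId, dstId) pair, but only inside each partition.
  // Duplicates that live in different partitions are never seen together.
  def groupEdgesLocal(parts: Seq[Seq[E]], merge: (Int, Int) => Int): Seq[Seq[E]] =
    parts.map { part =>
      part
        .groupBy(e => (e._1, e._2))
        .toSeq
        .sortBy(_._1)
        .map { case ((s, d), es) => (s, d, es.map(_._3).reduce(merge)) }
    }

  // Hypothetical stand-in for partitionBy: route every edge by hashing its
  // (srcId, dstId) pair, so identical edges always end up in the same partition.
  def partitionByPair(parts: Seq[Seq[E]], numParts: Int): Seq[Seq[E]] = {
    val all = parts.flatten
    (0 until numParts).map { p =>
      all.filter(e => Math.floorMod((e._1, e._2).hashCode, numParts) == p)
    }
  }
}
```

Suppose two copies of edge 1→2 sit in different partitions. Merging them directly leaves both copies. Repartitioning first produces a single edge whose merged attribute is 2. In actual GraphX code the corresponding order of calls would be `graph.partitionBy(PartitionStrategy.RandomVertexCut).groupEdges(_ + _)`.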

```scala
package org.apache.spark.graphx

import org.apache.spark.SparkContext
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel

/**
 * Provides utilities for loading [[Graph]]s from files.
 */
object GraphLoader extends Logging {

  /**
   * Loads a graph from an edge list formatted file where each line contains two integers: a source
   * id and a target id. Skips lines that begin with `#`.
   */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      numEdgePartitions: Int = -1,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, // storage level: keep in memory only
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
    : Graph[Int, Int] =
  {
    val startTime = System.currentTimeMillis

    // Parse the edge data table directly into edge partitions
    val lines =
      if (numEdgePartitions > 0) { // load the file with the requested number of partitions
        sc.textFile(path, numEdgePartitions).coalesce(numEdgePartitions)
      } else {
        sc.textFile(path)
      }
    // Build one edge partition per input partition
    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
      val builder = new EdgePartitionBuilder[Int, Int]
      iter.foreach { line =>
        if (!line.isEmpty && line(0) != '#') { // skip empty lines and comment lines
          val lineArray = line.split("\\s+")
          if (lineArray.length < 2) { // reject malformed lines
            throw new IllegalArgumentException("Invalid line: " + line)
          }
          val srcId = lineArray(0).toLong
          val dstId = lineArray(1).toLong
          if (canonicalOrientation && srcId > dstId) {
            builder.add(dstId, srcId, 1) // add each edge with attribute (weight) 1
          } else {
            builder.add(srcId, dstId, 1)
          }
        }
      }
      Iterator((pid, builder.toEdgePartition))
    }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
    edges.count() // force evaluation so the edges are materialized and cached

    logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
      vertexStorageLevel = vertexStorageLevel)
  } // end of edgeListFile

}
```

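GraphLoader.edgeListFile loads graph data from disk or from an HDFS-like file system. It parses an adjacency list of (source vertex ID, destination vertex ID) pairs and skips comment lines that begin with #. The Graph is created from the specified edges, and every vertex that appears as an endpoint of some edge is created automatically. All vertex and edge attributes default to 1. The canonicalOrientation parameter reorients edges in the positive direction (srcId < dstId), which the connected components algorithm requires.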
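The parsing rules can be traced without a Spark cluster. The following Spark-free Scala sketch is hypothetical. The `EdgeListSketch` object and its local `Edge` case class are illustrative names, not GraphX classes. The sketch applies the same per-line logic as the `edgeListFile` source:

- Skip empty lines and `#` lines.
- Reject lines with fewer than two fields.
- Optionally swap the endpoints so that srcId is not greater than dstId.
- Give every edge attribute 1.
- Derive the vertex set from the edge endpoints, each vertex with attribute 1.

```scala
// Hypothetical, Spark-free sketch of the per-line logic in GraphLoader.edgeListFile.
// Edge here is a local illustrative case class, not org.apache.spark.graphx.Edge.
object EdgeListSketch {
  final case class Edge(srcId: Long, dstId: Long, attr: Int)

  // Parse edge-list lines into edges (attr = 1) plus the implied vertices (attr = 1).
  def parse(lines: Seq[String], canonicalOrientation: Boolean = false): (Seq[Edge], Map[Long, Int]) = {
    val edges = lines.collect {
      // Keep only non-empty lines that are not comments.
      case line if line.nonEmpty && line(0) != '#' =>
        val fields = line.split("\\s+")
        if (fields.length < 2) throw new IllegalArgumentException("Invalid line: " + line)
        val src = fields(0).toLong
        val dst = fields(1).toLong
        // Optionally reorient so the smaller id is the source.
        if (canonicalOrientation && src > dst) Edge(dst, src, 1) else Edge(src, dst, 1)
    }
    // Every endpoint becomes a vertex with default attribute 1.
    val vertices = edges.flatMap(e => Seq(e.srcId, e.dstId)).distinct.map(_ -> 1).toMap
    (edges, vertices)
  }
}
```

As an example, set `canonicalOrientation = true` and feed in the lines `# comment`, `2 1` and `1 3`. The result is edges (1, 2) and (1, 3) and vertices 1, 2 and 3, all with attribute 1.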
