@H_404_6@
@H_404_6@
def _loadFromPostGres(name): url_connect = "jdbc:postgresql:"+dbname properties = {"user": "postgres","password": "postgres"} df = SparkSession.builder.getorCreate().read.jdbc(url=url_connect,table=name,properties=properties) return df df = _loadFromPostGres(""" (SELECT "seriesId","companyId","userId","score" FROM user_series_game WHERE "companyId"=655124304077004298) as user_series_game""") print measure(lambda : len(df.collect()))
@H_404_6@
--- 10.7214591503 seconds --- 1076131
@H_404_6@
import psycopg2 conn = psycopg2.connect(conn_string) cur = conn.cursor() def _exec(): cur.execute("""(SELECT "seriesId","score" FROM user_series_game WHERE "companyId"=655124304077004298)""") return cur.fetchall() print measure(lambda : len(_exec())) cur.close() conn.close()
@H_404_6@
--- 2.27961301804 seconds --- 1076131
@H_404_6@
def measure(func) : start_time = time.time() x = func() print("--- %s seconds ---" % (time.time() - start_time)) return x
请帮我找到这个问题的原因.@H_404_6@
编辑1@H_404_6@
我做了一些基准测试.使用Scala和JDBC – @H_404_6@
@H_404_6@
import java.sql._; import scala.collection.mutable.ArrayBuffer; def exec() { val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000") val conn = DriverManager.getConnection(url,"postgres","postgres"); val sqlText = """SELECT "seriesId","score" FROM user_series_game WHERE "companyId"=655124304077004298""" val t0 = System.nanoTime() val stmt = conn.prepareStatement(sqlText,ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY) val rs = stmt.executeQuery() val list = new ArrayBuffer[(Long,Long,Double)]() while (rs.next()) { val seriesId = rs.getLong("seriesId") val companyId = rs.getLong("companyId") val userId = rs.getLong("userId") val score = rs.getDouble("score") list.append((seriesId,companyId,userId,score)) } val t1 = System.nanoTime() println("Elapsed time: " + (t1 - t0) * 1e-9 + "s") println(list.size) rs.close() stmt.close() conn.close() } exec()
@H_404_6@
Elapsed time: 1.922102285s 1143402
当我在Spark Scala中收集了() – @H_404_6@
@H_404_6@
import org.apache.spark.sql.SparkSession def exec2() { val spark = SparkSession.builder().getorCreate() val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000") val sqlText = """(SELECT "seriesId","score" FROM user_series_game WHERE "companyId"=655124304077004298) as user_series_game""" val t0 = System.nanoTime() val df = spark.read .format("jdbc") .option("url",url) .option("dbtable",sqlText) .option("user","postgres") .option("password","postgres") .load() val list = df.collect() val t1 = System.nanoTime() println("Elapsed time: " + (t1 - t0) * 1e-9 + "s") print (list.size) } exec2()
@H_404_6@
Elapsed time: 1.486141076s 1143445
因此,在Python序列化中花费了4倍的额外时间.我知道会有一些惩罚,但这似乎太多了.@H_404_6@
解决方法
@H_404_6@
首先,我将介绍psycopg2的工作原理.@H_404_6@
这个lib psycopg2的工作方式与任何其他lib一样,可以连接到RDMS.这个lib会将查询发送到你的postgres的引擎,它会将数据返回给你.像这样直接前行.@H_404_6@
@H_404_6@
Conn -> Query -> ReturnData -> FetchData@H_404_6@
当你使用spark时,两种方式有点不同. Spark不像是在单个线程中运行的编程语言.它有一个分布式系统可以工作.即使您在本地计算机上运行.请参阅Spark有Driver(Master)和Workers的基本概念.@H_404_6@
驱动程序接收执行查询到Postgres的请求,驱动程序不会请求每个工作人员请求来自Postgres的信息.@H_404_6@
如果您看到文档here,您将看到如下注释:@H_404_6@
@H_404_6@
Don’t create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.@H_404_6@
此注释表示每个工作人员都有责任为您的postgres请求数据.这是开始这个过程的一小部分开销,但没有什么大不了的.但是这里有一个开销,将数据发送给每个工作者.@H_404_6@
@H_404_6@
print measure(lambda : len(df.collect()))
collect函数将为您的所有工作人员发送命令,以将数据发送给您的驱动程序.要存储在驱动程序的内存中,它就像一个Reduce,它会在进程的中间创建Shuffle. Shuffle是将数据发送给其他工作人员的过程的一个步骤.在收集的情况下,每个工作人员都会将其发送给您的司机.@H_404_6@
所以你的代码的JDBC在JDBC中的步骤是:@H_404_6@
@H_404_6@
(Workers)Conn -> (Workers)Query -> (Workers)FetchData -> (Driver)
Request the Data -> (Workers) Shuffle -> (Driver) Collect@H_404_6@
好吧,在Spark中发生的其他一些事情,比如QueryPlan,构建DataFrame和其他东西.@H_404_6@
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。