微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

postgresql – 从Postgres JDBC表中读取Spark的速度很慢

我试图从Postgresql数据库加载大约1M行到Spark.使用Spark时需要大约10秒.但是,使用psycopg2驱动程序加载相同的查询需要2s.我正在使用 postgresql jdbc驱动程序版本42.0.0

@H_404_6@

@H_404_6@

def _loadFromPostGres(name):
    url_connect = "jdbc:postgresql:"+dbname
    properties = {"user": "postgres","password": "postgres"}
    df = SparkSession.builder.getorCreate().read.jdbc(url=url_connect,table=name,properties=properties)
    return df

df = _loadFromPostGres("""
    (SELECT "seriesId","companyId","userId","score" 
    FROM user_series_game 
    WHERE "companyId"=655124304077004298) as
user_series_game""")

print measure(lambda : len(df.collect()))

输出是 – @H_404_6@

@H_404_6@

--- 10.7214591503 seconds ---
1076131

使用psycopg2 – @H_404_6@

@H_404_6@

import psycopg2
conn = psycopg2.connect(conn_string)
cur = conn.cursor()

def _exec():
    cur.execute("""(SELECT "seriesId","score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298)""")
    return cur.fetchall()
print measure(lambda : len(_exec()))
cur.close()
conn.close()

输出是 – @H_404_6@

@H_404_6@

--- 2.27961301804 seconds ---
1076131

测量功能 – @H_404_6@

@H_404_6@

def measure(func) :
    start_time = time.time()
    x = func()
    print("--- %s seconds ---" % (time.time() - start_time))
    return x

请帮我找到这个问题的原因.@H_404_6@

编辑1@H_404_6@

我做了一些基准测试.使用Scala和JDBC – @H_404_6@

@H_404_6@

import java.sql._;
import scala.collection.mutable.ArrayBuffer;

def exec() {

val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ 
    "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")

val conn = DriverManager.getConnection(url,"postgres","postgres");

val sqlText = """SELECT "seriesId","score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298"""

val t0 = System.nanoTime()

val stmt = conn.prepareStatement(sqlText,ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY)

val rs = stmt.executeQuery()

val list = new ArrayBuffer[(Long,Long,Double)]()

while (rs.next()) {
    val seriesId = rs.getLong("seriesId")
    val companyId = rs.getLong("companyId")
    val userId = rs.getLong("userId")
    val score = rs.getDouble("score")
    list.append((seriesId,companyId,userId,score))
}

val t1 = System.nanoTime()

println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")

println(list.size)

rs.close()
stmt.close()
conn.close()
}

exec()

输出是 – @H_404_6@

@H_404_6@

Elapsed time: 1.922102285s
1143402

当我在Spark Scala中收集了() – @H_404_6@

@H_404_6@

import org.apache.spark.sql.SparkSession

def exec2() {

    val spark = SparkSession.builder().getorCreate()

    val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ 
    "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")

    val sqlText = """(SELECT "seriesId","score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298) as user_series_game"""

    val t0 = System.nanoTime()

    val df = spark.read
          .format("jdbc")
          .option("url",url)
          .option("dbtable",sqlText)
          .option("user","postgres")
          .option("password","postgres")
          .load()

    val list = df.collect()

    val t1 = System.nanoTime()

    println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")

    print (list.size)
}

exec2()

输出是@H_404_6@

@H_404_6@

Elapsed time: 1.486141076s
1143445

因此,在Python序列化中花费了4倍的额外时间.我知道会有一些惩罚,但这似乎太多了.@H_404_6@

解决方法

原因很简单,有两个原因.

@H_404_6@

首先,我将介绍psycopg2的工作原理.@H_404_6@

这个lib psycopg2的工作方式与任何其他lib一样,可以连接到RDMS.这个lib会将查询发送到你的postgres的引擎,它会将数据返回给你.像这样直接前行.@H_404_6@

@H_404_6@

Conn -> Query -> ReturnData -> FetchData@H_404_6@

当你使用spark时,两种方式有点不同. Spark不像是在单个线程中运行的编程语言.它有一个分布式系统可以工作.即使您在本地计算机上运行.请参阅Spark有Driver(Master)和Workers的基本概念.@H_404_6@

驱动程序接收执行查询到Postgres的请求,驱动程序不会请求每个工作人员请求来自Postgres的信息.@H_404_6@

如果您看到文档here,您将看到如下注释:@H_404_6@

@H_404_6@

Don’t create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.@H_404_6@

此注释表示每个工作人员都有责任为您的postgres请求数据.这是开始这个​​过程的一小部分开销,但没有什么大不了的.但是这里有一个开销,将数据发送给每个工作者.@H_404_6@

第二点,您收集这部分代码:@H_404_6@

@H_404_6@

print measure(lambda : len(df.collect()))

collect函数将为您的所有工作人员发送命令,以将数据发送给您的驱动程序.要存储在驱动程序的内存中,它就像一个Reduce,它会在进程的中间创建Shuffle. Shuffle是将数据发送给其他工作人员的过程的一个步骤.在收集的情况下,每个工作人员都会将其发送给您的司机.@H_404_6@

所以你的代码的JDBC在JDBC中的步骤是:@H_404_6@

@H_404_6@

(Workers)Conn -> (Workers)Query -> (Workers)FetchData -> (Driver)
Request the Data -> (Workers) Shuffle -> (Driver) Collect@H_404_6@

好吧,在Spark中发生的其他一些事情,比如QueryPlan,构建DataFrame和其他东西.@H_404_6@

这就是你在简单的Python代码中比Spark更快响应的原因.@H_404_6@

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐