- Introduce
大数据分布式技术结合图库Neo4J项目,由于Neo4j采用单节点,性能存在以下问题:
所以重新设计架构,采用分布式中间件来取代单节点式Neo4j部分功能。经测试,几套架构尚可满足Spark离线处理和实时计算需求。
- Coding Introduce
def getDriver(): Driver = {
val url = Contants.NEO4j_URL
val user = Contants.NEO4J_USER
val password = Contants.NEO4j_PWD
val driver = GraphDatabase.driver(url, AuthTokens.basic(user, password), Config.build()
.withMaxIdleSessions(1000)
.withConnectionLivenessCheckTimeout(10,TimeUnit.SECONDS)
.toConfig)
return driver
}
def getSession(driver: Driver): Session = {
val session = driver.session()
return session
}
def relationShip(session: Session, msisdn: String, touser: String,capdate:String): String = {
//查询个人和个人之间的文件关系
val result = session.run("match (p1:person)-[r:filetofile]-(p2:person) where p1.msisdn={msisdn} and p2.touser={touser} return r.uuid as uuid,p1.wxid as wxid1,p1.name as name1,p2.msisdn as msisdn2,p2.wxid as wxid2,p2.name as name2",
parameters("msisdn", msisdn, "touser", touser))
if (result.hasNext) {
val record = result.next()
val uuid = record.get("uuid").asstring()
val wxid1 = record.get("wxid1").asstring()
val name1 = record.get("name1").asstring()
val msisdn2 = record.get("msisdn2").asstring()
val wxid2 = record.get("wxid2").asstring()
val name2 = record.get("name2").asstring()
return uuid + "|" + wxid1 + "|" + name1 + "|" + msisdn2 + "|" + wxid2 + "|" + name2
} else {
val uuid = UUID.randomUUID().toString.replaceAll("-", "")
val rel = session.run("match (p1:person),(p2:person) where p1.msisdn={msisdn} and p2.touser={touser} merge (p1)-[r:filetofile]-(p2) on create set r.uuid={uuid},r.capdate={capdate} return p1.wxid as wxid1,p1.name as name1,p2.msisdn as msisdn2,p2.wxid as wxid2,p2.name as name2;",
parameters("msisdn", msisdn, "touser", touser, "uuid", uuid,"capdate",capdate))
if (rel.hasNext) {
val record = rel.next()
val wxid1 = record.get("wxid1").asstring()
val name1 = record.get("name1").asstring()
val msisdn2 = record.get("msisdn2").asstring()
val wxid2 = record.get("wxid2")
val name2 = record.get("name2")
return uuid + "|" + wxid1 + "|" + name1 + "|" + msisdn2 + "|" + wxid2 + "|" + name2
}
}
- 传入msisdn,touser查询该关系是否存在。
若存在,则返回关系+节点属性
如果不存在,则新建关系,且关系属性上使用唯一UUID作为标识。同时返回关系+属性参数。
要注意遍历RDD时一定要在每次遍历查询之后关闭Neo4j的Driver,防止内存溢出。
try {
resultMappRdd.foreachRDD(rdd => {
rdd.savetoEs("wechat_neo4j/file")
})
} catch {
case e: InterruptedException =>
Thread.currentThread().interrupt()
} finally {
try {
if (ssc != null) {
ssc.start()
ssc.awaitTermination()
}
} catch {
case e:Exception =>
e.printstacktrace()
}
之后把接口方法体中返回的节点+关系数据遍历插入ES中。
注意在实时计算采用这种方法时,有时会在流量暴增情况下出现:上个队列批次尚未处理完成,下个批次队列就进入线程中。会出现OOM问题。所以可以使用Thread.currentThred.interrupt方法让OOM之后数据重新开始。
同时为了解决实时计算流量暴增情况:
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。