备注:此代码没有任何实际作用,仅作为初学者学习用
package com.c.user_behavior
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 用户行为数据清洗
* 1、验证数据格式是否正确,切分后长度必须为17
* 2、手机号脱敏,格式为123xxxx4567
* 3、去掉username中带有的\n,否则导致写入HDFS时会换行
*/
object UserBehaviorCleaner {
def main(args : Array[String]): Unit ={
if(args.length != 2){
println("Usage:please input inputPath and outputPath")
System.exit(1)
}
// 获取输入输出路径
val inputPath = args(0)
val outputPath = args(1)
val conf = new SparkConf().setAppName(getClass.getSimpleName).setMaster("local[2]")
val sc = new SparkContext(conf)
// 通过输入路径获取RDD
val eventRDD: RDD[String] = sc.textFile(inputPath)
// 清洗数据,在算子中不要写大量业务逻辑,应该将逻辑封装到方法中
eventRDD.filter(event => checkEventValid(event)) // 验证数据有效性
.map( event => maskPhone(event)) // 手机号脱敏
.map(event => repairUsername(event)) // 修复username中带有\n导致的换行
.coalesce(3)
.saveAsTextFile(outputPath)
sc.stop()
}
/**
* username为用户自定义的,里面有要能存在"\n",导致写入到HDFS时换行
* @param event
*/
def repairUsername(event : String)={
val fields = event.split("\t")
// 取出用户昵称
val username = fields(1)
// 用户昵称不为空时替换"\n"
if(username != null && ! " ".equals(username)){
fields(1) = username.replace("\n","")
}
fields.mkString("\t")
}
/**
* 脱敏手机号
* @param event
*/
def maskPhone(event : String): String ={
var maskPhone = new StringBuilder
val fields: Array[String] = event.split("\t")
// 取出手机号
val phone = fields(9)
// 手机号不为空时做掩码处理
if(phone != null && !"".equals(phone)){
maskPhone = maskPhone.append(phone.substring(0,3)).append("xxxx").append(phone.substring(7,11))
fields(9) = maskPhone.toString()
}
fields.mkString("\t")
}
/**
* 验证数据格式是否正确,只有切分后长度为17的才算正确
* @param event
*/
def checkEventValid(event : String) ={
val fields = event.split("\t")
fields.length == 17
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。