The idea: build a lookup table holding the boundary longitude/latitude points of every city in the country, then test each incoming coordinate against those city polygons one by one with a point-in-polygon algorithm.
During processing, the in-memory city-boundary Map looks roughly like this:
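A minimal sketch of that structure, with made-up coordinates for illustration: the key is city_code@blockid, and the value holds the boundary vertices as {lng, lat} pairs, matching the Map<String, double[][]> built later in the code.

Map<String, double[][]> mapPoints = new HashMap<>();
// Made-up boundary for illustration only, not real city data.
mapPoints.put("320100@1", new double[][]{
        {118.50, 31.80}, {119.10, 31.80}, {119.10, 32.30}, {118.50, 32.30}
});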
If you need nationwide data, the corresponding CSV can be downloaded from the link below:
https://download.csdn.net/download/Aaron_ch/16165798
The full implementation is as follows:
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @Author: aaron
* @Date: 2021/3/8 18:11
* @Description: map a longitude/latitude point to its city code by point-in-polygon matching
*/
public class JustDoIt {
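// Ray-casting (even-odd rule) point-in-polygon test: cast a horizontal ray
// westwards from (ALon, ALat) and count how many polygon edges it crosses;
// an odd count means the point lies inside. APoints holds the polygon
// vertices as {lng, lat} pairs.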
static boolean isInLongLat(double ALon, double ALat, double[][] APoints) {
boolean bRet = false;
int iSum = 0;
double dLon1, dLon2, dLat1, dLat2, dLon;
int iCount = APoints.length;
for (int i = 0; i < iCount; i++) {
if (i == iCount - 1) {
dLon1 = APoints[i][0];
dLat1 = APoints[i][1];
dLon2 = APoints[0][0];
dLat2 = APoints[0][1];
} else {
dLon1 = APoints[i][0];
dLat1 = APoints[i][1];
dLon2 = APoints[i + 1][0];
dLat2 = APoints[i + 1][1];
}
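// If this edge spans the point's latitude, compute where it crosses ALat
// and count the crossing only when it lies west of the point.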
if (((ALat >= dLat1) && (ALat < dLat2)) || ((ALat >= dLat2) && (ALat < dLat1))) {
if (Math.abs(dLat1 - dLat2) > 0) {
dLon = dLon1 - ((dLon1 - dLon2) * (dLat1 - ALat)) / (dLat1 - dLat2);
if (dLon < ALon) iSum++;
}
}
}
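// An odd number of crossings west of the point means it is inside the polygon.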
if (iSum % 2 != 0)
bRet = true;
return bRet;
}
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("getCityCode")
.set("hive.exec.dynamici.partition", "true")
.set("hive.exec.dynamic.partition.mode", "nonstrict")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.sql.autobroadcastJoinThreshold", "204800")
.set("spark.debug.maxToStringFields", "1000")
.set("spark.sql.decimalOperations.allowPrecisionLoss", "false")
.setMaster("yarn");
SparkSession sparkSession = SparkSession.builder()
.config(conf).enableHiveSupport().getOrCreate();
//Data preparation: pull each city's boundary longitude/latitude points nationwide
String sql = "SELECT concat(city_code,'@',blockid) city_key,\n" +
" collect_list(concat(lng,'|',lat)) AS lngat\n" +
"FROM ods_t_city_range_info" +
" where lng is not null and lat is not null " +
"GROUP BY concat(city_code,'@',blockid)";
Dataset<Row> rowDataset = sparkSession.sql(sql);
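// Pull all boundary rows to the driver; the resulting map is captured by the
// UDF closure below and shipped to the executors.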
List<Row> rowList = rowDataset.collectAsList();
Map<String, double[][]> mapPoints = new HashMap<>();
for (Row row : rowList) {
List<String> poss = row.getList(1);
double[][] results = new double[poss.size()][2];
int i = 0;
for (String pos : poss) {
String[] temp = pos.split("\\|");
results[i][0] = Double.parseDouble(temp[0].trim());
results[i][1] = Double.parseDouble(temp[1].trim());
i++;
}
mapPoints.put(row.get(0).toString(), results);
}
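// UDF: linearly scan every city polygon until one contains the point; return
// its city_code (the part of the key before '@'), or "-999" if none match.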
sparkSession.udf().register("getCityCode", (UDF2<Double, Double, String>) (longitude, latitude) -> {
String sRet = "-999";
for (String key : mapPoints.keySet()) {
if (isInLongLat(longitude, latitude, mapPoints.get(key))) {
sRet = key.split("@")[0];
break;
}
}
return sRet;
}, DataTypes.StringType);
String sql2 = "SELECT longitude,latitude,getCityCode(cast(longitude as double),cast(latitude as double)) as city_code" +
" from ods_t_phone_location_report " +
"where longitude is not null " +
"and latitude is not null";
sparkSession.sql(sql2).show();
}
}
Sample output of the final show():
+---------+--------+---------+
|longitude|latitude|city_code|
+---------+--------+---------+
|   118.83|   31.95|   320100|
|   118.62|   32.07|   320100|
|      ...|     ...|      ...|
+---------+--------+---------+
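To sanity-check the ray-casting helper in isolation (no Spark required), a minimal standalone test like the one below can be dropped into the same package; the rectangular boundary is made up for illustration, not real city data:

public class InLongLatTest {
    public static void main(String[] args) {
        // Made-up rectangular boundary, vertices as {lng, lat} pairs.
        double[][] boundary = {
                {118.50, 31.80}, {119.10, 31.80},
                {119.10, 32.30}, {118.50, 32.30}
        };
        System.out.println(JustDoIt.isInLongLat(118.80, 32.00, boundary)); // true: inside
        System.out.println(JustDoIt.isInLongLat(120.00, 32.00, boundary)); // false: east of the boundary
    }
}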