微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

【Spark】根据经纬度算出对应的城市

思路:先准备一张存有全国每个城市边界点经纬度的信息表(每个城市的边界点连起来构成一个多边形),然后用射线法(ray casting)逐个判断待查询的经纬度点落在哪个城市的多边形内。

处理过程中的城市经纬度 Map 结构如下:key 为 `city_code@blockid` 形式的字符串,value 为该边界块上各顶点的 `[经度, 纬度]` 二维数组:

如果要全国的可以访问如下链接下载相对应的csv

https://download.csdn.net/download/Aaron_ch/16165798

全部实现逻辑代码如下:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Spark job that maps a longitude/latitude pair to the code of the city whose boundary
 * polygon contains it.
 *
 * <p>City boundary polygons are read from {@code ods_t_city_range_info}, collected to the
 * driver, and captured by a SQL UDF named {@code getCityCode} that tests a point against each
 * polygon in turn.
 *
 * <p>Created: 2021/3/8.
 *
 * @author aaron
 */
public class JustDoIt {

    /** City code returned when no polygon contains the point or an input coordinate is null. */
    private static final String UNKNOWN_CITY_CODE = "-999";

    /**
     * Tests whether a point lies inside a polygon by counting boundary crossings of a
     * horizontal ray (ray casting): the point is inside when the number of edges crossed to the
     * west of the point is odd.
     *
     * @param ALon longitude of the point to test
     * @param ALat latitude of the point to test
     * @param APoints polygon vertices, each element being {@code {longitude, latitude}}; the
     *     last vertex is implicitly connected back to the first
     * @return {@code true} if the point is inside the polygon; {@code false} otherwise,
     *     including when {@code APoints} is {@code null} or empty
     */
    static boolean isInLongLat(double ALon, double ALat, double[][] APoints) {
        if (APoints == null) {
            return false;
        }
        final int vertexCount = APoints.length;
        boolean inside = false;
        for (int i = 0; i < vertexCount; i++) {
            // Wrap around so the last vertex is paired with the first, closing the polygon.
            final double[] start = APoints[i];
            final double[] end = APoints[(i + 1) % vertexCount];
            final double lon1 = start[0];
            final double lat1 = start[1];
            final double lon2 = end[0];
            final double lat2 = end[1];

            // Half-open interval test: an edge counts only when the point's latitude is in
            // [min, max) of the edge's latitudes. This also guarantees lat1 != lat2, so the
            // division below is safe and horizontal edges are skipped.
            final boolean straddles =
                    ((ALat >= lat1) && (ALat < lat2)) || ((ALat >= lat2) && (ALat < lat1));
            if (straddles) {
                // Longitude at which the edge crosses the point's latitude.
                final double crossingLon = lon1 - ((lon1 - lon2) * (lat1 - ALat)) / (lat1 - lat2);
                if (crossingLon < ALon) {
                    inside = !inside;
                }
            }
        }
        return inside;
    }

    /**
     * Entry point: loads the city polygons, registers the {@code getCityCode} UDF and shows the
     * resolved city code for the rows of {@code ods_t_phone_location_report}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setAppName("getCityCode")
                .set("hive.exec.dynamic.partition", "true")
                .set("hive.exec.dynamic.partition.mode", "nonstrict")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.sql.autoBroadcastJoinThreshold", "204800")
                .set("spark.debug.maxToStringFields", "1000")
                .set("spark.sql.decimalOperations.allowPrecisionLoss", "false")
                .setMaster("yarn");

        SparkSession sparkSession = SparkSession.builder()
                .config(conf).enableHiveSupport().getOrCreate();

        try {
            final Map<String, double[][]> mapPoints = loadCityPolygons(sparkSession);

            // The lambda captures mapPoints, so the polygon map travels with the UDF closure.
            sparkSession.udf().register("getCityCode",
                    (UDF2<Double, Double, String>) (longitude, latitude) ->
                            findCityCode(longitude, latitude, mapPoints),
                    DataTypes.StringType
            );

            String sql2 = "SELECT longitude,latitude,getCityCode(cast(longitude as double),cast(latitude as double)) as city_code" +
                           " from ods_t_phone_location_report " +
                           "where longitude is not null " +
                             "and latitude is not null";

            sparkSession.sql(sql2).show();
        } finally {
            sparkSession.stop();
        }
    }

    /**
     * Reads the boundary points of every city block and groups them into polygons keyed by
     * {@code city_code@blockid}.
     *
     * @param sparkSession session used to run the query
     * @return map from {@code city_code@blockid} to that block's vertices as
     *     {@code {longitude, latitude}} pairs
     */
    private static Map<String, double[][]> loadCityPolygons(SparkSession sparkSession) {
        // Data preparation: fetch the boundary longitude/latitude points of every city.
        // NOTE(review): collect_list does not guarantee element order after a shuffle, while the
        // point-in-polygon test needs vertices in boundary order. If the table has a sequence
        // column, the points should be ordered by it — confirm against the table schema.
        String sql = "SELECT concat(city_code,'@',blockid) city_key,\n" +
                "       collect_list(concat(lng,'|',lat)) AS lngat\n" +
                "FROM ods_t_city_range_info" +
                " where lng is not null and lat is not null " +
                "GROUP BY concat(city_code,'@',blockid)";

        List<Row> rows = sparkSession.sql(sql).collectAsList();

        Map<String, double[][]> polygons = new HashMap<>();
        for (Row row : rows) {
            // concat yields null when city_code or blockid is null; such rows have no usable key.
            if (row.isNullAt(0) || row.isNullAt(1)) {
                continue;
            }
            // Read the array column as a list rather than parsing its toString() form, which
            // depends on the underlying Scala collection class.
            List<String> points = row.getList(1);
            polygons.put(row.getString(0), parsePolygon(points));
        }
        return polygons;
    }

    /**
     * Converts {@code "lng|lat"} strings into a vertex array.
     *
     * @param points boundary points, each formatted as {@code longitude|latitude}
     * @return one {@code {longitude, latitude}} pair per input element, in input order
     * @throws NumberFormatException if a coordinate is not a parsable number
     */
    private static double[][] parsePolygon(List<String> points) {
        double[][] vertices = new double[points.size()][2];
        int index = 0;
        for (String point : points) {
            String[] lngLat = point.split("\\|");
            vertices[index][0] = Double.parseDouble(lngLat[0].trim());
            vertices[index][1] = Double.parseDouble(lngLat[1].trim());
            index++;
        }
        return vertices;
    }

    /**
     * Returns the city code of the first polygon that contains the point.
     *
     * @param longitude longitude of the point; may be {@code null}
     * @param latitude latitude of the point; may be {@code null}
     * @param polygons polygons keyed by {@code city_code@blockid}
     * @return the part of the matching key before {@code '@'}, or {@code "-999"} when a
     *     coordinate is {@code null} or no polygon contains the point
     */
    private static String findCityCode(Double longitude, Double latitude,
                                       Map<String, double[][]> polygons) {
        // A failed cast(... as double) in SQL reaches the UDF as null; avoid unboxing it.
        if (longitude == null || latitude == null) {
            return UNKNOWN_CITY_CODE;
        }
        for (Map.Entry<String, double[][]> entry : polygons.entrySet()) {
            if (isInLongLat(longitude, latitude, entry.getValue())) {
                return entry.getKey().split("@")[0];
            }
        }
        return UNKNOWN_CITY_CODE;
    }
}

最终输出效果如下:

| longitude | latitude | city_code |
|-----------|----------|-----------|
| 118.83    | 31.95    | 320100    |
| 118.62    | 32.07    | 320100    |
| ...       | ...      | ...       |

 

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐