
Reading Hive data with Flink

Flink 1.8's support for Hive is not great: reading a mere 3 million rows took 2 hours, so I plan to migrate the program to Spark (a sketch of the Spark version is at the end of this post). First, the code.

Maven


        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.2</version>
        </dependency>

        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
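
The job below also uses Flink's JDBCInputFormat, which in Flink 1.8 ships in the separate flink-jdbc module; if the build does not already pull it in, a dependency along these lines is needed (the artifact id, Scala suffix and version here are assumptions, adjust them to your Flink setup):

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>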

Java


import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// HttpClientUtil, Utils, AppConfig, JobConstants, RedisDataModel and RedisOutputFormat
// are the author's own helper classes and are not shown in this post.
// The class wrapper below is added for completeness; the original snippet omitted it.
public class HiveToRedisJob {

    private final static Logger logger = LoggerFactory.getLogger(HiveToRedisJob.class);

    private final static String driverName = "org.apache.hive.jdbc.HiveDriver"; // JDBC driver class
    private final static String url = ";";      // Hive JDBC URL + database name (redacted)
    private final static String user = "";      // user name (redacted)
    private final static String password = "!"; // password (redacted)
    private final static String table = "";     // table name (redacted)
    private final static String sql = " ";      // base query (redacted)

    public static void main(String[] arg) throws Exception {

        long time=System.currentTimeMillis();   
        HttpClientUtil.sendDingMessage("start syncing hive-" + table + ";" + Utils.getTimeString());
        /**
         * Initialize the batch execution environment
         */
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        try {
            // the query returns two string columns: user and name
            TypeInformation<?>[] types = new TypeInformation<?>[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
            String[] colName = new String[]{"user", "name"};
            RowTypeInfo rowTypeInfo = new RowTypeInfo(types, colName);
            JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat().setDrivername(driverName)
                    .setDBUrl(url)
                    .setUsername(user).setPassword(password);

            Calendar calendar = Calendar.getInstance();
            calendar.setTime(new Date());
            calendar.add(Calendar.DATE, -1); // read yesterday's partition
            SimpleDateFormat sj = new SimpleDateFormat("yyyyMMdd");
            String d = sj.format(calendar.getTime());

            // append the date-partition filter; the huge limit just caps a runaway read
            JDBCInputFormat jdbcInputFormat = builder.setQuery(sql + " and dt='" + d + "' limit 100000000").setRowTypeInfo(rowTypeInfo).finish();
            DataSource<Row> rowlist = env.createInput(jdbcInputFormat);

            DataSet<RedisDataModel> temp = rowlist.filter(new FilterFunction<Row>() {

                @Override
                public boolean filter(Row row) throws Exception {
                    // drop rows whose key or value is too short or starts with a junk marker
                    String key = row.getField(0).toString();
                    String value = row.getField(1).toString();
                    return !(key.length() < 5 || key.startsWith("-") || key.startsWith("$")
                            || value.length() < 5 || value.startsWith("-") || value.startsWith("$"));
                }

            }).map(new MapFunction<Row, RedisDataModel>() {

                @Override
                public RedisDataModel map(Row value) throws Exception {
                    RedisDataModel m = new RedisDataModel();
                    m.setExpire(-1); // never expires
                    m.setKey(JobConstants.REDIS_FLINK_IMEI_USER + value.getField(0).toString());
                    m.setGlobal(true);
                    m.setValue(value.getField(1).toString());
                    return m;
                }

            });

            HttpClientUtil.sendDingMessage("同步hive-"+table+"完成;开始推送模型,共有"+temp.count()+"条;"+Utils.getTimeString()); 

            RedisOutputFormat redisOutput = RedisOutputFormat.buildRedisOutputFormat()
                    .setHostMaster(AppConfig.getProperty(JobConstants.REDIS_HOST_MASTER))
                    .setHostSentinel(AppConfig.getProperty(JobConstants.REDIS_HOST_SENTINELS))
                    .setMaxIdle(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXIDLE)))
                    .setMaxTotal(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXTOTAL)))
                    .setMaxWaitMillis(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXWAITMILLIS)))
                    .setTestOnBorrow(Boolean.parseBoolean(AppConfig.getProperty(JobConstants.REDIS_TESTONBORROW)))
                    .finish();
            temp.output(redisOutput);               
            env.execute("hive-"+table+" sync");

            HttpClientUtil.sendDingMessage("同步hive-"+table+"完成,耗时:"+(System.currentTimeMillis()-time)/1000+"s"); 
        } catch (Exception e) {
            logger.error("hive-" + table + " sync failed", e);
            HttpClientUtil.sendDingMessage("hive-" + table + " sync failed, timestamp: " + time + ", cause: " + e.toString());
        }
    }
}
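
One thing worth checking before (or instead of) migrating: as written, JDBCInputFormat issues a single query over one HiveServer2 connection, so the whole read is serial no matter what env.setParallelism(4) says. flink-jdbc can split the read across parallel subtasks with a ParameterValuesProvider. A minimal sketch, assuming the table has a numeric id column to range over (the column name, bounds and batch size below are made-up values):

import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;

// Each generated (?, ?) pair becomes one input split, read by a separate subtask.
JDBCInputFormat splitInput = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(driverName)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(password)
        // hypothetical query; the real base sql is redacted above
        .setQuery("SELECT `user`, name FROM my_table WHERE dt = '20190801' AND id BETWEEN ? AND ?")
        .setParametersProvider(new NumericBetweenParametersProvider(100000L, 0L, 3000000L))
        .setRowTypeInfo(rowTypeInfo)
        .finish();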

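For the planned migration: Spark can read the table natively through the Hive metastore instead of pulling every row through HiveServer2's JDBC interface, which is the usual fix for exactly this kind of slowness. A minimal sketch of what the Spark version might look like, with placeholder class/table/partition names and the Redis push omitted (RedisOutputFormat is the author's custom code):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HiveToRedisSpark {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("hive sync")
                .enableHiveSupport() // read via the Hive metastore, not JDBC
                .getOrCreate();

        // placeholder database/table/partition; the real names are redacted in the original
        Dataset<Row> rows = spark.sql(
                "SELECT `user`, name FROM mydb.my_table WHERE dt = '20190801'");

        // same cleanup rule as the Flink filter above
        Dataset<Row> cleaned = rows.filter(
                "length(`user`) >= 5 AND length(name) >= 5" +
                " AND `user` NOT LIKE '-%' AND `user` NOT LIKE '$%'" +
                " AND name NOT LIKE '-%' AND name NOT LIKE '$%'");

        System.out.println("rows to push: " + cleaned.count());
        // push to Redis with cleaned.toJavaRDD().foreachPartition(...), e.g. via a Jedis pool
        spark.stop();
    }
}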