微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

记一次Hbase的行键过滤器事故问题

 

数据总数:746条数据

因为后面需要进行算法合成,而且spark目前对这种算法支持并不好,因此采用代码编写,所以在查询hbase的过程中采用的是java直接查询

但是为了加快查询速度,我尽可能的使用了过滤器

 

1:初期Hbase的rowkey组合:时间+"_"+订单id

查询思路:

1:能快速检索,减少GC,采用过滤器

2:支持时间段查询

根据上面两点,我采用时间过滤,比如:startTime=201904010000  endTime=201904180000|;【注意这个符号:“|” 】然后根据行键过滤器

CompareFilter.CompareOp.GREATER_OR_EQUAL和
CompareFilter.CompareOp.LESS_OR_EQUAL进行大小对比

 

使用代码查询的时候,添加了行键过滤器

 FilterList filterList=new FilterList();
            //time+id
            if(startTime != null){
                RowFilter rf = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL,
                        new BinaryComparator(Bytes.toBytes(startTime)));
                filterList.addFilter(rf);
            }
            if(endTime != null){
                RowFilter rf = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
                        new BinaryComparator(Bytes.toBytes(endTime)));
                filterList.addFilter(rf);
            }
            scan.setFilter(filterList);

完整代码

 /**
     * 行键过滤器
     * */
    public static List<Map<String , String>> rowFilter(String tableName , String startTime , String endTime){
        Connection connection = null;
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        ResultScanner rs = null;
        Table table = null;
        List<Map<String , String>> list = new ArrayList<Map<String , String>>();
        try{
            connection = ConnectionFactory.createConnection(config);
            table = connection.getTable(TableName.valueOf(tableName));
            FilterList filterList=new FilterList();
            //time+id
            if(startTime != null){
                RowFilter rf = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL,
                        new BinaryComparator(Bytes.toBytes(startTime)));
                filterList.addFilter(rf);
            }
            if(endTime != null){
                RowFilter rf = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
                        new BinaryComparator(Bytes.toBytes(endTime)));
                filterList.addFilter(rf);
            }
            scan.setFilter(filterList);
            rs = table.getScanner(scan);
            for (Result r : rs) {
                Map<String , String> map = new HashMap<String , String>();
                for (Cell cell : r.listCells()) {
                    map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())
                            , Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
                list.add(map);
            }
        }catch (Exception e){
            e.printstacktrace();
        }finally {
            if (null != rs) {
                rs.close();
            }
            try {
                if (null != table) {
                    table.close();
                }
                if (null != connection && !connection.isClosed()) {
                    System.out.println("scan Result is closed");
                    connection.close();
                }
            } catch (IOException e) {
                e.printstacktrace();
            }
        }
        return list;



    }
初期完整代码

那么这种方案查询后返回的结果是:361条数据! 实际Hbase测试表中有746条数据,那么可以肯定,是行键过滤器出错了(后面再研究为啥出错)

 

改善:

更改rowkey结构,采用:订单id+"_"+time来实现

然后过滤器代码改善:

 FilterList filterList=new FilterList();
            //id+time
            if(startTime != null){
                RowFilter rf = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL,
                        new RegexStringComparator(".*_"+startTime));
                filterList.addFilter(rf);
            }
            if(endTime != null){
                RowFilter rf = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
                        new RegexStringComparator(".*_"+endTime));
                filterList.addFilter(rf);
            }
            scan.setFilter(filterList);

上面其实就是采用正则表达式进行后缀识别,这样我就可以根据后缀进行时间过滤

完整代码

    /**
     * 行键过滤器
     * */
    public static List<Map<String , String>> rowEndFilter(String tableName , String startTime , String endTime){
        Connection connection = null;
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        ResultScanner rs = null;
        Table table = null;
        List<Map<String , String>> list = new ArrayList<Map<String , String>>();
        try{
            connection = ConnectionFactory.createConnection(config);
            table = connection.getTable(TableName.valueOf(tableName));
            FilterList filterList=new FilterList();
            //id+time
            if(startTime != null){
                RowFilter rf = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL,
                        new RegexStringComparator(".*_"+startTime));
                filterList.addFilter(rf);
            }
            if(endTime != null){
                RowFilter rf = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
                        new RegexStringComparator(".*_"+endTime));
                filterList.addFilter(rf);
            }
            scan.setFilter(filterList);
            rs = table.getScanner(scan);
            for (Result r : rs) {
                Map<String , String> map = new HashMap<String , String>();
                for (Cell cell : r.listCells()) {
                    map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())
                            , Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
                list.add(map);
            }
        }catch (Exception e){
            e.printstacktrace();
        }finally {
            if (null != rs) {
                rs.close();
            }
            try {
                if (null != table) {
                    table.close();
                }
                if (null != connection && !connection.isClosed()) {
                    System.out.println("scan Result is closed");
                    connection.close();
                }
            } catch (IOException e) {
                e.printstacktrace();
            }
        }
        return list;



    }
View Code

上面就会查询出完整数据。

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐