import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Scan} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession /** * 使用Spark来建立HBase中表sound的二级索引 */ object MyIndexBuilder { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("MyIndexBuilder") .master("local") .getorCreate() // 1、创建HBaseContext val configuration = HBaseConfiguration.create() configuration.set("hbase.zookeeper.quorum", "master,slave1,slave2") val hBaseContext = new HBaseContext(spark.sparkContext, configuration) // 2、读取HBase表sound中的f:n和f:c两个列的值以及他们对应的rowKey的值 // 并且需要区分开是哪一个列的值 val soundRDD = hBaseContext.hbaseRDD(TableName.valueOf("sound"), new Scan()) val indexerRDD: RDD[((String, Array[Byte]), ImmutableBytesWritable)] = soundRDD.flatMap { case (byteRowKey, result) => val nameValue = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("n")) val categoryValue = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("c")) // 区分开是哪一个列的值,使用key来区分 // 返回key是(tableName,列值), value是这个列对应的rowKey的值 Seq((("name_indexer", nameValue), byteRowKey), (("category_indexer", categoryValue), byteRowKey)) } // 3、按照key进行分组,拿到相同列值对应的所有的rowKeys(因为在原表sound中多个rowKey的值可能会对应着相同的列值) val groupedindexerRDD: RDD[((String, Array[Byte]), Iterable[ImmutableBytesWritable])] = indexerRDD.groupByKey() // 4、将不同的列值以及对应的rowKeys写入到相对应的indexer表中 groupedindexerRDD.foreachPartition { partitionIterator => val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "master,slave1,slave2") val conn = ConnectionFactory.createConnection(conf) val nameIndexerTable = conn.getTable(TableName.valueOf("name_indexer")) val categoryIndexerTable = conn.getTable(TableName.valueOf("category_indexer")) try { val nameIndexerTablePuts = new util.ArrayList[Put]() val categoryIndexerTablePuts = new util.ArrayList[Put]() partitionIterator.map { case ((tableName, indexerValue), rowKeys) => val put = new Put(indexerValue) // 将列值作为索引表的rowKey rowKeys.foreach(rowKey => { put.addColumn(Bytes.toBytes("f"), null, rowKey.get()) }) if (tableName.equals("name_indexer")) { nameIndexerTablePuts.add(put) // 需要写入到表name_indexer中的数据 } else { categoryIndexerTablePuts.add(put) // 需要写入到表category_indexer中的数据 } } nameIndexerTable.put(nameIndexerTablePuts) categoryIndexerTable.put(categoryIndexerTablePuts) } finally { nameIndexerTable.close() categoryIndexerTable.close() conn.close() } } spark.stop() } }
第三步:查询结果 我们先从name_indexer这张表中按照RowKey查询属于“中国好声音”的记录,这些记录中的所有的列的值就是需要在sound中查询的RowKey的值 然后从category_indexer这张表中按照RowKey查询属于“综艺”的记录,这些记录中的所有的列的值就是需要在sound中查询的RowKey的值 最后将上面两步查询出来的结果做一个合并,就是将查询出来的结果做一次去重,得到了所有在sound中符合需求的RowKey,然后在根据这些RowKey去sound表中查询相应的数据 我们每一步查询都是根据HBase中的一级索引RowKey来查询的,所以查询速度会非常的快
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; public class SecondaryIndexSearcher { public static void main(String[] args) throws IOException { Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum", "master,slave1,slave2"); try(Connection connection = ConnectionFactory.createConnection(config)) { Table nameIndexer = connection.getTable(TableName.valueOf("name_indexer")); Table categoryIndexer = connection.getTable(TableName.valueOf("category_indexer")); Table sound = connection.getTable(TableName.valueOf("sound")); // 1、先从表name_indexer中找到rowKey包含“中国好声音”对应的所有的column值 Scan nameIndexerScan = new Scan(); SubstringComparator nameComp = new SubstringComparator("中国好声音"); RowFilter nameRowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, nameComp); nameIndexerScan.setFilter(nameRowFilter); Set<String> soundRowKeySetone = new HashSet<>(); ResultScanner rsOne = nameIndexer.getScanner(nameIndexerScan); try { for (Result r = rsOne.next(); r != null; r = rsOne.next()) { for (Cell cell : r.listCells()) { soundRowKeySetone.add(Bytes.toString(CellUtil.cloneValue(cell))); } } } finally { rsOne.close(); // always close the ResultScanner! } // 2、再从表category_indexer中找到rowKey包含“综艺”对应的所有的column值 Scan categoryIndexerScan = new Scan(); SubstringComparator categoryComp = new SubstringComparator("综艺"); RowFilter categoryRowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, categoryComp); nameIndexerScan.setFilter(categoryRowFilter); Set<String> soundRowKeySetTwo = new HashSet<>(); ResultScanner rsTwo = categoryIndexer.getScanner(categoryIndexerScan); try { for (Result r = rsTwo.next(); r != null; r = rsTwo.next()) { for (Cell cell : r.listCells()) { soundRowKeySetTwo.add(Bytes.toString(CellUtil.cloneValue(cell))); } } } finally { rsTwo.close(); // always close the ResultScanner! } // 3、合并并去重上面两步查询的结果 soundRowKeySetone.addAll(soundRowKeySetTwo); // 4、根据soundRowKeySetone中所有的rowKeys去sound表中查询数据 List<Get> gets = new ArrayList<>(); for (String rowKey : soundRowKeySetone) { Get get = new Get(Bytes.toBytes(rowKey)); gets.add(get); } Result[] results = sound.get(gets); for (Result result : results) { for (Cell cell : result.listCells()) { System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "===> " + Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "{" + Bytes.toString(CellUtil.cloneValue(cell)) + "}"); } } } } }
结论: 那么表name_indexer和category_indexer中的RowKey就是我们解决问题的二级索引, 所以二级索引的本质就是:建立各列值与行键之间的映射关系 最后,我们需要知道创建HBase二级索引的方式 1、Spark来实现二级索引的建立 我们前面使用的是Spark来实现二级索引的建立,但是这种方式适用于离线批处理,这些二级索引是每天或者每段时间执行一次的建立的 2、使用HBase的协处理器(coprocessor) 对于如果数据是实时更新的话,则这种离线批处理的方式是不行的,这个时候我们可以使用HBase的协处理器(coprocessor) HBase的协处理器(coprocessor)的介绍可以参考:https://www.cnblogs.com/small-k/p/9648453.html 3、HBase + Solr其实也是一个二级索引实现,只不过是把二级索引存储在Solr中
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。