
Fetching data from HBase in Java

The code below shows a single service connecting to two different HBase clusters (referred to here as bdse and bdcsc) and reading data from each; both clusters require Kerberos authentication.

1. The pom file

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>test</groupId>
	<artifactId>test</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>test</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.6.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-auth</artifactId>
			<version>2.6.0</version>
		</dependency>


		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-client</artifactId>
			<version>1.3.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-common</artifactId>
			<version>1.3.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-protocol</artifactId>
			<version>1.3.1</version>
		</dependency>
		
		
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<version>2.0.0.RELEASE</version>
			</plugin>
		</plugins>
	</build>
</project>
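
Note: this pom has no spring-boot parent, and the project contains several classes with a main method, so the spring-boot-maven-plugin's repackage goal is neither bound nor able to pick an entry point on its own. One way to make `mvn package` produce a runnable jar is sketched below (this is an assumption, not part of the original setup; the main class name is whichever class should be launched, here the boot class from step 9). After packaging, the jar can be started with `java -jar target/test-0.0.1-SNAPSHOT.jar`.

			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<version>2.0.0.RELEASE</version>
				<configuration>
					<!-- assumed entry point; adjust if a different main class should be launched -->
					<mainClass>hbase.boot</mainClass>
				</configuration>
				<executions>
					<execution>
						<goals>
							<goal>repackage</goal>
						</goals>
					</execution>
				</executions>
			</plugin>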

2. Cluster configuration holder class

package hbase;

import java.util.HashMap;
import java.util.Map;

public class HBaseDaoHolder {

	private static volatile HBaseDao BDSE_HBASEDAO = null;

	private static volatile HBaseDao BDCSC_HBASEDAO = null;




	public static void initAllHbase() {
		initBdse();
		initBdcsc();
	}

	/**
	 * Initialize the bdse (CDH) HBase cluster.
	 */
	private static void initBdse() {
		// Build the HBaseDao for the bdse cluster
		Map<String, String> hbaseConf = new HashMap<String, String>();
		String ip = "10.161.1.29,10.161.1.30,10.161.1.31";
		hbaseConf.put("hbase.zookeeper.property.maxclientcnxns", "300");
		hbaseConf.put("hbase.ipc.client.socket.timeout.connect", "1000");
		hbaseConf.put("zookeeper.session.timeout", "500");
		hbaseConf.put("hbase.regionserver.handler.count", "500");
		hbaseConf.put("hadoop.security.authentication", "kerberos");
		hbaseConf.put("hbase.master.kerberos.principal", "hbase/[email protected]");
		hbaseConf.put("hbase.regionserver.kerberos.principal", "hbase/[email protected]");
		hbaseConf.put("hbase.zookeeper.property.clientPort", "2181");
		hbaseConf.put("hbase.security.authentication", "kerberos");
		hbaseConf.put("hbase.zookeeper.quorum", ip);
		hbaseConf.put("hadoop.security.auth_to_local", "RULE:[1:$1]");
		BDSE_HBASEDAO = new bdseHbaseDaoImpl(hbaseConf, Boolean.valueOf(true));
	}

	private static void initBdcsc() {
		// Build the HBaseDao for the bdcsc cluster
		Map<String, String> hbaseConf = new HashMap<String, String>();
		String ip = "10.166.0.6,10.166.0.5,10.166.0.4";
		hbaseConf.put("hbase.zookeeper.property.maxclientcnxns", "300");
		hbaseConf.put("hbase.ipc.client.socket.timeout.connect", "1000");
		hbaseConf.put("zookeeper.session.timeout", "500");
		hbaseConf.put("hbase.regionserver.handler.count", "500");
		hbaseConf.put("hadoop.security.authentication", "kerberos");
		hbaseConf.put("hbase.master.kerberos.principal", "hbase/[email protected]");
		hbaseConf.put("hbase.regionserver.kerberos.principal", "hbase/[email protected]");
		hbaseConf.put("hbase.zookeeper.property.clientPort", "2181");
		hbaseConf.put("hbase.security.authentication", "kerberos");
		hbaseConf.put("hbase.zookeeper.quorum", ip);
		hbaseConf.put("zookeeper.znode.parent", "/hbase1");
		hbaseConf.put("hadoop.security.auth_to_local", "RULE:[1:$1]");
		BDCSC_HBASEDAO = new bdcscHbaseDaoImpl(hbaseConf, Boolean.valueOf(true));
	}

	public static HBaseDao getBdcscHBaseDao() {
		if (BDCSC_HBASEDAO == null) {
			synchronized (HBaseDaoHolder.class) {
				if (BDCSC_HBASEDAO == null) {
					initBdcsc();
				}
			}
		}
		return BDCSC_HBASEDAO;
	}

	public static HBaseDao getBdseHBaseDao() {
		if (BDSE_HBASEDAO == null) {
			synchronized (HBaseDaoHolder.class) {
				if (BDSE_HBASEDAO == null) {
					initBdse();
				}
			}
		}
		return BDSE_HBASEDAO;
	}

}

3. bdcsc cluster connection class

package hbase;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.security.UserGroupInformation;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class bdcscHbaseDaoImpl implements HBaseDao {

	private Logger logger = LoggerFactory.getLogger(bdcscHbaseDaoImpl.class);
	public static final String FLOWNUM = "flowNum";
	private HConnection hTablePool;

	private Configuration conf;
	private UserGroupInformation ugi;

	public bdcscHbaseDaoImpl(Map<String, String> hbaseConf, Boolean isCDH) {
		conf = HBaseConfiguration.create();
		conf.setInt("hbase.client.retries.number", 2);
		for (String key : hbaseConf.keySet()) {
			conf.set(key, hbaseConf.get(key));
		}

		try {
			if (isCDH.booleanValue()) {
				System.setProperty("java.security.krb5.conf", utils.read("bdcsc_krb5"));
				UserGroupInformation.setConfiguration(conf);
				// Log in from the keytab, e.g. "/home/bdcsc2/myhome/probdcsc.keytab"
				ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI("probdcsc", utils.read("bdcsc_keyber"));
				System.out.println("------- current bdcsc user ----------- " + UserGroupInformation.getCurrentUser().toString());
				ugi.doAs(new PrivilegedExceptionAction<Void>() {

					public Void run() throws Exception {
						hTablePool = HConnectionManager.createConnection(conf);
						return null;
					}
				});
			}
		} catch (Exception e) {
			logger.error("Kerberos authentication failed in bdcscHbaseDaoImpl", e);
		}
	}

	public Result getoneRow(String tableName, String rowKey) {
		long accessBegin = System.currentTimeMillis();
		HTableInterface table = null;
		Result rsResult = null;
		try {
			table = hTablePool.getTable(tableName);
			Get get = new Get(rowKey.getBytes());
			rsResult = table.get(get);
		} catch (Exception e) {
			logger.error("Failed to fetch row in bdcscHbaseDaoImpl#getoneRow", e);
		} finally {
			if (table != null) {
				try {
					table.close();
				} catch (IOException e) {
					logger.error(e.getMessage(), e);
					table = null;
				}
			}
		}
		long accessEnd = System.currentTimeMillis();
		logger.debug("bdcscHbaseDaoImpl#getoneRow took {} ms", accessEnd - accessBegin);
		return rsResult;
	}

}
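
The keytab login above happens once, in the constructor. In a long-running service the Kerberos ticket eventually expires, so a common precaution is to refresh the TGT before each HBase access. A minimal sketch follows; this helper is not part of the original code and assumes the ugi obtained from loginUserFromKeytabAndReturnUGI is passed in:

package hbase;

import java.io.IOException;

import org.apache.hadoop.security.UserGroupInformation;

public final class KerberosRelogin {

	private KerberosRelogin() {
	}

	/**
	 * Call before each HBase access: a no-op while the ticket is still valid,
	 * otherwise re-logs in from the keytab.
	 */
	public static void ensureLogin(UserGroupInformation ugi) throws IOException {
		if (ugi != null && ugi.isFromKeytab()) {
			ugi.checkTGTAndReloginFromKeytab();
		}
	}
}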

4. bdse cluster connection class

package hbase;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.security.UserGroupInformation;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class bdseHbaseDaoImpl implements HBaseDao {

	private Logger logger = LoggerFactory.getLogger(bdseHbaseDaoImpl.class);
	public static final String FLOWNUM = "flowNum";
	private HConnection hTablePool;

	private Configuration conf;
	private UserGroupInformation ugi;

	public bdseHbaseDaoImpl(Map<String, String> hbaseConf, Boolean isCDH) {
		conf = HBaseConfiguration.create();
		conf.setInt("hbase.client.retries.number", 2);
		for (String key : hbaseConf.keySet()) {
			conf.set(key, hbaseConf.get(key));
		}

		try {
			if (isCDH.booleanValue()) {
				System.setProperty("java.security.krb5.conf", utils.read("bdse_krb5"));
				UserGroupInformation.setConfiguration(conf);
				// Log in from the keytab configured for the bdse cluster
				ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI("probdcsc", utils.read("bdse_keyber"));
				System.out.println("------- current bdse user ----------- " + UserGroupInformation.getCurrentUser().toString());
				ugi.doAs(new PrivilegedExceptionAction<Void>() {

					public Void run() throws Exception {
						hTablePool = HConnectionManager.createConnection(conf);
						return null;
					}
				});
			}
		} catch (Exception e) {
			logger.error("Kerberos authentication failed in bdseHbaseDaoImpl", e);
		}
	}

	public Result getoneRow(String tableName, String rowKey) {
		long accessBegin = System.currentTimeMillis();
		HTableInterface table = null;
		Result rsResult = null;
		try {
			table = hTablePool.getTable(tableName);
			Get get = new Get(rowKey.getBytes());
			rsResult = table.get(get);
		} catch (Exception e) {
			logger.error("Failed to fetch row in bdseHbaseDaoImpl#getoneRow", e);
		} finally {
			if (table != null) {
				try {
					table.close();
				} catch (IOException e) {
					logger.error(e.getMessage(), e);
					table = null;
				}
			}
		}
		long accessEnd = System.currentTimeMillis();
		logger.debug("bdseHbaseDaoImpl#getoneRow took {} ms", accessEnd - accessBegin);
		return rsResult;
	}

}
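
Both DAO classes only expose a single-row get. If range reads are needed, the same HConnection can drive a scan. Below is a sketch of a prefix scan using the same 1.x client API; it is a standalone helper for illustration, not part of the original code, and expects the connection created in the DAO constructors above:

package hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanByPrefix {

	/**
	 * Returns every row whose key starts with the given prefix.
	 */
	public static List<Result> scan(HConnection connection, String tableName, String prefix) throws IOException {
		List<Result> results = new ArrayList<Result>();
		HTableInterface table = null;
		ResultScanner scanner = null;
		try {
			table = connection.getTable(tableName);
			Scan s = new Scan();
			s.setStartRow(Bytes.toBytes(prefix)); // jump straight to the first matching row
			s.setFilter(new PrefixFilter(Bytes.toBytes(prefix))); // stop once the prefix no longer matches
			scanner = table.getScanner(s);
			for (Result r : scanner) {
				results.add(r);
			}
		} finally {
			if (scanner != null) {
				scanner.close();
			}
			if (table != null) {
				table.close();
			}
		}
		return results;
	}
}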

5. bdcsc cluster data retrieval class

package hbase;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class bdcscGet {

	public static void main(String[] args) {
		String tableName = utils.read("bdcsc_table");
		String rowkey = utils.read("bdcsc_rowkey");

		Result res;
		try {
			res = HBaseDaoHolder.getBdcscHBaseDao().getoneRow(tableName, rowkey.toUpperCase());
			byte[] databytes = res.getValue(utils.read("bdcsc_liecu").getBytes(), utils.read("bdcsc_ziduan").getBytes());
			if (databytes != null) {
				System.out.println(Bytes.toString(databytes)); // Bytes.toString avoids garbled Chinese output
			}
		} catch (Exception e) {
			System.out.println("Query failed in bdcscGet");
			e.printStackTrace();
		}
	}

}
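
The main method above prints one hard-coded family/qualifier. When the row layout is not known in advance, all cells of the Result can be walked instead; a small sketch (a hypothetical helper, not in the original post):

package hbase;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class PrintAllCells {

	/** Prints every family:qualifier = value pair contained in the Result. */
	public static void print(Result res) {
		if (res == null || res.isEmpty()) {
			System.out.println("empty result");
			return;
		}
		for (Cell cell : res.rawCells()) {
			String family = Bytes.toString(CellUtil.cloneFamily(cell));
			String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
			String value = Bytes.toString(CellUtil.cloneValue(cell));
			System.out.println(family + ":" + qualifier + " = " + value);
		}
	}
}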

6. bdse cluster data retrieval class

package hbase;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class bdseGet {

	public static void main(String[] args) {
		String tableName = utils.read("bdse_table");
		String rowkey = utils.read("bdse_rowkey");

		Result res;
		try {
			res = HBaseDaoHolder.getBdseHBaseDao().getoneRow(tableName, rowkey.toUpperCase());
			byte[] databytes = res.getValue(utils.read("bdse_liecu").getBytes(), utils.read("bdse_ziduan").getBytes());
			if (databytes != null) {
				System.out.println(Bytes.toString(databytes)); // Bytes.toString avoids garbled Chinese output
			}
		} catch (Exception e) {
			System.out.println("Query failed in bdseGet");
			e.printStackTrace();
		}
	}
	
}

7. Configuration file reading utility class

package hbase;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

public class utils {

	public static String read(String name) {
		try {
			Properties prop = new Properties();
			InputStream in = new BufferedInputStream(new FileInputStream("a.properties"));
			prop.load(in); // load the property list
			String property = prop.getProperty(name);
			in.close();
			return property;
		} catch (Exception e) {
			System.out.println("Failed to read configuration file");
			e.printStackTrace();
		}
		return null;
	}

}
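
The original post does not show the a.properties file that utils.read expects. A hypothetical example with placeholder values is given below; only the key names come from the code above, and every value must be replaced with the real paths, table names, and column names for your clusters:

# --- bdcsc cluster (all values are placeholders) ---
bdcsc_krb5=/etc/bdcsc/krb5.conf
bdcsc_keyber=/home/bdcsc2/myhome/probdcsc.keytab
bdcsc_table=some_table
bdcsc_rowkey=SOMEROWKEY
bdcsc_liecu=cf
bdcsc_ziduan=col1

# --- bdse cluster (all values are placeholders) ---
bdse_krb5=/etc/bdse/krb5.conf
bdse_keyber=/path/to/bdse.keytab
bdse_table=some_table
bdse_rowkey=SOMEROWKEY
bdse_liecu=cf
bdse_ziduan=col1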

8. Cluster connection interface

package hbase;

import org.apache.hadoop.hbase.client.Result;

public interface HBaseDao {

	public Result getoneRow(String tableName, String rowKey) throws Exception;
	
}

9. Startup class

package hbase;

import java.nio.charset.Charset;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class boot {

	public static void main(String[] args) throws Exception {
		System.out.println("系统入口");
		Logger LOGGER = LoggerFactory.getLogger(boot.class);
		String appName = "hbasetest";

		 //测试获取系统认编码    
        System.out.println("获取系统认编码"+System.getProperty("file.encoding"));     
             
        //测试获取系统认的字符编码   
        System.out.println("获取系统认的字符编码  "+Charset.defaultCharset());
		
        HBaseDaoHolder.initAllHbase();
		synchronized (boot.class) {
			while (true) {
				try {
					boot.class.wait();
				} catch (Exception e) {
					throw new RuntimeException(e);
				}
			}
		}

	}
	
}

 
