最近项目中用到了Hbase相关的操作并封装成工具类,我的Hbase服务器端版本是2.1.0,图示如下:
特此记录便于日后查阅。
org.apache.hbase hbase-shaded-client 2.1.0
org.apache.hadoop hadoop-common 3.0.0
二、application.yml 项目配置
此处我是自定义HBase配置,后面会有专门的配置类来加载这个配置
datasource:hbase:zookeeper:port: 2181quorum: 10.0.61.12,10.0.61.22,10.0.61.24znode:parent: ''
三、HbaseConfig 自定义配置类
import lombok.Data;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @description: Hbase配置类* @author: zhangzhixiang* @createDate: 2022/11/24* @version: 1.0*/
@Data
@Component
@Configuration
public class HbaseConfig {@Value("${datasource.hbase.zookeeper.quorum}")private String zookeeper;@Value("${datasource.hbase.zookeeper.znode.parent}")private String parent;@Value("${datasource.hbase.zookeeper.port}")private String port;public Connection getConnection() throws IOException {org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();config.set("hbase.zookeeper.quorum", zookeeper);config.set("hbase.zookeeper.property.clientPort", port);if (parent != null && !"".equals(parent)) {config.set("zookeeper.znode.parent", parent);}Connection connection = ConnectionFactory.createConnection(config);return connection;}
}
四、HbaseUtil 工具类
首先添加 SpringContext 工具类,下面会用到:
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;/*** @Description: * @Author:zhangzhixiang* @Date: 2022/7/25* @Version: 1.0*/
@Component
public class SpringContext implements ApplicationContextAware {public static ApplicationContext context;public static Environment env;@Overridepublic void setApplicationContext(ApplicationContext context) throws BeansException {SpringContext.context = context;SpringContext.env = context.getEnvironment();}public static Object getBean(String name) {return context.getBean(name);}public static T getBean(Class clazz) {return context.getBean(clazz);}public static ApplicationContext getContext() {return context;}public static Environment getEnv() {return env;}public static String getProperty(String key) {return getProperty(key, "");}public static String getProperty(String key, String defaultValue) {return env.getProperty(key, defaultValue);}public static T getProperty(String key, Class targetType) {return env.getProperty(key, targetType);}public static String getActiveProfile() {return env.getActiveProfiles()[0];}
}
然后我们来写 HbaseUtil 工具类的代码:
import com.swkj.common.base.context.SpringContext;
import com.swkj.common.base.log.GLog;
import com.swkj.common.base.log.LogFactory;
import com.swkj.common.hbase.config.HbaseConfig;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;/*** @description: Hbase工具类* @author: zhangzhixiang* @createDate: 2022/11/24* @version: 1.0*/
@DependsOn("springContext")
@Component
public class HbaseUtil {private static final GLog LOG = LogFactory.getLogger(HbaseUtil.class);private static HbaseConfig hbaseConfig = (HbaseConfig) SpringContext.getBean("hbaseConfig");private static Connection connection = null;private static Admin admin = null;private HbaseUtil() {if (connection == null) {try {connection = hbaseConfig.getConnection();admin = connection.getAdmin();} catch (IOException e) {LOG.error("HbaseUtils实例初始化失败!错误信息为:" + e.getMessage(), e);}}}/*** 创建表** @param tableName 表名* @param columnFamily 列族(数组)*/public void createTable(String tableName, String[] columnFamily) throws IOException {TableName name = TableName.valueOf(tableName);//如果存在则删除if (admin.tableExists(name)) {admin.disableTable(name);admin.deleteTable(name);LOG.error("create htable error! this table {} already exists!", name);} else {HTableDescriptor desc = new HTableDescriptor(name);for (String cf : columnFamily) {desc.addFamily(new HColumnDescriptor(cf));}admin.createTable(desc);}}/*** 插入记录(单行单列族-多列多值)** @param tableName 表名* @param row 行名* @param columnFamilys 列族名* @param columns 列名(数组)* @param values 值(数组)(且需要和列一一对应)*/public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException {TableName name = TableName.valueOf(tableName);Table table = connection.getTable(name);Put put = new Put(Bytes.toBytes(row));for (int i = 0; i < columns.length; i++) {put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));table.put(put);}}/*** 插入记录(单行单列族-单列单值)** @param tableName 表名* @param row 行名* @param columnFamily 列族名* @param column 列名* @param value 值*/public void insertOneRecord(String tableName, String row, String columnFamily, String column, String value) throws IOException {TableName name = TableName.valueOf(tableName);Table table = connection.getTable(name);Put put = new Put(Bytes.toBytes(row));put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));table.put(put);}/*** 删除一行记录** @param tablename 表名* @param rowkey 行名*/public void deleteRow(String tablename, String rowkey) throws IOException {TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Delete d = new Delete(rowkey.getBytes());table.delete(d);}/*** 删除单行单列族记录** @param tablename 表名* @param rowkey 行名* @param columnFamily 列族名*/public void deleteColumnFamily(String tablename, String rowkey, String columnFamily) throws IOException {TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Delete d = new Delete(rowkey.getBytes()).addFamily(Bytes.toBytes(columnFamily));table.delete(d);}/*** 删除单行单列族单列记录** @param tablename 表名* @param rowkey 行名* @param columnFamily 列族名* @param column 列名*/public void deleteColumn(String tablename, String rowkey, String columnFamily, String column) throws IOException {TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Delete d = new Delete(rowkey.getBytes()).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));table.delete(d);}/*** 查找一行记录** @param tablename 表名* @param rowKey 行名*/public static String selectRow(String tablename, String rowKey) throws IOException {String record = "";TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Get g = new Get(rowKey.getBytes());Result rs = table.get(g);NavigableMap>> map = rs.getMap();for (Cell cell : rs.rawCells()) {StringBuffer stringBuffer = new StringBuffer().append(Bytes.toString(cell.getRowArray())).append("\t").append(Bytes.toString(cell.getFamilyArray())).append("\t").append(Bytes.toString(cell.getQualifierArray())).append("\t").append(Bytes.toString(cell.getValueArray())).append("\n");String str = stringBuffer.toString();record += str;}return record;}/*** 查找单行单列族单列记录** @param tablename 表名* @param rowKey 行名* @param columnFamily 列族名* @param column 列名* @return*/public static String selectValue(String tablename, String rowKey, String columnFamily, String column) throws IOException {TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Get g = new Get(rowKey.getBytes());g.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));Result rs = table.get(g);return Bytes.toString(rs.value());}/*** 查询表中所有行(Scan方式)** @param tablename* @return*/public String scanAllRecord(String tablename) throws IOException {String record = "";TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Scan scan = new Scan();ResultScanner scanner = table.getScanner(scan);try {for (Result result : scanner) {for (Cell cell : result.rawCells()) {StringBuffer stringBuffer = new StringBuffer().append(Bytes.toString(cell.getRowArray())).append("\t").append(Bytes.toString(cell.getFamilyArray())).append("\t").append(Bytes.toString(cell.getQualifierArray())).append("\t").append(Bytes.toString(cell.getValueArray())).append("\n");String str = stringBuffer.toString();record += str;}}} finally {if (scanner != null) {scanner.close();}}return record;}/*** 根据rowkey关键字查询报告记录** @param tablename* @param rowKeyword* @return*/public List scanReportDataByRowKeyword(String tablename, String rowKeyword) throws IOException {ArrayList list = new ArrayList<>();Table table = connection.getTable(TableName.valueOf(tablename));Scan scan = new Scan();//添加行键过滤器,根据关键字匹配RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));scan.setFilter(rowFilter);ResultScanner scanner = table.getScanner(scan);try {for (Result result : scanner) {//TODO 此处根据业务来自定义实现list.add(null);}} finally {if (scanner != null) {scanner.close();}}return list;}/*** 根据rowkey关键字和时间戳范围查询报告记录** @param tablename* @param rowKeyword* @return*/public List scanReportDataByRowKeywordTimestamp(String tablename, String rowKeyword, Long minStamp, Long maxStamp) throws IOException {ArrayList list = new ArrayList<>();Table table = connection.getTable(TableName.valueOf(tablename));Scan scan = new Scan();//添加scan的时间范围scan.setTimeRange(minStamp, maxStamp);RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));scan.setFilter(rowFilter);ResultScanner scanner = table.getScanner(scan);try {for (Result result : scanner) {//TODO 此处根据业务来自定义实现list.add(null);}} finally {if (scanner != null) {scanner.close();}}return list;}/*** 删除表操作** @param tablename*/public void deleteTable(String tablename) throws IOException {TableName name = TableName.valueOf(tablename);if (admin.tableExists(name)) {admin.disableTable(name);admin.deleteTable(name);}}
}
五、使用
接下来只需要在项目业务类里注入hbaseUtils就可以使用了:
@Autowired
private HbaseUtil hbaseUtil;
测试方法:
import com.swkj.common.hbase.utils.HbaseUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;/*** @description: Hbase工具类测试* @author: zhangzhixiang* @createDate: 2022/11/24* @version: 1.0*/
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles(profiles = "local")
public class HbaseServiceTest {@Autowired
private HbaseUtil hbaseUtil;@Testpublic void testHbase() {try {hbaseUtil.createTable("Student", new String[]{"StuInfo", "Grades"});hbaseUtil.insertOneRecord("Student", "0001", "StuInfo", "name", "Tom Green");hbaseUtil.insertOneRecord("Student", "0002", "StuInfo", "Age", "18");System.out.println("=================" + hbaseUtil.selectValue("Student", "0001", "StuInfo", "name"));System.out.println("=================" + hbaseUtil.selectValue("Student", "0002", "StuInfo", "Age"));System.out.println("=================" + hbaseUtil.selectRow("Student", "0001"));System.out.println("=================" + hbaseUtil.selectRow("Student", "0002"));} catch (Exception e) {e.printStackTrace();}}
}
到此 SpringBoot 封装 HBase 操作工具类介绍完成。