SpringBoot 封装 HBase 操作工具类
创始人
2024-02-24 00:47:12
0

        最近项目中用到了Hbase相关的操作并封装成工具类,我的Hbase服务器端版本是2.1.0,图示如下:

        特此记录便于日后查阅。

一、pom.xml 依赖

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-shaded-client</artifactId>
    <version>2.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.0.0</version>
</dependency>

二、application.yml 项目配置

        此处我是自定义HBase配置,后面会有专门的配置类来加载这个配置

datasource:
  hbase:
    zookeeper:
      port: 2181
      quorum: 10.0.61.12,10.0.61.22,10.0.61.24
      znode:
        parent: ''

三、HbaseConfig 自定义配置类

import lombok.Data;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @description: Hbase配置类* @author: zhangzhixiang* @createDate: 2022/11/24* @version: 1.0*/
/**
 * Spring-managed holder for the HBase ZooKeeper settings bound from the
 * {@code datasource.hbase.zookeeper.*} properties, plus a factory method for
 * HBase connections built from those settings.
 */
@Data
@Component
@Configuration
public class HbaseConfig {

    /** Value of {@code datasource.hbase.zookeeper.quorum}. */
    @Value("${datasource.hbase.zookeeper.quorum}")
    private String zookeeper;

    /** Value of {@code datasource.hbase.zookeeper.znode.parent}; may be empty. */
    @Value("${datasource.hbase.zookeeper.znode.parent}")
    private String parent;

    /** Value of {@code datasource.hbase.zookeeper.port}. */
    @Value("${datasource.hbase.zookeeper.port}")
    private String port;

    /**
     * Builds an HBase client configuration from the injected properties and opens
     * a connection with it. Every call creates a new connection.
     *
     * @return the connection returned by {@code ConnectionFactory.createConnection}
     * @throws IOException if the connection cannot be created
     */
    public Connection getConnection() throws IOException {
        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.quorum", zookeeper);
        hbaseConf.set("hbase.zookeeper.property.clientPort", port);

        // The znode parent is only overridden when a non-empty value was configured.
        boolean parentConfigured = parent != null && !parent.isEmpty();
        if (parentConfigured) {
            hbaseConf.set("zookeeper.znode.parent", parent);
        }

        return ConnectionFactory.createConnection(hbaseConf);
    }
}

四、HbaseUtil 工具类

        首先添加 SpringContext 工具类,下面会用到:

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;/*** @Description: * @Author:zhangzhixiang* @Date: 2022/7/25* @Version: 1.0*/
@Component
public class SpringContext implements ApplicationContextAware {public static ApplicationContext context;public static Environment env;@Overridepublic void setApplicationContext(ApplicationContext context) throws BeansException {SpringContext.context = context;SpringContext.env = context.getEnvironment();}public static Object getBean(String name) {return context.getBean(name);}public static  T getBean(Class clazz) {return context.getBean(clazz);}public static ApplicationContext getContext() {return context;}public static Environment getEnv() {return env;}public static String getProperty(String key) {return getProperty(key, "");}public static String getProperty(String key, String defaultValue) {return env.getProperty(key, defaultValue);}public static  T getProperty(String key, Class targetType) {return env.getProperty(key, targetType);}public static String getActiveProfile() {return env.getActiveProfiles()[0];}
}

         然后我们来写 HbaseUtil 工具类的代码:

import com.swkj.common.base.context.SpringContext;
import com.swkj.common.base.log.GLog;
import com.swkj.common.base.log.LogFactory;
import com.swkj.common.hbase.config.HbaseConfig;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;/*** @description: Hbase工具类* @author: zhangzhixiang* @createDate: 2022/11/24* @version: 1.0*/
@DependsOn("springContext")
@Component
public class HbaseUtil {private static final GLog LOG = LogFactory.getLogger(HbaseUtil.class);private static HbaseConfig hbaseConfig = (HbaseConfig) SpringContext.getBean("hbaseConfig");private static Connection connection = null;private static Admin admin = null;private HbaseUtil() {if (connection == null) {try {connection = hbaseConfig.getConnection();admin = connection.getAdmin();} catch (IOException e) {LOG.error("HbaseUtils实例初始化失败!错误信息为:" + e.getMessage(), e);}}}/*** 创建表** @param tableName    表名* @param columnFamily 列族(数组)*/public void createTable(String tableName, String[] columnFamily) throws IOException {TableName name = TableName.valueOf(tableName);//如果存在则删除if (admin.tableExists(name)) {admin.disableTable(name);admin.deleteTable(name);LOG.error("create htable error! this table {} already exists!", name);} else {HTableDescriptor desc = new HTableDescriptor(name);for (String cf : columnFamily) {desc.addFamily(new HColumnDescriptor(cf));}admin.createTable(desc);}}/*** 插入记录(单行单列族-多列多值)** @param tableName     表名* @param row           行名* @param columnFamilys 列族名* @param columns       列名(数组)* @param values        值(数组)(且需要和列一一对应)*/public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException {TableName name = TableName.valueOf(tableName);Table table = connection.getTable(name);Put put = new Put(Bytes.toBytes(row));for (int i = 0; i < columns.length; i++) {put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));table.put(put);}}/*** 插入记录(单行单列族-单列单值)** @param tableName    表名* @param row          行名* @param columnFamily 列族名* @param column       列名* @param value        值*/public void insertOneRecord(String tableName, String row, String columnFamily, String column, String value) throws IOException {TableName name = TableName.valueOf(tableName);Table table = connection.getTable(name);Put put = new 
Put(Bytes.toBytes(row));put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));table.put(put);}/*** 删除一行记录** @param tablename 表名* @param rowkey    行名*/public void deleteRow(String tablename, String rowkey) throws IOException {TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Delete d = new Delete(rowkey.getBytes());table.delete(d);}/*** 删除单行单列族记录** @param tablename    表名* @param rowkey       行名* @param columnFamily 列族名*/public void deleteColumnFamily(String tablename, String rowkey, String columnFamily) throws IOException {TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Delete d = new Delete(rowkey.getBytes()).addFamily(Bytes.toBytes(columnFamily));table.delete(d);}/*** 删除单行单列族单列记录** @param tablename    表名* @param rowkey       行名* @param columnFamily 列族名* @param column       列名*/public void deleteColumn(String tablename, String rowkey, String columnFamily, String column) throws IOException {TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Delete d = new Delete(rowkey.getBytes()).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));table.delete(d);}/*** 查找一行记录** @param tablename 表名* @param rowKey    行名*/public static String selectRow(String tablename, String rowKey) throws IOException {String record = "";TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Get g = new Get(rowKey.getBytes());Result rs = table.get(g);NavigableMap>> map = rs.getMap();for (Cell cell : rs.rawCells()) {StringBuffer stringBuffer = new StringBuffer().append(Bytes.toString(cell.getRowArray())).append("\t").append(Bytes.toString(cell.getFamilyArray())).append("\t").append(Bytes.toString(cell.getQualifierArray())).append("\t").append(Bytes.toString(cell.getValueArray())).append("\n");String str = stringBuffer.toString();record += str;}return record;}/*** 查找单行单列族单列记录** @param tablename    表名* @param rowKey       
行名* @param columnFamily 列族名* @param column       列名* @return*/public static String selectValue(String tablename, String rowKey, String columnFamily, String column) throws IOException {TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Get g = new Get(rowKey.getBytes());g.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));Result rs = table.get(g);return Bytes.toString(rs.value());}/*** 查询表中所有行(Scan方式)** @param tablename* @return*/public String scanAllRecord(String tablename) throws IOException {String record = "";TableName name = TableName.valueOf(tablename);Table table = connection.getTable(name);Scan scan = new Scan();ResultScanner scanner = table.getScanner(scan);try {for (Result result : scanner) {for (Cell cell : result.rawCells()) {StringBuffer stringBuffer = new StringBuffer().append(Bytes.toString(cell.getRowArray())).append("\t").append(Bytes.toString(cell.getFamilyArray())).append("\t").append(Bytes.toString(cell.getQualifierArray())).append("\t").append(Bytes.toString(cell.getValueArray())).append("\n");String str = stringBuffer.toString();record += str;}}} finally {if (scanner != null) {scanner.close();}}return record;}/*** 根据rowkey关键字查询报告记录** @param tablename* @param rowKeyword* @return*/public List scanReportDataByRowKeyword(String tablename, String rowKeyword) throws IOException {ArrayList list = new ArrayList<>();Table table = connection.getTable(TableName.valueOf(tablename));Scan scan = new Scan();//添加行键过滤器,根据关键字匹配RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));scan.setFilter(rowFilter);ResultScanner scanner = table.getScanner(scan);try {for (Result result : scanner) {//TODO 此处根据业务来自定义实现list.add(null);}} finally {if (scanner != null) {scanner.close();}}return list;}/*** 根据rowkey关键字和时间戳范围查询报告记录** @param tablename* @param rowKeyword* @return*/public List scanReportDataByRowKeywordTimestamp(String tablename, String rowKeyword, Long minStamp, Long 
maxStamp) throws IOException {ArrayList list = new ArrayList<>();Table table = connection.getTable(TableName.valueOf(tablename));Scan scan = new Scan();//添加scan的时间范围scan.setTimeRange(minStamp, maxStamp);RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));scan.setFilter(rowFilter);ResultScanner scanner = table.getScanner(scan);try {for (Result result : scanner) {//TODO 此处根据业务来自定义实现list.add(null);}} finally {if (scanner != null) {scanner.close();}}return list;}/*** 删除表操作** @param tablename*/public void deleteTable(String tablename) throws IOException {TableName name = TableName.valueOf(tablename);if (admin.tableExists(name)) {admin.disableTable(name);admin.deleteTable(name);}}
}

五、使用

        接下来只需要在项目业务类里注入 HbaseUtil 就可以使用了:

// Field injection of the HbaseUtil Spring component into the business class.
@Autowired
private HbaseUtil hbaseUtil;

        测试方法:

import com.swkj.common.hbase.utils.HbaseUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;/*** @description: Hbase工具类测试* @author: zhangzhixiang* @createDate: 2022/11/24* @version: 1.0*/
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles(profiles = "local")
public class HbaseServiceTest {@Autowired
private HbaseUtil hbaseUtil;@Testpublic void testHbase() {try {hbaseUtil.createTable("Student", new String[]{"StuInfo", "Grades"});hbaseUtil.insertOneRecord("Student", "0001", "StuInfo", "name", "Tom Green");hbaseUtil.insertOneRecord("Student", "0002", "StuInfo", "Age", "18");System.out.println("=================" + hbaseUtil.selectValue("Student", "0001", "StuInfo", "name"));System.out.println("=================" + hbaseUtil.selectValue("Student", "0002", "StuInfo", "Age"));System.out.println("=================" + hbaseUtil.selectRow("Student", "0001"));System.out.println("=================" + hbaseUtil.selectRow("Student", "0002"));} catch (Exception e) {e.printStackTrace();}}
}

        到此 SpringBoot 封装 HBase 操作工具类介绍完成。

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【Ctfer训练计划】——(三... 作者名:Demo不是emo  主页面链接:主页传送门 创作初心ÿ...