HBase 存储船舶轨迹方案

背景:项目需求要存储三个月的船舶轨迹数据,并且能够根据时间段快速查询单艘船的轨迹数据。船舶数据4~5万,5分钟记录一次轨迹。

1、环境基础:已经安装部署完成hbase 集群环境 ,安装部署链接: https://blog.csdn.net/weixin_41352552/article/details/109851768.

2、运用 springboot 集成hbase,使用定时器从redis中循环取出船舶点插入hbase,轨迹数量达到1千万。

1、目录架构

2、引入相关依赖 pom.xml 文件

<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.3.0</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion><exclusion><groupId>javax.servlet</groupId><artifactId>servlet-api</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-common</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-hadoop</artifactId><version>2.5.0.RELEASE</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.6.0</version></dependency>

3、hbase连接配置

在application.properties 文件中添加以下配置

################################HBASE 配置##########################################hbase.zookeeper.quorum=master:2181,slave1:2181 # hbase集群的ip+端口

HBaseConn 连接配置

@Component
public class HBaseConn {@Value("${hbase.zookeeper.quorum}")private String zookeeperQuorum;private static final HBaseConn INSTANCE = new HBaseConn();private static Configuration configuration;private static Connection connection;private HBaseConn() {try {if (configuration == null) {configuration = HBaseConfiguration.create();configuration.set("hbase.zookeeper.quorum", zookeeperQuorum);}} catch (Exception e) {e.printStackTrace();}}private Connection getConnection() {if (connection == null || connection.isClosed()) {try {connection = ConnectionFactory.createConnection(configuration);} catch (Exception e) {e.printStackTrace();}}return connection;}public static Connection getHBaseConn() {return INSTANCE.getConnection();}public static Configuration getConfiguration() {return configuration;}public static void closeConn() {if (connection != null) {try {connection.close();} catch (IOException ioe) {ioe.printStackTrace();}}}public static Table getTable(String tableName) throws IOException {return INSTANCE.getConnection().getTable(TableName.valueOf(tableName));}
}

HBaseUtil Hbase的工具包

public class HBaseUtil {/*** 创建HBase表.** @param tableName 表名* @param cfs       列族的数组* @return 是否创建成功 boolean*/public static boolean createTable(String tableName, String[] cfs) {try (HBaseAdmin admin = (HBaseAdmin) HBaseConn.getHBaseConn().getAdmin()) {if (admin.tableExists(tableName)) {return false;}HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));String nameAsString = tableDescriptor.getNameAsString();System.out.println(nameAsString);Arrays.stream(cfs).forEach(cf -> {HColumnDescriptor columnDescriptor = new HColumnDescriptor(cf);columnDescriptor.setMaxVersions(1);tableDescriptor.addFamily(columnDescriptor);});admin.createTable(tableDescriptor);} catch (Exception e) {e.printStackTrace();}return true;}/*** 删除hbase表.** @param tableName 表名* @return 是否删除成功 boolean*/public static boolean deleteTable(String tableName) {try (HBaseAdmin admin = (HBaseAdmin) HBaseConn.getHBaseConn().getAdmin()) {admin.disableTable(tableName);admin.deleteTable(tableName);} catch (Exception e) {e.printStackTrace();}return true;}/*** hbase插入一条数据.** @param tableName 表名* @param rowKey    唯一标识* @param cfName    列族名* @param qualifier 列标识* @param data      数据* @return 是否插入成功 boolean*/public static boolean putRow(String tableName, String rowKey, String cfName, String qualifier,String data) {try (Table table = HBaseConn.getTable(tableName)) {Put put = new Put(Bytes.toBytes(rowKey));put.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(qualifier), Bytes.toBytes(data));table.put(put);} catch (IOException ioe) {ioe.printStackTrace();return false;}return true;}/*** 插入多条数据** @param tableName the table name* @param puts      the puts* @return the boolean*/public static boolean putRows(String tableName, List<Put> puts) {try (Table table = HBaseConn.getTable(tableName)) {table.put(puts);} catch (IOException ioe) {ioe.printStackTrace();return false;}return true;}/*** 获取单条数据.** @param tableName 表名* @param rowKey    唯一标识* @return 查询结果 row*/public static Result getRow(String tableName, String rowKey) {try (Table table = HBaseConn.getTable(tableName)) {Get get = new Get(Bytes.toBytes(rowKey));return table.get(get);} catch (IOException ioe) {ioe.printStackTrace();return null;}}/*** 获取数据** @param tableName  the table name* @param rowKey     the row key* @param filterList the filter list* @return the row*/public static Result getRow(String tableName, String rowKey, FilterList filterList) {try (Table table = HBaseConn.getTable(tableName)) {Get get = new Get(Bytes.toBytes(rowKey));get.setFilter(filterList);return table.get(get);} catch (IOException ioe) {ioe.printStackTrace();return null;}}/*** 检索数据..** @param tableName the table name* @return the scanner*/public static ResultScanner getScanner(String tableName) {try (Table table = HBaseConn.getTable(tableName)) {Scan scan = new Scan();scan.setCaching(1000);return table.getScanner(scan);} catch (IOException ioe) {ioe.printStackTrace();}return null;}/*** 批量检索数据.** @param tableName   表名* @param startRowKey 起始RowKey* @param endRowKey   终止RowKey* @return ResultScanner实例 scanner*/public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey) {try (Table table = HBaseConn.getTable(tableName)) {Scan scan = new Scan();scan.setStartRow(Bytes.toBytes(startRowKey));scan.setStopRow(Bytes.toBytes(endRowKey));scan.setCaching(1000);return table.getScanner(scan);} catch (IOException ioe) {ioe.printStackTrace();}return null;}/*** Gets scanner.** @param tableName   the table name* @param startRowKey the start row key* @param endRowKey   the end row key* @param filterList  the filter list* @return the scanner*/public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey,FilterList filterList) {try (Table table = HBaseConn.getTable(tableName)) {Scan scan = new Scan();scan.setStartRow(Bytes.toBytes(startRowKey));scan.setStopRow(Bytes.toBytes(endRowKey));scan.setFilter(filterList);scan.setCaching(1000);return table.getScanner(scan);} catch (IOException ioe) {ioe.printStackTrace();}return null;}/*** HBase删除一行记录.** @param tableName 表名* @param rowKey    唯一标识* @return 是否删除成功 boolean*/public static boolean deleteRow(String tableName, String rowKey) {try (Table table = HBaseConn.getTable(tableName)) {Delete delete = new Delete(Bytes.toBytes(rowKey));table.delete(delete);} catch (IOException ioe) {ioe.printStackTrace();}return true;}/*** Delete column family boolean.** @param tableName the table name* @param cfName    the cf name* @return the boolean*/public static boolean deleteColumnFamily(String tableName, String cfName) {try (HBaseAdmin admin = (HBaseAdmin) HBaseConn.getHBaseConn().getAdmin()) {admin.deleteColumn(tableName, cfName);} catch (Exception e) {e.printStackTrace();}return true;}/*** Delete qualifier boolean.** @param tableName the table name* @param rowKey    the row key* @param cfName    the cf name* @param qualifier the qualifier* @return the boolean*/public static boolean deleteQualifier(String tableName, String rowKey, String cfName,String qualifier) {try (Table table = HBaseConn.getTable(tableName)) {Delete delete = new Delete(Bytes.toBytes(rowKey));delete.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(qualifier));table.delete(delete);} catch (IOException ioe) {ioe.printStackTrace();}return true;}/*** 利用协处理器进行全表count统计** @param tableName the tableName* @return the long*/public static Long countRowsWithCoprocessor(String tableName) {try (Table table = HBaseConn.getTable(tableName);Connection hBaseConn = HBaseConn.getHBaseConn();Admin admin = hBaseConn.getAdmin()) {TableName name = table.getName();HTableDescriptor descriptor = admin.getTableDescriptor(name);String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";if (!descriptor.hasCoprocessor(coprocessorClass)) {admin.disableTable(name);descriptor.addCoprocessor(coprocessorClass);admin.modifyTable(name, descriptor);admin.enableTable(name);}//计时StopWatch stopWatch = new StopWatch();stopWatch.start();Scan scan = new Scan();AggregationClient aggregationClient = new AggregationClient(HBaseConn.getConfiguration());Long count = aggregationClient.rowCount(name, new LongColumnInterpreter(), scan);stopWatch.stop();System.out.println("RowCount:" + count + ",全表count统计耗时:" + stopWatch.getTotalTimeMillis());return count;} catch (Throwable ioe) {ioe.printStackTrace();}return 0L;}
}

HBaseController 测试HBase 的controller

@RestController
@RequestMapping("/hbase")
public class HBaseController {@Resourceprivate RedisUtils redisUtils;/*** 插入轨迹数据* @return return*/@GetMapping("/saveShipPoint")public String saveShipPoint() {// 从redis中取出船舶轨迹点数据List<Object> list = redisUtils.getMaster().getValuesFromPattern("SHIP1:*");if (list == null) {return null;}List<Put> putList = new ArrayList<>(list.size()*2);list.forEach(e->{// 循环遍历JSONObject data = JSONObject.parseObject(e.toString());String shipNumber = data.getString("shipNumber");String shipName = data.getString("shipName");String longitude = data.getString("longitude");String latitude = data.getString("latitude");if(!StringUtils.isEmpty(shipNumber) && !StringUtils.isEmpty(shipName)&& longitude!=null && latitude!=null){// 行键 船舶唯一标识9位码+当前时间Put put = new Put(Bytes.toBytes(shipNumber+"-"+System.currentTimeMillis()));// 船舶九位码put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("shipNumber"), Bytes.toBytes(shipNumber));// 船舶名称(中文)put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("shipName"), Bytes.toBytes(shipName));// 经度put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("longitude"), Bytes.toBytes(longitude));// 纬度put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("latitude"), Bytes.toBytes(latitude));//put插入数据putList.add(put);}});if (!CollectionUtils.isEmpty(putList)) {// 插入ship表中HBaseUtil.putRows("ship", putList);}return  "OK";}/*** 获取ship表中的记录总数* @return ship表中的记录总数*/@GetMapping("/count")public Long getShipCount() {return HBaseUtil.countRowsWithCoprocessor("ship");}/*** 根据时间段查询船舶轨迹* @return re*/@GetMapping("/find")public String find() {//1605843004000 1605843184000 1605843124000// 使用行键前缀过滤器Filter filter = new PrefixFilter(Bytes.toBytes("413790328-"));FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, Collections.singletonList(filter));// 添加时间段条件ResultScanner scanner = HBaseUtil.getScanner("ship", "413790328-1605841204000", "413790328-1605856498271", filterList);AtomicInteger i = new AtomicInteger();// 获取到对应的船舶轨迹listif (scanner != null) {scanner.forEach(result -> {i.getAndIncrement();System.out.println("rowkey=" + Bytes.toString(result.getRow()));System.out.println("fileName=" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("shipName"))));});System.out.println("一共获取数据:"+ i.get());// 关闭scanner连接scanner.close();}return "OK"+i.get();}/*** 创建表* @return ship表*/@GetMapping("/createShipTable")public Boolean createShipTable() {return HBaseUtil.createTable("ship", new String[]{"info"});}
}

ShipSchedule 定时器

@Component
@Slf4j
public class ShipSchedule {@ResourceHBaseController hBaseController;// 定时存入轨迹数据//@Scheduled(cron = "0/10 * * * * ? ")public void saveShipPoint(){hBaseController.saveShipPoint();}
}

redis中的测试数据(2万6)

实现步骤

1、访问createShipTable方法创建ship表

2、启动定时器 运行saveShipPoint方法,保存船舶得轨迹测试数据(数量达到1000W,大概持续运行一个小时)

3、访问getShipCount方法获取表的总记录数(全表计算用时比较长)

4、访问find方法根据时间段查询船舶轨迹(查询速度达到毫秒级70ms)

5、轨迹数据表可以按天创建每天创建一个ship+日期表(如ship20201122、ship20201123等)。

HBase 存储船舶轨迹方案相关推荐

  1. HBase存储剖析与数据迁移

    1.概述 HBase的存储结构和关系型数据库不一样,HBase面向半结构化数据进行存储.所以,对于结构化的SQL语言查询,HBase自身并没有接口支持.在大数据应用中,虽然也有SQL查询引擎可以查询H ...

  2. 基于云上分布式NoSQL的海量气象数据存储和查询方案

    前言 气象数据是一类典型的大数据,具有数据量大.时效性高.数据种类丰富等特点.气象数据中大量的数据是时空数据,记录了时间和空间范围内各个点的各个物理量的观测量或者模拟量,每天产生的数据量常在几十TB到 ...

  3. HBase存储相关概念

    行存:mysql中定义列也会占用存储空间 面向列:列并非事先定义 hbase存储的是KV对 非结构化数据:比如爬取的数据,列不确定 结构化数据:要有什么字段就所有行都有 半结构化数据:json数据即是 ...

  4. 万兆NAS存储网络组建方案

    NAS存储网络是满足中小企业实现低成本存储需求的解决方案,其中,NAS指的是网络附加存储,是存储数据的设备,被称为NAS存储器,它以数据为中心,将存储设备与服务器彻底分离,集中管理数据,从而释放带宽. ...

  5. 给我一个西门子plc采集大数据存储与分析方案

    对于西门子PLC采集大数据存储与分析方案,下面是一个建议: 数据采集: 在PLC中设置数据采集程序,以记录关键数据并定期发送到数据存储仓库. 数据存储: 使用大数据存储技术,例如 Hadoop.Spa ...

  6. 基于IPSAN的存储典型备份方案

    本贴转自 http://www.datakingtech.com/solutionsEnt_IPSAN.htm 关键词:数据库 安全 迁移 克隆 存储 DBE 备份 恢复 金橙 备份任务 集中备份管理 ...

  7. 分布式爬虫系统设计、实现与实战:爬取京东、苏宁易购全网手机商品数据+MySQL、HBase存储

    1 概述 在不用爬虫框架的情况,经过多方学习,尝试实现了一个分布式爬虫系统,并且可以将数据保存到不同地方,类似MySQL.HBase等. 基于面向接口的编码思想来开发,因此这个系统具有一定的扩展性,有 ...

  8. 船舶轨迹预测的方法有哪些

    船舶轨迹预测的方法有以下几种: 线性预测: 这种方法基于过去的船舶轨迹数据,使用线性回归或其他线性模型来预测未来的轨迹. 多项式预测: 这种方法使用多项式回归或其他多项式模型来预测未来的轨迹. 神经网 ...

  9. python Cartopy 船舶轨迹数据可视化 【GPS AIS VMS】

    更新: 最近又发现了更好用的工具keplergl,基本不需要写代码,拿着数据拆箱即用. 欢迎访问我的这篇博客:酷炫Keplergl 实现功能:将轨迹数据可视化到地图上 适用范围:车辆.船舶 等含有GP ...

最新文章

  1. 中国高校人工智能学院院长 【截止到 2019-05-07】
  2. [Linux] Linux Shell查找文件
  3. mysql简单概述_MySQL入门很简单: 1 数据库概述
  4. “举报”阿里巴巴 Arthas,大幅降低 OOM Trouble shooting 门槛
  5. linux 网卡配置不一致,linux环境下,双网卡配置不同网段后,路由问题
  6. Cpp / 右值、纯右值、将亡值
  7. 如何运行含spark的python脚本
  8. 我用AI回怼美女汽车销售系列[yolo车牌识别](五) 完结
  9. 国外数据库十大风云人物,你认识几个?
  10. Shell组件的返回码,0为成功,其他为失败.
  11. 有关c#.net“无法加载 CSOpenGLC.dll:找不到指定的模块”的问题解决办法
  12. 中科院毕业去向(硕士+博士)
  13. screen 命令详解
  14. Hadoop文件系统
  15. 泰裤辣!五一烧烤倒计时,还有人没做好攻略吗?
  16. arduino与hcsr04_Arduino 驱动 HC-SR04 超声波测距模块
  17. JS基础 将字符串数组用|或其他符号分割
  18. 经典算法之右边界二分查找法(俗称基本右边界二分搜索法)
  19. 人的一生就是在不断的学习中成长
  20. 由java:local_policy.jar和US_export_policy.jar引发的“血案”

热门文章

  1. MongoDB 全文索引
  2. 祝中国程序员早日拥有加班自由
  3. 瑞士轮赛制模拟器_【科普】瑞士轮比赛赛制,本次Major 16进8采用的就是瑞士制...
  4. 设置 EXTRA_CFLAGS 以编译 debug 版本 dpdk 库
  5. HTML基本的一些知识以及标签的使用
  6. 思科宣布推出面向中小企业的统一通信平台和全新销售效率计划
  7. 第二证券|银保监会:强化金融稳定保障体系 加力支持扩大内需
  8. 64位win7系统的VS2010生成C#执行exe无法在32位机器运行的解决办法
  9. 蓝牙 舵狗 openmv通信相关
  10. ASP.NET入门随想六之大航海家(2)