数据迁移

目标

  • 能够描述项目数据迁移的方案
  • 了解hbase的特点
  • 能够熟悉数据迁移中的数据包装和转换
  • 能够完成文章数据的全量和增量迁移
  • 能够完成热点文章数据的迁移

1 为什么需要自动同步

因为MySQL保存着我们爬取的以及自建的数据,对于爬取的数据,数据量比较大,使用mysql 存储会影响mysql的性能,并且我们需要对数据进行流式计算,对数据进行各种统计,mysq满足不了我们的需求,我们就将mysql中的全量数据同步到HBASE中,由HBASE保存海量数据,mysql中的全量数据会定期进行删除。

HBASE中保存着海量数据,我们需要计算出热点数据,并将数据同步到mysql以及MONGODB中,mysql中保存主体关系数据,MONGODB保存着具体数据信息。

因为热点数据也会失效,今天是热数据,明天就不是了,也需要定期对热点数据进行删除,我们定时删除一个月之前的热点数据,保持本月的热数据。

2 迁移方案

2.1 需求分析

2.1.1 功能需求

有了大量数据集基础后,实时计算后的热点数据需要保存起来,因为mysql保存大量文章数据会影响mysql的性能,所以采用mysql+mongoDB的方式进行存储。

2.1.1 全量数据迁移方案

通过定时任务将mysql中爬取或者自建的文章同步到HBASE中,并将同步过的数据状态改为已同步,下次同步的时候就不会再次同步这些数据了。

2.1.2 热数据迁移方案

HBASE中有全量数据,大数据端计算出热点数据,需要将这些热点数据同步到MYSQL和MONGDB中,用于页面显示

2.2 设计思路

  • 将mysql数据库中的全量数据定时读取出来,将多个对象打包成一个对象,保存到HBASE中,保存成功后更新数据库中的状态改为已同步,下一次就不会同步该条数据了。

  • 使用KAFKA监听热点数据计算结果,接收到热点数据信息后,从HBASE得到打包的数据,并将数据进行拆分,将关系数据保存到mysql中,将具体数据保存到mongodb中。

  • 因为热点数据会失效,定期清除mysql和mongodb中的过期数据

2.3 数据同步注意的问题

  • HBASE数据主要靠rowKey进行查询的,rowKey设计就用mysql中的主键ID作为rowKey,查询的时候直接根据Rowkey获取数据

  • 因为需要同步到HBASE的数据是多个数据表的数据,一条数据由多个对象组成,存储的时候使用列族区分不同的对象,里面存储不同的字段。

3 项目中集成hbase与Mongodb

在leadnews-common 集成:

  • hbase.properties
  • mongo.properties
  • pom.xml
<!--mongoDB-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<!--HBase-->
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.1.5</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion><exclusion><groupId>junit</groupId><artifactId>junit</artifactId></exclusion><exclusion><artifactId>slf4j-log4j12</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions>
</dependency>

host文件配置:
在服务器host文件中配置域名,根据自己的服务器地址更改

172.16.1.52 javaedge

4 常用组件介绍

4.1 Hbase相关操作

Hbase 操作工具类用于将数据存储到Hbase中,其中有些方法用于存储或删除。

4.1.1 项目导入

导入资料文件夹中的项目leadnews-migration

4.1.2 公共存储类

StorageData

公共存储数据表,由多个StorageEntity组成

StorageData 是最重要的一个存储对象,他是保存一个bean信息的类,负责存储bean信息以及转换和反向转换bean 。

该类用到一个重要的工具类ReflectUtils 反射工具类和DataConvertUtils数据类型转换工具类主要用于日期类型的转换

主要方法
  • 添加StorageEntry方法
public void addStorageEntry(StorageEntry entry)

该方法有几个重载方法,用于向StorageEntry列表中添加StorageEntry对象的

  • 获取该对象对应的Object对象
public Object getObjectValue()

该方法用于将存储的实体数据转换为Bean的实体,用了ReflectUtils反射工具类进行操作

  • 将Bean 转换为StorageData的存储结构
public static StorageData getStorageData(Object bean)

该方法用于将不同的bean转换为同一种存储结构进行存储

StorageEntity

公共代码存储的实体

StorageEntry

公共存储对象的一个key-value的字段

4.1.3 Hbase操作相关工具类

(1)HBaseConstants 类

配置类Hbase存储的的表名称

public class HBaseConstants {public static final String APARTICLE_QUANTITY_TABLE_NAME = "APARTICLE_QUANTITY_TABLE_NAME";
}

(2)HBaseInvok
hbase的的回调操作类


/*** Hbase 的回调类* 用于我们操作的时候就行回调*/
public interface HBaseInvok {/*** 回调方法*/public void invok();
}

(3)HBaseStorage

hbase 的存储对象 继承自StorageEntity

(4)HBaseClent

hbase client操作的工具类

(5)HBaseConfig
用于将HbaseClient对象的相关配置
(6)HBaseStorageClient

Hbase 存储客户端工具类 是对HbaseClient工具类的封装

这个类是自己封装的存储客户端

该类位于heima-leadnews-common 包下的com.heima.hbase.HBaseStorageClient

其中用到了HBaseClent 客户端工具,他是一个操作工具类,不需要我们具体的写拿过来用就可以

4.1.4 测试代码

@SpringBootTest
@RunWith(SpringRunner.class)
public class HbaseTest {@Autowiredprivate HBaseClent hBaseClent;@Testpublic void testCreateTable(){List<String> columnFamily = new ArrayList<>();columnFamily.add("test_cloumn_family1");columnFamily.add("test_cloumn_family2");boolean ret = hBaseClent.creatTable("hbase_test_table_name", columnFamily);}@Testpublic void testDelTable(){hBaseClent.deleteTable("hbase_test_table_name");}@Testpublic void testSaveData(){String []columns ={"name","age"};String [] values = {"zhangsan","28"};hBaseClent.putData("hbase_test_table_name","test_row_key_001","test_cloumn_family1",columns,values);}@Testpublic void testFindByRowKey(){Result hbaseResult = hBaseClent.getHbaseResult("hbase_test_table_name", "test_row_key_001");System.out.println(hbaseResult);}
}

4.2 MongoDB操作工具类

mongoDB是一个文档型数据库,也需要存储多个不同的对象,我们也用到了HBASE中用到的StorageEntity 存储结构,我们下面会讲
我们用到了Spring MongoTemplate 来操作数据库
介绍以下我们的实体

(1)MongoConstant

mongoDB操作的常量定义了操作mongodb的表名称

代码位置:com.heima.common.mongo.constants.MongoConstant

public class MongoConstant {public static final String APARTICLE_MIGRATION_TABLE = "APARTICLE_MIGRATION_TABLE";
}

(2)MongoStorageEntity

MongoStorageEntity 是我们存储MongoDB数据的存储结构主要是基于StorageEntity 结构来的

mongoDB操作的实体类继承了StorageEntity 制定了 表明以及实体类型

代码位置:com.heima.common.mongo.entity.MongoStorageEntity

/*** mongoDB 存储实体* Document 是指表明是什么*/
@Document(collection = "mongo_storage_data")
@Setter
@Getter
public class MongoStorageEntity extends StorageEntity {/*** 主键的Key** @Id 标明该字段是主键*/@Idprivate String rowKey;}

(3)MongoDBconfigure

对mongdb操作的配置类

代码位置:com.heima.common.mongo.MongoDBconfigure


@Configuration
@PropertySource("classpath:mongo.properties")
public class MongoDBconfigure {@Value("${mongo.host}")private String host;@Value("${mongo.port}")private int port;@Value("${mongo.dbname}")private String dbName;@Beanpublic MongoTemplate getMongoTemplate() {return new MongoTemplate(getSimpleMongoDbFactory());}public SimpleMongoDbFactory getSimpleMongoDbFactory() {return new SimpleMongoDbFactory(new MongoClient(host, port), dbName);}
}

(4)测试代码

@SpringBootTest(classes = MigrationApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class MongoTest {@Autowiredprivate MongoTemplate mongotemplate;@Autowiredprivate HBaseStorageClient hBaseStorageClient;@Testpublic void test() {Class<?>[] classes = new Class<?>[]{ApArticle.class, ApArticleContent.class, ApAuthor.class};//List<Object> entityList = hBaseStorageClient.getHbaseDataEntityList(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, "1", Arrays.asList(classes));List<String> strList = Arrays.asList(classes).stream().map(x -> x.getName()).collect(Collectors.toList());List<StorageData> storageDataList = hBaseStorageClient.gethBaseClent().getStorageDataList(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, "1", strList);MongoStorageEntity mongoStorageEntity = new MongoStorageEntity();mongoStorageEntity.setDataList(storageDataList);mongoStorageEntity.setRowKey("1");MongoStorageEntity tmp = mongotemplate.findById("1", MongoStorageEntity.class);if (null != tmp) {mongotemplate.remove(tmp);}MongoStorageEntity tq = mongotemplate.insert(mongoStorageEntity);System.out.println(tq);}@Testpublic void test1() {MongoStorageEntity mongoStorageEntity = mongotemplate.findById("1", MongoStorageEntity.class);if (null != mongoStorageEntity && null != mongoStorageEntity.getDataList()) {mongoStorageEntity.getDataList().forEach(x -> {System.out.println(x.getObjectValue());});}}
}

5 业务层代码

5.1 Habse操作实体类

(1)ArticleCallBack
Hbase相关回调操作的工具类

(2)ArticleHBaseInvok
Hbase 对回调对象的封装,以及对回调的invoke执行对象

(3)ArticleQuantity
对整个需要存储的对象的封装。

5.2 文章配置接口

5.2.1 mapper

ApArticleConfigMapper中新增方法

ApArticleConfigMapper.xml

5.2.2 service

对文章配置操作的service

ApArticleConfigServiceImpl是对ApArticleConfig的操作

5.3 文章内容接口

5.3.1 mapper定义

ApArticleContentMapper新增方法

 List<ApArticleContent> selectByArticleIds(List<String> articleIds);

5.3.2 service

对文章内容操作的Service

public interface ApArticleContenService {List<ApArticleContent> queryByArticleIds(List<String> ids);ApArticleContent getByArticleIds(Integer id);
}

ApArticleContenServiceImpl

对ApArticleConten相关的操作

代码位置:com.heima.migration.service.impl.ApArticleContenServiceImpl

@Service
public class ApArticleContenServiceImpl implements ApArticleContenService {@Autowiredprivate ApArticleContentMapper apArticleContentMapper;@Overridepublic List<ApArticleContent> queryByArticleIds(List<String> ids) {return apArticleContentMapper.selectByArticleIds(ids);}@Overridepublic ApArticleContent getByArticleIds(Integer id) {return apArticleContentMapper.selectByArticleId(id);}
}

7.4 文章接口

7.4.1 mapper定义

ApArticleMapper新增方法

/*** 查询** @param apArticle* @return*/
List<ApArticle> selectList(ApArticle apArticle);
/*** 更新* @param apArticle*/
void updateSyncStatus(ApArticle apArticle);

ApArticleMapper.xml

<sql id="Base_Column_Where"><where><if test="title!=null and title!=''">and title = #{title}</if><if test="authorId!=null and authorId!=''">and author_id = #{authorId}</if><if test="authorName!=null and authorName!=''">and author_name = #{authorName}</if><if test="channelId!=null and channelId!=''">and channel_id = #{channelId}</if><if test="channelName!=null and channelName!=''">and channel_name = #{channelName}</if><if test="layout!=null and layout!=''">and layout = #{layout}</if><if test="flag!=null and flag!=''">and flag = #{flag}</if><if test="views!=null and views!=''">and views = #{views}</if><if test="syncStatus!=null">and sync_status = #{syncStatus}</if></where>
</sql>
<select id="selectList" resultMap="resultMap">select<include refid="Base_Column_List"/>from ap_article<include refid="Base_Column_Where"/>
</select>
<update id="updateSyncStatus">UPDATE ap_article SET sync_status = #{syncStatus} WHERE id=#{id}
</update>

7.4.2 service

对ApArticle操作的Service

接口位置:com.heima.migration.service.ApArticleService

public interface ApArticleService {public ApArticle getById(Long id);/*** 获取未同步的数据** @return*/public List<ApArticle> getUnsyncApArticleList();/*** 更新同步状态** @param apArticle*/void updateSyncStatus(ApArticle apArticle);
}

ApArticleServiceImpl

对ApArticleService相关的操作

代码位置:com.heima.migration.service.impl.ApArticleServiceImpl

@Log4j2
@Service
public class ApArticleServiceImpl implements ApArticleService {@Autowiredprivate ApArticleMapper apArticleMapper;public ApArticle getById(Long id) {return apArticleMapper.selectById(id);}/*** 获取未同步的数据** @return*/public List<ApArticle> getUnsyncApArticleList() {ApArticle apArticleQuery = new ApArticle();apArticleQuery.setSyncStatus(false);return apArticleMapper.selectList(apArticleQuery);}/*** 更新数据同步状态** @param apArticle*/public void updateSyncStatus(ApArticle apArticle) {log.info("开始更新数据同步状态,apArticle:{}", apArticle);if (null != apArticle) {apArticle.setSyncStatus(true);apArticleMapper.updateSyncStatus(apArticle);}}}

7.5 文章作者接口

7.5.1 mapper定义

ApAuthorMapper

 List<ApAuthor> selectByIds(List<Integer> ids);

ApAuthorMapper.xml

<select id="selectByIds" resultMap="BaseResultMap">select * from ap_authorwhere id in<foreach item="item" index="index" collection="list" open="(" separator="," close=")">#{item}</foreach>
</select>

7.5.2 service

对ApAuthor操作的Service

接口位置:com.heima.migration.service.ApAuthorService

public interface ApAuthorService {List<ApAuthor> queryByIds(List<Integer> ids);ApAuthor getById(Long id);
}

ApAuthorServiceImpl

对ApAuthor相关的操作

代码位置:com.heima.migration.service.impl.ApAuthorServiceImpl

@Service
public class ApAuthorServiceImpl implements ApAuthorService {@Autowiredprivate ApAuthorMapper apAuthorMapper;@Overridepublic List<ApAuthor> queryByIds(List<Integer> ids) {return apAuthorMapper.selectByIds(ids);}@Overridepublic ApAuthor getById(Long id) {if (null != id) {return apAuthorMapper.selectById(id.intValue());}return null;}
}

7.6 综合迁移接口

ArticleQuantityService

操作ArticleQuantity对象的Service ArticleQuantity对象封装了文章相关的数据

接口位置:com.heima.migration.service.ArticleQuantityService

public interface ArticleQuantityService {/*** 获取ArticleQuantity列表* @return*/public List<ArticleQuantity> getArticleQuantityList();/*** 根据ArticleId获取ArticleQuantity* @param id* @return*/public ArticleQuantity getArticleQuantityByArticleId(Long id);/*** 根据ByArticleId从Hbase中获取ArticleQuantity* @param id* @return*/public ArticleQuantity getArticleQuantityByArticleIdForHbase(Long id);/*** 数据库到Hbase的同步*/public void dbToHbase();/*** 根据articleId 将数据库的数据同步到Hbase* @param articleId*/public void dbToHbase(Integer articleId);}

ArticleQuantityServiceImpl

对ArticleQuantity的相关操作

代码位置:com.heima.migration.service.impl.ArticleQuantityServiceImpl

/*** 查询未同步的数据,并封装成ArticleQuantity 对象*/
@Service
@Log4j2
public class ArticleQuantityServiceImpl implements ArticleQuantityService {@Autowiredprivate ApArticleContenService apArticleContenService;@Autowiredprivate ApArticleConfigService apArticleConfigService;@Autowiredprivate ApAuthorService apAuthorService;@Autowiredprivate HBaseStorageClient hBaseStorageClient;@Autowiredprivate ApArticleService apArticleService;/*** 查询位同步数据的列表** @return*/public List<ArticleQuantity> getArticleQuantityList() {log.info("生成ArticleQuantity列表");//查询未同步的庶数据List<ApArticle> apArticleList = apArticleService.getUnsyncApArticleList();if (apArticleList.isEmpty()) {return null;}//获取ArticleId 的listList<String> apArticleIdList = apArticleList.stream().map(apArticle -> String.valueOf(apArticle.getId())).collect(Collectors.toList());//获取AuthorId 的 listList<Integer> apAuthorIdList = apArticleList.stream().map(apAuthor -> apAuthor.getAuthorId() == null ? null : apAuthor.getAuthorId().intValue()).filter(x -> x != null).collect(Collectors.toList());//根据apArticleIdList 批量查询出内容列表List<ApArticleContent> apArticleContentList = apArticleContenService.queryByArticleIds(apArticleIdList);//根据apArticleIdList 批量查询出配置列表List<ApArticleConfig> apArticleConfigList = apArticleConfigService.queryByArticleIds(apArticleIdList);//根据apAuthorIdList 批量查询出作者列List<ApAuthor> apAuthorList = apAuthorService.queryByIds(apAuthorIdList);//将不同的对象转换为 ArticleQuantity 对象List<ArticleQuantity> articleQuantityList = apArticleList.stream().map(apArticle -> {return new ArticleQuantity() {{//设置apArticle 对象setApArticle(apArticle);// 根据apArticle.getId() 过滤出符合要求的 ApArticleContent 对象List<ApArticleContent> apArticleContents = apArticleContentList.stream().filter(x -> x.getArticleId().equals(apArticle.getId())).collect(Collectors.toList());if (null != apArticleContents && !apArticleContents.isEmpty()) {setApArticleContent(apArticleContents.get(0));}// 根据 apArticle.getId 过滤出 ApArticleConfig 对象List<ApArticleConfig> apArticleConfigs = apArticleConfigList.stream().filter(x -> x.getArticleId().equals(apArticle.getId())).collect(Collectors.toList());if (null != apArticleConfigs && !apArticleConfigs.isEmpty()) {setApArticleConfig(apArticleConfigs.get(0));}// 根据 apArticle.getAuthorId().intValue() 过滤出 ApAuthor 对象List<ApAuthor> apAuthors = apAuthorList.stream().filter(x -> x.getId().equals(apArticle.getAuthorId().intValue())).collect(Collectors.toList());if (null != apAuthors && !apAuthors.isEmpty()) {setApAuthor(apAuthors.get(0));}//设置回调方法 用户方法的回调 用于修改同步状态 插入Hbase 成功后同步状态改为已同步setHBaseInvok(new ArticleHBaseInvok(apArticle, (x) -> apArticleService.updateSyncStatus(x)));}};}).collect(Collectors.toList());if (null != articleQuantityList && !articleQuantityList.isEmpty()) {log.info("生成ArticleQuantity列表完成,size:{}", articleQuantityList.size());} else {log.info("生成ArticleQuantity列表完成,size:{}", 0);}return articleQuantityList;}public ArticleQuantity getArticleQuantityByArticleId(Long id) {if (null == id) {return null;}ArticleQuantity articleQuantity = null;ApArticle apArticle = apArticleService.getById(id);if (null != apArticle) {articleQuantity = new ArticleQuantity();articleQuantity.setApArticle(apArticle);ApArticleContent apArticleContent = apArticleContenService.getByArticleIds(id.intValue());articleQuantity.setApArticleContent(apArticleContent);ApArticleConfig apArticleConfig = apArticleConfigService.getByArticleId(id.intValue());articleQuantity.setApArticleConfig(apArticleConfig);ApAuthor apAuthor = apAuthorService.getById(apArticle.getAuthorId());articleQuantity.setApAuthor(apAuthor);}return articleQuantity;}public ArticleQuantity getArticleQuantityByArticleIdForHbase(Long id) {if (null == id) {return null;}ArticleQuantity articleQuantity = null;List<Class> typeList = Arrays.asList(ApArticle.class, ApArticleContent.class, ApArticleConfig.class, ApAuthor.class);List<Object> objectList = hBaseStorageClient.getStorageDataEntityList(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, DataConvertUtils.toString(id), typeList);if (null != objectList && !objectList.isEmpty()) {articleQuantity = new ArticleQuantity();for (Object value : objectList) {if (value instanceof ApArticle) {articleQuantity.setApArticle((ApArticle) value);} else if (value instanceof ApArticleContent) {articleQuantity.setApArticleContent((ApArticleContent) value);} else if (value instanceof ApArticleConfig) {articleQuantity.setApArticleConfig((ApArticleConfig) value);} else if (value instanceof ApAuthor) {articleQuantity.setApAuthor((ApAuthor) value);}}}return articleQuantity;}/*** 数据库到Hbase同步*/public void dbToHbase() {long cutrrentTime = System.currentTimeMillis();List<ArticleQuantity> articleQuantitList = getArticleQuantityList();if (null != articleQuantitList && !articleQuantitList.isEmpty()) {log.info("开始进行定时数据库到HBASE同步,筛选出未同步数据量:{}", articleQuantitList.size());if (null != articleQuantitList && !articleQuantitList.isEmpty()) {List<HBaseStorage> hbaseStorageList = articleQuantitList.stream().map(ArticleQuantity::getHbaseStorage).collect(Collectors.toList());hBaseStorageClient.addHBaseStorage(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, hbaseStorageList);}} else {log.info("定时数据库到HBASE同步为筛选出数据");}log.info("定时数据库到HBASE同步结束,耗时:{}", System.currentTimeMillis() - cutrrentTime);}@Overridepublic void dbToHbase(Integer articleId) {long cutrrentTime = System.currentTimeMillis();log.info("开始进行异步数据库到HBASE同步,articleId:{}", articleId);if (null != articleId) {ArticleQuantity articleQuantity = getArticleQuantityByArticleId(articleId.longValue());if (null != articleQuantity) {HBaseStorage hBaseStorage = articleQuantity.getHbaseStorage();hBaseStorageClient.addHBaseStorage(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, hBaseStorage);}}log.info("异步数据库到HBASE同步结束,articleId:{},耗时:{}", articleId, System.currentTimeMillis() - cutrrentTime);}
}

7.7 热点文章接口

ApHotArticleService

对ApHotArticle操作Service

接口位置:com.heima.migration.service.ApHotArticleService

public interface ApHotArticleService {List<ApHotArticles> selectList(ApHotArticles apHotArticlesQuery);void insert(ApHotArticles apHotArticles);/*** 热数据 Hbase 同步** @param apArticleId*/public void hotApArticleSync(Integer apArticleId);void deleteById(Integer id);/*** 查询过期的数据** @return*/public List<ApHotArticles> selectExpireMonth();void deleteHotData(ApHotArticles apHotArticle);
}

ApHotArticleServiceImpl

对ApHotArticle的相关操作

代码位置:com.heima.migration.service.impl.ApHotArticleServiceImpl

/*** 热点数据操作Service 类*/
@Service
@Log4j2
public class ApHotArticleServiceImpl implements ApHotArticleService {@Autowiredprivate ApHotArticlesMapper apHotArticlesMapper;@Autowiredprivate MongoTemplate mongoTemplate;@Autowiredprivate ArticleQuantityService articleQuantityService;@Autowiredprivate HBaseStorageClient hBaseStorageClient;@Overridepublic List<ApHotArticles> selectList(ApHotArticles apHotArticlesQuery) {return apHotArticlesMapper.selectList(apHotArticlesQuery);}/*** 根据ID删除** @param id*/@Overridepublic void deleteById(Integer id) {log.info("删除热数据,apArticleId:{}", id);apHotArticlesMapper.deleteById(id);}/*** 查询一个月之前的数据** @return*/@Overridepublic List<ApHotArticles> selectExpireMonth() {return apHotArticlesMapper.selectExpireMonth();}/*** 删除过去的热数据** @param apHotArticle*/@Overridepublic void deleteHotData(ApHotArticles apHotArticle) {deleteById(apHotArticle.getId());String rowKey = DataConvertUtils.toString(apHotArticle.getId());hBaseStorageClient.gethBaseClent().deleteRow(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, rowKey);MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKey, MongoStorageEntity.class);if (null != mongoStorageEntity) {mongoTemplate.remove(mongoStorageEntity);}}/*** 插入操作** @param apHotArticles*/@Overridepublic void insert(ApHotArticles apHotArticles) {apHotArticlesMapper.insert(apHotArticles);}/*** 热点数据同步方法** @param apArticleId*/@Overridepublic void hotApArticleSync(Integer apArticleId) {log.info("开始将热数据同步,apArticleId:{}", apArticleId);ArticleQuantity articleQuantity = getHotArticleQuantity(apArticleId);if (null != articleQuantity) {//热点数据同步到DB中hotApArticleToDBSync(articleQuantity);//热点数据同步到MONGOhotApArticleMongoSync(articleQuantity);log.info("热数据同步完成,apArticleId:{}", apArticleId);} else {log.error("找不到对应的热数据,apArticleId:{}", apArticleId);}}/*** 获取热数据的ArticleQuantity 对象** @param apArticleId* @return*/private ArticleQuantity getHotArticleQuantity(Integer apArticleId) {Long id = Long.valueOf(apArticleId);ArticleQuantity articleQuantity = articleQuantityService.getArticleQuantityByArticleId(id);if (null == articleQuantity) {articleQuantity = articleQuantityService.getArticleQuantityByArticleIdForHbase(id);}return articleQuantity;}/*** 热数据 到数据库Mysql的同步** @param articleQuantity*/public void hotApArticleToDBSync(ArticleQuantity articleQuantity) {Integer apArticleId = articleQuantity.getApArticleId();log.info("开始将热数据从Hbase同步到mysql,apArticleId:{}", apArticleId);if (null == apArticleId) {log.error("apArticleId不存在无法进行同步");return;}ApHotArticles apHotArticlesQuery = new ApHotArticles() {{setArticleId(apArticleId);}};List<ApHotArticles> apHotArticlesList = apHotArticlesMapper.selectList(apHotArticlesQuery);if (null != apHotArticlesList && !apHotArticlesList.isEmpty()) {log.info("Mysql数据已同步过不需要再次同步,apArticleId:{}", apArticleId);} else {ApHotArticles apHotArticles = articleQuantity.getApHotArticles();apHotArticlesMapper.insert(apHotArticles);}log.info("将热数据从Hbase同步到mysql完成,apArticleId:{}", apArticleId);}/*** 热数据向从Hbase到Mongodb同步** @param articleQuantity*/public void hotApArticleMongoSync(ArticleQuantity articleQuantity) {Integer apArticleId = articleQuantity.getApArticleId();log.info("开始将热数据从Hbase同步到MongoDB,apArticleId:{}", apArticleId);if (null == apArticleId) {log.error("apArticleId不存在无法进行同步");return;}String rowKeyId = DataConvertUtils.toString(apArticleId);MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKeyId, MongoStorageEntity.class);if (null != mongoStorageEntity) {log.info("MongoDB数据已同步过不需要再次同步,apArticleId:{}", apArticleId);} else {List<StorageData> storageDataList = articleQuantity.getStorageDataList();if (null != storageDataList && !storageDataList.isEmpty()) {mongoStorageEntity = new MongoStorageEntity();mongoStorageEntity.setDataList(storageDataList);mongoStorageEntity.setRowKey(rowKeyId);mongoTemplate.insert(mongoStorageEntity);}}log.info("将热数据从Hbase同步到MongoDB完成,apArticleId:{}", apArticleId);}
}

8 定时同步数据

8.1 全量数据从mysql同步到HBase

@Component
@DisallowConcurrentExecution
@Log4j2
/*** 全量数据从mysql 同步到HBase*/
public class MigrationDbToHBaseQuartz extends AbstractJob {@Autowiredprivate ArticleQuantityService articleQuantityService;@Overridepublic String[] triggerCron() {/*** 2019/8/9 10:15:00* 2019/8/9 10:20:00* 2019/8/9 10:25:00* 2019/8/9 10:30:00* 2019/8/9 10:35:00*/return new String[]{"0 0/5 * * * ?"};}@Overrideprotected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {log.info("开始进行数据库到HBASE同步任务");articleQuantityService.dbToHbase();log.info("数据库到HBASE同步任务完成");}}

8.2 定期删除过期的数据

/*** 定期删除过期的数据*/
@Component
@Log4j2
public class MigrationDeleteHotDataQuartz extends AbstractJob {@Autowiredprivate ApHotArticleService apHotArticleService;@Overridepublic String[] triggerCron() {/*** 2019/8/9 22:30:00* 2019/8/10 22:30:00* 2019/8/11 22:30:00* 2019/8/12 22:30:00* 2019/8/13 22:30:00*/return new String[]{"0 30 22 * * ?"};}@Overrideprotected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {long cutrrentTime = System.currentTimeMillis();log.info("开始删除数据库过期数据");deleteExpireHotData();log.info("删除数据库过期数据结束,耗时:{}", System.currentTimeMillis() - cutrrentTime);}/*** 删除过期的热数据*/public void deleteExpireHotData() {List<ApHotArticles> apHotArticlesList = apHotArticleService.selectExpireMonth();if (null != apHotArticlesList && !apHotArticlesList.isEmpty()) {for (ApHotArticles apHotArticle : apHotArticlesList) {apHotArticleService.deleteHotData(apHotArticle);}}}}

9 消息接收同步数据

9.1 文章审核成功同步

9.1.1 消息发送

(1)消息名称定义及消息发送方法声明

maven_test.properties

kafka.topic.article-audit-success=kafka.topic.article.audit.success.sigle.test

kafka.properties

kafka.topic.article-audit-success=${kafka.topic.article-audit-success}

com.heima.common.kafka.KafkaTopicConfig新增属性

/*** 审核成功*/
String articleAuditSuccess;

com.heima.common.kafka.KafkaSender

/*** 发送审核成功消息*/
public void sendArticleAuditSuccessMessage(ArticleAuditSuccess message) {ArticleAuditSuccessMessage temp = new ArticleAuditSuccessMessage();temp.setData(message);this.sendMesssage(kafkaTopicConfig.getArticleAuditSuccess(), UUID.randomUUID().toString(), temp);
}

(2)修改自动审核代码,爬虫和自媒体都要修改

在审核成功后,发送消息

爬虫

//文章审核成功
ArticleAuditSuccess articleAuditSuccess = new ArticleAuditSuccess();
articleAuditSuccess.setArticleId(apArticle.getId());
articleAuditSuccess.setType(ArticleAuditSuccess.ArticleType.CRAWLER);
articleAuditSuccess.setChannelId(apArticle.getChannelId());kafkaSender.sendArticleAuditSuccessMessage(articleAuditSuccess);

自媒体

//文章审核成功
ArticleAuditSuccess articleAuditSuccess = new ArticleAuditSuccess();
articleAuditSuccess.setArticleId(apArticle.getId());
articleAuditSuccess.setType(ArticleAuditSuccess.ArticleType.MEDIA);
articleAuditSuccess.setChannelId(apArticle.getChannelId());kafkaSender.sendArticleAuditSuccessMessage(articleAuditSuccess);

9.1.2消息接收

/*** 热点文章监听类*/
@Component
@Log4j2
public class MigrationAuditSucessArticleListener implements KafkaListener<String, String> {/*** 通用转换mapper*/@AutowiredObjectMapper mapper;/*** kafka 主题 配置*/@AutowiredKafkaTopicConfig kafkaTopicConfig;@Autowiredprivate ArticleQuantityService articleQuantityService;@Overridepublic String topic() {return kafkaTopicConfig.getArticleAuditSuccess();}/*** 监听消息** @param data* @param consumer*/@Overridepublic void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) {log.info("kafka接收到审核通过消息:{}", data);String value = (String) data.value();if (null != value) {ArticleAuditSuccessMessage message = null;try {message = mapper.readValue(value, ArticleAuditSuccessMessage.class);} catch (IOException e) {e.printStackTrace();}ArticleAuditSuccess auto = message.getData();if (null != auto) {//调用方法 将HBAESE中的热数据进行同步Integer articleId = auto.getArticleId();if (null != articleId) {articleQuantityService.dbToHbase(articleId);}}}}
}

9.2 热点文章同步

创建监听类:com.heima.migration.kafka.listener.MigrationHotArticleListener

/*** 热点文章监听类*/
@Component
@Log4j2
public class MigrationHotArticleListener implements KafkaListener<String, String> {/*** 通用转换mapper*/@AutowiredObjectMapper mapper;/*** kafka 主题 配置*/@AutowiredKafkaTopicConfig kafkaTopicConfig;/*** 热点文章service注入*/@Autowiredprivate ApHotArticleService apHotArticleService;@Overridepublic String topic() {return kafkaTopicConfig.getHotArticle();}/*** 监听消息** @param data* @param consumer*/@Overridepublic void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) {log.info("kafka接收到热数据同步消息:{}", data);String value = (String) data.value();if (null != value) {ApHotArticleMessage message = null;try {message = mapper.readValue(value, ApHotArticleMessage.class);} catch (IOException e) {e.printStackTrace();}Integer articleId = message.getData().getArticleId();if (null != articleId) {//调用方法 将HBAESE中的热数据进行同步apHotArticleService.hotApArticleSync(articleId);}}}
}

【大厂技术内幕】字节跳动原来是这么做数据迁移的!相关推荐

  1. 阿里言:出乎意料,“字节跳动”居然是这么做数据迁移的

    目标 附面试思维导图: 能够描述项目数据迁移的方案 了解hbase的特点 能够熟悉数据迁移中的数据包装和转换 能够完成文章数据的全量和增量迁移 能够完成热点文章数据的迁移 1 为什么需要自动同步 因为 ...

  2. 聚焦技术前沿 | 字节跳动年薪百万测试开发关注的前沿技术

    网络流传IT从业人员年薪普遍比较高,最近网络有一篇文章[国家何时整治程序员高薪现象引起热议.吃瓜群众只看到表面的薪资高,却不知道程序员需要掌握什么样的技术,加多少班,头发掉多少.当然有些小伙伴会说,看 ...

  3. 大厂Java开发字节跳动2022年实习招聘要求总结

    目录 实习招聘 字节跳动Java实习生 字节跳动抖音服务端实习生 字节跳动抖音直播后端实习生 字节跳动飞书实习生 校招招聘 阿里巴巴Java 腾讯Java 字节跳动电商后端 细节 字节跳动Java实习 ...

  4. ICCV 2021 | 字节跳动利用单幅图片做三维重建!将NeRF、MPI结合,提出MINE新工作...

    转载自:机器之心 |  字节跳动视觉技术团队 来自字节跳动视觉技术团队的研究者将 NeRF 和 Multiplane Image(MPI)结合,提出了一种新的三维空间表达方式 MINE.该方法通过对单 ...

  5. 关于我在字节跳动青训营做了个抖音这件事

    一.实践介绍 1.1项目核心信息 本项目实现了影视综艺榜单及其历史数据查询,实现个人页面展示.个人页面粉丝和关注列表.个人页面已发布视频列表及其详情页 1.2项目服务地址 https://github ...

  6. 字节跳动Java开发4面攻略:项目经验+“拍马屁”+扎实的技术

    字节跳动Java开发4面攻略:项目经验+"拍马屁"+扎实的技术 如标题所见,老陈现在已经顺利入职字节跳动. 老陈在编程事业上摸爬滚打8年之久,有在58待过,有在腾讯地方事业部待过. ...

  7. 字节跳动技术团队提出「AI渲染」方案,手机端也能实现影视级渲染效果

    随着3D技术的高速发展,影视渲染效果的复杂度.精细度都在逐步提升,但高质量的渲染效果和时间成本往往难以兼顾.针对这一行业痛点,字节跳动智能创作团队基于AI技术的优势提出了「AI渲染」方案.这一方案现已 ...

  8. 字节跳动技术新人培训全记录:校招萌新成长指南

    "我现在重新参加校招,明年还能再来听一次培训吗?" 是什么样的技术新人培训,让校招同学惊喜到要「再来一次」? 潜入字节跳动技术新人培训「星火计划」现场,全程围观之后,技术范儿小编发 ...

  9. Flutter 沙龙回顾 | 跨平台技术趋势及字节跳动 Flutter 架构实践

    11 月 23 日,字节跳动技术沙龙 | Flutter 技术专场 在北京后山艺术空间圆满结束.我们邀请到字节跳动移动平台部 Flutter 架构师袁辉辉,Google Flutter 团队工程师 J ...

最新文章

  1. SpringSecurity使用 配置文件 和wen.xml 文件配置
  2. c# BindingSource的简单应用
  3. Kubernetes学习笔记---常用命令
  4. tf.Variable()、tf.get_variable()
  5. Logrotate 对服务器日志按照小时切割并压缩
  6. 跟着迪哥学python 经管之家_跟着迪哥学Python数据分析与机器学习实战
  7. 透过率和反射率的关系_全国本科率只有不到百分之五?!醒醒吧!
  8. 在只需要一个指定正确的参数的情况下如何防止传入其他干扰的参数
  9. oracle 如何迁移到 mysql_怎么将数据库从Oracle迁移到SQL Server,或从Oracle迁移到MySQL...
  10. Nest,很酷的东西
  11. BZOJ 1108: [POI2007]天然气管道Gaz
  12. Intel微处理器列表_百度百科
  13. 用python画玫瑰花-用python画一朵玫瑰给你
  14. spotlight on mysql--安装以及简介
  15. java中Action层、Service层和Dao层的功能区分
  16. 解决XCode 11 build error 编译错误 image not found
  17. 白帽子讲web安全笔记-xss总结
  18. 无限滚动新一代老虎机
  19. 军犬舆情每日热点:台湾海峡发生6.2级地震;泉州通报碳九泄漏事件处理结果
  20. java 立体几何体中心点_高中数学知识点大全,立体几何核心考点及解题技巧

热门文章

  1. SLAM学习之路---双目相机照片生成点云(附C++代码)
  2. 看图说话2012-04-08 [ 晨枫 ]
  3. 045基于卷积神经网络的94种矿石识别
  4. C语言float转char数组
  5. [网络工程师必备生存技能]网络工程师是怎么学习的?
  6. 类似分析牛爱站SEO工具包的同类软件
  7. Zookeeper读写性能测试
  8. 使用excel服务器开发的重中之重
  9. 搜索引擎面面观[转]
  10. 文档管理系统:攻克这3个痛点,解决80%企业文档管理难题