Requirement:

Based on the configured terminals, users need a daily export of the previous day's data.

Current state:

Terminals report data -> parsing completes -> the parsed data is published to a topic -> a consumer writes it into OTS.

Candidate approaches:

(1) Query OTS and export the data.
(2) Consume the parsed topic with a separate consumer group (this must not affect the performance of the OTS ingestion path, which serves real-time queries) and write the data straight to files; a minimal sketch of such a listener follows this list.
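The key point of approach (2) is that Kafka tracks offsets per consumer group, so a second group reading the same parsed topic cannot slow down the group that writes into OTS. A minimal Spring Kafka sketch, with a hypothetical class name and group ID (the real listener appears in the core code below):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ExportGroupListener {

    // Same topic the OTS consumer reads, but a different groupId, so this
    // listener keeps its own offsets and never interferes with OTS ingestion.
    @KafkaListener(topics = "VEHICLE_DATA", groupId = "file_export_group")
    public void onRecord(ConsumerRecord<String, byte[]> record) {
        // Append the parsed record to a local TXT file here; this path never touches OTS.
    }
}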

Issues to consider:

(1) Reported data may be fragmented: because of terminal memory limits, a single record can be split into several packets (a sketch of re-assembling fragments follows this list).
(2) The data must be sorted.
(3) A single terminal's data file can exceed 350 MB, and exports for more than 300 vehicles must be supported.
(4) Roughly 45 million messages are consumed per day.
(5) Messages arrive out of order and are back-filled; back-fill usually completes only after 9:00 the next morning.
(6) Users need time to process the downloaded data, so the download link must be available before 12:00 noon the next day.
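For issue (1), fragments of one record share the same vehicle ID and report timestamp, so they can be re-assembled by merging their signal maps under that key; the consumer code below merges records with an equal timestamp in a similar way. A minimal stand-alone sketch, with hypothetical class and method names:

import java.util.HashMap;
import java.util.Map;

public class FragmentMerger {

    /** key = vehicleId + ":" + timestamp, value = merged signal map */
    private final Map<String, Map<String, String>> merged = new HashMap<>();

    // Later fragments simply add their signals to the row created by the first fragment.
    public void accept(String vehicleId, long timestamp, Map<String, String> signals) {
        merged.computeIfAbsent(vehicleId + ":" + timestamp, k -> new HashMap<>())
              .putAll(signals);
    }

    public Map<String, Map<String, String>> result() {
        return merged;
    }
}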

Selected approach:

Query OTS:
Pros: simple to implement (the data is already merged, so it can be sorted directly); a plain download is enough.
Cons: the download time is unpredictable. Downloading can only start after 9:00 and the data must be ready before 12:00, leaving at most three hours of processing time, yet yesterday's data might not even finish downloading within the day, so the noon deadline would very likely be missed; even with multi-threaded downloads, memory and CPU become bottlenecks.
Consume with a new group:
Pros: as long as the code is solid and consumption and write throughput hold up, the data can be delivered within the required window.
Cons: all of the issues above must be handled in code, and that code has to be written with performance in mind so that messages never pile up on a large scale.
Weighing the above, consuming with a new group is the only workable choice.

Technical implementation:

(1) Out-of-order data, message merging and sorting are not handled during consumption; a final half-hour tidy-up pass takes care of them.
(2) During consumption write plain TXT files, and only convert them to the CSV format users expect during the tidy-up pass, to keep consumption time as low as possible.
(3) Introduce a Spring cache to improve performance, and a Redis key to prevent several cluster nodes from creating the same directory (see the sketches after this list).
(4) When a file grows too large or a directory holds too many files, messages back up badly because writing and file lookups become too slow. Files are therefore rolled: once a file exceeds 35 MB nothing more is written to it, and each new file is created in a fresh directory named after the current millisecond under the server's IP, so no directory becomes too large or too crowded and consumption keeps up.
(5) Tidy-up: files are grouped by file-name prefix; each output file is written by a single thread while the groups are processed in parallel. Aggressive multi-threading made CPU and memory usage spike and caused severe Full GCs, so efficiency was very poor unless the server is generously provisioned. Reading 350 MB of data into memory, merging and sorting it triggers one Full GC, but in practice the whole data set is processed within 10 minutes.
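A minimal sketch of the Redis guard from item (3), with hypothetical class and key names: setIfAbsent (Redis SETNX) lets exactly one node of the cluster create the day directory, matching the setIfAbsent call in existOrCreateDirectory in the core code below.

import java.io.File;
import java.util.concurrent.TimeUnit;

import org.springframework.data.redis.core.StringRedisTemplate;

public class DayDirectoryCreator {

    private final StringRedisTemplate redisTemplate;
    private final String baseDir;

    public DayDirectoryCreator(StringRedisTemplate redisTemplate, String baseDir) {
        this.redisTemplate = redisTemplate;
        this.baseDir = baseDir;
    }

    public File ensureDayDirectory(String day) {
        File dir = new File(baseDir, day);
        if (dir.isDirectory()) {
            return dir;
        }
        // Only the node that wins the SETNX creates the directory; the others
        // see it on disk shortly afterwards (the real code sleeps and re-checks).
        Boolean winner = redisTemplate.opsForValue()
                .setIfAbsent("export:dir:" + day, "locked", 2, TimeUnit.DAYS);
        if (Boolean.TRUE.equals(winner)) {
            dir.mkdirs();
        }
        return dir;
    }
}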
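A minimal sketch of the file rolling from item (4), with hypothetical names: once the current file passes 35 MB, writing moves to a new file inside a fresh serverIp/currentMillis directory, so no single file or directory grows large enough to slow writes and lookups down.

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

public class RollingTxtWriter {

    private static final long MAX_FILE_LENGTH = 35L * 1024 * 1024;

    private final File dayDir;
    private final String serverIp;
    private File current;

    public RollingTxtWriter(File dayDir, String serverIp) {
        this.dayDir = dayDir;
        this.serverIp = serverIp;
    }

    public void append(String fileName, String content) throws IOException {
        if (current == null || current.length() > MAX_FILE_LENGTH) {
            // Roll over: a new millisecond-named directory under this server's IP.
            File dir = new File(dayDir, serverIp + "/" + System.currentTimeMillis());
            dir.mkdirs();
            current = new File(dir, fileName);
        }
        Files.write(current.toPath(), content.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}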
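A minimal sketch of the grouping step from item (5), with a hypothetical helper: every TXT fragment under the day directory is grouped by its file-name prefix (the plate number), so each group can then be merged, sorted by timestamp and rewritten as a single CSV by one worker.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TidyUpGrouper {

    // File names look like "plateNo(yyyyMMdd000000~yyyyMMdd235959).txt", so the
    // text before the first '(' identifies the vehicle.
    public Map<String, List<Path>> groupByFilePrefix(Path dayDir) throws IOException {
        try (Stream<Path> paths = Files.walk(dayDir)) {
            return paths.filter(p -> p.toString().endsWith(".txt"))
                        .collect(Collectors.groupingBy(
                                p -> p.getFileName().toString().split("\\(")[0]));
        }
    }
}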

Core code:

(1) Data consumption

package com.suyun.file.service.listen;

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.suyun.common.kafka.JsonSerializer;
import com.suyun.common.kafka.KafkaHelper;
import com.suyun.common.kafka.KafkaRecord;
import com.suyun.common.lang.Tuple2;
import com.suyun.file.service.cache.ExportVehicleDataCache;
import com.suyun.file.service.cache.FilePathCache;
import com.suyun.file.service.helper.OSSHelper;
import com.suyun.file.service.util.FileUtils;
import com.suyun.file.service.util.NetworkUtil;
import com.suyun.vehicle.Topics;
import com.suyun.vehicle.api.dto.CanExportConfigDto;
import com.suyun.vehicle.api.service.ICanExportConfigService;
import com.suyun.vehicle.model.VehicleDataMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Description:
 *
 * @Author: leo.xiong
 * @CreateDate: 2020/11/11 10:18
 * @Email: leo.xiong@suyun360.com
 * @Since:
 */
@Slf4j
@Service
public class VehicleDataKafkaListener {private TypeReference<KafkaRecord<VehicleDataMap>> dataType = new TypeReference<KafkaRecord<VehicleDataMap>>() {};private static final String GROUP_ID = "CID_alikafka_suyun_file_group";public static final String OSS_BUCKET_NAME = "suyun-nevtsp-foreverfile";public static final String DAY_EXPORT_DIR = "/vehicle/can_data/day_export/";public static final String TXT = ".txt";public static final String CSV = ".csv";public static final String DATE_TIME = "dateTime";public static final String VERSION_ID = "versionId";public static final String DATE_FORMAT_YMD = "yyyyMMdd";public static final String TIME_FORMAT_Y_M_D = "yyyy-MM-dd HH:mm:ss";/*** 第二天9点开始压缩前一天的数据,7:30之后上报的补发数据无需处理*/private static final long NINE_HOUR = (7 * 3600 + 1800) * 1000;/*** 最大文件大小为35M*/private static final long MAX_FILE_LENGTH = 35 * 1024 * 1024;@Autowiredprivate KafkaHelper kafkaHelper;@Resource(name = "executorServices")private ExecutorService[] executorServices;@Autowiredprivate OSSHelper ossHelper;@Autowiredprivate FilePathCache filePathCache;@Autowiredprivate ExportVehicleDataCache exportVehicleDataCache;@Autowiredprivate ICanExportConfigService canExportConfigService;@Resource(name = "redisTemplate")private ValueOperations<String, String> valueOperations;@KafkaListener(id = "vehicleDataKafkaListener", groupId = GROUP_ID, topics = Topics.VEHICLE_DATA, errorHandler = "consumerAwareErrorHandler", containerFactory = "batchKafkaListenerContainerFactory")public void vehicleDataUpdatedHandler(List<ConsumerRecord<String, byte[]>> consumerRecords, Acknowledgment ack) {Long systemTime = System.currentTimeMillis();String currentDay = formatDate(systemTime, DATE_FORMAT_YMD);if (StringUtils.isEmpty(existOrCreateDirectory(currentDay, true))) {return;}Map<String, Map<String, Map<Long, KafkaRecord<VehicleDataMap>>>> reportDayVehicleIdTimeStampKafkaRecordMap = new HashMap<>(2);//当前时间如果大于7:30,则表示昨天的文件已经生成,不能再处理昨天的数据信息boolean isGenerateYesterdaySFile = (systemTime - NINE_HOUR) >= getTimestampByLocalDateTime(LocalDateTime.now().with(LocalTime.MIN));buildDataMap(consumerRecords, isGenerateYesterdaySFile, currentDay, reportDayVehicleIdTimeStampKafkaRecordMap);if (CollectionUtils.isEmpty(reportDayVehicleIdTimeStampKafkaRecordMap)) {ack.acknowledge();return;}for (Map.Entry<String, Map<String, Map<Long, KafkaRecord<VehicleDataMap>>>> reportDayVehicleTimestampDataEntry : reportDayVehicleIdTimeStampKafkaRecordMap.entrySet()) {String reportDay = reportDayVehicleTimestampDataEntry.getKey();String directory = existOrCreateDirectory(reportDay, false);if (StringUtils.isEmpty(directory)) {continue;}Map<String, Map<Long, KafkaRecord<VehicleDataMap>>> vehicleIdTimestampKafkaRecordMap = reportDayVehicleTimestampDataEntry.getValue();vehicleIdTimestampKafkaRecordMap.forEach((vehicleId, timestampKafkaRecordMap) -> {int hash = Hashing.consistentHash(vehicleId.hashCode(), executorServices.length);executorServices[hash].execute(() -> {List<KafkaRecord<VehicleDataMap>> values = Lists.newArrayList(timestampKafkaRecordMap.values());try {process(reportDay, directory, vehicleId, timestampKafkaRecordMap, values);} catch (Exception e) {log.warn("写入失败 reportDay: {} directory: {} vehicleId: {}", reportDay, directory, vehicleId, e);for (KafkaRecord<VehicleDataMap> kafkaRecord : values) {kafkaHelper.forward2RetryTopic(kafkaRecord, GROUP_ID);}}});});}ack.acknowledge();}/*** 执行数据处理** @param reportDay* @param directory* @param vehicleId* @param timestampKafkaRecordMap* @param values* @return*/private void process(String reportDay, 
String directory, String vehicleId, Map<Long, KafkaRecord<VehicleDataMap>> timestampKafkaRecordMap, List<KafkaRecord<VehicleDataMap>> values) {CanExportConfigDto canExportConfigDto = null;try {canExportConfigDto = getCanExportConfigDto(values.get(0).getData(), reportDay);} catch (Exception e) {for (KafkaRecord<VehicleDataMap> kafkaRecord : values) {kafkaHelper.forward2RetryTopic(kafkaRecord, GROUP_ID);}}if (canExportConfigDto == null) {return;}StringBuffer content = new StringBuffer();for (KafkaRecord<VehicleDataMap> value : timestampKafkaRecordMap.values()) {VehicleDataMap vehicleDataMap = value.getData();Map<String, Object> codeValueMap = Maps.newHashMapWithExpectedSize(vehicleDataMap.getValues().size() + 2);vehicleDataMap.getValues().forEach((code, valueInfo) -> {Tuple2<Double, String> tuple2 = vehicleDataMap.getValue(code);if (tuple2 == null) {return;}codeValueMap.put(code, tuple2._1().toString());});if (CollectionUtils.isEmpty(codeValueMap)) {log.warn("数据信息为空: vehicleDataMap: {}", vehicleDataMap);continue;}//保存解析版本,导出需要根据解析版本获取信号名信息if (!codeValueMap.containsKey(VERSION_ID)) {codeValueMap.put(VERSION_ID, vehicleDataMap.getVersionId());}if (!codeValueMap.containsKey(DATE_TIME)) {codeValueMap.put(DATE_TIME, String.valueOf(vehicleDataMap.getTimestamp()));}content.append(JSONObject.toJSONString(codeValueMap) + "\n");}if (content.length() == 0) {return;}String fileName = canExportConfigDto.getPlateNo() + "(" + reportDay + "000000" + "~" + reportDay + "235959" + ")" + TXT;writeTxt(content.toString(), vehicleId, reportDay, directory, fileName);}/*** 组装数据信息** @param consumerRecords* @param currentDay* @param reportDayVehicleIdTimeStampKafkaRecordMap*/private void buildDataMap(List<ConsumerRecord<String, byte[]>> consumerRecords, boolean isGenerateYesterdaySFile, String currentDay, Map<String, Map<String, Map<Long, KafkaRecord<VehicleDataMap>>>> reportDayVehicleIdTimeStampKafkaRecordMap) {consumerRecords.forEach(consumerRecord -> {KafkaRecord<VehicleDataMap> kafkaRecord = JsonSerializer.deserialize(consumerRecord.value(), dataType);//无用数据if (kafkaRecord == null || kafkaRecord.getData() == null || CollectionUtils.isEmpty(kafkaRecord.getData().getValues())) {return;}//解析版本ID不能为空,否则取不到信号名称,导出无效if (StringUtils.isEmpty(kafkaRecord.getData().getVersionId())) {return;}VehicleDataMap vehicleDataMap = kafkaRecord.getData();String reportDay = formatDate(vehicleDataMap.getTimestamp(), DATE_FORMAT_YMD);//如果上报时间和当前时间不是同一天,并且已经过了昨天生成文件日期,补发处理数据已无效if (!currentDay.equals(reportDay) && isGenerateYesterdaySFile) {log.info("补发数据 车辆ID: {} 上报时间: {} 接收时间: {}", vehicleDataMap.getVehicleId(), vehicleDataMap.getTimestamp(), vehicleDataMap.getReceiveTime());return;}CanExportConfigDto canExportConfigDto = null;try {canExportConfigDto = getCanExportConfigDto(vehicleDataMap, reportDay);} catch (Exception e) {kafkaHelper.forward2RetryTopic(kafkaRecord, GROUP_ID);}if (canExportConfigDto == null) {return;}Map<String, Map<Long, KafkaRecord<VehicleDataMap>>> vehicleIdTimestampKafkaRecordMap = reportDayVehicleIdTimeStampKafkaRecordMap.get(reportDay);if (vehicleIdTimestampKafkaRecordMap == null) {vehicleIdTimestampKafkaRecordMap = new HashMap<>(150);reportDayVehicleIdTimeStampKafkaRecordMap.put(reportDay, vehicleIdTimestampKafkaRecordMap);}Map<Long, KafkaRecord<VehicleDataMap>> timestampKafkaRecordMap = vehicleIdTimestampKafkaRecordMap.get(vehicleDataMap.getVehicleId());if (timestampKafkaRecordMap == null) {timestampKafkaRecordMap = Maps.newHashMap();vehicleIdTimestampKafkaRecordMap.put(vehicleDataMap.getVehicleId(), 
timestampKafkaRecordMap);}KafkaRecord<VehicleDataMap> oldKafkaRecord = timestampKafkaRecordMap.get(vehicleDataMap.getTimestamp());if (oldKafkaRecord != null) {oldKafkaRecord.getData().putAll(kafkaRecord.getData().getValues());}timestampKafkaRecordMap.put(vehicleDataMap.getTimestamp(), kafkaRecord);});}/*** 写入文件信息** @param content* @param vehicleId* @param reportDay* @param directory* @param fileName*/private void writeTxt(String content, String vehicleId, String reportDay, String directory, String fileName) {String filePath = existOrCreateFile(reportDay, vehicleId, directory, fileName);if (StringUtils.isEmpty(filePath)) {return;}FileUtils.writeToFile(filePath, content, StandardCharsets.UTF_8.name(), true);}/*** 获取配置信息** @param vehicleDataMap* @param reportDay* @return*/private CanExportConfigDto getCanExportConfigDto(VehicleDataMap vehicleDataMap, String reportDay) throws Exception {CanExportConfigDto canExportConfigDto = exportVehicleDataCache.get(vehicleDataMap.getVehicleId() + ":" + reportDay);//是否需要导出,配置时间为天,导出数据也是按天导出if (canExportConfigDto == null) {try {canExportConfigDto = canExportConfigService.queryActiveCanExportConfig(vehicleDataMap.getVehicleId(), vehicleDataMap.getTimestamp());} catch (Exception e) {throw new Exception(e.getMessage());}if (canExportConfigDto == null) {exportVehicleDataCache.put(vehicleDataMap.getVehicleId() + ":" + reportDay, new CanExportConfigDto());return null;}exportVehicleDataCache.put(vehicleDataMap.getVehicleId() + ":" + reportDay, canExportConfigDto);} else if (StringUtils.isEmpty(canExportConfigDto.getVehicleId())) {return null;}return canExportConfigDto;}/*** 目录是否存在,不存在创建** @param currentDay* @param isCreate   是否创建* @return 返回文件夹路径*/public String existOrCreateDirectory(String currentDay, boolean isCreate) {String directoryPath = filePathCache.get(FilePathCache.DIRECTORY + currentDay);if (StringUtils.isNotEmpty(directoryPath)) {return directoryPath;}String directory = ossHelper.getBucketPathMap().get(OSS_BUCKET_NAME) + DAY_EXPORT_DIR + currentDay;File file = new File(directory);if (file.isDirectory()) {filePathCache.put(FilePathCache.DIRECTORY + currentDay, directory);return directory;}if (!isCreate) {return "";}boolean createFlag = valueOperations.setIfAbsent(currentDay + FilePathCache.DIRECTORY, NetworkUtil.IP, 2, TimeUnit.DAYS);if (createFlag) {file.mkdir();} else {try {Thread.sleep(1000);existOrCreateDirectory(currentDay, false);} catch (InterruptedException e) {e.printStackTrace();}}filePathCache.put(FilePathCache.DIRECTORY + currentDay, directory);return directory;}/*** 文件是否存在,不存在创建* 一个文件过大,影响写入性能,默认35M重新生成一个文件** @param currentDay* @param vehicleId* @param directory* @param fileName* @return 返回文件路径*/private String existOrCreateFile(String currentDay, String vehicleId, String directory, String fileName) {String filePath = filePathCache.get(FilePathCache.FILE + currentDay + ":" + vehicleId);if (StringUtils.isNotEmpty(filePath)) {File file = new File(filePath);if (file.isFile() && file.length() <= MAX_FILE_LENGTH) {return filePath;}}String newDirectory = directory + "/" + NetworkUtil.IP_REPLACE_DOT + "/" + System.currentTimeMillis();File directoryFile = new File(newDirectory);directoryFile.mkdirs();String fileNamePath = newDirectory + "/" + fileName;File file = new File(fileNamePath);try {file.createNewFile();filePathCache.put(FilePathCache.FILE + currentDay + ":" + vehicleId, fileNamePath);} catch (IOException e) {log.error("创建文件失败 fileNamePath: {}", fileNamePath, e);return null;}return fileNamePath;}public static String formatDate(Long timestamp, 
String pattern) {DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);return getLocalDateTimeByLong(timestamp).format(formatter);}public static LocalDateTime getLocalDateTimeByLong(Long timestamp) {long nanoOfSecond = (timestamp % 1000) * 1000000;LocalDateTime localDateTime = LocalDateTime.ofEpochSecond(timestamp / 1000, (int) nanoOfSecond, getSystemZoneOffset());return localDateTime;}public static long getTimestampByLocalDateTime(LocalDateTime localDateTime) {return localDateTime.toEpochSecond(getSystemZoneOffset()) * 1000;}public static ZoneOffset getSystemZoneOffset() {return ZoneOffset.of("+" + TimeZone.getDefault().getRawOffset() / 3600 / 1000);}
}

(2) File tidy-up

package com.suyun.file.service.task.service;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.suyun.file.service.cache.VersionCanSignalCache;
import com.suyun.file.service.helper.CsvHelper;
import com.suyun.file.service.helper.OSSHelper;
import com.suyun.file.service.listen.VehicleDataKafkaListener;
import com.suyun.file.service.util.FileUtils;
import com.suyun.vehicle.api.dto.BatchExportInfoDto;
import com.suyun.vehicle.api.dto.VersionCanSignalDto;
import com.suyun.vehicle.api.service.IBatchExportInfoService;
import com.suyun.vehicle.api.service.ICanProtocolService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * Description:
 *
 * @Author: leo.xiong
 * @CreateDate: 2022/8/19 14:10
 * @Email: leo.xiong@suyun360.com
 * @Since:
 */
@Slf4j
@Service
public class ZipFileTaskService {/*** 最大版本大小*/private static final int VERSION_SIZE = 8;/*** 列数*/private static final int NUMBER_OF_COLUMNS = 600;/*** 单位数*/private static final int NUMBER_OF_UNITS = 100;/*** 行数*/private static final int NUMBER_OF_ROWS = 45000;@Autowiredprivate VehicleDataKafkaListener vehicleDataKafkaListener;@Resource(name = "executorZipService")private ExecutorService executorZipService;@Autowiredprivate VersionCanSignalCache versionCanSignalCache;@Autowiredprivate ICanProtocolService canProtocolService;@Autowiredprivate IBatchExportInfoService batchExportInfoService;@Autowiredprivate CsvHelper csvHelper;@Autowiredprivate OSSHelper ossHelper;private static final int MAX_FAILED_TRY_TIME = 3;/*** 整理数据突然中断导致的错误,删除CSV,重新整理*/private static final int COLLATE_DATA_ERROR = 2;public ProcessResult process(String preDay, AtomicInteger times) {String directory = vehicleDataKafkaListener.existOrCreateDirectory(preDay, false);if (StringUtils.isEmpty(directory)) {log.warn("文件夹不存在 directory: {}", directory);return new ProcessResult(false);}File directoryFile = new File(directory);Collection<File> fileList = FileUtils.listFiles(directoryFile, new String[]{"txt", "csv"}, true);if (CollectionUtils.isEmpty(fileList)) {log.warn("文件不存在");return new ProcessResult(false);}Map<String, List<File>> fileNameListMap = fileList.parallelStream().collect(Collectors.groupingBy(file -> {String fileName = file.getName();String fileNameValue = fileName.split(preDay)[0] + preDay + "000000" + "~" + preDay + "235959" + ")";return fileNameValue;}));Map<String, String> fileFailedMsgMap = Maps.newHashMap();Set<String> exportSuccessSet = new HashSet<>(500);Set<String> exportFailedSet = Sets.newHashSet();executorZipService.execute(() -> {processTxt(directory, fileNameListMap, fileFailedMsgMap, exportSuccessSet, exportFailedSet);Collection<File> txtFileList = FileUtils.listFiles(directoryFile, new String[]{"txt"}, true);if (CollectionUtils.isEmpty(txtFileList)) {zipFileList(preDay, directory, fileFailedMsgMap, exportSuccessSet, exportFailedSet);return;}if (times.get() < MAX_FAILED_TRY_TIME) {times.addAndGet(1);process(preDay, times);return;}for (File file : txtFileList) {String plateNo = file.getName().substring(0, file.getName().length() - 35);fileFailedMsgMap.put(plateNo, "txt文件解析失败");exportFailedSet.add(plateNo);}zipFileList(preDay, directory, fileFailedMsgMap, exportSuccessSet, exportFailedSet);});return new ProcessResult(true);}private void processTxt(String directory, Map<String, List<File>> fileNameListMap, Map<String, String> fileFailedMsgMap, Set<String> exportSuccessSet, Set<String> exportFailedSet) {fileNameListMap.forEach((fileNameValue, files) -> {long beginTime = System.currentTimeMillis();log.info("process file name: {}", fileNameValue);String plateNo = fileNameValue.substring(0, fileNameValue.length() - 31);try {List<File> txtFileList = Lists.newArrayListWithExpectedSize(files.size());if (files.size() == 1) {File fileOne = files.get(0);if (fileOne.length() == 0) {fileOne.delete();return;}if (fileOne.getName().endsWith(VehicleDataKafkaListener.CSV)) {return;}txtFileList.add(fileOne);} else {for (File file : files) {if (file.length() == 0) {file.delete();continue;}if (file.getName().endsWith(VehicleDataKafkaListener.CSV)) {file.delete();continue;}txtFileList.add(file);}}if (CollectionUtils.isEmpty(txtFileList)) {return;}organizeFile(txtFileList, directory, fileNameValue);exportSuccessSet.add(plateNo);} catch (Exception e) {log.warn("数据处理失败 fileNameValue: {}", fileNameValue, 
e);fileFailedMsgMap.put(fileNameValue, e.getMessage());exportFailedSet.add(plateNo);}log.info("process file name: {} use time: {} ms", fileNameValue, System.currentTimeMillis() - beginTime);});}/*** 整理数据,整理完成,删除数据** @param fileList* @param directory* @param fileNameValue*/private void organizeFile(List<File> fileList, String directory, String fileNameValue) {try {List<String> contentList = readLine(fileList);//使用到的版本,code,signalName映射Map<String, Map<String, String>> versionIdCodeSignalNameMap = new HashMap<>(VERSION_SIZE);//多个版本合并后的信号大小,作为头部信息(需要固定列顺序,所以不用Set)List<String> headerList = new ArrayList<>(NUMBER_OF_COLUMNS);//存在值的列Set<String> validHeaderSet = new HashSet<>(NUMBER_OF_COLUMNS);//需要补充单位信息Map<String, String> signalNameUnitMap = new HashMap<>(NUMBER_OF_UNITS);//行数Map<Long, Map<String, String>> timestampValuesMap = new HashMap<>(NUMBER_OF_ROWS);for (String content : contentList) {if (StringUtils.isEmpty(content)) {continue;}Map<String, String> codeValueMap = null;try {codeValueMap = JSONObject.parseObject(content, Map.class);} catch (Exception e) {log.warn("content parse error: {}", content);continue;}String versionId = codeValueMap.get(VehicleDataKafkaListener.VERSION_ID);if (StringUtils.isEmpty(versionId)) {continue;}String dateTime = codeValueMap.get(VehicleDataKafkaListener.DATE_TIME);if (StringUtils.isEmpty(dateTime)) {continue;}List<VersionCanSignalDto> canSignalByVersionIdList = versionCanSignalCache.get(versionId);if (canSignalByVersionIdList == null) {canSignalByVersionIdList = canProtocolService.findCanSignalByVersionIdList(Lists.newArrayList(versionId));if (CollectionUtils.isEmpty(canSignalByVersionIdList)) {versionCanSignalCache.put(versionId, Collections.EMPTY_LIST);continue;}versionCanSignalCache.put(versionId, canSignalByVersionIdList);} else if (CollectionUtils.isEmpty(canSignalByVersionIdList)) {continue;}Map<String, String> codeSignalNameMap = null;if (!versionIdCodeSignalNameMap.containsKey(versionId)) {codeSignalNameMap = new HashMap<>(NUMBER_OF_COLUMNS);versionIdCodeSignalNameMap.put(versionId, codeSignalNameMap);for (VersionCanSignalDto versionCanSignalDto : canSignalByVersionIdList) {if (headerList.contains(versionCanSignalDto.getSignalName())) {continue;}headerList.add(versionCanSignalDto.getSignalName());codeSignalNameMap.put(versionCanSignalDto.getCode(), versionCanSignalDto.getSignalName());if (StringUtils.isEmpty(versionCanSignalDto.getUnit())) {continue;}signalNameUnitMap.put(versionCanSignalDto.getSignalName(), versionCanSignalDto.getUnit());}} else {codeSignalNameMap = versionIdCodeSignalNameMap.get(versionId);}Long timestamp = Long.valueOf(dateTime);Map<String, String> signalNameValueMap = timestampValuesMap.get(timestamp);if (signalNameValueMap == null) {signalNameValueMap = new HashMap<>(NUMBER_OF_COLUMNS);timestampValuesMap.put(timestamp, signalNameValueMap);}for (Map.Entry<String, String> codeValueEntry : codeSignalNameMap.entrySet()) {String code = codeValueEntry.getKey();String value = codeValueMap.get(code);if (StringUtils.isEmpty(value)) {continue;}String signalName = codeValueEntry.getValue();validHeaderSet.add(signalName);signalNameValueMap.put(signalName, value);}}//如果不存在有值的信号信息,直接返回if (CollectionUtils.isEmpty(validHeaderSet)) {for (File file : fileList) {file.delete();}return;}writeCsv(directory, fileNameValue, headerList, validHeaderSet, timestampValuesMap, signalNameUnitMap);//删除TXT文件for (File file : fileList) {file.delete();}} catch (Exception e) {log.warn("整理文件错误 filePath: {}", fileList.get(0).getAbsolutePath(), e);}}private 
List<String> readLine(List<File> fileList) throws IOException {List<String> contentList = new ArrayList<>(60000);for (File file : fileList) {List<String> readList = FileUtils.readLines(file, StandardCharsets.UTF_8);if (CollectionUtils.isEmpty(readList)) {continue;}contentList.addAll(readList);}return contentList;}/*** 数据写入CSV文件** @param directory* @param fileNameValue* @param headerList* @param validHeaderSet* @param timestampValuesMap* @param signalNameUnitMap*/private void writeCsv(String directory, String fileNameValue, List<String> headerList, Set<String> validHeaderSet, Map<Long, Map<String, String>> timestampValuesMap, Map<String, String> signalNameUnitMap) {headerList.removeIf(header -> !validHeaderSet.contains(header));Map<Long, Map<String, String>> sortMap = Maps.newLinkedHashMapWithExpectedSize(timestampValuesMap.size());//根据Key升序排序MaptimestampValuesMap.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEachOrdered(x -> sortMap.put(x.getKey(), x.getValue()));String[] headers = new String[headerList.size()];//头部字段需要加上单位信息[单位];String[] headerUnits = new String[headerList.size() + 2];headerUnits[0] = "time";headerUnits[1] = "上报时间";for (int i = 0, len = headerList.size(); i < len; i++) {headers[i] = headerList.get(i);String unit = signalNameUnitMap.get(headers[i]);if (StringUtils.isEmpty(unit)) {headerUnits[i + 2] = headers[i];} else {headerUnits[i + 2] = headers[i] + "[" + unit + "]";}}List<String[]> valueList = Lists.newArrayListWithExpectedSize(sortMap.size());AtomicInteger atomicInteger = new AtomicInteger(-1);sortMap.forEach((timestamp, signalValueMap) -> {String[] values = new String[headerUnits.length];boolean isValidFlag = false;for (int i = 0; i < headers.length; i++) {String value = signalValueMap.get(headers[i]);if (StringUtils.isNotEmpty(value)) {if (!isValidFlag) {isValidFlag = true;}values[i + 2] = value;} else {values[i + 2] = "";}}//如果没有一个有效值,此行数据无需写入if (!isValidFlag) {return;}values[0] = String.valueOf(atomicInteger.addAndGet(1));values[1] = VehicleDataKafkaListener.formatDate(timestamp, VehicleDataKafkaListener.TIME_FORMAT_Y_M_D) + "\t";valueList.add(values);});csvHelper.writeToCsv(directory, fileNameValue + VehicleDataKafkaListener.CSV, headerUnits, valueList);}/*** 压缩文件,保存记录** @param preDay* @param directory* @param fileFailedMsgMap* @param exportSuccessSet* @param exportFailedSet* @return*/private ProcessResult zipFileList(String preDay, String directory, Map<String, String> fileFailedMsgMap, Set<String> exportSuccessSet, Set<String> exportFailedSet) {BatchExportInfoDto batchExportInfoDto = new BatchExportInfoDto();batchExportInfoDto.setDataDate(preDay);if (!CollectionUtils.isEmpty(exportSuccessSet)) {batchExportInfoDto.setSuccessPlateNo(StringUtils.join(exportSuccessSet, ","));}if (!CollectionUtils.isEmpty(exportFailedSet)) {batchExportInfoDto.setFailedPlateNo(StringUtils.join(exportFailedSet, ","));}if (CollectionUtils.isEmpty(fileFailedMsgMap)) {File directoryFile = new File(directory);Collection<File> fileList = FileUtils.listFiles(directoryFile, new String[]{"csv"}, true);FileUtils.zipFiles(directoryFile.getParent() + "/" + preDay + ".zip", fileList);FileUtils.deleteDirectory(directory);batchExportInfoDto.setFileName(preDay + ".zip");batchExportInfoDto.setUrl(ossHelper.getHttpOssPrefix(VehicleDataKafkaListener.OSS_BUCKET_NAME, VehicleDataKafkaListener.DAY_EXPORT_DIR) + batchExportInfoDto.getFileName());batchExportInfoDto.setStatus("1");batchExportInfoService.saveOrUpdate(batchExportInfoDto);return new 
ProcessResult(true);}fileFailedMsgMap.forEach((fileName, errorMsg) -> {log.warn("处理失败文件信息 fileName: {} errorMsg: {}", fileName, errorMsg);});batchExportInfoDto.setStatus("0");batchExportInfoService.saveOrUpdate(batchExportInfoDto);return new ProcessResult(true);}
}
