|
|
|
|
@ -1,128 +1,47 @@
|
|
|
|
|
package com.biutag.supervision.service; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import cn.hutool.core.util.StrUtil; |
|
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
|
|
|
|
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; |
|
|
|
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
|
|
|
|
import com.biutag.supervision.mapper.DataMigrationMapper; |
|
|
|
|
import com.biutag.supervision.mapper.DataPoliceMeetingMysqlMapper; |
|
|
|
|
import com.biutag.supervision.mapper.DataPoliceMeetingSqlServerMapper; |
|
|
|
|
import com.biutag.supervision.pojo.entity.DataMigration; |
|
|
|
|
import com.biutag.supervision.pojo.entity.DataPoliceMeeting; |
|
|
|
|
import com.biutag.supervision.pojo.entity.DataPoliceMeetingV; |
|
|
|
|
import jakarta.annotation.Resource; |
|
|
|
|
import org.springframework.beans.BeanUtils; |
|
|
|
|
import org.springframework.beans.BeansException; |
|
|
|
|
import com.biutag.supervision.pojo.model.DataMigrationModel; |
|
|
|
|
import com.biutag.supervision.pojo.param.DataMigrationQueryParam; |
|
|
|
|
import lombok.RequiredArgsConstructor; |
|
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
|
|
|
|
|
|
import java.time.LocalDateTime; |
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.List; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* @author 舒云 |
|
|
|
|
* @description 针对表【data_migration】的数据库操作Service实现 |
|
|
|
|
* @createDate 2024-11-13 11:21:19 |
|
|
|
|
*/ |
|
|
|
|
@Slf4j |
|
|
|
|
@RequiredArgsConstructor |
|
|
|
|
@Service |
|
|
|
|
//@EnableAsync
|
|
|
|
|
public class DataMigrationServiceImpl extends ServiceImpl<DataMigrationMapper, DataMigration> { |
|
|
|
|
|
|
|
|
|
@Resource |
|
|
|
|
private DataPoliceMeetingMysqlMapper dataPoliceMeetingMysqlMapper; |
|
|
|
|
@Resource |
|
|
|
|
private DataPoliceMeetingSqlServerMapper dataPoliceMeetingSqlServerMapper; |
|
|
|
|
|
|
|
|
|
@Resource |
|
|
|
|
private DataMigrationMapper dataMigrationMapper; // 迁移记录
|
|
|
|
|
|
|
|
|
|
private static final int BATCH_SIZE = 1000; // 每次处理的记录数
|
|
|
|
|
|
|
|
|
|
public List<DataPoliceMeetingV> getDataPoliceMeetingFromSqlServer(LocalDateTime lastSelectTime, LocalDateTime now) { |
|
|
|
|
return dataPoliceMeetingSqlServerMapper.selectAimList(lastSelectTime, now); |
|
|
|
|
} |
|
|
|
|
// @PostConstruct
|
|
|
|
|
// public void startMigration() {
|
|
|
|
|
// CompletableFuture.runAsync(() -> migrateData());
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
/* @Async |
|
|
|
|
public void migrateDataAsync() { |
|
|
|
|
migrateData(); |
|
|
|
|
}*/ |
|
|
|
|
// @Scheduled(cron = "*/10 * * * * ?")
|
|
|
|
|
|
|
|
|
|
// @Async
|
|
|
|
|
public void migrateData() { |
|
|
|
|
LocalDateTime startTime = LocalDateTime.now(); // 开始迁移时间
|
|
|
|
|
// 1. 查询数据库中最后一次迁移的时间
|
|
|
|
|
DataMigration lastMigration = dataMigrationMapper.selectLastMigrationTime(); |
|
|
|
|
LocalDateTime lastMigrationTime = (lastMigration == null || lastMigration.getTargetMaxTime() == null) |
|
|
|
|
? LocalDateTime.of(1970, 1, 1, 0, 0) |
|
|
|
|
: lastMigration.getTargetMaxTime(); |
|
|
|
|
// 2、准备数据
|
|
|
|
|
int pageNum = 1; |
|
|
|
|
int pageSize = 1000; |
|
|
|
|
int total = 0; |
|
|
|
|
List<DataPoliceMeetingV> dataPoliceMeetingFromSqlServerList; |
|
|
|
|
do { |
|
|
|
|
dataPoliceMeetingFromSqlServerList = getDataPoliceMeetingFromSqlServer(lastMigrationTime, LocalDateTime.now(), pageNum, pageSize); |
|
|
|
|
System.out.println("查询到" + dataPoliceMeetingFromSqlServerList.size() + "条数据"); |
|
|
|
|
// 3、开始新的迁移
|
|
|
|
|
boolean migrationSuccessful = updateMigration(dataPoliceMeetingFromSqlServerList); |
|
|
|
|
total += dataPoliceMeetingFromSqlServerList.size(); |
|
|
|
|
if (!migrationSuccessful) { |
|
|
|
|
System.out.println("数据迁移失败"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
pageNum++; |
|
|
|
|
} while (!dataPoliceMeetingFromSqlServerList.isEmpty()); |
|
|
|
|
LocalDateTime endTime = LocalDateTime.now(); |
|
|
|
|
// updateMigrationRecord(startTime, endTime, dataPoliceMeetingFromSqlServerList);
|
|
|
|
|
DataMigration dataMigration = new DataMigration(); |
|
|
|
|
dataMigration.setTarget("sqlServer"); |
|
|
|
|
dataMigration.setStartTime(startTime); |
|
|
|
|
dataMigration.setEndTime(endTime); |
|
|
|
|
dataMigration.setMigrationCount(String.valueOf(total)); |
|
|
|
|
LocalDateTime maxCreateTime = dataPoliceMeetingSqlServerMapper.selectMaxCreateTime(); |
|
|
|
|
dataMigration.setTargetMaxTime(maxCreateTime); |
|
|
|
|
dataMigrationMapper.insert(dataMigration); |
|
|
|
|
System.out.println("数据迁移成功"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean updateMigration(List<DataPoliceMeetingV> dataPoliceMeetingFromSqlServerList) { |
|
|
|
|
try { |
|
|
|
|
if (!dataPoliceMeetingFromSqlServerList.isEmpty()) { |
|
|
|
|
List<DataPoliceMeeting> dataPoliceMeetingList = new ArrayList<>(dataPoliceMeetingFromSqlServerList.size()); |
|
|
|
|
for (DataPoliceMeetingV dataPoliceMeetingV : dataPoliceMeetingFromSqlServerList) { |
|
|
|
|
DataPoliceMeeting dataPoliceMeeting = new DataPoliceMeeting(); |
|
|
|
|
BeanUtils.copyProperties(dataPoliceMeetingV, dataPoliceMeeting); |
|
|
|
|
dataPoliceMeetingList.add(dataPoliceMeeting); |
|
|
|
|
} |
|
|
|
|
System.out.println("准备插入" + dataPoliceMeetingList.size() + "条数据"); |
|
|
|
|
dataPoliceMeetingMysqlMapper.insert(dataPoliceMeetingList); |
|
|
|
|
System.out.println("插入成功"); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} catch (BeansException e) { |
|
|
|
|
return false; |
|
|
|
|
public class DataMigrationServiceImpl extends ServiceImpl<DataPoliceMeetingMysqlMapper, DataPoliceMeeting> { |
|
|
|
|
public Page<DataMigrationModel> page(DataMigrationQueryParam dataMigrationQueryParam) { |
|
|
|
|
QueryWrapper<DataPoliceMeeting> queryWrapper = new QueryWrapper<>(); |
|
|
|
|
queryWrapper.like(dataMigrationQueryParam.getSampleId() != null, "sample_id", dataMigrationQueryParam.getSampleId()) |
|
|
|
|
.like(StrUtil.isNotBlank(dataMigrationQueryParam.getName()), "name", dataMigrationQueryParam.getName()) |
|
|
|
|
.like(StrUtil.isNotBlank(dataMigrationQueryParam.getVisitName()), "visit_name", dataMigrationQueryParam.getVisitName()) |
|
|
|
|
.like(StrUtil.isNotBlank(dataMigrationQueryParam.getVisitPhone()), "visit_phone", dataMigrationQueryParam.getVisitPhone()) |
|
|
|
|
.like(StrUtil.isNotBlank(dataMigrationQueryParam.getOrgName()), "org_name", dataMigrationQueryParam.getOrgName()) |
|
|
|
|
.like(StrUtil.isNotBlank(dataMigrationQueryParam.getBusinessName()), "business_name", dataMigrationQueryParam.getBusinessName()) |
|
|
|
|
.orderBy(StrUtil.isNotBlank(dataMigrationQueryParam.getSortField()), dataMigrationQueryParam.getSortOrder().equals("ASC"), changeSortField(dataMigrationQueryParam.getSortField())); |
|
|
|
|
if (dataMigrationQueryParam.getSampleTime().size() == 2) { |
|
|
|
|
queryWrapper.between("sample_time", dataMigrationQueryParam.getSampleTime().get(0), dataMigrationQueryParam.getSampleTime().get(1)); |
|
|
|
|
} |
|
|
|
|
if (dataMigrationQueryParam.getVisitTime().size() == 2) { |
|
|
|
|
queryWrapper.between("visit_time", dataMigrationQueryParam.getVisitTime().get(0), dataMigrationQueryParam.getVisitTime().get(1)); |
|
|
|
|
} |
|
|
|
|
return baseMapper.queryPage(Page.of(dataMigrationQueryParam.getCurrent(), dataMigrationQueryParam.getSize()), queryWrapper); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// 大数据分页查询
|
|
|
|
|
public List<DataPoliceMeetingV> getDataPoliceMeetingFromSqlServer(LocalDateTime lastSelectTime, LocalDateTime now, int pageNum, int pageSize) { |
|
|
|
|
int offset = (pageNum - 1) * pageSize; |
|
|
|
|
System.out.println("开始查询sqlserver,offset:" + offset); |
|
|
|
|
return dataPoliceMeetingSqlServerMapper.selectAimListWithPagination(lastSelectTime, now, offset, pageSize); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void updateMigrationRecord(LocalDateTime startTime, LocalDateTime endTime, List<DataPoliceMeetingV> dataPoliceMeetingFromSqlServerList) { |
|
|
|
|
DataMigration dataMigration = new DataMigration(); |
|
|
|
|
dataMigration.setTarget("sqlServer"); |
|
|
|
|
dataMigration.setStartTime(startTime); |
|
|
|
|
dataMigration.setEndTime(endTime); |
|
|
|
|
dataMigration.setMigrationCount(String.valueOf(dataPoliceMeetingFromSqlServerList.size())); |
|
|
|
|
LocalDateTime maxCreateTime = dataPoliceMeetingSqlServerMapper.selectMaxCreateTime(); |
|
|
|
|
dataMigration.setTargetMaxTime(maxCreateTime); |
|
|
|
|
dataMigrationMapper.insert(dataMigration); |
|
|
|
|
private String changeSortField(String sortField) { |
|
|
|
|
if (sortField.equals("sampleTime")) { |
|
|
|
|
return "sample_time"; |
|
|
|
|
} else if (sortField.equals("visitTime")) { |
|
|
|
|
return "visit_time"; |
|
|
|
|
} |
|
|
|
|
return ""; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|