|
|
|
|
@ -1,7 +1,6 @@
|
|
|
|
|
package com.biutag.supervision.service; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
|
|
|
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
|
|
|
|
import com.biutag.supervision.mapper.DataMigrationMapper; |
|
|
|
|
import com.biutag.supervision.mapper.DataPoliceMeetingMysqlMapper; |
|
|
|
|
@ -9,25 +8,14 @@ import com.biutag.supervision.mapper.DataPoliceMeetingSqlServerMapper;
|
|
|
|
|
import com.biutag.supervision.pojo.entity.DataMigration; |
|
|
|
|
import com.biutag.supervision.pojo.entity.DataPoliceMeeting; |
|
|
|
|
import com.biutag.supervision.pojo.entity.DataPoliceMeetingV; |
|
|
|
|
import jakarta.annotation.PostConstruct; |
|
|
|
|
import jakarta.annotation.Resource; |
|
|
|
|
import org.springframework.beans.BeanUtils; |
|
|
|
|
import org.springframework.beans.BeansException; |
|
|
|
|
import org.springframework.scheduling.annotation.Async; |
|
|
|
|
import org.springframework.scheduling.annotation.EnableAsync; |
|
|
|
|
import org.springframework.scheduling.annotation.Scheduled; |
|
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
|
import org.springframework.transaction.annotation.Transactional; |
|
|
|
|
|
|
|
|
|
import java.text.DateFormat; |
|
|
|
|
import java.time.LocalDateTime; |
|
|
|
|
import java.time.format.DateTimeFormatter; |
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.Date; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Optional; |
|
|
|
|
import java.util.concurrent.CompletableFuture; |
|
|
|
|
import java.util.stream.Stream; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* @author 舒云 |
|
|
|
|
@ -48,12 +36,9 @@ public class DataMigrationServiceImpl extends ServiceImpl<DataMigrationMapper, D
|
|
|
|
|
|
|
|
|
|
private static final int BATCH_SIZE = 1000; // 每次处理的记录数
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public List<DataPoliceMeetingV> getDataPoliceMeetingFromSqlServer(LocalDateTime lastSelectTime, LocalDateTime now) { |
|
|
|
|
return dataPoliceMeetingSqlServerMapper.selectAimList(lastSelectTime, now); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// @PostConstruct
|
|
|
|
|
// public void startMigration() {
|
|
|
|
|
// CompletableFuture.runAsync(() -> migrateData());
|
|
|
|
|
@ -67,21 +52,20 @@ public class DataMigrationServiceImpl extends ServiceImpl<DataMigrationMapper, D
|
|
|
|
|
|
|
|
|
|
// @Async
|
|
|
|
|
public void migrateData() { |
|
|
|
|
|
|
|
|
|
LocalDateTime startTime = LocalDateTime.now(); // 开始迁移时间
|
|
|
|
|
// 1. 查询数据库中最后一次迁移的时间
|
|
|
|
|
DataMigration lastMigration = dataMigrationMapper.selectLastMigrationTime(); |
|
|
|
|
LocalDateTime lastMigrationTime = (lastMigration == null || lastMigration.getTargetMaxTime() == null) |
|
|
|
|
? LocalDateTime.of(1970, 1, 1, 0, 0) |
|
|
|
|
: lastMigration.getTargetMaxTime(); |
|
|
|
|
|
|
|
|
|
// 2、准备数据
|
|
|
|
|
int pageNum = 1; |
|
|
|
|
int pageSize = 1000; |
|
|
|
|
Long total = 0L; |
|
|
|
|
int total = 0; |
|
|
|
|
List<DataPoliceMeetingV> dataPoliceMeetingFromSqlServerList; |
|
|
|
|
do { |
|
|
|
|
dataPoliceMeetingFromSqlServerList = getDataPoliceMeetingFromSqlServer(lastMigrationTime, LocalDateTime.now(), pageNum, pageSize); |
|
|
|
|
System.out.println("查询到" + dataPoliceMeetingFromSqlServerList.size() + "条数据"); |
|
|
|
|
// 3、开始新的迁移
|
|
|
|
|
boolean migrationSuccessful = updateMigration(dataPoliceMeetingFromSqlServerList); |
|
|
|
|
total += dataPoliceMeetingFromSqlServerList.size(); |
|
|
|
|
@ -92,30 +76,30 @@ public class DataMigrationServiceImpl extends ServiceImpl<DataMigrationMapper, D
|
|
|
|
|
pageNum++; |
|
|
|
|
} while (!dataPoliceMeetingFromSqlServerList.isEmpty()); |
|
|
|
|
LocalDateTime endTime = LocalDateTime.now(); |
|
|
|
|
|
|
|
|
|
// updateMigrationRecord(startTime, endTime, dataPoliceMeetingFromSqlServerList);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DataMigration dataMigration = new DataMigration(); |
|
|
|
|
dataMigration.setTarget("sqlServer"); |
|
|
|
|
dataMigration.setStartTime(startTime); |
|
|
|
|
dataMigration.setEndTime(endTime); |
|
|
|
|
dataMigration.setMigrationCount(total.toString()); |
|
|
|
|
dataMigration.setMigrationCount(String.valueOf(total)); |
|
|
|
|
LocalDateTime maxCreateTime = dataPoliceMeetingSqlServerMapper.selectMaxCreateTime(); |
|
|
|
|
dataMigration.setTargetMaxTime(maxCreateTime); |
|
|
|
|
dataMigrationMapper.insert(dataMigration); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
System.out.println("数据迁移成功"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean updateMigration(List<DataPoliceMeetingV> dataPoliceMeetingFromSqlServerList) { |
|
|
|
|
try { |
|
|
|
|
|
|
|
|
|
for (DataPoliceMeetingV dataPoliceMeetingV : dataPoliceMeetingFromSqlServerList) { |
|
|
|
|
DataPoliceMeeting dataPoliceMeeting = new DataPoliceMeeting(); |
|
|
|
|
BeanUtils.copyProperties(dataPoliceMeetingV, dataPoliceMeeting); |
|
|
|
|
dataPoliceMeetingMysqlMapper.insert(dataPoliceMeeting); |
|
|
|
|
if (!dataPoliceMeetingFromSqlServerList.isEmpty()) { |
|
|
|
|
List<DataPoliceMeeting> dataPoliceMeetingList = new ArrayList<>(dataPoliceMeetingFromSqlServerList.size()); |
|
|
|
|
for (DataPoliceMeetingV dataPoliceMeetingV : dataPoliceMeetingFromSqlServerList) { |
|
|
|
|
DataPoliceMeeting dataPoliceMeeting = new DataPoliceMeeting(); |
|
|
|
|
BeanUtils.copyProperties(dataPoliceMeetingV, dataPoliceMeeting); |
|
|
|
|
dataPoliceMeetingList.add(dataPoliceMeeting); |
|
|
|
|
} |
|
|
|
|
System.out.println("准备插入" + dataPoliceMeetingList.size() + "条数据"); |
|
|
|
|
dataPoliceMeetingMysqlMapper.insert(dataPoliceMeetingList); |
|
|
|
|
System.out.println("插入成功"); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} catch (BeansException e) { |
|
|
|
|
@ -126,10 +110,10 @@ public class DataMigrationServiceImpl extends ServiceImpl<DataMigrationMapper, D
|
|
|
|
|
// 大数据分页查询
|
|
|
|
|
public List<DataPoliceMeetingV> getDataPoliceMeetingFromSqlServer(LocalDateTime lastSelectTime, LocalDateTime now, int pageNum, int pageSize) { |
|
|
|
|
int offset = (pageNum - 1) * pageSize; |
|
|
|
|
System.out.println("开始查询sqlserver,offset:" + offset); |
|
|
|
|
return dataPoliceMeetingSqlServerMapper.selectAimListWithPagination(lastSelectTime, now, offset, pageSize); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void updateMigrationRecord(LocalDateTime startTime, LocalDateTime endTime, List<DataPoliceMeetingV> dataPoliceMeetingFromSqlServerList) { |
|
|
|
|
DataMigration dataMigration = new DataMigration(); |
|
|
|
|
dataMigration.setTarget("sqlServer"); |
|
|
|
|
@ -140,9 +124,6 @@ public class DataMigrationServiceImpl extends ServiceImpl<DataMigrationMapper, D
|
|
|
|
|
dataMigration.setTargetMaxTime(maxCreateTime); |
|
|
|
|
dataMigrationMapper.insert(dataMigration); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|