18 changed files with 770 additions and 295 deletions
@ -1,44 +0,0 @@ |
|||||||
package com.biutag.supervision.controller.datav; |
|
||||||
|
|
||||||
import com.biutag.supervision.pojo.Result; |
|
||||||
import io.swagger.v3.oas.annotations.Operation; |
|
||||||
import io.swagger.v3.oas.annotations.tags.Tag; |
|
||||||
import jakarta.annotation.Resource; |
|
||||||
import lombok.RequiredArgsConstructor; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.springframework.web.bind.annotation.GetMapping; |
|
||||||
import org.springframework.web.bind.annotation.RequestMapping; |
|
||||||
import org.springframework.web.bind.annotation.RequestParam; |
|
||||||
import org.springframework.web.bind.annotation.RestController; |
|
||||||
|
|
||||||
import java.util.List; |
|
||||||
import java.util.Map; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author: sh |
|
||||||
* @date: 20224/11/5 |
|
||||||
*/ |
|
||||||
@RestController |
|
||||||
@RequiredArgsConstructor |
|
||||||
@RequestMapping("datav/dataScreen") |
|
||||||
public class DataScreenController { |
|
||||||
|
|
||||||
|
|
||||||
// private final DataScreenService dataScreenService;
|
|
||||||
|
|
||||||
|
|
||||||
/** |
|
||||||
* todo 获取地图数据 |
|
||||||
*/ |
|
||||||
@GetMapping("/mapData") |
|
||||||
public Result<List<Map<String, Object>>> mapData(@RequestParam Map<String, Object> params) { |
|
||||||
if (params.isEmpty()) { |
|
||||||
// return Result.success(dataScreenService.mapData());
|
|
||||||
} else { |
|
||||||
String deptId = params.get("id").toString(); |
|
||||||
// return Result.success(dataScreenService.mapCountyData(deptId));
|
|
||||||
} |
|
||||||
return null; |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
||||||
@ -0,0 +1,22 @@ |
|||||||
|
package com.biutag.supervision.mapper; |
||||||
|
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||||
|
import com.biutag.supervision.pojo.entity.DataMigration; |
||||||
|
import org.apache.ibatis.annotations.Select; |
||||||
|
|
||||||
|
/** |
||||||
|
* @author 舒云 |
||||||
|
* @description 针对表【data_migration】的数据库操作Mapper |
||||||
|
* @createDate 2024-11-13 11:21:19 |
||||||
|
* @Entity generator.domain.DataMigration |
||||||
|
*/ |
||||||
|
public interface DataMigrationMapper extends BaseMapper<DataMigration> { |
||||||
|
|
||||||
|
@Select("SELECT * FROM data_migration WHERE end_time = (SELECT MAX(end_time) FROM data_migration)") |
||||||
|
DataMigration selectLastMigrationTime(); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -0,0 +1,21 @@ |
|||||||
|
package com.biutag.supervision.mapper; |
||||||
|
|
||||||
|
import com.baomidou.dynamic.datasource.annotation.DS; |
||||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||||
|
import com.biutag.supervision.pojo.entity.DataPoliceMeeting; |
||||||
|
import org.apache.ibatis.annotations.Mapper; |
||||||
|
|
||||||
|
/** |
||||||
|
* @author 舒云 |
||||||
|
* @description 针对表【data_police_meeting】的数据库操作Mapper |
||||||
|
* @createDate 2024-11-13 09:43:04 |
||||||
|
*/ |
||||||
|
@DS("master") |
||||||
|
@Mapper |
||||||
|
public interface DataPoliceMeetingMysqlMapper extends BaseMapper<DataPoliceMeeting> { |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -0,0 +1,37 @@ |
|||||||
|
package com.biutag.supervision.mapper; |
||||||
|
|
||||||
|
import com.baomidou.dynamic.datasource.annotation.DS; |
||||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||||
|
import com.biutag.supervision.pojo.entity.DataPoliceMeetingV; |
||||||
|
import org.apache.ibatis.annotations.Mapper; |
||||||
|
import org.apache.ibatis.annotations.Select; |
||||||
|
|
||||||
|
import java.time.LocalDateTime; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
|
||||||
|
/** |
||||||
|
* @author 舒云 |
||||||
|
* @createDate 2024-11-13 09:43:04 |
||||||
|
*/ |
||||||
|
@DS("test1") |
||||||
|
@Mapper |
||||||
|
public interface DataPoliceMeetingSqlServerMapper extends BaseMapper<DataPoliceMeetingV> { |
||||||
|
|
||||||
|
@Select("select max(create_time) from data_police_meeting") |
||||||
|
LocalDateTime selectMaxCreateTime(); |
||||||
|
|
||||||
|
@Select("select * from data_police_meeting where create_time > #{lastSelectTime} and create_time <= #{now}") |
||||||
|
List<DataPoliceMeetingV> selectAimList(LocalDateTime lastSelectTime, LocalDateTime now); |
||||||
|
|
||||||
|
|
||||||
|
@Select("SELECT * FROM SampleResultPoolView " + |
||||||
|
"WHERE create_time > #{lastSelectTime} AND create_time <= #{now} " + |
||||||
|
"ORDER BY create_time " + |
||||||
|
"OFFSET #{offset} ROWS FETCH NEXT #{pageSize} ROWS ONLY") |
||||||
|
List<DataPoliceMeetingV> selectAimListWithPagination(LocalDateTime lastSelectTime, LocalDateTime now, int offset, int pageSize); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -0,0 +1,57 @@ |
|||||||
|
package com.biutag.supervision.pojo.entity; |
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableField; |
||||||
|
import com.baomidou.mybatisplus.annotation.TableId; |
||||||
|
import com.baomidou.mybatisplus.annotation.TableName; |
||||||
|
import lombok.Data; |
||||||
|
|
||||||
|
import java.io.Serializable; |
||||||
|
import java.time.LocalDateTime; |
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
/** |
||||||
|
* 数据库迁移记录 |
||||||
|
* @TableName data_migration |
||||||
|
*/ |
||||||
|
@TableName(value ="data_migration") |
||||||
|
@Data |
||||||
|
public class DataMigration implements Serializable { |
||||||
|
/** |
||||||
|
* 数据迁移操作id |
||||||
|
*/ |
||||||
|
@TableId(value = "id") |
||||||
|
private Integer id; |
||||||
|
|
||||||
|
/** |
||||||
|
* 来源库 |
||||||
|
*/ |
||||||
|
@TableField(value = "target") |
||||||
|
private String target; |
||||||
|
|
||||||
|
/** |
||||||
|
* 开始时间 |
||||||
|
*/ |
||||||
|
@TableField(value = "start_time") |
||||||
|
private LocalDateTime startTime; |
||||||
|
|
||||||
|
/** |
||||||
|
* 结束时间 |
||||||
|
*/ |
||||||
|
@TableField(value = "end_time") |
||||||
|
private LocalDateTime endTime; |
||||||
|
|
||||||
|
/** |
||||||
|
* 迁移数量 |
||||||
|
*/ |
||||||
|
@TableField(value = "migration_count") |
||||||
|
private String migrationCount; |
||||||
|
|
||||||
|
/** |
||||||
|
* 目标表最大时间 |
||||||
|
*/ |
||||||
|
@TableField(value = "target_max_time") |
||||||
|
private LocalDateTime targetMaxTime; |
||||||
|
|
||||||
|
@TableField(exist = false) |
||||||
|
private static final long serialVersionUID = 1L; |
||||||
|
} |
||||||
@ -0,0 +1,103 @@ |
|||||||
|
package com.biutag.supervision.pojo.entity; |
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableField; |
||||||
|
import com.baomidou.mybatisplus.annotation.TableName; |
||||||
|
import lombok.Data; |
||||||
|
|
||||||
|
import java.io.Serializable; |
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
/** |
||||||
|
* mysql中negative中data_police_meeting表的实体类 |
||||||
|
* @TableName data_police_meeting |
||||||
|
*/ |
||||||
|
@TableName(value ="data_police_meeting") |
||||||
|
@Data |
||||||
|
public class DataPoliceMeeting implements Serializable { |
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "sample_id") |
||||||
|
private Integer sampleId; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "name") |
||||||
|
private String name; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "visit_name") |
||||||
|
private String visitName; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "visit_phone") |
||||||
|
private String visitPhone; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "organize_id") |
||||||
|
private String organizeId; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "org_name") |
||||||
|
private String orgName; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "business_id") |
||||||
|
private String businessId; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "business_name") |
||||||
|
private String businessName; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "sample_time") |
||||||
|
private Date sampleTime; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "create_time") |
||||||
|
private Date createTime; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "sample_content") |
||||||
|
private String sampleContent; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "visitime") |
||||||
|
private Date visitime; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "result_name") |
||||||
|
private String resultName; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "code") |
||||||
|
private String code; |
||||||
|
|
||||||
|
@TableField(exist = false) |
||||||
|
private static final long serialVersionUID = 1L; |
||||||
|
} |
||||||
@ -0,0 +1,103 @@ |
|||||||
|
package com.biutag.supervision.pojo.entity; |
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableField; |
||||||
|
import com.baomidou.mybatisplus.annotation.TableName; |
||||||
|
import lombok.Data; |
||||||
|
|
||||||
|
import java.io.Serializable; |
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
/** |
||||||
|
* sqlServer数据库SampleResultPoolView视图实体类 |
||||||
|
* @TableName SampleResultPoolView |
||||||
|
*/ |
||||||
|
@TableName(value ="SampleResultPoolView") |
||||||
|
@Data |
||||||
|
public class DataPoliceMeetingV implements Serializable { |
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "sample_id") |
||||||
|
private Integer sampleId; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "name") |
||||||
|
private String name; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "visit_name") |
||||||
|
private String visitName; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "visit_phone") |
||||||
|
private String visitPhone; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "organize_id") |
||||||
|
private String organizeId; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "org_name") |
||||||
|
private String orgName; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "business_id") |
||||||
|
private String businessId; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "business_name") |
||||||
|
private String businessName; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "sample_time") |
||||||
|
private Date sampleTime; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "create_time") |
||||||
|
private Date createTime; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "sample_content") |
||||||
|
private String sampleContent; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "visitime") |
||||||
|
private Date visitime; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "result_name") |
||||||
|
private String resultName; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
@TableField(value = "code") |
||||||
|
private String code; |
||||||
|
|
||||||
|
@TableField(exist = false) |
||||||
|
private static final long serialVersionUID = 1L; |
||||||
|
} |
||||||
@ -0,0 +1,160 @@ |
|||||||
|
package com.biutag.supervision.service; |
||||||
|
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
||||||
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
||||||
|
import com.biutag.supervision.mapper.DataMigrationMapper; |
||||||
|
import com.biutag.supervision.mapper.DataPoliceMeetingMysqlMapper; |
||||||
|
import com.biutag.supervision.mapper.DataPoliceMeetingSqlServerMapper; |
||||||
|
import com.biutag.supervision.pojo.entity.DataMigration; |
||||||
|
import com.biutag.supervision.pojo.entity.DataPoliceMeeting; |
||||||
|
import com.biutag.supervision.pojo.entity.DataPoliceMeetingV; |
||||||
|
import jakarta.annotation.PostConstruct; |
||||||
|
import jakarta.annotation.Resource; |
||||||
|
import org.springframework.beans.BeanUtils; |
||||||
|
import org.springframework.beans.BeansException; |
||||||
|
import org.springframework.scheduling.annotation.Async; |
||||||
|
import org.springframework.scheduling.annotation.EnableAsync; |
||||||
|
import org.springframework.scheduling.annotation.Scheduled; |
||||||
|
import org.springframework.stereotype.Service; |
||||||
|
import org.springframework.transaction.annotation.Transactional; |
||||||
|
|
||||||
|
import java.text.DateFormat; |
||||||
|
import java.time.LocalDateTime; |
||||||
|
import java.time.format.DateTimeFormatter; |
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.Date; |
||||||
|
import java.util.List; |
||||||
|
import java.util.Optional; |
||||||
|
import java.util.concurrent.CompletableFuture; |
||||||
|
import java.util.stream.Stream; |
||||||
|
|
||||||
|
/** |
||||||
|
* @author 舒云 |
||||||
|
* @description 针对表【data_migration】的数据库操作Service实现 |
||||||
|
* @createDate 2024-11-13 11:21:19 |
||||||
|
*/ |
||||||
|
@Service |
||||||
|
//@EnableAsync
|
||||||
|
public class DataMigrationServiceImpl extends ServiceImpl<DataMigrationMapper, DataMigration> { |
||||||
|
|
||||||
|
@Resource |
||||||
|
private DataPoliceMeetingMysqlMapper dataPoliceMeetingMysqlMapper; |
||||||
|
@Resource |
||||||
|
private DataPoliceMeetingSqlServerMapper dataPoliceMeetingSqlServerMapper; |
||||||
|
|
||||||
|
@Resource |
||||||
|
private DataMigrationMapper dataMigrationMapper; // 迁移记录
|
||||||
|
|
||||||
|
private static final int BATCH_SIZE = 1000; // 每次处理的记录数
|
||||||
|
|
||||||
|
|
||||||
|
public List<DataPoliceMeetingV> getDataPoliceMeetingFromSqlServer(LocalDateTime lastSelectTime, LocalDateTime now) { |
||||||
|
return dataPoliceMeetingSqlServerMapper.selectAimList(lastSelectTime, now); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
// @PostConstruct
|
||||||
|
// public void startMigration() {
|
||||||
|
// CompletableFuture.runAsync(() -> migrateData());
|
||||||
|
// }
|
||||||
|
|
||||||
|
/* @Async |
||||||
|
public void migrateDataAsync() { |
||||||
|
migrateData(); |
||||||
|
}*/ |
||||||
|
// @Scheduled(cron = "*/10 * * * * ?")
|
||||||
|
|
||||||
|
@Async |
||||||
|
public void migrateData() { |
||||||
|
|
||||||
|
LocalDateTime startTime = LocalDateTime.now(); // 开始迁移时间
|
||||||
|
// 1. 查询数据库中最后一次迁移的时间
|
||||||
|
DataMigration lastMigration = dataMigrationMapper.selectLastMigrationTime(); |
||||||
|
LocalDateTime lastMigrationTime = (lastMigration == null || lastMigration.getTargetMaxTime() == null) |
||||||
|
? LocalDateTime.of(1970, 1, 1, 0, 0) |
||||||
|
: lastMigration.getTargetMaxTime(); |
||||||
|
|
||||||
|
// 2、准备数据
|
||||||
|
int pageNum = 1; |
||||||
|
int pageSize = 1000; |
||||||
|
Long total = 0L; |
||||||
|
List<DataPoliceMeetingV> dataPoliceMeetingFromSqlServerList; |
||||||
|
do { |
||||||
|
dataPoliceMeetingFromSqlServerList = getDataPoliceMeetingFromSqlServer(lastMigrationTime, LocalDateTime.now(), pageNum, pageSize); |
||||||
|
// 3、开始新的迁移
|
||||||
|
boolean migrationSuccessful = updateMigration(dataPoliceMeetingFromSqlServerList); |
||||||
|
total += dataPoliceMeetingFromSqlServerList.size(); |
||||||
|
if (!migrationSuccessful) { |
||||||
|
System.out.println("数据迁移失败"); |
||||||
|
return; |
||||||
|
} |
||||||
|
pageNum++; |
||||||
|
} while (!dataPoliceMeetingFromSqlServerList.isEmpty()); |
||||||
|
LocalDateTime endTime = LocalDateTime.now(); |
||||||
|
|
||||||
|
// updateMigrationRecord(startTime, endTime, dataPoliceMeetingFromSqlServerList);
|
||||||
|
|
||||||
|
|
||||||
|
DataMigration dataMigration = new DataMigration(); |
||||||
|
dataMigration.setTarget("sqlServer"); |
||||||
|
dataMigration.setStartTime(startTime); |
||||||
|
dataMigration.setEndTime(endTime); |
||||||
|
dataMigration.setMigrationCount(total.toString()); |
||||||
|
LocalDateTime maxCreateTime = dataPoliceMeetingSqlServerMapper.selectMaxCreateTime(); |
||||||
|
dataMigration.setTargetMaxTime(maxCreateTime); |
||||||
|
dataMigrationMapper.insert(dataMigration); |
||||||
|
|
||||||
|
|
||||||
|
System.out.println("数据迁移成功"); |
||||||
|
} |
||||||
|
|
||||||
|
private boolean updateMigration(List<DataPoliceMeetingV> dataPoliceMeetingFromSqlServerList) { |
||||||
|
try { |
||||||
|
|
||||||
|
for (DataPoliceMeetingV dataPoliceMeetingV : dataPoliceMeetingFromSqlServerList) { |
||||||
|
DataPoliceMeeting dataPoliceMeeting = new DataPoliceMeeting(); |
||||||
|
BeanUtils.copyProperties(dataPoliceMeetingV, dataPoliceMeeting); |
||||||
|
dataPoliceMeetingMysqlMapper.insert(dataPoliceMeeting); |
||||||
|
} |
||||||
|
return true; |
||||||
|
} catch (BeansException e) { |
||||||
|
return false; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// 大数据分页查询
|
||||||
|
public List<DataPoliceMeetingV> getDataPoliceMeetingFromSqlServer(LocalDateTime lastSelectTime, LocalDateTime now, int pageNum, int pageSize) { |
||||||
|
int offset = (pageNum - 1) * pageSize; |
||||||
|
return dataPoliceMeetingSqlServerMapper.selectAimListWithPagination(lastSelectTime, now, offset, pageSize); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
private void updateMigrationRecord(LocalDateTime startTime, LocalDateTime endTime, List<DataPoliceMeetingV> dataPoliceMeetingFromSqlServerList) { |
||||||
|
DataMigration dataMigration = new DataMigration(); |
||||||
|
dataMigration.setTarget("sqlServer"); |
||||||
|
dataMigration.setStartTime(startTime); |
||||||
|
dataMigration.setEndTime(endTime); |
||||||
|
dataMigration.setMigrationCount(String.valueOf(dataPoliceMeetingFromSqlServerList.size())); |
||||||
|
LocalDateTime maxCreateTime = dataPoliceMeetingSqlServerMapper.selectMaxCreateTime(); |
||||||
|
dataMigration.setTargetMaxTime(maxCreateTime); |
||||||
|
dataMigrationMapper.insert(dataMigration); |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* boolean migrationSuccessful = updateMigration(dataPoliceMeetingFromSqlServerList); |
||||||
|
* if (migrationSuccessful) { |
||||||
|
* LocalDateTime endTime = LocalDateTime.now(); // 结束迁移时间
|
||||||
|
* // 4、更新迁移记录
|
||||||
|
* updateMigrationRecord(startTime, endTime, dataPoliceMeetingFromSqlServerList); |
||||||
|
* System.out.println("数据迁移成功"); |
||||||
|
* } else { |
||||||
|
* System.out.println("数据迁移失败"); |
||||||
|
* } |
||||||
|
*/ |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Loading…
Reference in new issue