15 changed files with 755 additions and 279 deletions
@ -1,44 +0,0 @@
|
||||
package com.biutag.supervision.controller.datav; |
||||
|
||||
import com.biutag.supervision.pojo.Result; |
||||
import io.swagger.v3.oas.annotations.Operation; |
||||
import io.swagger.v3.oas.annotations.tags.Tag; |
||||
import jakarta.annotation.Resource; |
||||
import lombok.RequiredArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.web.bind.annotation.GetMapping; |
||||
import org.springframework.web.bind.annotation.RequestMapping; |
||||
import org.springframework.web.bind.annotation.RequestParam; |
||||
import org.springframework.web.bind.annotation.RestController; |
||||
|
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* @author: sh |
||||
* @date: 20224/11/5 |
||||
*/ |
||||
@RestController |
||||
@RequiredArgsConstructor |
||||
@RequestMapping("datav/dataScreen") |
||||
public class DataScreenController { |
||||
|
||||
|
||||
// private final DataScreenService dataScreenService;
|
||||
|
||||
|
||||
/** |
||||
* todo 获取地图数据 |
||||
*/ |
||||
@GetMapping("/mapData") |
||||
public Result<List<Map<String, Object>>> mapData(@RequestParam Map<String, Object> params) { |
||||
if (params.isEmpty()) { |
||||
// return Result.success(dataScreenService.mapData());
|
||||
} else { |
||||
String deptId = params.get("id").toString(); |
||||
// return Result.success(dataScreenService.mapCountyData(deptId));
|
||||
} |
||||
return null; |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,22 @@
|
||||
package com.biutag.supervision.mapper; |
||||
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
import com.biutag.supervision.pojo.entity.DataMigration; |
||||
import org.apache.ibatis.annotations.Select; |
||||
|
||||
/** |
||||
* @author 舒云 |
||||
* @description 针对表【data_migration】的数据库操作Mapper |
||||
* @createDate 2024-11-13 11:21:19 |
||||
* @Entity generator.domain.DataMigration |
||||
*/ |
||||
public interface DataMigrationMapper extends BaseMapper<DataMigration> { |
||||
|
||||
@Select("SELECT * FROM data_migration WHERE end_time = (SELECT MAX(end_time) FROM data_migration)") |
||||
DataMigration selectLastMigrationTime(); |
||||
} |
||||
|
||||
|
||||
|
||||
|
||||
@ -0,0 +1,21 @@
|
||||
package com.biutag.supervision.mapper; |
||||
|
||||
import com.baomidou.dynamic.datasource.annotation.DS; |
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
import com.biutag.supervision.pojo.entity.DataPoliceMeeting; |
||||
import org.apache.ibatis.annotations.Mapper; |
||||
|
||||
/** |
||||
* @author 舒云 |
||||
* @description 针对表【data_police_meeting】的数据库操作Mapper |
||||
* @createDate 2024-11-13 09:43:04 |
||||
*/ |
||||
@DS("master") |
||||
@Mapper |
||||
public interface DataPoliceMeetingMysqlMapper extends BaseMapper<DataPoliceMeeting> { |
||||
|
||||
} |
||||
|
||||
|
||||
|
||||
|
||||
@ -0,0 +1,37 @@
|
||||
package com.biutag.supervision.mapper; |
||||
|
||||
import com.baomidou.dynamic.datasource.annotation.DS; |
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
import com.biutag.supervision.pojo.entity.DataPoliceMeetingV; |
||||
import org.apache.ibatis.annotations.Mapper; |
||||
import org.apache.ibatis.annotations.Select; |
||||
|
||||
import java.time.LocalDateTime; |
||||
import java.util.List; |
||||
|
||||
|
||||
/** |
||||
* @author 舒云 |
||||
* @createDate 2024-11-13 09:43:04 |
||||
*/ |
||||
@DS("test1") |
||||
@Mapper |
||||
public interface DataPoliceMeetingSqlServerMapper extends BaseMapper<DataPoliceMeetingV> { |
||||
|
||||
@Select("select max(create_time) from data_police_meeting") |
||||
LocalDateTime selectMaxCreateTime(); |
||||
|
||||
@Select("select * from data_police_meeting where create_time > #{lastSelectTime} and create_time <= #{now}") |
||||
List<DataPoliceMeetingV> selectAimList(LocalDateTime lastSelectTime, LocalDateTime now); |
||||
|
||||
|
||||
@Select("SELECT * FROM SampleResultPoolView " + |
||||
"WHERE create_time > #{lastSelectTime} AND create_time <= #{now} " + |
||||
"ORDER BY create_time " + |
||||
"OFFSET #{offset} ROWS FETCH NEXT #{pageSize} ROWS ONLY") |
||||
List<DataPoliceMeetingV> selectAimListWithPagination(LocalDateTime lastSelectTime, LocalDateTime now, int offset, int pageSize); |
||||
} |
||||
|
||||
|
||||
|
||||
|
||||
@ -0,0 +1,57 @@
|
||||
package com.biutag.supervision.pojo.entity; |
||||
|
||||
import com.baomidou.mybatisplus.annotation.TableField; |
||||
import com.baomidou.mybatisplus.annotation.TableId; |
||||
import com.baomidou.mybatisplus.annotation.TableName; |
||||
import lombok.Data; |
||||
|
||||
import java.io.Serializable; |
||||
import java.time.LocalDateTime; |
||||
import java.util.Date; |
||||
|
||||
/** |
||||
* 数据库迁移记录 |
||||
* @TableName data_migration |
||||
*/ |
||||
@TableName(value ="data_migration") |
||||
@Data |
||||
public class DataMigration implements Serializable { |
||||
/** |
||||
* 数据迁移操作id |
||||
*/ |
||||
@TableId(value = "id") |
||||
private Integer id; |
||||
|
||||
/** |
||||
* 来源库 |
||||
*/ |
||||
@TableField(value = "target") |
||||
private String target; |
||||
|
||||
/** |
||||
* 开始时间 |
||||
*/ |
||||
@TableField(value = "start_time") |
||||
private LocalDateTime startTime; |
||||
|
||||
/** |
||||
* 结束时间 |
||||
*/ |
||||
@TableField(value = "end_time") |
||||
private LocalDateTime endTime; |
||||
|
||||
/** |
||||
* 迁移数量 |
||||
*/ |
||||
@TableField(value = "migration_count") |
||||
private String migrationCount; |
||||
|
||||
/** |
||||
* 目标表最大时间 |
||||
*/ |
||||
@TableField(value = "target_max_time") |
||||
private LocalDateTime targetMaxTime; |
||||
|
||||
@TableField(exist = false) |
||||
private static final long serialVersionUID = 1L; |
||||
} |
||||
@ -0,0 +1,103 @@
|
||||
package com.biutag.supervision.pojo.entity; |
||||
|
||||
import com.baomidou.mybatisplus.annotation.TableField; |
||||
import com.baomidou.mybatisplus.annotation.TableName; |
||||
import lombok.Data; |
||||
|
||||
import java.io.Serializable; |
||||
import java.util.Date; |
||||
|
||||
/** |
||||
* mysql中negative中data_police_meeting表的实体类 |
||||
* @TableName data_police_meeting |
||||
*/ |
||||
@TableName(value ="data_police_meeting") |
||||
@Data |
||||
public class DataPoliceMeeting implements Serializable { |
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "sample_id") |
||||
private Integer sampleId; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "name") |
||||
private String name; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "visit_name") |
||||
private String visitName; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "visit_phone") |
||||
private String visitPhone; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "organize_id") |
||||
private String organizeId; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "org_name") |
||||
private String orgName; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "business_id") |
||||
private String businessId; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "business_name") |
||||
private String businessName; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "sample_time") |
||||
private Date sampleTime; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "create_time") |
||||
private Date createTime; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "sample_content") |
||||
private String sampleContent; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "visitime") |
||||
private Date visitime; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "result_name") |
||||
private String resultName; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "code") |
||||
private String code; |
||||
|
||||
@TableField(exist = false) |
||||
private static final long serialVersionUID = 1L; |
||||
} |
||||
@ -0,0 +1,103 @@
|
||||
package com.biutag.supervision.pojo.entity; |
||||
|
||||
import com.baomidou.mybatisplus.annotation.TableField; |
||||
import com.baomidou.mybatisplus.annotation.TableName; |
||||
import lombok.Data; |
||||
|
||||
import java.io.Serializable; |
||||
import java.util.Date; |
||||
|
||||
/** |
||||
* sqlServer数据库SampleResultPoolView视图实体类 |
||||
* @TableName SampleResultPoolView |
||||
*/ |
||||
@TableName(value ="SampleResultPoolView") |
||||
@Data |
||||
public class DataPoliceMeetingV implements Serializable { |
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "sample_id") |
||||
private Integer sampleId; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "name") |
||||
private String name; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "visit_name") |
||||
private String visitName; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "visit_phone") |
||||
private String visitPhone; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "organize_id") |
||||
private String organizeId; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "org_name") |
||||
private String orgName; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "business_id") |
||||
private String businessId; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "business_name") |
||||
private String businessName; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "sample_time") |
||||
private Date sampleTime; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "create_time") |
||||
private Date createTime; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "sample_content") |
||||
private String sampleContent; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "visitime") |
||||
private Date visitime; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "result_name") |
||||
private String resultName; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
@TableField(value = "code") |
||||
private String code; |
||||
|
||||
@TableField(exist = false) |
||||
private static final long serialVersionUID = 1L; |
||||
} |
||||
@ -0,0 +1,160 @@
|
||||
package com.biutag.supervision.service; |
||||
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
||||
import com.biutag.supervision.mapper.DataMigrationMapper; |
||||
import com.biutag.supervision.mapper.DataPoliceMeetingMysqlMapper; |
||||
import com.biutag.supervision.mapper.DataPoliceMeetingSqlServerMapper; |
||||
import com.biutag.supervision.pojo.entity.DataMigration; |
||||
import com.biutag.supervision.pojo.entity.DataPoliceMeeting; |
||||
import com.biutag.supervision.pojo.entity.DataPoliceMeetingV; |
||||
import jakarta.annotation.PostConstruct; |
||||
import jakarta.annotation.Resource; |
||||
import org.springframework.beans.BeanUtils; |
||||
import org.springframework.beans.BeansException; |
||||
import org.springframework.scheduling.annotation.Async; |
||||
import org.springframework.scheduling.annotation.EnableAsync; |
||||
import org.springframework.scheduling.annotation.Scheduled; |
||||
import org.springframework.stereotype.Service; |
||||
import org.springframework.transaction.annotation.Transactional; |
||||
|
||||
import java.text.DateFormat; |
||||
import java.time.LocalDateTime; |
||||
import java.time.format.DateTimeFormatter; |
||||
import java.util.ArrayList; |
||||
import java.util.Date; |
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.stream.Stream; |
||||
|
||||
/** |
||||
* @author 舒云 |
||||
* @description 针对表【data_migration】的数据库操作Service实现 |
||||
* @createDate 2024-11-13 11:21:19 |
||||
*/ |
||||
@Service |
||||
//@EnableAsync
|
||||
public class DataMigrationServiceImpl extends ServiceImpl<DataMigrationMapper, DataMigration> { |
||||
|
||||
@Resource |
||||
private DataPoliceMeetingMysqlMapper dataPoliceMeetingMysqlMapper; |
||||
@Resource |
||||
private DataPoliceMeetingSqlServerMapper dataPoliceMeetingSqlServerMapper; |
||||
|
||||
@Resource |
||||
private DataMigrationMapper dataMigrationMapper; // 迁移记录
|
||||
|
||||
private static final int BATCH_SIZE = 1000; // 每次处理的记录数
|
||||
|
||||
|
||||
public List<DataPoliceMeetingV> getDataPoliceMeetingFromSqlServer(LocalDateTime lastSelectTime, LocalDateTime now) { |
||||
return dataPoliceMeetingSqlServerMapper.selectAimList(lastSelectTime, now); |
||||
} |
||||
|
||||
|
||||
@PostConstruct |
||||
public void startMigration() { |
||||
CompletableFuture.runAsync(() -> migrateData()); |
||||
} |
||||
|
||||
/* @Async |
||||
public void migrateDataAsync() { |
||||
migrateData(); |
||||
}*/ |
||||
// @Scheduled(cron = "*/10 * * * * ?")
|
||||
|
||||
@Async |
||||
public void migrateData() { |
||||
|
||||
LocalDateTime startTime = LocalDateTime.now(); // 开始迁移时间
|
||||
// 1. 查询数据库中最后一次迁移的时间
|
||||
DataMigration lastMigration = dataMigrationMapper.selectLastMigrationTime(); |
||||
LocalDateTime lastMigrationTime = (lastMigration == null || lastMigration.getTargetMaxTime() == null) |
||||
? LocalDateTime.of(1970, 1, 1, 0, 0) |
||||
: lastMigration.getTargetMaxTime(); |
||||
|
||||
// 2、准备数据
|
||||
int pageNum = 1; |
||||
int pageSize = 1000; |
||||
Long total = 0L; |
||||
List<DataPoliceMeetingV> dataPoliceMeetingFromSqlServerList; |
||||
do { |
||||
dataPoliceMeetingFromSqlServerList = getDataPoliceMeetingFromSqlServer(lastMigrationTime, LocalDateTime.now(), pageNum, pageSize); |
||||
// 3、开始新的迁移
|
||||
boolean migrationSuccessful = updateMigration(dataPoliceMeetingFromSqlServerList); |
||||
total += dataPoliceMeetingFromSqlServerList.size(); |
||||
if (!migrationSuccessful) { |
||||
System.out.println("数据迁移失败"); |
||||
return; |
||||
} |
||||
pageNum++; |
||||
} while (!dataPoliceMeetingFromSqlServerList.isEmpty()); |
||||
LocalDateTime endTime = LocalDateTime.now(); |
||||
|
||||
// updateMigrationRecord(startTime, endTime, dataPoliceMeetingFromSqlServerList);
|
||||
|
||||
|
||||
DataMigration dataMigration = new DataMigration(); |
||||
dataMigration.setTarget("sqlServer"); |
||||
dataMigration.setStartTime(startTime); |
||||
dataMigration.setEndTime(endTime); |
||||
dataMigration.setMigrationCount(total.toString()); |
||||
LocalDateTime maxCreateTime = dataPoliceMeetingSqlServerMapper.selectMaxCreateTime(); |
||||
dataMigration.setTargetMaxTime(maxCreateTime); |
||||
dataMigrationMapper.insert(dataMigration); |
||||
|
||||
|
||||
System.out.println("数据迁移成功"); |
||||
} |
||||
|
||||
private boolean updateMigration(List<DataPoliceMeetingV> dataPoliceMeetingFromSqlServerList) { |
||||
try { |
||||
|
||||
for (DataPoliceMeetingV dataPoliceMeetingV : dataPoliceMeetingFromSqlServerList) { |
||||
DataPoliceMeeting dataPoliceMeeting = new DataPoliceMeeting(); |
||||
BeanUtils.copyProperties(dataPoliceMeetingV, dataPoliceMeeting); |
||||
dataPoliceMeetingMysqlMapper.insert(dataPoliceMeeting); |
||||
} |
||||
return true; |
||||
} catch (BeansException e) { |
||||
return false; |
||||
} |
||||
} |
||||
|
||||
// 大数据分页查询
|
||||
public List<DataPoliceMeetingV> getDataPoliceMeetingFromSqlServer(LocalDateTime lastSelectTime, LocalDateTime now, int pageNum, int pageSize) { |
||||
int offset = (pageNum - 1) * pageSize; |
||||
return dataPoliceMeetingSqlServerMapper.selectAimListWithPagination(lastSelectTime, now, offset, pageSize); |
||||
} |
||||
|
||||
|
||||
private void updateMigrationRecord(LocalDateTime startTime, LocalDateTime endTime, List<DataPoliceMeetingV> dataPoliceMeetingFromSqlServerList) { |
||||
DataMigration dataMigration = new DataMigration(); |
||||
dataMigration.setTarget("sqlServer"); |
||||
dataMigration.setStartTime(startTime); |
||||
dataMigration.setEndTime(endTime); |
||||
dataMigration.setMigrationCount(String.valueOf(dataPoliceMeetingFromSqlServerList.size())); |
||||
LocalDateTime maxCreateTime = dataPoliceMeetingSqlServerMapper.selectMaxCreateTime(); |
||||
dataMigration.setTargetMaxTime(maxCreateTime); |
||||
dataMigrationMapper.insert(dataMigration); |
||||
} |
||||
|
||||
/** |
||||
* boolean migrationSuccessful = updateMigration(dataPoliceMeetingFromSqlServerList); |
||||
* if (migrationSuccessful) { |
||||
* LocalDateTime endTime = LocalDateTime.now(); // 结束迁移时间
|
||||
* // 4、更新迁移记录
|
||||
* updateMigrationRecord(startTime, endTime, dataPoliceMeetingFromSqlServerList); |
||||
* System.out.println("数据迁移成功"); |
||||
* } else { |
||||
* System.out.println("数据迁移失败"); |
||||
* } |
||||
*/ |
||||
|
||||
} |
||||
|
||||
|
||||
|
||||
|
||||
Loading…
Reference in new issue