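Problem analysis:
The company has an extremely large table whose data needs to be migrated out for backup without disrupting normal online use. We cannot simply create a new table and swap it in for this one, because that would not reduce the server's disk usage; this single table currently takes up 20 GB of disk space. After reading a lot of material online, we decided to use Flink to extract the data.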
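1. Approach
Because the data set is so large, queries against it are slow. So each round queries 10,000 primary-key ids, then looks up each row individually by its id in a loop, and finally inserts the rows into another table through Flink. To speed this up, the 10,000 ids are split into batches of 1,000, one batch per thread, so 10 threads work in parallel. After migration, the rows are deleted from the original table. Deleting 1,000 rows in one batch caused "too many connections" errors, and because the table is so large, the delete transactions hung. So instead every id is put into an MQ queue and the rows are deleted one at a time, in queue order.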
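The batching step described above can be sketched in plain Java, independent of Flink and MyBatis. This is a minimal sketch under assumptions: the ids are already in memory, and the hypothetical `migrateBatch` callback stands in for the per-thread work (loading rows, inserting them, queueing deletes). It uses ceiling division so that a round of exactly 10,000 ids yields 10 batches with no empty trailing batch, and it shares one fixed pool of 10 threads across all batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class BatchSplitSketch {

    // Splits ids into consecutive batches of at most batchSize elements (views, not copies).
    static List<List<Long>> partition(List<Long> ids, int batchSize) {
        List<List<Long>> batches = new ArrayList<>();
        // ceiling division: 10,000 ids -> 10 batches of 1,000; 10,001 ids -> 11 batches
        int num = (ids.size() + batchSize - 1) / batchSize;
        for (int i = 0; i < num; i++) {
            int from = i * batchSize;
            int to = Math.min(from + batchSize, ids.size());
            batches.add(ids.subList(from, to));
        }
        return batches;
    }

    // Hands each batch to a shared fixed-size pool and waits until every batch has run.
    static void runBatches(List<Long> ids, int batchSize, int threads, Consumer<List<Long>> migrateBatch) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (List<Long> batch : partition(ids, batchSize)) {
            pool.execute(() -> migrateBatch.accept(batch));
        }
        pool.shutdown(); // accept no new tasks; queued batches still run
        try {
            pool.awaitTermination(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        }
    }

    public static void main(String[] args) {
        List<Long> ids = new ArrayList<>();
        for (long id = 1; id <= 10_000; id++) {
            ids.add(id);
        }
        AtomicInteger processed = new AtomicInteger();
        // hypothetical migrateBatch: only counts the ids it receives
        runBatches(ids, 1000, 10, batch -> processed.addAndGet(batch.size()));
        System.out.println(partition(ids, 1000).size() + " batches, " + processed.get() + " ids");
    }
}
```

Running `main` prints `10 batches, 10000 ids`. One pool per round, shut down after submission, keeps the thread count bounded at 10 no matter how many batches there are.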
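import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class FlinkService
{
    // number of ids handled by each worker thread
    private int size = 1000;
    @Autowired
    private DialogueRecordMapper dialogueRecordMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void start(int orgId, String date) throws InterruptedException {
        // ids to migrate in this round (10,000 per query, as described above)
        List<Long> recordIds = dialogueRecordMapper.selectRecordId(orgId, date);
        if (recordIds.isEmpty())
        {
            return; // nothing left to migrate
        }
        // ceiling division, so an exact multiple of size does not produce an empty last batch
        int num = (recordIds.size() + size - 1) / size;
        // a single pool of 10 worker threads shared by all batches of this round
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            for (int i = 0; i < num; i++) {
                List<Long> records = recordIds.subList(i * size, Math.min((i + 1) * size, recordIds.size()));
                pool.execute(new DataMoveThread(records, dialogueRecordMapper, rabbitTemplate));
            }
        } catch (Exception e) {
            return; // stop the job if a batch cannot be submitted
        } finally {
            pool.shutdown(); // accept no new tasks; submitted batches still run to completion
        }
        // 500 seconds: long enough for this round to be inserted and deleted via MQ
        Thread.sleep(1000 * 500);
        start(orgId, date);
    }
}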
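Make the Thread.sleep interval fairly long, so that the rows from the previous round have all been deleted before the next round starts. After sleeping, the method calls itself recursively to begin the next round, which effectively acts as a timer. Each round adds a stack frame, so for a long-running job a loop or a scheduler is safer; if you have time, you can switch to Quartz.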
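As an alternative to sleep-then-recurse that needs no extra dependency, the JDK's `ScheduledExecutorService.scheduleWithFixedDelay` can rerun a round after a fixed pause without growing the call stack (Quartz, suggested above, is another option; this sketch deliberately uses only the JDK). The `runRound` supplier is a hypothetical stand-in for one migration round; it returns `false` when no ids are left, and the scheduler is then shut down. The delay is a parameter, so the 500 seconds used in the service can be passed in, while the demo uses 10 ms only so that it finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class RoundScheduler {

    // Runs runRound repeatedly with delayMillis between the end of one round and the
    // start of the next, until runRound returns false or throws. Returns the rounds run.
    static int runUntilDone(BooleanSupplier runRound, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(1);
        AtomicBoolean finished = new AtomicBoolean(false);
        AtomicInteger rounds = new AtomicInteger();
        scheduler.scheduleWithFixedDelay(() -> {
            if (finished.get()) {
                return; // already done; waiting for shutdown
            }
            boolean more;
            try {
                rounds.incrementAndGet();
                more = runRound.getAsBoolean();
            } catch (RuntimeException e) {
                more = false; // a failed round ends the job instead of leaving the latch waiting forever
            }
            if (!more) {
                finished.set(true);
                done.countDown();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        } finally {
            scheduler.shutdownNow(); // cancel the periodic task
        }
        return rounds.get();
    }

    public static void main(String[] args) {
        AtomicInteger remainingRounds = new AtomicInteger(3);
        // hypothetical round: pretends data remains for three rounds
        int rounds = runUntilDone(() -> remainingRounds.decrementAndGet() > 0, 10);
        System.out.println("rounds run: " + rounds);
    }
}
```

`scheduleWithFixedDelay` measures the delay from the end of one run to the start of the next, which matches the original intent of pausing after each round rather than firing at fixed wall-clock times.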
2. Flink insert thread
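import java.util.ArrayList;
import java.util.List;
import lombok.SneakyThrows;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class DataMoveThread implements Runnable
{
    private List<Long> ids;
    private DialogueRecordMapper dialogueRecordMapper;
    private RabbitTemplate rabbitTemplate;

    public DataMoveThread(List<Long> ids, DialogueRecordMapper dialogueRecordMapper, RabbitTemplate rabbitTemplate) {
        this.ids = ids;
        this.dialogueRecordMapper = dialogueRecordMapper;
        this.rabbitTemplate = rabbitTemplate;
    }

    // No @Transactional here: this object is created with new rather than by Spring,
    // so the annotation would have no effect.
    @SneakyThrows
    @Override
    public void run() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // load each row of this batch by primary key; skip ids whose rows no longer exist
        List<DialogueRecordDto> dtos = new ArrayList<>();
        for (Long id : ids) {
            DialogueRecordDto dto = dialogueRecordMapper.selectById(id);
            if (dto != null) {
                dtos.add(dto);
            }
        }
        if (dtos.isEmpty()) {
            return; // fromCollection rejects an empty collection
        }
        // one stream element per row, instead of wrapping the list and flattening it with flatMap
        DataStream<DialogueRecordDto> dataStream = env.fromCollection(dtos);
        dataStream.addSink(JdbcSink.sink(
                "sql insert statement", // placeholder: the INSERT statement for the target table
                (ps, t) -> {
                    // bind statement parameters; only the id is shown, set the other columns the same way
                    ps.setLong(1, t.getId());
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("url")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("username")
                        .withPassword("password")
                        .build()));
        // blocks until the bounded job has written every row
        env.execute();
        // only after a successful insert: queue each id so the rows are deleted one at a time
        ids.forEach(d -> rabbitTemplate.convertAndSend("flink-queue", d));
    }
}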
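The consumer side of `flink-queue`, which actually deletes the rows, is not shown above. The one-at-a-time deletion can be sketched with a JDK `BlockingQueue` standing in for RabbitMQ and an in-memory map standing in for the source table; in the real service this would presumably be a RabbitMQ listener calling a single-row delete on the mapper, and the `deleteById` name in the comment is hypothetical. A single consumer means at most one small delete, and one short transaction, is in flight at any time, which is the property that avoids the "too many connections" and hung-transaction problems described in the approach.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class DeleteQueueSketch {

    // Marker that tells the consumer to stop; only needed because this sketch must terminate.
    static final long STOP = -1L;

    // Single consumer: takes one id at a time and deletes exactly one row per id,
    // so at most one small delete is in flight at any moment.
    static int drain(BlockingQueue<Long> queue, Map<Long, String> table) {
        int deleted = 0;
        try {
            while (true) {
                long id = queue.take(); // blocks until the next id arrives
                if (id == STOP) {
                    break;
                }
                // stand-in for a hypothetical single-row delete such as mapper.deleteById(id)
                if (table.remove(id) != null) {
                    deleted++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop consuming and report what was deleted
        }
        return deleted;
    }

    public static void main(String[] args) {
        // in-memory stand-in for the source table: id -> row
        Map<Long, String> table = new ConcurrentHashMap<>();
        for (long id = 1; id <= 5; id++) {
            table.put(id, "row " + id);
        }
        // producer side: after a successful insert, each migrated id is queued
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
        for (long id = 1; id <= 5; id++) {
            queue.add(id);
        }
        queue.add(STOP);
        System.out.println("deleted " + drain(queue, table) + ", remaining " + table.size());
    }
}
```

Running `main` prints `deleted 5, remaining 0`. An id whose row is already gone is simply skipped, so redelivered or duplicate messages are harmless.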
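Copyright notice: This is an original article by qaz__01, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.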