Spring Boot + Flink: migrating a ~30-million-row MySQL table to another table

  • Post category: mysql

Problem analysis:

The company has a very large table that needs to be migrated and archived without disrupting the live service. Simply creating a new table and swapping it in is not an option, because that would not reduce disk usage: the table currently occupies about 20 GB. After researching the options, I decided to use Flink to extract the data.

1. Approach

The table is too large to query efficiently, so each round fetches 10,000 primary-key ids, then looks up each row by id and inserts it into the target table through Flink. To speed this up, the 10,000 ids are split into slices of 1,000, with one thread per slice (10 threads in total). After a row is migrated it is deleted from the source table. Deleting in batches of 1,000 caused "too many connections" errors, and the large transactions on such a big table locked up, so instead the ids are pushed onto an MQ queue and deleted one at a time.
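The slicing step described above (10,000 ids cut into chunks of 1,000, one chunk per worker) can be sketched as a standalone helper; `IdPartitioner` and `partition` are names introduced here for illustration, not part of the original code:

```java
import java.util.ArrayList;
import java.util.List;

public class IdPartitioner {

    // Split a list of primary-key ids into slices of at most `size` elements,
    // one slice per worker thread. The last slice may be shorter.
    public static List<List<Long>> partition(List<Long> ids, int size) {
        List<List<Long>> slices = new ArrayList<>();
        for (int from = 0; from < ids.size(); from += size) {
            slices.add(ids.subList(from, Math.min(from + size, ids.size())));
        }
        return slices;
    }
}
```

With 10,000 ids and size 1,000 this yields exactly 10 slices; unlike the loop in the service below, it never produces an empty trailing slice when the count divides evenly.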

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class FlinkService {

     private static final int SIZE = 1000;

     // Reuse one fixed pool; the original code created a new ThreadPoolExecutor
     // on every loop iteration, leaking threads.
     private final ExecutorService executor = Executors.newFixedThreadPool(10);

     @Autowired
     private DialogueRecordMapper dialogueRecordMapper;

     @Autowired
     private RabbitTemplate rabbitTemplate;

     public void start(int orgId, String date) throws InterruptedException {
          List<Long> recordIds = dialogueRecordMapper.selectRecordId(orgId, date);
          if (recordIds.isEmpty()) {
               return;
          }
          try {
               // Split the ids into slices of at most SIZE and hand each
               // slice to a worker thread.
               for (int from = 0; from < recordIds.size(); from += SIZE) {
                    int to = Math.min(from + SIZE, recordIds.size());
                    List<Long> records = recordIds.subList(from, to);
                    executor.execute(new DataMoveThread(records, dialogueRecordMapper, rabbitTemplate));
               }
          } catch (Exception e) {
               return;
          }
          // Sleep long enough for the MQ consumer to finish deleting,
          // then start the next round.
          Thread.sleep(1000 * 500);
          start(orgId, date);
     }
}

Make the Thread.sleep interval generously long, so the deletes have time to finish; after waking up, start calls itself recursively, which acts as a crude timer. If you have time, replace this with a Quartz scheduled job.
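Even without pulling in Quartz, the sleep-and-recurse loop can be replaced with the JDK's own ScheduledExecutorService; a minimal sketch, where `MigrationScheduler` is a name made up here and `task` would wrap the body of `start`:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MigrationScheduler {

    // Runs `task` repeatedly with a fixed delay *between* runs: the next round
    // starts only after the previous one has finished, mirroring the original
    // "sleep, then call start() again" behaviour without unbounded recursion.
    public static ScheduledExecutorService schedule(Runnable task, long delayMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleWithFixedDelay(task, 0, delayMillis, TimeUnit.MILLISECONDS);
        return ses;
    }
}
```

This also avoids the stack growth of the recursive version, which would eventually matter for a long-running migration.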

2. Flink insert thread

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import lombok.SneakyThrows;

public class DataMoveThread implements Runnable {

    private final List<Long> ids;

    private final DialogueRecordMapper dialogueRecordMapper;

    private final RabbitTemplate rabbitTemplate;

    public DataMoveThread(List<Long> ids, DialogueRecordMapper dialogueRecordMapper,
                          RabbitTemplate rabbitTemplate) {
        this.ids = ids;
        this.dialogueRecordMapper = dialogueRecordMapper;
        this.rabbitTemplate = rabbitTemplate;
    }

    // Note: the original annotated run() with @Transactional, but that has no
    // effect here -- this object is created with `new`, so Spring never proxies it.
    @SneakyThrows
    @Override
    public void run() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Look up each row by primary key.
        List<DialogueRecordDto> dtos = new ArrayList<>();
        ids.forEach(id -> dtos.add(dialogueRecordMapper.selectById(id)));

        // fromElements(dtos) produces a single List element; flatMap unpacks
        // it into a stream of individual records.
        DataStream<DialogueRecordDto> dataStream = env
                .fromElements(dtos)
                .flatMap(new FlatMapFunction<List<DialogueRecordDto>, DialogueRecordDto>() {
                    @Override
                    public void flatMap(List<DialogueRecordDto> value, Collector<DialogueRecordDto> out)
                            throws Exception {
                        for (DialogueRecordDto dto : value) {
                            out.collect(dto);
                        }
                    }
                });

        dataStream.addSink(JdbcSink.sink(
                   "INSERT SQL statement",  // e.g. INSERT INTO target_table (...) VALUES (?, ...)
                   (ps, t) -> {
                       ps.setLong(1, t.getId());
                       // ... bind the remaining columns here
                   },
                   new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                           .withUrl("url")
                           .withDriverName("com.mysql.cj.jdbc.Driver")
                           .withPassword("password")
                           .withUsername("username")
                           .build()));
        env.execute();

        // Only after the Flink job has written the rows do we queue the ids,
        // so the MQ consumer can delete them from the source table one by one.
        ids.forEach(id -> rabbitTemplate.convertAndSend("flink-queue", id));
    }
}
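The post never shows the consumer that drains flink-queue, but the one-at-a-time delete discipline it describes can be sketched in-process with a BlockingQueue and a single worker thread. `SequentialDeleter` is a name made up here, and `deleteById` stands in for a mapper method such as `dialogueRecordMapper.deleteById`:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Models the delete side of the pipeline: migrated ids are queued and a single
// worker removes them one at a time, so MySQL never sees a burst of DELETEs
// ("too many connections") or one huge transaction.
public class SequentialDeleter {

    private final BlockingQueue<Long> queue = new LinkedBlockingQueue<>();

    public SequentialDeleter(Consumer<Long> deleteById) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    deleteById.accept(queue.take()); // one DELETE per id
                }
            } catch (InterruptedException ignored) {
                // shutdown requested
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    public void submit(long id) {
        queue.add(id);
    }
}
```

In the real setup the queue is RabbitMQ rather than in-process, but the ordering guarantee is the same: deletes happen strictly one after another.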


Copyright notice: this is an original article by qaz__01, licensed under CC 4.0 BY-SA. Please include a link to the original source and this notice when reposting.