多线程(按时间段多个线程并发执行)

  • Post author:
  • Post category:其他



1.首先在pom文件中引入依赖

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.10.2</version>
</dependency>


2.编写一个抽象类 IntervalRunnable,定义方法run

package aa;

import lombok.Data;

import java.util.Date;

/**
 * Callback type whose {@link #run(Date, Date)} method receives the two boundaries of one time slice.
 */
// NOTE(review): this class declares no fields, so @Data presumably adds nothing useful here — confirm before removing.
@Data
public abstract class IntervalRunnable {

    /**
     * Handles a single time slice.
     *
     * @param tempStart start of the slice
     * @param tempEnd   end of the slice
     */
    public abstract void run(Date tempStart, Date tempEnd);
}


3.编写一个核心工具类 IntervalExecutors

package aa;

import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;


/**
 * Splits a time range into fixed-length slices, submits one task per slice to an executor,
 * and offers a helper to wait for all of the resulting futures.
 */
public class IntervalExecutors {

    /** Number of seconds in one hour; pass as {@code interval} to slice the range by hour. */
    public static final int INTERVAL_HOUR = 3600;

    /**
     * Splits the range from {@code start} to {@code end} into consecutive slices of
     * {@code interval} seconds and submits one task per slice to {@code executor}.
     *
     * <p>Each slice ends {@code interval - 1} seconds after its start, and the next slice starts
     * one second after the previous slice's end. The end of the last slice is clamped to
     * {@code end}. A slice is only submitted while its start is strictly before {@code end},
     * so nothing is submitted when {@code start} is not before {@code end}.
     *
     * @param executor executor the slice tasks are submitted to
     * @param futures  list that receives the future of every submitted task
     * @param runnable callback invoked with the boundaries of each slice
     * @param start    start time of the whole range
     * @param end      end time of the whole range
     * @param interval slice length in seconds, e.g. 1 to slice by second or 3600 to slice by hour;
     *                 must be positive
     * @throws IllegalArgumentException if {@code interval} is not positive
     */
    public static void run(ExecutorService executor, List<Future> futures, final IntervalRunnable runnable, Date start, Date end, int interval) {
        // A non-positive interval never moves the slice start forward, so the loop below
        // would submit tasks forever. Fail fast instead.
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive, but was " + interval);
        }

        Date tempStart = start;
        Date tempEnd = DateUtil.addSecond(tempStart, interval - 1);

        while (tempStart.before(end)) {
            // Clamp the last slice so it never runs past the requested end.
            tempEnd = tempEnd.after(end) ? end : tempEnd;

            // Hand each task its own copies of the slice boundaries.
            final Date finalTempStart = DateUtil.clone(tempStart);
            final Date finalTempEnd = DateUtil.clone(tempEnd);
            futures.add(executor.submit(new Runnable() {
                @Override
                public void run() {
                    runnable.run(finalTempStart, finalTempEnd);
                }
            }));

            // The next slice begins one second after the current slice's end.
            tempStart = DateUtil.addSecond(tempEnd, 1);
            tempEnd = DateUtil.addSecond(tempStart, interval - 1);
        }
    }

    /**
     * Waits for the given futures to complete, in list order.
     *
     * <p>If waiting on one future fails or is interrupted, the futures that have not been
     * waited on yet are cancelled (with interruption) and a {@link RuntimeException} is thrown.
     *
     * @param futures futures to wait for
     * @throws RuntimeException wrapping the task's failure cause when a task failed, or the
     *                          {@link InterruptedException} when the waiting thread was interrupted
     */
    public static void wait(List<Future> futures) {
        Iterator<Future> iterator = futures.iterator();
        try {
            while (iterator.hasNext()) {
                iterator.next().get();
            }
        } catch (InterruptedException | ExecutionException e) {
            // Try to stop everything that has not been waited on yet.
            while (iterator.hasNext()) {
                iterator.next().cancel(true);
            }
            // Restore the interrupt status so callers further up the stack can still observe it.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            // Surface the concurrent failure to the caller.
            throw new RuntimeException(e instanceof ExecutionException ? e.getCause() : e);
        }
    }

}


4.编写全局Java线程池类 GlobalThreadPool

package aa;

import com.alibaba.ttl.threadpool.TtlExecutors;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * Global, application-wide thread pools.
 *
 * <p>Every pool is wrapped with {@code TtlExecutors.getTtlExecutorService}.
 */
@Slf4j
public class GlobalThreadPool {

    // Kept as a long so the byte count is logged exactly; a float cannot represent it precisely.
    private static final long MEMORY_BYTE;
    private static final float MEMORY_GB;
    private static final int PROCESSOR_COUNT;

    /** Fixed pool with {@code 2 * availableProcessors} threads; created in the static initializer. */
    public static final ExecutorService TEMP_POOL;

    /** Fixed pool with a single thread. */
    public static final ExecutorService SINGLE_POOL = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(1));

    // This pool runs IO-bound work, so the thread count is scaled up (16 per processor).
    // The processor count is read directly here because PROCESSOR_COUNT is only assigned
    // in the static initializer block further down.
    public static final ExecutorService CORE_POOL = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * 16));

    // Pool used for API calls; such calls spend most of their time waiting, so it can be larger.
    // Created lazily by getApiPool().
    private static volatile ExecutorService API_POOL;

    static {
        MEMORY_BYTE = Runtime.getRuntime().maxMemory();
        MEMORY_GB = MEMORY_BYTE / 1024f / 1024f / 1024f;
        PROCESSOR_COUNT = Runtime.getRuntime().availableProcessors();
        log.info("全局线程池初始化!核心数={},可用内存{}G({}byte)", PROCESSOR_COUNT, MEMORY_GB, MEMORY_BYTE);

        TEMP_POOL = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(PROCESSOR_COUNT * 2));
    }

    /**
     * Returns the pool used for API calls, creating it on first use.
     *
     * <p>The pool is sized at 32 threads per GB of maximum JVM memory, with a minimum of one
     * thread, and uses an unbounded queue.
     *
     * @return the shared API pool
     */
    public static ExecutorService getApiPool() {
        if (API_POOL == null) {
            synchronized (GlobalThreadPool.class) {
                if (API_POOL == null) {
                    // Guard against a heap below 32 MB: the cast would yield 0 and
                    // ThreadPoolExecutor rejects a maximum pool size of 0.
                    int maxThreads = Math.max(1, (int) (MEMORY_GB * 32));
                    // Core size equals max size, so the 60s keep-alive only matters if core
                    // thread timeout is enabled on the executor.
                    API_POOL = TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(maxThreads, maxThreads,
                            60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()));
                    // Log the core size actually passed to the executor (it is maxThreads, not PROCESSOR_COUNT).
                    log.info("API线程池初始化,coreSize:{} maxSize:{}", maxThreads, maxThreads);
                }
            }
        }
        return API_POOL;
    }

}


5.main测试

// Receives the future of every submitted slice so they can all be awaited afterwards.
List<Future> futures = new ArrayList<>();
IntervalExecutors.run(GlobalThreadPool.TEMP_POOL, futures, new IntervalRunnable() {
    @Override
    public void run(Date tempStart, Date tempEnd) {
        try {
            // Per-slice logic goes here, for example:
            //syncTrade(shop, status, tempStart, tempEnd, skuMap, callback, saveCallback);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is logged, not only the message.
            // Because the exception is handled here, the future completes normally and
            // IntervalExecutors.wait will not cancel the remaining slices for this failure.
            log.error(e.getMessage(), e);
        }
    }
}, start, end, interval);
// Wait for every submitted slice.
IntervalExecutors.wait(futures);



版权声明:本文为long198861原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。