1.首先在pom文件中引入依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.10.2</version>
</dependency>
2.编写一个抽象类 IntervalRunnable,定义方法run
package aa;
import lombok.Data;
import java.util.Date;
/**
 * Callback that processes one time slice described by a start and an end date.
 *
 * <p>Implementations put the per-slice work in {@link #run(Date, Date)}.
 *
 * <p>NOTE(review): this class declares no fields, so Lombok's {@code @Data} looks unnecessary here;
 * confirm nothing relies on the generated equals/hashCode/toString before removing it.
 */
@Data
public abstract class IntervalRunnable {
/**
 * Processes a single time slice.
 *
 * @param tempStart start of the slice
 * @param tempEnd   end of the slice
 */
public abstract void run(Date tempStart, Date tempEnd);
}
3.编写一个核心工具类 IntervalExecutors
package aa;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
 * Splits a time range into fixed-length slices, runs each slice as its own task on an executor,
 * and offers a helper to wait for all submitted tasks.
 */
public class IntervalExecutors {
    /** Number of seconds in one hour; pass it as {@code interval} to slice by the hour. */
    public static final int INTERVAL_HOUR = 3600;

    /**
     * Splits the range from {@code start} to {@code end} into consecutive slices and submits one
     * task per slice.
     *
     * <p>Each slice ends {@code interval - 1} seconds after its start, the next slice starts one
     * second after the previous slice's end, and a slice end that falls after {@code end} is clamped
     * to {@code end}. A slice is only created while its start is strictly before {@code end}, so
     * nothing is submitted when {@code start} is not before {@code end}.
     *
     * @param executor executor the slice tasks are submitted to
     * @param futures  list that receives the future of every submitted task, in submission order
     * @param runnable callback invoked once per slice with that slice's start and end
     * @param start    start time
     * @param end      end time
     * @param interval slice length in seconds, e.g. 1 to slice by the second or 3600 to slice by
     *                 the hour; must be positive
     * @throws IllegalArgumentException if {@code interval} is not positive
     */
    public static void run(ExecutorService executor, List<Future> futures, final IntervalRunnable runnable, Date start, Date end, int interval) {
        if (interval <= 0) {
            // With a non-positive interval the slice start never moves forward (assuming
            // DateUtil.addSecond adds the given number of seconds), so the loop below would
            // keep submitting tasks forever.
            throw new IllegalArgumentException("interval must be positive, but was " + interval);
        }
        Date tempStart = start;
        Date tempEnd = DateUtil.addSecond(tempStart, interval - 1);
        while (tempStart.before(end)) {
            // Clamp the last slice so it never runs past the requested end.
            tempEnd = tempEnd.after(end) ? end : tempEnd;
            // Hand each task its own copies; tempEnd may be the caller's 'end' instance.
            final Date finalTempStart = DateUtil.clone(tempStart);
            final Date finalTempEnd = DateUtil.clone(tempEnd);
            futures.add(executor.submit(new Runnable() {
                @Override
                public void run() {
                    runnable.run(finalTempStart, finalTempEnd);
                }
            }));
            tempStart = DateUtil.addSecond(tempEnd, 1);
            tempEnd = DateUtil.addSecond(tempStart, interval - 1);
        }
    }

    /**
     * Waits for every future to complete, in list order.
     *
     * <p>If waiting fails (a task threw, or the waiting thread was interrupted), the future being
     * waited on and all futures after it are cancelled with interruption, and a
     * {@link RuntimeException} is thrown.
     *
     * @param futures futures to wait for
     * @throws RuntimeException wrapping the task's exception when a task failed, or wrapping the
     *                          {@link InterruptedException} when the waiting thread was interrupted
     */
    public static void wait(List<Future> futures) {
        Iterator<Future> iterator = futures.iterator();
        Future current = null;
        try {
            while (iterator.hasNext()) {
                current = iterator.next();
                current.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            // On interruption the future we were blocked on is still running, so cancel it too;
            // for a failed (already completed) future this cancel is a no-op.
            if (current != null) {
                current.cancel(true);
            }
            while (iterator.hasNext()) {
                iterator.next().cancel(true);
            }
            if (e instanceof InterruptedException) {
                // Restore the interrupt status so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
            // Surface the task's own exception as the cause rather than the ExecutionException wrapper.
            throw new RuntimeException(e instanceof ExecutionException ? e.getCause() : e);
        }
    }
}
4.编写全局java线程池类 GlobalThreadPool
package aa;
import com.alibaba.ttl.threadpool.TtlExecutors;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
 * Global thread pools shared across the application.
 *
 * <p>Every pool is wrapped with {@code TtlExecutors.getTtlExecutorService}.
 */
@Slf4j
public class GlobalThreadPool {
    /** Value of {@code Runtime.maxMemory()} in bytes; kept as a long so the logged value is exact. */
    private static final long MEMORY_BYTE;
    /** {@link #MEMORY_BYTE} expressed in GiB. */
    private static final float MEMORY_GB;
    /** Value of {@code Runtime.availableProcessors()} at class initialization. */
    private static final int PROCESSOR_COUNT;
    /** Fixed pool with twice as many threads as available processors. */
    public static final ExecutorService TEMP_POOL;
    /** Fixed pool with a single thread. */
    public static final ExecutorService SINGLE_POOL = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(1));
    // Everything run on this pool is IO work, so the thread count can be scaled up.
    public static final ExecutorService CORE_POOL = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * 16));
    // Pool for API calls; such calls spend most of their time waiting, so it can be scaled up.
    // Created lazily by getApiPool().
    private static volatile ExecutorService API_POOL;

    static {
        MEMORY_BYTE = Runtime.getRuntime().maxMemory();
        MEMORY_GB = MEMORY_BYTE / 1024f / 1024f / 1024f;
        PROCESSOR_COUNT = Runtime.getRuntime().availableProcessors();
        log.info("全局线程池初始化!核心数={},可用内存{}G({}byte)", PROCESSOR_COUNT, MEMORY_GB, MEMORY_BYTE);
        TEMP_POOL = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(PROCESSOR_COUNT * 2));
    }

    /**
     * Returns the pool used for API calls, creating it on first use.
     *
     * <p>The pool has 32 threads per GiB of maximum memory (at least one), the same value for core
     * and maximum size, and an unbounded work queue.
     *
     * @return the shared API pool
     */
    public static ExecutorService getApiPool() {
        if (API_POOL == null) {
            synchronized (GlobalThreadPool.class) {
                if (API_POOL == null) {
                    // Clamp to 1: with less than 1/32 GiB of max memory the product truncates to 0,
                    // and ThreadPoolExecutor rejects a maximum pool size of 0.
                    int maxThreads = Math.max(1, (int) (MEMORY_GB * 32));
                    API_POOL = TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(maxThreads, maxThreads,
                            60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()));
                    // Core and maximum size are both maxThreads, so report that value for both.
                    log.info("API线程池初始化,coreSize:{} maxSize:{}", maxThreads, maxThreads);
                }
            }
        }
        return API_POOL;
    }
}
5.main测试
// Collects the future of every submitted slice so they can be awaited below.
List<Future> futures = new ArrayList<>();
IntervalExecutors.run(GlobalThreadPool.TEMP_POOL, futures, new IntervalRunnable() {
    @Override
    public void run(Date tempStart, Date tempEnd) {
        try {
            // Per-slice work goes here, for example:
            //syncTrade(shop, status, tempStart, tempEnd, skuMap, callback, saveCallback);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is logged, not only the message.
            log.error(e.getMessage(), e);
        }
    }
}, start, end, interval);
// Wait for every submitted slice to finish.
IntervalExecutors.wait(futures);
版权声明:本文为long198861原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。