Depend on this LogSaveApplication and just call save; save only pushes the record onto a queue.
On a timer, or once enough data has accumulated, a worker thread takes a certain (configurable) number of items from the queue, fetches the mapper bean corresponding to the record's class from Spring, and calls its insertList operation. If no such mapper exists, nothing is done with the data.
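On the calling side it is enough to inject the component and save any log entity. A minimal sketch of a caller (AccessLog, AccessLogService and their fields are hypothetical names used only for illustration; they are not part of the original post):

import java.util.Date;

import javax.annotation.Resource;

import org.springframework.stereotype.Service;

@Service
public class AccessLogService {

    @Resource
    private LogSaveApplication logSaveApplication;

    public void record(String uri) {
        AccessLog entry = new AccessLog();
        entry.setUri(uri);
        entry.setCreateTime(new Date());
        // Non-blocking: this only enqueues the record; the background worker persists it in batches.
        logSaveApplication.save(entry);
    }
}

By the naming convention in the code below, the Spring context must then contain a mapper bean named accessLogMapper for AccessLog records to actually be written.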
SpringContextUtil is simply the class used to obtain the applicationContext.
ThreadPoolExecutor is a thread pool defined by the caller.
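The original post does not show how that executor is defined. A minimal sketch of one possible configuration, assuming a dedicated bean named executor (the pool sizes here are illustrative, not the author's values):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class LogSaveExecutorConfig {

    @Bean
    public ThreadPoolExecutor executor() {
        // Each log entity class permanently occupies one worker thread (its drain loop never exits),
        // so size the pool to at least the number of distinct log classes you expect to save.
        return new ThreadPoolExecutor(
                8, 8,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
    }
}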
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Resource;

import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class LogSaveApplication {

    @Resource
    private ThreadPoolExecutor executor;

    /** One BasePool (queue plus drain worker) per log entity class. */
    private ConcurrentHashMap<Class, BasePool> pools = new ConcurrentHashMap<>();

    public <T> void save(T t) {
        Class<?> clazz = t.getClass();
        BasePool currentPool = pools.computeIfAbsent(clazz, k -> new BasePool<T>(executor) {
            /**
             * The concrete save logic: resolve the mapper bean for this class and batch-insert the drained records.
             */
            @Override
            public void saveOperation(List<T> dataList) {
                if (dataList != null && dataList.size() > 0) {
                    String name = "";
                    if (mapper.get() == null) {
                        // By convention the bean name is the entity's simple name, lower-camel-cased, plus "Mapper".
                        name = clazz.getSimpleName().substring(0, 1).toLowerCase()
                                + clazz.getSimpleName().substring(1) + "Mapper";
                        // Look up the mapper bean of that name in the Spring context; the containsBean guard
                        // lets a missing mapper be logged and dropped instead of getBean throwing.
                        if (SpringContextUtil.getAtx().containsBean(name)) {
                            MyMapper myMapper = (MyMapper) SpringContextUtil.getAtx().getBean(name);
                            // CAS so the mapper reference is set only once.
                            mapper.compareAndSet(null, myMapper);
                        }
                    }
                    if (mapper.get() == null) {
                        log.error("no mapper for {}, dropping messages", name);
                        return;
                    }
                    // insertListDul is my own batch insert that updates rows hitting a unique-key conflict.
                    mapper.get().insertListDul(dataList);
                    log.info("insert success, size = {}", dataList.size());
                }
            }
        });
        currentPool.addData(t);
    }
    @Slf4j
    static abstract class BasePool<T> {

        /** Buffer queue for records that have not been persisted yet. */
        private LinkedBlockingQueue<T> dataQueue = new LinkedBlockingQueue<>();
        /** Maximum number of records drained per batch (configurable). */
        public int saveBatchSize = 200;
        private Object dataLock = new Object();
        private volatile boolean started = false;
        private ThreadPoolExecutor threadPoolExecutor;
        /** Lazily resolved mapper bean for this pool's entity class. */
        public AtomicReference<MyMapper> mapper = new AtomicReference<>();

        public BasePool(int saveBatchSize, ThreadPoolExecutor threadPoolExecutor) {
            this.saveBatchSize = saveBatchSize;
            this.threadPoolExecutor = threadPoolExecutor;
        }

        public BasePool(ThreadPoolExecutor threadPoolExecutor) {
            this.threadPoolExecutor = threadPoolExecutor;
        }
        public List<T> getData(int size) {
            log.info("getData step 1 >>> queueSize=" + dataQueue.size() + " >>> requestSize=" + size);
            if (dataQueue != null && dataQueue.size() > 0) {
                List<T> datas = new ArrayList<>();
                // Drain at most `size` elements, or fewer if the queue holds less.
                int realSize = Math.min(size, dataQueue.size());
                dataQueue.drainTo(datas, realSize);
                if (dataQueue != null) {
                    log.debug("getData step 2 >>> queueSize=" + dataQueue.size() + " >>> drained=" + realSize);
                }
                return datas;
            }
            return null;
        }
        public void addDataList(List<T> data) {
            startIfUnStarted();
            dataQueue.addAll(data);
        }

        public void addData(T data) {
            startIfUnStarted();
            dataQueue.add(data);
        }

        /** Double-checked locking: start the drain worker exactly once, on first use. */
        public void startIfUnStarted() {
            if (!started) {
                synchronized (dataLock) {
                    if (!started) {
                        execute(threadPoolExecutor);
                        started = true;
                    }
                }
            }
        }

        public Integer getDataQueueSize() {
            return dataQueue.size();
        }
        public void execute(ThreadPoolExecutor threadPoolExecutor) {
            threadPoolExecutor.execute(() -> {
                try {
                    // Grace period before the first drain, so the application can finish starting up.
                    TimeUnit.SECONDS.sleep(10L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error("drain worker interrupted during startup delay, e=", e);
                }
                // Drain loop: runs for the lifetime of the application.
                while (true) {
                    // Below the threshold, pause up to 5 seconds so small batches can accumulate
                    // (nothing ever notifies dataLock, so this is effectively a timed sleep).
                    if (this.getDataQueueSize() <= 100) {
                        synchronized (this.dataLock) {
                            try {
                                this.dataLock.wait(TimeUnit.SECONDS.toMillis(5L));
                            } catch (InterruptedException e) {
                                log.error("error in the drain loop, e=", e);
                            }
                        }
                    }
                    if (this.getDataQueueSize() != 0) {
                        try {
                            saveOperation(getData(saveBatchSize));
                        } catch (Exception e) {
                            log.error("error in the drain loop, e=", e);
                        }
                    }
                }
            });
        }
        /**
         * The actual persistence logic; implement it however you like.
         *
         * @param data the batch of records drained from the queue
         */
        public abstract void saveOperation(List<T> data);
    }
}
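SpringContextUtil itself is not included in the post. Assuming getAtx() simply returns the stored ApplicationContext, a typical shape for such a helper is roughly:

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class SpringContextUtil implements ApplicationContextAware {

    private static ApplicationContext atx;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Spring calls this once at startup; keep the context for static lookups.
        atx = applicationContext;
    }

    public static ApplicationContext getAtx() {
        return atx;
    }
}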
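MyMapper and its insertListDul method (an "insert, update on duplicate unique key" batch insert) are also the author's own code and are not shown. Assuming MySQL and annotation-based MyBatis, a hypothetical per-entity mapper offering that kind of statement could look like this (table, columns and names are illustrative; in the author's design such mappers share the MyMapper base type that LogSaveApplication casts to):

import java.util.List;

import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

@Mapper
public interface AccessLogMapper {

    // Batch insert; rows that hit a unique-key conflict are updated instead of rejected.
    @Insert("<script>"
            + "INSERT INTO access_log (id, uri, create_time) VALUES "
            + "<foreach collection='list' item='item' separator=','>"
            + "(#{item.id}, #{item.uri}, #{item.createTime})"
            + "</foreach>"
            + " ON DUPLICATE KEY UPDATE uri = VALUES(uri), create_time = VALUES(create_time)"
            + "</script>")
    int insertListDul(@Param("list") List<AccessLog> dataList);
}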