发表于 2024/08/26 18:31:26 [java] 浏览次数:162
实例属性
private final Map<String, Semaphore> semaphoreMap = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
private final AtomicInteger taskNum = new AtomicInteger(0);
/**
* 提交一个有并发控制的任务 通过 semaphore 信号量控制并发
*
* @param taskName 任务名 相同任务名 做并发控制
* @param concurrent 并发数
* @param task
* @return
*/
public synchronized void executeConcurrentCtr(String taskName, int concurrent, Runnable task) {
executor.execute(() -> {
taskNum.incrementAndGet();
Semaphore semaphore = semaphoreMap.computeIfAbsent(taskName, k -> new Semaphore(concurrent));
long time = 0;
try {
semaphore.acquire();
long s = System.currentTimeMillis();
log.info("虚线程执行器开始执行并发控制任务:{}", taskName);
task.run();
long e = System.currentTimeMillis();
time = e - s;
} catch (Error | Exception e) {
log.error("虚线程执行器并发控制任务异常:" + taskName, 1, e);
} finally {
semaphore.release();
taskNum.decrementAndGet();
}
log.info("虚线程执行器执行并发控制任务-完成:{},完成,耗时:{} ms, 剩余任务数:{}", taskName, time, taskNum.get());
});
}