package com.loan.modules.common.util;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;@SuppressWarnings("all")public class ETLThreadPool { private static ThreadPoolExecutor etlExectutor = null; /** * 功能:得到线程池实例 * @param corePoolSize 线程池维护线程的最少数量 * @param maximumPoolSize 线程池维护线程的最大数量 * @param keepAliveTime 线程池维护线程所允许的空闲时间 * @param unit 线程池维护线程所允许的空闲时间的单位 * @param workQueue 线程池所使用的缓冲队列 * @return */ @SuppressWarnings("unchecked") public static ThreadPoolExecutor getThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { synchronized (ETLThreadPool.class) { if (etlExectutor == null) { etlExectutor = createExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } } return etlExectutor; } /** * 功能:创建ThreadPoolExecutor实例; * @param corePoolSize:核心线程数量 * @param maximumPoolSize:最大线程数量 * @param keepAliveTime:线程空闲保持时间 * @param unit:时间单位 * @param workQueue:工作队列 * @param handler:旧任务抛弃策略 * @return * ThreadPoolExecutor */ @SuppressWarnings("unchecked") private static ThreadPoolExecutor createExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { etlExectutor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); return etlExectutor; }}
package com.loan.modules;import java.util.Queue;import java.util.concurrent.CountDownLatch;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import com.loan.modules.common.util.ETLThreadPool;public class test { private static ThreadPoolExecutor cachedThreadPool = ETLThreadPool.getThreadPool(8,10, 3000, TimeUnit.SECONDS, new LinkedBlockingQueue(20000)); public static void main(String[] args) throws InterruptedException { Thred1(); Thred2(); Thred1(); } public static void Thred1() throws InterruptedException{ int total = 30; final CountDownLatch countDownLatch = new CountDownLatch(total); for (int i = 0; i < total; i++) { Thred1 t1 = new Thred1(i,countDownLatch); cachedThreadPool.execute(t1); } countDownLatch.await();// 等待所有子线程执行完 } public static void Thred2() throws InterruptedException{ int total = 5; final CountDownLatch countDownLatch = new CountDownLatch(total); for (int i = 0; i < total; i++) { cachedThreadPool.execute(new Runnable() { @Override public void run() { // 批量向instinct系统发送进件信息 // 计数器 减一 System.out.println("2"); countDownLatch.countDown(); } }); } countDownLatch.await();// 等待所有子线程执行完 } private synchronized static int getQueueSize(Queue queue) { return queue.size(); }}