菜单

循序渐进

    Java Oracle MySQL Bash Python Nginx Apache Redis MongoDB Git HTML Javascript Node CSS

最近来访

    话.线程池实现原理

    张嘉杰.原创 2015-04-09 java

    今儿同事问了一个关于多线程的问题。一般的线程,在调用其start()方法之后,会自动开始执行相应的逻辑,如果之后再次调用start(),会怎么样?是否可以??? 每个线程其实是有多个状态的,刚刚创建之后,其处于NEW状态,在执行完相应的run()逻辑之后,其处理TERMINATED状态。如果多次调用这个start()方法,是否可以让逻辑执行多次呢? 答案肯定是不行的(一个线程执行完自己的任务之后就被销毁了),所以多次执行start()必然不靠谱,应该会报错。同事又问到:那jdk线程池又是如何重用的呢?

    jdk内部的ThreadPoolExecutor的实现

    线程池内部相当于在跑一个while循环,在不断的从阻塞队列里拿task,之后调用task.run(),此时相应Runnablerun方法就会被执行,执行完以后继续去拿,如此反复, 整个线程其实一直没有执行完成,所以,其状态也不会变成TERMINATED,就此实现了线程的重用。

    java.util.concurrent.ThreadPoolExecutor

    /**
     * Main run loop
     */
    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);
        }
    }

    这个ThreadPoolExecutor内部的Worker即之前创建出来的线程,在这里不断的获取task,其runTask方法如下:

    /**
     * Runs a single task between before/after methods.
     */
    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            /*
             * Ensure that unless pool is stopping, this thread
             * does not have its interrupt set. This requires a
             * double-check of state in case the interrupt was
             * cleared concurrently with a shutdownNow -- if so,
             * the interrupt is re-enabled.
             */
            if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
                thread.interrupt();
            /*
             * Track execution state to ensure that afterExecute
             * is called only if task completed or threw
             * exception. Otherwise, the caught runtime exception
             * will have been thrown by afterExecute itself, in
             * which case we don't want to call it again.
             */
            boolean ran = false;
            beforeExecute(thread, task);
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }

    注意task.run()这句,即总结的,调用Runnablerun方法。

    用一个小例子来分析下整个线程池的执行任务的过程:

    public class ThreadPoolTest {
        public static void main(String[] args) {
            ExecutorService exec = Executors.newFixedThreadPool(1); //设置固定线程池大小为1
            List<Runnable> list = new ArrayList<Runnable>();
            for(int i=0;i<5;i++) list.add(new MyRunnable());
            for(int i=0;i<5;i++) exec.execute(list.get(i));
        }
    }
    class MyRunnable implements Runnable {
        @Override
        public void run() {
            System.out.println("..."); //可以在此加断点DEUBG
        }
    }

    此时,exec会不断的向线程池中添加任务。

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
     * for execution
     * @throws NullPointerException if command is null
     */
    public void execute(Runnable command) {
        if (command == null) throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            } else if (!addIfUnderMaximumPoolSize(command)) {
                reject(command); // is shutdown or saturated
            }
        }
    }

    看到这个Runnable什么时候执行,是新创建线程执行,还是复用之前的线程,甚至是否需要拒绝请求。 当前的线程池newFixedThreadPool(1)是个固定大小的池,此时poolSize == corePoolSize, 所以此时的创建新线程的逻辑会返回false

    /**
     * Creates and starts a new thread running firstTask as its first
     * task, only if fewer than corePoolSize threads are running
     * and the pool is not shut down.
     * @param firstTask the task the new thread should run first (or
     * null if none)
     * @return true if successful
     */
    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

    所以,当前的command就被放入阻塞队列中workQueue.offer(command)runTask中的task则是通过以下方法提到的:

    Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

    此处是明显的生产者-消费者模式,其中生产者是Client,在外部不断的提交待执行的任务,消费者则是线程池内的线程, 这两者之间通过阻塞队列workQueue建立起了连接,一个生产,一个执行待执行的任务的run方法将其消费。 这时,线程中的线程是连续执行任务,还是会结束,在新任务出现时创建新的Worker线程,是由所使用的线程池类型决定的,例如:

    ExecutorService exec = Executors.new Executors.newFixedThreadPool(2); //设置固定线程池大小为2

    线程池的固定线程数为2,其执行过程中是调用阻塞队列的take方法,此时如果队列中没有任务,是会一直阻塞。 而如果使用CachedTharedPool,则会执行阻塞队列的poll方法,根据定义的超时时间进行等待。 Worker在执行时,是执行阻塞队列的take方法还是poll方法,取决于timed是否为true,如下:

    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    .....
    .....
    .....
      try {
          Runnable r = timed ?
              workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
              workQueue.take();
          if (r != null)
              return r;
          timedOut = true;
      } catch (InterruptedException retry) {
          timedOut = false;
      }

    在使用CachedThreaPool时,由于共corePoolSize为0,所以每次执行时timedtrue,此时执行阻塞队列的poll方法,keepAliveTime=60000000000 后续返回时,此时由于队列中没有任务,所以timedOut=true,所以,在后续执行时

    if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
        if (compareAndDecrementWorkerCount(c))
            return null;
        continue;
    }

    此处会return null。

    OK,今儿就先到这儿了 :)


    版权属于:jcore.cn

    原文地址:http://www.jcore.cn/2015/04/09/java-thread-pool

    除非注明,文章均为原创,转载时必须以链接形式注明原始出处。

    分享文章到:

    热门推荐文章