13332 2018-07-21 2020-06-25

前言:本系列是Java基础系列的升级版本,这里假设读者已经阅读了Java多线程基础那一节。

一、之前的补充

1、多线程一定快?

思考下面两个例子,就会知道答案

  • 开一个线程,从一累加到一百万;与开两个线程,分别累加一到五十万、五十万到一百万,再求和。
  • 将上面的一百万改成亿,再比较两种方式的总耗时。

CPU分配固定的时间片给线程(一般是几十毫秒),当线程使用完这个时间片后,就会轮到下一个线程来使用时间片,而进行线程的创建切换也是需要时间的(以下讨论针对上面例子、单核处理器,抛开具体的业务场景)。

开一个线程时,当它可以在单位时间片内完成计算,那么它就节省了线程上下文切换的时间;当它不能在单位时间片内完成计算,那么就需要把线程上下文切换的时间也算上。切换时间可能大于单位时间片,也可能小于单位时间片。

开多个线程时,创建时间加长,但变相的减少了上下文切换的时间,变相加长了任务运行时间。因此多线程在任务量大时会比单线程快。有如下公式

线程执行任务总耗时 = 线程创建时间 + 任务执行耗时 + 线程上下文切换时间 + 线程销毁时间

2、线程优先级

基础篇中曾说道“优先级高仅仅是获取CPU使用权的机会更大些,并不是一定的”,这句话是对的,但还是证据不足,看如下代码

public class Priority {

    private static volatile boolean notStart = true;

    private static volatile boolean notEnd = true;

    public static void main(String[] args) throws Exception {
        List<Job> jobs = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            int priority = i < 5 ? Thread.MIN_PRIORITY : Thread.MAX_PRIORITY;
            Job job = new Job(priority);
            jobs.add(job);
            Thread thread = new Thread(job, "Thread:" + i);
            thread.setPriority(priority);
            thread.start();
        }
        notStart = false;
        TimeUnit.SECONDS.sleep(10);
        notEnd = false;
        for (Job job : jobs) {
            System.out.println("Job Priority :" + job.priority + ", Count :" + job.jobCount);
        }
    }

    static class Job implements Runnable {

        private int priority;

        private long jobCount;

        public Job(int priority) {
            this.priority = priority;
        }

        @Override
        public void run() {
            while (notStart) {
                // 线程让步
                Thread.yield();
            }
            while (notEnd) {
                Thread.yield();
                jobCount++;
            }
        }
    }
}

输出结果
Job Priority :1, Count :2253372
Job Priority :1, Count :2147270
Job Priority :1, Count :2219858
Job Priority :1, Count :2201938
Job Priority :1, Count :2123770
Job Priority :10, Count :2817230
Job Priority :10, Count :2991433
Job Priority :10, Count :2124793
Job Priority :10, Count :2890312
Job Priority :10, Count :2175991

以上数据验证了优先级是有一定效果的,但并不是一定有效果的,且某些情况,某些操作系统会直接忽略Java线程对于优先级的设定。

3、线程的状态

还是对于基础篇的补充,先给出一个列表,如下

状态名称说明
NEW初始状态,线程被构建,但是还没有调用start方法
RUNNABLE运行状态,Java线程将操作系统中的就绪和运行两种状态笼统称为“运行中”
BLOCKED阻塞状态,表示线程阻塞于锁
WAITING等待状态,表示线程进入等待状态,进入该状态表示当前线程需要等待其他线程做出一些特定动作(通知或中断)
TIME_WAITING超时等待状态,该状态不同于WAITING,它是可以在指定的时间自行返回的
TERMINATED终止状态,表示当前线程已经执行完毕

我们由一段代码一起体验下线程的这几种状态,代码如下

public class ThreadState {

    public static void main(String[] args) {
        new Thread(new TimeWaiting(), "TimeWaitingThread").start();
        new Thread(new Waiting(), "WaitingThread").start();
        // 使用两个Blocked线程,一个获取锁成功,一个被阻塞
        new Thread(new Blocked(), "BlockedThread-1").start();
        new Thread(new Blocked(), "BlockedThread-2").start();
    }

    // 该线程在不断地睡眠
    static class TimeWaiting implements Runnable {
        @Override
        public void run() {
            while (true) {
                SleepUtils.second(100);
            }
        }
    }

    // 该线程在Waiting.class实例上等待
    static class Waiting implements Runnable {
        @Override
        public void run() {
            while (true) {
                synchronized (Waiting.class) {
                    try {
                        Waiting.class.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    // 该线程在Blocked.class实例上加锁后,不会释放锁
    static class Blocked implements Runnable {
        @Override
        public void run() {
            synchronized (Blocked.class) {
                while (true) {
                    SleepUtils.second(100);
                }
            }
        }
    }
}

class SleepUtils {
    public static final void second(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

首先通过jps(JVM Process Status Tool)定位线程的的虚拟机ID,再通过jstack(Stack Trace for Java)到处当前虚拟机的线程库快照,部分输出如下

hk@hk-pc:~/IdeaWorkSpace/newxiaokui$ jps
Picked up _JAVA_OPTIONS:   -Dawt.useSystemAAFontSettings=gasp
9523 Jps
9447 ThreadState
3021 Main
9454 Launcher

hk@hk-pc:~/IdeaWorkSpace/newxiaokui$ jstack 9447
Picked up _JAVA_OPTIONS:   -Dawt.useSystemAAFontSettings=gasp
2018-07-21 15:52:44
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.171-b11 mixed mode):

"Attach Listener" #15 daemon prio=9 os_prio=0 tid=0x00007ff4cc001000 nid=0x2553 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"DestroyJavaVM" #14 prio=5 os_prio=0 tid=0x00007ff508012800 nid=0x24e8 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE
   
// BlockedThread-2线程阻塞在获取Blocked.class的锁
"BlockedThread-2" #13 prio=5 os_prio=0 tid=0x00007ff508662800 nid=0x250e waiting for monitor entry [0x00007ff4f117a000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at site.xiaokui.common.hk.thread.ThreadState$Blocked.run(ThreadState.java:54)
        - waiting to lock <0x00000000ec2aaa08> (a java.lang.Class for site.xiaokui.common.hk.thread.ThreadState$Blocked)
        at java.lang.Thread.run(Thread.java:748)
        
// BlockedThread-1线程获取到了Blocked.class的锁
"BlockedThread-1" #12 prio=5 os_prio=0 tid=0x00007ff508661000 nid=0x250d waiting on condition [0x00007ff4f127b000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
        at site.xiaokui.common.hk.thread.SleepUtils.second(ThreadState.java:64)
        at site.xiaokui.common.hk.thread.ThreadState$Blocked.run(ThreadState.java:54)
        - locked <0x00000000ec2aaa08> (a java.lang.Class for site.xiaokui.common.hk.thread.ThreadState$Blocked)
        at java.lang.Thread.run(Thread.java:748)
        
// WaitingThread线程在Waitting实例上等待
"WaitingThread" #11 prio=5 os_prio=0 tid=0x00007ff50865f800 nid=0x250c in Object.wait() [0x00007ff4f137c000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000000ec2a74a8> (a java.lang.Class for site.xiaokui.common.hk.thread.ThreadState$Waiting)
        at java.lang.Object.wait(Object.java:502)
        at site.xiaokui.common.hk.thread.ThreadState$Waiting.run(ThreadState.java:39)
        - locked <0x00000000ec2a74a8> (a java.lang.Class for site.xiaokui.common.hk.thread.ThreadState$Waiting)
        at java.lang.Thread.run(Thread.java:748)
        
// TimeWaitingThread线程处于超时等待(slepping)
"TimeWaitingThread" #10 prio=5 os_prio=0 tid=0x00007ff50865e000 nid=0x250b waiting on condition [0x00007ff4f147d000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
        at site.xiaokui.common.hk.thread.SleepUtils.second(ThreadState.java:64)
        at site.xiaokui.common.hk.thread.ThreadState$TimeWaiting.run(ThreadState.java:27)
        at java.lang.Thread.run(Thread.java:748)

通过示例,我们了解到了Java程序运行中线程状态的具体含义。线程在自身的生命周期中,并不是固定地处于某个状态,而是随着代码的执行在不同的状态之间进行切换,Java线程状态变迁如图所示(极为重要)。

Java线程状态

由上图可知

  • 线程创建之后,调用start方法开始执行。
  • 当线程执行wait方法之后,线程进入等待状态。进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态。
  • 而超时等待状态相当于在等待状态的基础上增加了超时限制,也就是超时时间到达时将会返回运行状态。
  • 当线程调用同步方法时,在没有获取到锁的情况下,线程将会进入阻塞状态。
  • 线程在执行Runnable的run方法之后就会进入到终止状态。

4、Daemon线程

Daemon线程是一种支持性线程,又称守护线程,因为它主要被用作程序中后台调度以及支持性工作。这意味着,当一个Java虚拟机中不存在非Daemon线程的时候,Java虚拟机将会退出。可以通过Thread.setDaemon(true)将线程设置为Daemon线程,且需要在启动线程之前设置,不能在启动线程之后设置。如下代码

public class Daemon {

    public static void main(String[] args) {
        Thread thread = new Thread(new DaemonRunner(), "DaemonRunner");
        thread.setDaemon(true);
        thread.start();
    }

    static class DaemonRunner implements Runnable {
        @Override
        public void run() {
            try {
                ThreadState.SleepUtils.second(10);
            } finally {
                System.out.println("DaemonThread finally run");
            }
        }
    }
}

运行Daemon程序,可以看到终端没有任何的输出。main线程(非Daemon线程)在启动了线程DaemonRunner之后随着main方法执行完毕而终止,而此时Java虚拟机中已经没有非Daemon线程,虚拟机需要退出。Java虚拟机中的所有Daemon线程都需要立即终止,因此DaemonRunner立即终止,但是DaemonRunner中的finally块没有执行(在构建Daemon线程时,不能依靠finally块中的内容来确保执行关闭或清理资源的逻辑)。

二、启动和终止线程

1、构造线程

在运行线程之前首先要构造一个线程对象,线程对象在构造的时候需要提供线程所需要的属性,如线程所属的线程组、线程优先级、是否是Daemon线程等信息,下面是Thread对于线程初始化的默认设置部分代码

private void init(ThreadGroup g, Runnable target, String name,
                  long stackSize, AccessControlContext acc,
                  boolean inheritThreadLocals) {
    if (name == null) {
        throw new NullPointerException("name cannot be null");
    }

    this.name = name;
	// 当前线程就是该线程的父线程
    Thread parent = currentThread();

    this.group = g;
    // 将daemon、priority属性设置为父线程的对应属性
    this.daemon = parent.isDaemon();
    this.priority = parent.getPriority();
    setPriority(priority);
    // 将父线程的InheritableThreadLocal复制过来
    if (inheritThreadLocals && parent.inheritableThreadLocals != null)
        this.inheritableThreadLocals =
            ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
    /* Stash the specified stack size in case the VM cares */
    this.stackSize = stackSize;

    /* Set thread ID */
    tid = nextThreadID();
}

在上述过程中,一个新构造的线程对象是由其parent线程来进行空间分配的,而child线程继承了parent的daemon、优先级、加载资源的contextClassLoader以及可继承的ThreadLocal等,同时还会分配一个唯一的ID来标识这个child线程。至此,一个能够运行的线程对象就初始化好了,在堆内存中等待着运行。

2、启动线程

线程对象在初始化完成之后,调用start方法就可以启动这个线程。线程start方法的含义是:当前线程(即parent线程)同步告知Java虚拟机,只要线程规划期空闲,应立即启动需调用start方法的线程。

3、理解中断

/**
 * Interrupts this thread.
 *
 * <p> Unless the current thread is interrupting itself, which is
 * always permitted, the {@link #checkAccess() checkAccess} method
 * of this thread is invoked, which may cause a {@link
 * SecurityException} to be thrown.
 *
 * <p> If this thread is blocked in an invocation of the {@link
 * Object#wait() wait()}, {@link Object#wait(long) wait(long)}, or {@link
 * Object#wait(long, int) wait(long, int)} methods of the {@link Object}
 * class, or of the {@link #join()}, {@link #join(long)}, {@link
 * #join(long, int)}, {@link #sleep(long)}, or {@link #sleep(long, int)},
 * methods of this class, then its interrupt status will be cleared and it
 * will receive an {@link InterruptedException}.
 *
 * <p> If this thread is blocked in an I/O operation upon an {@link
 * java.nio.channels.InterruptibleChannel InterruptibleChannel}
 * then the channel will be closed, the thread's interrupt
 * status will be set, and the thread will receive a {@link
 * java.nio.channels.ClosedByInterruptException}.
 *
 * <p> If this thread is blocked in a {@link java.nio.channels.Selector}
 * then the thread's interrupt status will be set and it will return
 * immediately from the selection operation, possibly with a non-zero
 * value, just as if the selector's {@link
 * java.nio.channels.Selector#wakeup wakeup} method were invoked.
 *
 * <p> If none of the previous conditions hold then this thread's interrupt
 * status will be set. </p>
 *
 * <p> Interrupting a thread that is not alive need not have any effect.
 *
 * @throws  SecurityException
 *          if the current thread cannot modify this thread
 *
 * @revised 6.0
 * @spec JSR-51
 */
public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

/**
 * Tests whether the current thread has been interrupted.  The
 * <i>interrupted status</i> of the thread is cleared by this method.  In
 * other words, if this method were to be called twice in succession, the
 * second call would return false (unless the current thread were
 * interrupted again, after the first call had cleared its interrupted
 * status and before the second call had examined it).
 *
 * <p>A thread interruption ignored because a thread was not alive
 * at the time of the interrupt will be reflected by this method
 * returning false.
 *
 * @return  <code>true</code> if the current thread has been interrupted;
 *          <code>false</code> otherwise.
 * @see #isInterrupted()
 * @revised 6.0
 */
public static boolean interrupted() {
    // 会清除中断标示
    return currentThread().isInterrupted(true);
}

/**
 * Tests whether this thread has been interrupted.  The <i>interrupted
 * status</i> of the thread is unaffected by this method.
 *
 * <p>A thread interruption ignored because a thread was not alive
 * at the time of the interrupt will be reflected by this method
 * returning false.
 *
 * @return  <code>true</code> if this thread has been interrupted;
 *          <code>false</code> otherwise.
 * @see     #interrupted()
 * @revised 6.0
 */
public boolean isInterrupted() {
    // 不会清除中断标示
    return isInterrupted(false);
}

/**
 * Tests if some Thread has been interrupted.  The interrupted state
 * is reset or not based on the value of ClearInterrupted that is
 * passed.
 */
private native boolean isInterrupted(boolean ClearInterrupted);

中断可以理解为线程的一个标识位属性,它表示一个运行中的线程是否被其他线程进行了中断操作。中断好比其他线程对该线程打了一个招呼,其他线程通过调用该线程的interrupt方法对其进行了中断操作。

线程通过检查自身是否被中断来进行响应,通过方法isInterrupted方法来进行判断是否被中断,也可以调用静态方法Thread.interrupted方法对当前线程的中断标识位进行复位。如果该线程已经处于终结状态,即使该线程被中断过,在调用该线程对象的isInterrupted方法时依旧会返回false。

从Java的API中可以看到,许多声明抛出InterruptedException的方法(例如Thread.sleep(long millis)方法),这些方法在抛出InterruptedException之前,Java虚拟机会先将该线程的中断标识位清除,然后抛出InterruptedException,此时调用isInterrupted方法将会返回false。下面是个例子

public class Interrupted {

    public static void main(String[] args) throws Exception {
        // sleepThread不停地睡眠
        Thread sleepThread = new Thread(new SleepRunner(), "SleepThread");
        sleepThread.setDaemon(true);
        // busyThread不停地运行
        Thread busyThread = new Thread(new BusyRunner(), "BusyThread");
        busyThread.setDaemon(true);
        sleepThread.start();
        busyThread.start();
        // 休眠5秒,让sleepThread和busyThread充分运行
        TimeUnit.SECONDS.sleep(5);
        sleepThread.interrupt();
        busyThread.interrupt();
        System.out.println("SleepThread interrupted is " + sleepThread.isInterrupted());
        System.out.println("BusyThread interrupted is " + busyThread.isInterrupted());
        TimeUnit.SECONDS.sleep(2);
    }

    static class SleepRunner implements Runnable {
        @Override
        public void run() {
            while (true) {
				// 中断睡眠中的线程,会抛出InterruptedException异常,参见javadoc
                ThreadState.SleepUtils.second(10);
            }
        }
    }

    static class BusyRunner implements Runnable {
        @Override
        public void run() {
            while (true) {
            }
        }
    }
}

// 输出结果如下
SleepThread interrupted is false
BusyThread interrupted is true
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at site.xiaokui.common.hk.thread.ThreadState$SleepUtils.second(ThreadState.java:63)
	at site.xiaokui.common.hk.thread.Interrupted$SleepRunner.run(Interrupted.java:33)
	at java.lang.Thread.run(Thread.java:748)

从结果来看,抛出InterruptedException的线程SleepThread,其中断标识位被清除了,而一直忙碌工作的线程BusyThread,中断标识位没有被清除。

更为细致的理解,请参考javadoc注释。

4、过期的方法

大家对于CD机肯定不会陌生,如果把它播放音乐比作一个线程的运作,那么对音乐播放做出的暂停、恢复和停止操作对应于Thread的API就是suspend方法,resume方法和stop方法。

下面代码中演示了这一功能,线程PrintThread以1秒的频率进行打印,而主线程对其进行暂停、恢复和停止操作。

public class Deprecated {

    public static void main(String[] args) throws Exception {
        DateFormat format = new SimpleDateFormat("HH:mm:ss");
        Thread printThread = new Thread(new Runner(), "PrintThread");
        printThread.setDaemon(true);
        printThread.start();
        TimeUnit.SECONDS.sleep(3);
        // 将PrintThread进行暂停,输出内容工作停止
        printThread.suspend();
        System.out.println("main suspend PrintThread at " + format.format(new Date()));
        TimeUnit.SECONDS.sleep(3);
        // 将PrintThread进行恢复,输出内容继续
        printThread.resume();
        System.out.println("main resume PrintThread at " + format.format(new Date()));
        TimeUnit.SECONDS.sleep(3);
        // 将PrintThread进行终止,输出内容停止
        printThread.stop();
        System.out.println("main stop PrintThread at " + format.format(new Date()));
        TimeUnit.SECONDS.sleep(3);
    }

    static class Runner implements Runnable {
        @Override
        public void run() {
            DateFormat format = new SimpleDateFormat("HH:mm:ss");
            while (true) {
                System.out.println(Thread.currentThread().getName() + "Run at " + format.format(new Date()));
                ThreadState.SleepUtils.second(1);
            }
        }
    }
}

输出结果如下
PrintThreadRun at 08:13:06
PrintThreadRun at 08:13:07
PrintThreadRun at 08:13:08
main suspend PrintThread at 08:13:09
main resume PrintThread at 08:13:12
PrintThreadRun at 08:13:12
PrintThreadRun at 08:13:13
PrintThreadRun at 08:13:14
main stop PrintThread at 08:13:15

在执行过程中,PrintThread运行了3秒,随后被暂停,3秒后恢复,最后经过3秒被终止。通过示例的输出可以看到,suspend方法、resume方法、stop方法完成了线程的暂停、恢复和终止工作,而且非常“人性化”。但是这些API是过期的,也就是不建议使用的。

不建议使用的主要原因主要有:以suspend方法为例,在调用后,线程不会释放已经占有的资源(比如锁),而是占有着资源进入睡眠状态,这样容易引发死锁问题。同样,stop方法在终结一个线程时不会保证线程的资源正常释放,通常是没有给予线程完成资源释放工作的机会,因此会导致程序可能工作在不确定状态下。

5、安全地终止线程

上一节提到的中断状态是个线程的一个标识位,而中断操作是一种简便的线程间交互方式,而这种交互方式最适合用来取消或停止任务。除了使用中断操作,还可以利用一个boolean变量来控制是否需要停止任务并终止该线程。

下面是一个例子,线程CountThread不断地进行变量累加,而主线程尝试对其进行中断操作和停止操作。

public class Shutdown {

    public static void main(String[] args) throws Exception {
        Runner one = new Runner();
        Thread countThread = new Thread(one, "CountThread");
        countThread.start();
        // 睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知中断而结束
        TimeUnit.SECONDS.sleep(1);
        countThread.interrupt();

        Runner two = new Runner();
        countThread = new Thread(two, "CountThread");
        countThread.start();
        // 睡眠1秒,main线程对Runner two进行取消,使CountThread能够感知on为false而结束
        TimeUnit.SECONDS.sleep(1);
        two.cancel();
    }

    private static class Runner implements Runnable {
        private long i;
        private volatile boolean on = true;
        @Override
        public void run() {
            // 注意,对于中断操作,需要程序在代码中处理,中断标识只是起一个标识作用
            while (on && !Thread.currentThread().isInterrupted()) {
                i++;
            }
            System.out.println("Count i = " + i);
        }

        public void cancel() {
            on = false;
        }
    }
}

// 输出结果如下
Count i = 355863091
Count i = 350910457

示例在执行过程中,main线程通过中断操作和cancel方法均可使CountThread得以终止。这种通过标识位或者中断操作的方式能够使线程终止时有机会去清理资源,而不是武断地将线程停止,因此这种终止线程的做法显得更加安全和优雅。

三、线程间通信

线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步地执行,直到终止。但是,每个运行中的线程,如果仅仅是孤立地运行,那么没有一点价值,或者说价值很少,如果多个线程能够相互配合完成工作,这将会带来巨大的价值。

1、volatile和synchronized

Java支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个变量的拷贝(虽然对象以及成员变量分配的内存是在共享内存中的,但是每个执行的线程还是可以拥有一份拷贝,这样做的目的是加速程序的执行,这是现代多核处理器的一个显著特性),所以程序在执行过程中,一个线程看到的变量并不一定是最新的。

关键字volatile可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有的线程对变量访问的可见性。

举个例子,定义一个表示程序是否运行的成员变量boolean on = true,那么另一个线程可能对它执行关闭动作(on = false),这里涉及多个线程对变量的访问,因此需要将其定义成为volatile boolean on = true,这样其他线程对它进行改变时,可以让所有线程感知其变化,因为所有对on变量的访问和修改都需要以共享内存为准。但是,过多地使用volatile是不必要的,因为它会降低程序执行的效率。

关键字synchronized可以修饰方法或者以同步代码块的形式来进行使用,它主要确保多个线程在同一时刻,只能有一个线程处于方法或者同步快中,它保证了线程对变量访问的可见性和排他性。

下面例子中使用了同步快和同步方法,通过使用javap工具查看生成的class文件信息来分析synchronized关键字的实现细节,实例如下

public class Synchronized {
    public static void main(String[] args) {
        synchronized (Synchronized.class) {
        }
        m();
    }
    
    public static synchronized void m(){
    }
}

在Synchronized.class同级目录下执行javap -v Synchronized.class,输出如下所示:

hk@hk-pc:~/IdeaWorkSpace/newxiaokui/build/classes/java/main/site/xiaokui/common/hk/thread/basic$ javap -v Synchronized.class 
Picked up _JAVA_OPTIONS:   -Dawt.useSystemAAFontSettings=gasp
Classfile /home/hk/IdeaWorkSpace/newxiaokui/build/classes/java/main/site/xiaokui/common/hk/thread/basic/Synchronized.class
  Last modified 2018-12-28; size 654 bytes
  MD5 checksum d42925eb7e91ba6d65028d7763496cac
  Compiled from "Synchronized.java"
public class site.xiaokui.common.hk.thread.basic.Synchronized
  minor version: 0
  major version: 52
  flags: ACC_PUBLIC, ACC_SUPER
Constant pool:
   #1 = Methodref          #4.#24         // java/lang/Object."<init>":()V
   #2 = Class              #25            // site/xiaokui/common/hk/thread/basic/Synchronized
   #3 = Methodref          #2.#26         // site/xiaokui/common/hk/thread/basic/Synchronized.m:()V
   #4 = Class              #27            // java/lang/Object
   #5 = Utf8               <init>
   #6 = Utf8               ()V
   #7 = Utf8               Code
   #8 = Utf8               LineNumberTable
   #9 = Utf8               LocalVariableTable
  #10 = Utf8               this
  #11 = Utf8               Lsite/xiaokui/common/hk/thread/basic/Synchronized;
  #12 = Utf8               main
  #13 = Utf8               ([Ljava/lang/String;)V
  #14 = Utf8               args
  #15 = Utf8               [Ljava/lang/String;
  #16 = Utf8               StackMapTable
  #17 = Class              #15            // "[Ljava/lang/String;"
  #18 = Class              #27            // java/lang/Object
  #19 = Class              #28            // java/lang/Throwable
  #20 = Utf8               MethodParameters
  #21 = Utf8               m
  #22 = Utf8               SourceFile
  #23 = Utf8               Synchronized.java
  #24 = NameAndType        #5:#6          // "<init>":()V
  #25 = Utf8               site/xiaokui/common/hk/thread/basic/Synchronized
  #26 = NameAndType        #21:#6         // m:()V
  #27 = Utf8               java/lang/Object
  #28 = Utf8               java/lang/Throwable
/// 3斜杠表示手动添加
{
  public site.xiaokui.common.hk.thread.basic.Synchronized();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=1, locals=1, args_size=1
         0: aload_0 /// 将第一个引用类型本地变量推送至栈顶
         1: invokespecial #1                  // Method java/lang/Object."<init>":()V /// 调用超类构造方法,实例初始化方法
         4: return
      LineNumberTable:
        line 7: 0
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0       5     0  this   Lsite/xiaokui/common/hk/thread/basic/Synchronized;

  public static void main(java.lang.String[]);
    descriptor: ([Ljava/lang/String;)V
    flags: ACC_PUBLIC, ACC_STATIC
    Code:
      stack=2, locals=3, args_size=1
         0: ldc           #2                  // class site/xiaokui/common/hk/thread/basic/Synchronized
         2: dup
         3: astore_1
         4: monitorenter
         5: aload_1
         6: monitorexit
         7: goto          15
        10: astore_2
        11: aload_1
        12: monitorexit
        13: aload_2
        14: athrow
        15: invokestatic  #3                  // Method m:()V
        18: return
      Exception table:
         from    to  target type
             5     7    10   any
            10    13    10   any
      LineNumberTable:
        line 9: 0
        line 10: 5
        line 11: 15
        line 12: 18
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0      19     0  args   [Ljava/lang/String;
      StackMapTable: number_of_entries = 2
        frame_type = 255 /* full_frame */
          offset_delta = 10
          locals = [ class "[Ljava/lang/String;", class java/lang/Object ]
          stack = [ class java/lang/Throwable ]
        frame_type = 250 /* chop */
          offset_delta = 4
    MethodParameters:
      Name                           Flags
      args

  public static synchronized void m();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
    Code:
      stack=0, locals=0, args_size=0
         0: return
      LineNumberTable:
        line 15: 0
}
SourceFile: "Synchronized.java"

上面class信息中,对于同步块的实现使用了monitorenter和monitorexit指令,而同步方法则是依靠方法修饰符上的ACC_SYNCHRONIZED来完成的。无论采用哪种方式,其本质是对一个对象的监视器(monitor)进行获取,而这个获取过程是排他的,也就是同一时刻只能有一个线程获取到由synchronized所保护对象的监视器。

任意一个对象都拥有自己的监视器,当这个对象由同步块或者这个对象的同步方法调用时,执行方法的线程必须先获取到该对象的监视器才能进入同步快或者同步方法,而没有获取到监视器(执行该方法)的线程将会被阻塞在同步块和同步方法的入口处,进入BLOCKED状态。

下图描述了对象、对象的监视器、同步队列和执行线程之间的关系。

Monitor关系图

从图中可以看到,任意线程对Object(Object由synchronized保护)的访问,首先要获得Object的监视器。如果获取失败,线程进入同步队列,线程状态变为BLOCKED。当访问Object的前驱(获得了锁线程)释放了锁,则该释放操作将隐式唤醒阻塞在同步队列中的线程,使其重新尝试对监视器的获取。

2、等待/通知机制

一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模式隔离了“做什么(what)”和“怎么做(how)”,在功能层面上实现了解耦,体系上具备了良好伸缩性,但是在Java语言中如何实现类似的功能呢?

简单的办法就是让消费者不断循环检查变量是否符合预期,如下面代码所示,在whle循环设置不满足的条件,如果条件满足则退出while循环,从而完成消费者的工作。

while (value != desire) {
    Thread.sleep(1000);
}
doSomething();

上面这段伪代码在条件不满足时就睡眠一段时间,这样做的目的是防止过快的“无效”尝试,这种方式看似能够实现所需的功能,但是却存在如下问题

  1. 难以保证及时性。在睡眠时,基本不消耗处理器资源,但是如果睡得过久,就不能及时发现条件已经变化,也就是及时性难以保证。
  2. 难以降低开销。如果降低睡眠时间,比如休眠1毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成无端的浪费。

以上两个问题,看似矛盾难以调和,但是Java通过内置的等待/通知机制能够很好地解决这个矛盾。

等待/通知的相关方法是任意Java对象都具备的,因为这些方法被定义在所有对象的父类java.lang.Object上(更为细致的讨论,请参见Object详解一文),方法和描述如下表所示

方法名称描述
notify()通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提是该线程获取到了对象的锁
notifyAll()通知所有等待在该对象上的线程
wait()调用该方法的线程进入WAITING状态,只有等待另外线程的通知或者中断才会返回,需要注意,调用wait()方法后,释放对象的锁
wait(long)超时等待一段时间,这里的参数时间是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回
wait(long,int)字面上可以精确到纳米级别,但其实并不可以

等待/通知机制,是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。上述两个线程通过对象O来完成交互,而对象上的wait()和notify/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。

/**
 * 等待通知的机制的前提是 当前线程获取锁对象的 monitor,可以简单理解为获取当前锁对象的锁
 */
public class WaitNotify {

    static boolean flag = true;

    static Object lock = new Object();

    public static void main(String[] args) throws Exception {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "NotifyThread");
        notifyThread.start();
    }

    static class Wait implements Runnable {
        @Override
        public void run() {
            // 加锁,拥有lock的Monitor
            synchronized (lock) {
                // 当条件不满足时,继续wait,同时释放了lock的锁
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread() + "flag is true, wait @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();;
                    }
                }
                // 当条件满足时,完成工作
                System.out.println(Thread.currentThread() + " flag is false, running @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
        }
    }

    static class Notify implements Runnable {
        @Override
        public void run() {
            // 加锁,拥有lock的Monitor
            synchronized (lock) {
                // 获取lock的锁,然后进行通知,通知时不会释放lock的锁
                // 直到当前线程释放了lock后,WaitThread才能从wait方法中返回
                System.out.println(Thread.currentThread() + " hold lock, notify @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                lock.notifyAll();
                flag = false;
                ThreadState.SleepUtils.second(5);
            }
            // 再次加锁
            synchronized (lock) {
                System.out.println(Thread.currentThread() + " hold lock, sleep @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                ThreadState.SleepUtils.second(5);
            }
        }
    }
}

输出结果如下
Thread[WaitThread,5,main]flag is true, wait @ 15:02:48
Thread[NotifyThread,5,main] hold lock, notify @ 15:02:49
Thread[NotifyThread,5,main] hold lock, sleep @ 15:02:54
Thread[WaitThread,5,main] flag is false, running @ 15:02:59

上述结果的第3行和第4行输出的顺序可能会互换,而上述的例子主要说明了调用wait方法、notify以及notifyAll方法时需要注意的细节,如下

  1. 使用wait方法、notify方法,notifyAll方法时需要先对调用对象加锁。
  2. 调用wait方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
  3. notity或notifyAll方法调用后,等待线程依旧不会从wait方法返回,需要调用notify或notifyAll的线程释放锁之后,等待线程才有机会从wait方法返回。
  4. notify方法将等待队列中的一个等待线程从等待队列中移到同步队列中(不一定为顺序选取),而notifyAll方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。
  5. 从wait方法返回的前提是获得了调用对象的锁。

从上述细节中可以看到,等待/通知机制依托于同步机制,其目的就是确保等待线程从wait方法返回时,能够到通知线程对变量做出的修改。

总结上面过程,步骤如下

  1. WaitThread首先获取了对象的锁,然后调用对象的wait方法,从而放弃了锁并进入对象的等待队列WaitQueue中,进入等待状态。
  2. 由于WaitThread释放了对象的锁,NotifyThread随后获取了对象的锁,并调用对象的notify方法,将WaitThread从等待队列(WaitQueue)移到了同步队列(SynchronizedQueue)中,此时WaitThread的状态变为阻塞状态。
  3. NotifyThread释放了锁之后,WaitThread再次获取到锁并从wait方法处返回继续执行。

等待/通知的经典范式

从上节中可以提炼出等待/通知的经典范式,该范式分为两部分,分别针对等待方(消费者)和通知方(生产者)。

等待方遵循如下原则

  1. 获取对象的锁。
  2. 如果条件不满足,那么调用对象的wait方法,被通知后仍要检查条件。
  3. 条件满则则执行对应的逻辑。

对应的伪代码如下

synchronized (对象) {
    while (条件不满足) {
        对象.wait();
    }
    对应的处理逻辑
}

通知方遵循如下原则

  1. 获取对象的锁。
  2. 改变条件。
  3. 通知所有等待在对象上的线程。

对应的伪代码如下

synchronized (对象) {
    改变条件
    对象.notifyAll();
}

3、手写消息队列

/**
 * @author HK
 * @date 2019-10-09 17:10
 */
public class SimpleMq {

    private static Queue<String> queue = new LinkedList<>();

    private static Object readLock = new Object(), writeLock = new Object();

    private volatile static boolean end = false;

    static class Producer {
        public void send(String str) throws InterruptedException {
            synchronized (writeLock) {
                queue.add(str);
                System.out.println("生产者发送:" + str);
                synchronized (readLock) {
                    readLock.notifyAll();
                }
                writeLock.wait();
            }
        }
    }

    static class Consumer extends Thread {
        @Override
        public void run() {
            System.out.println("消费者启动");
            // 不依赖于轮询,而依赖Java内置的通知等待机制
            while (!end) {
                synchronized (readLock) {
                    try {
                        // 必须事先在这里等待,否则无法保证等待通知机制正常运行
                        readLock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("消费者接收:" + queue.poll());
                synchronized (writeLock) {
                    writeLock.notifyAll();
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 消费者必须先启动,必要时可以设置sleep时间,这一步无法完全保证正确性
        Consumer consumer = new Consumer();
        consumer.start();

        Producer producer = new Producer();
        producer.send("Hello");
        producer.send("World");
    }
}
// 输出如下
消费者启动
生产者发送:Hello
消费者接收:Hello
生产者发送:World
消费者接收:World

4、Thread.join

如果一个线程A执行了thread.join语句,其含义是:当前线程A等待thread线程终止之后才从thread.join返回。线程Thread除了提供join方法之外,还提供了join(long millis)和join(long millis, int nanos)两个具备超时特性的方法。这两个方法表示,如果线程thread在给定的超时时间里没有终止,那么将会从该超时方法中返回。

在如下代码中,创建了10个线程,编号0 ~ 9,每个线程调用前一个的join方法,也就是线程0结束了,线程1才能从join方法中返回,而线程0需要等待main线程结束。

public class Join {

    public static void main(String[] args) throws Exception {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; i++) {
            // 每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回
            Thread thread = new Thread(new Demino(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(Thread.currentThread().getName() + " terminate.");
    }

    static class Demino implements Runnable {
        private Thread thread;
        public Demino(Thread thread) {
            this.thread = thread;
        }
        @Override
        public void run() {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " terminate.");
        }
    }
}

// 输出结果如下
main terminate.
0 terminate.
1 terminate.
2 terminate.
3 terminate.
4 terminate.
5 terminate.
6 terminate.
7 terminate.
8 terminate.
9 terminate.

从上述输出可以看到,每个线程终止的前提是前驱线程的终止,每个线程等待前驱线程终止后,才从join方法返回,这里涉及了等待/通知机制(等待前驱线程结束,接受前驱线程结束通知)。

下面代码是JDK中join()方法的源码

/**
 * Waits for this thread to die.
 *
 * <p> An invocation of this method behaves in exactly the same
 * way as the invocation
 *
 * <blockquote>
 * {@linkplain #join(long) join}{@code (0)}
 * </blockquote>
 *
 * @throws  InterruptedException
 *          if any thread has interrupted the current thread. The
 *          <i>interrupted status</i> of the current thread is
 *          cleared when this exception is thrown.
 */
public final void join() throws InterruptedException {
    join(0);
}

// 锁当前线程对象
public final synchronized void join(long millis) throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        // 线程必须start才有意义
        while (isAlive()) {
            // 等待当前线程释放锁
            wait(0);
        }
    } else {
        while (isAlive()) {
            long delay = millis - now;
            // 超出范围,释放锁
            if (delay <= 0) {
                break;
            }
            // 正常等待
            wait(delay);
            // 已经延迟等待了的时间
            now = System.currentTimeMillis() - base;
        }
    }
}

这里总结下逻辑,以主线程A中运行线程B.join为例:

  • 当B执行join方法时,如果A获取了B锁且B存活,那么放弃B锁,等待B锁上的通知(见上面31~33行);如果A未获取B锁(即B自身拥有B锁),则阻塞于B锁的获取。
  • 当B运行完后,JVM内部会释放自身B锁,然后隐式调用B.notifyAll。

当线程终止时,会调用线程自身的notifyAll方法,会通知所有等待在该线程对象上的线程。可以看到join方法的逻辑机构与之前描述的等待/通知经典范式一致,即加锁、循环和处理逻辑3个步骤。

5、ThreadLocal

ThreadLocal,即线程本地变量,是一个以ThreadLocal对象为键、任意对象为值的存储结构。这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。

可以通过set(T)方法来设置一个值,在当前线程下再通过get方法获取到原先设置的值。

下面示例代码中,构建了一个常用的Profiler类,它具有begin和end两个方法,而end方法返回从begin方法调用开始到end方法调用的时间差,单位是毫秒。

/**
 * @author HK
 * @date 2019-10-11 10:10
 */
public class ThreadLocalTest {

    private static ThreadLocal<String> threadLocal = new ThreadLocal<>();

    private static volatile boolean isEnd = false;

    public static void main(String[] args) throws InterruptedException {
        Thread a = new Thread(new Runnable() {
            @Override
            public void run() {
                threadLocal.set("threadLocal:a");
                while (!isEnd) {
                }
                System.out.println(Thread.currentThread().getName() + ":" + threadLocal.get());
            }
        }, "线程a");
        Thread b = new Thread(new Runnable() {
            @Override
            public void run() {
                threadLocal.set("threadLocal:b");
                while (!isEnd) {
                }
                System.out.println(Thread.currentThread().getName() + ":" + threadLocal.get());
            }
        }, "线程b");
        a.start();
        b.start();
        Thread.sleep(200);
        isEnd = true;
        System.out.println(Thread.currentThread().getName() + ":" + threadLocal.get());
    }
}
// 输出如下
线程b:threadLocal:b
线程a:threadLocal:a
main:null
    
/**
 * Returns the value in the current thread's copy of this
 * thread-local variable.  If the variable has no value for the
 * current thread, it is first initialized to the value returned
 * by an invocation of the {@link #initialValue} method.
 *
 * @return the current thread's value of this thread-local
 */
public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

/**
 * Get the map associated with a ThreadLocal. Overridden in
 * InheritableThreadLocal.
 *
 * @param  t the current thread
 * @return the map
 */
ThreadLocalMap getMap(Thread t) {
    // 这个threadLocals是线程私有的
    return t.threadLocals;
}

/**
 * The entries in this hash map extend WeakReference, using
 * its main ref field as the key (which is always a
 * ThreadLocal object).  Note that null keys (i.e. entry.get()
 * == null) mean that the key is no longer referenced, so the
 * entry can be expunged from table.  Such entries are referred to
 * as "stale entries" in the code that follows.
 */
static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

/* ThreadLocal values pertaining to this thread. This map is maintained
 * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

由于ThreadLocalMap内部的Entry是WeakReference实例,这里提下强引用(Strong Reference)、软引用(Soft Reference)、弱引用(Weak Reference)、虚引用(Phantom Reference)4种,这4种引用强度依次逐渐减弱。

  • 强引用就是指在程序代码之中普遍存在的,类似“Object obj = new Object()”这类的引用,只要强引用还存在,垃圾收集器永远不会回收掉被引用的对象。
  • 软引用是用来描述一些还有用但并非必需的对象。对于软引用关联着的对象,在系统将要发生内存溢出异常之前,将会把这些对象列进回收范围之中进行二次回收。如果这次回收还没有足够的内存,才会抛出内存溢出异常。在JDK1.2之后,提供了SoftReference类来实现软引用。
  • 弱引用也是用来描述非必需对象的,但是它的强度比软引用更弱一些,被弱引用关联的对象只能生存到下一次垃圾收集发生之前。当垃圾收集器工作时,无论当前内存是否足够,都会回收掉被弱引用关联的对象。在JDK1.2之后,提供了WeakReference类来实现弱引用。
  • 虚引用也称为幽灵引用或者幻影引用,它是最弱的一种引用关系。一个对象是否有虚引用的存在,完全不会对其生存时间构成影响,也无法通过虚引用来取得一个对象实例。为一个对象设置虚引用关联的唯一目的就是能在这个对象被收集器回收时收到一个系统通知。在JDK1.2之后,提供了PhantomReference类实现虚引用。

对于Entry为什么继承自WeakReference,这里是一个解释:为了保证线程消亡后,该线程上的ThreadLocalMap在脱离开强引用的前提下,也能一同被GC回收(更深层次的讨论,有待继续,欢迎指点)。

四、线程应用实例

1、等待超时模式

开发人员经常会遇到这样的方法调用场景:调用一个方法时等待一段时间(一般来说是给定一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立即返回,反之,超时返回默认结果。

前面介绍了等待/通知的经典范式,即加锁、条件循环和处理逻辑3个步骤,而这种范式无法做到超时等待。而超时等待的加入,只需要对经典范式做出非常小的改动,改动内容如下所示。

假设超时时间是T,那么可以推断出在当前时间now + T之后就会超时。

定义如下变量

  • 等待超时时间:REMAINING = T。
  • 超时时间:FUTURE = now + T。

这时仅需要wait(REMAINING)即可,在wait(REMAINING)之后将会执行:REMAINING = FUTURE - now。如果REMAINING小于等于0,表示已经超时,直接退出,否则将继续执行wait(REMAININT)。

上述描述等待超时模式的伪代码如下

// 对当前对象加锁
public synchronized Object get(long mills) throws InterruptedException {
    long future = System.currentTimeMillis() + mills;
    long remaining = mills;
    // 当超时大于0并且result返回值不满足要求
    while ((result == null) && remainint > 0) {
        wait(remaining);
        remaining = future - System.currentTimeMillis();
    }
    return result;
}

可以看出,等待超时模式就是在等待/通知范式基础上增加了超时控制,这使得该模式相比原有范式更具有灵活性,因为即使方法执行时间过久,也不会“永久”阻塞调用者,而是按照调用者的要求“按时”返回。

2、一个简单的数据库连接池

我们使用等待超时模式来构造一个简单的数据库连接池,在示例中模拟从连接池获取、使用和释放连接的过程,而客户端获取连接的过程被设定为等待超时的模式,也就是在1000毫秒内如果无法获取到可用连接,将会返回给客户端一个null。设定连接池的大小为10个,然后通过调节客户端的线程数来模拟无法获取连接的场景。

首先看下连接池的定义,它通过构造方法初始化连接的最大上限,通过一个双端队列来维护连接,调用方需要先调用fetchConnection(long)方法来指定在多少毫秒内超时获取连接,当连接使用完成后,需要调用releaseConnection(Connection)方法将连接放回连接池,实例代码如下

public class ConnectionPool {

    private LinkedList<Connection> pool = new LinkedList<>();

    public ConnectionPool(int initialSize) {
        if (initialSize > 0) {
            for (int i = 0; i < initialSize; i++) {
                pool.addLast(ConnectionDriver.createConnection());
            }
        }
    }

    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool) {
                // 连接释放需要进行通知,这样其他消费者能够感知连接池中已经归还了一个连接
                pool.addLast(connection);
                pool.notifyAll();
            }
        }
    }

    // 在mills毫秒内无法获取到连接,将会返回null
    public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized (pool) {
            // 完全超时
            if (mills <= 0) {
                while (pool.isEmpty()) {
                    pool.wait();
                }
                return pool.removeFirst();
            } else {
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while (pool.isEmpty() && remaining > 0) {
                    pool.wait(remaining);
                    // 如果remaining还大于0,说明还有时间可以继续等待连接进来,反之就已经超时了
                    remaining = future - System.currentTimeMillis();
                }
                Connection result = null;
                // 由于拥有pool的锁,所以这里的isEmpty结果和上面isEmpty的结果是一样的
                if (!pool.isEmpty()) {
                    result = pool.removeFirst();
                }
                return result;
            }
        }
    }
}

由于java.sql.Connection是一个接口,最终的实现是由数据库驱动提供方来实现的,考虑到只是个示例,我们通过动态代理构造一个Connection,该Connection的代理实现仅仅是在commit方法调用时休眠100毫秒。

public class ConnectionDriver {

    static class ConnectionHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getName().equals("commit")) {
                TimeUnit.MICROSECONDS.sleep(100);
            }
            return null;
        }
    }

    // 创建一个Connection的代理,在commit时休眠100毫秒
    public static final Connection createConnection() {
        return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(), new Class<?>[]{Connection.class}, new ConnectionHandler());
    }
}

下面通过是个示例来测试简易数据库连接池的工作情况,模拟客户端ConnectionRunner获取、使用、最后释放连接的过程,当它使用连接时会增加获取到的连接数量,反之,将会增加未获取到连接的数量。

public class ConnectionPoolTest {

    static ConnectionPool pool = new ConnectionPool(10);
    // 保证所有ConnectionRunner能够同时开始
    static CountDownLatch start =  new CountDownLatch(1);
    // main线程将会等待所有ConnectionRunner结束后才能继续执行
    static CountDownLatch end;

    public static void main(String[] args) throws Exception {
        int threadCount = 10;
        end = new CountDownLatch(threadCount);
        int count = 20;
        AtomicInteger got = new AtomicInteger();
        AtomicInteger notGot = new AtomicInteger();
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "ConnectionRunnerThread");
            thread.start();
        }
        // 开始所有
        start.countDown();
        // 等待所有结束,即隐式当end.getCount() == 0
        end.await();
        System.out.println("total invoke: " + (threadCount * count));
        System.out.println("got connection: " + got);
        System.out.println("not got connection: " + notGot);
        System.out.println("未获取到比率: " + (notGot.get() / (double)threadCount / count * 100) + "%");
    }



    static class ConnectionRunner implements Runnable {
        int count;
        AtomicInteger got;
        AtomicInteger notGot;
        public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }

        @Override
        public void run() {
            try {
                start.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            while (count > 0) {
                try {
                    // 从线程池中获取连接,如果1000ms内无法获取到,将返回null
                    // 分别统计连接获取的数量got和未获取到的数量notGot
                    Connection connection = pool.fetchConnection(1000);
                    if (connection != null) {
                        try {
                            connection.createStatement();
                            connection.commit();
                        } finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    } else {
                        notGot.incrementAndGet();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    count--;
                }
            }
            end.countDown();
        }
    }
}

上述示例中使用了CountDownLatch(底层还是wait/notifyAll)来确保ConnectionRunnerThread能够同时开始执行,并且在全部结束之后,才使main线程从等待状态中返回。通过调节不同的线程数(单个线程尝试连接次数固定为20)观察连接情况,下面是具体的测试数据

线程数量总获取次数获取到次数未获取到次数未获取到比率
1020020000%
20400387133.25%
30600554467.666%
4080069410613.25%
50100081518518.5%

首先发表一下感慨,本人2018年的电脑性能还是没人家2015年的电脑强,好好学习,天天向上。

从表中的数据统计可以看出,在资源一定的情况下(连接池中的10个连接),随着客户端线程的逐步增加,客户端出现超时无法获取连接的比率不断升。虽然客户端线程在这种超时获取的模式下会出现连接无法获取的情况,但是它能够保证客户端线程不会一直挂在连接获取的操作上,而是“按时”返回,并告知客户端连接获取出现问题,是系统的一种自我保护机制。数据库连接池的设计也可以复用到其他的资源获取的场景,针对昂贵资源(比如数据库连接)的获取都应该加以超时限制。

3、线程池技术及其示例

对于服务端的程序,经常面对的是客户端传入的短小任务(执行时间短、工作内容较为单一),需要服务端快速处理并返回结果。如果服务端每次接受到一个任务,创建一个线程,然后进行执行,这在原型阶段是个不错的选择,但是面对成千上万的任务提交进服务器时,如果还是采用一个任务一个线程的方式,那么将会创建数以万计的线程,这不是一个好的选择。因为这会使操作系统频繁的进行上下文切换,无故增加系统的负载,而线程的创建和销毁都是需要耗费系统资源的,也无疑浪费了系统资源。

线程池技术能够很好地解决这个问题,它预先创建了若干数量的线程,并且不能由用户直接对线程的创建进行控制,在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。这样做的好处是,一方面,消除了频繁创建和销毁线程的系统资源开销,另一方面,面对过量的任务提交能够频换的劣化过度。

下面先看一个简单的线程池接口定义

public interface ThreadPool<Job extends Runnable> {
    // 执行一个Job,这个Job需要实现Runnable
    void execute(Job job);
    // 关闭线程池
    void shutdown();
    // 增加工作者线程
    void addWorkers(int num);
    // 减少工作者线程
    void removeWorker(int num);
    // 得到正在等待执行的任务数量
    int getJobSize();
}

客户端可以通过execute(Job)方法将Job提交入线程池执行,而客户端自身不同等待Job的执行完成。除了execute(Job)方法以外,线程池接口提供了增大/减少工作者线程以及关闭线程池的方法。这里工作者线程代表一个重复执行Job的线程,而每个由客户端提交的Job都将进入到一个工作队列中等待工作者线程的处理。

接下来是线程池接口的默认实现,代码如下

public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    // 线程池最大限制数
    private static final int MAX_WORKER_NUMBERS = 10;
    // 默认数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    // 最小限制
    private static final int MIN_WORKER_NUMBERS = 1;
    // 工作列表,将会向里面插入工作
    private final LinkedList<Job> jobs = new LinkedList<>();
    // 工作者列表
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    // 工作者线程的数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    // 线程编号生成
    private AtomicLong threadNum = new AtomicLong();
    
    public DefaultThreadPool() {
        initializeWorkers(DEFAULT_WORKER_NUMBERS);
    }
    
    public DefaultThreadPool(int num) {
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initializeWorkers(workerNum);
    }
    
    private void initializeWorkers(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }
    
    @Override
    public void execute(Job job) {
        if (job != null) {
            // 添加一个工作,然后进行通知
            synchronized (jobs) {
                jobs.addLast(job);
                // 只需调用一个即可
                jobs.notify();
            }
        }
    }

    @Override
    public void shutdown() {
        for (Worker worker : workers) {
            worker.shutdown();
        }
    }

    @Override
    public void addWorkers(int num) {
        synchronized (jobs) {
            // 限制新增的worker数量不能超过最大值
            if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWorkers(num);
            this.workerNum += num;
        }
    }

    @Override
    public void removeWorker(int num) {
        synchronized (jobs) {
            if (num >= this.workerNum) {
                throw new IllegalArgumentException("beyond workNum");
            }
            // 按照给定的数量停止worker
            int count = 0;
            while (count < num) {
                Worker worker = workers.get(count);
                if (workers.remove(worker)) {
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum -= count;
        }
    }

    @Override
    public int getJobSize() {
        return jobs.size();
    }
    
    // 工作者,负责消费任务
    class Worker implements Runnable {
        // 是否工作
        private volatile boolean running = true;
        @Override
        public void run() {
            while (running) {
                Job job =  null;
                synchronized (jobs) {
                    // 如果工作者列表是空的,那么就wait
                    while (jobs.isEmpty()) {
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    // 取出一个Job
                    job = jobs.removeFirst();
                }
                if (job != null) {
                    try {
                        job.run();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        
        public void shutdown() {
            running = false;
        }
    }
}

从线程池的实现来看,当客户端调用execute(Job)方法时,会不断地向任务列表jobs中添加Job,而每个工作者线程会不断地从jobs上取出一个Job进行执行。当jobs为空时,工作线程进入等待状态,关键代码为while (jobs.isEmpty()) and jobs.wait();

添加一个Job后,对工作队列jobs调用其notify方法,而不是notifyAll方法,因为能够确定有工作者线程别唤醒,这时使用notify方法将会比notifyAll方法获得更小的开销(只唤醒等待队列中的一个线程进入阻塞队列中)。

可以看到,线程池的本质就是使用一个线程安全的工作队列连接工作者线程和客户端线程,客户端线程将任务放入工作队列后便返回,而工作者线程则不断地从工作队列上取出工作并执行。当工作队列为空时,所有的工作者线程均等待在工作队列上,当有客户端提交了一个任务之后,便会通知任意一个工作者线程,随着大量的任务被提交,更多的工作者线程会被唤醒。

4、简单Web服务器线程池

目前的浏览器都支持多线程访问,比如说在请求一个html页面时,页面中包含的图片资源、样式资源会被浏览器并发请求,这样用户就不会遇到一直等到一个图片完全下载完成后才能继续查看文字内容的尴尬情况。

如果Web服务器是单线程的,多线程的浏览器也没有用武之地,因为服务还是一个请求一个请求的顺序处理。因此大部分Web服务器都支持并发访问,常用的Java Web服务器,如Tomcat、Jetty,在其处理请求的过程中都使用到了线程池技术。

下面通过使用前一节的线程池来构造一个简单的Web服务器,这个服务器用来处理Http请求,目前只能处理简单的文本和图片内容。这个Web服务器使用main线程不断地接受客户端Socket的连接,将连接以及请求提交给线程池处理,这样使得Web服务器能够同时处理多个客户端请求,代码如下

/**
 * @author HK
 * @date 2018-07-23 21:29
 */
public class SimpleHttpServer {
    // 处理HttpRequest的线程池
    static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<>(10);
    // SimpleHttpServer的根路径
    static String basePath = "/home/hk/test/";
    static ServerSocket serverSocket;
    static int port = 8080;

    public static void main(String[] args) throws Exception {
        SimpleHttpServer.start();
    }

    public static void start() throws Exception {
        System.out.println("开始运行....");
        serverSocket = new ServerSocket(port);
        Socket socket = null;
        // 注意下面这一行代码
        while ((socket = serverSocket.accept()) != null) {
            // 接受一个客户端Socket,生成一个HttpRequestHandler,放入线程池执行
            threadPool.execute(new HttpRequestHandler(socket));
        }
        serverSocket.close();
    }

    static class HttpRequestHandler implements Runnable {
        private Socket socket;

        public HttpRequestHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            String line = null;
            BufferedReader br = null;
            BufferedReader reader = null;
            PrintWriter out = null;
            InputStream in = null;
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String header = reader.readLine();
                String filePath = basePath + header.split(" ")[1];
                out = new PrintWriter(socket.getOutputStream());
                if (filePath.endsWith("jpg") || filePath.endsWith("jpeg") || filePath.endsWith("ico") || filePath.endsWith("png")) {
                    in = new FileInputStream(filePath);
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    int i = 0;
                    while ((i = in.read()) != -1) {
                        baos.write(i);
                    }
                    byte[] array = baos.toByteArray();
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: Molly");
                    out.println("Content-Type: image/jpeg");
                    out.println("Content-Length: " + array.length);
                    out.println("");
                    out.flush();
                    baos.writeTo(socket.getOutputStream());
                } else {
                    br = new BufferedReader(new InputStreamReader(new
                            FileInputStream(filePath)));
                    out = new PrintWriter(socket.getOutputStream());
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: Molly");
                    out.println("Content-Type: text/html; charset=UTF-8");
                    out.println("");
                    while ((line = br.readLine()) != null) {
                        out.println(line);
                    }
                }
                out.flush();
            } catch (Exception ex) {
                out.println("HTTP/1.1 500");
                out.println("");
                out.flush();
            } finally {
                close(br, in, reader, out, socket);
            }
        }
    }

    private static void close(Closeable... closeables) {
        for (Closeable closeable : closeables) {
            try {
                if (closeable != null) {
                    closeable.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

SimpleHttpServer在建立了与客户端的连接之后,并不会处理客户端的请求,而是将其包装成HttpRequestHandler并交由线程池处理。在线程池中的Worker处理客户端请求的同事,SimpleHttpRequest能够继续完成后续客户端连接的建立,不会阻塞后续客户端的请求。

5、ab压力测试

通过ab命令进行压力测试,如下

# apt安装
sudo apt-get install apache2-utils
# 开10个线程,发送一百个请求,具体数据如下
ab -c 10 -n 100 http://localhost:8080/11.png

hk@hk-LPC:~/test$ ab -c 10 -n 100 http://localhost:8080/22.png
This is ApacheBench, Version 2.3 <$Revision: 1757674 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient).....done

Server Software:        Molly
Server Hostname:        localhost
Server Port:            8080

Document Path:          /22.png
Document Length:        30495 bytes

Concurrency Level:      10
Time taken for tests:   1.171 seconds
Complete requests:      100
Failed requests:        0
Total transferred:      3057300 bytes
HTML transferred:       3049500 bytes
# 吞吐率 = 每秒处理请求数
Requests per second:    85.40 [#/sec] (mean)
Time per request:       117.098 [ms] (mean)
# 请求平均耗时
Time per request:       11.710 [ms] (mean, across all concurrent requests)
Transfer rate:          2549.70 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    1   1.8      0       8
Processing:    73  114  24.5    114     176
Waiting:       73  113  24.4    112     176
Total:         76  115  24.7    114     177

Percentage of the requests served within a certain time (ms)
  50%    114
  66%    125
  75%    133
  80%    135
  90%    150
  95%    158
  98%    176
  99%    177
 100%    177 (longest request)

下面是统计数据

请求线程数请求数线程池数量吞吐率请求平均耗时
1101041.69 [#/sec] (mean)23.988 [ms]
11001041.86 [#/sec] (mean)23.888 [ms]
110001041.61 [#/sec] (mean)24.036 [ms]
10101087.63 [#/sec] (mean)11.412 [ms]
101001093.10 [#/sec] (mean)10.741 [ms]
1010001093.22 [#/sec] (mean)10.727 [ms]
11002041.39 [#/sec] (mean)24.159 [ms]
101002092.80 [#/sec] (mean)10.776 [ms]
1050002093.92 [#/sec] (mean)10.647 [ms]

不难得出,随着请求线程数、线程池中线程数量的增加,SimpleHttpServer的吞吐量不断增大,响应时间不断变小,多线程、线程池技术作用非常明显。

但是,线程池中的线程数量也并不是越多越好,具体的数量需要评估每个任务的处理时间,以及当前计算机的处理器能力及数量。使用的线程过少,无法发挥处理器的性能;使用的线程过多,将会增加系统的无故开销,起到相反的作用。

总访问次数: 278次, 一般般帅 创建于 2018-07-21, 最后更新于 2020-06-25

进大厂! 欢迎关注微信公众号,第一时间掌握最新动态!