8997 2021-02-28 2021-05-08

RocketMq源码阅读系列开篇之作,本文将介绍一些看似与源码无关,但却实际贯穿全流程的核心概念、知识储备、源码分析。搭楼还需地基稳,多说无益,开始吧!

一、common-cli的使用

apache common-cli工具包在大部分开源工具中非常常见,它的大致作用是避免程序员重复写一些命令行参数读入输出的问题,它能让你代码的命令行输入输出变得非常正规,已达到类似于mysql、redis、bash的命令行输入输出格式。

1、目标格式

下面是一段提示代码

Missing required options: h, u, p, s, d
usage: scp -help
 -d,--dst_path <arg>   the dstPath of remote
 -h,--host <ipv4 or ipv6>     the host of remote server
 -help                 usage help
 -P,--port <arg>       the port of remote server
 -p,--password <arg>   the password of remote server
 -s,--src_path <arg>   the srcPath of local
 -u,--user <arg>       the user of remote server

2、示例代码

下面是一段演示代码,核心部分不是很难-复杂,这里就不展开介绍了

/**
 * 参考链接: https://www.jianshu.com/p/c3ae61787a42
 *
 * @author HK
 * @date 2021-03-09 10:26
 */
public class ScpCmdTest {

    private static String HELP_STRING = null;

    private static Options options = new Options();

    public static void main(String[] args) {
        initCliArgs(new String[]{"-u", "hk",
                "--password", "hk",
                "-h", "localhost",
                "--dst_path", "127.0.0.1",
                "-p", "8099",
               "--src_path", "/xiaokui"
        });
    }

    private static void initCliArgs(String[] args) {
        CommandLineParser commandLineParser = new DefaultParser();
        // help
        options.addOption("help", "usage help");
        // host
        options.addOption(Option.builder("h").argName("ipv4 or ipv6").required().hasArg(true).longOpt("host").type(String.class).desc("the host of remote server").build());
        // port
        options.addOption(Option.builder("P").hasArg(true).longOpt("port").type(Short.TYPE).desc("the port of remote server").build());
        // user
        options.addOption(Option.builder("u").required().hasArg(true).longOpt("user").type(String.class).desc("the user of remote server").build());
        // password
        options.addOption(Option.builder("p").required().hasArg(true).longOpt("password").type(String.class).desc("the password of remote server").build());
        // srcPath
        options.addOption(Option.builder("s").required().hasArg(true).longOpt("src_path").type(String.class).desc("the srcPath of local").build());
        // dstPath
        options.addOption(Option.builder("d").required().hasArg(true).longOpt("dst_path").type(String.class).desc("the dstPath of remote").build());

        CommandLine commandLine = null;
        try {
            commandLine = commandLineParser.parse(options, args);
        } catch (ParseException e) {
            System.out.println(e.getMessage() + "\n" + getHelpString());
            System.exit(0);
        }
        if (commandLine.hasOption("h")) {
            System.out.println("has h:" + commandLine.getOptionValue("h"));
        }
        if (commandLine.hasOption("host")) {
            System.out.println("has host:" + commandLine.getOptionValue("host"));
        }
        if (commandLine.hasOption("-h")) {
            System.out.println("has -h:" + commandLine.getOptionValue("-h"));
        }
        if (commandLine.hasOption("--host")) {
            System.out.println("has --host:" + commandLine.getOptionValue("--host"));
        }
        if (commandLine.hasOption("hh")) {
            System.out.println("has hh:" + commandLine.getOptionValue("hh"));
        } else {
            System.out.println("has not hh:" + commandLine.getOptionValue("hh"));
        }
        System.out.println("输入正确:" + commandLine);
    }

    private static String getHelpString() {
        if (HELP_STRING == null) {
            HelpFormatter helpFormatter = new HelpFormatter();
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            PrintWriter printWriter = new PrintWriter(byteArrayOutputStream);
            helpFormatter.printHelp(printWriter, HelpFormatter.DEFAULT_WIDTH, "scp -help", null,
                    options, HelpFormatter.DEFAULT_LEFT_PAD, HelpFormatter.DEFAULT_DESC_PAD, null);
            printWriter.flush();
            HELP_STRING = new String(byteArrayOutputStream.toByteArray());
            printWriter.close();
        }
        return HELP_STRING;
    }
}

二、logging的使用

在RocketMq的源码中,虽然使用了slf4j,但是是在slf4j之上包装了一层,这样会导致日志输出会丢失一些原始信息,比如最初的类信息、行信息(可自行实验)。这里列举几个比较容易搞混的点,如下

1、日志输出的格式

### 一些常见的
%m 输出代码中指定的消息 
%p 输出优先级,即DEBUG,INFO,WARN,ERROR,FATAL 
%r 输出自应用启动到输出该log信息耗费的毫秒数 
%c 就是定义该Logger的名字
%C 输出所属的类目,通常就是调用该Logger的类所在类的全名 
%t 输出产生该日志事件的线程名 
%n 输出一个回车换行符,Windows平台为"rn",Unix平台为"n" 
%d 输出日志时间点的日期或时间,默认格式为ISO8601,也可以在其后指定格式,比如:%d{yyy MMM dd HH:mm:ss,SSS},输出类似:2002年10月18日 22:10:28,921 
%l 输出日志事件的发生位置,包括类目名、发生的线程,以及在代码中的行数。举例:Testlog4.main(Test Log4.java:10)

2、是如何整合的

先说结论,slf4j是通过预留实现类StaticLoggerBinder来进行整合,通过这个类决定是加载log4j,还是logback等,具体见如下代码

/**
 * slf4j simple logging facade for java
 *
 * 参考链接:https://blog.csdn.net/Mr_Mocha/article/details/90482164
 *
 * slf4j-log4j12-x.x.x.jar是使用org.apache.log4j.Logger提供的驱动
 * slf4j-jdk14-x.x.x.jar是使用java.util.logging提供的驱动
 * slf4j-simple-x.x.x.jar直接绑定System.err
 * slf4j-jcl-x.x.x.jar是使用commons-logging提供的驱动
 * logback-classic-x.x.x.jar是使用logback提供的驱动
 *
 * slf4j实现原理:具体代码见类LoggerFactory的bind方法,大致过程如下
 * 1.先 findPossibleStaticLoggerBinderPathSet 方法尝试去加载类 org/slf4j/impl/StaticLoggerBinder.class
 * 2.如果出现 NoClassDefFoundError 则说明类路径下没有 第三方日志框架对 StaticLoggerBinder 的具体实现,slf4j输出找不到第三方适配,流程结束
 * 3.如果找得到 StaticLoggerBinder 的实现类,则 StaticLoggerBinder.getSingleton().getLoggerFactory(),返回第三方Logger的具体实现类
 * 4.这里还有一个分支,如果找到多个 StaticLoggerBinder 实现类,怎么办? 答案是能跑,但会出现红色警告 Class path contains multiple SLF4J bindings
 * 5.如果出现了两个相同包相同类,那么用的是哪个呢? 答案是只会加载第一个,并且正常运行,与export顺序有关(可以通过调换logback、log4j的导入顺序来使用不同架包)
 *
 * 浅谈两个jar包中包含完全相同的包名和类名的加载问题
 * 1.自定义两个jar包,其中包含相同包名和类名:与export的导入顺序有关。只会加载第一个,并且运行正常
 * 2.自定义jar和jdk包,其中包含相同的包名和类名:与export的导入顺序有关。同样是只会加载第一个,但是如果加载自定义的jar运行会报错。加载jdk正常。
 * @author HK
 * @date 2021-03-09 11:21
 */
public class LogTest {

    private static String STATIC_LOGGER_BINDER_PATH = "org/slf4j/impl/StaticLoggerBinder.class";

    static Logger logger = LoggerFactory.getLogger(LogTest.class);

    public static void main(String[] args) {
        LinkedHashSet staticLoggerBinderPathSet = new LinkedHashSet();

        try {
            ClassLoader loggerFactoryClassLoader = LoggerFactory.class.getClassLoader();
            Enumeration paths;
            if (loggerFactoryClassLoader == null) {
                paths = ClassLoader.getSystemResources(STATIC_LOGGER_BINDER_PATH);
            } else {
                paths = loggerFactoryClassLoader.getResources(STATIC_LOGGER_BINDER_PATH);
            }

            while(paths.hasMoreElements()) {
                URL path = (URL)paths.nextElement();
                staticLoggerBinderPathSet.add(path);
            }

        } catch (IOException var4) {
            Util.report("Error getting resources from path", var4);
        }
        System.out.println(staticLoggerBinderPathSet);

        System.out.println(StaticLoggerBinder.class);
        // logback=1.7.16    log4j=1.6
        System.out.println(StaticLoggerBinder.REQUESTED_API_VERSION);

        logger.info(LoggerFactory.getILoggerFactory().toString() + " " + logger.getName() + ": 这是一条测试消息");
        System.out.println("这是一条测试消息");
    }
}

三、定时任务

在RocketMQ中大量使用定时任务,我们来看下与此相关一些知识点。

1、Timer的使用

1、实现原理

单线程 + 最小堆任务消费队列(完全二叉平衡树,各根节点小于其子节点,且优先插入左侧节点,中序遍历可得排序结果集) + 轮询去堆顶取最小值执行任务。

不推荐使用,原因有二:

  • 对耗时任务及多任务非常不友好
  • 对于运行时异常不友好

后面有相关代码验证。

2、任务生产

/**
 * Schedule the specified timer task for execution at the specified
 * time with the specified period, in milliseconds.  If period is
 * positive, the task is scheduled for repeated execution; if period is
 * zero, the task is scheduled for one-time execution. Time is specified
 * in Date.getTime() format.  This method checks timer state, task state,
 * and initial execution time, but not period.
 *
 * @throws IllegalArgumentException if <tt>time</tt> is negative.
 * @throws IllegalStateException if task was already scheduled or
 *         cancelled, timer was cancelled, or timer thread terminated.
 * @throws NullPointerException if {@code task} is null
 */
// 外层scheduleXXX方法,本质都是对这个方法的调用
private void sched(TimerTask task, long time, long period) {
    if (time < 0)
        throw new IllegalArgumentException("Illegal execution time.");

    // Constrain value of period sufficiently to prevent numeric
    // overflow while still being effectively infinitely large.
    if (Math.abs(period) > (Long.MAX_VALUE >> 1))
        period >>= 1;

    // 对消费任务队列加锁,即生产者串行提交任务
    synchronized(queue) {
        if (!thread.newTasksMayBeScheduled)
            throw new IllegalStateException("Timer already cancelled.");
		// 再锁住具体的任务对象,对于消费者而言,关注的是具体的任务对象
        synchronized(task.lock) {
            if (task.state != TimerTask.VIRGIN)
                throw new IllegalStateException(
                    "Task already scheduled or cancelled");
            // 任务的下一次执行时间
            task.nextExecutionTime = time;
            // 执行周期
            task.period = period;
            // 任务状态
            task.state = TimerTask.SCHEDULED;
        }
		// 任务放入消费队列,等待消费线程消费
        queue.add(task);
        if (queue.getMin() == task)
            // 如果刚刚加入的任务位于堆顶,则唤醒在queue上wait的线程
            // 为什么是notify呢,因为消费线程主动调用了wait方法,只能由外部主动唤醒
            queue.notify();
    }
}

3、任务消费

// 调用构造方法时,默认会启动消费线程
public Timer(String name) {
    thread.setName(name);
    thread.start();
}

public void run() {
    try {
        mainLoop();
    } finally {
        // Someone killed this Thread, behave as if Timer cancelled
        synchronized(queue) {
            newTasksMayBeScheduled = false;
            queue.clear();  // Eliminate obsolete references
        }
    }
}

/**
     * The main timer loop.  (See class comment.)
     */
private void mainLoop() {
    // 轮询模式
    while (true) {
        try {
            TimerTask task;
            // 是否执行任务
            boolean taskFired;
            synchronized(queue) {
                // Wait for queue to become non-empty
                // 这里主动调用了wait方法,等待外部唤醒
                // 条件是队列无任务消费,且有能力去消费任务
                while (queue.isEmpty() && newTasksMayBeScheduled)
                    queue.wait();
                if (queue.isEmpty())
                    break; // Queue is empty and will forever remain; die

                // Queue nonempty; look at first evt and do the right thing
                // 当前时间 和 执行时间
                long currentTime, executionTime;
                // 取堆顶任务
                task = queue.getMin();
                synchronized(task.lock) {
                    if (task.state == TimerTask.CANCELLED) {
                        queue.removeMin();
                        continue;  // No action required, poll queue again
                    }
                    // 当前系统时间,精确到毫秒 ms
                    currentTime = System.currentTimeMillis();
                    // 任务的执行时间
                    executionTime = task.nextExecutionTime;
                    // 如果当前系统时间 大于等于 执行时间
                    // 假设任务执行时间为 20210101 11:11:11 111,而系统时间为20200101 22:22:22 222,则进入条件
                    if (taskFired = (executionTime<=currentTime)) {
                        // 移除已执行的一次性任务
                        if (task.period == 0) { // Non-repeating, remove
                            queue.removeMin();
                            task.state = TimerTask.EXECUTED;
                        } else { // Repeating task, reschedule
                            // 周期性任务
                            // 如果周期小于0,那么堆顶任务的下一次执行时间 = 当前时间 - 任务执行周期
                            // 如果周期大于0,那么堆顶任务的下一次执行时间 = 任务执行时间 + 任务执行周期
                            // 再重新调整最小堆任务
                            // 这里的代码逻辑具体表现为 schedule方法和scheduleAtFixedRate方法的细微差别
                            queue.rescheduleMin(
                                task.period<0 ? currentTime   - task.period
                                : executionTime + task.period);
                        }
                    }
                }
                // 如果不执行任务(执行时间 > 当前系统时间,即还未到执行时间),那么等待 执行时间 - 当前时间 毫秒
                // 等待时间到了(不一定非常精确,但误差可控),再次执行任务
                if (!taskFired) // Task hasn't yet fired; wait
                    queue.wait(executionTime - currentTime);
            }
            // 当前系统时间 大于等于 执行时间,执行任务
            // 我也觉得这个条件有点奇怪,不妨假设两个例子
            // 1.1 计划在2s后执行一个一次性任务,那么当前系统时间为 n,执行时间为 n + 1,此时taskFired = false,则 wait(2s)
            // 1.2 然后在进入循环,假设此时系统时间为 n + 3 > n + 2,此时taskFired = true,则执行任务
            // 2.1 计划计划执行一个周期性任务,n + 1后,每两秒执行一次
            // 2.2 初始化时系统时间可看做 n,小于执行时间n + 1,此时taskFired = false,则 wait 1s
            // 2.3 再如循环,此时系统时间为 n + 1.00001,大于执行时间 n + 1,此时taskFired = true,调整堆顶位置,然后执行任务,再循环
            if (taskFired)  // Task fired; run it, holding no locks
                task.run();
        } catch(InterruptedException e) {
        }
    }
}

// 假设 我希望有一个任务在 2021-02-02 10:50:00 开始运行,每个3分钟运行一次
// 但实际任务运行时间为 2021-02-02 10:55:00,那么区别如下

// schedule 不会补上之前的任务,它还是按照实际程序执行时间开始计算,即第一次打印为 2020-02-02 10:55:00,第二次为58
// 这里是 -period
public void schedule(TimerTask task, long delay, long period) {
    if (delay < 0)
        throw new IllegalArgumentException("Negative delay.");
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, System.currentTimeMillis()+delay, -period);
}

// scheduleAtFixedRate 会补上之前的任务,即快速打印50和53(多执行了两次),然后再按正常的56、59运行下去
// 这里是 +period,具体区别我们这里就不跟,见源代码,不用这个类就对了
public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
    if (delay < 0)
        throw new IllegalArgumentException("Negative delay.");
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, System.currentTimeMillis()+delay, period);
}

4、代码验证

  • 验证schedule和scheduleAtFixedRate的区别。
static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

private static void test1() {
    // 测试schedule、scheduleAtFixedRate的区别
    Timer timer = new Timer("定时任务timer", false);
    System.out.println("main当前时间为:" + sdf.format(new Date()));
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            System.out.println("schedule当前时间为:" + sdf.format(new Date()));
        }
    }, new Date(System.currentTimeMillis() + -2 * 1000), 5 * 1000);

    timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            System.out.println("scheduleAtFixedRate当前时间为:" + sdf.format(new Date()));
        }
    }, new Date(System.currentTimeMillis() + -2 * 1000), 5 * 1000);
}

// 输出如下,由此可知 schedule 会忽略传入时间作为起始值,而将当前时间作为开始值
// 而 scheduleAtFixedRate 会在调用时打印一次,并到下一个循环步入正轨
main当前时间为:2021-03-21 17:53:27,期望时间应为 17:53:25
    
schedule当前时间为:2021-03-21 17:53:27
scheduleAtFixedRate当前时间为:2021-03-21 17:53:27
scheduleAtFixedRate当前时间为:2021-03-21 17:53:30
    
schedule当前时间为:2021-03-21 17:53:32
scheduleAtFixedRate当前时间为:2021-03-21 17:53:35
    
schedule当前时间为:2021-03-21 17:53:37
scheduleAtFixedRate当前时间为:2021-03-21 17:53:40
    
schedule当前时间为:2021-03-21 17:53:42
scheduleAtFixedRate当前时间为:2021-03-21 17:53:45
    
schedule当前时间为:2021-03-21 17:53:47
scheduleAtFixedRate当前时间为:2021-03-21 17:53:50
    
schedule当前时间为:2021-03-21 17:53:52
scheduleAtFixedRate当前时间为:2021-03-21 17:53:55
    
schedule当前时间为:2021-03-21 17:53:57
scheduleAtFixedRate当前时间为:2021-03-21 17:54:00
  • 验证对于耗时任务及多任务不友好。
private static void test2() {
    // 测试对耗时任务及多任务不友好
    Timer timer = new Timer("定时任务timer", false);
    System.out.println("main当前时间为:" + sdf.format(new Date()));
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("schedule1当前时间为:" + sdf.format(new Date()));
        }
    }, new Date(System.currentTimeMillis() + 2 * 1000), 2 * 1000);

    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            System.out.println("schedule2当前时间为:" + sdf.format(new Date()));
        }
    }, new Date(System.currentTimeMillis() + 2 * 1000), 2 * 1000);
}

// 输出如下,按照正常的猜想,应该是这样的
// 1 当前时间为 n,则 n + 2 后执行任务1、任务2
// 2 n + 4 时,再执行任务1、任务2,循环此步骤
// 但结果却是,原因出在 Timer的本质是单线程执行
// n + 7 执行任务1、任务2
// n + x 执行任务x,完全乱了逻辑
main当前时间为:2021-03-21 18:04:55
schedule1当前时间为:2021-03-21 18:05:02
schedule2当前时间为:2021-03-21 18:05:02
    
schedule1当前时间为:2021-03-21 18:05:07
schedule1当前时间为:2021-03-21 18:05:12
schedule2当前时间为:2021-03-21 18:05:12
schedule1当前时间为:2021-03-21 18:05:17
schedule1当前时间为:2021-03-21 18:05:22
schedule2当前时间为:2021-03-21 18:05:22
schedule1当前时间为:2021-03-21 18:05:27
schedule1当前时间为:2021-03-21 18:05:32
schedule2当前时间为:2021-03-21 18:05:32
schedule1当前时间为:2021-03-21 18:05:37
  • 验证对于运行时异常不友好。
private static void test3() {
    // 测试对运行时异常不友好
    Timer timer = new Timer("定时任务timer", false);
    System.out.println("main当前时间为:" + sdf.format(new Date()));
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            System.out.println("schedule1当前时间为:" + sdf.format(new Date()));
        }
    }, new Date(System.currentTimeMillis() + 2 * 1000), 2 * 1000);

    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            System.out.println("schedule2当前时间为:" + sdf.format(new Date()));
            throw new RuntimeException();
        }
    }, new Date(System.currentTimeMillis() + 3 * 1000), 2 * 1000);
}

// 输出如下,解决办法就是显式捕捉即可
main当前时间为:2021-03-21 18:14:40
schedule1当前时间为:2021-03-21 18:14:42
schedule2当前时间为:2021-03-21 18:14:43

Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Exception in thread "定时任务timer" java.lang.RuntimeException
	at rockermq.schedule.TimerTest$6.run(TimerTest.java:80)
	at java.util.TimerThread.mainLoop(Timer.java:555)
	at java.util.TimerThread.run(Timer.java:505)

2、ScheduledExecutorService

相比较Timer的使用,ScheduledExecutorService是推荐的使用类。它的核心地方,也就是Timer的痛点之处,有了反面例子,那么正面角色也就好理解了。

1、接口方法

// ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
// 其中具体的实现类为 ScheduledThreadPoolExecutor,后面均以类 ScheduledThreadPoolExecutor 为例 
public interface ScheduledExecutorService extends ExecutorService {

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

接口方法很简单,甚至抛弃了Timer中对于特定Date的运行支持,而完全采用固定延迟 + 固定周期。这里的难点是融合线程池知识,如果对线程池了解不深,就容易陷入盲区。

2、任务生产

/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
// 注意这里的 new DelayedWorkQueue
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

/**
 * Specialized delay queue. To mesh with TPE declarations, this
 * class must be declared as a BlockingQueue<Runnable> even though
 * it can only hold RunnableScheduledFutures.
 */
// ScheduledThreadPoolExecutor内部类
static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable> {
    // 省略其他
}

/**
 * Main execution method for delayed or periodic tasks.  If pool
 * is shut down, rejects the task. Otherwise adds task to queue
 * and starts a thread, if necessary, to run it.  (We cannot
 * prestart the thread to run the task because the task (probably)
 * shouldn't be run yet.)  If the pool is shut down while the task
 * is being added, cancel and remove it if required by state and
 * run-after-shutdown parameters.
 *
 * @param task the task
 */
// 来自类 ScheduledThreadPoolExecutor
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        // 这里加入了任务
        // 注意,这里的延迟队列是特制,属于内部类,外面不能访问
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 预热核心线程
            ensurePrestart();
    }
}

至此,任务以加入任务队列,等待消费。

3、任务消费

/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
// 来自类 ThreadPoolExecutor
// 线程池内部逻辑,主线程会轮询调用这个方法,以从任务队列消费任务
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 注意这里有一个逻辑
            Runnable r = timed ?
                // 同take无限等待不同,这个指定了最大等待时间,超过了返回null
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            	// 尝试从延迟队列获取并移除头部元素,如果没有符合条件的则等待,直到可以获取到下一个
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

// 来自类 DelayedWorkQueue
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            // 没有则等待
            if (first == null)
                available.await();
            else {
                // 获取第一个任务延迟等待
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // 立即执行
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 同Timer,等待特定时间
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

4、总结

其实也没啥好总结的,大概说下吧:ScheduledThreadPoolExecutor依托于线程池 + 延时队列技术,变相地为Timer引入了多线程,以解决了Timer的不足之处。如下是几段简单的代码演示

/**
 * 测试 基本使用
 */
private static void test1() {
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {
            System.out.println(new Date());
        }
    }, 1, TimeUnit.SECONDS);
}

/**
 * 测试 断点位置
 */
private static void test2() {
    ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            System.out.println(new Date());
        }
    }, 1, 2, TimeUnit.SECONDS);
}

四、Linux下的5种IO模型

引用链接:https://zhuanlan.zhihu.com/p/127170201、https://www.zhihu.com/question/32163005

1、几个系统调用函数

recvfrom Linux系统提供给用户用于接收网络IO的系统接口。从套接字上接收一个消息,可同时应用于面向连接和无连接的套接字。

如果此系统调用返回值<0,并且 errno为EWOULDBLOCK或EAGAIN(套接字已标记为非阻塞,而接收操作被阻塞或者接收超时 )时,连接正常,阻塞接收数据(这很关键,前4种IO模型都设计此系统调用)。

select select系统调用允许程序同时在多个底层文件描述符上,等待输入的到达或输出的完成。以数组形式存储文件描述符,32位机器默认1024个,64位机器默认2048个。当有数据准备好时,无法感知具体是哪个流OK了,所以需要一个一个的遍历,函数的时间复杂度为O(n)

poll链表形式存储文件描述符,没有长度限制(而select有长度限制,注意数组和链表的区别)。本质与select相同,函数的时间复杂度也为O(n)

epoll 是基于事件驱动的,如果某个流准备好了,会以事件通知,知道具体是哪个流,因此不需要遍历,函数的时间复杂度为O(1)

sigaction 用于设置对信号的处理方式,也可检验对某信号的预设处理方式。Linux使用SIGIO信号来实现IO异步通知机制。

2、5种IO模型

学习过操作系统的知识后,可以知道:不管是网络IO还是磁盘IO,对于读操作而言,都是等到网络的某个数据分组到达后/数据准备好后,将数据拷贝到内核空间的缓冲区中,再从内核空间拷贝到用户空间的缓冲区

口头说明以 妈妈让我去厨房烧一锅子水,然后拿热水下饺子为例(我代指一个进程,水开了代指IO准备就绪,下饺子代指进程读取/使用IO数据)

1、阻塞IO

  • 书面说明:阻塞IO的执行过程是进程进行系统调用等待内核将数据准备好并复制到用户态缓冲区后,进程放弃使用CPU一直阻塞在此,直到数据准备好。

  • 口头说明:水只要没烧开,我就干瞪眼看着这个锅,其他什么都不做。

阻塞io

2、非阻塞IO

  • 书面说明:应用程序询问内核是否有数据准备好。如果就绪,就进行拷贝操作(此时阻塞);如果未就绪,就不阻塞程序,内核直接返回未就绪的返回值,等待用户程序下一个轮询。
  • 口头说明:我可以干些别的,但需要快速、频繁地来看下水是否开了。
  • 相比较于阻塞IO的优点:由于阻塞IO阻塞于IO数据的读取,从而放弃CPU使用,直到下一次CPU时间片;而非阻塞IO轮询IO数据,当发现IO数据没到时,可以利用CPU剩余时间片去做别的事情。

非阻塞IO非阻塞IO模型

3、IO多路复用

排了很长的队,终于轮到我支付后,拿到了一张小票,上面有号次。当全家桶出炉后,会喊相应的号次来取。KFC营业员小姐姐打小票出号次的动作相当于操作系统多开了个线程,专门接收客户端的连接。我只关注叫到的是不是我的号,因此程序还需在服务端注册我想监听的事件类型。

多路复用一般都是用于网络IO,服务端与多个客户端的建立连接。下面是神奇的多路复用执行过程:

IO多路复用IO多路复用模型

相比于阻塞IO模型,多路复用只是多了一个select/poll/epoll函数。select函数会不断地轮询自己所负责的文件描述符/套接字的到达状态,当某个套接字就绪时,就对这个套接字进行处理。

select负责轮询等待这里可以替换为poll或epoll,它们之间的区别见前文,所以linux下面尽量使用epool),recvfrom负责拷贝。当用户进程调用该select,select会监听所有注册好的IO,如果所有IO都没注册好,调用进程就阻塞。

对于客户端来说,一般感受不到阻塞,因为请求来了,可以放到线程池里执行;但对于执行select的操作系统而言,是阻塞的,需要阻塞地等待某个套接字变为可读

IO多路复用其实是阻塞在select,poll,epoll这类系统调用上的,复用的是执行select,poll,epoll的线程。

4、信号驱动IO

跑KFC嫌麻烦,刚好有个会员,直接点份外卖,美滋滋。当外卖送达时,会收到取餐电话(信号)。在收到取餐电话之前,我可以愉快地吃鸡或者学习。

当数据报准备好的时候,内核会向应用程序发送一个信号,进程对信号进行捕捉,并且调用信号处理函数来获取数据报。

信号驱动IO信号驱动IO模型

该模型也分为两个阶段:

  • 数据准备阶段:未阻塞,当数据准备完成之后,会主动的通知用户进程数据已经准备完成,对用户进程做一个回调。
  • 数据拷贝阶段:阻塞用户进程,等待数据拷贝。

5、异步IO

此时科技的发展已经超乎想象了,外卖机器人将全家桶自动送达并转换成营养快速注入我的体内,同时还能得到口感的满足。注入结束后,机器人会提醒我注入完毕。在这个期间我可以放心大胆的玩,甚至注射的时候也不需要停下来

类比一下,就是用户进程发起系统调用后,立刻就可以开始去做其他的事情,然后直到I/O数据准备好并复制完成后,内核会给用户进程发送通知,告诉用户进程操作已经完成了。

异步IO异步IO模型

特点:此时科技的发展已经超乎想象了,外卖机器人将全家桶自动送达并转换成营养快速注入我的体内,同时还能得到口感的满足。注入结束后,机器人会提醒我注入完毕。在这个期间我可以放心大胆的玩,甚至注射的时候也不需要停下来!

类比一下,就是用户进程发起系统调用后,立刻就可以开始去做其他的事情,然后直到I/O数据准备好并复制完成后,内核会给用户进程发送通知,告诉用户进程操作已经完成了。

异步I/O执行的两个阶段**都不会阻塞读写操作,**由内核完成。完成后内核将数据放到指定的缓冲区,通知应用程序来取。

6、比较

各个IO Model的比较如图所示

io比较

五、netty

1、netty为什么选择nio

这里以一个简单的回文服务器作为例子(客户端发送什么,服务端返回什么),分别测试bio、nio、aio的性能,具体测试代码如下

1、bio

服务端代码

public class BIOServer {

    public void initServer () throws IOException {
        ServerSocket serverSocket = new ServerSocket(2222);
        System.out.println("服务器主线程等待连接....");
        AtomicInteger i = new AtomicInteger(0);
        Socket client;
        while ((client = serverSocket.accept()) != null) {
            i.incrementAndGet();
            Socket finalClient = client;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        byte[] bytes = new byte["hello".getBytes().length];
                        while (finalClient.getInputStream().read(bytes) != -1) {
                            finalClient.getOutputStream().write(bytes);
                            finalClient.close();
                            return;
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }, "服务器线程" + i.get()).start();
        }
    }
}

public class BIO {

    public static void main(String[] args) throws IOException, InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    new BIOServer().initServer();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        Thread.sleep(1000);
        new Client().beginTest(2222);
    }
}

客户端代码(后面将不再重复)

public class Client {

    private int number = 10;

    public Client() {
    }

    public Client(int thread) {
        this.number = thread;
    }

    private void initClient(int port) throws IOException, InterruptedException {
        Socket socket = new Socket(InetAddress.getLocalHost(), port);
        PrintWriter pw = new PrintWriter(socket.getOutputStream());
        BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));

        String req = "hello";
        pw.print(req);
        pw.flush();

        String temp;
        while ((temp = br.readLine()) != null) {
            System.out.println("收到服务端回声消息:" + temp);
        }
        pw.close();
    }

    public void beginTest(int port) throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(number, number + 10, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(number), new ThreadFactory() {
            @Override
            public Thread newThread( Runnable r) {
                return new Thread(r, "线程" + (number - atomicInteger.incrementAndGet()));
            }
        });
        int preparedThread = threadPoolExecutor.prestartAllCoreThreads();
        System.out.println("线程成功预热数:" + preparedThread);

        long beginTime = System.currentTimeMillis();
        CyclicBarrier barrier = new CyclicBarrier(number);
        CountDownLatch countDownLatch = new CountDownLatch(number);

        for (int i = 0; i < number; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        barrier.await();
                        initClient(port);
                    } catch (IOException | InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        countDownLatch.await();
        System.out.println(number + "线程连接服务器,总耗时: " + (System.currentTimeMillis() - beginTime) + " ms");
        System.exit(0);
    }
}

2、nio

public class NIOServer {

    private Selector selector;

    public void initServer(int port) throws IOException{
        // 打开ServerSocket通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(port));

        // 获取一个选择器
        this.selector = Selector.open();
        // 将通道管理器与该通道进行绑定,并为该通道注册SelectionKey.OP_ACCEPT事件
        // 注册事件后,当该事件触发时会使selector.select()返回,
        // 否则selector.select()一直阻塞
        serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        listen();
    }

    public void listen() throws IOException{
        System.out.println("启动服务器!");
        while (true) {
            // select()方法一直阻塞直到有注册的通道准备好了才会返回
            selector.select();
            Iterator<?> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = (SelectionKey) iterator.next();
                // 删除已选的key,防止重复处理
                iterator.remove();
                handler(key);
            }
        }
    }

    public void handler(SelectionKey key)throws IOException{
        if (key.isAcceptable()) {
            handlerAccept(key);
        }else if (key.isReadable()){
            handlerRead(key);
        }else if (key.isWritable()){
            System.out.println("can write!");
        }else if (key.isConnectable()){
            System.out.println("is connectable");
        }
    }


    public void handlerAccept(SelectionKey key) throws IOException{
        // 从SelectionKey中获取ServerSocketChannel
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        // 获取SocketChannel
        SocketChannel socketChannel = server.accept();
        // 设置成非阻塞
        socketChannel.configureBlocking(false);
        // 为socketChannel通道建立 OP_READ 读操作,使客户端发送的内容可以被读到
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    public void handlerRead(SelectionKey key)throws IOException{
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // 创建读取缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
        // 从通道读取可读取的字节数
        try {
            int readCount = socketChannel.read(byteBuffer);
            if (readCount > 0) {
                byte[] data = byteBuffer.array();
                ByteBuffer outBuffer = ByteBuffer.wrap(data);
                socketChannel.write(outBuffer);
                socketChannel.close();
            } else {
                System.out.println("客户端异常退出");
            }
        } catch (IOException e) {
            key.cancel();
        }
    }
}

public class NIO {

    public static void main(String[] args) throws IOException, InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    new NIOServer().initServer(4444);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        Thread.sleep(1000);
        new Client().beginTest(4444);
    }
}

3、aio

public class AIOServer {

    private static final ExecutorService executorService = Executors.newFixedThreadPool(200);

    public void init() throws Exception {
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService);
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
        server.bind(new InetSocketAddress(3333));
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Object attachment) {
                server.accept(null, this);
//                try {
//                    System.out.println(Thread.currentThread().getName() + ":服务器与客户端" + client.getRemoteAddress() + "建立连接");
//                } catch (IOException e) {
//                    e.printStackTrace();
//                }
                ByteBuffer buffer = ByteBuffer.allocate("hello".getBytes().length);
                client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer index, ByteBuffer buffer) {
                        try {
                            buffer.flip();
                            client.write(buffer).get();//这个是异步的,一定要用get 确保执行结束 才能clear
                            client.close();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        System.out.println(exc.getMessage());
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                throw new RuntimeException(exc.getMessage());
            }
        });
    }
}

public class AIO {

    public static void main(String[] args) throws Exception {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    new AIOServer().init();
                } catch (Exception e) {
                    e.printStackTrace();
                    return;
                }
            }
        }, "服务端线程").start();
        Thread.sleep(1000);
        if (args.length != 0) {
            new Client(Integer.parseInt(args[0])).beginTest(3333);
        } else {
            new Client().beginTest(3333);
        }
    }
}

4、测试数据

测试脚本如下

#!/bin/bash
# 该脚本主要用于代替手工执行
# @author HK
# @date 2020-10-22
# @version 1.0

readonly bash_path=""
readonly from_dir=""
readonly to_dir="/home/hk-pc/ABC/classes"
readonly base_path="/home/hk-pc/JavaSpace/newxiaokui/k-study/k-java/src/main/java"
readonly log_file="${to_dir}/output.txt"

# 进入工作目录
cd ${base_path} || {
  echo "无法进入java目录"
  exit 1
}
# 编译java文件 -d 指定class输出的目录
javac -d "${to_dir}" netty/io/*.java
# 再进入编译后的class目录
cd "${to_dir}" || {
  echo "无法进入java目录"
  exit 1
}
# 准备日志文件,输出记录
>${log_file}
echo "日志输出至文件 ${log_file}"

# 测试次数,然后再取平均值
readonly times=(50)
# 线程测试数量
readonly threads=(50 100 250 500 1000 1500 2000 3000)

# 方法第一个入参为类型,可选值为bio、nio、aio,必传
# 方法第二个入参为执行次数
# 方法第三个入参为线程数
function run_test() {
  io_type="$1"
  run_times="$2"
  thread_count="$3"
  sum_time=0
  average_time=0
  for ((i = 0; i < ${run_times}; i++)); do
    if test "$io_type" = 'bio'; then
      result=$(java netty/io/BIO "$thread_count" | tail -n 1)
    elif test "$io_type" = 'nio'; then
      result=$(java netty/io/NIO "$thread_count" | tail -n 1)
    elif test "$io_type" = 'aio'; then
      result=$(java netty/io/AIO "$thread_count" | tail -n 1)
    else
      echo "非法输入" "$2"
      exit 1
    fi
    # echo "$result" | tee -a ${log_file}
    temp="${result#* }"
    consume_time="${temp%% *}"
    sum_time=$(expr $sum_time + $consume_time)
  done
  average_time=$(expr $sum_time / $run_times)
  echo "$io_type" "线程数" "$thread_count" "  运行次数" "$run_times" "  平均耗时" "$average_time" "ms" | tee -a ${log_file}
}

echo "$(date) "| tee -a ${log_file}
# 最好用这种变量,那种用两个变量遍历数组索引的方式有问题
for i in "${times[@]}";do
  for j in "${threads[@]}";do
    run_test bio "$i" "$j"
    run_test nio "$i" "$j"
    run_test aio "$i" "$j"
  done
done
echo "$(date) "| tee -a ${log_file}

测试结果如下

2020年 11月 11日 星期三 16:26:45 CST 
bio 线程数 50   运行次数 50   平均耗时 71 ms  # 相差不大
nio 线程数 50   运行次数 50   平均耗时 65 ms
aio 线程数 50   运行次数 50   平均耗时 78 ms
bio 线程数 100   运行次数 50   平均耗时 1076 ms  # 相差不大
nio 线程数 100   运行次数 50   平均耗时 1071 ms
aio 线程数 100   运行次数 50   平均耗时 1079 ms
bio 线程数 250   运行次数 50   平均耗时 2227 ms  # 相差不大
nio 线程数 250   运行次数 50   平均耗时 2109 ms
aio 线程数 250   运行次数 50   平均耗时 2251 ms
bio 线程数 500   运行次数 50   平均耗时 2854 ms  # 有略微差距
nio 线程数 500   运行次数 50   平均耗时 3147 ms
aio 线程数 500   运行次数 50   平均耗时 3436 ms
bio 线程数 1000   运行次数 50   平均耗时 3720 ms  # 差距稍稍明显
nio 线程数 1000   运行次数 50   平均耗时 4515 ms
aio 线程数 1000   运行次数 50   平均耗时 4280 ms
bio 线程数 1500   运行次数 50   平均耗时 5091 ms  # 数据不合理啊
nio 线程数 1500   运行次数 50   平均耗时 6173 ms
aio 线程数 1500   运行次数 50   平均耗时 4904 ms


2020年 11月 02日 星期一 11:35:12 CST 
bio 线程数 50   运行次数 50   平均耗时 80 ms  # 相差不大
nio 线程数 50   运行次数 50   平均耗时 67 ms
aio 线程数 50   运行次数 50   平均耗时 81 ms
bio 线程数 100   运行次数 50   平均耗时 1074 ms  # 相差不大
nio 线程数 100   运行次数 50   平均耗时 1071 ms
aio 线程数 100   运行次数 50   平均耗时 1078 ms
bio 线程数 250   运行次数 50   平均耗时 1928 ms  # 相差不大,bio最好
nio 线程数 250   运行次数 50   平均耗时 2138 ms
aio 线程数 250   运行次数 50   平均耗时 2201 ms
bio 线程数 500   运行次数 50   平均耗时 2772 ms  # 相差不大,bio最好
nio 线程数 500   运行次数 50   平均耗时 3081 ms
aio 线程数 500   运行次数 50   平均耗时 3117 ms
bio 线程数 1000   运行次数 50   平均耗时 4499 ms  # 相差不大,nio自豪
nio 线程数 1000   运行次数 50   平均耗时 4161 ms
aio 线程数 1000   运行次数 50   平均耗时 4515 ms
bio 线程数 1500   运行次数 50   平均耗时 5739 ms  # 相差不大,nio最好
nio 线程数 1500   运行次数 50   平均耗时 4985 ms
aio 线程数 1500   运行次数 50   平均耗时 5236 ms
bio 线程数 2000   运行次数 50   平均耗时 5671 ms  # 相差不大,nio最好
nio 线程数 2000   运行次数 50   平均耗时 4869 ms
aio 线程数 2000   运行次数 50   平均耗时 6358 ms
bio 线程数 3000   运行次数 50   平均耗时 10016 ms  # nio效果明显
nio 线程数 3000   运行次数 50   平均耗时 6223 ms

以上数据为个人笔记本运行测试结果,限于测试条件,结果仅供参考(测一次要一个多小时),数据证明大致反应了一个事实(马后炮):

  • 并发量不大时(1000以下吧),三种io差别不大(分别对于linux中的同步阻塞io、io多路复用、异步io)
  • 并发量中等时(1500以上吧),bio应付起来稍稍有点吃力
  • 并发量大时(5000以上吧),就不应该考虑bio了,而nio与aio看不出什么明显优劣势,暂不表态

5、结果分析

NIO(Non-block input output),非阻塞模式IO。NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区),Selector(选择器)。

传统IO基于流进行操作的,而NIO基于Channel和Buffer进行操作,是面向缓存的,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接打开,数据到达),当监听事件发生时,才去唤醒线程处理请求,而不必阻塞于外部的输入。

下面是BIO的处理流程,可以看到每个线程都阻塞在socket的读写,而不能去处理其他事情,白白浪费了CPU资源。

BIO访问流程

下面是NIO的处理流程,NIO区别于BIO的关键步骤在于有一个专门的Selector线程轮询socket发来的有效数据,然后将其交给相应的线程进行处理,这里少了一个等待客户端发生数据的时间。

NIO访问流程

个人认为讲得比较好的博客链接:参考链接1参考链接2

BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。

NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。

AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

2、netty为什么快

1、基于nio

2、

3、

未完待续。

总访问次数: 243次, 一般般帅 创建于 2021-02-28, 最后更新于 2021-05-08

进大厂! 欢迎关注微信公众号,第一时间掌握最新动态!