• 中文
    • English
  • 注册
  • 查看作者
  • Java高并发编程实战,异步注解@Async自定义线程池

    目录

    一、@Async注解

    二、代码实例

    1、导入POM

    2、配置类

    3、controller

    4、service

    三、发现同文件内执行异步任务,还是一个线程,没有实现@Async效果,why?

    四、配置中分别使用了ThreadPoolTaskExecutor和ThreadPoolExecutor,这两个有啥区别?

    1、initialize()

    2、initializeExecutor抽象方法

    五、核心线程数

    六、线程池执行过程

    Java高并发编程实战系列文章

    一、@Async注解

    @Async的作用就是异步处理任务。

    在方法上添加@Async,表示此方法是异步方法;

    在类上添加@Async,表示类中的所有方法都是异步方法;

    使用此注解的类,必须是Spring管理的类;

    需要在启动类或配置类中加入@EnableAsync注解,@Async才会生效;

    在使用@Async时,如果不指定线程池的名称,也就是不自定义线程池,@Async是有默认线程池的,使用的是Spring默认的线程池SimpleAsyncTaskExecutor。

    默认线程池的默认配置如下:

    默认核心线程数:8;

    最大线程数:Integet.MAX_VALUE;

    队列使用LinkedBlockingQueue;

    容量是:Integet.MAX_VALUE;

    空闲线程保留时间:60s;

    线程池拒绝策略:AbortPolicy;

    从最大线程数可以看出,在并发情况下,会无限制的创建线程,我勒个吗啊。

    也可以通过yml重新配置:

    spring:

    task:

    execution:

    pool:

    max-size: 10

    core-size: 5

    keep-alive: 3s

    queue-capacity: 1000

    thread-name-prefix: my-executor

    也可以自定义线程池,下面通过简单的代码来实现以下@Async自定义线程池。

    二、代码实例

    Spring为任务调度与异步方法执行提供了注解@Async支持,通过在方法上标注@Async注解,可使得方法被异步调用。在需要异步执行的方法上加入@Async注解,并指定使用的线程池,当然可以不指定,直接写@Async。

    1、导入POM

    com.google.guava

    guava

    31.0.1-jre

    2、配置类

    package com.nezhac.config;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    import org.springframework.context.annotation.Bean;

    import org.springframework.context.annotation.Configuration;

    import org.springframework.scheduling.annotation.EnableAsync;

    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    import java.util.concurrent.*;

    @EnableAsync// 支持异步操作

    @Configuration

    public class AsyncTaskConfig {

    /**

    * com.google.guava中的线程池

    * @return

    */

    @Bean(“my-executor”)

    public Executor firstExecutor() {

    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(“my-executor”).build();

    // 获取CPU的处理器数量

    int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;

    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(curSystemThreads, 100,

    200, TimeUnit.SECONDS,

    new LinkedBlockingQueue<>(), threadFactory);

    threadPool.allowsCoreThreadTimeOut();

    return threadPool;

    }

    /**

    * Spring线程池

    * @return

    */

    @Bean(“async-executor”)

    public Executor asyncExecutor() {

    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

    // 核心线程数

    taskExecutor.setCorePoolSize(10);

    // 线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程

    taskExecutor.setMaxPoolSize(100);

    // 缓存队列

    taskExecutor.setQueueCapacity(50);

    // 空闲时间,当超过了核心线程数之外的线程在空闲时间到达之后会被销毁

    taskExecutor.setKeepAliveSeconds(200);

    // 异步方法内部线程名称

    taskExecutor.setThreadNamePrefix(“async-executor-“);

    /**

    * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略

    * 通常有以下四种策略:

    *ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

    *ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

    *ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

    *ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功为止

    */

    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    taskExecutor.initialize();

    return taskExecutor;

    }

    }

    3、controller

    package com.nezha.controller;

    import com.nezha.service.UserService;

    import org.slf4j.Logger;

    import org.slf4j.LoggerFactory;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.scheduling.annotation.Async;

    import org.springframework.web.bind.annotation.GetMapping;

    import org.springframework.web.bind.annotation.RequestMapping;

    import org.springframework.web.bind.annotation.RestController;

    @RestController

    @RequestMapping(“/test”)

    public class UserController {

    private static final Logger logger = LoggerFactory.getLogger(UserController.class);

    @Autowired

    private UserService userService;

    @GetMapping(“asyncTest”)

    public void asyncTest() {

    logger.info(“哪吒真帅”);

    userService.asyncTest();

    asyncTest2();

    logger.info(“哪吒编程,每日更新Java干货”);

    }

    @Async(“my-executor”)

    public void asyncTest2() {

    logger.info(“同文件内执行执行异步任务”);

    }

    }

    4、service

    package com.nezha.service;

    public interface UserService {

    // 普通方法

    void test();

    // 异步方法

    void asyncTest();

    }

    service实现类

    package com.nezha.service;

    import org.slf4j.Logger;

    import org.slf4j.LoggerFactory;

    import org.springframework.scheduling.annotation.Async;

    import org.springframework.stereotype.Service;

    @Service

    public class UserServiceImpl implements UserService {

    private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);

    @Override

    public void test() {

    logger.info(“执行普通任务”);

    }

    @Async(“my-executor”)

    @Override

    public void asyncTest() {

    logger.info(“执行异步任务”);

    }

    }

    Java高并发编程实战,异步注解@Async自定义线程池

    三、发现同文件内执行异步任务,还是一个线程,没有实现@Async效果,why?

    众里寻他千百度,查到了@Async失效的几个原因:

    注解@Async的方法不是public方法;

    注解@Async的返回值只能为void或Future;

    注解@Async方法使用static修饰也会失效;

    没加@EnableAsync注解;

    调用方和@Async不能在一个类中;

    在Async方法上标注@Transactional是没用的,但在Async方法调用的方法上标注@Transcational是有效的;

    这里就不一一演示了,有兴趣的小伙伴可以研究一下。

    四、配置中分别使用了ThreadPoolTaskExecutor和ThreadPoolExecutor,这两个有啥区别?

    ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JDK中的JUC。ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装。

    Java高并发编程实战,异步注解@Async自定义线程池

    1、initialize()

    查看一下ThreadPoolTaskExecutor 的 initialize()方法

    public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory

    implements BeanNameAware, InitializingBean, DisposableBean {

    /**

    * Set up the ExecutorService.

    */

    public void initialize() {

    if (logger.isInfoEnabled()) {

    logger.info(“Initializing ExecutorService” + (this.beanName != null ? ” ‘” + this.beanName + “‘” : “”));

    }

    if (!this.threadNamePrefixSet && this.beanName != null) {

    setThreadNamePrefix(this.beanName + “-“);

    }

    this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);

    }

    /**

    * Create the target {@link java.util.concurrent.ExecutorService} instance.

    * Called by {@code afterPropertiesSet}.

    * @param threadFactory the ThreadFactory to use

    * @param rejectedExecutionHandler the RejectedExecutionHandler to use

    * @return a new ExecutorService instance

    * @see #afterPropertiesSet()

    */

    protected abstract ExecutorService initializeExecutor(

    ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);

    }

    2、initializeExecutor抽象方法

    再查看一下initializeExecutor抽象方法的具体实现类,其中有一个就是ThreadPoolTaskExecutor类,查看它的initializeExecutor方法,使用的就是ThreadPoolExecutor。

    Java高并发编程实战,异步注解@Async自定义线程池

    public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport

    implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

    @Override

    protected ExecutorService initializeExecutor(

    ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    BlockingQueue queue = createQueue(this.queueCapacity);

    ThreadPoolExecutor executor;

    if (this.taskDecorator != null) {

    executor = new ThreadPoolExecutor(

    this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,

    queue, threadFactory, rejectedExecutionHandler) {

    @Override

    public void execute(Runnable command) {

    Runnable decorated = taskDecorator.decorate(command);

    if (decorated != command) {

    decoratedTaskMap.put(decorated, command);

    }

    super.execute(decorated);

    }

    };

    }

    else {

    executor = new ThreadPoolExecutor(

    this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,

    queue, threadFactory, rejectedExecutionHandler);

    }

    if (this.allowCoreThreadTimeOut) {

    executor.allowCoreThreadTimeOut(true);

    }

    this.threadPoolExecutor = executor;

    return executor;

    }

    }

    因此可以了解到ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装。

    五、核心线程数

    配置文件中的线程池核心线程数为何配置为

    1.// 获取CPU的处理器数量

    2.int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;

    Runtime.getRuntime().availableProcessors()获取的是CPU核心线程数,也就是计算资源。

    CPU密集型,线程池大小设置为N,也就是和cpu的线程数相同,可以尽可能地避免线程间上下文切换,但在实际开发中,一般会设置为N+1,为了防止意外情况出现线程阻塞,如果出现阻塞,多出来的线程会继续执行任务,保证CPU的利用效率。

    IO密集型,线程池大小设置为2N,这个数是根据业务压测出来的,如果不涉及业务就使用推荐。

    在实际中,需要对具体的线程池大小进行调整,可以通过压测及机器设备现状,进行调整大小。

    如果线程池太大,则会造成CPU不断的切换,对整个系统性能也不会有太大的提升,反而会导致系统缓慢。

    六、线程池执行过程

    Java高并发编程实战,异步注解@Async自定义线程池

  • 0
  • 0
  • 0
  • 18
  • 请登录之后再进行评论

    登录
  • 任务
  • 实时动态
  • 发布
  • 单栏布局 侧栏位置: