详细解析Java异步线程处理队列任务工具类以及实战
创始人
2024-06-03 10:23:08
0

场景待入


快速理解小场景描述:
【一群人】来到【一个大厅】办理业务,大厅中有【多个窗口】给我们办理业务。
每个人都有自己要办事情,处理过程需要消耗时间。
大厅根据人群多少,开始窗口梳理。

如果把“一群人”理解成一群待处理的n个【任务】,把这群人排成一个长队就形成了一个【任务队列】,“多个窗口”充当我们的【多个线程】异步处理任务队列。我们多线程解决任务队列的代入感来了,有木有!
“大厅”用来充当线程和任务的组装以及处理关系。如:大厅营业start:所有窗口等待办公创建多线程,大厅stop:所有窗口关闭,回收线程。
接下来,就是多个线程异步处理队列任务的干货!

任务类接口


1.首先使我们的任务接口
把这个任务设计成接口,为了后续我们使用的方便宜行,后续去实现接口,我们可以是发短信任务,发邮件任务,等待下载任务等等。
文件:ITask.java

package com.sboot.blog.task;/*** 任务的执行体或者携带体 理解成去窗口办事的人** @author zhaoxinglu*/
public interface ITask {/*** 执行体中 自定义任务内容*/void run();
}

线程类

2.我们的线程类
这里也就是大厅窗口的一个建设,用来处理任务
文件:TaskExecutor.class

package com.sboot.blog.task;import java.util.Random;
import java.util.concurrent.BlockingQueue;/*** 处理任务的窗口 窗口上班就位执行体办事儿** @author zhaoxinglu*/
public class TaskExecutor extends Thread {/*** 执行体队列*/private BlockingQueue taskQueue;/*** 窗口的当前处理事务状态 初始化:窗口工作状态开启*/private boolean isRunning = true;/*** 窗口名字 做这个名字为了方便我们观察线程*/private String taskName;public TaskExecutor(BlockingQueue taskQueue) {this.taskQueue = taskQueue;this.taskName = makeName();}/*** 生成窗口名字*/public String makeName(){String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";Random random = new Random();StringBuffer sb = new StringBuffer();for (int i = 0; i < 6; ++i) {int number = random.nextInt(52);// [0,51)sb.append(str.charAt(number));}return sb.toString();}/*** 窗口工作状态关闭*/public void quit() {isRunning = false;interrupt();}@Overridepublic void run() {// 窗口工作开启状态时 等待处理事务while (isRunning) {ITask iTask;try {//任务执行体进来  如果没有时间 继续等待处理事务iTask = taskQueue.take();System.out.println("窗口报:" + taskName + "后面还有:" + taskQueue.size() + "下一个来" + taskName);} catch (InterruptedException e) {if (!isRunning) {// 发生意外了,是下班状态的话就把窗口关闭。interrupt();// 如果执行到break,后面的代码就无效了。break;}// 发生意外了,不是下班状态,那么窗口继续等待。continue;}// 为这个执行体办事iTask.run();}}}

任务列表和多线程

3.任务队列和多线程的调配
文件:TaskQueue

package com.sboot.blog.task;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;/*** 任务队列* 控制执行体和处理窗口的任务队列** @author zhaoxinglu*/
public class TaskQueue {/*** 某场景下 排队办事的执行体*/private BlockingQueue mTaskQueue;/*** 某场景下 处理执行体的多个窗口*/public TaskExecutor[] mTaskExecutors;/*** 创建队列的时候 设定窗口数量** @param size*/public TaskQueue(int size) {mTaskQueue = new LinkedBlockingQueue<>();mTaskExecutors = new TaskExecutor[size];}/*** 场景开始启动*/public void start() {//防止存在未关闭窗口  如果有先关闭stop();//所有窗口状态:等待处理事务for (int i = 0; i < mTaskExecutors.length; i++) {//每初始化一个窗口 都让窗口观望当前执行体队列mTaskQueuemTaskExecutors[i] = new TaskExecutor(mTaskQueue);mTaskExecutors[i].start();}}/*** 场景关闭 所有窗口关闭*/public void stop() {if (mTaskExecutors != null) {for (TaskExecutor taskExecutor : mTaskExecutors) {if (taskExecutor != null) {taskExecutor.quit();}}}}/*** 允许执行体添加进来** @param task* @param * @return*/public  int add(T task) {if (!mTaskQueue.contains((task))) {mTaskQueue.add(task);}//返回当前排队的执行体数return mTaskQueue.size();}public int getTaskQueueSize(){return mTaskQueue.size();}}

实验

4.实例化一种任务 这里模拟打印机功能
文件:PrintTask.java

package com.sboot.blog.task;import java.text.SimpleDateFormat;
import java.util.Date;/*** 打印执行任务** @author zhaoxinglu*/
public class PrintTask implements ITask {private int id;public PrintTask(int id) {this.id = id;}@Overridepublic void run() {try {Thread.sleep(2000);System.out.println("干活,等了2s"+Thread.currentThread().getName());} catch (InterruptedException ignored) {}//设置日期格式SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// new Date()为获取当前系统时间System.out.println(df.format(new Date()));System.out.println("当前的id:" + id);System.out.println("wait...");}}

异步多线程

5.控制器中根据业务逻辑生成任务队列,创建多线程实现异步任务队列

package com.sboot.blog.sendmsg.controller;import com.sboot.blog.task.ITask;
import com.sboot.blog.task.PrintTask;
import com.sboot.blog.task.TaskQueue;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;@Api(value = "sendmsg", tags = "发送信息")
@RestController
@RequestMapping("/send")@Configuration      //1.主要用于标记配置类,兼备Component的效果。
@EnableScheduling   // 2.开启定时任务
public class SendMessageController {@ApiOperation(value = "打印信息value", notes = "打印note")@GetMapping(value = "/say")//@Scheduled(cron = "0/5 * * * * ?")
//    //或直接指定时间间隔,例如:5秒
//    @Scheduled(fixedRate = 5000)public String saything() {//设置日期格式SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// new Date()为获取当前系统时间System.out.println(df.format(new Date()));return "test";}@ApiOperation(value = "测试窗口队列任务")@GetMapping(value = "/test")public String test() {//场景开启 初始化执行窗口TaskQueue taskQueue = new TaskQueue(8);taskQueue.start();System.out.println("初始化后的窗口length:" + taskQueue.mTaskExecutors.length);//其他线程可以调用CountDownLatch的countDown()方法来对CountDownLatch中的数字减一, 当数字被减成0后,所有await的线程都将被唤醒。//辅助方法阻塞 阻塞完成后 后续回收线程  这个方法场景:全部场景结束后,有操作 采用。注意 这个 18 队列完成18个后就会执行后面的【等待完成操作事件】 如果你的队列n<18 你等不到【等待完成操作事件】 如果n>18 你完成【等待完成操作事件】 后 还会继续队列。【等待完成操作事件】在后面有标注CountDownLatch countDownLatch = new CountDownLatch(18);for (int i = 0; i < 180; i++) {PrintTask task = new PrintTask(i);
//            int n = taskQueue.add(task);int n = taskQueue.add(new ITask() {@Overridepublic void run() {try {Thread.sleep(2000);System.out.println("干活,等了2s"+Thread.currentThread().getName());// 执行下面代码计数器减一countDownLatch.countDown();} catch (InterruptedException ignored) {}}});System.out.println("放置第" + i + "个时    队列内人数:" + n);}try {//【等待完成操作事件】 这里的阻塞就是完成等待事件countDownLatch.await();System.out.println("队列执行完 回收");//【关闭业务窗】这里对比 stop()的作用。调用之前,我们的操作窗口,还开着,可以继续往里防止队列处理//强调对比一下  至此线程还在跑 9999的这个任务会被执行PrintTask task = new PrintTask(9999);taskQueue.add(task);//线程回收taskQueue.stop();//【关闭业务窗】这里对比 stop()的作用。调用之后,业务窗口关闭,再添加队列 加不进来了System.out.println("stop后窗口数:" + taskQueue.mTaskExecutors.length);//接下来任务111111就不回执行了PrintTask task1 = new PrintTask(111111);taskQueue.add(task1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("放完队列后的后的窗口length:" + taskQueue.mTaskExecutors.length);return "任务跑起来了";}}

例子日志

附上异步log:
在这里插入图片描述

名人指导

在几位资深钻石段位程序员的围观指正讨论下:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

探讨一下 如果我们窗口不回收会怎么样 线程状况如何的

工具观察

借助工具:
在这里插入图片描述
运用工具 监控线程:
建立8个线程:
我们不主动回收 完全靠JC回收的状态:
在这里插入图片描述

在这里插入图片描述
我们在任务执行完毕后回收:
在这里插入图片描述
在这里插入图片描述


间隔20s执行带有回收机制图形:
在这里插入图片描述


间隔20s无回收调用图形:
在这里插入图片描述

小结

小总结:
如果这个异步线程调用比较频繁,如我们最后这两张图的展示间隔频率20s,此时在JC机制和JVM还没来得及自行回收线程,我们的下一次调用线程已经出发,会造成我们的线程出线性增长,这样就可能是一个比较危险的操作。条件允许,自行要把进程收回!小波浪稳定性线形图,肯定要比折线增长的安全!

具体主动回收,还是等待机制自己处理,这个需要看我们实际应用的业务场景!
至此结束!

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【Ctfer训练计划】——(三... 作者名:Demo不是emo  主页面链接:主页传送门 创作初心ÿ...