JAVA并发编程工具篇--1.1理解Future获取线程执行结果
创始人
2024-05-07 12:46:52
0

背景:在并发编程中,我们可以使用Future来获取子线程执行的结果,然后在主线程一起进行业务处理; 那么Future是如何来工作的;

1 使用:
demo1:使用Future每次都阻塞获取任务的执行结果:

public static void main(String[] args) {// 声明线程池ExecutorService executorService = Executors.newFixedThreadPool(1);// 声明 CallableCallable> commonUseQuatoCall = () -> testGetFutureMap("param");// 开启一个线程进行业务处理Future> submitcommonCall = executorService.submit(commonUseQuatoCall);Map commonUseQuatoData = null;try {// 阻塞获取结果commonUseQuatoData = submitcommonCall.get(50000, TimeUnit.MILLISECONDS);}catch (Exception ex){}finally {// 最后关闭线程池executorService.shutdown();}if (null != commonUseQuatoData){/*** do some thing*/}}// 业务处理private static Map testGetFutureMap(String param) {// 处理业务逻辑Map  mapData = new HashMap<>();/*** do some thing*/mapData.put("flag","sucess");return mapData;}

demo2:使用ExecutorCompletionService 优先处理返回结果最快的任务:

public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {ExecutorService executorService = Executors.newFixedThreadPool(2);ExecutorCompletionService> completionService = new ExecutorCompletionService<>(executorService);completionService.submit(() -> {return methodA();});completionService.submit(() -> {return methodB();});for (int i = 0; i < 2; i++) {// 获得结果并处理try {Map oneMapResult = completionService.take().get(5,TimeUnit.SECONDS);if ("methodAResult".equalsIgnoreCase(oneMapResult.get("type").toString())) {// 方法A 返回的结果System.out.println("\"methodAResult\" = " + "methodAResult");}if ("methodBResult".equalsIgnoreCase(oneMapResult.get("type").toString())) {// 方法B 返回的结果System.out.println("\"methodBResult\" = " + "methodBResult");}}catch (Exception ex){ex.printStackTrace();}}System.out.println("\"finish\" = " + "finish");executorService.shutdown();}private static Map methodB() throws InterruptedException {Map mapData = new HashMap<>(3);Object data = null;/*** 业务处理* data = xxx;*/Thread.sleep(10000);// 返回结果mapData.put("type", "methodBResult");mapData.put("data", data);return mapData;}private static Map methodA() throws InterruptedException {Map mapData = new HashMap<>(3);Object data = null;/*** 业务处理* data = xxx;*/Thread.sleep(20000);// 返回结果mapData.put("type", "methodAResult");mapData.put("data", data);return mapData;}

2 工作过程:
2.1 封装线程:
声明线程池

ExecutorService executorService = Executors.newFixedThreadPool(1);
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue());}

2.1.1 Future 提交任务:

Callable> commonUseQuatoCall = () -> testGetFutureMap("param");
Future> submitcommonCall = executorService.submit(commonUseQuatoCall);

调用 AbstractExecutorService.submit(Callable task)

/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException       {@inheritDoc}*/
public  Future submit(Callable task) {if (task == null) throw new NullPointerException();// 回调为空抛出异常RunnableFuture ftask = newTaskFor(task);// 包装回调execute(ftask);// 开启线程执行ftask 的任务return ftask;
}
protected  RunnableFuture newTaskFor(Callable callable) {return new FutureTask(callable);
}
/*** Creates a {@code FutureTask} that will, upon running, execute the* given {@code Callable}.** @param  callable the callable task* @throws NullPointerException if the callable is null*/public FutureTask(Callable callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;       // ensure visibility of callable}
private static final int NEW   = 0;

2.1.2 ExecutorCompletionService 提交任务:

 ExecutorCompletionService> completionService = new ExecutorCompletionService<>(executorService);completionService.submit(() -> {return methodA();});

调用 ExecutorCompletionService.submit:

public Future submit(Callable task) {if (task == null) throw new NullPointerException();RunnableFuture f = newTaskFor(task);// 使用 QueueingFuture executor.execute(new QueueingFuture(f));return f;}
private RunnableFuture newTaskFor(Callable task) {if (aes == null)return new FutureTask(task);elsereturn aes.newTaskFor(task);}
private final BlockingQueue> completionQueue;
private class QueueingFuture extends FutureTask {QueueingFuture(RunnableFuture task) {super(task, null);this.task = task;}// 在执行任务获取获取结果后调用protected void done() { completionQueue.add(task); }private final Future task;}

2.2 线程执行:
ThreadPoolExecutor. execute(Runnable command):

 public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {// 当前工作的线程数小于声明的核心现车数if (addWorker(command, true))// 添加任务return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {// 将任务添加到 BlockingQueue 队列中int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}

addWorker(command, true):

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();// 线程执行workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

2.3 线程执行的结果数据填充:

FutureTask.run():

public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable c = callable;if (c != null && state == NEW) {V result;boolean ran;try {// 调用目标方法并阻塞获的获取执行结果result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)// 获取结果后设置结果set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;// 赋值结果UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}
private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();callable = null;        // to reduce footprint}

done() 方法 :将结果放入到 ExecutorCompletionService 下BlockingQueue completionQueue 中

private final BlockingQueue> completionQueue;
private class QueueingFuture extends FutureTask {QueueingFuture(RunnableFuture task) {super(task, null);this.task = task;}// 在执行任务获取获取结果后调用protected void done() { completionQueue.add(task); }private final Future task;}

2.4 获取线程执行结果:
2.4.1 future 获取结果

/**
* @throws CancellationException {@inheritDoc}*/
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);
}
/*** Returns result or throws exception for completed task.** @param s completed state value*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);
}

2.4.2 ExecutorCompletionService 获取结果:

 Map oneMapResult = completionService.take().get(5,TimeUnit.SECONDS);

ExecutorCompletionService 下take() 方法:

public Future take() throws InterruptedException {return completionQueue.take();
}
// 获取FutureTask的结果
/*** @throws CancellationException {@inheritDoc}*/
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);
}/*** @throws CancellationException {@inheritDoc}*/
public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);
}
private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);
}

3 过程总结:
3.1 线程异步任务的执行,通过Callable 封装目标方法,通过FutureTask 发起线程完成任务的执行;执行完成将结果放入到FutureTask 的outcome中;再通过FutureTask获取线程异步执行的结果;
3.2 ExecutorCompletionService 通过将线程返回的结果放入到一个队列中,然后在从队列中获取到结果,使用ExecutorCompletionService时,需要注意每次从队列中获取结果后,将改结果从队列中移除,否则改队列中元素的容量会越来越大;

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
修复 爱普生 EPSON L4... L4151 L4153 L4156 L4158 L4163 L4165 L4166 L4168 L4...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...