前文已经说明了为什么需要 wait,下面介绍相关的 API。
wait、notify、notifyAll 都是线程之间进行协作的手段,都属于 Object 对象的方法。必须先获得此对象的锁,才能调用这几个方法
@Slf4j
public class WaitNotifyTest01 {final static Object obj = new Object();public static void main(String[] args) {new Thread(() -> {synchronized(obj){log.debug("执行");try {obj.wait();} catch (InterruptedException e) {e.printStackTrace();}log.debug("其他代码");}},"t1").start();new Thread(() -> {synchronized(obj){log.debug("执行");try {obj.wait();} catch (InterruptedException e) {e.printStackTrace();}log.debug("其他代码");}},"t2").start();sleep(2);log.debug("唤醒obj上其他线程");synchronized (obj){obj.notify(); // 唤醒obj上一个线程//obj.notifyAll(); // 唤醒obj上所有等待线程}}
}
notify的结果
notifyAll的结果
sleep(long n) 和 wait(long n) 的区别
// Waiting side: re-check the condition in a loop so the thread only proceeds once it really holds.
synchronized(lock) {while(条件不成立) {lock.wait();}// do the work
}
// another thread
synchronized(lock) {lock.notifyAll();
}
即 Guarded Suspension,用在一个线程等待另一个线程的执行结果
要点:
@Slf4j
public class GuardObjectTest {public static void main(String[] args) {GuardObject guardObject = new GuardObject();new Thread(() -> {List response = null;try {response = download();log.debug("下载结束");guardObject.compelete(response);} catch (InterruptedException e) {e.printStackTrace();}}).start();log.debug("waiting。。。。");// 主线程阻塞等待Object response = guardObject.get();log.debug("获得 :[{}]",((List) response).size());}private static List download() throws InterruptedException {// 模拟下载List list = new ArrayList<>();Thread.sleep(2000);for(int i = 0; i < 10; i++){list.add(i+"下载完毕");}return list;}}class GuardObject {private Object response;private final Object lock = new Object();public Object get() {synchronized (lock){// 条件不满足则等待while (response == null){try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}return response;}}public void compelete(Object response) {synchronized (lock){// 条件满足,通知等待线程this.response = response;lock.notifyAll();}}
}
@Slf4j
public class GuardObjectTest02 {public static void main(String[] args) {GuardObject02 guardObject = new GuardObject02();new Thread(() -> {sleep(1);guardObject.compelete(null);sleep(1);guardObject.compelete(Arrays.asList("a","b","c"));}).start();// 不超时//Object response = guardObject.get(2500);// 超时Object response = guardObject.get(1500);if (response != null){log.debug("get response [{}]",((List) response).size());}else{log.debug("no get response");}}}@Slf4j
class GuardObject02 {private Object response;private final Object lock = new Object();public Object get(long millis) {synchronized (lock){// 记录开始时间long begin = System.currentTimeMillis();// 已经过去的时间long timePass = 0;// 条件不满足则等待while (response == null){// 假设等待1000ms ,结果在400 ms时行了, 那么还有600要等long waitTime = millis - timePass;log.debug("需要等待的时间:{}", waitTime);if(waitTime <= 0){log.debug("break...");break;}try {lock.wait(waitTime);} catch (InterruptedException e) {e.printStackTrace();}// 3) 如果提前被唤醒,这时已经经历的时间假设为 400timePass = System.currentTimeMillis() - begin;log.debug("timePass:{}, Object is null {}",timePass,response == null);}return response;}}public void compelete(Object response) {synchronized (lock){// 条件满足,通知等待线程this.response = response;lock.notifyAll();}}
}
要点
/** Immutable message consisting of an id and an arbitrary payload. */
class Message {
    private final int id;
    private final Object message;

    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }

    public int getId() {
        return id;
    }

    public Object getMessage() {
        return message;
    }
}

/**
 * Bounded, blocking message queue built on {@code wait()}/{@code notifyAll()}.
 * All access is synchronized on the internal list.
 */
class MessageQueue {
    // Typed list: with a raw LinkedList, removeFirst() returns Object and take() does not compile.
    private final LinkedList<Message> queue;
    private final int capacity;

    /**
     * @param capacity maximum number of messages held at once
     */
    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }

    /**
     * Removes and returns the oldest message, blocking while the queue is empty.
     *
     * @return the message at the head of the queue
     */
    public Message take() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                log.debug("没货了, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = queue.removeFirst();
            // A slot was freed: wake producers that may be blocked in put().
            queue.notifyAll();
            return message;
        }
    }

    /**
     * Appends a message, blocking while the queue is full.
     *
     * @param message the message to enqueue
     */
    public void put(Message message) {
        synchronized (queue) {
            // ">=" rather than "==" so a non-positive capacity can never be overrun.
            while (queue.size() >= capacity) {
                log.debug("库存已达上限, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(message);
            // A message is available: wake consumers that may be blocked in take().
            queue.notifyAll();
        }
    }
}
应用
MessageQueue messageQueue = new MessageQueue(2);// 4 个生产者线程, 下载任务for (int i = 0; i < 4; i++) {int id = i;new Thread(() -> {try {log.debug("download...");List response = Downloader.download();log.debug("try put message({})", id);messageQueue.put(new Message(id, response));} catch (IOException e) {e.printStackTrace();}}, "生产者" + i).start();}// 1 个消费者线程, 处理结果new Thread(() -> {while (true) {Message message = messageQueue.take();List response = (List) message.getMessage();log.debug("take message({}): [{}] lines", message.getId(), response.size());}}, "消费者").start();
park 和 unpark 是 LockSupport 类中的方法
LockSupport.park() // 暂停当前线程
LockSupport.unpark(暂停的线程对象) // 恢复某个线程的运行
/**
 * Demonstrates {@code LockSupport.park()} followed by a later {@code unpark}: t1 parks after its
 * first pause and the main thread unparks it after its own, longer pause.
 */
@Slf4j
public class ParkTest01 {
    public static void main(String[] args) {
        Thread t1 = new Thread(ParkTest01::pauseThenPark, "t1");
        t1.start();

        sleep(3);
        log.debug("unpark");
        LockSupport.unpark(t1);
    }

    /** Body of t1: logs, pauses, parks, and logs again once it is allowed to continue. */
    private static void pauseThenPark() {
        log.debug("start。。。");
        sleep(1);
        log.debug("park。。。");
        LockSupport.park();
        log.debug("resume....");
    }
}
/**
 * Demonstrates {@code unpark} being issued before {@code park()}: the main thread unparks t1
 * while t1 is still in its first pause, i.e. before t1 reaches its {@code park()} call.
 */
@Slf4j
public class ParkTest01 {
    public static void main(String[] args) {
        Thread t1 = new Thread(ParkTest01::pauseThenPark, "t1");
        t1.start();

        sleep(0.5);
        log.debug("unpark");
        LockSupport.unpark(t1);
    }

    /** Body of t1: logs, pauses, parks, and logs again once it is allowed to continue. */
    private static void pauseThenPark() {
        log.debug("start。。。");
        sleep(1);
        log.debug("park。。。");
        LockSupport.park();
        log.debug("resume....");
    }
}
与Object的 wait & notify 相比
每个线程都有自己的一个Parker 对象, 由三部分组成 _counter, _cond 和 _mutex
假设有线程 t
@Slf4j
public class TestWaitNotify {final static Object obj = new Object();public static void main(String[] args) {new Thread(() -> {synchronized (obj){log.debug("执行");try {obj.wait();} catch (InterruptedException e) {e.printStackTrace();}log.debug("其他代码"); // debug}},"t1").start();new Thread(() -> {synchronized (obj){log.debug("执行");try {obj.wait();} catch (InterruptedException e) {e.printStackTrace();}log.debug("其他代码"); // debug}},"t2").start();sleep(0.5);log.debug("唤醒obj上的其他线程");synchronized (obj){obj.notifyAll(); // 唤醒obj上所有等待线程 debug}}
}
RUNNABLE <--> WAITING
RUNNABLE <--> WAITING
RUNNABLE <--> TIMED_WAITING
RUNNABLE <--> TIMED_WAITING
RUNNABLE <--> TIMED_WAITING
RUNNABLE <--> BLOCKED
RUNNABLE --> TERMINATED
一间大屋子有两个功能:睡觉、学习,互不相干。 现在小南要学习,小女要睡觉,但如果只用一间屋子(一个对象锁)的话,那么并发度很低 解决方法是准备多个房间(多个对象锁)
单个锁:
/**
 * Room with two unrelated activities that both lock the room instance itself, so they can never
 * run at the same time.
 */
@Slf4j
class BigRoom {
    /** Sleeps while holding this room's monitor. */
    public synchronized void sleep() {
        log.debug("sleeping 2 小时");
        Sleeper.sleep(2);
    }

    /** Studies while holding this room's monitor. */
    public synchronized void study() {
        log.debug("study 1 小时");
        Sleeper.sleep(1);
    }
}

/** Starts one sleeping thread and one studying thread against the same single-lock room. */
@Slf4j
public class ManySynchronizedTest01 {
    public static void main(String[] args) {
        BigRoom room = new BigRoom();

        Runnable sleeper = () -> {
            room.sleep();
            Sleeper.sleep(2);
        };
        Runnable student = () -> {
            room.study();
            Sleeper.sleep(2);
        };

        new Thread(sleeper).start();
        new Thread(student).start();
    }
}
多个锁:
/**
 * Room whose two unrelated activities are guarded by separate lock objects, so sleeping and
 * studying can proceed concurrently.
 */
@Slf4j
class BigRoom {
    private final Object bedroomLock = new Object();
    private final Object studyLock = new Object();

    /** Sleeps while holding only the bedroom lock. */
    public void sleep() {
        synchronized (bedroomLock) {
            log.debug("sleeping 2 小时");
            Sleeper.sleep(2);
        }
    }

    /** Studies while holding only the study lock. */
    public void study() {
        synchronized (studyLock) {
            log.debug("study 1 小时");
            Sleeper.sleep(1);
        }
    }
}

/** Starts one sleeping thread and one studying thread against the two-lock room. */
@Slf4j
public class ManySynchronizedTest01 {
    public static void main(String[] args) {
        BigRoom room = new BigRoom();

        Runnable sleeper = () -> {
            room.sleep();
            Sleeper.sleep(2);
        };
        Runnable student = () -> {
            room.study();
            Sleeper.sleep(2);
        };

        new Thread(sleeper).start();
        new Thread(student).start();
    }
}
/**
 * Intentional deadlock demo: t1 takes lock A then B, t2 takes lock B then A, and each pauses
 * between its two acquisitions.
 */
@Slf4j
public class LockDeadTest {
    public static void main(String[] args) {
        Object lockA = new Object();
        Object lockB = new Object();

        // t1 acquires A first, then tries to acquire B.
        Runnable aThenB = () -> {
            synchronized (lockA) {
                log.debug("lock A");
                sleep(1);
                synchronized (lockB) {
                    log.debug("lock B");
                    log.debug("操作...");
                }
            }
        };

        // t2 acquires the same two locks in the opposite order.
        Runnable bThenA = () -> {
            synchronized (lockB) {
                log.debug("lock B");
                sleep(0.5);
                synchronized (lockA) {
                    log.debug("lock A");
                    log.debug("操作...");
                }
            }
        };

        Thread t1 = new Thread(aThenB, "t1");
        Thread t2 = new Thread(bThenA, "t2");
        t1.start();
        t2.start();
    }
}
**发生了死锁**
jps 定位进程id,再用 jstack 定位死锁
避免死锁要注意加锁顺序
另外如果由于某个线程进入死循环,导致其他线程一直等待,对于这种情况linux可以通过top先定位到CPU占用高的Java进程,再利用 top -Hp 进程id 来定位是哪个线程,最后用jstack排查
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束
类比死循环
一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束
跟 synchronized 相比
与 synchronized 一样,都支持可重入
基本语法
// Acquire the lock before entering the try block.
reentrantLock.lock();
try {
    // critical section
} finally {
    // Always release the lock, even if the critical section throws.
    reentrantLock.unlock();
}
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁 如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
/**
 * Shows that {@link ReentrantLock} is reentrant: method1 calls method2, which calls method3, and
 * each of them acquires the same lock while the caller still holds it.
 */
@Slf4j
public class ReentrantLockTest01 {
    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        method1();
    }

    public static void method1() {
        underLock("execute method1", ReentrantLockTest01::method2);
    }

    public static void method2() {
        underLock("execute method2", ReentrantLockTest01::method3);
    }

    public static void method3() {
        underLock("execute method3", () -> { });
    }

    /** Acquires the shared lock, logs the message, runs the nested step, then releases the lock. */
    private static void underLock(String message, Runnable nestedStep) {
        lock.lock();
        try {
            log.debug(message);
            nestedStep.run();
        } finally {
            lock.unlock();
        }
    }
}
/**
 * Shows interruptible lock acquisition: t1 blocks in {@code lockInterruptibly()} while the main
 * thread holds the lock, and the main thread then interrupts it.
 */
@Slf4j
public class ReentrantLockTest02 {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Thread t1 = new Thread(() -> acquireInterruptibly(lock), "t1");

        // The main thread takes the lock first, so t1 has to wait for it.
        lock.lock();
        log.debug("获得了锁");
        t1.start();
        try {
            sleep(1);
            t1.interrupt();
            log.debug("执行打断");
        } finally {
            lock.unlock();
        }
    }

    /** Body of t1: tries to take the lock, giving up if interrupted while waiting for it. */
    private static void acquireInterruptibly(ReentrantLock lock) {
        log.debug("启动...");
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            e.printStackTrace();
            log.debug("等锁的过程中被打断");
            return;
        }
        try {
            log.debug("获得了锁");
        } finally {
            lock.unlock();
        }
    }
}
注意
/**
 * Counterpart to the interruptible demo: t1 uses plain {@code lock()}, is interrupted by the
 * main thread while waiting, and proceeds once the main thread releases the lock.
 */
@Slf4j
public class ReentrantLockTest03 {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Thread t1 = new Thread(() -> acquireAndReport(lock), "t1");

        // The main thread takes the lock first, so t1 has to wait for it.
        lock.lock();
        log.debug("获得了锁");
        t1.start();
        try {
            sleep(1);
            t1.interrupt();
            log.debug("执行打断");
        } finally {
            log.debug("释放了锁");
            lock.unlock();
        }
    }

    /** Body of t1: blocks in lock() until the lock is free, logs, and releases it. */
    private static void acquireAndReport(ReentrantLock lock) {
        log.debug("启动...");
        lock.lock();
        try {
            log.debug("获得了锁");
        } finally {
            lock.unlock();
        }
    }
}
立刻失败
/**
 * Shows {@code tryLock()} without a timeout: t1 makes a single attempt to take the lock and
 * gives up immediately when the main thread is holding it.
 */
@Slf4j
public class ReentrantLockTest03 {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Thread t1 = new Thread(() -> tryOnce(lock), "t1");

        // The main thread holds the lock while t1 makes its attempt.
        lock.lock();
        log.debug("获得了锁");
        t1.start();
        try {
            sleep(2);
        } finally {
            lock.unlock();
        }
    }

    /** Body of t1: one non-blocking attempt; unlocks only if the attempt succeeded. */
    private static void tryOnce(ReentrantLock lock) {
        log.debug("启动...");
        if (lock.tryLock()) {
            try {
                log.debug("获得了锁");
            } finally {
                lock.unlock();
            }
        } else {
            log.debug("没获取到锁,直接结束");
        }
    }
}
超时失败
/**
 * Shows {@code tryLock(timeout)}: t1 waits at most one second for a lock that the main thread
 * keeps for a longer pause, so the attempt times out.
 */
@Slf4j
public class ReentrantLockTest05 {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Thread t1 = new Thread(() -> {
            log.debug("启动...");
            try {
                if (!lock.tryLock(1, TimeUnit.SECONDS)) {
                    log.debug("尝试获取锁等1s,没获取到直接返回");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                // The lock was NOT acquired. Falling through to unlock() would throw
                // IllegalMonitorStateException, so stop here.
                return;
            }
            try {
                log.debug("获得了锁");
            } finally {
                lock.unlock();
            }
        }, "t1");

        // The main thread holds the lock while t1 makes its timed attempt.
        lock.lock();
        log.debug("获得了锁");
        t1.start();
        try {
            sleep(2);
        } finally {
            lock.unlock();
        }
    }
}
ReentrantLock 默认是不公平的
@Slf4j
/**
 * Fairness demo for a {@link ReentrantLock} created with {@code fair == false}: 500 threads queue
 * up for a lock held by the main thread, then one more thread joins just before the lock is
 * released.
 */
public class ReentrantLockTest06 {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(false);
        lock.lock();
        try {
            for (int i = 0; i < 500; i++) {
                new Thread(() -> {
                    lock.lock();
                    try {
                        System.out.println(Thread.currentThread().getName() + " running...");
                    } finally {
                        lock.unlock();
                    }
                }, "t" + i).start();
            }

            // Wait 500 ms so the threads above can start competing for the lock
            // before the extra thread is launched.
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // Keep going with the demo, but preserve the interrupt for whoever called us.
                Thread.currentThread().interrupt();
            }

            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " start...");
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " running...");
                } finally {
                    lock.unlock();
                }
            }, "强行插入").start();
        } finally {
            // Released in finally so a failure while spawning threads cannot leave the lock held.
            lock.unlock();
        }
    }
}
改为公平锁
// Passing true requests the fair ordering policy instead of the default non-fair one.
ReentrantLock lock = new ReentrantLock(true);
使用要点
/**
 * Condition-variable demo: one thread waits for a cigarette and another for breakfast, each on
 * its own {@link Condition} of the same lock, and the main thread delivers them one at a time.
 */
@Slf4j
public class ReentrantLockTest07 {
    static ReentrantLock lock = new ReentrantLock();
    /** Wait set for the thread waiting for a cigarette. */
    static Condition waitCigaretteQueue = lock.newCondition();
    /** Wait set for the thread waiting for breakfast. */
    static Condition waitBreakfastQueue = lock.newCondition();
    static volatile boolean hasCigarette = false;
    static volatile boolean hasBreakfast = false;

    public static void main(String[] args) {
        new Thread(() -> {
            lock.lock();
            // The wait loop lives inside try so the lock is released on every exit path.
            try {
                while (!hasCigarette) {
                    try {
                        waitCigaretteQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("等到了他要的smoke");
            } finally {
                lock.unlock();
            }
        }).start();

        new Thread(() -> {
            lock.lock();
            try {
                while (!hasBreakfast) {
                    try {
                        waitBreakfastQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("等到了他要的breakfast");
            } finally {
                lock.unlock();
            }
        }).start();

        sleep(1);
        sendBreakfast();
        sleep(1);
        sendCigarette();
    }

    /** Marks the cigarette as delivered and signals the cigarette condition. */
    private static void sendCigarette() {
        lock.lock();
        try {
            log.debug("送烟来了");
            hasCigarette = true;
            waitCigaretteQueue.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Marks breakfast as delivered and signals the breakfast condition. */
    private static void sendBreakfast() {
        lock.lock();
        try {
            log.debug("送早餐来了");
            hasBreakfast = true;
            waitBreakfastQueue.signal();
        } finally {
            lock.unlock();
        }
    }
}
@Slf4j
/**
 * Fixed-order demo using wait/notify: "t2" is always printed before "t1", whichever thread
 * happens to run first.
 */
public class SequentialControllerTest01 {
    /** Monitor both threads synchronize on. */
    static final Object obj = new Object();
    /** Whether t2 has already run; read and written only while holding {@link #obj}. */
    static boolean t2runed = false;

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            synchronized (obj) {
                // Keep waiting until t2 has run; the loop also covers wake-ups that arrive
                // before the flag is set.
                while (!t2runed) {
                    try {
                        obj.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("t1");
        });

        Thread t2 = new Thread(() -> {
            System.out.println("t2");
            synchronized (obj) {
                // Record that t2 has run, then wake whoever is waiting on obj.
                t2runed = true;
                obj.notifyAll();
            }
        });

        t1.start();
        t2.start();
    }
}
@Slf4j
/**
 * Fixed-order demo using park/unpark: "t2" is always printed before "t1".
 *
 * <p>{@code LockSupport.park()} is allowed to return for no reason, so t1 re-checks a flag set by
 * t2 after every return instead of trusting a single {@code park()} call.
 */
public class SequentialControllerTest02 {
    /** One-shot signal; volatile so the parked thread sees the update. */
    private static final class Signal {
        volatile boolean raised;
    }

    public static void main(String[] args) {
        Signal t2Printed = new Signal();

        Thread t1 = new Thread(() -> {
            boolean interrupted = false;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                interrupted = true;
            }
            while (!t2Printed.raised) {
                LockSupport.park();
                // park() returns immediately while the interrupt flag is set; clear it here so
                // the loop can block again, and restore it once the work is done.
                if (Thread.interrupted()) {
                    interrupted = true;
                }
            }
            System.out.println("t1");
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        });

        Thread t2 = new Thread(() -> {
            System.out.println("t2");
            // Publish the state first, then wake t1 (harmless if t1 has not parked yet).
            t2Printed.raised = true;
            LockSupport.unpark(t1);
        });

        t1.start();
        t2.start();
    }
}
park 和 unpark 方法比较灵活,他俩谁先调用,谁后调用无所谓。并且是以线程为单位进行『暂停』和『恢复』, 不需要『同步对象』和『运行标记』。不过需要注意:park 可能在没有 unpark 的情况下返回(虚假唤醒或线程被打断),严谨的代码仍应在循环中重新检查条件
线程1输出 a 5次,线程 2 输出 b 5次, 线程 3 输出 c 5次。现在要求输出 abcabcabcabcabc
/**
 * Coordinates several threads so that they print in a fixed rotation, using an integer "turn"
 * flag guarded by this object's monitor.
 */
class SyncWaitNotify {
    /** Flag value of the thread that may print next; guarded by {@code this}. */
    private int flag;
    /** Number of times each call to {@link #print} prints its string. */
    private final int loopNumber;

    /**
     * @param flag       flag value of the thread that prints first
     * @param loopNumber how many times each participant prints
     */
    public SyncWaitNotify(int flag, int loopNumber) {
        this.flag = flag;
        this.loopNumber = loopNumber;
    }

    /**
     * Prints {@code str} {@code loopNumber} times, each time waiting until the shared flag equals
     * {@code waitFlag} and then handing the turn to {@code nextFlag}.
     *
     * <p>An interrupt does not break the rotation: the method keeps waiting for its turn and
     * restores the thread's interrupt status before it returns, so the caller can still see it.
     *
     * @param waitFlag flag value that means "it is this caller's turn"
     * @param nextFlag flag value to set after printing
     * @param str      text to print
     */
    public void print(int waitFlag, int nextFlag, String str) {
        boolean interrupted = false;
        try {
            for (int i = 0; i < loopNumber; i++) {
                synchronized (this) {
                    while (this.flag != waitFlag) {
                        try {
                            this.wait();
                        } catch (InterruptedException e) {
                            // Re-asserting the flag now would make the next wait() throw at once,
                            // so remember the interrupt and restore it on the way out.
                            interrupted = true;
                        }
                    }
                    System.out.print(str);
                    flag = nextFlag;
                    // Wake all waiters; each one re-checks whether the turn is now its own.
                    this.notifyAll();
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
@Slf4j
/**
 * Starts three threads that share one {@code SyncWaitNotify}; each prints its own letter five
 * times, taking turns in the order a, b, c.
 */
public class SequentialControllerTest03 {
    public static void main(String[] args) {
        // The initial flag 1 gives the first turn to the "a" printer.
        SyncWaitNotify rotation = new SyncWaitNotify(1, 5);

        Runnable printA = () -> rotation.print(1, 2, "a");
        Runnable printB = () -> rotation.print(2, 3, "b");
        Runnable printC = () -> rotation.print(3, 1, "c");

        new Thread(printA).start();
        new Thread(printB).start();
        new Thread(printC).start();
    }
}
/**
 * Alternating output with {@link Condition}s: every printer owns one condition of this lock and
 * passes the turn to the next printer after logging its string.
 *
 * <p>The current turn is stored explicitly, so a signal sent before its receiver has started
 * waiting is not lost, and a wake-up that arrives out of turn is ignored.
 */
@Slf4j
public class AwaitSignal extends ReentrantLock {
    /** Number of times each printer logs its string. */
    private final int loopNumber;
    /** Condition whose owner may print next; null until start(); guarded by this lock. */
    private transient Condition turn;

    public AwaitSignal(int loopNumber) {
        this.loopNumber = loopNumber;
    }

    /**
     * Gives the first turn to the printer waiting on {@code first}.
     *
     * @param first condition owned by the printer that should go first
     */
    public void start(Condition first) {
        this.lock();
        try {
            log.debug("start");
            turn = first;
            first.signal();
        } finally {
            this.unlock();
        }
    }

    /**
     * Logs {@code str} {@code loopNumber} times, each time after the turn has reached
     * {@code current}, and then hands the turn to {@code next}.
     *
     * @param str     text to log
     * @param current condition this printer waits on
     * @param next    condition of the printer that goes next
     */
    public void print(String str, Condition current, Condition next) {
        for (int i = 0; i < loopNumber; i++) {
            this.lock();
            try {
                // Check the state rather than trusting a single await(): the signal may have
                // been sent before this thread got here, and await() may wake up spuriously.
                while (turn != current) {
                    current.await();
                }
                log.debug(str);
                turn = next;
                next.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                this.unlock();
            }
        }
    }

    public static void main(String[] args) {
        AwaitSignal as = new AwaitSignal(5);
        Condition aWaitSet = as.newCondition();
        Condition bWaitSet = as.newCondition();
        Condition cWaitSet = as.newCondition();

        new Thread(() -> {
            as.print("a", aWaitSet, bWaitSet);
        }).start();
        new Thread(() -> {
            as.print("b", bWaitSet, cWaitSet);
        }).start();
        new Thread(() -> {
            as.print("c", cWaitSet, aWaitSet);
        }).start();

        as.start(aWaitSet);
    }
}
@Slf4j
/**
 * Alternating output with park/unpark: the registered threads print one after another in
 * registration order, each waking its successor.
 *
 * <p>The thread whose turn it is gets recorded explicitly, so a stray permit, a spurious return
 * from {@code park} or an interrupt cannot make a thread print out of turn.
 */
public class SyncPark {
    /** Number of times each thread prints its string. */
    private final int loopNumber;
    /** Participants in printing order; must be set before {@link #start()}. */
    private Thread[] threads;
    /** Thread that is allowed to print next. */
    private volatile Thread turn;

    public SyncPark(int loopNumber) {
        this.loopNumber = loopNumber;
    }

    /**
     * Registers the participating threads in the order in which they should print.
     *
     * @param threads the printing threads, not yet started
     */
    public void setThreads(Thread... threads) {
        this.threads = threads;
    }

    /**
     * Prints {@code str} {@code loopNumber} times, each time once it is the calling thread's turn,
     * and then passes the turn on. Must be called from one of the registered threads; any other
     * caller never gets a turn.
     *
     * @param str text to print (followed by a line break)
     */
    public void print(String str) {
        Thread self = Thread.currentThread();
        boolean interrupted = false;
        for (int i = 0; i < loopNumber; i++) {
            while (turn != self) {
                LockSupport.park(this);
                // park() will not block while the interrupt flag is set; clear it so the loop
                // can block again, and restore it before returning.
                if (Thread.interrupted()) {
                    interrupted = true;
                }
            }
            System.out.println(str);
            Thread next = nextThread();
            turn = next;
            LockSupport.unpark(next);
        }
        if (interrupted) {
            self.interrupt();
        }
    }

    /**
     * Returns the thread registered after the calling thread, wrapping around to the first one.
     * A caller that is not registered is treated like the first thread.
     */
    private Thread nextThread() {
        Thread current = Thread.currentThread();
        int index = 0;
        for (int i = 0; i < threads.length; i++) {
            if (threads[i] == current) {
                index = i;
                break;
            }
        }
        return threads[(index + 1) % threads.length];
    }

    /** Gives the first turn to the first registered thread and starts all of them. */
    public void start() {
        turn = threads[0];
        for (Thread thread : threads) {
            thread.start();
        }
        LockSupport.unpark(threads[0]);
    }

    public static void main(String[] args) {
        SyncPark syncPark = new SyncPark(5);
        Thread t1 = new Thread(() -> {
            syncPark.print("a");
        });
        Thread t2 = new Thread(() -> {
            syncPark.print("b");
        });
        Thread t3 = new Thread(() -> {
            syncPark.print("c");
        });
        syncPark.setThreads(t1, t2, t3);
        syncPark.start();
    }
}
上一篇:西安电子科大《深度学习》学习记录