博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【代码积累】countdown latch
阅读量:4099 次
发布时间:2019-05-25

本文共 4326 字,大约阅读时间需要 14 分钟。

import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;public class CustomLinkedBlockingQueue
implements CustomBlockingQueue
{ private CustomHeadNode
head = null; /*至少有一个头结点,尾节点*/ private CustomTailNode
tail = null; private ReentrantLock putlock = new ReentrantLock(); private ReentrantLock takelock = new ReentrantLock(); private Condition isAvailable = takelock.newCondition(); /*If no elements are available then make take actions dormant.*/ private Condition isFull = putlock.newCondition(); /*If it's full then make put-actions dormant.*/ /*用一个原子变量记录List的长度*/ private AtomicInteger count = new AtomicInteger(0); //private CustomAtomicInteger count = new CustomAtomicInteger(0); private int capacity = 10; /*默认10*/ public CustomLinkedBlockingQueue() { super(); init(); } public CustomLinkedBlockingQueue(int capacity) { if( capacity > Integer.MAX_VALUE ) { this.capacity = Integer.MAX_VALUE; } else { this.capacity = capacity; } init(); } private void init(){ head = new CustomHeadNode
(); tail = new CustomTailNode
(); head.next = tail; tail.prev = head; } /*delete and return the head of the queue*/ private T dequeue() { if( count.get() > 0 && true!=head.next.isTail) { T value = head.next.value; if( null != value ) { /*删除中间的节点*/ head.next.next.prev = head; head.next = head.next.next; return value; } else { return null; } } else { return null; } } /*Put the specific element to the tail of the queue*/ private void enqueue(T ele) { if( count.get() < capacity ) { CustomNode
node = new CustomNode
(); node.value = ele; tail.prev.next = node; node.prev = tail.prev; node.next = tail; tail.prev = node; } } @Override /*Take a look at the head node of the LinkedList but do not remove it.*/ public T peek() { // TODO Auto-generated method stub return null; } @Override /*Remove and return the head node of the LinkedList*/ public T poll() { // TODO Auto-generated method stub T x = null; int c = -1;// final AtomicInteger count = this.count;// final ReentrantLock takelock = this.takelock; try { takelock.lockInterruptibly(); /*Acquire the lock unless the thread is interrupted.*/ /*当队列为空时,线程被挂起,直到被唤醒或者中断。如果线程被中断,根据takelock.lockInterruptibly()语句,线程将释放锁。*/ while(0 == count.get()) { isAvailable.await(); } /*队列不为空,则线程返回队列头元素*/ x = dequeue(); c = count.getAndDecrement(); /*出队列,长度计数-1*/ if(c > 1) { /*count在dequeue之前>1,dequeue后应该是1,还有数据,因此需要唤醒一个take线程*/ /*还有数据可以取,再唤醒一次take线程*/ isAvailable.signal(); } System.out.println("queue size = "+(c-1)+" after [POLL].value="+x); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { takelock.unlock(); } if( c == capacity ) { /*队列dequeue之前满了,此时必然有put线程被挂起。 * 此处需要唤醒一下put线程,否则队列满后,所有的put线程被挂起,而take线程不唤醒put线程,则所有的put线程都永远挂起了。*/ signalPutThreads(); } return x; } @Override public T take() { // TODO Auto-generated method stub return poll(); } @Override public void put(T element) { // TODO Auto-generated method stub if( element == null ) throw new NullPointerException(); int c = -1;// final AtomicInteger count = this.count;// final ReentrantLock putlock = this.putlock; try { putlock.lockInterruptibly(); while(count.get() == capacity) { isFull.await(); } enqueue(element); /*add the element to the tail and do increment.*/ c = count.getAndIncrement(); /*get the current value,increase 1 as the next value,return current and set next.*/ if( c+1 < capacity ) { isFull.signal(); } System.out.println("queue size = "+(c+1)+" after PUT.value="+element); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { putlock.unlock(); } if(c == 0) { /*这里的c保存的是queue执行enqueue之前的值,若为0,表示之前有get线程被挂起了,因此需要唤醒一个get线程*/ signalTakeThreads(); } } private void signalTakeThreads() { try { takelock.lockInterruptibly(); isAvailable.signal(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { takelock.unlock(); } } private void signalPutThreads() { try { putlock.lockInterruptibly(); isFull.signal(); /*每次唤醒一个线程,all则唤醒所有线程,但是最终执行哪个线程,取决于调度决策机制,用户无法干预。*/ } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { putlock.unlock(); } }}

转载地址:http://ovhii.baihongyu.com/

你可能感兴趣的文章
Java 8新特性:Stream API
查看>>
管理用户状态——Cookie与Session
查看>>
最受欢迎的前端框架Bootstrap 入门
查看>>
JavaScript编程简介:DOM、AJAX与Chrome调试器
查看>>
通过Maven管理项目依赖
查看>>
通过Spring Boot三分钟创建Spring Web项目
查看>>
Spring的IoC(依赖注入)原理
查看>>
Guava快速入门
查看>>
Java编程基础:static的用法
查看>>
Java编程基础:抽象类和接口
查看>>
Java编程基础:异常处理
查看>>
Java编程基础:了解面向对象
查看>>
新一代Java模板引擎Thymeleaf
查看>>
Spring MVC中使用Thymeleaf模板引擎
查看>>
Spring Boot构建简单的微博应用
查看>>
Spring处理表单提交
查看>>
Spring MVC异常处理
查看>>
Leetcode 1180. Count Substrings with Only One Distinct Letter [Python]
查看>>
PHP 7 的五大新特性
查看>>
php使用 memcache 来存储 session
查看>>