1
22
23 package com.liferay.portal.kernel.concurrent;
24
25 import java.util.Comparator;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicLong;
28 import java.util.concurrent.locks.Condition;
29 import java.util.concurrent.locks.ReentrantLock;
30
31
36 public class CoalescedPipe<E> {
37
38 public CoalescedPipe() {
39 this(null);
40 }
41
42 public CoalescedPipe(Comparator<E> comparator) {
43 _comparator = comparator;
44 _headElementLink = new ElementLink<E>(null);
45 _lastElementLink = _headElementLink;
46 _notEmptyCondition = _takeLock.newCondition();
47 }
48
49 public long coalescedCount() {
50 return _coalescedCount.get();
51 }
52
53 public int pendingCount() {
54 return _pendingCount.get();
55 }
56
57 public void put(E e) throws InterruptedException {
58 if (e == null) {
59 throw new NullPointerException();
60 }
61
62 int pendingElements = -1;
63
64 _putLock.lockInterruptibly();
65
66 try {
67 if (_coalesceElement(e)) {
68 return;
69 }
70
71 _lastElementLink._nextElementLink = new ElementLink<E>(e);
72
73 _lastElementLink = _lastElementLink._nextElementLink;
74
75 pendingElements = _pendingCount.getAndIncrement();
76 }
77 finally {
78 _putLock.unlock();
79 }
80
81 if (pendingElements == 0) {
82 _takeLock.lock();
83
84 try {
85 _notEmptyCondition.signal();
86 }
87 finally {
88 _takeLock.unlock();
89 }
90 }
91 }
92
93 public E take() throws InterruptedException {
94 E element = null;
95
96 _takeLock.lockInterruptibly();
97
98 try {
99 while (_pendingCount.get() == 0) {
100 _notEmptyCondition.await();
101 }
102
103 ElementLink<E> garbageELementLink = _headElementLink;
104
105 _headElementLink = _headElementLink._nextElementLink;
106
107 garbageELementLink._nextElementLink = null;
108
109 element = _headElementLink._element;
110
111 _headElementLink._element = null;
112
113 int pendingElements = _pendingCount.getAndDecrement();
114
115 if (pendingElements > 1) {
116 _notEmptyCondition.signal();
117 }
118 }
119 finally {
120 _takeLock.unlock();
121 }
122
123 return element;
124 }
125
126 public Object[] takeSnapshot() {
127 _putLock.lock();
128 _takeLock.lock();
129
130 try {
131 Object[] pendingElements = new Object[_pendingCount.get()];
132
133 ElementLink<E> currentElementLink =
134 _headElementLink._nextElementLink;
135
136 for (int i = 0; currentElementLink != null; i++) {
137 pendingElements[i] = currentElementLink._element;
138
139 currentElementLink = currentElementLink._nextElementLink;
140 }
141
142 return pendingElements;
143 }
144 finally {
145 _putLock.unlock();
146 _takeLock.unlock();
147 }
148 }
149
150 private boolean _coalesceElement(E e) {
151 try {
152 _takeLock.lockInterruptibly();
153
154 try {
155 ElementLink<E> currentElementLink =
156 _headElementLink._nextElementLink;
157
158 if (_comparator != null) {
159 while (currentElementLink != null) {
160 if (_comparator.compare(
161 currentElementLink._element, e) == 0) {
162
163 _coalescedCount.incrementAndGet();
164
165 return true;
166 }
167 else {
168 currentElementLink =
169 currentElementLink._nextElementLink;
170 }
171 }
172 }
173 else {
174 while (currentElementLink != null) {
175 if (currentElementLink._element.equals(e)) {
176 _coalescedCount.incrementAndGet();
177
178 return true;
179 }
180 else {
181 currentElementLink =
182 currentElementLink._nextElementLink;
183 }
184 }
185 }
186 }
187 finally {
188 _takeLock.unlock();
189 }
190 }
191 catch (InterruptedException ie) {
192
193
195 }
196
197 return false;
198 }
199
200 private final AtomicLong _coalescedCount = new AtomicLong(0);
201 private final Comparator<E> _comparator;
202 private ElementLink<E> _headElementLink;
203 private ElementLink<E> _lastElementLink;
204 private final Condition _notEmptyCondition;
205 private final AtomicInteger _pendingCount = new AtomicInteger(0);
206 private final ReentrantLock _putLock = new ReentrantLock();
207 private final ReentrantLock _takeLock = new ReentrantLock();
208
209 private static class ElementLink<E> {
210
211 private ElementLink(E element) {
212 _element = element;
213 }
214
215 private E _element;
216 private ElementLink<E> _nextElementLink;
217
218 }
219
220 }