1   /**
2    * Copyright (c) 2000-2009 Liferay, Inc. All rights reserved.
3    *
4    *
5    *
6    *
7    * The contents of this file are subject to the terms of the Liferay Enterprise
8    * Subscription License ("License"). You may not use this file except in
9    * compliance with the License. You can obtain a copy of the License by
10   * contacting Liferay, Inc. See the License for the specific language governing
11   * permissions and limitations under the License, including but not limited to
12   * distribution rights of the Software.
13   *
14   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20   * SOFTWARE.
21   */
22  
23  package com.liferay.portal.kernel.concurrent;
24  
25  import java.util.Comparator;
26  import java.util.concurrent.atomic.AtomicInteger;
27  import java.util.concurrent.atomic.AtomicLong;
28  import java.util.concurrent.locks.Condition;
29  import java.util.concurrent.locks.ReentrantLock;
30  
/**
 * <a href="CoalescedPipe.java.html"><b><i>View Source</i></b></a>
 *
 * <p>
 * A linked FIFO pipe that coalesces duplicates: an element offered through
 * {@link #put(Object)} is dropped when an equivalent element is already
 * pending. Equivalence is decided by the comparator given at construction
 * time, or by {@link Object#equals(Object)} when no comparator was supplied.
 * </p>
 *
 * <p>
 * Appends are guarded by a put lock and removals by a take lock. The number
 * of pending elements is tracked in an atomic counter that both sides read.
 * The duplicate scan performed by {@link #put(Object)} additionally holds the
 * take lock so the chain cannot be consumed while it is being walked.
 * </p>
 *
 * @author Shuyang Zhou
 */
public class CoalescedPipe<E> {

    /**
     * Creates a pipe that detects duplicates with
     * {@link Object#equals(Object)}.
     */
    public CoalescedPipe() {
        this(null);
    }

    /**
     * Creates a pipe that detects duplicates with the given comparator.
     *
     * @param comparator treats two elements as duplicates when
     *        <code>compare</code> returns <code>0</code>; may be
     *        <code>null</code>, in which case {@link Object#equals(Object)}
     *        is used instead
     */
    public CoalescedPipe(Comparator<? super E> comparator) {
        _comparator = comparator;

        // The head is always a dummy link; real elements hang off its
        // _nextElementLink.

        _headElementLink = new ElementLink<E>(null);
        _lastElementLink = _headElementLink;
        _notEmptyCondition = _takeLock.newCondition();
    }

    /**
     * Returns how many offered elements have been dropped as duplicates.
     *
     * @return the number of coalesced elements
     */
    public long coalescedCount() {
        return _coalescedCount.get();
    }

    /**
     * Returns how many elements are currently waiting to be taken.
     *
     * @return the number of pending elements
     */
    public int pendingCount() {
        return _pendingCount.get();
    }

    /**
     * Appends the element to the tail of the pipe unless an equivalent
     * element is already pending, in which case the element is dropped and
     * the coalesced count is incremented.
     *
     * @param  e the element to append; must not be <code>null</code>
     * @throws InterruptedException if the thread is interrupted while
     *         acquiring the put lock
     * @throws NullPointerException if the element is <code>null</code>
     */
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException("Element is null");
        }

        int pendingElements = -1;

        _putLock.lockInterruptibly();

        try {
            if (_coalesceElement(e)) {
                return;
            }

            ElementLink<E> elementLink = new ElementLink<E>(e);

            _lastElementLink._nextElementLink = elementLink;

            _lastElementLink = elementLink;

            pendingElements = _pendingCount.getAndIncrement();
        }
        finally {
            _putLock.unlock();
        }

        // Only the transition from empty to non-empty can have takers parked
        // on the condition, so only that transition needs a signal.

        if (pendingElements == 0) {
            _takeLock.lock();

            try {
                _notEmptyCondition.signal();
            }
            finally {
                _takeLock.unlock();
            }
        }
    }

    /**
     * Removes and returns the element at the head of the pipe, waiting until
     * one is available.
     *
     * @return the oldest pending element
     * @throws InterruptedException if the thread is interrupted while
     *         acquiring the take lock or while waiting for an element
     */
    public E take() throws InterruptedException {
        E element = null;

        _takeLock.lockInterruptibly();

        try {
            while (_pendingCount.get() == 0) {
                _notEmptyCondition.await();
            }

            // The first real link becomes the new dummy head; the old dummy
            // is unlinked so it no longer references the live chain.

            ElementLink<E> garbageElementLink = _headElementLink;

            _headElementLink = _headElementLink._nextElementLink;

            garbageElementLink._nextElementLink = null;

            element = _headElementLink._element;

            _headElementLink._element = null;

            int pendingElements = _pendingCount.getAndDecrement();

            // More elements remain, pass the wake up on to another taker.

            if (pendingElements > 1) {
                _notEmptyCondition.signal();
            }
        }
        finally {
            _takeLock.unlock();
        }

        return element;
    }

    /**
     * Returns the pending elements, in head to tail order, without removing
     * them. Both locks are held while the chain is copied.
     *
     * @return a newly allocated array holding the pending elements
     */
    public Object[] takeSnapshot() {

        // Same acquisition order as put (put lock, then take lock). The
        // nested try/finally blocks guarantee that each lock that was
        // acquired is released, in reverse order.

        _putLock.lock();

        try {
            _takeLock.lock();

            try {
                Object[] pendingElements = new Object[_pendingCount.get()];

                ElementLink<E> currentElementLink =
                    _headElementLink._nextElementLink;

                for (int i = 0; currentElementLink != null; i++) {
                    pendingElements[i] = currentElementLink._element;

                    currentElementLink = currentElementLink._nextElementLink;
                }

                return pendingElements;
            }
            finally {
                _takeLock.unlock();
            }
        }
        finally {
            _putLock.unlock();
        }
    }

    /**
     * Scans the pending elements for one equivalent to the given element and
     * increments the coalesced count when one is found.
     *
     * <p>
     * If the thread is interrupted while acquiring the take lock, the scan is
     * skipped and the thread's interrupt status is restored, so the element
     * still enters the pipe and the interrupt is not lost.
     * </p>
     *
     * @param  e the element about to be appended
     * @return <code>true</code> if an equivalent element is already pending
     */
    private boolean _coalesceElement(E e) {
        try {
            _takeLock.lockInterruptibly();
        }
        catch (InterruptedException ie) {

            // Continue to let the current element enter the pipe, but keep
            // the interrupt visible to the caller

            Thread.currentThread().interrupt();

            return false;
        }

        try {
            ElementLink<E> currentElementLink =
                _headElementLink._nextElementLink;

            while (currentElementLink != null) {
                if (_isEquivalent(currentElementLink._element, e)) {
                    _coalescedCount.incrementAndGet();

                    return true;
                }

                currentElementLink = currentElementLink._nextElementLink;
            }

            return false;
        }
        finally {
            _takeLock.unlock();
        }
    }

    /**
     * Returns <code>true</code> if the pending element and the candidate are
     * duplicates according to the comparator, or according to
     * {@link Object#equals(Object)} when there is no comparator.
     */
    private boolean _isEquivalent(E pendingElement, E candidate) {
        if (_comparator != null) {
            return _comparator.compare(pendingElement, candidate) == 0;
        }

        return pendingElement.equals(candidate);
    }

    private final AtomicLong _coalescedCount = new AtomicLong(0);
    private final Comparator<? super E> _comparator;
    private ElementLink<E> _headElementLink;
    private ElementLink<E> _lastElementLink;
    private final Condition _notEmptyCondition;
    private final AtomicInteger _pendingCount = new AtomicInteger(0);
    private final ReentrantLock _putLock = new ReentrantLock();
    private final ReentrantLock _takeLock = new ReentrantLock();

    /**
     * A singly linked node holding one element of the pipe.
     */
    private static class ElementLink<E> {

        private ElementLink(E element) {
            _element = element;
        }

        private E _element;
        private ElementLink<E> _nextElementLink;

    }

}