001    /*
002     * Copyright © 2008, 2012 Pedro Agulló Soliveres.
003     * 
004     * This file is part of DirectJNgine.
005     *
006     * DirectJNgine is free software: you can redistribute it and/or modify
007     * it under the terms of the GNU Lesser General Public License as published by
008     * the Free Software Foundation, either version 3 of the License.
009     *
010     * Commercial use is permitted to the extent that the code/component(s)
011     * do NOT become part of another Open Source or Commercially developed
012     * licensed development library or toolkit without explicit permission.
013     *
014     * DirectJNgine is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU Lesser General Public License for more details.
018     *
019     * You should have received a copy of the GNU Lesser General Public License
020     * along with DirectJNgine.  If not, see <http://www.gnu.org/licenses/>.
021     * 
022     * This software uses the ExtJs library (http://extjs.com), which is 
023     * distributed under the GPL v3 license (see http://extjs.com/license).
024     */
025    
026    /*
027     * Just to give credit where credit is due...
028     * 
029     * The original implementation for this class was written 
030     * by Jacob Hookom.
031     * 
032     * Take a look at http://weblogs.java.net/blog/jhook/
033     * for details and a nice discussion on handling
034     * concurrent tasks.
035     */
036    
037    package com.softwarementors.extjs.djn;
038    
039    import java.util.ArrayList;
040    import java.util.Collection;
041    import java.util.List;
042    import java.util.concurrent.BlockingQueue;
043    import java.util.concurrent.Callable;
044    import java.util.concurrent.ExecutionException;
045    import java.util.concurrent.Executor;
046    import java.util.concurrent.Future;
047    import java.util.concurrent.FutureTask;
048    import java.util.concurrent.LinkedBlockingQueue;
049    import java.util.concurrent.Semaphore;
050    import java.util.concurrent.TimeUnit;
051    import java.util.concurrent.TimeoutException;
052    
053    import edu.umd.cs.findbugs.annotations.NonNull;
054    
055    public class ParallelTask<V> implements Future<Collection<V>> {
056    
057      private class BoundedFuture extends FutureTask<V> {
058        BoundedFuture(Callable<V> c) { super(c); }
059        @Override
060        protected void done() {
061          ParallelTask.this.semaphore.release(); // Allow other thread to be executed -if there is one waiting to enter the semaphore protected code
062          ParallelTask.this.completedQueue.add(this); // Add this task to the lisf of completed tasks
063        }
064      }
065    
066      //List of submitted tasks
067      @NonNull private final List<BoundedFuture> submittedQueue;         
068      //List of completed tasks: must be thread safe, for different BoundedFuture tasks will attempt to add themselves here as they finish, concurrently  
069      @NonNull private final BlockingQueue<BoundedFuture> completedQueue; 
070      @NonNull private final Semaphore semaphore;
071      @NonNull private final Executor executor;
072      private final int size;
073      private boolean cancelled = false;
074    
075      public ParallelTask(Executor exec, Collection<Callable<V>> callable, int permits) {
076        if (exec == null || callable == null) 
077          throw new NullPointerException();
078        
079        this.executor = exec;
080        this.semaphore = new Semaphore(permits);
081        this.size = callable.size();
082        this.submittedQueue = new ArrayList<BoundedFuture>(this.size);
083        this.completedQueue = new LinkedBlockingQueue<BoundedFuture>(this.size);
084        for (Callable<V> c : callable) {
085          this.submittedQueue.add(new BoundedFuture(c));
086        }
087      }
088    
089      public boolean cancel(boolean mayInterruptIfRunning) {
090        if (this.isDone()) 
091          return false;
092        
093        this.cancelled = true;
094        for (Future<?> f : this.submittedQueue) {
095          f.cancel(mayInterruptIfRunning);
096        }
097        return this.cancelled;
098      }
099    
100      public Collection<V> get() throws InterruptedException, ExecutionException {
101        // throw new UnsupportedOperationException( "We do not use this in DirectJNgine, and therefore we haven't tested the code (even though it is from a reliable source)");
102    
103        Collection<V> result = new ArrayList<V>(this.submittedQueue.size());
104        boolean done = false;
105        try {
106          // Start executing threads: the number of threads running concurrently is limited by the semaphore
107          for (BoundedFuture f : this.submittedQueue) {
108            if (this.isCancelled()) { 
109              break;
110            }
111            this.semaphore.acquire();
112            this.executor.execute(f);
113          }
114          
115          // Get results once all tests have started running: calling take() on the completed queue will block unless all
116          // threads have finished running!
117          for (int i = 0; i < this.size; i++) {
118            if (this.isCancelled())
119              break;
120            
121            result.add(this.completedQueue.take().get());
122          }
123          done = true;
124        } 
125        finally {
126          if (!done) 
127            this.cancel(true);
128        }
129        return result;
130      }
131    
132      public Collection<V> get(long timeout, TimeUnit unit)
133        throws InterruptedException, ExecutionException, TimeoutException 
134      {
135        throw new UnsupportedOperationException( "We do not support timeouts in DirectJNgine, and therefore we haven't tested the following code!");
136        /*
137        // timeout handling isn't perfect, but it's an attempt to 
138        // replicate the behavior found in AbstractExecutorService
139        long nanos = unit.toNanos(timeout);
140        long endTime = System.nanoTime() + nanos;
141        
142        boolean done = false;
143        Collection<V> taskExecutionResults = new ArrayList<V>(this.submittedQueue.size());
144        try {
145          for (BoundedFuture f : this.submittedQueue) {
146            if (System.nanoTime() >= endTime) 
147              throw new TimeoutException();
148            
149            // If we cancelled execution, do not try to keep adding more task to execute
150            if (this.isCancelled()) 
151              break;
152    
153            // We will block here if there are already n tasks running (n=number passed on semaphore creation)
154            // When one of those running tasks is finished, a new task will be allowed to start execution 
155            this.semaphore.acquire();
156            this.executor.execute(f);
157          }
158          
159          for (int i = 0; i < this.size; i++) {
160            if (this.isCancelled()) 
161              break;
162            
163            long nowTime = System.nanoTime();
164            if (nowTime >= endTime) 
165              throw new TimeoutException();
166            
167            BoundedFuture f = this.completedQueue.poll(endTime - nowTime, TimeUnit.NANOSECONDS);
168            if (f == null) 
169              throw new TimeoutException();
170            
171            taskExecutionResults.add(f.get());
172          }
173          done = true;
174        } 
175        finally {
176          if (!done) {
177            this.cancel(true);
178          }
179        }
180        return taskExecutionResults;
181        */
182      }
183    
184      public boolean isCancelled() {
185        return this.cancelled;
186      }
187    
188      public boolean isDone() {
189        return this.completedQueue.size() == this.size;
190      }
191    }