001    /*
002     * Copyright © 2008, 2009 Pedro Agulló Soliveres.
003     * 
004     * This file is part of DirectJNgine.
005     *
006     * DirectJNgine is free software: you can redistribute it and/or modify
007     * it under the terms of the GNU General Public License as published by
008     * the Free Software Foundation, either version 3 of the License.
009     *
010     * Commercial use is permitted to the extent that the code/component(s)
011     * do NOT become part of another Open Source or Commercially developed
012     * licensed development library or toolkit without explicit permission.
013     *
014     * DirectJNgine is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU General Public License for more details.
018     *
019     * You should have received a copy of the GNU General Public License
020     * along with DirectJNgine.  If not, see <http://www.gnu.org/licenses/>.
021     * 
022     * This software uses the ExtJs library (http://extjs.com), which is 
023     * distributed under the GPL v3 license (see http://extjs.com/license).
024     */
025    
026    /*
027     * Just to give credit where credit is due...
028     * 
029     * The original implementation for this class was written 
030     * by Jacob Hookom.
031     * 
032     * Take a look at http://weblogs.java.net/blog/jhook/
033     * for details and a nice discussion on handling
034     * concurrent tasks.
035     */
036    
037    package com.softwarementors.extjs.djn;
038    
039    import java.util.ArrayList;
040    import java.util.Collection;
041    import java.util.List;
042    
043    
044    import java.util.concurrent.BlockingQueue;
045    import java.util.concurrent.Callable;
046    import java.util.concurrent.ExecutionException;
047    import java.util.concurrent.Executor;
048    import java.util.concurrent.Future;
049    import java.util.concurrent.FutureTask;
050    import java.util.concurrent.LinkedBlockingQueue;
051    import java.util.concurrent.Semaphore;
052    import java.util.concurrent.TimeUnit;
053    import java.util.concurrent.TimeoutException;
054    
055    public class ParallelTask<V> implements Future<Collection<V>> {
056    
057      private class BoundedFuture extends FutureTask<V> {
058        BoundedFuture(Callable<V> c) { super(c); }
059        // BoundedFuture(Runnable t, V r) { super(t, r); }
060        @Override
061        protected void done() {
062          ParallelTask.this.semaphore.release(); // Allow other thread to be executed -if there is one waiting to enter the semaphore protected code
063          ParallelTask.this.completedQueue.add(this); // Add this task to the lisf of completed tasks
064        }
065      }
066    
067      //List of submitted tasks
068      private final List<BoundedFuture> submittedQueue;         
069      //List of completed tasks: must be thread safe, for different BoundedFuture tasks will attempt to add themselves here as they finish, concurrently  
070      private final BlockingQueue<BoundedFuture> completedQueue; 
071      private final Semaphore semaphore;
072      private final Executor executor;
073      private final int size;
074      private boolean cancelled = false;
075    
076      public ParallelTask(Executor exec, Collection<Callable<V>> callable, int permits) {
077        if (exec == null || callable == null) 
078          throw new NullPointerException();
079        
080        this.executor = exec;
081        this.semaphore = new Semaphore(permits);
082        this.size = callable.size();
083        this.submittedQueue = new ArrayList<BoundedFuture>(this.size);
084        this.completedQueue = new LinkedBlockingQueue<BoundedFuture>(this.size);
085        for (Callable<V> c : callable) {
086          this.submittedQueue.add(new BoundedFuture(c));
087        }
088      }
089    
090      public boolean cancel(boolean mayInterruptIfRunning) {
091        if (this.isDone()) 
092          return false;
093        
094        this.cancelled = true;
095        for (Future<?> f : this.submittedQueue) {
096          f.cancel(mayInterruptIfRunning);
097        }
098        return this.cancelled;
099      }
100    
101      public Collection<V> get() throws InterruptedException, ExecutionException {
102        // throw new UnsupportedOperationException( "We do not use this in DirectJNgine, and therefore we haven't tested the code (even though it is from a reliable source)");
103    
104        Collection<V> result = new ArrayList<V>(this.submittedQueue.size());
105        boolean done = false;
106        try {
107          // Start executing threads: the number of threads running concurrently is limited by the semaphore
108          for (BoundedFuture f : this.submittedQueue) {
109            if (this.isCancelled()) { 
110              break;
111            }
112            this.semaphore.acquire();
113            this.executor.execute(f);
114          }
115          
116          // Get results once all tests have started running: calling take() on the completed queue will block unless all
117          // threads have finished running!
118          for (int i = 0; i < this.size; i++) {
119            if (this.isCancelled())
120              break;
121            
122            result.add(this.completedQueue.take().get());
123          }
124          done = true;
125        } 
126        finally {
127          if (!done) 
128            this.cancel(true);
129        }
130        return result;
131      }
132    
133      public Collection<V> get(long timeout, TimeUnit unit)
134        throws InterruptedException, ExecutionException, TimeoutException 
135      {
136        throw new UnsupportedOperationException( "We do not support timeouts in DirectJNgine, and therefore we haven't tested the following code!");
137        /*
138        // timeout handling isn't perfect, but it's an attempt to 
139        // replicate the behavior found in AbstractExecutorService
140        long nanos = unit.toNanos(timeout);
141        long endTime = System.nanoTime() + nanos;
142        
143        boolean done = false;
144        Collection<V> taskExecutionResults = new ArrayList<V>(this.submittedQueue.size());
145        try {
146          for (BoundedFuture f : this.submittedQueue) {
147            if (System.nanoTime() >= endTime) 
148              throw new TimeoutException();
149            
150            // If we cancelled execution, do not try to keep adding more task to execute
151            if (this.isCancelled()) 
152              break;
153    
154            // We will block here if there are already n tasks running (n=number passed on semaphore creation)
155            // When one of those running tasks is finished, a new task will be allowed to start execution 
156            this.semaphore.acquire();
157            this.executor.execute(f);
158          }
159          
160          for (int i = 0; i < this.size; i++) {
161            if (this.isCancelled()) 
162              break;
163            
164            long nowTime = System.nanoTime();
165            if (nowTime >= endTime) 
166              throw new TimeoutException();
167            
168            BoundedFuture f = this.completedQueue.poll(endTime - nowTime, TimeUnit.NANOSECONDS);
169            if (f == null) 
170              throw new TimeoutException();
171            
172            taskExecutionResults.add(f.get());
173          }
174          done = true;
175        } 
176        finally {
177          if (!done) {
178            this.cancel(true);
179          }
180        }
181        return taskExecutionResults;
182        */
183      }
184    
185      public boolean isCancelled() {
186        return this.cancelled;
187      }
188    
189      public boolean isDone() {
190        return this.completedQueue.size() == this.size;
191      }
192    }