001    /*
002     * Copyright © 2008, 2009 Pedro Agulló Soliveres.
003     * 
004     * This file is part of DirectJNgine.
005     *
006     * DirectJNgine is free software: you can redistribute it and/or modify
007     * it under the terms of the GNU General Public License as published by
008     * the Free Software Foundation, either version 3 of the License.
009     *
010     * DirectJNgine is distributed in the hope that it will be useful,
011     * but WITHOUT ANY WARRANTY; without even the implied warranty of
012     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013     * GNU General Public License for more details.
014     *
015     * You should have received a copy of the GNU General Public License
016     * along with DirectJNgine.  If not, see <http://www.gnu.org/licenses/>.
017     * 
018     * This software uses the ExtJs library (http://extjs.com), which is 
019     * distributed under the GPL v3 license (see http://extjs.com/license).
020     */
021    
022    /*
023     * Just to give credit where credit is due...
024     * 
025     * The original implementation for this class was written 
026     * by Jacob Hookom.
027     * 
028     * Take a look at http://weblogs.java.net/blog/jhook/
029     * for details and a nice discussion on handling
030     * concurrent tasks.
031     */
032    
033    package com.softwarementors.extjs.djn;
034    
035    import java.util.ArrayList;
036    import java.util.Collection;
037    import java.util.List;
038    
039    
040    import java.util.concurrent.BlockingQueue;
041    import java.util.concurrent.Callable;
042    import java.util.concurrent.ExecutionException;
043    import java.util.concurrent.Executor;
044    import java.util.concurrent.Future;
045    import java.util.concurrent.FutureTask;
046    import java.util.concurrent.LinkedBlockingQueue;
047    import java.util.concurrent.Semaphore;
048    import java.util.concurrent.TimeUnit;
049    import java.util.concurrent.TimeoutException;
050    
051    public class ParallelTask<V> implements Future<Collection<V>> {
052    
053      private class BoundedFuture extends FutureTask<V> {
054        BoundedFuture(Callable<V> c) { super(c); }
055        // BoundedFuture(Runnable t, V r) { super(t, r); }
056        @Override
057        protected void done() {
058          ParallelTask.this.semaphore.release(); // Allow other thread to be executed -if there is one waiting to enter the semaphore protected code
059          ParallelTask.this.completedQueue.add(this); // Add this task to the lisf of completed tasks
060        }
061      }
062    
063      //List of submitted tasks
064      private final List<BoundedFuture> submittedQueue;         
065      //List of completed tasks: must be thread safe, for different BoundedFuture tasks will attempt to add themselves here as they finish, concurrently  
066      private final BlockingQueue<BoundedFuture> completedQueue; 
067      private final Semaphore semaphore;
068      private final Executor executor;
069      private final int size;
070      private boolean cancelled = false;
071    
072      public ParallelTask(Executor exec, Collection<Callable<V>> callable, int permits) {
073        if (exec == null || callable == null) 
074          throw new NullPointerException();
075        
076        this.executor = exec;
077        this.semaphore = new Semaphore(permits);
078        this.size = callable.size();
079        this.submittedQueue = new ArrayList<BoundedFuture>(this.size);
080        this.completedQueue = new LinkedBlockingQueue<BoundedFuture>(this.size);
081        for (Callable<V> c : callable) {
082          this.submittedQueue.add(new BoundedFuture(c));
083        }
084      }
085    
086      public boolean cancel(boolean mayInterruptIfRunning) {
087        if (this.isDone()) 
088          return false;
089        
090        this.cancelled = true;
091        for (Future<?> f : this.submittedQueue) {
092          f.cancel(mayInterruptIfRunning);
093        }
094        return this.cancelled;
095      }
096    
097      public Collection<V> get() throws InterruptedException, ExecutionException {
098        // throw new UnsupportedOperationException( "We do not use this in DirectJNgine, and therefore we haven't tested the code (even though it is from a reliable source)");
099    
100        Collection<V> result = new ArrayList<V>(this.submittedQueue.size());
101        boolean done = false;
102        try {
103          // Start executing threads: the number of threads running concurrently is limited by the semaphore
104          for (BoundedFuture f : this.submittedQueue) {
105            if (this.isCancelled()) { 
106              break;
107            }
108            this.semaphore.acquire();
109            this.executor.execute(f);
110          }
111          
112          // Get results once all tests have started running: calling take() on the completed queue will block unless all
113          // threads have finished running!
114          for (int i = 0; i < this.size; i++) {
115            if (this.isCancelled())
116              break;
117            
118            result.add(this.completedQueue.take().get());
119          }
120          done = true;
121        } 
122        finally {
123          if (!done) 
124            this.cancel(true);
125        }
126        return result;
127      }
128    
129      public Collection<V> get(long timeout, TimeUnit unit)
130        throws InterruptedException, ExecutionException, TimeoutException 
131      {
132        throw new UnsupportedOperationException( "We do not support timeouts in DirectJNgine, and therefore we haven't tested the following code!");
133        /*
134        // timeout handling isn't perfect, but it's an attempt to 
135        // replicate the behavior found in AbstractExecutorService
136        long nanos = unit.toNanos(timeout);
137        long endTime = System.nanoTime() + nanos;
138        
139        boolean done = false;
140        Collection<V> taskExecutionResults = new ArrayList<V>(this.submittedQueue.size());
141        try {
142          for (BoundedFuture f : this.submittedQueue) {
143            if (System.nanoTime() >= endTime) 
144              throw new TimeoutException();
145            
146            // If we cancelled execution, do not try to keep adding more task to execute
147            if (this.isCancelled()) 
148              break;
149    
150            // We will block here if there are already n tasks running (n=number passed on semaphore creation)
151            // When one of those running tasks is finished, a new task will be allowed to start execution 
152            this.semaphore.acquire();
153            this.executor.execute(f);
154          }
155          
156          for (int i = 0; i < this.size; i++) {
157            if (this.isCancelled()) 
158              break;
159            
160            long nowTime = System.nanoTime();
161            if (nowTime >= endTime) 
162              throw new TimeoutException();
163            
164            BoundedFuture f = this.completedQueue.poll(endTime - nowTime, TimeUnit.NANOSECONDS);
165            if (f == null) 
166              throw new TimeoutException();
167            
168            taskExecutionResults.add(f.get());
169          }
170          done = true;
171        } 
172        finally {
173          if (!done) {
174            this.cancel(true);
175          }
176        }
177        return taskExecutionResults;
178        */
179      }
180    
181      public boolean isCancelled() {
182        return this.cancelled;
183      }
184    
185      public boolean isDone() {
186        return this.completedQueue.size() == this.size;
187      }
188    }