001 /* 002 * Copyright © 2008, 2009 Pedro Agulló Soliveres. 003 * 004 * This file is part of DirectJNgine. 005 * 006 * DirectJNgine is free software: you can redistribute it and/or modify 007 * it under the terms of the GNU General Public License as published by 008 * the Free Software Foundation, either version 3 of the License. 009 * 010 * DirectJNgine is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU General Public License for more details. 014 * 015 * You should have received a copy of the GNU General Public License 016 * along with DirectJNgine. If not, see <http://www.gnu.org/licenses/>. 017 * 018 * This software uses the ExtJs library (http://extjs.com), which is 019 * distributed under the GPL v3 license (see http://extjs.com/license). 020 */ 021 022 /* 023 * Just to give credit where credit is due... 024 * 025 * The original implementation for this class was written 026 * by Jacob Hookom. 027 * 028 * Take a look at http://weblogs.java.net/blog/jhook/ 029 * for details and a nice discussion on handling 030 * concurrent tasks. 031 */ 032 033 package com.softwarementors.extjs.djn; 034 035 import java.util.ArrayList; 036 import java.util.Collection; 037 import java.util.List; 038 039 040 import java.util.concurrent.BlockingQueue; 041 import java.util.concurrent.Callable; 042 import java.util.concurrent.ExecutionException; 043 import java.util.concurrent.Executor; 044 import java.util.concurrent.Future; 045 import java.util.concurrent.FutureTask; 046 import java.util.concurrent.LinkedBlockingQueue; 047 import java.util.concurrent.Semaphore; 048 import java.util.concurrent.TimeUnit; 049 import java.util.concurrent.TimeoutException; 050 051 public class ParallelTask<V> implements Future<Collection<V>> { 052 053 private class BoundedFuture extends FutureTask<V> { 054 BoundedFuture(Callable<V> c) { super(c); } 055 // BoundedFuture(Runnable t, V r) { super(t, r); } 056 @Override 057 protected void done() { 058 ParallelTask.this.semaphore.release(); // Allow other thread to be executed -if there is one waiting to enter the semaphore protected code 059 ParallelTask.this.completedQueue.add(this); // Add this task to the lisf of completed tasks 060 } 061 } 062 063 //List of submitted tasks 064 private final List<BoundedFuture> submittedQueue; 065 //List of completed tasks: must be thread safe, for different BoundedFuture tasks will attempt to add themselves here as they finish, concurrently 066 private final BlockingQueue<BoundedFuture> completedQueue; 067 private final Semaphore semaphore; 068 private final Executor executor; 069 private final int size; 070 private boolean cancelled = false; 071 072 public ParallelTask(Executor exec, Collection<Callable<V>> callable, int permits) { 073 if (exec == null || callable == null) 074 throw new NullPointerException(); 075 076 this.executor = exec; 077 this.semaphore = new Semaphore(permits); 078 this.size = callable.size(); 079 this.submittedQueue = new ArrayList<BoundedFuture>(this.size); 080 this.completedQueue = new LinkedBlockingQueue<BoundedFuture>(this.size); 081 for (Callable<V> c : callable) { 082 this.submittedQueue.add(new BoundedFuture(c)); 083 } 084 } 085 086 public boolean cancel(boolean mayInterruptIfRunning) { 087 if (this.isDone()) 088 return false; 089 090 this.cancelled = true; 091 for (Future<?> f : this.submittedQueue) { 092 f.cancel(mayInterruptIfRunning); 093 } 094 return this.cancelled; 095 } 096 097 public Collection<V> get() throws InterruptedException, ExecutionException { 098 // throw new UnsupportedOperationException( "We do not use this in DirectJNgine, and therefore we haven't tested the code (even though it is from a reliable source)"); 099 100 Collection<V> result = new ArrayList<V>(this.submittedQueue.size()); 101 boolean done = false; 102 try { 103 // Start executing threads: the number of threads running concurrently is limited by the semaphore 104 for (BoundedFuture f : this.submittedQueue) { 105 if (this.isCancelled()) { 106 break; 107 } 108 this.semaphore.acquire(); 109 this.executor.execute(f); 110 } 111 112 // Get results once all tests have started running: calling take() on the completed queue will block unless all 113 // threads have finished running! 114 for (int i = 0; i < this.size; i++) { 115 if (this.isCancelled()) 116 break; 117 118 result.add(this.completedQueue.take().get()); 119 } 120 done = true; 121 } 122 finally { 123 if (!done) 124 this.cancel(true); 125 } 126 return result; 127 } 128 129 public Collection<V> get(long timeout, TimeUnit unit) 130 throws InterruptedException, ExecutionException, TimeoutException 131 { 132 throw new UnsupportedOperationException( "We do not support timeouts in DirectJNgine, and therefore we haven't tested the following code!"); 133 /* 134 // timeout handling isn't perfect, but it's an attempt to 135 // replicate the behavior found in AbstractExecutorService 136 long nanos = unit.toNanos(timeout); 137 long endTime = System.nanoTime() + nanos; 138 139 boolean done = false; 140 Collection<V> taskExecutionResults = new ArrayList<V>(this.submittedQueue.size()); 141 try { 142 for (BoundedFuture f : this.submittedQueue) { 143 if (System.nanoTime() >= endTime) 144 throw new TimeoutException(); 145 146 // If we cancelled execution, do not try to keep adding more task to execute 147 if (this.isCancelled()) 148 break; 149 150 // We will block here if there are already n tasks running (n=number passed on semaphore creation) 151 // When one of those running tasks is finished, a new task will be allowed to start execution 152 this.semaphore.acquire(); 153 this.executor.execute(f); 154 } 155 156 for (int i = 0; i < this.size; i++) { 157 if (this.isCancelled()) 158 break; 159 160 long nowTime = System.nanoTime(); 161 if (nowTime >= endTime) 162 throw new TimeoutException(); 163 164 BoundedFuture f = this.completedQueue.poll(endTime - nowTime, TimeUnit.NANOSECONDS); 165 if (f == null) 166 throw new TimeoutException(); 167 168 taskExecutionResults.add(f.get()); 169 } 170 done = true; 171 } 172 finally { 173 if (!done) { 174 this.cancel(true); 175 } 176 } 177 return taskExecutionResults; 178 */ 179 } 180 181 public boolean isCancelled() { 182 return this.cancelled; 183 } 184 185 public boolean isDone() { 186 return this.completedQueue.size() == this.size; 187 } 188 }