001 /* 002 * Copyright © 2008, 2009 Pedro Agulló Soliveres. 003 * 004 * This file is part of DirectJNgine. 005 * 006 * DirectJNgine is free software: you can redistribute it and/or modify 007 * it under the terms of the GNU General Public License as published by 008 * the Free Software Foundation, either version 3 of the License. 009 * 010 * Commercial use is permitted to the extent that the code/component(s) 011 * do NOT become part of another Open Source or Commercially developed 012 * licensed development library or toolkit without explicit permission. 013 * 014 * DirectJNgine is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU General Public License for more details. 018 * 019 * You should have received a copy of the GNU General Public License 020 * along with DirectJNgine. If not, see <http://www.gnu.org/licenses/>. 021 * 022 * This software uses the ExtJs library (http://extjs.com), which is 023 * distributed under the GPL v3 license (see http://extjs.com/license). 024 */ 025 026 /* 027 * Just to give credit where credit is due... 028 * 029 * The original implementation for this class was written 030 * by Jacob Hookom. 031 * 032 * Take a look at http://weblogs.java.net/blog/jhook/ 033 * for details and a nice discussion on handling 034 * concurrent tasks. 035 */ 036 037 package com.softwarementors.extjs.djn; 038 039 import java.util.ArrayList; 040 import java.util.Collection; 041 import java.util.List; 042 043 044 import java.util.concurrent.BlockingQueue; 045 import java.util.concurrent.Callable; 046 import java.util.concurrent.ExecutionException; 047 import java.util.concurrent.Executor; 048 import java.util.concurrent.Future; 049 import java.util.concurrent.FutureTask; 050 import java.util.concurrent.LinkedBlockingQueue; 051 import java.util.concurrent.Semaphore; 052 import java.util.concurrent.TimeUnit; 053 import java.util.concurrent.TimeoutException; 054 055 public class ParallelTask<V> implements Future<Collection<V>> { 056 057 private class BoundedFuture extends FutureTask<V> { 058 BoundedFuture(Callable<V> c) { super(c); } 059 @Override 060 protected void done() { 061 ParallelTask.this.semaphore.release(); // Allow other thread to be executed -if there is one waiting to enter the semaphore protected code 062 ParallelTask.this.completedQueue.add(this); // Add this task to the lisf of completed tasks 063 } 064 } 065 066 //List of submitted tasks 067 private final List<BoundedFuture> submittedQueue; 068 //List of completed tasks: must be thread safe, for different BoundedFuture tasks will attempt to add themselves here as they finish, concurrently 069 private final BlockingQueue<BoundedFuture> completedQueue; 070 private final Semaphore semaphore; 071 private final Executor executor; 072 private final int size; 073 private boolean cancelled = false; 074 075 public ParallelTask(Executor exec, Collection<Callable<V>> callable, int permits) { 076 if (exec == null || callable == null) 077 throw new NullPointerException(); 078 079 this.executor = exec; 080 this.semaphore = new Semaphore(permits); 081 this.size = callable.size(); 082 this.submittedQueue = new ArrayList<BoundedFuture>(this.size); 083 this.completedQueue = new LinkedBlockingQueue<BoundedFuture>(this.size); 084 for (Callable<V> c : callable) { 085 this.submittedQueue.add(new BoundedFuture(c)); 086 } 087 } 088 089 public boolean cancel(boolean mayInterruptIfRunning) { 090 if (this.isDone()) 091 return false; 092 093 this.cancelled = true; 094 for (Future<?> f : this.submittedQueue) { 095 f.cancel(mayInterruptIfRunning); 096 } 097 return this.cancelled; 098 } 099 100 public Collection<V> get() throws InterruptedException, ExecutionException { 101 // throw new UnsupportedOperationException( "We do not use this in DirectJNgine, and therefore we haven't tested the code (even though it is from a reliable source)"); 102 103 Collection<V> result = new ArrayList<V>(this.submittedQueue.size()); 104 boolean done = false; 105 try { 106 // Start executing threads: the number of threads running concurrently is limited by the semaphore 107 for (BoundedFuture f : this.submittedQueue) { 108 if (this.isCancelled()) { 109 break; 110 } 111 this.semaphore.acquire(); 112 this.executor.execute(f); 113 } 114 115 // Get results once all tests have started running: calling take() on the completed queue will block unless all 116 // threads have finished running! 117 for (int i = 0; i < this.size; i++) { 118 if (this.isCancelled()) 119 break; 120 121 result.add(this.completedQueue.take().get()); 122 } 123 done = true; 124 } 125 finally { 126 if (!done) 127 this.cancel(true); 128 } 129 return result; 130 } 131 132 public Collection<V> get(long timeout, TimeUnit unit) 133 throws InterruptedException, ExecutionException, TimeoutException 134 { 135 throw new UnsupportedOperationException( "We do not support timeouts in DirectJNgine, and therefore we haven't tested the following code!"); 136 /* 137 // timeout handling isn't perfect, but it's an attempt to 138 // replicate the behavior found in AbstractExecutorService 139 long nanos = unit.toNanos(timeout); 140 long endTime = System.nanoTime() + nanos; 141 142 boolean done = false; 143 Collection<V> taskExecutionResults = new ArrayList<V>(this.submittedQueue.size()); 144 try { 145 for (BoundedFuture f : this.submittedQueue) { 146 if (System.nanoTime() >= endTime) 147 throw new TimeoutException(); 148 149 // If we cancelled execution, do not try to keep adding more task to execute 150 if (this.isCancelled()) 151 break; 152 153 // We will block here if there are already n tasks running (n=number passed on semaphore creation) 154 // When one of those running tasks is finished, a new task will be allowed to start execution 155 this.semaphore.acquire(); 156 this.executor.execute(f); 157 } 158 159 for (int i = 0; i < this.size; i++) { 160 if (this.isCancelled()) 161 break; 162 163 long nowTime = System.nanoTime(); 164 if (nowTime >= endTime) 165 throw new TimeoutException(); 166 167 BoundedFuture f = this.completedQueue.poll(endTime - nowTime, TimeUnit.NANOSECONDS); 168 if (f == null) 169 throw new TimeoutException(); 170 171 taskExecutionResults.add(f.get()); 172 } 173 done = true; 174 } 175 finally { 176 if (!done) { 177 this.cancel(true); 178 } 179 } 180 return taskExecutionResults; 181 */ 182 } 183 184 public boolean isCancelled() { 185 return this.cancelled; 186 } 187 188 public boolean isDone() { 189 return this.completedQueue.size() == this.size; 190 } 191 }