001 /* 002 * Copyright © 2008, 2009 Pedro Agulló Soliveres. 003 * 004 * This file is part of DirectJNgine. 005 * 006 * DirectJNgine is free software: you can redistribute it and/or modify 007 * it under the terms of the GNU General Public License as published by 008 * the Free Software Foundation, either version 3 of the License. 009 * 010 * Commercial use is permitted to the extent that the code/component(s) 011 * do NOT become part of another Open Source or Commercially developed 012 * licensed development library or toolkit without explicit permission. 013 * 014 * DirectJNgine is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU General Public License for more details. 018 * 019 * You should have received a copy of the GNU General Public License 020 * along with DirectJNgine. If not, see <http://www.gnu.org/licenses/>. 021 * 022 * This software uses the ExtJs library (http://extjs.com), which is 023 * distributed under the GPL v3 license (see http://extjs.com/license). 024 */ 025 026 /* 027 * Just to give credit where credit is due... 028 * 029 * The original implementation for this class was written 030 * by Jacob Hookom. 031 * 032 * Take a look at http://weblogs.java.net/blog/jhook/ 033 * for details and a nice discussion on handling 034 * concurrent tasks. 035 */ 036 037 package com.softwarementors.extjs.djn; 038 039 import java.util.ArrayList; 040 import java.util.Collection; 041 import java.util.List; 042 043 044 import java.util.concurrent.BlockingQueue; 045 import java.util.concurrent.Callable; 046 import java.util.concurrent.ExecutionException; 047 import java.util.concurrent.Executor; 048 import java.util.concurrent.Future; 049 import java.util.concurrent.FutureTask; 050 import java.util.concurrent.LinkedBlockingQueue; 051 import java.util.concurrent.Semaphore; 052 import java.util.concurrent.TimeUnit; 053 import java.util.concurrent.TimeoutException; 054 055 public class ParallelTask<V> implements Future<Collection<V>> { 056 057 private class BoundedFuture extends FutureTask<V> { 058 BoundedFuture(Callable<V> c) { super(c); } 059 // BoundedFuture(Runnable t, V r) { super(t, r); } 060 @Override 061 protected void done() { 062 ParallelTask.this.semaphore.release(); // Allow other thread to be executed -if there is one waiting to enter the semaphore protected code 063 ParallelTask.this.completedQueue.add(this); // Add this task to the lisf of completed tasks 064 } 065 } 066 067 //List of submitted tasks 068 private final List<BoundedFuture> submittedQueue; 069 //List of completed tasks: must be thread safe, for different BoundedFuture tasks will attempt to add themselves here as they finish, concurrently 070 private final BlockingQueue<BoundedFuture> completedQueue; 071 private final Semaphore semaphore; 072 private final Executor executor; 073 private final int size; 074 private boolean cancelled = false; 075 076 public ParallelTask(Executor exec, Collection<Callable<V>> callable, int permits) { 077 if (exec == null || callable == null) 078 throw new NullPointerException(); 079 080 this.executor = exec; 081 this.semaphore = new Semaphore(permits); 082 this.size = callable.size(); 083 this.submittedQueue = new ArrayList<BoundedFuture>(this.size); 084 this.completedQueue = new LinkedBlockingQueue<BoundedFuture>(this.size); 085 for (Callable<V> c : callable) { 086 this.submittedQueue.add(new BoundedFuture(c)); 087 } 088 } 089 090 public boolean cancel(boolean mayInterruptIfRunning) { 091 if (this.isDone()) 092 return false; 093 094 this.cancelled = true; 095 for (Future<?> f : this.submittedQueue) { 096 f.cancel(mayInterruptIfRunning); 097 } 098 return this.cancelled; 099 } 100 101 public Collection<V> get() throws InterruptedException, ExecutionException { 102 // throw new UnsupportedOperationException( "We do not use this in DirectJNgine, and therefore we haven't tested the code (even though it is from a reliable source)"); 103 104 Collection<V> result = new ArrayList<V>(this.submittedQueue.size()); 105 boolean done = false; 106 try { 107 // Start executing threads: the number of threads running concurrently is limited by the semaphore 108 for (BoundedFuture f : this.submittedQueue) { 109 if (this.isCancelled()) { 110 break; 111 } 112 this.semaphore.acquire(); 113 this.executor.execute(f); 114 } 115 116 // Get results once all tests have started running: calling take() on the completed queue will block unless all 117 // threads have finished running! 118 for (int i = 0; i < this.size; i++) { 119 if (this.isCancelled()) 120 break; 121 122 result.add(this.completedQueue.take().get()); 123 } 124 done = true; 125 } 126 finally { 127 if (!done) 128 this.cancel(true); 129 } 130 return result; 131 } 132 133 public Collection<V> get(long timeout, TimeUnit unit) 134 throws InterruptedException, ExecutionException, TimeoutException 135 { 136 throw new UnsupportedOperationException( "We do not support timeouts in DirectJNgine, and therefore we haven't tested the following code!"); 137 /* 138 // timeout handling isn't perfect, but it's an attempt to 139 // replicate the behavior found in AbstractExecutorService 140 long nanos = unit.toNanos(timeout); 141 long endTime = System.nanoTime() + nanos; 142 143 boolean done = false; 144 Collection<V> taskExecutionResults = new ArrayList<V>(this.submittedQueue.size()); 145 try { 146 for (BoundedFuture f : this.submittedQueue) { 147 if (System.nanoTime() >= endTime) 148 throw new TimeoutException(); 149 150 // If we cancelled execution, do not try to keep adding more task to execute 151 if (this.isCancelled()) 152 break; 153 154 // We will block here if there are already n tasks running (n=number passed on semaphore creation) 155 // When one of those running tasks is finished, a new task will be allowed to start execution 156 this.semaphore.acquire(); 157 this.executor.execute(f); 158 } 159 160 for (int i = 0; i < this.size; i++) { 161 if (this.isCancelled()) 162 break; 163 164 long nowTime = System.nanoTime(); 165 if (nowTime >= endTime) 166 throw new TimeoutException(); 167 168 BoundedFuture f = this.completedQueue.poll(endTime - nowTime, TimeUnit.NANOSECONDS); 169 if (f == null) 170 throw new TimeoutException(); 171 172 taskExecutionResults.add(f.get()); 173 } 174 done = true; 175 } 176 finally { 177 if (!done) { 178 this.cancel(true); 179 } 180 } 181 return taskExecutionResults; 182 */ 183 } 184 185 public boolean isCancelled() { 186 return this.cancelled; 187 } 188 189 public boolean isDone() { 190 return this.completedQueue.size() == this.size; 191 } 192 }