1 | /* |
---|
2 | Copyright 2006 by Sean Luke and George Mason University |
---|
3 | Licensed under the Academic Free License version 3.0 |
---|
4 | See the file "LICENSE" for more information |
---|
5 | */ |
---|
6 | |
---|
7 | |
---|
8 | package ec.eval; |
---|
9 | |
---|
10 | import ec.*; |
---|
11 | |
---|
12 | import java.io.*; |
---|
13 | import java.util.*; |
---|
14 | import java.net.*; |
---|
15 | import ec.util.*; |
---|
16 | import ec.steadystate.SteadyStateEvolutionState; |
---|
17 | import ec.steadystate.QueueIndividual; |
---|
18 | |
---|
19 | /** |
---|
20 | * SlaveMonitor.java |
---|
21 | * |
---|
22 | |
---|
23 | <P>The SlaveMonitor manages slave connections to each remote slave, and provides synchronization facilities |
---|
24 | for the slave connections and for various other objects waiting to be notified when new slaves are |
---|
25 | available, space is available in a slave's job queue, an individual has been completed, etc. |
---|
26 | |
---|
27 | <p>The monitor provides functions to create and delete slaves (registerSlave(), unregisterSlave()), |
---|
28 | schedule a job for evaluation (scheduleJobForEvaluation(...)), block until all jobs have completed |
---|
29 | (waitForAllSlavesToFinishEvaluating(...)), test if any individual in a job has been finished |
---|
30 | (evaluatedIndividualAvailable()), and block until an individual in a job is available and returned |
---|
31 | (waitForIndividual()). |
---|
32 | |
---|
33 | <p>Generally speaking, the SlaveMonitor owns the SlaveConnections -- no one else |
---|
34 | should speak to them. Also generally speaking, only MasterProblems create and speak to the SlaveMonitor. |
---|
35 | |
---|
36 | * @author Sean Luke, Liviu Panait, and Keith Sullivan |
---|
37 | * @version 1.0 |
---|
38 | */ |
---|
39 | |
---|
40 | public class SlaveMonitor |
---|
41 | { |
---|
42 | public static final String P_EVALMASTERPORT = "eval.master.port"; |
---|
43 | public static final String P_EVALCOMPRESSION = "eval.compression"; |
---|
44 | public static final String P_MAXIMUMNUMBEROFCONCURRENTJOBSPERSLAVE = "eval.masterproblem.max-jobs-per-slave"; |
---|
45 | public static final int SEED_INCREMENT = 7919; // a large value (prime for fun) bigger than expected number of threads per slave |
---|
46 | |
---|
47 | public EvolutionState state; |
---|
48 | |
---|
49 | /** |
---|
50 | * The socket where slaves connect. |
---|
51 | */ |
---|
52 | public ServerSocket servSock; |
---|
53 | |
---|
54 | /** |
---|
55 | * Indicates whether compression is used over the socket IO streams. |
---|
56 | */ |
---|
57 | public boolean useCompression; |
---|
58 | |
---|
59 | boolean shutdownInProgress = false; |
---|
60 | Object[] shutdownInProgressLock = new Object[0]; // arrays are serializable |
---|
61 | final boolean isShutdownInProgress() { synchronized (shutdownInProgressLock) { return shutdownInProgress; } } |
---|
62 | final void setShutdownInProgress(boolean val) { synchronized (shutdownInProgressLock) { shutdownInProgress = val; } } |
---|
63 | |
---|
64 | int randomSeed; |
---|
65 | Thread thread; |
---|
66 | |
---|
67 | public boolean waitOnMonitor(Object monitor) |
---|
68 | { |
---|
69 | try |
---|
70 | { |
---|
71 | monitor.wait(); |
---|
72 | } |
---|
73 | catch (InterruptedException e) { return false; } |
---|
74 | return true; |
---|
75 | } |
---|
76 | |
---|
77 | public void notifyMonitor(Object monitor) |
---|
78 | { |
---|
79 | monitor.notifyAll(); |
---|
80 | } |
---|
81 | |
---|
82 | // the slaves (not really a queue) |
---|
83 | private LinkedList allSlaves = new LinkedList(); |
---|
84 | |
---|
85 | // the available slaves |
---|
86 | private LinkedList availableSlaves = new LinkedList(); |
---|
87 | |
---|
88 | // the maximum number of jobs per slave |
---|
89 | int maxJobsPerSlave; |
---|
90 | |
---|
91 | // whether the system should display information that is useful for debugging |
---|
92 | boolean showDebugInfo; |
---|
93 | |
---|
94 | final void debug(String s) |
---|
95 | { |
---|
96 | if (showDebugInfo) { System.err.println(Thread.currentThread().getName() + "->" + s); } |
---|
97 | } |
---|
98 | |
---|
99 | /** |
---|
100 | Simple constructor that initializes the data structures for keeping track of the state of each slave. |
---|
101 | The constructor receives two parameters: a boolean flag indicating whether the system should display |
---|
102 | information that is useful for debugging, and the maximum load per slave (the maximum number of jobs |
---|
103 | that a slave can be entrusted with at each time). |
---|
104 | */ |
---|
105 | public SlaveMonitor( final EvolutionState state, boolean showDebugInfo ) |
---|
106 | { |
---|
107 | this.showDebugInfo = showDebugInfo; |
---|
108 | this.state = state; |
---|
109 | |
---|
110 | int port = state.parameters.getInt( |
---|
111 | new Parameter( P_EVALMASTERPORT ),null); |
---|
112 | |
---|
113 | maxJobsPerSlave = state.parameters.getInt( |
---|
114 | new Parameter( P_MAXIMUMNUMBEROFCONCURRENTJOBSPERSLAVE ),null); |
---|
115 | |
---|
116 | useCompression = state.parameters.getBoolean(new Parameter(P_EVALCOMPRESSION),null,false); |
---|
117 | |
---|
118 | try |
---|
119 | { |
---|
120 | servSock = new ServerSocket(port); |
---|
121 | } |
---|
122 | catch( IOException e ) |
---|
123 | { |
---|
124 | state.output.fatal("Unable to bind to port " + port + ": " + e); |
---|
125 | } |
---|
126 | |
---|
127 | randomSeed = (int)(System.currentTimeMillis()); |
---|
128 | |
---|
129 | // spawn the thread |
---|
130 | thread = new Thread(new Runnable() |
---|
131 | { |
---|
132 | public void run() |
---|
133 | { |
---|
134 | Thread.currentThread().setName("SlaveMonitor:: "); |
---|
135 | Socket slaveSock; |
---|
136 | |
---|
137 | while (!isShutdownInProgress()) |
---|
138 | { |
---|
139 | slaveSock = null; |
---|
140 | while( slaveSock==null && !isShutdownInProgress() ) |
---|
141 | { |
---|
142 | try |
---|
143 | { |
---|
144 | slaveSock = servSock.accept(); |
---|
145 | } |
---|
146 | catch( IOException e ) { slaveSock = null; } |
---|
147 | } |
---|
148 | |
---|
149 | debug(Thread.currentThread().getName() + " Slave attempts to connect." ); |
---|
150 | |
---|
151 | if( isShutdownInProgress() ) break; |
---|
152 | |
---|
153 | try |
---|
154 | { |
---|
155 | DataInputStream dataIn = null; |
---|
156 | DataOutputStream dataOut = null; |
---|
157 | InputStream tmpIn = slaveSock.getInputStream(); |
---|
158 | OutputStream tmpOut = slaveSock.getOutputStream(); |
---|
159 | if (useCompression) |
---|
160 | { |
---|
161 | /* |
---|
162 | state.output.fatal("JDK 1.5 has broken compression. For now, you must set eval.compression=false"); |
---|
163 | tmpIn = new CompressingInputStream(tmpIn); |
---|
164 | tmpOut = new CompressingOutputStream(tmpOut); |
---|
165 | */ |
---|
166 | tmpIn = Output.makeCompressingInputStream(tmpIn); |
---|
167 | tmpOut = Output.makeCompressingOutputStream(tmpOut); |
---|
168 | if (tmpIn == null || tmpOut == null) |
---|
169 | Output.initialError("You do not appear to have JZLib installed on your system, and so must set eval.compression=false. " + |
---|
170 | "To get JZLib, download from the ECJ website or from http://www.jcraft.com/jzlib/"); |
---|
171 | } |
---|
172 | |
---|
173 | dataIn = new DataInputStream(tmpIn); |
---|
174 | dataOut = new DataOutputStream(tmpOut); |
---|
175 | String slaveName = dataIn.readUTF(); |
---|
176 | |
---|
177 | dataOut.writeInt(randomSeed); |
---|
178 | randomSeed+=SEED_INCREMENT; |
---|
179 | |
---|
180 | // Write random state for eval thread to slave |
---|
181 | dataOut.flush(); |
---|
182 | |
---|
183 | registerSlave(state, slaveName, slaveSock, dataOut, dataIn); |
---|
184 | state.output.systemMessage( "Slave " + slaveName + " connected successfully." ); |
---|
185 | } |
---|
186 | catch (IOException e) { } |
---|
187 | } |
---|
188 | |
---|
189 | debug( Thread.currentThread().getName() + " The monitor is shutting down." ); |
---|
190 | } |
---|
191 | }); |
---|
192 | thread.start(); |
---|
193 | } |
---|
194 | |
---|
195 | /** |
---|
196 | Registers a new slave with the monitor. Upon registration, a slave is marked as available for jobs. |
---|
197 | */ |
---|
198 | public void registerSlave( EvolutionState state, String name, Socket socket, DataOutputStream out, DataInputStream in) |
---|
199 | { |
---|
200 | SlaveConnection newSlave = new SlaveConnection( state, name, socket, out, in, this ); |
---|
201 | |
---|
202 | synchronized(availableSlaves) |
---|
203 | { |
---|
204 | availableSlaves.addLast(newSlave); |
---|
205 | notifyMonitor(availableSlaves); |
---|
206 | } |
---|
207 | synchronized(allSlaves) |
---|
208 | { |
---|
209 | allSlaves.addLast(newSlave); |
---|
210 | notifyMonitor(allSlaves); |
---|
211 | } |
---|
212 | } |
---|
213 | |
---|
214 | /** |
---|
215 | Unregisters a dead slave from the monitor. |
---|
216 | */ |
---|
217 | public void unregisterSlave( SlaveConnection slave ) |
---|
218 | { |
---|
219 | synchronized(allSlaves) |
---|
220 | { |
---|
221 | allSlaves.remove(slave); |
---|
222 | notifyMonitor(allSlaves); |
---|
223 | } |
---|
224 | synchronized(availableSlaves) |
---|
225 | { |
---|
226 | availableSlaves.remove(slave); |
---|
227 | notifyMonitor(availableSlaves); |
---|
228 | } |
---|
229 | } |
---|
230 | |
---|
231 | /** |
---|
232 | Shuts down the slave monitor (also shuts down all slaves). |
---|
233 | */ |
---|
234 | public void shutdown() |
---|
235 | { |
---|
236 | // kill the socket socket and bring down the thread |
---|
237 | setShutdownInProgress(true); |
---|
238 | try |
---|
239 | { |
---|
240 | servSock.close(); |
---|
241 | } |
---|
242 | catch (IOException e) |
---|
243 | { |
---|
244 | } |
---|
245 | thread.interrupt(); |
---|
246 | try { thread.join(); } |
---|
247 | catch (InterruptedException e) { } |
---|
248 | |
---|
249 | // gather all the slaves |
---|
250 | |
---|
251 | synchronized(allSlaves) |
---|
252 | { |
---|
253 | while( !allSlaves.isEmpty() ) |
---|
254 | { |
---|
255 | ((SlaveConnection)(allSlaves.removeFirst())).shutdown(state); |
---|
256 | } |
---|
257 | notifyMonitor(allSlaves); |
---|
258 | } |
---|
259 | } |
---|
260 | |
---|
261 | /** |
---|
262 | Schedules a job for execution on one of the available slaves. The monitor waits until at least one |
---|
263 | slave is available to perform the job. |
---|
264 | */ |
---|
265 | public void scheduleJobForEvaluation( final EvolutionState state, Job job ) |
---|
266 | { |
---|
267 | if (isShutdownInProgress()) return; // no more jobs allowed. This line rejects requests from slaveConnections when THEY'RE shutting down. |
---|
268 | |
---|
269 | SlaveConnection result = null; |
---|
270 | synchronized(availableSlaves) |
---|
271 | { |
---|
272 | while( true) |
---|
273 | { |
---|
274 | if (!availableSlaves.isEmpty()) |
---|
275 | { |
---|
276 | result = (SlaveConnection)(availableSlaves.removeFirst()); |
---|
277 | break; |
---|
278 | } |
---|
279 | debug("Waiting for an available slave." ); |
---|
280 | waitOnMonitor(availableSlaves); |
---|
281 | } |
---|
282 | notifyMonitor(availableSlaves); |
---|
283 | } |
---|
284 | debug( "Got a slave available for work." ); |
---|
285 | |
---|
286 | result.scheduleJob(job); |
---|
287 | |
---|
288 | if( result.numJobs() < maxJobsPerSlave ) |
---|
289 | { |
---|
290 | synchronized(availableSlaves) |
---|
291 | { |
---|
292 | if( !availableSlaves.contains(result)) availableSlaves.addLast(result); // so we're round-robin |
---|
293 | notifyMonitor(availableSlaves); |
---|
294 | } |
---|
295 | } |
---|
296 | } |
---|
297 | |
---|
298 | /** |
---|
299 | This method returns only when all slaves have finished the jobs that they were assigned. While this method waits, |
---|
300 | new jobs can be assigned to the slaves. This method is usually invoked from MasterProblem.finishEvaluating. You |
---|
301 | should not abuse using this method: if there are two evaluation threads, where one of them waits until all jobs are |
---|
302 | finished, while the second evaluation thread keeps posting jobs to the slaves, the first thread might have to wait |
---|
303 | until the second thread has had all its jobs finished. |
---|
304 | */ |
---|
305 | public void waitForAllSlavesToFinishEvaluating( final EvolutionState state ) |
---|
306 | { |
---|
307 | synchronized(allSlaves) |
---|
308 | { |
---|
309 | Iterator iter = allSlaves.iterator(); |
---|
310 | while( iter.hasNext() ) |
---|
311 | { |
---|
312 | SlaveConnection slaveConnection = (SlaveConnection)(iter.next()); |
---|
313 | try { slaveConnection.dataOut.flush(); } catch (java.io.IOException e) {} // we'll catch this error later.... |
---|
314 | } |
---|
315 | notifyMonitor(allSlaves); |
---|
316 | } |
---|
317 | |
---|
318 | boolean shouldCycle = true; |
---|
319 | synchronized(allSlaves) |
---|
320 | { |
---|
321 | while( shouldCycle ) |
---|
322 | { |
---|
323 | shouldCycle = false; |
---|
324 | Iterator iter = allSlaves.iterator(); |
---|
325 | while( iter.hasNext() ) |
---|
326 | { |
---|
327 | SlaveConnection slaveConnection = (SlaveConnection)(iter.next()); |
---|
328 | int jobs = slaveConnection.numJobs(); |
---|
329 | if( jobs != 0 ) |
---|
330 | { |
---|
331 | debug("Slave " + slaveConnection + " has " + jobs + " more jobs to finish." ); |
---|
332 | shouldCycle = true; |
---|
333 | break; |
---|
334 | } |
---|
335 | } |
---|
336 | if( shouldCycle ) |
---|
337 | { |
---|
338 | debug("Waiting for slaves to finish their jobs." ); |
---|
339 | waitOnMonitor(allSlaves); |
---|
340 | debug("At least one job has been finished." ); |
---|
341 | } |
---|
342 | } |
---|
343 | notifyMonitor(allSlaves); |
---|
344 | } |
---|
345 | debug("All slaves have finished their jobs." ); |
---|
346 | } |
---|
347 | |
---|
348 | /** |
---|
349 | Notifies the monitor that the particular slave has finished performing a job, and it (probably) is |
---|
350 | available for other jobs. |
---|
351 | */ |
---|
352 | void notifySlaveAvailability( SlaveConnection slave, final Job job, EvolutionState state ) |
---|
353 | { |
---|
354 | // first announce that a slave in allSlaves has finished, so people blocked on waitForAllSlavesToFinishEvaluating |
---|
355 | // can wake up and realize it. |
---|
356 | |
---|
357 | synchronized(allSlaves) |
---|
358 | { |
---|
359 | notifyMonitor(allSlaves); |
---|
360 | } |
---|
361 | |
---|
362 | // now announce that we've got a new available slave if someone wants it |
---|
363 | |
---|
364 | if( slave.numJobs() < maxJobsPerSlave ) |
---|
365 | { |
---|
366 | synchronized(availableSlaves) |
---|
367 | { |
---|
368 | if( !availableSlaves.contains(slave)) availableSlaves.addLast(slave); |
---|
369 | notifyMonitor(availableSlaves); |
---|
370 | } |
---|
371 | } |
---|
372 | |
---|
373 | debug("Notify the monitor that the slave is available." ); |
---|
374 | |
---|
375 | // now announce that we've got a new completed individual if someone is waiting for it |
---|
376 | |
---|
377 | if( state instanceof ec.steadystate.SteadyStateEvolutionState ) |
---|
378 | { |
---|
379 | // Perhaps we should the individuals by fitness first, so the fitter ones show up later |
---|
380 | // and don't get immediately wiped out by less fit ones. Or should it be the other way |
---|
381 | // around? We might revisit that in the future. |
---|
382 | |
---|
383 | // At any rate, add ALL the individuals that came back to the evaluatedIndividuals LinkedList |
---|
384 | synchronized(evaluatedIndividuals) |
---|
385 | { |
---|
386 | for(int x=0; x<job.inds.length;x++) |
---|
387 | evaluatedIndividuals.addLast( new QueueIndividual(job.inds[x], job.subPops[x]) ); |
---|
388 | notifyMonitor(evaluatedIndividuals); |
---|
389 | } |
---|
390 | } |
---|
391 | } |
---|
392 | |
---|
393 | LinkedList evaluatedIndividuals = new LinkedList(); |
---|
394 | |
---|
395 | public boolean evaluatedIndividualAvailable() |
---|
396 | { |
---|
397 | synchronized(evaluatedIndividuals) |
---|
398 | { |
---|
399 | try { evaluatedIndividuals.getFirst(); return true; } |
---|
400 | catch (NoSuchElementException e) { return false; } |
---|
401 | } |
---|
402 | } |
---|
403 | |
---|
404 | |
---|
405 | /** Blocks until an individual comes available */ |
---|
406 | public QueueIndividual waitForIndividual() |
---|
407 | { |
---|
408 | while(true) |
---|
409 | { |
---|
410 | synchronized(evaluatedIndividuals) |
---|
411 | { |
---|
412 | if (evaluatedIndividualAvailable()) |
---|
413 | return (QueueIndividual)(evaluatedIndividuals.removeFirst()); |
---|
414 | |
---|
415 | debug("Waiting for individual to be evaluated." ); |
---|
416 | waitOnMonitor(evaluatedIndividuals); // lets go of evaluatedIndividuals loc |
---|
417 | debug("At least one individual has been finished." ); |
---|
418 | } |
---|
419 | } |
---|
420 | } |
---|
421 | |
---|
422 | /** Returns the number of available slave (not busy) */ |
---|
423 | int numAvailableSlaves() |
---|
424 | { |
---|
425 | int i = 0; |
---|
426 | synchronized(availableSlaves) { i = availableSlaves.size(); } |
---|
427 | return i; |
---|
428 | } |
---|
429 | |
---|
430 | /** |
---|
431 | * @param s checkpoint file output stream |
---|
432 | * @throws IOException |
---|
433 | */ |
---|
434 | private void writeObject(ObjectOutputStream out) throws IOException |
---|
435 | { |
---|
436 | state.output.fatal("Not implemented yet: SlaveMonitor.writeObject"); |
---|
437 | } |
---|
438 | |
---|
439 | /** |
---|
440 | * @param s checkpoint file input stream. |
---|
441 | * @throws IOException |
---|
442 | * @throws ClassNotFoundException |
---|
443 | */ |
---|
444 | private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException |
---|
445 | { |
---|
446 | state.output.fatal("Not implemented yet: SlaveMonitor.readObject"); |
---|
447 | } |
---|
448 | } |
---|