Free cookie consent management tool by TermsFeed Policy Generator

source: branches/OKBJavaConnector/ECJClient/src/ec/eval/SlaveMonitor.java @ 10216

Last change on this file since 10216 was 6152, checked in by bfarka, 14 years ago

added ecj and custom statistics to communicate with the okb services #1441

File size: 16.3 KB
Line 
1/*
2  Copyright 2006 by Sean Luke and George Mason University
3  Licensed under the Academic Free License version 3.0
4  See the file "LICENSE" for more information
5*/
6
7
8package ec.eval;
9
10import ec.*;
11
12import java.io.*;
13import java.util.*;
14import java.net.*;
15import ec.util.*;
16import ec.steadystate.SteadyStateEvolutionState;
17import ec.steadystate.QueueIndividual;
18
19/**
20 * SlaveMonitor.java
21 *
22
23 <P>The SlaveMonitor manages slave connections to each remote slave, and provides synchronization facilities
24 for the slave connections and for various other objects waiting to be notified when new slaves are
25 available, space is available in a slave's job queue, an individual has been completed, etc.
26
27 <p>The monitor provides functions to create and delete slaves (registerSlave(), unregisterSlave()),
28 schedule a job for evaluation (scheduleJobForEvaluation(...)), block until all jobs have completed
29 (waitForAllSlavesToFinishEvaluating(...)), test if any individual in a job has been finished
30 (evaluatedIndividualAvailable()),  and block until an individual in a job is available and returned
31 (waitForindividual()).
32 
33 <p>Generally speaking, the SlaveMonitor owns the SlaveConnections -- no one else
34 should speak to them.  Also generally speaking, only MasterProblems create and speak to the SlaveMonitor.
35 
36 * @author Sean Luke, Liviu Panait, and Keith Sullivan
37 * @version 1.0
38 */
39
40public class SlaveMonitor
41    {
42    public static final String P_EVALMASTERPORT = "eval.master.port";
43    public static final String P_EVALCOMPRESSION = "eval.compression";
44    public static final String P_MAXIMUMNUMBEROFCONCURRENTJOBSPERSLAVE = "eval.masterproblem.max-jobs-per-slave";
45    public static final int SEED_INCREMENT = 7919; // a large value (prime for fun) bigger than expected number of threads per slave
46
47    public EvolutionState state;
48   
49    /**
50     *  The socket where slaves connect.
51     */
52    public ServerSocket servSock;
53       
54    /**
55     * Indicates whether compression is used over the socket IO streams.
56     */
57    public boolean useCompression;
58
59    boolean shutdownInProgress = false;
60    Object[] shutdownInProgressLock = new Object[0];  // arrays are serializable
61    final boolean isShutdownInProgress() { synchronized (shutdownInProgressLock) { return shutdownInProgress; } }
62    final void setShutdownInProgress(boolean val) { synchronized (shutdownInProgressLock) { shutdownInProgress = val; } }
63   
64    int randomSeed;
65    Thread thread;
66
67    public boolean waitOnMonitor(Object monitor)
68        {
69        try
70            {
71            monitor.wait();
72            }
73        catch (InterruptedException e) { return false; }
74        return true;
75        }
76
77    public void notifyMonitor(Object monitor)
78        {
79        monitor.notifyAll();
80        }
81
82    // the slaves (not really a queue)
83    private LinkedList allSlaves = new LinkedList();
84
85    // the available slaves
86    private LinkedList availableSlaves = new LinkedList();
87
88    // the maximum number of jobs per slave
89    int maxJobsPerSlave;
90
91    // whether the system should display information that is useful for debugging
92    boolean showDebugInfo;
93   
94    final void debug(String s)
95        {
96        if (showDebugInfo) { System.err.println(Thread.currentThread().getName() + "->" + s); }
97        }
98   
99    /**
100       Simple constructor that initializes the data structures for keeping track of the state of each slave.
101       The constructor receives two parameters: a boolean flag indicating whether the system should display
102       information that is useful for debugging, and the maximum load per slave (the maximum number of jobs
103       that a slave can be entrusted with at each time).
104    */
105    public SlaveMonitor( final EvolutionState state, boolean showDebugInfo )
106        {
107        this.showDebugInfo = showDebugInfo;
108        this.state = state;
109               
110        int port = state.parameters.getInt(
111            new Parameter( P_EVALMASTERPORT ),null);
112               
113        maxJobsPerSlave = state.parameters.getInt(
114            new Parameter( P_MAXIMUMNUMBEROFCONCURRENTJOBSPERSLAVE ),null);
115
116        useCompression = state.parameters.getBoolean(new Parameter(P_EVALCOMPRESSION),null,false);
117               
118        try
119            {
120            servSock = new ServerSocket(port);
121            }
122        catch( IOException e )
123            {
124            state.output.fatal("Unable to bind to port " + port + ": " + e);
125            }
126               
127        randomSeed = (int)(System.currentTimeMillis());
128
129        // spawn the thread
130        thread = new Thread(new Runnable()
131            {
132            public void run()
133                {
134                Thread.currentThread().setName("SlaveMonitor::    ");
135                Socket slaveSock;
136                       
137                while (!isShutdownInProgress())
138                    {
139                    slaveSock = null;
140                    while( slaveSock==null && !isShutdownInProgress() )
141                        {
142                        try
143                            {
144                            slaveSock = servSock.accept();
145                            }
146                        catch( IOException e ) { slaveSock = null; }
147                        }
148
149                    debug(Thread.currentThread().getName() + " Slave attempts to connect." );
150
151                    if( isShutdownInProgress() ) break;
152
153                    try
154                        {
155                        DataInputStream dataIn = null;
156                        DataOutputStream dataOut = null;
157                        InputStream tmpIn = slaveSock.getInputStream();
158                        OutputStream tmpOut = slaveSock.getOutputStream();
159                        if (useCompression)
160                            {
161                            /*
162                              state.output.fatal("JDK 1.5 has broken compression.  For now, you must set eval.compression=false");
163                              tmpIn = new CompressingInputStream(tmpIn);
164                              tmpOut = new CompressingOutputStream(tmpOut);
165                            */
166                            tmpIn = Output.makeCompressingInputStream(tmpIn);
167                            tmpOut = Output.makeCompressingOutputStream(tmpOut);
168                            if (tmpIn == null || tmpOut == null)
169                                Output.initialError("You do not appear to have JZLib installed on your system, and so must set eval.compression=false. " +
170                                    "To get JZLib, download from the ECJ website or from http://www.jcraft.com/jzlib/");
171                            }
172                                                                                                       
173                        dataIn = new DataInputStream(tmpIn);
174                        dataOut = new DataOutputStream(tmpOut);
175                        String slaveName = dataIn.readUTF();
176
177                        dataOut.writeInt(randomSeed);
178                        randomSeed+=SEED_INCREMENT;
179                       
180                        // Write random state for eval thread to slave
181                        dataOut.flush();
182
183                        registerSlave(state, slaveName, slaveSock, dataOut, dataIn);
184                        state.output.systemMessage( "Slave " + slaveName + " connected successfully." );
185                        }
186                    catch (IOException e) {  }
187                    }
188
189                debug( Thread.currentThread().getName() + " The monitor is shutting down." );
190                }
191            });
192        thread.start();
193        }
194
195    /**
196       Registers a new slave with the monitor.  Upon registration, a slave is marked as available for jobs.
197    */
198    public void registerSlave( EvolutionState state, String name, Socket socket, DataOutputStream out, DataInputStream in)
199        {
200        SlaveConnection newSlave = new SlaveConnection( state, name, socket, out, in, this );
201       
202        synchronized(availableSlaves)
203            {
204            availableSlaves.addLast(newSlave);
205            notifyMonitor(availableSlaves);
206            }
207        synchronized(allSlaves)
208            {
209            allSlaves.addLast(newSlave);
210            notifyMonitor(allSlaves);
211            }
212        }
213
214    /**
215       Unregisters a dead slave from the monitor.
216    */
217    public void unregisterSlave( SlaveConnection slave )
218        {
219        synchronized(allSlaves)
220            {
221            allSlaves.remove(slave);
222            notifyMonitor(allSlaves);
223            }
224        synchronized(availableSlaves)
225            {
226            availableSlaves.remove(slave);
227            notifyMonitor(availableSlaves);
228            }
229        }
230
231    /**
232       Shuts down the slave monitor (also shuts down all slaves).
233    */
234    public void shutdown()
235        {
236        // kill the socket socket and bring down the thread
237        setShutdownInProgress(true);
238        try
239            {
240            servSock.close();
241            }
242        catch (IOException e)
243            {
244            }
245        thread.interrupt();
246        try { thread.join(); }
247        catch (InterruptedException e) { }
248       
249        // gather all the slaves
250       
251        synchronized(allSlaves)
252            {
253            while( !allSlaves.isEmpty() )
254                {
255                ((SlaveConnection)(allSlaves.removeFirst())).shutdown(state);
256                }
257            notifyMonitor(allSlaves);
258            }
259        }
260
261    /**
262       Schedules a job for execution on one of the available slaves.  The monitor waits until at least one
263       slave is available to perform the job.
264    */
265    public void scheduleJobForEvaluation( final EvolutionState state, Job job )
266        {
267        if (isShutdownInProgress()) return;  // no more jobs allowed.  This line rejects requests from slaveConnections when THEY'RE shutting down.
268       
269        SlaveConnection result = null;
270        synchronized(availableSlaves)
271            {
272            while( true)
273                {
274                if (!availableSlaves.isEmpty())
275                    {
276                    result = (SlaveConnection)(availableSlaves.removeFirst());
277                    break;
278                    }
279                debug("Waiting for an available slave." );
280                waitOnMonitor(availableSlaves);
281                }
282            notifyMonitor(availableSlaves);
283            }       
284        debug( "Got a slave available for work." );
285
286        result.scheduleJob(job);
287
288        if( result.numJobs() < maxJobsPerSlave )
289            {
290            synchronized(availableSlaves)
291                {
292                if( !availableSlaves.contains(result)) availableSlaves.addLast(result);  // so we're round-robin
293                notifyMonitor(availableSlaves);
294                }
295            }
296        }
297
298    /**
299       This method returns only when all slaves have finished the jobs that they were assigned.  While this method waits,
300       new jobs can be assigned to the slaves.  This method is usually invoked from MasterProblem.finishEvaluating.  You
301       should not abuse using this method: if there are two evaluation threads, where one of them waits until all jobs are
302       finished, while the second evaluation thread keeps posting jobs to the slaves, the first thread might have to wait
303       until the second thread has had all its jobs finished.
304    */
305    public void waitForAllSlavesToFinishEvaluating( final EvolutionState state )
306        {
307        synchronized(allSlaves)
308            {
309            Iterator iter = allSlaves.iterator();
310            while( iter.hasNext() )
311                {
312                SlaveConnection slaveConnection = (SlaveConnection)(iter.next());
313                try { slaveConnection.dataOut.flush(); } catch (java.io.IOException e) {} // we'll catch this error later....
314                }
315            notifyMonitor(allSlaves);
316            }
317           
318        boolean shouldCycle = true;
319        synchronized(allSlaves)
320            {
321            while( shouldCycle )
322                {
323                shouldCycle = false;
324                Iterator iter = allSlaves.iterator();
325                while( iter.hasNext() )
326                    {
327                    SlaveConnection slaveConnection = (SlaveConnection)(iter.next());
328                    int jobs = slaveConnection.numJobs();
329                    if( jobs != 0 )
330                        {
331                        debug("Slave " + slaveConnection + " has " + jobs + " more jobs to finish." );
332                        shouldCycle = true;
333                        break;
334                        }                               
335                    }
336                if( shouldCycle )
337                    {
338                    debug("Waiting for slaves to finish their jobs." );
339                    waitOnMonitor(allSlaves);
340                    debug("At least one job has been finished." );
341                    }
342                }
343            notifyMonitor(allSlaves);
344            }
345        debug("All slaves have finished their jobs." );
346        }
347
348    /**
349       Notifies the monitor that the particular slave has finished performing a job, and it (probably) is
350       available for other jobs.
351    */
352    void notifySlaveAvailability( SlaveConnection slave, final Job job, EvolutionState state )
353        {
354        // first announce that a slave in allSlaves has finished, so people blocked on waitForAllSlavesToFinishEvaluating
355        // can wake up and realize it.
356       
357        synchronized(allSlaves)
358            {
359            notifyMonitor(allSlaves);
360            }
361
362        // now announce that we've got a new available slave if someone wants it
363       
364        if( slave.numJobs() < maxJobsPerSlave )
365            {
366            synchronized(availableSlaves)
367                {
368                if( !availableSlaves.contains(slave)) availableSlaves.addLast(slave);
369                notifyMonitor(availableSlaves);
370                }
371            }
372
373        debug("Notify the monitor that the slave is available." );
374
375        // now announce that we've got a new completed individual if someone is waiting for it
376
377        if( state instanceof ec.steadystate.SteadyStateEvolutionState )
378            {
379            // Perhaps we should the individuals by fitness first, so the fitter ones show up later
380            // and don't get immediately wiped out by less fit ones.  Or should it be the other way
381            // around?  We might revisit that in the future.
382           
383            // At any rate, add ALL the individuals that came back to the evaluatedIndividuals LinkedList
384            synchronized(evaluatedIndividuals)
385                {
386                for(int x=0; x<job.inds.length;x++)
387                    evaluatedIndividuals.addLast( new QueueIndividual(job.inds[x], job.subPops[x]) );
388                notifyMonitor(evaluatedIndividuals);
389                }
390            }
391        }
392
393    LinkedList evaluatedIndividuals =  new LinkedList();
394
395    public boolean evaluatedIndividualAvailable()
396        {
397        synchronized(evaluatedIndividuals)
398            {
399            try { evaluatedIndividuals.getFirst(); return true; }
400            catch (NoSuchElementException e) { return false; }
401            }
402        }
403
404
405    /** Blocks until an individual comes available */
406    public QueueIndividual waitForIndividual()
407        {
408        while(true)
409            {
410            synchronized(evaluatedIndividuals)
411                {
412                if (evaluatedIndividualAvailable())
413                    return (QueueIndividual)(evaluatedIndividuals.removeFirst());
414
415                debug("Waiting for individual to be evaluated." );
416                waitOnMonitor(evaluatedIndividuals);  // lets go of evaluatedIndividuals loc
417                debug("At least one individual has been finished." );
418                }
419            }
420        }
421
422    /** Returns the number of available slave (not busy) */
423    int numAvailableSlaves()
424        {
425        int i = 0;
426        synchronized(availableSlaves) { i = availableSlaves.size(); }
427        return i;
428        }
429
430    /**
431     * @param s checkpoint file output stream
432     * @throws IOException
433     */
434    private void writeObject(ObjectOutputStream out) throws IOException
435        {
436        state.output.fatal("Not implemented yet: SlaveMonitor.writeObject");
437        }
438       
439    /**
440     * @param s checkpoint file input stream.
441     * @throws IOException
442     * @throws ClassNotFoundException
443     */
444    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
445        {
446        state.output.fatal("Not implemented yet: SlaveMonitor.readObject");
447        }
448    }
Note: See TracBrowser for help on using the repository browser.