Free cookie consent management tool by TermsFeed Policy Generator

source: branches/OKBJavaConnector/ECJClient/src/ec/eval/SlaveConnection.java @ 8614

Last change on this file since 8614 was 6152, checked in by bfarka, 14 years ago

added ecj and custom statistics to communicate with the okb services #1441

File size: 12.5 KB
RevLine 
[6152]1/*
2  Copyright 2006 by Sean Luke and George Mason University
3  Licensed under the Academic Free License version 3.0
4  See the file "LICENSE" for more information
5*/
6
7
8package ec.eval;
9
10import java.io.*;
11import java.net.*;
12import java.util.LinkedList;
13import ec.*;
14import java.util.*;
15
16/**
17 * SlaveConnection.java
18 *
19
20 This class contains certain information associated with a slave: its name, connection socket,
21 input and output streams, and the job queue.  Additionally, the class sets up an auxillary thread
22 which reads and writes to the streams to talk to the slave in the background.  This thread uses
23 the SlaveMonitor as its synchronization point (it sleeps with wait() and wakes up when notified()
24 to do some work).
25 
26 <P>Generally SlaveConnection is only seen by communicates only with SlaveMonitor.
27
28 * @author Liviu Panait, Keith Sullivan, and Sean Luke
29 * @version 2.0
30 */
31
32class SlaveConnection
33    {
34    /** Name of the slave process */
35    String slaveName;
36       
37    /**  Socket for communication with the slave process */
38    Socket evalSocket;
39       
40    /**  Used to transmit data to the slave. */
41    DataOutputStream dataOut;
42       
43    /**  Used to read results and randoms state from slave. */
44    public DataInputStream dataIn;
45       
46    // a pointer to the evolution state
47    EvolutionState state;
48
49    // a pointer to the monitor
50    SlaveMonitor slaveMonitor;
51
52    // a pointer to the worker thread that is working for this slave
53    Thread reader;
54    Thread writer;
55
56    // given that we expect the slave to return the evaluated individuals in the exact same order,
57    // the jobs need to be represented as a queue.
58    private LinkedList jobs = new LinkedList();
59
60    /**
61       The constructor also creates the queue storing the jobs that the slave
62       has been asked to evaluate.  It also creates and launches the worker
63       thread that is communicating with the remote slave to read back the results
64       of the evaluations.
65    */
66    public SlaveConnection( EvolutionState state,
67        String slaveName,
68        Socket evalSocket,
69        DataOutputStream dataOut,
70        DataInputStream dataIn,
71        SlaveMonitor slaveMonitor )
72        {
73        this.slaveName = slaveName;
74        this.evalSocket = evalSocket;
75        this.dataOut = dataOut;
76        this.dataIn = dataIn;
77        this.state = state;
78        this.slaveMonitor = slaveMonitor;
79        buildThreads();
80        showDebugInfo = slaveMonitor.showDebugInfo;
81        }
82       
83    /**
84       This method is called whenever there are any communication problems with the slave
85       (indicating possibly that the slave might have crashed).  In this case, the jobs will
86       be rescheduled for evaluation on other slaves.
87    */
88    boolean shuttingDown;
89    Object shutDownLock = new int[0];  // serializable and lockable
90    protected void shutdown( final EvolutionState state )
91        {
92        // prevent me from hitting this multiple times
93        synchronized(shutDownLock) { if (shuttingDown) return; else shuttingDown = true; }
94       
95        // don't want to miss any of these so we'll wrap them individually
96        try { dataOut.writeByte(Slave.V_SHUTDOWN); } catch (Exception e) { }  // exception, not IOException, because JZLib throws some array exceptions
97        try { dataOut.flush(); } catch (Exception e) { }
98        try { dataOut.close(); } catch (Exception e) { }
99        try { dataIn.close(); } catch (Exception e) { }
100        try { evalSocket.close(); } catch (IOException e) { }
101
102        state.output.systemMessage( SlaveConnection.this.toString() + " Slave is shutting down...." );
103        slaveMonitor.unregisterSlave(this);  // unregister me BEFORE I reschedule my jobs
104        rescheduleJobs(state);
105        synchronized(jobs)
106            {
107            // notify my threads now that I've closed stuff in case they're still waiting
108            slaveMonitor.notifyMonitor(jobs);
109            reader.interrupt();  // not important right now but...
110            writer.interrupt();  // very important that we be INSIDE the jobs synchronization here so the writer doesn't try to wait on the monitor again.
111            }
112        state.output.systemMessage( SlaveConnection.this.toString() + " Slave exits...." );
113        }
114
115    public String toString() { return "Slave(" + slaveName + ")"; }
116
117    boolean showDebugInfo;
118       
119    final void debug(String s)
120        {
121        if (showDebugInfo) { System.err.println(Thread.currentThread().getName() + "->" + s); }
122        }
123   
124    /**
125       Returns the number of jobs that a slave is in charge of.
126    */
127    public int numJobs()
128        {
129        synchronized(jobs) { return jobs.size(); }
130        }
131   
132    // constructs the worker thread for the slave and starts it
133    void buildThreads()
134        {
135        reader = new Thread()
136            {
137            public void run()
138                {
139                while( readLoop());
140                }
141            };
142        writer = new Thread()
143            {
144            public void run()
145                {
146                while( writeLoop());
147                }
148            };
149        writer.start();
150        reader.start();
151        }
152   
153   
154    // returns the oldest unsent job, or null if there is no unsent job.
155    // marks the job as sent so we don't try to grab it next time
156    // NOT SYNCHRONIZED -- YOU MUST SYNCHRONIZE ON jobs!
157    Job oldestUnsentJob()
158        {
159        // jobs are loaded into the queue from the back and go to the front.
160        // so the oldest jobs are in the front and we should search starting
161        // at the front.  List iterators go from front to back, so we can iterate
162        // starting with the oldest.
163       
164        // This all could have been O(1) if we had used two queues, but we're being
165        // intentionally lazy to keep this from getting to complex.
166        Iterator i = jobs.iterator();
167        while(i.hasNext())
168            {
169            Job job = (Job)(i.next());
170            if (!job.sent) { job.sent = true; return job; }
171            }
172        return null;
173        }
174       
175   
176    boolean writeLoop()
177        {
178        Job job = null;
179       
180        try
181            {
182            synchronized(jobs)
183                {
184                // check for an unsent job
185                if ((job = oldestUnsentJob()) == null)  // automatically marks as sent
186                    {
187                    // failed -- wait and drop out of the loop and come in again
188                    debug("" + Thread.currentThread().getName() + "Waiting for a job to send" );
189                    slaveMonitor.waitOnMonitor(jobs);
190                    }
191                }
192            if (job != null)  // we got a job inside our synchronized wait
193                {
194                // send the job
195                debug("" + Thread.currentThread().getName() + "Sending Job");
196                if( job.type == Slave.V_EVALUATESIMPLE )
197                    {
198                    // Tell the server we're evaluating a SimpleProblemForm
199                    dataOut.writeByte(Slave.V_EVALUATESIMPLE);
200                    }
201                else
202                    {
203                    // Tell the server we're evaluating a GroupedProblemForm
204                    dataOut.writeByte(Slave.V_EVALUATEGROUPED);
205                                       
206                    // Tell the server whether to count victories only or not.
207                    dataOut.writeBoolean(job.countVictoriesOnly);
208                    }
209                               
210                // transmit number of individuals
211                dataOut.writeInt(job.inds.length);
212                           
213                // Transmit the subpopulations to the slave
214                for(int x=0;x<job.subPops.length;x++)
215                    dataOut.writeInt(job.subPops[x]);
216                           
217                debug("Starting to transmit individuals");
218                           
219                // Transmit the individuals to the server for evaluation...
220                for(int i=0;i<job.inds.length;i++)
221                    {
222                    job.inds[i].writeIndividual(state, dataOut);
223                    dataOut.writeBoolean(job.updateFitness[i]);
224                    }
225                dataOut.flush();
226                }
227            }
228        catch (Exception e)  { shutdown(state); return false; }
229        return true;
230        }
231       
232       
233       
234       
235       
236    boolean readLoop()
237        {
238        Job job = null;
239       
240        try
241            {
242            // block on an incoming job
243            byte val = dataIn.readByte();
244            debug(SlaveConnection.this.toString() + " Incoming Job");
245           
246            // okay, we've got a job.  Grab the earliest job, that's what's coming in
247           
248            synchronized(jobs)
249                {
250                job = (Job)(jobs.getFirst());
251                }
252            debug("Got job: " + job);
253           
254           
255            ///// NEXT STEP: COPY THE INDIVIDUALS FORWARD INTO NEWINDS.
256            ///// WE DO THIS SO WE CAN LOAD THE INDIVIDUALS BACK INTO NEWINDS
257            ///// AND THEN COPY THEM BACK INTO THE ORIGINAL INDS, BECAUSE ECJ
258            ///// DOESN'T HAVE A COPY(INDIVIDUAL,INTO_INDIVIDUAL) FUNCTION
259           
260            job.copyIndividualsForward();
261
262            // now start reading.  Remember that we've already got a byte.
263           
264            for(int i = 0; i < job.newinds.length; i++)
265                {
266                debug(SlaveConnection.this.toString() + " Individual# " + i);
267                debug(SlaveConnection.this.toString() + " Reading Byte" );
268                if (i > 0) val = dataIn.readByte();  // otherwise we've got it already
269                debug(SlaveConnection.this.toString() + " Reading Individual" );
270                if (val == Slave.V_INDIVIDUAL)
271                    {
272                    job.newinds[i].readIndividual(state, dataIn);
273                    }
274                else if (val == Slave.V_FITNESS)
275                    {
276                    job.newinds[i].evaluated = dataIn.readBoolean();
277                    job.newinds[i].fitness.readFitness(state,dataIn);
278                    }
279                else if (val == Slave.V_NOTHING)
280                    {
281                    // do nothing
282                    }
283                debug( SlaveConnection.this.toString() + " Read Individual" );
284                }
285
286
287            ///// NEXT STEP: COPY THE NEWLY-READ INDIVIDUALS BACK INTO THE ORIGINAL
288            ///// INDIVIDUALS.  THIS IS QUITE A HACK, IF YOU READ JOB.JAVA
289
290            // Now we have all the individuals in so we're good.  Copy them back into the original individuals
291            job.copyIndividualsBack(state);
292
293
294            ///// LAST STEP: LET OTHERS KNOW WE'RE DONE AND AVAILABLE FOR ANOTHER JOB
295
296            // we're all done!  Yank the job from the queue so others think we're available
297            synchronized(jobs)
298                {
299                jobs.removeFirst();
300                }
301       
302            // And let the slave monitor we just finished a job
303            slaveMonitor.notifySlaveAvailability( SlaveConnection.this, job, state );
304
305            }
306        catch (IOException e)
307            {
308            shutdown(state);  // will redistribute jobs
309            return false;
310            }
311
312        return true;
313        }
314
315   
316
317
318    /**
319       Adds a new jobs to the queue.  This implies that the slave will be in charge of executing
320       this particular job.
321    */
322    public void scheduleJob( final Job job )
323        {
324        synchronized(jobs)
325            {
326            if (job.sent) // just in case
327                state.output.fatal("Tried to reschedule an existing job");
328            jobs.addLast(job);
329            slaveMonitor.notifyMonitor(jobs);
330            }
331        }
332
333    /**
334       Reschedules the jobs in this job queue to other slaves in the system.  It assumes that the slave associated
335       with this queue has already been removed from the available slaves, such that it is not assigned its own jobs.
336    */
337    // only called when we're shutting down, so we're not waiting for any notification.
338    void rescheduleJobs( final EvolutionState state )
339        {
340        while( true )
341            {
342            Job job = null;
343            synchronized(jobs)
344                {
345                if( jobs.isEmpty() ) { return; }
346                job = (Job)(jobs.removeFirst());
347                }
348            debug(Thread.currentThread().getName() + " Waiting for a slave to reschedule the evaluation.");
349            job.sent = false;  // reuse
350            slaveMonitor.scheduleJobForEvaluation(state,job);
351            debug(Thread.currentThread().getName() + " Got a slave to reschedule the evaluation.");
352            }
353        }
354    }
355
356
Note: See TracBrowser for help on using the repository browser.