Free cookie consent management tool by TermsFeed Policy Generator

source: branches/OKBJavaConnector/ECJClient/src/ec/exchange/IslandExchange.java @ 7759

Last change on this file since 7759 was 6152, checked in by bfarka, 14 years ago

added ecj and custom statistics to communicate with the okb services #1441

File size: 92.8 KB
Line 
1/*
2  Copyright 2006 by Sean Luke and George Mason University
3  Licensed under the Academic Free License version 3.0
4  See the file "LICENSE" for more information
5*/
6
7
8package ec.exchange;
9import java.util.*;
10import java.io.*;
11import java.net.*;
12import ec.*;
13import ec.util.*;
14
15/*
16 * IslandExchange.java
17 *
18 * Created Sat Feb 10 13:44:11 EST 2001
19 * By: Liviu Panait and Sean Luke
20 */
21
22/**
23 * IslandExchange is an Exchanger which
24 * implements a simple but quite functional asynchronous
25 * island model for doing massive parallel distribution of evolution across
26 * beowulf clusters.  One of its really nice features is that because everything
27 * is in Java, your cluster can have mixed platforms in it (MacOS, Unix,
28 * Windoze, whatever you like).  You can also have multiple processes running
29 * on the same machine, as long as they're given different client ports.
30 * IslandExchange operates over TCP/IP with Java sockets, and is compatible
31 * with checkpointing.
32 *
33 * <p>IslandExchange uses an arbitrary graph topology for migrating individuals
34 * from island (EC process) to island over the network.  There are a few
35 * restrictions for simplicity, however:
36
37 <ul>
38 <li> Every island must have the same kind of subpopulations and species.
39 <li> Every subpopulation will send the same number of migrants as any
40 other subpopulation.
41 <li> Migrants from a subpopulation will go to the same subpopulation.
42 </ul>
43
44 * <p>Every island is a <i>client</i>.  Additionally one island is designated
45 * a <i>server</i>.  Note that, just like in the Hair Club for Men, the server
46 * is also a client.  The purpose of the server is to synchronize the clients
47 * so that they all get set up properly and hook up to each other, then to
48 * send them small signal messages (like informing them that another client has
49 * discovered the ideal individual), and help them gracefully shut down.  Other
50 * than these few signals which are routed through the server to the clients,
51 * all other information -- namely the migrants themselves -- are sent directly
52 * from client to client in a peer-to-peer fashion.
53 *
54 * <p>The topology of the network is stored solely in the server's parameter
55 * database.  When the clients fire up, they first set up "Mailboxes" (where
56 * immigrants from other clients will appear), then they go to the server
57 * and ask it who they should connect to to send migrants.  The server tells
58 * them, and then they then hook up.  When a client has finished hooking up, it
59 * reports this to the server.  After everyone has hooked up, the server tells
60 * the clients to begin evolution, and they're off and running.
61 *
62 * <p>Islands send emigrants to other islands by copying good individuals
63 * (selected with a SelectionMethod) and sending the good individuals to
64 * the mailboxes of receiving clients.  Once an individual has been received,
65 * it is considered to be unevaluated by the receiving island, even though
66 * it had been previously evaluated by the sending island.
67 *
68 * <p>The IslandExchange model is typically <i>asynchronous</i> because migrants may
69 * appear in your mailbox at any time; islands do not wait for each other
70 * to complete the next generation.  This is a more efficient usage of network
71 * bandwidth.  When an island completes its breeding, it looks inside its
72 * mailbox for new migrants.  It then replaces some of its newly-bred
73 * individuals (chosen entirely at random)
74 * with the migrants (we could have increased the population size so we didn't
75 * waste that breeding time, but we were lazy).  It then flushes the mailbox,
76 * which patiently sits waiting for more individuals.
77 *
78 * <p>Clients may also be given different start times and modulos for
79 * migrating.  For example, client A might be told that he begins sending emigrants
80 * only after generation 6, and then sends emigrants on every 4 generations beyond
81 * that.  The purpose for the start times and modulos is so that not every client
82 * sends emigrants at the same time; this also makes better use of network bandwidth.
83 *
84 * <p>When a client goes down, the other clients deal with it gracefully; they
85 * simply stop trying to send to it.  But if the server goes down, the clients
86 * do not continue operation; they will shut themselves down.  This means that in
87 * general you can shut down an entire island model network just by killing the
88 * server process.  However, if the server quits because it runs out of generations,
89 * it will wait for the clients to all quit before it finally stops.
90 *
91 * <p>IslandExchange works correctly with checkpointing.  If you restart from
92 * a checkpoint, the IslandExchange will start up the clients and servers again
93 * and reconnect.  Processes can start from different checkpoints, of course.
94 * However, realize that if you restart from a checkpoint, some migrants
95 * may have been lost in transit from island to island.  That's the nature of
96 * networking without heavy-duty transaction management! This means that we
97 * cannot guarantee that restarting from checkpoint will yield the same results
98 * as the first run yielded.
99 *
100 * <p>Islands are not described in the topology parameters by their
101 * IP addresses; instead, they are described by "ids", strings which uniquely
102 * identify each island.  For example, "gilligans-island" might be an id.  :-)
103 * This allows you to move your topology to different IP addresses without having
104 * to change all your parameter files!  You can even move your topology to totally
105 * different machines, and restart from previous checkpoints, and everything
106 * should still work correctly.
107 *
108 * <p>There are times, especially to experiment with dynamics, that you need
109 * a <i>synchronous</i> island model.  If you specify synchronicity, the server's
110 * stated modulo and offset override any modulii or offsets specified by clients.
111 * Everyone will use the server's modulo and offset.  This means that everyone
112 * will trade individuals on the same generation.  Additionally, clients will wait
113 * until everyone else has traded, before they are permitted to continue evolving.
114 * This has the effect of locking all the clients together generation-wise; no
115 * clients can run faster than any other clients.
116 *
117 * <p>One last item: normally in this model, the server is also a client.  But
118 * if for some reason you need the server to be a process all by itself, without
119 * creating a client as well, you can do that.  You spawn such a server differently
120 * than the main execution of ECJ.  To spawn a server on a given server params file
121 * (let's say it's server.params) but NOT spawn a client, you do:
122 <p><pre>
123 java ec.exchange.IslandExchange -file server.params
124 </pre>
125 * <p> ...this sets up a special process which just spawns a server, and doesn't do
126 * all the setup of an evolutionary run.  Of course as usual, for each of the
127 * clients, you'll run <tt>java ec.Evolve ...</tt> instead.
128
129 <p><b>Parameters</b><br>
130 <p><i>Note:</i> some of these parameters are only necessary for creating
131 <b>clients</b>.  Others are necessary for creating the <b>server</b>.
132 <table>
133 <tr><td valign=top><tt><i>base</i>.chatty</tt><br>
134 <font size=-1>boolean, default = true</font></td>
135 <td valign=top> Should we be verbose or silent about our exchanges?
136 </td></tr>
137 <tr><td valign=top><tt><i>base</i>.select</tt><br>
138 <font size=-1>classname, inherits and != ec.SelectionMethod</font></td>
139 <td valign=top>
140 <i>client</i>: The selection method used for picking migrants to emigrate to other islands
141 </td></tr>
142 <tr><td valign=top><tt><i>base</i>.select-to-die</tt><br>
143 <font size=-1>classname, inherits and != ec.SelectionMethod, default is ec.select.RandomSelection</font></td>
144 <td valign=top>
145 <i>client</i>: The selection method used for picking individuals to be replaced by incoming migrants.
146 <b>IMPORTANT Note</b>.  This selection method must <i>not</i> pick an individual based on fitness.
147 The selection method will be called just after breeding but <i>before</i> evaluation; many individuals
148 will not have had a fitness assigned at that point.  You might want to design a SelectionMethod
149 other than RandomSelection, however, to do things like not picking elites to die.
150 </td></tr>
151 <tr><td valign=top><tt><i>base</i>.server-addr</tt><br>
152 <font size=-1>String</font></td>
153 <td valign=top>
154 <i>client</i>: The IP address of the server
155 </td></tr>
156 <tr><td valign=top><tt><i>base</i>.server-port</tt><br>
157 <font size=-1>int >= 1</font></td>
158 <td valign=top>
159 <i>client</i>: The port number of the server
160 </td></tr>
161 <tr><td valign=top><tt><i>base</i>.client-port</tt><br>
162 <font size=-1>int >= 1</font></td>
163 <td valign=top>
164 <i>client</i>: The port number of the client (where it will receive migrants) -- this should be different from the server port.
165 </td></tr>
166 <tr><td valign=top><tt><i>base</i>.id</tt><br>
167 <font size=-1>String</font></td>
168 <td valign=top>
169 <i>client</i>: The "name" the client is giving itself.  Each client should have a unique name.  For example, "gilligans-island".
170 </td></tr>
171 <tr><td valign=top><tt><i>base</i>.compressed</tt><br>
172 <font size=-1>bool = <tt>true</tt> (default) or <tt>false</tt></font></td>
173 <td valign=top>
174 <i>client</i>: Whether the communication with other islands should be compressed or not.  Compressing uses more CPU, but it may also significantly reduce communication.
175 </td></tr>
176 <tr><td valign=top><tt><i>base</i>.i-am-server</tt><br>
177 <font size=-1>bool = <tt>true</tt> or <tt>false</tt> (default)</font></td>
178 <td valign=top>
179 <i>client</i>: Is this client also the server?  If so, it'll read the server parameters and set up a server as well.
180 </td></tr>
181 <tr><td valign=top><tt><i>base</i>.sync</tt><br>
182 <font size=-1>bool = <tt>true</tt> or <tt>false</tt> (default)</font></td>
183 <td valign=top>
184 <i>server</i>: Are we doing a synchronous island model?  If so, the server's modulo and offset override any client's stated modulo and offset.
185 </td></tr>
186 <tr><td valign=top><tt><i>base</i>.num-islands</tt><br>
187 <font size=-1>int >= 1</font></td>
188 <td valign=top>
189 <i>server</i>: The number of islands in the topology.
190 </td></tr>
191 <tr><td valign=top><tt><i>base</i>.island.<i>n</i>.id</tt><br>
192 <font size=-1>String</font></td>
193 <td valign=top>
194 <i>server</i>: The ID of island #n in the topology.
195 </td></tr>
196 <tr><td valign=top><tt><i>base</i>.island.<i>n</i>.num-mig</tt><br>
197 <font size=-1>int >= 1</font></td>
198 <td valign=top>
199 <i>server</i>: The number of islands that island #n sends emigrants to.
200 </td></tr>
201 <tr><td valign=top><tt><i>base</i>.island.<i>n</i>.mig.</tt><i>m</i><br>
202 <font size=-1>int >= 1</font></td>
203 <td valign=top>
204 <i>server</i>: The ID of island #m that island #n sends emigrants to.
205 </td></tr>
206 <tr><td valign=top><tt><i>base</i>.island.<i>n</i>.size</tt><br>
207 <font size=-1>int >= 1</font></td>
208 <td valign=top>
209 <i>server</i>: The number of emigrants (per subpopulation) that island #n sends to other islands.  If not set, uses the default parameter below.
210 </td></tr>
211 <tr><td valign=top><tt><i>base</i>.size</tt><br>
212 <font size=-1>int >= 1</font></td>
213 <td valign=top>
214 <i>server</i>: Default parameter: number of emigrants (per subpopulation) that a given island sends to other islands.
215 </td></tr>
216 <tr><td valign=top><tt><i>base</i>.island.<i>n</i>.start</tt><br>
217 <font size=-1>int >= 0</font></td>
218 <td valign=top>
219 <i>server</i>: The generation when island #n begins sending emigrants. If not set, uses the default parameter below.
220 </td></tr>
221 <tr><td valign=top><tt><i>base</i>.start</tt><br>
222 <font size=-1>bool = <tt>true</tt> or <tt>false</tt> (default)</font></td>
223 <td valign=top>
224 <i>server</i>: Default parameter: the generation when an island begins sending emigrants.
225 </td></tr>
226 <tr><td valign=top><tt><i>base</i>.island.<i>n</i>.mod</tt><br>
227 <font size=-1>int >= 1</font></td>
228 <td valign=top>
229 <i>server</i>: The number of generations that island #n waits between sending emigrants.  If not set, uses the default parameter below.
230 </td></tr>
231 <tr><td valign=top><tt><i>base</i>.mod</tt><br>
232 <font size=-1>bool = <tt>true</tt> or <tt>false</tt> (default)</font></td>
233 <td valign=top>
234 <i>server</i>: Default parameter: The number of generations an island waits between sending emigrants.
235 </td></tr>
236 <tr><td valign=top><tt><i>base</i>.island.<i>n</i>.mailbox-capacity</tt><br>
237 <font size=-1>int >= 1</font></td>
238 <td valign=top>
239 <i>server</i>: The maximum size (per subpopulation) of the mailbox for island #n.  If not set, uses the default parameter below.
240 </td></tr>
241 <tr><td valign=top><tt><i>base</i>.mailbox-capacity</tt><br>
242 <font size=-1>int >= 1</font></td>
243 <td valign=top>
244 <i>server</i>: Default parameter: the maximum size (per subpopulation) of the mailbox for a given island.
245 </td></tr>
246 </table>
247 
248 <p><b>Parameter bases</b><br>
249 <table>
250
251 <tr><td valign=top><tt><i>base</i>.select</tt></td>
252 <td>selection method for the client's migrants</td></tr>
253 </table>
254 
255 * @author Liviu Panait and Sean Luke
256 * @version 2.0
257 */
258
259public class IslandExchange extends Exchanger
260    {
261
262    //// Client information
263
264    /** The server address */
265    public static final String P_SERVER_ADDRESS = "server-addr";
266
267    /** The server port */
268    public static final String P_SERVER_PORT = IslandExchangeServer.P_SERVER_PORT;
269
270    /** The client port */
271    public static final String P_CLIENT_PORT = "client-port";
272
273    /** Whether the server is also on this island */
274    public static final String P_IS_SERVER = "i-am-server";
275   
276    /** The id of the island */
277    public static final String P_OWN_ID = "id";
278       
279    /** Whether the communication is compressed or not */
280    public static final String P_COMPRESSED_COMMUNICATION = "compressed";
281
282    /** The selection method for sending individuals to other islands */
283    public static final String P_SELECT_METHOD = "select";
284
285    /** The selection method for deciding individuals to be replaced by immigrants */
286    public static final String P_SELECT_TO_DIE_METHOD = "select-to-die";
287
288    /** How long we sleep in between attempts to connect or look for signals */
289    public static final int SLEEP_TIME = 100;
290
291    /** How long we sleep between checking for FOUND messages */
292    public static final int FOUND_TIMEOUT = 100;
293
294    /** Whether or not we're chatty */
295    public static final String P_CHATTY = "chatty";
296
297    /** Okay signal */
298    public static final String OKAY = "okay";
299
300    /** Synchronize signal */
301    public static final String SYNC = "sync";
302
303    /** Found signal */
304    public static final String FOUND = "found";
305
306    /** Our chattiness */
307    boolean chatty;
308
309    /** The thread of the server (is different than null only for the island with the server) */
310    public Thread serverThread;
311
312    /** My parameter base -- I need to keep this in order to help the server
313        reinitialize contacts */
314    // SERIALIZE
315    public Parameter base;
316   
317    /** The address of the server */
318    // SERIALIZE
319    public String serverAddress;
320
321    /** The port of the server */
322    // SERIALIZE
323    public int serverPort;
324
325    /** The port of the client mailbox */
326    // SERIALIZE
327    public int clientPort;
328
329    /** whether the server should be running on the current island or not */
330    // SERIALIZE
331    public boolean iAmServer;
332
333    /** the id of the current island */
334    // SERIALIZE
335    public String ownId;
336
337    /** whether the communication is compressed or not */
338    // SERIALIZE
339    public boolean compressedCommunication;
340
341    /** the selection method for emigrants */
342    // SERIALIZE
343    public SelectionMethod immigrantsSelectionMethod;
344
345    /** the selection method for individuals to be replaced by immigrants */
346    // SERIALIZE
347    public SelectionMethod indsToDieSelectionMethod;
348
349    // the mailbox of the current client (exchanger)
350    IslandExchangeMailbox mailbox;
351
352    // the thread of the mailbox
353    Thread mailboxThread;
354
355    /// Communication with the islands where individuals have to be sent
356    // Number of islands to send individuals to
357    int number_of_destination_islands;
358
359    /** synchronous or asynchronous communication */
360    public boolean synchronous;
361
362    /** how often to send individuals */
363    public int modulo;
364
365    /** after how many generations to start sending individuals */
366    public int offset;
367
368    /** how many individuals to send each time */
369    public int size;
370
371    // Sockets to the destination islands
372    Socket[] outSockets;
373
374    // DataOutputStream to the destination islands
375    DataOutputStream[] outWriters;
376
377    // so we can print out nice names for our outgoing connections
378    String[] outgoingIds;
379
380    // information on the availability of the different islands
381    boolean[] running;
382
383    // the capacity of the mailboxes
384//    int mailboxCapacity;
385
386    // the socket to the server
387    Socket serverSocket;
388
389    // reader and writer to the serverSocket
390    DataOutputStream toServer;
391    DataInputStream fromServer;
392
393    // am I ONLY a server?
394    static boolean just_server;
395
396    public static void main(String[] args) throws InterruptedException
397        {
398        just_server = true;
399        int x;
400        ParameterDatabase parameters=null;
401        Output output;
402        boolean store;
403       
404        // The following is a little chunk of the ec.Evolve code sufficient
405        // to get IslandExchange up and running all by itself.
406       
407        System.err.println("Island Exchange Server\n" +
408            "Used in ECJ by Sean Luke\n");
409       
410       
411        // 0. find the parameter database
412        for(x=0;x<args.length-1;x++)
413            if (args[x].equals(Evolve.A_FILE))
414                {
415                try
416                    {
417                    parameters=new ParameterDatabase(
418                        // not available in jdk1.1: new File(args[x+1]).getAbsoluteFile(),
419                        new File(new File(args[x+1]).getAbsolutePath()),
420                        args);
421                    break;
422                    }
423                catch(FileNotFoundException e)
424                    { Output.initialError(
425                            "A File Not Found Exception was generated upon" +
426                            "reading the parameter file \"" + args[x+1] +
427                            "\".\nHere it is:\n" + e); }
428                catch(IOException e)
429                    { Output.initialError(
430                            "An IO Exception was generated upon reading the" +
431                            "parameter file \"" + args[x+1] +
432                            "\".\nHere it is:\n" + e); }
433                }
434        if (parameters==null)
435            Output.initialError(
436                "No parameter file was specified." );
437
438        // 1. create the output
439        //store = (parameters.getBoolean(new Parameter(Evolve.P_STORE),null,false));
440
441        output = new Output(true);
442        // output.setFlush(
443        //    parameters.getBoolean(new Parameter(Evolve.P_FLUSH),null,false));
444
445
446        // stdout is always log #0.  stderr is always log #1.
447        // stderr accepts announcements, and both are fully verbose
448        // by default.
449        output.addLog(ec.util.Log.D_STDOUT,false);
450        output.addLog(ec.util.Log.D_STDERR,true);
451       
452       
453        // this is an ugly, ugly, ugly, UGLY HACK
454        // it will only work if we don't ask interesting things
455        // of our "EvolutionState"  :-)  you know, things like
456        // random number generators or generation numbers!
457       
458        EvolutionState myEvolutionState = new
459            EvolutionState();
460       
461        myEvolutionState.parameters = parameters;
462        myEvolutionState.output = output;
463       
464        // set me up
465        Parameter myBase = new Parameter(EvolutionState.P_EXCHANGER);
466
467        IslandExchange ie = (IslandExchange)parameters.getInstanceForParameterEq(
468            myBase, null, IslandExchange.class);
469       
470        ie.setup(myEvolutionState,myBase);
471        ie.fireUpServer(myEvolutionState,myBase);
472        ie.serverThread.join();
473       
474        // flush the output
475        output.flush();
476        System.err.flush();
477        System.out.flush();
478        System.exit(0);
479        }
480
481    // sets up the Island Exchanger
482    public void setup( final EvolutionState state, final Parameter _base )
483        {
484        base = _base;
485
486        Parameter p;
487       
488        // get the port of the server
489        p = base.push( P_SERVER_PORT );
490        serverPort = state.parameters.getInt( p, null, 1 );
491        if( serverPort == 0 )
492            state.output.fatal( "Could not get the port of the server, or it is invalid.", p );
493
494        chatty = state.parameters.getBoolean(base.push(P_CHATTY), null, true);
495
496        // by default, communication is not compressed
497        compressedCommunication = state.parameters.getBoolean(base.push(P_COMPRESSED_COMMUNICATION),null,false);
498        if( compressedCommunication )
499            {
500//            state.output.fatal("JDK 1.5 has broken compression.  For now, you must set " + base.push(P_COMPRESSED_COMMUNICATION) + "=false");
501            state.output.message( "Communication will be compressed" );
502            }
503           
504        // check whether it has to launch the main server for coordination
505        p = base.push( P_IS_SERVER );
506        iAmServer = state.parameters.getBoolean( p, null, false );
507
508        // Am I ONLY the server or not?
509        if (just_server)
510            {
511            // print out my IP address
512            try
513                {
514                state.output.message("IP ADDRESS: " + LocalHost.getLocalHost().getHostAddress());
515                }
516            catch (java.net.UnknownHostException e) { }
517            }
518        else
519            {
520            // setup the selection method
521            p = base.push( P_SELECT_METHOD );
522            immigrantsSelectionMethod = (SelectionMethod)
523                state.parameters.getInstanceForParameter( p, null, ec.SelectionMethod.class );
524            immigrantsSelectionMethod.setup( state, base );
525
526            // setup the selection method
527            p = base.push( P_SELECT_TO_DIE_METHOD );
528            if( state.parameters.exists( p, null) )
529                indsToDieSelectionMethod = (SelectionMethod)
530                    state.parameters.getInstanceForParameter( p, null, ec.SelectionMethod.class );
531            else // use RandomSelection
532                indsToDieSelectionMethod = new ec.select.RandomSelection();
533            indsToDieSelectionMethod.setup( state, base );
534
535            // get the address of the server
536            p = base.push( P_SERVER_ADDRESS );
537            serverAddress = state.parameters.getStringWithDefault( p, null, "" );
538            if( serverAddress.equalsIgnoreCase("") )
539                state.output.fatal( "Could not get the address of the server.", p );
540
541            // get the port of the client mailbox
542            p = base.push( P_CLIENT_PORT );
543            clientPort = state.parameters.getInt( p, null, 1 );
544            if( clientPort == 0 )
545                state.output.fatal( "Could not get the port of the client, or it is invalid.", p );
546
547            // get the id of the island
548            p = base.push( P_OWN_ID );
549            ownId = state.parameters.getStringWithDefault( p, null, "" );
550            if( ownId.equals("") )
551                state.output.fatal( "Could not get the Id of the island.", p );
552            }
553        }
554
555    /** Custom serialization */
556    private void writeObject(ObjectOutputStream out) throws IOException
557        {
558        // this is all we need to write out -- everything else
559        // gets recreated when we call reinitializeContacts(...) again...
560   
561        out.writeObject(base);
562        out.writeObject(serverAddress);
563        out.writeObject(ownId);
564        out.writeBoolean(compressedCommunication);
565        out.writeObject(immigrantsSelectionMethod);
566        out.writeObject(indsToDieSelectionMethod);
567        out.writeInt(serverPort);
568        out.writeInt(clientPort);
569        out.writeBoolean(iAmServer);
570        }
571
572    /** Custom serialization */
573    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
574        {
575        // this is all we need to read in -- everything else
576        // gets recreated when we call reinitializeContacts(...) again...
577   
578        base = (Parameter)(in.readObject());
579        serverAddress=(String)(in.readObject());
580        ownId=(String)(in.readObject());
581        compressedCommunication = in.readBoolean();
582        immigrantsSelectionMethod=(SelectionMethod)(in.readObject());
583        indsToDieSelectionMethod=(SelectionMethod)(in.readObject());
584        serverPort = in.readInt();
585        clientPort = in.readInt();
586        iAmServer = in.readBoolean();
587        }
588   
589   
590    /** Fires up the server. */
591
592    public void fireUpServer(EvolutionState state, Parameter serverBase)
593        {
594        IslandExchangeServer serv = new IslandExchangeServer();
595        serv.setupServerFromDatabase(state,serverBase);
596        serverThread = serv.spawnThread();
597        }
598       
599
600    /** Initializes contacts with other processes, if that's what you're doing. Called at the beginning of an evolutionary run, before a population is set up. */
601    public void initializeContacts(EvolutionState state)
602        {
603
604        // launch the server
605        if( iAmServer )
606            {
607            fireUpServer(state,base);
608            state.output.message( "Server Launched." );
609            }
610        else
611            {
612            state.output.message( "I'm just a client." );
613            }
614
615
616
617        // In this thread, *I* am the client.  I connect to the server
618        // and get the information from the server, then I connect
619        // to the clients and go through the synchronization process
620        // with the server.  Spawn the mailbox. When the server says "go", I'm done with
621        // this function.
622
623
624        /** Make our connections and hook up */
625        long l = 0;
626        try
627            {
628            // spin until we get a connection
629            state.output.message("Connecting to Server " + serverAddress + ", port " + serverPort);
630            while(true)
631                {
632                try
633                    {
634                    serverSocket = new Socket(serverAddress,serverPort);
635                    break;
636                    }
637                catch (IOException e)   // it's not up yet...
638                    {
639                    l++;
640                    try
641                        {
642                        Thread.sleep( 5000 );
643                        }
644                    catch( InterruptedException f )
645                        {
646                        state.output.message(""+f);
647                        }
648                    state.output.message("Retrying");
649                    }
650                }
651           
652            // okay, we're connected.  Send our info.
653            state.output.message("Connected to Server after " + (l * SLEEP_TIME) + " ms");
654            fromServer = new DataInputStream(serverSocket.getInputStream());
655            toServer = new DataOutputStream(serverSocket.getOutputStream());
656
657            // sending the server own contact information
658            toServer.writeUTF( ownId );
659            toServer.flush();
660
661            // Launch the mailbox thread (read from the server how many sockets to allocate
662            // on the mailbox. Obtain the port and address of the mailbox.
663            mailbox = new IslandExchangeMailbox( state, clientPort, fromServer.readInt(),
664                fromServer.readInt(), ownId, chatty, compressedCommunication );
665            mailboxThread = new Thread( mailbox );
666            mailboxThread.start();
667
668            // record that the mailbox has been created
669            state.output.message( "IslandExchangeMailbox created." );
670
671            // tell the server the address and port of the mailbox
672            try
673                {
674                toServer.writeUTF( LocalHost.getLocalHost().getHostAddress() );
675                toServer.flush();
676                state.output.message("My address is: " + LocalHost.getLocalHost().getHostAddress() );
677                }
678            catch( UnknownHostException e )
679                {
680                state.output.fatal( "Could not get the address of the local computer." );
681                }
682            toServer.writeInt( mailbox.getPort() );
683            toServer.flush();
684
685            // read from the server the modulo, offset and size it has to use.
686            // this parameters allow an extendable/modifiable version where different
687            // islands send different number of individuals (based on the size of their populations)
688            synchronous = ( fromServer.readInt() == 1 );
689            if( synchronous )
690                {
691                state.output.message( "The communication will be synchronous." );
692                }
693            else
694                {
695                state.output.message( "The communication will be asynchronous." );
696                }
697            modulo = fromServer.readInt();
698            offset = fromServer.readInt();
699            size = fromServer.readInt();
700
701            // read the number of islands it has to send messages to
702            number_of_destination_islands = fromServer.readInt();
703
704            // allocate the arrays
705            outSockets = new Socket[ number_of_destination_islands ];
706            outWriters = new DataOutputStream[ number_of_destination_islands ];
707            running = new boolean[ number_of_destination_islands ];
708            outgoingIds = new String[ number_of_destination_islands ];
709
710            // open connections to each of the destination islands
711            for( int y = 0 ; y < number_of_destination_islands ; y++ )
712                {
713                // get the address and the port
714                String address = fromServer.readUTF().trim();
715                int port = fromServer.readInt();
716                try
717                    {
718                    try
719                        {
720                        state.output.message( "Trying to connect to " + address + " : " + port );
721                        // try opening a connection
722                        outSockets[y] = new Socket( address, port );
723                        }
724                    catch( UnknownHostException e )
725                        {
726                        // gracefully handle communication errors
727                        state.output.warning( "Unknown host exception while the client was opening a socket to " + address + " : " + port );
728                        running[y] = false;
729                        continue;
730                        }
731
732                    if( compressedCommunication )
733                        {
734                        /*                       
735                                                  outWriters[y] = new DataOutputStream(new CompressingOutputStream(outSockets[y].getOutputStream()));
736                                                  // read the mailbox's id, then write my own id
737                                                  outgoingIds[y] = new DataInputStream(new CompressingInputStream(outSockets[y].getInputStream())).readUTF().trim();
738                        */
739                       
740                        OutputStream compressedo = Output.makeCompressingOutputStream(outSockets[y].getOutputStream());
741                        InputStream compressedi = Output.makeCompressingInputStream(outSockets[y].getInputStream());
742                        if (compressedi == null || compressedo == null)
743                            state.output.fatal( "You do not appear to have JZLib installed on your system, and so may must have compression turned off for IslandExchange.  "+
744                                "To get JZLib, download from the ECJ website or from http://www.jcraft.com/jzlib/");
745                        outWriters[y] = new DataOutputStream(compressedo);
746                        outgoingIds[y] = new DataInputStream(compressedi).readUTF().trim();
747                        }
748                    else
749                        {
750                        outWriters[y] = new DataOutputStream(outSockets[y].getOutputStream());
751
752                        // read the mailbox's id, then write my own id
753                        outgoingIds[y] = new DataInputStream(outSockets[y].getInputStream()).readUTF().trim();
754                        }
755                       
756                    outWriters[y].writeUTF(ownId);
757                    outWriters[y].flush();
758           
759                    running[y] = true;
760                    }
761                catch( IOException e )
762                    {
763                    // this is caused if the server had problems locating information
764                    // on the mailbox of the other island, therefore remember the
765                    // communication with this island is not setup properly
766                    state.output.warning( "IO exception while the client was opening sockets to other islands' mailboxes :" + e );
767                    running[y] = false;
768                    }
769                }
770
771            // synchronization stuff: tells the server it finished connecting to other mailboxes
772            toServer.writeUTF( OKAY );
773            toServer.flush();
774
775            // wait for the run signal
776            fromServer.readUTF();
777
778            }
779        catch( IOException e )
780            {
781            state.output.fatal( "Error communicating to the server." );
782            }
783
784        // at this point, the mailbox is looking for incoming messages
785        // form other islands. we have to exit the function. there is
786        // one more thing to be done: to check for the server sending a
787        // FOUND signal. In order to do this, we set the socket to the
788        // server as non-blocking, and verify that for messages from the
789        // server in the runComplete function
790        try
791            {
792            serverSocket.setSoTimeout( FOUND_TIMEOUT );
793            }
794        catch( SocketException e )
795            {
796            state.output.fatal( "Could not set the connection to the server to non-blocking." );
797            }
798
799        }
800
801    /** Initializes contacts with other processes, if that's what you're doing.  Called after restarting from a checkpoint. */
802    public void reinitializeContacts(EvolutionState state)
803        {
804        // This function is almost the same as initializeContacts.
805        // The only main difference is that when reinitializeContacts
806        // is called, it's called because I started up from a checkpoint file.
807        // This means that I'm in the middle of evolution, so the modulo
808        // and start might cause me to update more recently than if I had
809        // started fresh.  But maybe it won't make a difference in this method
810        // if the way I determine when I'm firing off migrants is on a
811        // generation-by-generation basis.
812
813        initializeContacts( state );
814
815        }
816
817
818
819    public Population preBreedingExchangePopulation(EvolutionState state)
820        {
821        // sending individuals to other islands
822        // BUT ONLY if my modulo and offset are appropriate for this
823        // generation (state.generation)
824        // I am responsible for returning a population.  This could
825        // be a new population that I created fresh, or I could modify
826        // the existing population and return that.
827
828        // else, check whether the emigrants need to be sent
829        if( ( state.generation >= offset ) &&
830            ( ( modulo == 0 ) || ( ( ( state.generation - offset ) % modulo ) == 0 ) ) )
831            {
832
833            // send the individuals!!!!
834
835            // for each of the islands where we have to send individuals
836            for( int x = 0 ; x < number_of_destination_islands ; x++ )
837                try
838                    {
839
840                    // check whether the communication is ok with the current island
841                    if( running[x] )
842                        {
843
844                        if (chatty) state.output.message( "Sending " + size + " emigrants to island " + outgoingIds[x] );
845
846                        // for each of the subpopulations
847                        for( int subpop = 0 ; subpop < state.population.subpops.length ; subpop++ )
848                            {
849                            // send the subpopulation
850                            outWriters[x].writeInt( subpop );
851
852                            // send the number of individuals to be sent
853                            // it's better to send this information too, such that islands can (potentially)
854                            // send different numbers of individuals
855                            outWriters[x].writeInt( size );
856
857                            // select "size" individuals and send then to the destination as emigrants
858                            immigrantsSelectionMethod.prepareToProduce( state, subpop, 0 );
859                            for( int y = 0 ; y < size ; y++ ) // send all necesary individuals
860                                {
861                                int index = immigrantsSelectionMethod.produce( subpop, state, 0 );
862                                state.population.subpops[subpop].individuals[index].
863                                    writeIndividual( state, outWriters[x] );
864                                outWriters[x].flush();  // just in case the individuals didn't do a println
865                                }
866                            immigrantsSelectionMethod.finishProducing( state, subpop, 0 ); // end the selection step
867                            }
868                        }
869                    }
870                catch( IOException e )
871                    {
872                    running[x] = false;
873                    }
874            }
875
876        return state.population;
877
878        }
879
880
881    public Population postBreedingExchangePopulation(EvolutionState state)
882        {
883        // receiving individuals from other islands
884        // same situation here of course.
885
886        // if synchronous communication, synchronize with the mailbox
887        // if( ( state.generation >= offset ) && synchronous &&
888        //    ( ( modulo == 0 ) || ( ( ( state.generation - offset ) % modulo ) == 0 ) ) )
889        if (synchronous)
890            {
891            state.output.message( "Waiting for synchronization...." );
892
893            // set the socket to the server to blocking
894            try
895                {
896                serverSocket.setSoTimeout( 0 );
897                }
898            catch( SocketException e )
899                {
900                state.output.fatal( "Could not set the connection to the server to blocking." );
901                }
902
903            try
904                {
905                // send the sync message
906                toServer.writeUTF( SYNC );
907                toServer.flush();
908                // wait for the okay message
909                String temp = fromServer.readUTF();
910                if( temp.equals( IslandExchangeServer.GOODBYE ) )
911                    {
912                    alreadyReadGoodBye = true;
913                    }
914                }
915            catch( IOException e )
916                {
917                state.output.fatal( "Could not communicate to the server. Exiting...." );
918                }
919
920            // set the socket to the server to non-blocking
921            try
922                {
923                serverSocket.setSoTimeout( FOUND_TIMEOUT );
924                }
925            catch( SocketException e )
926                {
927                state.output.fatal( "Could not set the connection to the server to non-blocking." );
928                }
929            //state.output.message( "Synchronized. Reading individuals...." );
930            }
931
932        // synchronize, because immigrants is also accessed by the mailbox thread
933        synchronized( mailbox.immigrants )
934            {
935            for( int x = 0 ; x < mailbox.immigrants.length ; x++ )
936                {
937                if( mailbox.nImmigrants[x] > 0 )
938                    {
939                    if (chatty) state.output.message( "Immigrating " +  mailbox.nImmigrants[x] + " individuals from mailbox for subpopulation " + x );
940
941                    boolean[] selected = new boolean[ state.population.subpops[x].individuals.length ];
942                    int[] indeces = new int[ mailbox.nImmigrants[x] ];
943                    for( int i = 0 ; i < selected.length ; i++ )
944                        selected[i] = false;
945                    indsToDieSelectionMethod.prepareToProduce( state, x, 0 );
946                    for( int i = 0 ; i < mailbox.nImmigrants[x] ; i++ )
947                        {
948                        do {
949                            indeces[i] = indsToDieSelectionMethod.produce( x, state, 0 );
950                            } while( selected[indeces[i]] );
951                        selected[indeces[i]] = true;
952                        }
953                    indsToDieSelectionMethod.finishProducing( state, x, 0 );
954
955                    // there is no need to check for the differences in size: the mailbox.immigrants,
956                    // state.population.subpops and the mailbox.person2die should have the same size
957                    for( int y = 0 ; y < mailbox.nImmigrants[x] ; y++ )
958                        {
959
960                        // read the individual
961                        state.population.subpops[x].
962                            individuals[ indeces[y] ] = mailbox.immigrants[x][y];
963
964                        // reset the evaluated flag (the individuals are not evaluated in the current island */
965                        state.population.subpops[x].
966                            individuals[ indeces[y] ].evaluated = false;
967
968                        }
969
970                    // reset the number of immigrants in the mailbox for the current subpopulation
971                    // this doesn't need another synchronization, because the thread is already synchronized
972                    mailbox.nImmigrants[x] = 0;
973
974                    }
975                }
976
977            }
978
979        return state.population;
980        }
981
982    // if the GOODBYE message sent by the server gets read in the wrong place, this
983    // variable is set to true
984    boolean alreadyReadGoodBye = false;
985
986    // keeps the message to be returned next time on runComplete
987    String message;
988
989    /** Called after preBreedingExchangePopulation(...) to evaluate whether or not
990        the exchanger wishes the run to shut down (with ec.EvolutionState.R_FAILURE).
991        This would happen for two reasons.  First, another process might have found
992        an ideal individual and the global run is now over.  Second, some network
993        or operating system error may have occurred and the system needs to be shut
994        down gracefully.
995        This function does not return a String as soon as it wants to exit (another island found
996        the perfect individual, or couldn't connect to the server). Instead, it sets a flag, called
997        message, to remember next time to exit. This is due to a need for a graceful
998        shutdown, where checkpoints are working properly and save all needed information. */
999    public String runComplete(EvolutionState state)
1000        {
1001
1002        // first test the flag, and exit if it was previously set
1003        if( message != null ) // if an error occured earlier
1004            {
1005            return message;
1006            }
1007
1008        // check whether the server sent a FOUND message.
1009        // if it did, check whether it should exit or not
1010        try
1011            {
1012            // read a line. if it is successful, it means that the server sent a FOUND message
1013            // (this is the only message the server sends right now), and it should set the flag
1014            // for exiting next time when in this procedure
1015            String ww = fromServer.readUTF();
1016            if( ww != null || alreadyReadGoodBye ) // FOUND message sent from the server
1017                {
1018                // we should exit because some other individual has
1019                // found the perfect fit individual
1020                if( state.quitOnRunComplete )
1021                    {
1022                    message = "Exit: Another island found the perfect individual.";
1023                    state.output.message( "Another island found the perfect individual. Exiting...." );
1024                    toServer.writeUTF( OKAY );
1025                    toServer.flush();
1026                    }
1027                else
1028                    {
1029                    state.output.message( "Another island found the perfect individual." );
1030                    }
1031                }
1032            else // ( ww == null ) // the connection with the server was closed
1033                {
1034                // we should exit, because we cannot communicate with the
1035                // server anyway
1036                message = "Exit: Could not communicate with the server.";
1037                state.output.warning( "Could not communicate with the server. Exiting...." );
1038                }
1039            }
1040        catch( InterruptedIOException e )
1041            {
1042            // here don't do anything: it reaches this point when the server is on, but nobody found
1043            // the perfect individual. in this case, it should just return null, so that the
1044            // execution continues
1045            }
1046        catch( IOException e )
1047            {
1048            // some weird error
1049            // report it in a warning
1050            state.output.warning( "Some weird IO exception reported by the system in the IslandExchange::runComplete function.  Is it possible that the server has crashed?" );
1051            }
1052
1053        return null;
1054        }
1055
1056    /** Closes contacts with other processes, if that's what you're doing.  Called at the end of an evolutionary run. result is either ec.EvolutionState.R_SUCCESS or ec.EvolutionState.R_FAILURE, indicating whether or not an ideal individual was found. */
1057    public void closeContacts(EvolutionState state, int result)
1058        {
1059        // if the run was successful (perfect individual was found)
1060        // then send a message to the server that it was found
1061        if( result == EvolutionState.R_SUCCESS )
1062            {
1063            try
1064                {
1065                toServer.writeUTF( FOUND );
1066                toServer.flush();
1067                }
1068            catch( IOException e ) {}
1069            }
1070
1071        // close socket to server
1072        try
1073            {
1074            serverSocket.close();
1075            }
1076        catch( IOException e )
1077            {
1078            }
1079
1080        state.output.message( "Shutting down the mailbox" );
1081        // close the mailbox and wait for the thread to terminate
1082        mailbox.shutDown();
1083        mailboxThread.interrupt();
1084        try
1085            {
1086            mailboxThread.join();
1087            }
1088        catch( InterruptedException e )
1089            {
1090            }
1091        state.output.message( "Mailbox shut down" );
1092
1093        // close out-going sockets
1094        for( int x = 0 ; x < number_of_destination_islands ; x++ )
1095            {
1096            // catch each exception apart (don't take into consideration the running variables)
1097            try
1098                {
1099                if( running[x] )
1100                    outSockets[x].close();
1101                }
1102            catch( IOException e )
1103                {
1104                }
1105            }
1106
1107        // if the island also hosts the server, wait till it terminates
1108        if( iAmServer )
1109            {
1110            state.output.message( "Shutting down the server" );
1111            try
1112                {
1113                serverThread.join();
1114                }
1115            catch( InterruptedException e )
1116                {
1117                }
1118            state.output.message( "Server shut down" );
1119            }
1120
1121        }
1122
1123    /* (non-Javadoc)
1124     * @see ec.EvolutionState#finish(int)
1125     */
1126    public void finish(int result) {
1127        }
1128
1129    /* (non-Javadoc)
1130     * @see ec.EvolutionState#startFromCheckpoint()
1131     */
1132    public void startFromCheckpoint() {
1133        }
1134
1135    /* (non-Javadoc)
1136     * @see ec.EvolutionState#startFresh()
1137     */
1138    public void startFresh() {
1139        }
1140
1141    /* (non-Javadoc)
1142     * @see ec.EvolutionState#evolve()
1143     */
1144    public int evolve()
1145        throws InternalError {
1146        return 0;
1147        }
1148       
1149    }
1150
1151/** Class that contains all the mailbox functionality. It is supposed to wait on a new thread for incoming
1152    immigrants from other islands (it will receive in the constructor the number of islands that will send
1153    messages to the current island). Waiting on sockets is non-blocking, such that the order in which the
1154    islands send messages is unimportant. When immigrants are received, they are stored in a special buffer
1155    called immigrants. The storage is managed in a queue-like fashion, such that when the storage is full,
1156    the first immigrants that came are erased (hopefully the storage will be emptied fast enough such that
1157    this case doesn't appear too often).
1158    All accesses to the "immigrants" variable (also applies to nImmigrants) should be done only in the presence
1159    of synchronization, because there might be other threads using them too. The number of immigrants for each
1160    of the subpopulations (nImmigrants[x]) is between 0 and the size of the queue structure (received as a
1161    parameter in the constructor). */
1162class IslandExchangeMailbox implements Runnable
1163    {
1164
1165    /** How much to wait before starting checking for immigrants */
1166    public static final int SLEEP_BETWEEN_CHECKING_FOR_IMMIGRANTS = 1000;
1167
1168    /** How much to wait on a socket for a message, before starting to wait on another socket */
1169    public static final int CHECK_TIMEOUT = 1000;
1170
1171    /** How much to wait while synchronizing */
1172    public static final int SYNCHRONIZATION_SLEEP = 100;
1173
1174    /**  storage for the incoming immigrants: 2 sizes: the subpopulation and the index of the emigrant */
1175    public Individual[][] immigrants;
1176
1177    /** the number of immigrants in the storage for each of the subpopulations */
1178    public int[] nImmigrants;
1179
1180    // auxiliary variables to manage the queue storages
1181    int[] person2die;
1182
1183    // the socket where it listens for incomming messages
1184    ServerSocket serverSocket;
1185
1186    // the number of islands that send messages to the current mailbox
1187    int n_incoming;
1188
1189    // whether the information on sockets is compressed or not (receives this information in the constructor)
1190    boolean compressedCommunication;
1191
1192    // the sockets and readers for receiving incoming messages
1193    Socket[] inSockets;
1194    DataInputStream[] dataInput;
1195    String[] incomingIds;      // so we can print out nice names for our incoming connections
1196
1197    // the state of the islands it is communicating to
1198    boolean[] running;
1199
1200    // the state (to display messages mainly)
1201    EvolutionState state;
1202
1203    // synchronization variable
1204    Boolean syncVar;
1205
1206    // My ID
1207    String myId;
1208   
1209    boolean chatty;
1210
1211    /**
1212       Public constructor used to initialize most of the parameters of the mailbox:
1213       state_p : the EvolutionState, used mainly for displaying messages
1214       port : the port used to listen for incoming messages
1215       n_incoming_p : the number of islands that will send messages to the current island
1216       how_many : how many immigrants to manage in the queue-like storage for each of the subpopulations
1217    */
1218    public IslandExchangeMailbox( final EvolutionState state_p, int port, int n_incoming_p, int how_many, String _myId, boolean chatty, boolean _compressedCommunication )
1219        {
1220        myId = _myId;
1221        compressedCommunication = _compressedCommunication;
1222       
1223        this.chatty = chatty;
1224   
1225        // initialize public variables from the parameters of the constructor
1226        state = state_p;
1227        n_incoming = n_incoming_p;
1228
1229        Parameter p_numsubpops = new Parameter( ec.Initializer.P_POP ).push( ec.Population.P_SIZE );
1230        int numsubpops = state.parameters.getInt(p_numsubpops,null,1);
1231        if ( numsubpops == 0 )
1232            {
1233            // later on, Population will complain with this fatally, so don't
1234            // exit here, just deal with it and assume that you'll soon be shut
1235            // down
1236            }
1237
1238        // allocate the storages:
1239        // - immigrants = storage for the immigrants that will come to the current island
1240        //   - first dimension: the number of subpopulations
1241        //   - second dimension: how many immigrants to store for each of the subpopulations.
1242        // - person2die = where to insert next in the queue structure "immigrants"
1243        // - nImmigrants = how many immigrants there are in the storage "immigrants" for each of the subpopulations
1244        immigrants = new Individual[ numsubpops ][ how_many ];
1245        person2die = new int[ numsubpops ];
1246        nImmigrants = new int[ numsubpops ];
1247
1248        // set the synchronization variable to false (it will be set to true to signal exiting the waiting loop)
1249        syncVar = Boolean.FALSE;
1250
1251        // create the ServerSocket to listen to incoming messages
1252        try
1253            {
1254            serverSocket = new ServerSocket( port, n_incoming );
1255            }
1256        catch( IOException e )
1257            {
1258            state.output.fatal( "Could not start mailbox for incoming messages.  Perhaps the port (" + port + ") is bad?\n...or someone else already has it?");
1259            }
1260
1261        // allocate the sockets and the readers (will be used in the near future)
1262        inSockets = new Socket[ n_incoming ];
1263        dataInput = new DataInputStream[ n_incoming ];
1264        incomingIds = new String[ n_incoming ];
1265
1266        // allocate the status of the different readers
1267        running = new boolean[ n_incoming ];
1268
1269        }
1270
1271    /** The main functionality of the mailbox: waiting for incoming messages and dealing with the incoming immigrants */
1272    public void run()
1273        {
1274
1275        // wait for the "n_incoming" incoming connections from different islands, and initialize
1276        // the sockets and the readers to communicate with (receive messages from) them. All the
1277        // sockets are set to be non-blocking, such that they can be checked alternatively without
1278        // waiting for messages on a particular one.
1279        for( int x = 0 ; x < n_incoming ; x++ )
1280            {
1281            try
1282                {
1283                inSockets[x] = serverSocket.accept();
1284
1285                DataOutputStream dataOutput;
1286
1287                if( compressedCommunication )
1288                    {
1289                    /*
1290                      dataInput[x] = new DataInputStream(new CompressingInputStream(inSockets[x].getInputStream()));
1291                      dataOutput = new DataOutputStream(new CompressingOutputStream(inSockets[x].getOutputStream()));
1292                    */
1293                    OutputStream compressedo = Output.makeCompressingOutputStream(inSockets[x].getOutputStream());
1294                    InputStream compressedi = Output.makeCompressingInputStream(inSockets[x].getInputStream());
1295                    if (compressedi == null || compressedo == null)
1296                        state.output.fatal( "You do not appear to have JZLib installed on your system, and so may must have compression turned off for IslandExchange.  "+
1297                            "To get JZLib, download from the ECJ website or from http://www.jcraft.com/jzlib/");
1298
1299                    dataInput[x] = new DataInputStream(compressedi);
1300                    dataOutput = new DataOutputStream(compressedo);
1301                    }
1302                else
1303                    {
1304                    dataInput[x] = new DataInputStream(inSockets[x].getInputStream());
1305                    dataOutput = new DataOutputStream(inSockets[x].getOutputStream());
1306                    }
1307
1308                // send my id, then read an id
1309                dataOutput.writeUTF(myId);
1310                dataOutput.flush();
1311                incomingIds[x] = dataInput[x].readUTF().trim();   
1312
1313                state.output.message( "Island " + incomingIds[x] + " connected to my mailbox" );
1314
1315                // set the socket to non-blocking
1316                inSockets[x].setSoTimeout( CHECK_TIMEOUT );
1317                running[x] = true;
1318                }
1319            catch (IOException e)
1320                {
1321                running[x] = false;
1322                state.output.fatal( "An exception was generated while creating communication structures for island " + x + ".  Here it is: " + e );
1323                }
1324            }
1325
1326        state.output.message( "All islands have connected to my client." );
1327
1328        // variable used for deciding (based on the synchronized variable "syncVar") when to exit
1329        boolean shouldExit = false;
1330
1331        // enter the main loop
1332        do
1333            {
1334
1335            // wait some (do not check all the time, cause it would be a waste of time and computational resources)
1336            try
1337                {
1338                Thread.sleep( SLEEP_BETWEEN_CHECKING_FOR_IMMIGRANTS );
1339                }
1340            catch( InterruptedException e )
1341                {
1342                }
1343
1344            // for each of the connections established with the islands
1345            for( int x = 0 ; x < n_incoming ; x++ )
1346                {
1347                if( running[x] )
1348                    {
1349                    try
1350                        {
1351                        // enter an infinite loop to receive all the messages form the "x"s island
1352                        // it will exit the loop as soon as there are no more messages coming from
1353                        // the "x"s island (non-blocking socket)
1354                        while( true )
1355                            {
1356                            // read the subpopulation where the immigrants need to be inserted. In case there
1357                            // is no incoming message, an exception will be generated and the infinite loop
1358                            // will be exited (the mailbox will search the next socket (communication link)
1359                            // for incoming messages
1360                            int subpop = dataInput[x].readInt();
1361                           
1362                            // if it gets to this point, it means that a number of individuals will be sent
1363                            // it is the time to set up the receiving storages
1364                           
1365                            // set the socket to blocking for reading the individuals
1366                            try
1367                                {
1368                                inSockets[x].setSoTimeout( 0 );
1369                                }
1370                            catch( SocketException e )
1371                                {
1372                                state.output.warning( "Could not set the socket to blocking while receiving individuals in the mailbox." );
1373                                }
1374                           
1375                            // how many individuals will be received in the current dialogue?
1376                            int how_many_to_come = dataInput[x].readInt();
1377                           
1378                            if (chatty) state.output.message( "Receiving " + how_many_to_come + " immigrants for subpopulation "  + subpop + " from island " + incomingIds[x]);
1379
1380                            // synchronize on the immigrants (such that other threads cannot access it during its
1381                            // being modified)
1382                            synchronized( immigrants )
1383                                {
1384                               
1385                                // in case the immigrants buffer was emptied, the person2die is not reset (it is not public)
1386                                // so we have to reset it now
1387                                if( nImmigrants[subpop] == 0 ) // if it was reset
1388                                    person2die[subpop] = 0; // reset the person2die[x]
1389                               
1390                                // loop in order to receive all the incoming individuals in the current dialogue
1391                                for( int ind = 0 ; ind < how_many_to_come ; ind++ )
1392                                    {
1393                                    // read the individual
1394                                    try
1395                                        {
1396                                        // read the emigrant in the storage
1397                                        immigrants[subpop][person2die[subpop]] = state.population.subpops[subpop].species.
1398                                            newIndividual( state, dataInput[x] );
1399
1400                                        //state.output.message( "Individual received." );
1401                                       
1402                                        // increase the queue index
1403                                        if( person2die[subpop] == immigrants[subpop].length - 1 )
1404                                            person2die[subpop] = 0;
1405                                        else
1406                                            person2die[subpop]++;
1407                                       
1408                                        // can increment it without synchronization, as we do synchronization on the immigrants
1409                                        if( nImmigrants[subpop] < immigrants[subpop].length )
1410                                            nImmigrants[subpop]++;
1411
1412                                        }
1413                                    catch( IOException e )
1414                                        {
1415                                        // i hope it will also never happen :)
1416                                        state.output.message( "IO exception while communicating with an island" );
1417                                        running[x] = false;
1418                                        continue;
1419                                        }
1420                                    catch( NumberFormatException e )
1421                                        {
1422                                        // it happens when the socket is closed and cannot be doing any reading
1423                                        state.output.message( "IO exception while communicating with an island" );
1424                                        running[x] = false;
1425                                        continue;
1426                                        }
1427                                    }
1428                                } // end synchronized block on "immigrants"
1429                           
1430                            // set the socket to non-blocking (after current set of immigrants is over)
1431                            try
1432                                {
1433                                inSockets[x].setSoTimeout( CHECK_TIMEOUT );
1434                                }
1435                            catch( SocketException e )
1436                                {
1437                                state.output.warning( "Could not set the socket to non-blocking while receiving individuals in the mailbox." );
1438                                }
1439                            }
1440                        }
1441                    catch( InterruptedIOException e )
1442                        {
1443                        // here everything is ok
1444                        // just that there were no messages
1445                        }
1446                    catch( IOException e )
1447                        {
1448                        // now this is not nice
1449                        // report the error so that the programmer can fix it (hopefully)
1450                        state.output.message( "IO exception while communicating with an island" );
1451                        running[x] = false;
1452                        }
1453                    catch( NumberFormatException e )
1454                        {
1455                        // error received when some sockets break
1456                        state.output.message( "Socket closed" );
1457                        running[x] = false;
1458                        }
1459                    }
1460                }
1461
1462            // again with synchronization, try to access the syncVar to check whether the mailbox needs to finish
1463            // running (maybe some other island already found the perfect individual, or the resources of the current
1464            // run have been wasted)
1465            synchronized( syncVar )
1466                {
1467                // get the value of the syncVar. If it is true, we should exit.
1468                shouldExit = syncVar.booleanValue();
1469                }
1470            }
1471        while( !shouldExit );
1472
1473        // close the sockets (don't care about the running, but deal with exceptions)
1474        try
1475            {
1476            // close the ServerSocket
1477            serverSocket.close();
1478            }
1479        catch( IOException e )
1480            {
1481            }
1482        for( int x = 0 ; x < n_incoming ; x++ )
1483            {
1484            try
1485                {
1486                // close the sockets to communicate (receive messages) with the other islands
1487                inSockets[x].close();
1488                }
1489            catch( IOException e )
1490                {
1491                continue;
1492                }
1493            }
1494
1495        }
1496
1497    /**
1498       Method used to shutdown the mailbox. What it does is that it closes all communication links (sockets)
1499       and sets the syncVar to true (such that if the run() method is run on another thread, it will exit the
1500       loop and terminate.
1501    */
1502    public void shutDown()
1503        {
1504
1505        // set the syncVar to true (such that if another thread executes this.run(), it will exit the main loop
1506        // (hopefully, the information from the server was correct
1507        synchronized( syncVar )
1508            {
1509            syncVar = Boolean.TRUE;
1510            }
1511
1512        }
1513
1514    /**
1515       Return the port of the ServerSocket (where the islands where the other islands should
1516       connect in order to send their emigrants).
1517    */
1518    public int getPort()
1519        {
1520        // return the port of the ServerSocket
1521        return serverSocket.getLocalPort();
1522        }
1523
1524    }
1525
1526/**
1527   The IslandExchangeServer is the class that manages the main server that coordinates all the islands. The class
1528   implements Runnable (for running on a different thread).
1529*/
1530class IslandExchangeServer implements Runnable
1531    {
1532   
1533    /*
1534
1535      The server-specific parameters look roughly like this:
1536
1537      exch.server-port = 8021
1538      exch.num-islands = 3
1539      exch.island.0.id = SurvivorIsland
1540      exch.island.0.num-mig = 1
1541      exch.island.0.mig.0 = GilligansIsland
1542      exch.island.0.size = 5
1543      exch.island.0.mod = 2
1544      exch.island.0.start = 4
1545      exch.island.1.id = GilligansIsland
1546      exch.island.1.mod = 1
1547      exch.island.1.start = 2
1548      exch.island.1.size = 10
1549      exch.island.1.num-mig = 2
1550      exch.island.1.mig.0 = SurvivorIsland
1551      exch.island.1.mig.1 = GilligansIsland
1552      exch.island.2.id = BermudaIsland
1553      exch.island.2.mod = 2
1554      ...
1555    */
1556
1557    //// Server information
1558
1559    /** The server port */
1560    public static final String P_SERVER_PORT = "server-port";
1561
1562    /** The number of islands */
1563    public static final String P_NUM_ISLANDS = "num-islands";
1564
1565    /** The parameter for the island's information */
1566    public static final String P_ISLAND = "island";
1567
1568    /** The id */
1569    public static final String P_ID = "id";
1570
1571    // The number of islands that will send emigrants to the current island
1572    public static final String P_NUM_INCOMING_MIGRATING_COUNTRIES = "num-incoming-mig";
1573
1574    /** The number of islands where emigrants will be sent */
1575    public static final String P_NUM_MIGRATING_COUNTRIES = "num-mig";
1576
1577    /** the parameter for migrating islands' ids */
1578    public static final String P_MIGRATING_ISLAND = "mig";
1579
1580    /** The size of the mailbox (for each of the subpopulations) */
1581    public static final String P_MAILBOX_CAPACITY = "mailbox-capacity";
1582
1583    /** The parameter for the modulo (how many generations should pass between consecutive sendings of individuals */
1584    public static final String P_MODULO = "mod";
1585
1586    /** The number of emigrants to be sent */
1587    public static final String P_SIZE = "size";
1588
1589    /** How many generations to pass at the beginning of the evolution before the first emigration from the current island */
1590    public static final String P_OFFSET = "start";
1591
1592    /** Whether the execution should be synchronous or asynchronous */
1593    public static final String P_SYNCHRONOUS = "sync";
1594
1595    /** The run message to be sent to the clients */
1596    public static final String RUN = "run";
1597
1598    /** How much to wait for the found message (on a non-blocking socket) */
1599    public static final int FOUND_TIMEOUT = 100;
1600
1601    /** How much to sleep between checking for a FOUND message */
1602    public static final int SLEEP_TIME = 100;
1603
1604    /** The final message to be sent to all islands when an individual has been found */
1605    public static final String GOODBYE = "bye-bye";
1606
1607    /** The found message */
1608    public static final String FOUND = IslandExchange.FOUND;
1609
1610    /** The okay message */
1611    public static final String OKAY = IslandExchange.OKAY;
1612
1613    /** The synchronize message */
1614    public static final String SYNC = IslandExchange.SYNC;
1615
1616/** A class indicating all the information the server knows about
1617    a given island, including its mod, size, offset, and all the
1618    migrating islands it hooks to, etc. */
1619    public class IslandExchangeIslandInfo
1620        {
1621        /** how often to send individuals */
1622        public int modulo;
1623        /** the mailbox capacity (for each of the subpopulations) */
1624        public int mailbox_capacity;
1625        /** what generation to start sending individuals */
1626        public int offset;
1627        // how many individuals to send
1628        public int size;
1629        // to how many islands to send individuals
1630        public int num_mig;
1631        // the ids of the contries to send individuals to
1632        public String[] migrating_island_ids;
1633        // how many islands will send individuals to the mailbox
1634        public int num_incoming;
1635
1636        // also later filled in:
1637        // the address of the mailbox where to receive information
1638        public String address;
1639        // the port of the mailbox
1640        public int port;
1641        }
1642
1643    // The number of islands in the topology
1644    int numIslands;
1645
1646    // The port of the server
1647    int serverPort;
1648
1649    // the server's socket
1650    ServerSocket serverSocket;
1651
1652    // Hashtable for faster lookup of information regarding islands
1653    Hashtable info;
1654
1655    // Hashtable to count how many islands send individuals to each of the islands
1656    Hashtable info_immigrants;
1657
1658    EvolutionState state;
1659
1660    // Index of island ids sorted by parameter file
1661    String[] island_ids;
1662   
1663    // Index of island ids sorted by order of connection
1664    String[] connected_island_ids;
1665
1666
1667    // variables used if the execution is synchronous
1668    // int global_modulo, global_offset;
1669    boolean synchronous;
1670
1671    // how many individuals asked to be synchronized (when it reaches the total number of
1672    // running clients, the server resets this variable and allows everybody to continue running)
1673    boolean[] who_is_synchronized;
1674
1675    /** This setup should get called from the IslandExchange setup method. */
1676    public void setupServerFromDatabase(final EvolutionState state_p, final Parameter base)
1677        {
1678
1679        // Store the evolution state for further use in other functions ( ie. run )
1680        state = state_p;
1681
1682        // Don't bother with getting the default base -- we're a singleton!
1683
1684        Parameter p;
1685
1686        // get the number of islands
1687        p = base.push( P_NUM_ISLANDS );
1688        numIslands = state.parameters.getInt( p, null, 1 );
1689        if( numIslands == 0 )
1690            state.output.fatal( "The number of islands must be >0.", p );
1691
1692        // get the port of the server
1693        p = base.push( P_SERVER_PORT );
1694        serverPort = state.parameters.getInt( p, null, 1 );
1695        if( serverPort == 0 )
1696            state.output.fatal( "The server port must be >0.", p );
1697       
1698        // information on the islands = hashtable of ID and socket information
1699        info = new Hashtable( numIslands );
1700
1701        // initialize the hash table to count how many islands send individuals
1702        // to each of the islands
1703        info_immigrants = new Hashtable( numIslands );
1704
1705        // allocate the ids sorted by parameter file
1706        island_ids = new String[ numIslands ];
1707       
1708        // allocate the ids sorted by connection
1709        connected_island_ids = new String[ numIslands ] ;
1710
1711        // check whether the execution is synchronous or asynchronous
1712        // if it is synchronous, there should be two parameters in the parameters file:
1713        // the global modulo and offset (such that the islands coordinate smoothly)
1714        p = base.push( P_SYNCHRONOUS );
1715
1716        // get the value of the synchronous parameter (default is false)
1717        synchronous = state.parameters.getBoolean( p, null, false );
1718
1719        // if synchronous, read the other two global parameters
1720        if( synchronous )
1721            {
1722            state.output.message( "The communication will be synchronous." );
1723
1724/*
1725// get the global modulo
1726p = base.push( P_MODULO );
1727global_modulo = state.parameters.getInt( p, null, 1 );
1728if( global_modulo == 0 )
1729state.output.fatal( "Parameter not found, or it has an incorrect value.", p );
1730           
1731// get the global offset
1732p = base.push( P_OFFSET );
1733global_offset = state.parameters.getInt( p, null, 0 );
1734if( global_offset == -1 )
1735state.output.fatal( "Parameter not found, or it has an incorrect value.", p );
1736*/
1737            }
1738        else
1739            {
1740
1741            state.output.message( "The communication will be asynchronous." );
1742
1743            }
1744
1745        // get a new local base
1746        Parameter islandBase = base.push( P_ISLAND );
1747
1748        // load the island topology
1749        for( int x = 0 ; x < numIslands ; x++ )
1750            {
1751
1752            IslandExchangeIslandInfo ieii = new IslandExchangeIslandInfo();
1753
1754            Parameter localBase = islandBase.push( "" + x );
1755
1756            // get the id of the current island
1757            p = localBase.push( P_ID );
1758            island_ids[x] = state.parameters.getStringWithDefault( p, null, "" );
1759            if( island_ids[x].equals("") )
1760                state.output.fatal( "Parameter not found.", p );
1761
1762            // get the mailbox capacity of the imigration from the current island
1763            p = localBase.push( P_MAILBOX_CAPACITY );
1764            ieii.mailbox_capacity = state.parameters.getInt( p, base.push(P_MAILBOX_CAPACITY), 0 );
1765            if( ieii.mailbox_capacity == -1 )
1766                state.output.fatal( "Parameter not found, or it has an incorrect value.", p, base.push(P_MAILBOX_CAPACITY) );
1767
1768            // get the size of the imigration from the current island
1769            p = localBase.push( P_SIZE );
1770            ieii.size = state.parameters.getInt( p, base.push(P_SIZE), 0 );
1771            if( ieii.size == -1 )
1772                state.output.fatal( "Parameter not found, or it has an incorrect value.", p, base.push(P_SIZE) );
1773
1774            // if synchronous execution, use the global modulo and offset
1775            /* if( synchronous )
1776               {
1777               ieii.modulo = global_modulo;
1778               ieii.offset = global_offset;
1779               }
1780               else
1781               {*/
1782            // get the modulo of the imigration from the current island
1783            p = localBase.push( P_MODULO );
1784            ieii.modulo = state.parameters.getInt( p, base.push(P_MODULO), 1 );
1785            if( ieii.modulo == 0 )
1786                state.output.fatal( "Parameter not found, or it has an incorrect value.", p , base.push(P_MODULO));
1787
1788            // get the offset of the imigration from the current island
1789            p = localBase.push( P_OFFSET );
1790            ieii.offset = state.parameters.getInt( p, base.push(P_OFFSET), 0 );
1791            if( ieii.offset == -1 )
1792                state.output.fatal( "Parameter not found, or it has an incorrect value.", p, base.push(P_OFFSET) );
1793            /*     } */
1794
1795            // mark as uninitialized
1796            ieii.port = -1;
1797
1798            // insert the id in the hashset with the ids of the islands
1799            info.put( island_ids[x], ieii );
1800            }
1801
1802        // get the information on destination islands (with checking for consistency)
1803        for( int x = 0 ; x < numIslands ; x++ )
1804            {
1805
1806            IslandExchangeIslandInfo ieii = (IslandExchangeIslandInfo)info.get( island_ids[x] );
1807
1808            if( ieii == null )
1809                {
1810                state.output.error( "Inexistent information for island " + island_ids[x] + " stored in the server's information database." );
1811                continue;
1812                }
1813
1814            Parameter localBase = islandBase.push( "" + x );
1815
1816            // get the number of islands where individuals should be sent
1817            p = localBase.push( P_NUM_MIGRATING_COUNTRIES );
1818            ieii.num_mig = state.parameters.getInt( p, null, 0 );
1819            if( ieii.num_mig == -1 )
1820                state.output.fatal( "Parameter not found, or it has an incorrect value.", p );
1821
1822            // if there is at least 1 destination islands
1823            if( ieii.num_mig > 0 )
1824                {
1825
1826                // allocate the storage for ids
1827                ieii.migrating_island_ids = new String[ ieii.num_mig ];
1828
1829                // store a new base parameter
1830                Parameter ll;
1831                ll = localBase.push( P_MIGRATING_ISLAND );
1832
1833                // for each of the islands
1834                for( int y = 0 ; y < ieii.num_mig ; y++ )
1835                    {
1836
1837                    // read the id & check for errors
1838                    ieii.migrating_island_ids[y] = state.parameters.getStringWithDefault( ll.push(""+y), null, null );
1839                    if( ieii.migrating_island_ids[y] == null )
1840                        state.output.fatal( "Parameter not found.", ll.push(""+y) );
1841                    else if( !info.containsKey( ieii.migrating_island_ids[y] ) )
1842                        state.output.fatal( "Unknown island.", ll.push(""+y) );
1843                    else
1844                        {
1845                        // insert this knowledge into the hashtable for counting how many islands
1846                        // send individuals to each island
1847                        Integer integer = (Integer)info_immigrants.get( ieii.migrating_island_ids[y] );
1848                        if( integer == null )
1849                            info_immigrants.put( ieii.migrating_island_ids[y],
1850                                new Integer(1) );
1851                        else
1852                            info_immigrants.put( ieii.migrating_island_ids[y],
1853                                new Integer( integer.intValue() + 1 ) );
1854                        }
1855                    }
1856                }
1857
1858            // save the information back in the hash table
1859            // info.put( island_ids[x], ieii );                         // unneccessary -- Sean
1860
1861            }
1862
1863        for( int x = 0 ; x < numIslands ; x++ )
1864            {
1865
1866            IslandExchangeIslandInfo ieii = (IslandExchangeIslandInfo)info.get( island_ids[x] );
1867
1868            if( ieii == null )
1869                {
1870                state.output.fatal( "Inexistent information for island " + island_ids[x] + " stored in the server's information database." );
1871                }
1872
1873            Integer integer = (Integer)info_immigrants.get( island_ids[x] );
1874
1875            // if the information does not exist in the hasthable,
1876            // it means no islands send individuals there
1877            if( integer == null )
1878                ieii.num_incoming = 0;
1879            else
1880                ieii.num_incoming = integer.intValue();
1881
1882            // save the information back in the hash table
1883            // info.put( island_ids[x], ieii );                 // unneccessary -- Sean
1884
1885            }
1886
1887        // allocate and reset this variable to false
1888        who_is_synchronized = new boolean[ numIslands ];
1889
1890        for( int x = 0 ; x < numIslands ; x++ )
1891            who_is_synchronized[x] = false;
1892
1893        }
1894
1895    /** The main function running in the thread */
1896    public void run()
1897        {
1898
1899        // sockets to communicate to each of the islands
1900        Socket[] con = new Socket[numIslands];
1901
1902        // readers and writters for communication with each island
1903        DataInputStream[] dataIn = new DataInputStream[numIslands];
1904        DataOutputStream[] dataOut = new DataOutputStream[numIslands];
1905
1906
1907
1908        // whether each client is working (and communicating with the server) or not
1909        boolean[] clientRunning = new boolean[numIslands];
1910
1911        // initialize the running status of all clients
1912        for( int x = 0 ; x < numIslands ; x++ )
1913            clientRunning[x] = true;
1914
1915        try
1916            {
1917            // create a server
1918            serverSocket = new ServerSocket(serverPort,numIslands);
1919            }
1920        catch ( IOException e )
1921            {
1922            state.output.fatal( "Error creating a socket on port " + serverPort );
1923            }
1924
1925        // for each of the islands
1926        for(int x=0;x<numIslands;x++)
1927            {
1928            try
1929                {
1930                // set up connection with the island
1931                con[x] = serverSocket.accept();
1932
1933                // initialize the reader and the writer
1934                dataIn[x] = new DataInputStream(con[x].getInputStream());
1935                dataOut[x] = new DataOutputStream(con[x].getOutputStream());
1936
1937                // read the id
1938                connected_island_ids[x] = dataIn[x].readUTF().trim();
1939
1940                state.output.message( "Island " + connected_island_ids[x] + " logged in" );
1941
1942                // check whether the id appears in the information at the server
1943                if( !info.containsKey( connected_island_ids[x] ) )
1944                    {
1945                    state.output.error( "Incorrect ID (" + connected_island_ids[x] + ")" );
1946                    clientRunning[x] = false;
1947                    continue;
1948                    }
1949
1950                IslandExchangeIslandInfo ieii = (IslandExchangeIslandInfo)info.get( connected_island_ids[x] );
1951
1952                // redundant check, i know....
1953                if( ieii == null )
1954                    {
1955                    state.output.error( "Can't get IslandExchangeInfo for " + connected_island_ids[x]  );
1956                    clientRunning[x] = false;
1957                    continue;
1958                    }
1959
1960                // check whether an island with this id already registered with the server
1961                if( ieii.port >= 0 )
1962                    {
1963                    state.output.error( "Multiple islands are claiming the same ID (" + connected_island_ids[x] + ")" );
1964                    clientRunning[x] = false;
1965                    continue;
1966                    }
1967           
1968                // send the number of ids that will be send through the communication link
1969                dataOut[x].writeInt( ieii.num_incoming );
1970
1971                // send the capacity of the mailbox
1972                dataOut[x].writeInt( ieii.mailbox_capacity );
1973                               
1974                dataOut[x].flush();
1975
1976                // read the address and port of the island
1977                ieii.address = dataIn[x].readUTF().trim();
1978                ieii.port = dataIn[x].readInt();
1979
1980                state.output.message( "" + x + ": Island " + connected_island_ids[x] + " has address " +
1981                    ieii.address + " : " + ieii.port );
1982
1983                // re-insert the information in the hash table
1984                // info.put( id, ieii );                                // unnecessary -- Sean
1985                }
1986            catch( IOException e )
1987                {
1988                state.output.error( "Could not open connection #" + x );
1989                clientRunning[x] = false;
1990                }
1991            }
1992
1993        state.output.exitIfErrors();
1994
1995        // By this time, all mailboxes have been started and
1996        // they should be waiting for incoming messages. this is because
1997        // in order to send the server the information about the address and port
1998        // of the mailbox, they have to start them first. This is the reason
1999        // that makes us be able to start connecting without other synchronization
2000        // stuff right at this point.
2001
2002        // Now, I think, we've got a 1:1 mapping of keys to items in the info hashtable
2003        // So we tell everyone who they will communicate to
2004
2005        for( int x = 0 ; x < numIslands ; x++ )
2006            {
2007            if( clientRunning[x] )
2008                {
2009                IslandExchangeIslandInfo ieii = (IslandExchangeIslandInfo)info.get( connected_island_ids[x] );
2010
2011                if( ieii == null )
2012                    {
2013                    state.output.warning( "There is no information about island " + connected_island_ids[x]);
2014                    clientRunning[x] = false;
2015                    continue;
2016                    }
2017
2018                try
2019                    {
2020                    // send the synchronous, modulo, offset and size information to the current islands
2021                    if( synchronous )
2022                        dataOut[x].writeInt( 1 );
2023                    else
2024                        dataOut[x].writeInt( 0 );
2025                    dataOut[x].writeInt( ieii.modulo );
2026                    dataOut[x].writeInt( ieii.offset );
2027                    dataOut[x].writeInt( ieii.size );
2028
2029                    // send the number of address-port pairs that will be sent
2030                    dataOut[x].writeInt( ieii.num_mig );
2031
2032                    for( int y = 0 ; y < ieii.num_mig ; y++ )
2033                        {
2034                        IslandExchangeIslandInfo temp;
2035
2036                        temp = (IslandExchangeIslandInfo)info.get( ieii.migrating_island_ids[y] );
2037
2038                        if( temp == null )
2039                            {
2040                            state.output.warning( "There is incorrect information on the island " + connected_island_ids[x]  );
2041                            dataOut[x].writeUTF( " " );
2042                            dataOut[x].writeInt( -1 );
2043                            }
2044                        else
2045                            {
2046                            state.output.message( "Island " + connected_island_ids[x] + " should connect to island " +
2047                                ieii.migrating_island_ids[y] + " at " + temp.address + " : " + temp.port );
2048                            dataOut[x].writeUTF( temp.address );
2049                            dataOut[x].writeInt( temp.port );
2050                            }
2051                        }
2052                    dataOut[x].flush();
2053                    }
2054                catch( IOException e )
2055                    {
2056                    // other errors while reading
2057                    state.output.message("Server: Island " + island_ids[x] + " dropped connection");
2058                    clientRunning[x] = false;
2059                    continue;
2060                    }
2061                catch( NullPointerException e )
2062                    {
2063                    // other errors while reading
2064                    state.output.message("Server: Island " + island_ids[x] + " dropped connection");
2065                    clientRunning[x] = false;
2066                    try
2067                        {
2068                        dataIn[x].close();
2069                        dataOut[x].close();
2070                        con[x].close();
2071                        }
2072                    catch( IOException f )
2073                        {
2074                        }
2075                    continue;
2076                    }
2077                }
2078            }
2079
2080        try
2081            {
2082            // Next we wait until everyone acknowledges this
2083            for(int x=0;x<dataIn.length;x++)
2084                {
2085                dataIn[x].readUTF();
2086                }
2087
2088            // Now we tell everyone to start running
2089            for(int x=0;x<dataOut.length;x++)
2090                {
2091                dataOut[x].writeUTF( RUN );
2092                dataOut[x].flush();
2093                }
2094            }
2095        catch( IOException e )
2096            {
2097            }
2098
2099        // Okay we've sent off our information.  Now we wait until a client
2100        // tells us that he's found the solution, or until all the clients
2101        // have broken connections
2102       
2103        for(int x=0;x<con.length;x++)
2104            {
2105            try
2106                {
2107                con[x].setSoTimeout(FOUND_TIMEOUT);
2108                }
2109            catch( SocketException e )
2110                {
2111                state.output.error( "Could not set the connect with island " + x + " to non-blocking." );
2112                }
2113            }
2114
2115        boolean shouldExit = false;
2116
2117        while(!shouldExit)
2118            {
2119            // check whether there is at least one client running
2120            // otherwise the server might continue functioning just because the last client crashed or finished connection
2121            shouldExit = true;
2122            for( int x = 0 ; x < dataOut.length ; x++ )
2123                if( clientRunning[x] )
2124                    {
2125                    shouldExit = false;
2126                    break;
2127                    }
2128            if( shouldExit )
2129                break;
2130
2131            // sleep a while
2132            try
2133                {
2134                Thread.sleep(SLEEP_TIME);
2135                }
2136            catch( InterruptedException e )
2137                {
2138                }
2139
2140            String ww;
2141
2142            for(int x=0;x<dataOut.length;x++)
2143                {
2144                if (clientRunning[x])
2145                    {
2146
2147                    // initialize ww
2148                    ww = "";
2149
2150                    // check to see if he's still up, and if he's
2151                    // sent us a "I found it" signal
2152                    try
2153                        {
2154                        ww = dataIn[x].readUTF().trim();
2155                        }
2156                    catch( InterruptedIOException e )
2157                        {
2158                        // means that it run out of time and got no message,
2159                        // so it should just continue with the other sockets
2160                        continue;
2161                        }
2162                    catch( IOException e )
2163                        {
2164                        // other errors while reading
2165                        state.output.message("Server: Island " + island_ids[x] + " dropped connection");
2166                        clientRunning[x] = false;
2167                        continue;
2168                        }
2169                    catch( NullPointerException e )
2170                        {
2171                        // other errors while reading
2172                        state.output.message("Server: Island " + island_ids[x] + " dropped connection");
2173                        clientRunning[x] = false;
2174                        try
2175                            {
2176                            dataIn[x].close();
2177                            dataOut[x].close();
2178                            con[x].close();
2179                            }
2180                        catch( IOException f )
2181                            {
2182                            }
2183                        continue;
2184                        }
2185
2186                    if ( ww == null )  // the connection has been broken
2187                        {
2188                        state.output.message("Server: Island " + island_ids[x] + " dropped connection");
2189                        clientRunning[x] = false;
2190                        try
2191                            {
2192                            dataIn[x].close();
2193                            dataOut[x].close();
2194                            con[x].close();
2195                            }
2196                        catch( IOException e )
2197                            {
2198                            }
2199                        }
2200                    else if( ww.equals( FOUND ) ) // he found it!
2201                        {
2202                        // inform everyone that they need to shut down --
2203                        // we do not need to wrap
2204                        // our println statements in anything, they just
2205                        // return even if the client has broken the connection
2206                        for(int y=0;y<dataOut.length;y++)
2207                            {
2208                            if (clientRunning[y])
2209                                {
2210                                try
2211                                    {
2212                                    dataOut[y].writeUTF(GOODBYE);
2213                                    dataOut[y].close();
2214                                    dataIn[y].close();
2215                                    con[y].close();
2216                                    }
2217                                catch( IOException e )
2218                                    {
2219                                    }
2220                                }
2221                            }
2222                        // now we can just get out of all this and
2223                        // quit the thread
2224                        shouldExit=true;
2225                        break;
2226                        }
2227                    else if( ww.equals( SYNC ) )
2228                        {
2229                        who_is_synchronized[x] = true;
2230
2231                        boolean complete_synchronization = true;
2232
2233                        for( int y = 0 ; y < numIslands ; y++ )
2234                            complete_synchronization = complete_synchronization &&
2235                                ( ( ! clientRunning[y] ) || who_is_synchronized[y] );
2236
2237                        // if the number of total running islands is smaller than the
2238                        // number of islands that ask for synchronization, let them continue
2239                        // running
2240                        if( complete_synchronization )
2241                            {
2242
2243                            for( int y = 0 ; y < numIslands ; y++ )
2244                                {
2245                                // send the okay message (the client can continue executing)
2246                                if( clientRunning[y] )
2247                                    try
2248                                        {
2249                                        dataOut[y].writeUTF( OKAY );
2250                                        dataOut[y].flush();
2251                                        }
2252                                    catch( IOException e ) {}
2253                                // reset the who_is_synchronized variable
2254                                who_is_synchronized[y] = false;
2255                                }
2256                            }
2257
2258                        }
2259                    }
2260                }
2261            }
2262        state.output.message( "Server Exiting" );   
2263        }
2264
2265    /** Here we spawn off the thread on ourselves */
2266    public Thread spawnThread()
2267        {
2268        Thread thread = new Thread( this );
2269        thread.start();
2270        return thread;
2271        }
2272
2273    }
2274
Note: See TracBrowser for help on using the repository browser.