/* Copyright 2006 by Sean Luke and George Mason University Licensed under the Academic Free License version 3.0 See the file "LICENSE" for more information */ package ec.exchange; import java.util.*; import java.io.*; import java.net.*; import ec.*; import ec.util.*; /* * IslandExchange.java * * Created Sat Feb 10 13:44:11 EST 2001 * By: Liviu Panait and Sean Luke */ /** * IslandExchange is an Exchanger which * implements a simple but quite functional asynchronous * island model for doing massive parallel distribution of evolution across * beowulf clusters. One of its really nice features is that because everything * is in Java, your cluster can have mixed platforms in it (MacOS, Unix, * Windoze, whatever you like). You can also have multiple processes running * on the same machine, as long as they're given different client ports. * IslandExchange operates over TCP/IP with Java sockets, and is compatible * with checkpointing. * *

IslandExchange uses an arbitrary graph topology for migrating individuals * from island (EC process) to island over the network. There are a few * restrictions for simplicity, however:

*

Every island is a client. Additionally one island is designated * a server. Note that, just like in the Hair Club for Men, the server * is also a client. The purpose of the server is to synchronize the clients * so that they all get set up properly and hook up to each other, then to * send them small signal messages (like informing them that another client has * discovered the ideal individual), and help them gracefully shut down. Other * than these few signals which are routed through the server to the clients, * all other information -- namely the migrants themselves -- are sent directly * from client to client in a peer-to-peer fashion. * *

The topology of the network is stored solely in the server's parameter * database. When the clients fire up, they first set up "Mailboxes" (where * immigrants from other clients will appear), then they go to the server * and ask it who they should connect to to send migrants. The server tells * them, and then they then hook up. When a client has finished hooking up, it * reports this to the server. After everyone has hooked up, the server tells * the clients to begin evolution, and they're off and running. * *

Islands send emigrants to other islands by copying good individuals * (selected with a SelectionMethod) and sending the good individuals to * the mailboxes of receiving clients. Once an individual has been received, * it is considered to be unevaluated by the receiving island, even though * it had been previously evaluated by the sending island. * *

The IslandExchange model is typically asynchronous because migrants may * appear in your mailbox at any time; islands do not wait for each other * to complete the next generation. This is a more efficient usage of network * bandwidth. When an island completes its breeding, it looks inside its * mailbox for new migrants. It then replaces some of its newly-bred * individuals (chosen entirely at random) * with the migrants (we could have increased the population size so we didn't * waste that breeding time, but we were lazy). It then flushes the mailbox, * which patiently sits waiting for more individuals. * *

Clients may also be given different start times and modulos for * migrating. For example, client A might be told that he begins sending emigrants * only after generation 6, and then sends emigrants on every 4 generations beyond * that. The purpose for the start times and modulos is so that not every client * sends emigrants at the same time; this also makes better use of network bandwidth. * *

When a client goes down, the other clients deal with it gracefully; they * simply stop trying to send to it. But if the server goes down, the clients * do not continue operation; they will shut themselves down. This means that in * general you can shut down an entire island model network just by killing the * server process. However, if the server quits because it runs out of generations, * it will wait for the clients to all quit before it finally stops. * *

IslandExchange works correctly with checkpointing. If you restart from * a checkpoint, the IslandExchange will start up the clients and servers again * and reconnect. Processes can start from different checkpoints, of course. * However, realize that if you restart from a checkpoint, some migrants * may have been lost in transit from island to island. That's the nature of * networking without heavy-duty transaction management! This means that we * cannot guarantee that restarting from checkpoint will yield the same results * as the first run yielded. * *

Islands are not described in the topology parameters by their * IP addresses; instead, they are described by "ids", strings which uniquely * identify each island. For example, "gilligans-island" might be an id. :-) * This allows you to move your topology to different IP addresses without having * to change all your parameter files! You can even move your topology to totally * different machines, and restart from previous checkpoints, and everything * should still work correctly. * *

There are times, especially to experiment with dynamics, that you need * a synchronous island model. If you specify synchronicity, the server's * stated modulo and offset override any modulii or offsets specified by clients. * Everyone will use the server's modulo and offset. This means that everyone * will trade individuals on the same generation. Additionally, clients will wait * until everyone else has traded, before they are permitted to continue evolving. * This has the effect of locking all the clients together generation-wise; no * clients can run faster than any other clients. * *

One last item: normally in this model, the server is also a client. But * if for some reason you need the server to be a process all by itself, without * creating a client as well, you can do that. You spawn such a server differently * than the main execution of ECJ. To spawn a server on a given server params file * (let's say it's server.params) but NOT spawn a client, you do:

 java ec.exchange.IslandExchange -file server.params
 
*

...this sets up a special process which just spawns a server, and doesn't do * all the setup of an evolutionary run. Of course as usual, for each of the * clients, you'll run java ec.Evolve ... instead.

Parameters

Note: some of these parameters are only necessary for creating clients. Others are necessary for creating the server.
base.chatty
boolean, default = true
Should we be verbose or silent about our exchanges?
base.select
classname, inherits and != ec.SelectionMethod
client: The selection method used for picking migrants to emigrate to other islands
base.select-to-die
classname, inherits and != ec.SelectionMethod, default is ec.select.RandomSelection
client: The selection method used for picking individuals to be replaced by incoming migrants. IMPORTANT Note. This selection method must not pick an individual based on fitness. The selection method will be called just after breeding but before evaluation; many individuals will not have had a fitness assigned at that point. You might want to design a SelectionMethod other than RandomSelection, however, to do things like not picking elites to die.
base.server-addr
String
client: The IP address of the server
base.server-port
int >= 1
client: The port number of the server
base.client-port
int >= 1
client: The port number of the client (where it will receive migrants) -- this should be different from the server port.
base.id
String
client: The "name" the client is giving itself. Each client should have a unique name. For example, "gilligans-island".
base.compressed
bool = true (default) or false
client: Whether the communication with other islands should be compressed or not. Compressing uses more CPU, but it may also significantly reduce communication.
base.i-am-server
bool = true or false (default)
client: Is this client also the server? If so, it'll read the server parameters and set up a server as well.
base.sync
bool = true or false (default)
server: Are we doing a synchronous island model? If so, the server's modulo and offset override any client's stated modulo and offset.
base.num-islands
int >= 1
server: The number of islands in the topology.
base.island.n.id
String
server: The ID of island #n in the topology.
base.island.n.num-mig
int >= 1
server: The number of islands that island #n sends emigrants to.
base.island.n.mig.m
int >= 1
server: The ID of island #m that island #n sends emigrants to.
base.island.n.size
int >= 1
server: The number of emigrants (per subpopulation) that island #n sends to other islands. If not set, uses the default parameter below.
base.size
int >= 1
server: Default parameter: number of emigrants (per subpopulation) that a given island sends to other islands.
base.island.n.start
int >= 0
server: The generation when island #n begins sending emigrants. If not set, uses the default parameter below.
base.start
bool = true or false (default)
server: Default parameter: the generation when an island begins sending emigrants.
base.island.n.mod
int >= 1
server: The number of generations that island #n waits between sending emigrants. If not set, uses the default parameter below.
base.mod
bool = true or false (default)
server: Default parameter: The number of generations an island waits between sending emigrants.
base.island.n.mailbox-capacity
int >= 1
server: The maximum size (per subpopulation) of the mailbox for island #n. If not set, uses the default parameter below.
base.mailbox-capacity
int >= 1
server: Default parameter: the maximum size (per subpopulation) of the mailbox for a given island.

Parameter bases
base.select selection method for the client's migrants
* @author Liviu Panait and Sean Luke * @version 2.0 */ public class IslandExchange extends Exchanger { //// Client information /** The server address */ public static final String P_SERVER_ADDRESS = "server-addr"; /** The server port */ public static final String P_SERVER_PORT = IslandExchangeServer.P_SERVER_PORT; /** The client port */ public static final String P_CLIENT_PORT = "client-port"; /** Whether the server is also on this island */ public static final String P_IS_SERVER = "i-am-server"; /** The id of the island */ public static final String P_OWN_ID = "id"; /** Whether the communication is compressed or not */ public static final String P_COMPRESSED_COMMUNICATION = "compressed"; /** The selection method for sending individuals to other islands */ public static final String P_SELECT_METHOD = "select"; /** The selection method for deciding individuals to be replaced by immigrants */ public static final String P_SELECT_TO_DIE_METHOD = "select-to-die"; /** How long we sleep in between attempts to connect or look for signals */ public static final int SLEEP_TIME = 100; /** How long we sleep between checking for FOUND messages */ public static final int FOUND_TIMEOUT = 100; /** Whether or not we're chatty */ public static final String P_CHATTY = "chatty"; /** Okay signal */ public static final String OKAY = "okay"; /** Synchronize signal */ public static final String SYNC = "sync"; /** Found signal */ public static final String FOUND = "found"; /** Our chattiness */ boolean chatty; /** The thread of the server (is different than null only for the island with the server) */ public Thread serverThread; /** My parameter base -- I need to keep this in order to help the server reinitialize contacts */ // SERIALIZE public Parameter base; /** The address of the server */ // SERIALIZE public String serverAddress; /** The port of the server */ // SERIALIZE public int serverPort; /** The port of the client mailbox */ // SERIALIZE public int clientPort; /** whether the server should be running on the current island or not */ // SERIALIZE public boolean iAmServer; /** the id of the current island */ // SERIALIZE public String ownId; /** whether the communication is compressed or not */ // SERIALIZE public boolean compressedCommunication; /** the selection method for emigrants */ // SERIALIZE public SelectionMethod immigrantsSelectionMethod; /** the selection method for individuals to be replaced by immigrants */ // SERIALIZE public SelectionMethod indsToDieSelectionMethod; // the mailbox of the current client (exchanger) IslandExchangeMailbox mailbox; // the thread of the mailbox Thread mailboxThread; /// Communication with the islands where individuals have to be sent // Number of islands to send individuals to int number_of_destination_islands; /** synchronous or asynchronous communication */ public boolean synchronous; /** how often to send individuals */ public int modulo; /** after how many generations to start sending individuals */ public int offset; /** how many individuals to send each time */ public int size; // Sockets to the destination islands Socket[] outSockets; // DataOutputStream to the destination islands DataOutputStream[] outWriters; // so we can print out nice names for our outgoing connections String[] outgoingIds; // information on the availability of the different islands boolean[] running; // the capacity of the mailboxes // int mailboxCapacity; // the socket to the server Socket serverSocket; // reader and writer to the serverSocket DataOutputStream toServer; DataInputStream fromServer; // am I ONLY a server? static boolean just_server; public static void main(String[] args) throws InterruptedException { just_server = true; int x; ParameterDatabase parameters=null; Output output; boolean store; // The following is a little chunk of the ec.Evolve code sufficient // to get IslandExchange up and running all by itself. System.err.println("Island Exchange Server\n" + "Used in ECJ by Sean Luke\n"); // 0. find the parameter database for(x=0;x= offset ) && ( ( modulo == 0 ) || ( ( ( state.generation - offset ) % modulo ) == 0 ) ) ) { // send the individuals!!!! // for each of the islands where we have to send individuals for( int x = 0 ; x < number_of_destination_islands ; x++ ) try { // check whether the communication is ok with the current island if( running[x] ) { if (chatty) state.output.message( "Sending " + size + " emigrants to island " + outgoingIds[x] ); // for each of the subpopulations for( int subpop = 0 ; subpop < state.population.subpops.length ; subpop++ ) { // send the subpopulation outWriters[x].writeInt( subpop ); // send the number of individuals to be sent // it's better to send this information too, such that islands can (potentially) // send different numbers of individuals outWriters[x].writeInt( size ); // select "size" individuals and send then to the destination as emigrants immigrantsSelectionMethod.prepareToProduce( state, subpop, 0 ); for( int y = 0 ; y < size ; y++ ) // send all necesary individuals { int index = immigrantsSelectionMethod.produce( subpop, state, 0 ); state.population.subpops[subpop].individuals[index]. writeIndividual( state, outWriters[x] ); outWriters[x].flush(); // just in case the individuals didn't do a println } immigrantsSelectionMethod.finishProducing( state, subpop, 0 ); // end the selection step } } } catch( IOException e ) { running[x] = false; } } return state.population; } public Population postBreedingExchangePopulation(EvolutionState state) { // receiving individuals from other islands // same situation here of course. // if synchronous communication, synchronize with the mailbox // if( ( state.generation >= offset ) && synchronous && // ( ( modulo == 0 ) || ( ( ( state.generation - offset ) % modulo ) == 0 ) ) ) if (synchronous) { state.output.message( "Waiting for synchronization...." ); // set the socket to the server to blocking try { serverSocket.setSoTimeout( 0 ); } catch( SocketException e ) { state.output.fatal( "Could not set the connection to the server to blocking." ); } try { // send the sync message toServer.writeUTF( SYNC ); toServer.flush(); // wait for the okay message String temp = fromServer.readUTF(); if( temp.equals( IslandExchangeServer.GOODBYE ) ) { alreadyReadGoodBye = true; } } catch( IOException e ) { state.output.fatal( "Could not communicate to the server. Exiting...." ); } // set the socket to the server to non-blocking try { serverSocket.setSoTimeout( FOUND_TIMEOUT ); } catch( SocketException e ) { state.output.fatal( "Could not set the connection to the server to non-blocking." ); } //state.output.message( "Synchronized. Reading individuals...." ); } // synchronize, because immigrants is also accessed by the mailbox thread synchronized( mailbox.immigrants ) { for( int x = 0 ; x < mailbox.immigrants.length ; x++ ) { if( mailbox.nImmigrants[x] > 0 ) { if (chatty) state.output.message( "Immigrating " + mailbox.nImmigrants[x] + " individuals from mailbox for subpopulation " + x ); boolean[] selected = new boolean[ state.population.subpops[x].individuals.length ]; int[] indeces = new int[ mailbox.nImmigrants[x] ]; for( int i = 0 ; i < selected.length ; i++ ) selected[i] = false; indsToDieSelectionMethod.prepareToProduce( state, x, 0 ); for( int i = 0 ; i < mailbox.nImmigrants[x] ; i++ ) { do { indeces[i] = indsToDieSelectionMethod.produce( x, state, 0 ); } while( selected[indeces[i]] ); selected[indeces[i]] = true; } indsToDieSelectionMethod.finishProducing( state, x, 0 ); // there is no need to check for the differences in size: the mailbox.immigrants, // state.population.subpops and the mailbox.person2die should have the same size for( int y = 0 ; y < mailbox.nImmigrants[x] ; y++ ) { // read the individual state.population.subpops[x]. individuals[ indeces[y] ] = mailbox.immigrants[x][y]; // reset the evaluated flag (the individuals are not evaluated in the current island */ state.population.subpops[x]. individuals[ indeces[y] ].evaluated = false; } // reset the number of immigrants in the mailbox for the current subpopulation // this doesn't need another synchronization, because the thread is already synchronized mailbox.nImmigrants[x] = 0; } } } return state.population; } // if the GOODBYE message sent by the server gets read in the wrong place, this // variable is set to true boolean alreadyReadGoodBye = false; // keeps the message to be returned next time on runComplete String message; /** Called after preBreedingExchangePopulation(...) to evaluate whether or not the exchanger wishes the run to shut down (with ec.EvolutionState.R_FAILURE). This would happen for two reasons. First, another process might have found an ideal individual and the global run is now over. Second, some network or operating system error may have occurred and the system needs to be shut down gracefully. This function does not return a String as soon as it wants to exit (another island found the perfect individual, or couldn't connect to the server). Instead, it sets a flag, called message, to remember next time to exit. This is due to a need for a graceful shutdown, where checkpoints are working properly and save all needed information. */ public String runComplete(EvolutionState state) { // first test the flag, and exit if it was previously set if( message != null ) // if an error occured earlier { return message; } // check whether the server sent a FOUND message. // if it did, check whether it should exit or not try { // read a line. if it is successful, it means that the server sent a FOUND message // (this is the only message the server sends right now), and it should set the flag // for exiting next time when in this procedure String ww = fromServer.readUTF(); if( ww != null || alreadyReadGoodBye ) // FOUND message sent from the server { // we should exit because some other individual has // found the perfect fit individual if( state.quitOnRunComplete ) { message = "Exit: Another island found the perfect individual."; state.output.message( "Another island found the perfect individual. Exiting...." ); toServer.writeUTF( OKAY ); toServer.flush(); } else { state.output.message( "Another island found the perfect individual." ); } } else // ( ww == null ) // the connection with the server was closed { // we should exit, because we cannot communicate with the // server anyway message = "Exit: Could not communicate with the server."; state.output.warning( "Could not communicate with the server. Exiting...." ); } } catch( InterruptedIOException e ) { // here don't do anything: it reaches this point when the server is on, but nobody found // the perfect individual. in this case, it should just return null, so that the // execution continues } catch( IOException e ) { // some weird error // report it in a warning state.output.warning( "Some weird IO exception reported by the system in the IslandExchange::runComplete function. Is it possible that the server has crashed?" ); } return null; } /** Closes contacts with other processes, if that's what you're doing. Called at the end of an evolutionary run. result is either ec.EvolutionState.R_SUCCESS or ec.EvolutionState.R_FAILURE, indicating whether or not an ideal individual was found. */ public void closeContacts(EvolutionState state, int result) { // if the run was successful (perfect individual was found) // then send a message to the server that it was found if( result == EvolutionState.R_SUCCESS ) { try { toServer.writeUTF( FOUND ); toServer.flush(); } catch( IOException e ) {} } // close socket to server try { serverSocket.close(); } catch( IOException e ) { } state.output.message( "Shutting down the mailbox" ); // close the mailbox and wait for the thread to terminate mailbox.shutDown(); mailboxThread.interrupt(); try { mailboxThread.join(); } catch( InterruptedException e ) { } state.output.message( "Mailbox shut down" ); // close out-going sockets for( int x = 0 ; x < number_of_destination_islands ; x++ ) { // catch each exception apart (don't take into consideration the running variables) try { if( running[x] ) outSockets[x].close(); } catch( IOException e ) { } } // if the island also hosts the server, wait till it terminates if( iAmServer ) { state.output.message( "Shutting down the server" ); try { serverThread.join(); } catch( InterruptedException e ) { } state.output.message( "Server shut down" ); } } /* (non-Javadoc) * @see ec.EvolutionState#finish(int) */ public void finish(int result) { } /* (non-Javadoc) * @see ec.EvolutionState#startFromCheckpoint() */ public void startFromCheckpoint() { } /* (non-Javadoc) * @see ec.EvolutionState#startFresh() */ public void startFresh() { } /* (non-Javadoc) * @see ec.EvolutionState#evolve() */ public int evolve() throws InternalError { return 0; } } /** Class that contains all the mailbox functionality. It is supposed to wait on a new thread for incoming immigrants from other islands (it will receive in the constructor the number of islands that will send messages to the current island). Waiting on sockets is non-blocking, such that the order in which the islands send messages is unimportant. When immigrants are received, they are stored in a special buffer called immigrants. The storage is managed in a queue-like fashion, such that when the storage is full, the first immigrants that came are erased (hopefully the storage will be emptied fast enough such that this case doesn't appear too often). All accesses to the "immigrants" variable (also applies to nImmigrants) should be done only in the presence of synchronization, because there might be other threads using them too. The number of immigrants for each of the subpopulations (nImmigrants[x]) is between 0 and the size of the queue structure (received as a parameter in the constructor). */ class IslandExchangeMailbox implements Runnable { /** How much to wait before starting checking for immigrants */ public static final int SLEEP_BETWEEN_CHECKING_FOR_IMMIGRANTS = 1000; /** How much to wait on a socket for a message, before starting to wait on another socket */ public static final int CHECK_TIMEOUT = 1000; /** How much to wait while synchronizing */ public static final int SYNCHRONIZATION_SLEEP = 100; /** storage for the incoming immigrants: 2 sizes: the subpopulation and the index of the emigrant */ public Individual[][] immigrants; /** the number of immigrants in the storage for each of the subpopulations */ public int[] nImmigrants; // auxiliary variables to manage the queue storages int[] person2die; // the socket where it listens for incomming messages ServerSocket serverSocket; // the number of islands that send messages to the current mailbox int n_incoming; // whether the information on sockets is compressed or not (receives this information in the constructor) boolean compressedCommunication; // the sockets and readers for receiving incoming messages Socket[] inSockets; DataInputStream[] dataInput; String[] incomingIds; // so we can print out nice names for our incoming connections // the state of the islands it is communicating to boolean[] running; // the state (to display messages mainly) EvolutionState state; // synchronization variable Boolean syncVar; // My ID String myId; boolean chatty; /** Public constructor used to initialize most of the parameters of the mailbox: state_p : the EvolutionState, used mainly for displaying messages port : the port used to listen for incoming messages n_incoming_p : the number of islands that will send messages to the current island how_many : how many immigrants to manage in the queue-like storage for each of the subpopulations */ public IslandExchangeMailbox( final EvolutionState state_p, int port, int n_incoming_p, int how_many, String _myId, boolean chatty, boolean _compressedCommunication ) { myId = _myId; compressedCommunication = _compressedCommunication; this.chatty = chatty; // initialize public variables from the parameters of the constructor state = state_p; n_incoming = n_incoming_p; Parameter p_numsubpops = new Parameter( ec.Initializer.P_POP ).push( ec.Population.P_SIZE ); int numsubpops = state.parameters.getInt(p_numsubpops,null,1); if ( numsubpops == 0 ) { // later on, Population will complain with this fatally, so don't // exit here, just deal with it and assume that you'll soon be shut // down } // allocate the storages: // - immigrants = storage for the immigrants that will come to the current island // - first dimension: the number of subpopulations // - second dimension: how many immigrants to store for each of the subpopulations. // - person2die = where to insert next in the queue structure "immigrants" // - nImmigrants = how many immigrants there are in the storage "immigrants" for each of the subpopulations immigrants = new Individual[ numsubpops ][ how_many ]; person2die = new int[ numsubpops ]; nImmigrants = new int[ numsubpops ]; // set the synchronization variable to false (it will be set to true to signal exiting the waiting loop) syncVar = Boolean.FALSE; // create the ServerSocket to listen to incoming messages try { serverSocket = new ServerSocket( port, n_incoming ); } catch( IOException e ) { state.output.fatal( "Could not start mailbox for incoming messages. Perhaps the port (" + port + ") is bad?\n...or someone else already has it?"); } // allocate the sockets and the readers (will be used in the near future) inSockets = new Socket[ n_incoming ]; dataInput = new DataInputStream[ n_incoming ]; incomingIds = new String[ n_incoming ]; // allocate the status of the different readers running = new boolean[ n_incoming ]; } /** The main functionality of the mailbox: waiting for incoming messages and dealing with the incoming immigrants */ public void run() { // wait for the "n_incoming" incoming connections from different islands, and initialize // the sockets and the readers to communicate with (receive messages from) them. All the // sockets are set to be non-blocking, such that they can be checked alternatively without // waiting for messages on a particular one. for( int x = 0 ; x < n_incoming ; x++ ) { try { inSockets[x] = serverSocket.accept(); DataOutputStream dataOutput; if( compressedCommunication ) { /* dataInput[x] = new DataInputStream(new CompressingInputStream(inSockets[x].getInputStream())); dataOutput = new DataOutputStream(new CompressingOutputStream(inSockets[x].getOutputStream())); */ OutputStream compressedo = Output.makeCompressingOutputStream(inSockets[x].getOutputStream()); InputStream compressedi = Output.makeCompressingInputStream(inSockets[x].getInputStream()); if (compressedi == null || compressedo == null) state.output.fatal( "You do not appear to have JZLib installed on your system, and so may must have compression turned off for IslandExchange. "+ "To get JZLib, download from the ECJ website or from http://www.jcraft.com/jzlib/"); dataInput[x] = new DataInputStream(compressedi); dataOutput = new DataOutputStream(compressedo); } else { dataInput[x] = new DataInputStream(inSockets[x].getInputStream()); dataOutput = new DataOutputStream(inSockets[x].getOutputStream()); } // send my id, then read an id dataOutput.writeUTF(myId); dataOutput.flush(); incomingIds[x] = dataInput[x].readUTF().trim(); state.output.message( "Island " + incomingIds[x] + " connected to my mailbox" ); // set the socket to non-blocking inSockets[x].setSoTimeout( CHECK_TIMEOUT ); running[x] = true; } catch (IOException e) { running[x] = false; state.output.fatal( "An exception was generated while creating communication structures for island " + x + ". Here it is: " + e ); } } state.output.message( "All islands have connected to my client." ); // variable used for deciding (based on the synchronized variable "syncVar") when to exit boolean shouldExit = false; // enter the main loop do { // wait some (do not check all the time, cause it would be a waste of time and computational resources) try { Thread.sleep( SLEEP_BETWEEN_CHECKING_FOR_IMMIGRANTS ); } catch( InterruptedException e ) { } // for each of the connections established with the islands for( int x = 0 ; x < n_incoming ; x++ ) { if( running[x] ) { try { // enter an infinite loop to receive all the messages form the "x"s island // it will exit the loop as soon as there are no more messages coming from // the "x"s island (non-blocking socket) while( true ) { // read the subpopulation where the immigrants need to be inserted. In case there // is no incoming message, an exception will be generated and the infinite loop // will be exited (the mailbox will search the next socket (communication link) // for incoming messages int subpop = dataInput[x].readInt(); // if it gets to this point, it means that a number of individuals will be sent // it is the time to set up the receiving storages // set the socket to blocking for reading the individuals try { inSockets[x].setSoTimeout( 0 ); } catch( SocketException e ) { state.output.warning( "Could not set the socket to blocking while receiving individuals in the mailbox." ); } // how many individuals will be received in the current dialogue? int how_many_to_come = dataInput[x].readInt(); if (chatty) state.output.message( "Receiving " + how_many_to_come + " immigrants for subpopulation " + subpop + " from island " + incomingIds[x]); // synchronize on the immigrants (such that other threads cannot access it during its // being modified) synchronized( immigrants ) { // in case the immigrants buffer was emptied, the person2die is not reset (it is not public) // so we have to reset it now if( nImmigrants[subpop] == 0 ) // if it was reset person2die[subpop] = 0; // reset the person2die[x] // loop in order to receive all the incoming individuals in the current dialogue for( int ind = 0 ; ind < how_many_to_come ; ind++ ) { // read the individual try { // read the emigrant in the storage immigrants[subpop][person2die[subpop]] = state.population.subpops[subpop].species. newIndividual( state, dataInput[x] ); //state.output.message( "Individual received." ); // increase the queue index if( person2die[subpop] == immigrants[subpop].length - 1 ) person2die[subpop] = 0; else person2die[subpop]++; // can increment it without synchronization, as we do synchronization on the immigrants if( nImmigrants[subpop] < immigrants[subpop].length ) nImmigrants[subpop]++; } catch( IOException e ) { // i hope it will also never happen :) state.output.message( "IO exception while communicating with an island" ); running[x] = false; continue; } catch( NumberFormatException e ) { // it happens when the socket is closed and cannot be doing any reading state.output.message( "IO exception while communicating with an island" ); running[x] = false; continue; } } } // end synchronized block on "immigrants" // set the socket to non-blocking (after current set of immigrants is over) try { inSockets[x].setSoTimeout( CHECK_TIMEOUT ); } catch( SocketException e ) { state.output.warning( "Could not set the socket to non-blocking while receiving individuals in the mailbox." ); } } } catch( InterruptedIOException e ) { // here everything is ok // just that there were no messages } catch( IOException e ) { // now this is not nice // report the error so that the programmer can fix it (hopefully) state.output.message( "IO exception while communicating with an island" ); running[x] = false; } catch( NumberFormatException e ) { // error received when some sockets break state.output.message( "Socket closed" ); running[x] = false; } } } // again with synchronization, try to access the syncVar to check whether the mailbox needs to finish // running (maybe some other island already found the perfect individual, or the resources of the current // run have been wasted) synchronized( syncVar ) { // get the value of the syncVar. If it is true, we should exit. shouldExit = syncVar.booleanValue(); } } while( !shouldExit ); // close the sockets (don't care about the running, but deal with exceptions) try { // close the ServerSocket serverSocket.close(); } catch( IOException e ) { } for( int x = 0 ; x < n_incoming ; x++ ) { try { // close the sockets to communicate (receive messages) with the other islands inSockets[x].close(); } catch( IOException e ) { continue; } } } /** Method used to shutdown the mailbox. What it does is that it closes all communication links (sockets) and sets the syncVar to true (such that if the run() method is run on another thread, it will exit the loop and terminate. */ public void shutDown() { // set the syncVar to true (such that if another thread executes this.run(), it will exit the main loop // (hopefully, the information from the server was correct synchronized( syncVar ) { syncVar = Boolean.TRUE; } } /** Return the port of the ServerSocket (where the islands where the other islands should connect in order to send their emigrants). */ public int getPort() { // return the port of the ServerSocket return serverSocket.getLocalPort(); } } /** The IslandExchangeServer is the class that manages the main server that coordinates all the islands. The class implements Runnable (for running on a different thread). */ class IslandExchangeServer implements Runnable { /* The server-specific parameters look roughly like this: exch.server-port = 8021 exch.num-islands = 3 exch.island.0.id = SurvivorIsland exch.island.0.num-mig = 1 exch.island.0.mig.0 = GilligansIsland exch.island.0.size = 5 exch.island.0.mod = 2 exch.island.0.start = 4 exch.island.1.id = GilligansIsland exch.island.1.mod = 1 exch.island.1.start = 2 exch.island.1.size = 10 exch.island.1.num-mig = 2 exch.island.1.mig.0 = SurvivorIsland exch.island.1.mig.1 = GilligansIsland exch.island.2.id = BermudaIsland exch.island.2.mod = 2 ... */ //// Server information /** The server port */ public static final String P_SERVER_PORT = "server-port"; /** The number of islands */ public static final String P_NUM_ISLANDS = "num-islands"; /** The parameter for the island's information */ public static final String P_ISLAND = "island"; /** The id */ public static final String P_ID = "id"; // The number of islands that will send emigrants to the current island public static final String P_NUM_INCOMING_MIGRATING_COUNTRIES = "num-incoming-mig"; /** The number of islands where emigrants will be sent */ public static final String P_NUM_MIGRATING_COUNTRIES = "num-mig"; /** the parameter for migrating islands' ids */ public static final String P_MIGRATING_ISLAND = "mig"; /** The size of the mailbox (for each of the subpopulations) */ public static final String P_MAILBOX_CAPACITY = "mailbox-capacity"; /** The parameter for the modulo (how many generations should pass between consecutive sendings of individuals */ public static final String P_MODULO = "mod"; /** The number of emigrants to be sent */ public static final String P_SIZE = "size"; /** How many generations to pass at the beginning of the evolution before the first emigration from the current island */ public static final String P_OFFSET = "start"; /** Whether the execution should be synchronous or asynchronous */ public static final String P_SYNCHRONOUS = "sync"; /** The run message to be sent to the clients */ public static final String RUN = "run"; /** How much to wait for the found message (on a non-blocking socket) */ public static final int FOUND_TIMEOUT = 100; /** How much to sleep between checking for a FOUND message */ public static final int SLEEP_TIME = 100; /** The final message to be sent to all islands when an individual has been found */ public static final String GOODBYE = "bye-bye"; /** The found message */ public static final String FOUND = IslandExchange.FOUND; /** The okay message */ public static final String OKAY = IslandExchange.OKAY; /** The synchronize message */ public static final String SYNC = IslandExchange.SYNC; /** A class indicating all the information the server knows about a given island, including its mod, size, offset, and all the migrating islands it hooks to, etc. */ public class IslandExchangeIslandInfo { /** how often to send individuals */ public int modulo; /** the mailbox capacity (for each of the subpopulations) */ public int mailbox_capacity; /** what generation to start sending individuals */ public int offset; // how many individuals to send public int size; // to how many islands to send individuals public int num_mig; // the ids of the contries to send individuals to public String[] migrating_island_ids; // how many islands will send individuals to the mailbox public int num_incoming; // also later filled in: // the address of the mailbox where to receive information public String address; // the port of the mailbox public int port; } // The number of islands in the topology int numIslands; // The port of the server int serverPort; // the server's socket ServerSocket serverSocket; // Hashtable for faster lookup of information regarding islands Hashtable info; // Hashtable to count how many islands send individuals to each of the islands Hashtable info_immigrants; EvolutionState state; // Index of island ids sorted by parameter file String[] island_ids; // Index of island ids sorted by order of connection String[] connected_island_ids; // variables used if the execution is synchronous // int global_modulo, global_offset; boolean synchronous; // how many individuals asked to be synchronized (when it reaches the total number of // running clients, the server resets this variable and allows everybody to continue running) boolean[] who_is_synchronized; /** This setup should get called from the IslandExchange setup method. */ public void setupServerFromDatabase(final EvolutionState state_p, final Parameter base) { // Store the evolution state for further use in other functions ( ie. run ) state = state_p; // Don't bother with getting the default base -- we're a singleton! Parameter p; // get the number of islands p = base.push( P_NUM_ISLANDS ); numIslands = state.parameters.getInt( p, null, 1 ); if( numIslands == 0 ) state.output.fatal( "The number of islands must be >0.", p ); // get the port of the server p = base.push( P_SERVER_PORT ); serverPort = state.parameters.getInt( p, null, 1 ); if( serverPort == 0 ) state.output.fatal( "The server port must be >0.", p ); // information on the islands = hashtable of ID and socket information info = new Hashtable( numIslands ); // initialize the hash table to count how many islands send individuals // to each of the islands info_immigrants = new Hashtable( numIslands ); // allocate the ids sorted by parameter file island_ids = new String[ numIslands ]; // allocate the ids sorted by connection connected_island_ids = new String[ numIslands ] ; // check whether the execution is synchronous or asynchronous // if it is synchronous, there should be two parameters in the parameters file: // the global modulo and offset (such that the islands coordinate smoothly) p = base.push( P_SYNCHRONOUS ); // get the value of the synchronous parameter (default is false) synchronous = state.parameters.getBoolean( p, null, false ); // if synchronous, read the other two global parameters if( synchronous ) { state.output.message( "The communication will be synchronous." ); /* // get the global modulo p = base.push( P_MODULO ); global_modulo = state.parameters.getInt( p, null, 1 ); if( global_modulo == 0 ) state.output.fatal( "Parameter not found, or it has an incorrect value.", p ); // get the global offset p = base.push( P_OFFSET ); global_offset = state.parameters.getInt( p, null, 0 ); if( global_offset == -1 ) state.output.fatal( "Parameter not found, or it has an incorrect value.", p ); */ } else { state.output.message( "The communication will be asynchronous." ); } // get a new local base Parameter islandBase = base.push( P_ISLAND ); // load the island topology for( int x = 0 ; x < numIslands ; x++ ) { IslandExchangeIslandInfo ieii = new IslandExchangeIslandInfo(); Parameter localBase = islandBase.push( "" + x ); // get the id of the current island p = localBase.push( P_ID ); island_ids[x] = state.parameters.getStringWithDefault( p, null, "" ); if( island_ids[x].equals("") ) state.output.fatal( "Parameter not found.", p ); // get the mailbox capacity of the imigration from the current island p = localBase.push( P_MAILBOX_CAPACITY ); ieii.mailbox_capacity = state.parameters.getInt( p, base.push(P_MAILBOX_CAPACITY), 0 ); if( ieii.mailbox_capacity == -1 ) state.output.fatal( "Parameter not found, or it has an incorrect value.", p, base.push(P_MAILBOX_CAPACITY) ); // get the size of the imigration from the current island p = localBase.push( P_SIZE ); ieii.size = state.parameters.getInt( p, base.push(P_SIZE), 0 ); if( ieii.size == -1 ) state.output.fatal( "Parameter not found, or it has an incorrect value.", p, base.push(P_SIZE) ); // if synchronous execution, use the global modulo and offset /* if( synchronous ) { ieii.modulo = global_modulo; ieii.offset = global_offset; } else {*/ // get the modulo of the imigration from the current island p = localBase.push( P_MODULO ); ieii.modulo = state.parameters.getInt( p, base.push(P_MODULO), 1 ); if( ieii.modulo == 0 ) state.output.fatal( "Parameter not found, or it has an incorrect value.", p , base.push(P_MODULO)); // get the offset of the imigration from the current island p = localBase.push( P_OFFSET ); ieii.offset = state.parameters.getInt( p, base.push(P_OFFSET), 0 ); if( ieii.offset == -1 ) state.output.fatal( "Parameter not found, or it has an incorrect value.", p, base.push(P_OFFSET) ); /* } */ // mark as uninitialized ieii.port = -1; // insert the id in the hashset with the ids of the islands info.put( island_ids[x], ieii ); } // get the information on destination islands (with checking for consistency) for( int x = 0 ; x < numIslands ; x++ ) { IslandExchangeIslandInfo ieii = (IslandExchangeIslandInfo)info.get( island_ids[x] ); if( ieii == null ) { state.output.error( "Inexistent information for island " + island_ids[x] + " stored in the server's information database." ); continue; } Parameter localBase = islandBase.push( "" + x ); // get the number of islands where individuals should be sent p = localBase.push( P_NUM_MIGRATING_COUNTRIES ); ieii.num_mig = state.parameters.getInt( p, null, 0 ); if( ieii.num_mig == -1 ) state.output.fatal( "Parameter not found, or it has an incorrect value.", p ); // if there is at least 1 destination islands if( ieii.num_mig > 0 ) { // allocate the storage for ids ieii.migrating_island_ids = new String[ ieii.num_mig ]; // store a new base parameter Parameter ll; ll = localBase.push( P_MIGRATING_ISLAND ); // for each of the islands for( int y = 0 ; y < ieii.num_mig ; y++ ) { // read the id & check for errors ieii.migrating_island_ids[y] = state.parameters.getStringWithDefault( ll.push(""+y), null, null ); if( ieii.migrating_island_ids[y] == null ) state.output.fatal( "Parameter not found.", ll.push(""+y) ); else if( !info.containsKey( ieii.migrating_island_ids[y] ) ) state.output.fatal( "Unknown island.", ll.push(""+y) ); else { // insert this knowledge into the hashtable for counting how many islands // send individuals to each island Integer integer = (Integer)info_immigrants.get( ieii.migrating_island_ids[y] ); if( integer == null ) info_immigrants.put( ieii.migrating_island_ids[y], new Integer(1) ); else info_immigrants.put( ieii.migrating_island_ids[y], new Integer( integer.intValue() + 1 ) ); } } } // save the information back in the hash table // info.put( island_ids[x], ieii ); // unneccessary -- Sean } for( int x = 0 ; x < numIslands ; x++ ) { IslandExchangeIslandInfo ieii = (IslandExchangeIslandInfo)info.get( island_ids[x] ); if( ieii == null ) { state.output.fatal( "Inexistent information for island " + island_ids[x] + " stored in the server's information database." ); } Integer integer = (Integer)info_immigrants.get( island_ids[x] ); // if the information does not exist in the hasthable, // it means no islands send individuals there if( integer == null ) ieii.num_incoming = 0; else ieii.num_incoming = integer.intValue(); // save the information back in the hash table // info.put( island_ids[x], ieii ); // unneccessary -- Sean } // allocate and reset this variable to false who_is_synchronized = new boolean[ numIslands ]; for( int x = 0 ; x < numIslands ; x++ ) who_is_synchronized[x] = false; } /** The main function running in the thread */ public void run() { // sockets to communicate to each of the islands Socket[] con = new Socket[numIslands]; // readers and writters for communication with each island DataInputStream[] dataIn = new DataInputStream[numIslands]; DataOutputStream[] dataOut = new DataOutputStream[numIslands]; // whether each client is working (and communicating with the server) or not boolean[] clientRunning = new boolean[numIslands]; // initialize the running status of all clients for( int x = 0 ; x < numIslands ; x++ ) clientRunning[x] = true; try { // create a server serverSocket = new ServerSocket(serverPort,numIslands); } catch ( IOException e ) { state.output.fatal( "Error creating a socket on port " + serverPort ); } // for each of the islands for(int x=0;x= 0 ) { state.output.error( "Multiple islands are claiming the same ID (" + connected_island_ids[x] + ")" ); clientRunning[x] = false; continue; } // send the number of ids that will be send through the communication link dataOut[x].writeInt( ieii.num_incoming ); // send the capacity of the mailbox dataOut[x].writeInt( ieii.mailbox_capacity ); dataOut[x].flush(); // read the address and port of the island ieii.address = dataIn[x].readUTF().trim(); ieii.port = dataIn[x].readInt(); state.output.message( "" + x + ": Island " + connected_island_ids[x] + " has address " + ieii.address + " : " + ieii.port ); // re-insert the information in the hash table // info.put( id, ieii ); // unnecessary -- Sean } catch( IOException e ) { state.output.error( "Could not open connection #" + x ); clientRunning[x] = false; } } state.output.exitIfErrors(); // By this time, all mailboxes have been started and // they should be waiting for incoming messages. this is because // in order to send the server the information about the address and port // of the mailbox, they have to start them first. This is the reason // that makes us be able to start connecting without other synchronization // stuff right at this point. // Now, I think, we've got a 1:1 mapping of keys to items in the info hashtable // So we tell everyone who they will communicate to for( int x = 0 ; x < numIslands ; x++ ) { if( clientRunning[x] ) { IslandExchangeIslandInfo ieii = (IslandExchangeIslandInfo)info.get( connected_island_ids[x] ); if( ieii == null ) { state.output.warning( "There is no information about island " + connected_island_ids[x]); clientRunning[x] = false; continue; } try { // send the synchronous, modulo, offset and size information to the current islands if( synchronous ) dataOut[x].writeInt( 1 ); else dataOut[x].writeInt( 0 ); dataOut[x].writeInt( ieii.modulo ); dataOut[x].writeInt( ieii.offset ); dataOut[x].writeInt( ieii.size ); // send the number of address-port pairs that will be sent dataOut[x].writeInt( ieii.num_mig ); for( int y = 0 ; y < ieii.num_mig ; y++ ) { IslandExchangeIslandInfo temp; temp = (IslandExchangeIslandInfo)info.get( ieii.migrating_island_ids[y] ); if( temp == null ) { state.output.warning( "There is incorrect information on the island " + connected_island_ids[x] ); dataOut[x].writeUTF( " " ); dataOut[x].writeInt( -1 ); } else { state.output.message( "Island " + connected_island_ids[x] + " should connect to island " + ieii.migrating_island_ids[y] + " at " + temp.address + " : " + temp.port ); dataOut[x].writeUTF( temp.address ); dataOut[x].writeInt( temp.port ); } } dataOut[x].flush(); } catch( IOException e ) { // other errors while reading state.output.message("Server: Island " + island_ids[x] + " dropped connection"); clientRunning[x] = false; continue; } catch( NullPointerException e ) { // other errors while reading state.output.message("Server: Island " + island_ids[x] + " dropped connection"); clientRunning[x] = false; try { dataIn[x].close(); dataOut[x].close(); con[x].close(); } catch( IOException f ) { } continue; } } } try { // Next we wait until everyone acknowledges this for(int x=0;x