/* Copyright 2006 by Sean Paus, Sean Luke, and George Mason University Licensed under the Academic Free License version 3.0 See the file "LICENSE" for more information */ /* * Created on Oct 8, 2004 */ package ec.eval; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ConnectException; import java.net.Socket; import java.net.UnknownHostException; import ec.*; import ec.coevolve.GroupedProblemForm; import ec.simple.SimpleProblemForm; import ec.simple.SimpleEvolutionState; import ec.util.*; /** * Slave.java *

Slave is the main entry point for a slave evaluation process. The slave works with a master process, receiving individuals from the master, evaluating them, and reporting the results back to the master, thus enabling distributed evolution.

Slave replicates most of the functionality of the ec.Evolve class, for example in terms of parameters and checkpointing. This is mostly because it needs to bootstrap and set up the EvolutionState in much the same way that ec.Evolve does. Additionally, depending on settings below, the Slave may act like a mini-evolver on the individuals it receives from the master.

Like ec.Evolve, Slave is run with like this:

java ec.eval.Slave -file parameter_file [-p parameter=value]*

This starts a new slave, using the parameter file parameter_file. The user can provide optional overriding parameters on the command-line with the -p option.

Slaves need to know some things in order to run: the master's IP address and socket port number, whether to do compression, and whether or not to return individuals or just fitnesses. Unfortunately, Sun's CompressedInputStream/CompressedOutputStream is broken (it doesn't allow partial flushes, which is critical for doing compressed network streams). In order to do compression, you need to download the JZLIB library from the ECJ website or from http://www.jcraft.com/jzlib/ . ECJ will detect and use it automatically.

Slaves presently always run in single-threaded mode and receive their random number generator seed from the master. Thus they ignore any seed parameters given to them.

Slaves run in one of three modes:

Parameters
eval.slave-name
String
(the slave's name, only for debugging purposes. If not specified, the slave makes one up.)
eval.master.host
String
(the IP Address of the master.)
eval.master.port
integer >= 1024
(the socket port number of the master.)
eval.compression
bool = true or false (default)
(should we use compressed streams in communicating with the master?)
eval.run-evolve
bool = true or false (default)
(should we immediately evaluate the individuals and return them (or their fitnesses), or if we have extra time (defined by eval.runtime), should we do a little evolution on our individuals first?)
eval.runtime
integer > 0
(if eval.run-evolve is true, how long (in milliseconds wall-clock time) should we allow the individuals to evolve?)
eval.return-inds
bool = true or false (default)
(should we return whole individuals or (if false) just the fitnesses of the individuals? This must be TRUE if eval.run-evolve is true.)
state
classname, inherits and != ec.EvolutionState
(the EvolutionState object class)
print-accessed-params
bool = true or false (default)
(at the end of a run, do we print out a list of all the parameters requested during the run?)
print-used-params
bool = true or false (default)
(at the end of a run, do we print out a list of all the parameters actually used during the run?)
print-unaccessed-params
bool = true or false (default)
(at the end of a run, do we print out a list of all the parameters NOT requested during the run?)
print-unused-params
bool = true or false (default)
(at the end of a run, do we print out a list of all the parameters NOT actually used during the run?)
print-all-params
bool = true or false (default)
(at the end of a run, do we print out a list of all the parameters stored in the parameter database?)
* * @author Liviu Panait, Sean Paus, Keith Sullivan, and Sean Luke */ public class Slave { public final static String P_PRINTACCESSEDPARAMETERS = "print-accessed-params"; public final static String P_PRINTUSEDPARAMETERS = "print-used-params"; public final static String P_PRINTALLPARAMETERS = "print-all-params"; public final static String P_PRINTUNUSEDPARAMETERS = "print-unused-params"; public final static String P_PRINTUNACCESSEDPARAMETERS = "print-unaccessed-params"; public final static String P_EVALSLAVENAME = "eval.slave-name"; public final static String P_EVALMASTERHOST = "eval.master.host"; public final static String P_EVALMASTERPORT = "eval.master.port"; public final static String P_EVALCOMPRESSION = "eval.compression"; public final static String P_RETURNINDIVIDUALS = "eval.return-inds"; public static final String P_SUBPOP = "pop.subpop"; public static final byte V_NOTHING = 0; public static final byte V_INDIVIDUAL = 1; public static final byte V_FITNESS = 2; public static final byte V_SHUTDOWN = 0; public static final byte V_EVALUATESIMPLE = 1; public static final byte V_EVALUATEGROUPED = 2; // public static final byte V_CHECKPOINT = 3; /* The argument indicating that we're starting up from a checkpoint file. */ // public static final String A_CHECKPOINT = "-checkpoint"; /** The argument indicating that we're starting fresh from a new parameter file. */ public static final String A_FILE = "-file"; /* flush announcements parameter */ // public static final String P_FLUSH = "flush"; /* nostore parameter */ // public static final String P_STORE = "store"; /** state parameter */ public static final String P_STATE = "state"; /** Time to run evolution on the slaves in seconds */ public static final String P_RUNTIME = "eval.runtime"; public static int runTime=0; /** Should slave run its own evolutionary process? */ public static final String P_RUNEVOLVE = "eval.run-evolve"; public static boolean runEvolve=false; /** How long we sleep in between attempts to connect to the master (in milliseconds). */ public static final int SLEEP_TIME = 100; public static void main(String[] args) { EvolutionState state = null; ParameterDatabase parameters = null; Output output = null; boolean store; int x; // 0. find the parameter database for (x = 0; x < args.length - 1; x++) if (args[x].equals(A_FILE)) { try { parameters = new ParameterDatabase( // not available in jdk1.1: new File(args[x+1]).getAbsoluteFile(), new File(new File(args[x + 1]).getAbsolutePath()), args); // add the fact that I am a slave: eval.i-am-slave = true // this is used only by the Evaluator to determine whether to use the MasterProblem parameters.set(new Parameter(ec.EvolutionState.P_EVALUATOR).push(ec.Evaluator.P_IAMSLAVE), "true"); break; } catch(FileNotFoundException e) { Output.initialError( "A File Not Found Exception was generated upon" + "reading the parameter file \"" + args[x+1] + "\".\nHere it is:\n" + e); } catch(IOException e) { Output.initialError( "An IO Exception was generated upon reading the" + "parameter file \"" + args[x+1] + "\".\nHere it is:\n" + e); } } if (parameters == null) Output.initialError("No parameter file was specified." ); // 5. Determine whether or not to return entire Individuals or just Fitnesses // (plus whether or not the Individual has been evaluated). boolean returnIndividuals = parameters.getBoolean(new Parameter(P_RETURNINDIVIDUALS),null,false); // 6. Open a server socket and listen for requests String slaveName = parameters.getString( new Parameter(P_EVALSLAVENAME),null); String masterHost = parameters.getString( new Parameter(P_EVALMASTERHOST),null ); int masterPort = parameters.getInt( new Parameter(P_EVALMASTERPORT),null); boolean useCompression = parameters.getBoolean(new Parameter(P_EVALCOMPRESSION),null,false); runTime = parameters.getInt(new Parameter(P_RUNTIME), null, 0); runEvolve = parameters.getBoolean(new Parameter(P_RUNEVOLVE),null,false); if (runEvolve && !returnIndividuals) { Output.initialError("You have the slave running in 'evolve' mode, but it's only returning fitnesses to the master, not whole individuals. This is almost certainly wrong.", new Parameter(P_RUNEVOLVE), new Parameter(P_RETURNINDIVIDUALS)); } Output.initialMessage("ECJ Slave"); if (runEvolve) Output.initialMessage("Running in Evolve mode, evolve time is " + runTime + " milliseconds"); if (returnIndividuals) Output.initialMessage("Whole individuals will be returned"); else Output.initialMessage("Only fitnesses will be returned"); // Continue to serve new masters until killed. while (true) { try { Socket socket; long connectAttemptCount = 0; Output.initialMessage("Connecting to master at "+masterHost+":"+masterPort); while (true) { try { socket = new Socket(masterHost, masterPort); break; } catch (ConnectException e) // it's not up yet... { connectAttemptCount++; try { Thread.sleep(SLEEP_TIME); } catch( InterruptedException f ) { } } } Output.initialMessage("Connected to master after " + (connectAttemptCount * SLEEP_TIME) + " ms"); DataInputStream dataIn = null; DataOutputStream dataOut = null; try { InputStream tmpIn = socket.getInputStream(); OutputStream tmpOut = socket.getOutputStream(); if (useCompression) { //Output.initialError("JDK 1.5 has broken compression. For now, you must set eval.compression=false"); /* tmpIn = new CompressingInputStream(tmpIn); tmpOut = new CompressingOutputStream(tmpOut); */ tmpIn = Output.makeCompressingInputStream(tmpIn); tmpOut = Output.makeCompressingOutputStream(tmpOut); if (tmpIn == null || tmpOut == null) Output.initialError("You do not appear to have JZLib installed on your system, and so must set eval.compression=false. " + "To get JZLib, download from the ECJ website or from http://www.jcraft.com/jzlib/"); } dataIn = new DataInputStream(tmpIn); dataOut = new DataOutputStream(tmpOut); } catch (IOException e) { Output.initialError("Unable to open input stream from socket:\n"+e); } // specify the slaveName if (slaveName==null) { slaveName = socket.getLocalAddress().toString() + "/" + System.currentTimeMillis(); Output.initialMessage("No slave name specified. Using: " + slaveName); } dataOut.writeUTF(slaveName); dataOut.flush(); // 1. create the output // store = parameters.getBoolean(new Parameter(P_STORE), null, false); if (output != null) output.close(); output = new Output(true); //output.setFlush( // parameters.getBoolean(new Parameter(P_FLUSH),null,false)); // stdout is always log #0. stderr is always log #1. // stderr accepts announcements, and both are fully verbose // by default. output.addLog(ec.util.Log.D_STDOUT, false); output.addLog(ec.util.Log.D_STDERR, true); output.systemMessage(Version.message()); // 2. set up thread values /* int breedthreads = parameters.getInt( new Parameter(Evolve.P_BREEDTHREADS),null,1); if (breedthreads < 1) output.fatal("Number of breeding threads should be an integer >0.", new Parameter(Evolve.P_BREEDTHREADS),null); int evalthreads = parameters.getInt( new Parameter(Evolve.P_EVALTHREADS),null,1); if (evalthreads < 1) output.fatal("Number of eval threads should be an integer >0.", new Parameter(Evolve.P_EVALTHREADS),null); */ int breedthreads = Evolve.determineThreads(output, parameters, new Parameter(Evolve.P_BREEDTHREADS)); int evalthreads = Evolve.determineThreads(output, parameters, new Parameter(Evolve.P_EVALTHREADS)); // Note that either breedthreads or evalthreads (or both) may be 'auto'. We don't warn about this because // the user isn't providing the thread seeds. // 3. create the Mersenne Twister random number generators, // one per thread MersenneTwisterFast[] random = new MersenneTwisterFast[breedthreads > evalthreads ? breedthreads : evalthreads]; int seed = dataIn.readInt(); for(int i = 0; i < random.length; i++) random[i] = Evolve.primeGenerator(new MersenneTwisterFast(seed++)); // we prime the generator to be more sure of randomness. // 4. Set up the evolution state // what evolution state to use? state = (EvolutionState) parameters.getInstanceForParameter(new Parameter(P_STATE),null, EvolutionState.class); state.parameters = new ParameterDatabase(); state.parameters.addParent(parameters); state.random = random; state.output = output; state.evalthreads = evalthreads; state.breedthreads = breedthreads; state.setup(state, null); state.population = state.initializer.setupPopulation(state, 0); // Is this a Simple or Grouped ProblemForm? int problemType; try { while (true) { EvolutionState newState = state; if (runEvolve) { // Construct and use a new EvolutionState. This will be inefficient the first time around // as we've set up TWO EvolutionStates in a row with no good reason. ParameterDatabase coverDatabase = new ParameterDatabase(); // protect the underlying one coverDatabase.addParent(state.parameters); newState = (EvolutionState) Evolve.initialize(coverDatabase, 0); newState.startFresh(); newState.output.message("Replacing random number generators, ignore above seed message"); newState.random = state.random; // continue with RNG } // 0 means to shut down System.err.println("reading next problem"); problemType = dataIn.readByte(); System.err.println("Read problem: " + (int)problemType); switch (problemType) { case V_SHUTDOWN: socket.close(); return; // we're outa here case V_EVALUATESIMPLE: evaluateSimpleProblemForm(newState, returnIndividuals, dataIn, dataOut, args); break; case V_EVALUATEGROUPED: evaluateGroupedProblemForm(newState, returnIndividuals, dataIn, dataOut); break; default: state.output.fatal("Unknown problem form specified: "+problemType); } //System.err.println("Done Evaluating Individual"); } } catch (IOException e) { // Since an IOException can happen here if the peer closes the socket // on it's end, we don't necessarily have to exit. Maybe we don't // even need to print a warning, but we'll do so just to indicate // something happened. state.output.warning("Unable to read type of evaluation from master. Maybe the master closed its socket and exited?:\n"+e); } } catch (UnknownHostException e) { state.output.fatal(e.getMessage()); } catch (IOException e) { state.output.fatal("Unable to connect to master:\n" + e); } } } public static void evaluateSimpleProblemForm( EvolutionState state, boolean returnIndividuals, DataInputStream dataIn, DataOutputStream dataOut, String[] args ) { ParameterDatabase params=null; // first load the individuals int numInds=1; try { numInds = dataIn.readInt(); } catch (IOException e) { state.output.fatal("Unable to read the number of individuals from the master:\n"+e); } // load the subpops int[] subpops = new int[numInds]; // subpops desired by each ind int[] indsPerSubpop = new int[state.population.subpops.length]; // num inds for each subpop for(int i = 0; i < numInds; i++) { try { subpops[i] = dataIn.readInt(); if (subpops[i] < 0 || subpops[i] >= state.population.subpops.length) state.output.fatal("Bad subpop number for individual #" + i + ": " + subpops[i]); indsPerSubpop[subpops[i]]++; } catch (IOException e) { state.output.fatal("Unable to read the subpop number from the master:\n"+e); } } // Read the individual(s) from the stream and evaluate boolean[] updateFitness = new boolean[numInds]; Individual[] inds = new Individual[numInds]; try { for (int i=0; i < numInds; i++) { inds[i] = state.population.subpops[subpops[i]].species.newIndividual(state, dataIn); if (!runEvolve) ((SimpleProblemForm)(state.evaluator.p_problem)).evaluate( state, inds[i], subpops[i], 0 ); updateFitness[i] = dataIn.readBoolean(); } } catch (IOException e) { state.output.fatal("Unable to read individual from master." + e); } if (runEvolve) { long startTime = System.currentTimeMillis(); long endTime=0; // Now we need to reset the subpopulations. They were already set up with the right // classes, Species, etc. in state.setup(), so all we need to do is modify the number // of individuals in each subpopulation. for(int subpop = 0; subpop < state.population.subpops.length; subpop++) { if (state.population.subpops[subpop].individuals.length != indsPerSubpop[subpop]) state.population.subpops[subpop].individuals = new Individual[indsPerSubpop[subpop]]; } // Disperse into the population int[] counts = new int[state.population.subpops.length]; for(int i =0; i < numInds; i++) state.population.subpops[subpops[i]].individuals[counts[subpops[i]]++] = inds[i]; // Evaluate the population until time is up, or the evolution stops int result = state.R_NOTDONE; while (result == state.R_NOTDONE) { result = state.evolve(); endTime = System.currentTimeMillis(); if ((endTime - startTime) > runTime) break; } // re-gather from population in the same order counts = new int[state.population.subpops.length]; for(int i =0; i < numInds; i++) inds[i] = state.population.subpops[subpops[i]].individuals[counts[subpops[i]]++]; state.finish(result); Evolve.cleanup(state); } //System.err.println("Returning Individuals "); // Return the evaluated individual to the master try { returnIndividualsToMaster(state, inds, updateFitness, dataOut, returnIndividuals); } catch( IOException e ) { state.output.fatal("Caught fatal IOException\n"+e ); } } public static void evaluateGroupedProblemForm( EvolutionState state, boolean returnIndividuals, DataInputStream dataIn, DataOutputStream dataOut ) { boolean countVictoriesOnly = false; // first load the individuals int numInds = 1; try { countVictoriesOnly = dataIn.readBoolean(); numInds = dataIn.readInt(); } catch (IOException e) { state.output.fatal("Unable to read the number of individuals from the master:\n"+e); } // load the subpops int[] subpops = new int[numInds]; // subpops desired by each ind int[] indsPerSubpop = new int[state.population.subpops.length]; // num inds for each subpop for(int i = 0; i < numInds; i++) { try { subpops[i] = dataIn.readInt(); if (subpops[i] < 0 || subpops[i] >= state.population.subpops.length) state.output.fatal("Bad subpop number for individual #" + i + ": " + subpops[i]); indsPerSubpop[subpops[i]]++; } catch (IOException e) { state.output.fatal("Unable to read the subpop number from the master:\n"+e); } } // Read the individuals from the stream Individual inds[] = new Individual[numInds]; boolean updateFitness[] = new boolean[numInds]; try { for(int i=0;i