Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
03/22/11 11:36:53 (13 years ago)
Author:
cneumuel
Message:

#1233

  • implemented correct numbering of BatchRuns
  • improvements in ExperimentManager
  • fixed bug in server (jobs were scheduled multiple times)
  • added exception handling for task in slave
  • improved timeout handling of jobs (LifecycleManager)
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs

    r5780 r5786  
    5757    private int coreThreadId;
    5858
    59     private ISlaveCommunication ClientCom;
     59    private ISlaveCommunication clientCom;
    6060    private ServiceHost slaveComm;
    6161
     
    8484        slaveComm.Open();
    8585
    86         ClientCom = SlaveClientCom.Instance.ClientCom;
    87         ClientCom.LogMessage("Hive Slave started");
     86        clientCom = SlaveClientCom.Instance.ClientCom;
     87        clientCom.LogMessage("Hive Slave started");
    8888
    8989        ConfigManager manager = ConfigManager.Instance;
     
    139139
    140140    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
    141       ClientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
     141      clientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
    142142    }
    143143
    144144    void WcfService_Connected(object sender, EventArgs e) {
    145       ClientCom.LogMessage("Connected successfully to Hive server");
     145      clientCom.LogMessage("Connected successfully to Hive server");
    146146    }
    147147
     
    151151    /// <param name="container">The Container, containing the message</param>
    152152    private void DetermineAction(MessageContainer container) {
    153       ClientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);
     153      clientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);
    154154
    155155      if (container is ExecutorMessageContainer<Guid>) {
     
    159159        switch (container.Message) {
    160160          case MessageContainer.MessageType.CalculateJob:
    161             Task.Factory.StartNew(() => {
    162               Job job = wcfService.GetJob(container.JobId);
     161            clientCom.LogMessage("Task.StartNew[0]: jobId: " + container.JobId);
     162            Task.Factory.StartNew((jobIdObj) => {
     163              Guid jobId = (Guid)jobIdObj;
     164              clientCom.LogMessage("Task.StartNew[1]: jobId: " + jobId);
     165              Job job = wcfService.GetJob(jobId);
     166              if (job == null) throw new JobNotFoundException(jobId);
    163167              lock (engines) {
    164168                if (!jobs.ContainsKey(job.Id)) {
     
    167171              }
    168172              JobData jobData = wcfService.GetJobData(job.Id);
     173              if (job == null) throw new JobDataNotFoundException(jobId);
    169174              job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
    170175              StartJobInAppDomain(job, jobData);
    171             });
     176            }, container.JobId)
     177            .ContinueWith((t) => {
     178              // handle exception of task
     179              clientCom.LogMessage(t.Exception.ToString());
     180            }, TaskContinuationOptions.OnlyOnFaulted);
    172181            break;
    173182          case MessageContainer.MessageType.ShutdownSlave:
     
    203212        }
    204213      } else {
    205         ClientCom.LogMessage("Unknown MessageContainer: " + container);
     214        clientCom.LogMessage("Unknown MessageContainer: " + container);
    206215      }
    207216    }
     
    217226
    218227        try {
    219           ClientCom.LogMessage("Sending the paused job with id: " + job.Id);
     228          clientCom.LogMessage("Sending the paused job with id: " + job.Id);
    220229          wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
    221230          SlaveStatusInfo.JobsProcessed++;    //TODO: count or not count, thats the question
    222231        }
    223232        catch (Exception e) {
    224           ClientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
     233          clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
    225234        }
    226235        finally {
     
    240249
    241250        try {
    242           ClientCom.LogMessage("Sending the stoppped job with id: " + job.Id);
     251          clientCom.LogMessage("Sending the stoppped job with id: " + job.Id);
    243252          wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
    244253          SlaveStatusInfo.JobsProcessed++;    //TODO: count or not count, thats the question
    245254        }
    246255        catch (Exception e) {
    247           ClientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
     256          clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
    248257        }
    249258        finally {
     
    266275      }
    267276
    268       ClientCom.LogMessage("Aborted all jobs!");
     277      clientCom.LogMessage("Aborted all jobs!");
    269278    }
    270279
     
    273282    /// </summary>
    274283    private void DoPauseAll() {
    275       ClientCom.LogMessage("Pause all received");
     284      clientCom.LogMessage("Pause all received");
    276285
    277286      //copy guids because there will be removed items from 'Jobs'
     
    290299    /// </summary>
    291300    private void DoStopAll() {
    292       ClientCom.LogMessage("Stop all received");
     301      clientCom.LogMessage("Stop all received");
    293302
    294303      //copy guids because there will be removed items from 'Jobs'
     
    316325    /// </summary>
    317326    private void ShutdownCore() {
    318       ClientCom.LogMessage("Shutdown Signal received");
    319       ClientCom.LogMessage("Stopping heartbeat");
     327      clientCom.LogMessage("Shutdown Signal received");
     328      clientCom.LogMessage("Stopping heartbeat");
    320329      heartbeatManager.StopHeartBeat();
    321330      abortRequested = true;
    322       ClientCom.LogMessage("Logging out");
     331      clientCom.LogMessage("Logging out");
    323332
    324333
    325334      lock (engines) {
    326         ClientCom.LogMessage("engines locked");
     335        clientCom.LogMessage("engines locked");
    327336        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
    328           ClientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
     337          clientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
    329338          appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
    330339          AppDomain.Unload(kvp.Value);
     
    332341      }
    333342      WcfService.Instance.Disconnect();
    334       ClientCom.Shutdown();
     343      clientCom.Shutdown();
    335344      SlaveClientCom.Close();
    336345
     
    344353    /// </summary> 
    345354    private void DoStartSlave() {
    346       ClientCom.LogMessage("Restart received");
     355      clientCom.LogMessage("Restart received");
    347356      StartHeartbeats();
    348       ClientCom.LogMessage("Restart done");
     357      clientCom.LogMessage("Restart done");
    349358    }
    350359
     
    355364    //TODO: do we need an AbortSleep?
    356365    private void Sleep() {
    357       ClientCom.LogMessage("Sleep received");
     366      clientCom.LogMessage("Sleep received");
    358367      heartbeatManager.StopHeartBeat();
    359368      heartbeatManager = null;
    360369      DoStopAll();
    361370      WcfService.Instance.Disconnect();
    362       ClientCom.LogMessage("Sleep done");
     371      clientCom.LogMessage("Sleep done");
    363372    }
    364373
     
    371380    public void PauseWaitJob(JobData data) {
    372381      if (!Jobs.ContainsKey(data.JobId)) {
    373         ClientCom.LogMessage("Can't find job with id " + data.JobId);
     382        clientCom.LogMessage("Can't find job with id " + data.JobId);
    374383      } else {
    375384        Job job = Jobs[data.JobId];
     
    388397    public void SendFinishedJob(Guid jobId) {
    389398      try {
    390         ClientCom.LogMessage("Getting the finished job with id: " + jobId);
     399        clientCom.LogMessage("Getting the finished job with id: " + jobId);
    391400        if (!engines.ContainsKey(jobId)) {
    392           ClientCom.LogMessage("Engine doesn't exist");
     401          clientCom.LogMessage("Engine doesn't exist");
    393402          return;
    394403        }
    395404        if (!jobs.ContainsKey(jobId)) {
    396           ClientCom.LogMessage("Job doesn't exist");
     405          clientCom.LogMessage("Job doesn't exist");
    397406          return;
    398407        }
     
    405414
    406415        try {
    407           ClientCom.LogMessage("Sending the finished job with id: " + jobId);
     416          clientCom.LogMessage("Sending the finished job with id: " + jobId);
    408417          wcfService.UpdateJobData(cJob, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
    409418          SlaveStatusInfo.JobsProcessed++;
    410419        }
    411420        catch (Exception e) {
    412           ClientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
     421          clientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
    413422        }
    414423        finally {
     
    428437    /// <param name="e"></param>
    429438    private void StartJobInAppDomain(Job myJob, JobData jobData) {
    430       ClientCom.LogMessage("Received new job with id " + myJob.Id);
     439      clientCom.LogMessage("Received new job with id " + myJob.Id);
    431440      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
    432441      bool pluginsPrepared = false;
     
    435444      try {
    436445        PluginCache.Instance.PreparePlugins(myJob, out configFileName);
    437         ClientCom.LogMessage("Plugins fetched for job " + myJob.Id);
     446        clientCom.LogMessage("Plugins fetched for job " + myJob.Id);
    438447        pluginsPrepared = true;
    439448      }
    440449      catch (Exception exception) {
    441         ClientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
     450        clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
    442451      }
    443452
     
    448457          lock (engines) {
    449458            appDomains.Add(myJob.Id, appDomain);
    450             ClientCom.LogMessage("Creating AppDomain");
     459            clientCom.LogMessage("Creating AppDomain");
    451460            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
    452             ClientCom.LogMessage("Created AppDomain");
     461            clientCom.LogMessage("Created AppDomain");
    453462            engine.JobId = myJob.Id;
    454463            engine.Core = this;
    455             ClientCom.LogMessage("Starting Engine for job " + myJob.Id);
     464            clientCom.LogMessage("Starting Engine for job " + myJob.Id);
    456465            engines.Add(myJob.Id, engine);
    457466            engine.Start(jobData.Data);
    458467            SlaveStatusInfo.JobsFetched++;
    459             ClientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
     468            clientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
    460469          }
    461470        }
    462471        catch (Exception exception) {
    463           ClientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
    464           ClientCom.LogMessage("Error thrown is: " + exception.ToString());
     472          clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
     473          clientCom.LogMessage("Error thrown is: " + exception.ToString());
    465474          KillAppDomain(myJob.Id);
    466475        }
     
    471480    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    472481    private void OnExceptionOccured(Exception e) {
    473       ClientCom.LogMessage("Error: " + e.ToString());
     482      clientCom.LogMessage("Error: " + e.ToString());
    474483      var handler = ExceptionOccured;
    475484      if (handler != null) handler(this, new EventArgs<Exception>(e));
     
    477486
    478487    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
    479       ClientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
     488      clientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
    480489      KillAppDomain(new Guid(e.ExceptionObject.ToString()));
    481490    }
     
    508517      }
    509518
    510       ClientCom.LogMessage("Shutting down Appdomain for Job " + id);
     519      clientCom.LogMessage("Shutting down Appdomain for Job " + id);
    511520      lock (engines) {
    512521        try {
     
    526535              }
    527536              catch (CannotUnloadAppDomainException) {
    528                 ClientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
     537                clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
    529538                Thread.Sleep(1000);
    530539                repeat--;
     
    542551        }
    543552        catch (Exception ex) {
    544           ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
     553          clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
    545554        }
    546555      }
Note: See TracChangeset for help on using the changeset viewer.