Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
09/17/10 10:26:55 (14 years ago)
Author:
cneumuel
Message:
  • Refactored HL.Hive.Experiment. JobItems are not called HiveJobs and OptimizerJobs do not contain a hierarchy anymore.
  • Dynamic generation of jobs on a slave are not reflected on the client user interface.
  • Optimizer-Trees are now strictly synchronized with the HiveJob-Trees (also the ComputeInParallel property is taken into account when the Child HiveJobs are created)
  • Improved the way a class can report progress and lock the UI (IProgressReporter, IProgress, Progress, ProgressView)
  • Changes were made to the config-files, so that server and clients work with blade12.hpc.fh-hagenberg.at
  • Lots of small changes and bugfixes
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/Core.cs

    r4368 r4423  
    3636using HeuristicLab.Hive.Slave.ExecutionEngine;
    3737using HeuristicLab.Tracing;
     38using HeuristicLab.Common;
    3839
    3940namespace HeuristicLab.Hive.Slave.Core {
     
    4445    public static bool abortRequested { get; set; }
    4546
     47    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
     48    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
     49    private Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();
     50
     51    private WcfService wcfService;
     52    private HeartbeatManager heartbeatManager;
     53
    4654    private bool currentlyFetching;
    4755    private bool CurrentlyFetching {
     
    5563    }
    5664
    57     private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    58     private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    59     private Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();
    60 
    61     private WcfService wcfService;
    62     private HeartbeatManager beat;
     65    public Dictionary<Guid, Executor> ExecutionEngines {
     66      get { return engines; }
     67    }
     68
     69    internal Dictionary<Guid, JobDto> Jobs {
     70      get { return jobs; }
     71    }
    6372
    6473    /// <summary>
     
    7079      SlaveConsoleServer server = new SlaveConsoleServer();
    7180      server.Start();
    72      
     81
    7382      ConfigManager manager = ConfigManager.Instance;
    7483      manager.Core = this;
     
    95104    private void StartHeartbeats() {
    96105      //Initialize the heartbeat
    97       beat = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
    98       beat.StartHeartbeat();
     106      heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
     107      heartbeatManager.StartHeartbeat();
    99108    }
    100109
     
    111120      wcfService.GetFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
    112121      wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
    113       wcfService.ConnectionRestored += new EventHandler(wcfService_ConnectionRestored);
    114       //wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);
    115122      wcfService.Connected += new EventHandler(wcfService_Connected);
    116123    }
     
    120127      wcfService.GetFinishedJobResultCompleted -= new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
    121128      wcfService.ProcessSnapshotCompleted -= new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
    122       wcfService.ConnectionRestored -= new EventHandler(wcfService_ConnectionRestored);
    123       //wcfService.ServerChanged -= new EventHandler(wcfService_ServerChanged);
    124129      wcfService.Connected -= new EventHandler(wcfService_Connected);
    125130    }
     
    135140        case MessageContainer.MessageType.AbortJob:
    136141          if (engines.ContainsKey(container.JobId))
    137             engines[container.JobId].Abort();
     142            try {
     143              engines[container.JobId].Abort();
     144            }
     145            catch (AppDomainUnloadedException) {
     146              // appdomain already unloaded. Finishing job probably ongoing
     147            }
    138148          else
    139149            Logger.Error("AbortJob: Engine doesn't exist");
     
    156166        //Snapshot is ready and can be sent back to the Server
    157167        case MessageContainer.MessageType.SnapshotReady:
    158           ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
     168          GetSnapshot(container.JobId);
    159169          break;
    160170
     
    168178          break;
    169179
    170 
    171180        //A Job has finished and can be sent back to the server
    172181        case MessageContainer.MessageType.FinishedJob:
    173           ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
    174           break;
    175 
     182          SendFinishedJob(container.JobId);
     183          break;
    176184
    177185        //When the timeslice is up
    178186        case MessageContainer.MessageType.UptimeLimitDisconnect:
    179187          Logger.Info("Uptime Limit reached, storing jobs and sending them back");
    180 
    181188          ShutdownRunningJobsAndSubmitSnapshots();
    182189          break;
     
    201208          Logger.Debug("Stopping heartbeat");
    202209          abortRequested = true;
    203           beat.StopHeartBeat();
     210          heartbeatManager.StopHeartBeat();
    204211          Logger.Debug("Logging out");
    205212          WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
     
    216223
    217224        case MessageContainer.MessageType.GetChildJobs:
    218           // send the job back to hive
    219225          GetChildJobs((MessageContainerWithCallback<SerializedJobList>)container);
     226          break;
     227
     228        case MessageContainer.MessageType.DeleteChildJobs:
     229          wcfService.DeleteChildJobs(container.JobId);
    220230          break;
    221231      }
     
    224234    private void GetChildJobs(MessageContainerWithCallback<SerializedJobList> mc) {
    225235      ResponseObject<SerializedJobList> response = wcfService.GetChildJobs(mc.JobId);
    226       if (response.StatusMessage != ResponseStatus.Ok) {
    227         Logger.Error("GetChildJobs failed: " + response.StatusMessage);
     236      if (response != null && response.StatusMessage == ResponseStatus.Ok) {
     237        mc.Callback(response.Obj);
    228238      } else {
    229         mc.Callback(response.Obj);
     239        if (response != null) {
     240          Logger.Error(string.Format("GetChildJobs failed: {0}", response.StatusMessage));
     241        } else {
     242          Logger.Error("GetChildJobs failed.");
     243        }
    230244      }
    231245    }
     
    234248      ResponseObject<JobDto> response = wcfService.PauseJob(mc.SerializedJob);
    235249      KillAppDomain(mc.JobId);
    236       if (response.StatusMessage != ResponseStatus.Ok) {
     250      if (response == null || response.StatusMessage != ResponseStatus.Ok) {
    237251        Logger.Error("PauseJob failed: " + response.StatusMessage);
    238252      }
     
    241255    private ResponseObject<JobDto> AddChildJob(MessageContainerWithJob mc) {
    242256      ResponseObject<JobDto> response = wcfService.AddChildJob(mc.JobId, mc.SerializedJob);
    243       if (response.StatusMessage != ResponseStatus.Ok) {
     257      if (response == null || response.StatusMessage != ResponseStatus.Ok) {
    244258        Logger.Error("AddChildJob failed: " + response.StatusMessage);
    245259      }
     
    267281    /// </summary>
    268282    /// <param name="jobId"></param>
    269     private void GetFinishedJob(object jobId) {
    270       Guid jId = (Guid)jobId;
    271       Logger.Info("Getting the finished job with id: " + jId);
     283    private void SendFinishedJob(object jobId) {
    272284      try {
     285        Guid jId = (Guid)jobId;
     286        Logger.Info("Getting the finished job with id: " + jId);
    273287        if (!engines.ContainsKey(jId)) {
    274288          Logger.Info("Engine doesn't exist");
     
    278292        byte[] sJob = engines[jId].GetFinishedJob();
    279293
    280         if (WcfService.Instance.LoggedIn) {
     294        try {
    281295          Logger.Info("Sending the finished job with id: " + jId);
    282           wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, 1.0, engines[jId].CurrentException, true);
     296          wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, engines[jId].ExecutionTime, engines[jId].CurrentException, true);
     297        }
     298        catch (Exception e) {
     299          Logger.Info("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")");
     300          JobStorageManager.PersistObjectToDisc(wcfService.ServerIp, 0, jId, sJob); // [chn] Port is not unique anymore (since we need two ports for http and net.tcp-streaming). also the port is now specified only in app.config. use port 0 for the moment
     301        }
     302        finally {
     303          KillAppDomain(jId); // kill app-domain in every case
     304        }
     305      }
     306      catch (Exception e) {
     307        OnExceptionOccured(e);
     308      }
     309    }
     310
     311    private void GetSnapshot(object jobId) {
     312      try {
     313        Logger.Info("Fetching a snapshot for job " + jobId);
     314        Guid jId = (Guid)jobId;
     315        byte[] obj = engines[jId].GetSnapshot();
     316        wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id, jId, obj, engines[jId].ExecutionTime, null);
     317
     318        //Uptime Limit reached, now is a good time to destroy this jobs.
     319        Logger.Debug("Checking if uptime limit is reached");
     320        if (!UptimeManager.Instance.IsAllowedToCalculate()) {
     321          Logger.Debug("Uptime limit reached");
     322          Logger.Debug("Killing Appdomain");
     323          KillAppDomain(jId);
     324          //Still anything running? 
     325          if (engines.Count == 0) {
     326            Logger.Info("All jobs snapshotted and sent back, disconnecting");
     327            WcfService.Instance.Disconnect();
     328          } else {
     329            Logger.Debug("There are still active Jobs in the Field, not disconnecting");
     330          }
    283331        } else {
    284           Logger.Info("Storing the finished job with id: " + jId + " to hdd");
    285           JobStorageManager.PersistObjectToDisc(wcfService.ServerIp, 0, jId, sJob); // [chn] Port is not unique anymore (since we need two ports for http and net.tcp-streaming). also the port is now specified only in app.config. use port 0 for the moment
    286           KillAppDomain(jId);
    287         }
    288       }
    289       catch (InvalidStateException ise) {
    290         Logger.Error("Invalid State while Snapshoting:", ise);
    291       }
    292     }
    293 
    294     private void GetSnapshot(object jobId) {
    295       Logger.Info("Fetching a snapshot for job " + jobId);
    296       Guid jId = (Guid)jobId;
    297       byte[] obj = engines[jId].GetSnapshot();
    298       Logger.Debug("BEGIN: Sending snapshot sync");
    299       wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
    300         jId,
    301         obj,
    302         engines[jId].Progress,
    303         null);
    304       Logger.Debug("END: Sended snapshot sync");
    305       //Uptime Limit reached, now is a good time to destroy this jobs.
    306       Logger.Debug("Checking if uptime limit is reached");
    307       if (!UptimeManager.Instance.IsAllowedToCalculate()) {
    308         Logger.Debug("Uptime limit reached");
    309         Logger.Debug("Killing Appdomain");
    310         KillAppDomain(jId);
    311         //Still anything running? 
    312         if (engines.Count == 0) {
    313           Logger.Info("All jobs snapshotted and sent back, disconnecting");
    314           WcfService.Instance.Disconnect();
    315         } else {
    316           Logger.Debug("There are still active Jobs in the Field, not disconnecting");
    317         }
    318 
    319       } else {
    320         Logger.Debug("Restarting the job" + jobId);
    321         engines[jId].StartOnlyJob();
    322         Logger.Info("Restarted the job" + jobId);
     332          Logger.Debug("Restarting the job" + jobId);
     333          engines[jId].StartOnlyJob();
     334          Logger.Info("Restarted the job" + jobId);
     335        }
     336      }
     337      catch (Exception e) {
     338        OnExceptionOccured(e);
    323339      }
    324340    }
     
    349365      if (e.Result.StatusMessage != ResponseStatus.GetJob_NoJobsAvailable) {
    350366        Logger.Info("Received new job with id " + e.Result.Obj.Id);
    351         bool sandboxed = false;
    352367        Logger.Debug("Fetching plugins for job " + e.Result.Obj.Id);
    353368        try {
    354 
    355369          PluginCache.Instance.PreparePlugins(e.Result.Obj.PluginsNeeded);
    356370          PluginCache.Instance.CopyPluginsForJob(e.Result.Obj.PluginsNeeded, e.Result.Obj.Id);
    357371
    358           //        foreach (CachedHivePluginInfoDto plugininfo in PluginCache.Instance.GetPlugins(e.Result.Job.PluginsNeeded))
    359           //        files.AddRange(plugininfo.PluginFiles);
    360372          Logger.Debug("Plugins fetched for job " + e.Result.Obj.Id);
    361373          String pluginDir = Path.Combine(PluginCache.Instance.PluginRepositoryDir, e.Result.Obj.Id.ToString());
     
    375387              engine.Start(e.Data);
    376388              engines.Add(e.Result.Obj.Id, engine);
    377 
    378389              SlaveStatusInfo.JobsFetched++;
    379390              Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
    380391            }
    381392          }
    382           beat.InterruptHeartBeatThread();
     393          heartbeatManager.AwakeHeartBeatThread();
    383394        }
    384395        catch (Exception exception) {
     
    387398          CurrentlyFetching = false;
    388399          KillAppDomain(e.Result.Obj.Id);
    389           wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, e.Result.Obj.Id, new byte[] { }, 1, exception, true);
     400          wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, e.Result.Obj.Id, new byte[] { }, e.Result.Obj.ExecutionTime, exception.ToString(), true);
    390401        }
    391402      } else {
     
    406417        SlaveStatusInfo.JobsProcessed++;
    407418        Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed);
    408         beat.InterruptHeartBeatThread();
     419        heartbeatManager.AwakeHeartBeatThread();
    409420      } else {
    410421        Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
     
    433444        FetchCalendarFromServer();
    434445      }
    435       //if the fetching from the server failed - still set the client online... maybe we get
    436       //a result within the next few heartbeats     
    437       //if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline()) {
    438446      Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsAllowedToCalculate());
    439       Logger.Info("Setting client online");
    440       wcfService.Login(ConfigManager.Instance.GetClientInfo());
     447      CurrentlyFetching = false;
     448      CheckRunningAppDomains();
    441449      JobStorageManager.CheckAndSubmitJobsFromDisc();
    442       CurrentlyFetching = false;
    443450    }
    444451
     
    459466    }
    460467
    461     //this is a little bit tricky -
    462     void wcfService_ConnectionRestored(object sender, EventArgs e) {
    463       Logger.Info("Reconnected to old server - checking currently running appdomains");
    464 
     468    private void CheckRunningAppDomains() {
    465469      foreach (KeyValuePair<Guid, Executor> execKVP in engines) {
    466470        if (execKVP.Value.ExecutionState != ExecutionState.Started && execKVP.Value.CurrentMessage == MessageContainer.MessageType.NoMessage) {
    467471          Logger.Info("Checking for JobId: " + execKVP.Value.JobId);
    468           Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
     472          Thread finThread = new Thread(new ParameterizedThreadStart(SendFinishedJob));
    469473          finThread.Start(execKVP.Value.JobId);
    470474        }
     
    474478    #endregion
    475479
    476     public Dictionary<Guid, Executor> ExecutionEngines {
    477       get { return engines; }
    478     }
    479 
    480     internal Dictionary<Guid, JobDto> Jobs {
    481       get { return jobs; }
     480    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
     481    private void OnExceptionOccured(Exception e) {
     482      Logger.Error("Error: " + e.ToString());
     483      var handler = ExceptionOccured;
     484      if (handler != null) handler(this, new EventArgs<Exception>(e));
    482485    }
    483486
     
    498501          if (appDomains.ContainsKey(id)) {
    499502            appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
    500             AppDomain.Unload(appDomains[id]);
     503
     504            int repeat = 5;
     505            while (repeat > 0) {
     506              try {
     507                AppDomain.Unload(appDomains[id]);
     508                repeat = 0;
     509              }
     510              catch (CannotUnloadAppDomainException) {
     511                Logger.Error("Could not unload AppDomain, will try again in 1 sec.");
     512                Thread.Sleep(1000);
     513                repeat--;
     514                if (repeat == 0) {
     515                  throw; // rethrow and let app crash
     516                }
     517              }
     518            }
    501519            appDomains.Remove(id);
    502520          }
Note: See TracChangeset for help on using the changeset viewer.