Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
05/16/11 21:19:34 (13 years ago)
Author:
ascheibe
Message:

#1233

  • dropped dependency of Core from Executor
  • enabled sandboxing
  • moved most parts of Job handling from Core to SlaveJob to simplify locking
  • optimized how UsedCores is handled
  • SlaveStatusInfo is now thread-safe and counts jobs more correctly
Location:
branches/HeuristicLab.Hive-3.4/sources
Files:
3 added
9 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave.Views/3.4/SlaveView.cs

    r6004 r6203  
    124124      DataPoint pJobs = new DataPoint(status.Jobs.Count, status.Jobs.Count);
    125125      DataPoint pJobsAborted = new DataPoint(status.JobsAborted, status.JobsAborted);
    126       DataPoint pJobsDone = new DataPoint(status.JobsDone, status.JobsDone);
     126      DataPoint pJobsDone = new DataPoint(status.JobsFinished, status.JobsFinished);
    127127      DataPoint pJobsFetched = new DataPoint(status.JobsFetched, status.JobsFetched);
    128128
     
    131131      pJobsAborted.LegendText = "Aborted jobs: " + status.JobsAborted;
    132132      pJobsAborted.Color = System.Drawing.Color.Red;
    133       pJobsDone.LegendText = "Finished jobs: " + status.JobsDone;
     133      pJobsDone.LegendText = "Finished jobs: " + status.JobsFinished;
    134134      pJobsDone.Color = System.Drawing.Color.Green;
    135135      pJobsFetched.LegendText = "Fetched jobs: " + status.JobsFetched;
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/ConfigManager.cs

    r6168 r6203  
    7979
    8080      st.TotalCores = slave.Cores.HasValue ? slave.Cores.Value : 0;
    81       st.FreeCores = slave.Cores.HasValue ? slave.Cores.Value - GetUsedCores() : 0;
     81      st.FreeCores = slave.Cores.HasValue ? slave.Cores.Value - SlaveStatusInfo.UsedCores : 0;
    8282
    8383      st.JobsAborted = SlaveStatusInfo.JobsAborted;
    84       st.JobsDone = SlaveStatusInfo.JobsProcessed;
     84      st.JobsFinished = SlaveStatusInfo.JobsFinished;
    8585      st.JobsFetched = SlaveStatusInfo.JobsFetched;
    86 
    87       Dictionary<Guid, Executor> engines = Core.Executors;
     86      st.JobsFailed = SlaveStatusInfo.JobsFailed;
     87
     88      Dictionary<Guid, SlaveJob> slaveJobs = Core.SlaveJobs;
    8889      st.Jobs = new List<JobStatus>();
    8990
    90       lock (engines) {
    91         foreach (KeyValuePair<Guid, Executor> kvp in engines) {
    92           Executor e = kvp.Value;
    93           st.Jobs.Add(new JobStatus { JobId = e.JobId, ExecutionTime = e.ExecutionTime, Since = e.CreationTime });
     91      lock (slaveJobs) {
     92        foreach (KeyValuePair<Guid, SlaveJob> kvp in slaveJobs) {
     93          Executor e = kvp.Value.JobExecutor;
     94          if (e != null && !kvp.Value.Finished) {
     95            st.Jobs.Add(new JobStatus { JobId = e.JobId, ExecutionTime = e.ExecutionTime, Since = e.CreationTime });
     96          }
    9497        }
    9598      }
     
    99102    public Dictionary<Guid, TimeSpan> GetExecutionTimeOfAllJobs() {
    100103      Dictionary<Guid, TimeSpan> prog = new Dictionary<Guid, TimeSpan>();
    101       Dictionary<Guid, Executor> engines = Core.Executors;
    102       lock (engines) {
    103         foreach (KeyValuePair<Guid, Executor> kvp in engines) {
    104           Executor e = kvp.Value;
    105           //don't include jobs in hb's which are currently serializing
    106           if (e.SendHeartbeatForExecutor) {
    107             prog[e.JobId] = e.ExecutionTime;
     104      Dictionary<Guid, SlaveJob> slaveJobs = Core.SlaveJobs;
     105      lock (slaveJobs) {
     106        foreach (KeyValuePair<Guid, SlaveJob> kvp in slaveJobs) {
     107          Executor e = kvp.Value.JobExecutor;
     108          if (e != null && !kvp.Value.Finished) {
     109            //don't include jobs in hb's which are currently serializing
     110            if (e.SendHeartbeatForExecutor) {
     111              prog[e.JobId] = e.ExecutionTime;
     112            }
    108113          }
    109114        }
    110115      }
    111116      return prog;
    112     }
    113 
    114     public int GetUsedCores() {
    115       return Core.GetCoresNeeded();
    116117    }
    117118
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs

    r6202 r6203  
    2323using System.Collections.Generic;
    2424using System.Diagnostics;
    25 using System.IO;
    26 using System.Linq;
    2725using System.ServiceModel;
    2826using System.Threading;
     
    4543    public static ILog Log { get; set; }
    4644
    47     private Dictionary<Guid, Executor> executors = new Dictionary<Guid, Executor>();
    48     private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    49 
    50     // signalizes if the Executor.Start method has properly finished. only then the appdomain may be unloaded
    51     private Dictionary<Guid, Semaphore> semaphores = new Dictionary<Guid, Semaphore>();
     45    private Dictionary<Guid, SlaveJob> slaveJobs = new Dictionary<Guid, SlaveJob>();
    5246
    5347    private WcfService wcfService;
    54     private HeartbeatManager heartbeatManager;
    55     private int coreThreadId;
     48    private static HeartbeatManager heartbeatManager;
     49    public static HeartbeatManager HBManager { get { return heartbeatManager; } }
    5650
    5751    private ISlaveCommunication clientCom;
    5852    private ServiceHost slaveComm;
    5953
    60     public Dictionary<Guid, Executor> Executors {
    61       get { return executors; }
     54    public Dictionary<Guid, SlaveJob> SlaveJobs {
     55      get { return slaveJobs; }
    6256    }
    6357
     
    6862    /// </summary>
    6963    public void Start() {
    70       coreThreadId = Thread.CurrentThread.ManagedThreadId;
    7164      abortRequested = false;
    7265
     
    160153            Task.Factory.StartNew((jobIdObj) => {
    161154              Guid jobId = (Guid)jobIdObj;
    162               Job job = wcfService.GetJob(jobId);
    163               if (job == null) throw new JobNotFoundException(jobId);
    164               JobData jobData = wcfService.GetJobData(job.Id);
    165               if (jobData == null) throw new JobDataNotFoundException(jobId);
    166               SlaveStatusInfo.JobsFetched++;
    167               job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
    168               if (job == null) throw new JobNotFoundException(jobId);
    169               StartJobInAppDomain(job, jobData);
     155              SlaveJob newJob = new SlaveJob(this);
     156              bool start = true;
     157
     158              lock (slaveJobs) {
     159                if (slaveJobs.ContainsKey(jobId)) {
     160                  start = false;
     161                  clientCom.LogMessage(string.Format("Job with id {0} already exists. Start aborted.", jobId));
     162                } else {
     163                  slaveJobs.Add(jobId, newJob);
     164                }
     165              }
     166
     167              if (start) {
     168                newJob.CalculateJob(jobId);
     169              }
    170170            }, container.JobId)
    171171            .ContinueWith((t) => {
     
    173173              clientCom.LogMessage(t.Exception.ToString());
    174174              wcfService.UpdateJobState(container.JobId, JobState.Failed, t.Exception.ToString());
    175               SlaveStatusInfo.JobsAborted++;
     175              SlaveStatusInfo.IncrementJobsFailed();
    176176            }, TaskContinuationOptions.OnlyOnFaulted);
    177177            break;
     
    189189            break;
    190190          case MessageContainer.MessageType.AbortJob:
    191             SlaveStatusInfo.JobsAborted++;
    192             KillAppDomain(container.JobId);
     191            SlaveStatusInfo.IncrementJobsAborted(); //TODO: move to a sane place
     192
     193            Task.Factory.StartNew((jobIdObj) => {
     194              Guid jobId = (Guid)jobIdObj;
     195              bool abort = true;
     196              SlaveJob sj = null;
     197
     198              lock (slaveJobs) {
     199                if (!slaveJobs.ContainsKey(jobId)) {
     200                  clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. Abort aborted.", jobId));
     201                  abort = false;
     202                } else {
     203                  sj = slaveJobs[jobId];
     204                }
     205              }
     206              if (abort && !sj.Finished) {
     207                sj.KillAppDomain();
     208              }
     209            }, container.JobId)
     210             .ContinueWith((t) => {
     211               // handle exception of task
     212               clientCom.LogMessage(t.Exception.ToString());
     213             }, TaskContinuationOptions.OnlyOnFaulted);
    193214            break;
    194215          case MessageContainer.MessageType.StopJob:
    195             DoStopJob(container.JobId);
     216            Task.Factory.StartNew((jobIdObj) => {
     217              Guid jobId = (Guid)jobIdObj;
     218              bool stop = true;
     219              SlaveJob sj = null;
     220
     221              lock (slaveJobs) {
     222                if (!slaveJobs.ContainsKey(jobId)) {
     223                  clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. Stop aborted.", jobId));
     224                  stop = false;
     225                } else {
     226                  sj = slaveJobs[jobId];
     227                }
     228              }
     229              if (stop && !sj.Finished) {
     230                sj.StopJob();
     231              }
     232            }, container.JobId)
     233             .ContinueWith((t) => {
     234               // handle exception of task
     235               clientCom.LogMessage(t.Exception.ToString());
     236             }, TaskContinuationOptions.OnlyOnFaulted);
    196237            break;
    197238          case MessageContainer.MessageType.PauseJob:
    198             DoPauseJob(container.JobId);
     239            Task.Factory.StartNew((jobIdObj) => {
     240              Guid jobId = (Guid)jobIdObj;
     241              bool pause = true;
     242              SlaveJob sj = null;
     243
     244              lock (slaveJobs) {
     245                if (!slaveJobs.ContainsKey(jobId)) {
     246                  clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. Pause aborted.", jobId));
     247                  pause = false;
     248                } else {
     249                  sj = slaveJobs[jobId];
     250                }
     251              }
     252              if (pause && !sj.Finished) {
     253                sj.PauseJob();
     254              }
     255            }, container.JobId)
     256             .ContinueWith((t) => {
     257               // handle exception of task
     258               clientCom.LogMessage(t.Exception.ToString());
     259             }, TaskContinuationOptions.OnlyOnFaulted);
    199260            break;
    200261          case MessageContainer.MessageType.Restart:
     
    213274    }
    214275
    215     private void DoPauseJob(Guid jobId) {
    216       if (!executors.ContainsKey(jobId)) {
    217         clientCom.LogMessage("DoPauseJob: Can't find job with id " + jobId);
    218       } else {
    219         Job job = wcfService.GetJob(jobId);
    220 
    221         if (job != null && executors.ContainsKey(job.Id)) {
    222           executors[job.Id].Pause();
    223           JobData sJob = executors[job.Id].GetPausedJob();
    224           job.ExecutionTime = executors[job.Id].ExecutionTime;
    225 
    226           try {
    227             if (executors[job.Id].CurrentException != string.Empty) {
    228               wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);
    229               SlaveStatusInfo.JobsAborted++;
    230             } else {
    231               SlaveStatusInfo.JobsProcessed++;
    232             }
    233             clientCom.LogMessage("Sending the paused job with id: " + job.Id);
    234             wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
     276    /// <summary>
     277    /// aborts all running jobs, no results are sent back
     278    /// </summary>
     279    private void DoAbortAll() {
     280      lock (slaveJobs) {
     281        foreach (SlaveJob sj in slaveJobs.Values) {
     282          if (!sj.Finished) {
     283            sj.KillAppDomain();
    235284          }
    236           catch (Exception e) {
    237             clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
    238           }
    239           finally {
    240             KillAppDomain(job.Id); // kill app-domain in every case         
    241           }
    242         }
    243       }
    244     }
    245 
    246     private void DoStopJob(Guid jobId) {
    247       if (!executors.ContainsKey(jobId)) {
    248         clientCom.LogMessage("DoStopJob: Can't find job with id " + jobId);
    249       } else {
    250         Job job = wcfService.GetJob(jobId);
    251 
    252         if (job != null) {
    253           executors[job.Id].Stop();
    254           JobData sJob = executors[job.Id].GetFinishedJob();
    255           job.ExecutionTime = executors[job.Id].ExecutionTime;
    256 
    257           try {
    258             if (executors[job.Id].CurrentException != string.Empty) {
    259               wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);
    260             }
    261             SlaveStatusInfo.JobsAborted++;
    262 
    263             clientCom.LogMessage("Sending the stopped job with id: " + job.Id);
    264             wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Aborted);
    265           }
    266           catch (Exception e) {
    267             clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
    268           }
    269           finally {
    270             KillAppDomain(job.Id); // kill app-domain in every case         
    271           }
    272         }
    273       }
    274     }
    275 
    276     /// <summary>
    277     /// aborts all running jobs, no results are sent back
    278     /// </summary>
    279     private void DoAbortAll() {
    280       List<Guid> jobIds;
    281       lock (executors) {
    282         jobIds = new List<Guid>(executors.Keys);
    283       }
    284       foreach (Guid jobId in jobIds) {
    285         KillAppDomain(jobId);
     285        }
    286286      }
    287287      clientCom.LogMessage("Aborted all jobs!");
     
    294294      clientCom.LogMessage("Pause all received");
    295295
    296       //copy guids because there will be removed items from 'Jobs'
    297       List<Guid> jobIds;
    298       lock (executors) {
    299         jobIds = new List<Guid>(Executors.Keys);
    300       }
    301 
    302       foreach (Guid jobId in jobIds) {
    303         DoPauseJob(jobId);
     296      lock (slaveJobs) {
     297        foreach (SlaveJob sj in slaveJobs.Values) {
     298          if (!sj.Finished) {
     299            sj.PauseJob();
     300          }
     301        }
    304302      }
    305303    }
     
    311309      clientCom.LogMessage("Stop all received");
    312310
    313       //copy guids because there will be removed items from 'Jobs'
    314       List<Guid> jobIds;
    315       lock (executors) {
    316         jobIds = new List<Guid>(executors.Keys);
    317       }
    318 
    319       foreach (Guid jobId in jobIds) {
    320         DoStopJob(jobId);
     311      lock (slaveJobs) {
     312        foreach (SlaveJob sj in slaveJobs.Values) {
     313          if (!sj.Finished) {
     314            sj.StopJob();
     315          }
     316        }
    321317      }
    322318    }
     
    341337      clientCom.LogMessage("Logging out");
    342338
    343 
    344       lock (executors) {
    345         clientCom.LogMessage("executors locked");
    346         foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
    347           clientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
    348           appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
    349           AppDomain.Unload(kvp.Value);
    350         }
    351       }
     339      DoAbortAll();
     340
    352341      WcfService.Instance.Disconnect();
    353342      clientCom.Shutdown();
     
    382371
    383372    /// <summary>
    384     /// Pauses a job, which means sending it to the server and killing it locally;
    385     /// atm only used when executor is waiting for child jobs
    386     /// </summary>
    387     public void PauseWaitJob(JobData data) {
    388       if (!Executors.ContainsKey(data.JobId)) {
    389         clientCom.LogMessage("Can't find job with id " + data.JobId);
    390       } else {
    391         Job job = wcfService.GetJob(data.JobId);
    392         wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
    393         wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
    394       }
    395       KillAppDomain(data.JobId);
    396     }
    397 
    398     /// <summary>
    399     /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
    400     /// once the connection gets reestablished, the job gets submitted
    401     /// </summary>
    402     public void SendFinishedJob(Guid jobId) {
    403       try {
    404         clientCom.LogMessage("Getting the finished job with id: " + jobId);
    405         if (!executors.ContainsKey(jobId)) {
    406           clientCom.LogMessage("Executor doesn't exist");
    407           return;
    408         }
    409         if (!executors.ContainsKey(jobId)) {
    410           clientCom.LogMessage("Job doesn't exist");
    411           return;
    412         }
    413         Job job = wcfService.GetJob(jobId);
    414         job.ExecutionTime = executors[jobId].ExecutionTime;
    415 
    416         if (executors[jobId].Aborted) {
    417           SlaveStatusInfo.JobsAborted++;
    418         } else {
    419           SlaveStatusInfo.JobsProcessed++;
    420         }
    421 
    422         if (executors[jobId].CurrentException != string.Empty) {
    423           wcfService.UpdateJobState(jobId, JobState.Failed, executors[jobId].CurrentException);
    424         }
    425 
    426         JobData sJob = executors[jobId].GetFinishedJob();
    427         try {
    428           clientCom.LogMessage("Sending the finished job with id: " + jobId);
    429           wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
    430         }
    431         catch (Exception e) {
    432           clientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
    433         }
    434         finally {
    435           KillAppDomain(jobId);
    436           heartbeatManager.AwakeHeartBeatThread();
    437         }
    438       }
    439       catch (Exception e) {
    440         OnExceptionOccured(e);
    441       }
    442     }
    443 
    444     private static object startInAppDomainLocker = new object();
    445     private static Mutex startInAppDomainMutex = new Mutex();
    446     /// <summary>
    447     /// A new Job from the wcfService has been received and will be started within a AppDomain.
    448     /// </summary>   
    449     private void StartJobInAppDomain(Job job, JobData jobData) {
    450       clientCom.LogMessage("Received new job with id " + job.Id);
    451       clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
    452 
    453       startInAppDomainMutex.WaitOne(); // mutex is used instead of lock to be able to release it before executor.Start() is called (which may take long time)
    454       bool released = false;
    455 
    456         if (executors.ContainsKey(job.Id)) {
    457           clientCom.LogMessage("Job with key " + job.Id + " already exists. Job will be ignored.");
    458           return;
    459         }
    460 
    461         String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, job.Id.ToString());
    462         bool pluginsPrepared = false;
    463         string configFileName = string.Empty;
    464 
    465         try {
    466           PluginCache.Instance.PreparePlugins(job, out configFileName);
    467           clientCom.LogMessage("Plugins fetched for job " + job.Id);
    468           pluginsPrepared = true;
    469         }
    470         catch (Exception exception) {
    471           clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", job.Id, exception));
    472           wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
    473           SlaveStatusInfo.JobsAborted++;
    474         }
    475 
    476         if (pluginsPrepared) {
    477           try {
    478             //TODO: switch back to unprivileged sandbox
    479             AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitPrivilegedSandbox(job.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
    480             appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
    481             Executor executor;
    482             appDomains.Add(job.Id, appDomain);
    483             clientCom.LogMessage("Creating AppDomain");
    484             executor = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
    485             clientCom.LogMessage("Created AppDomain");
    486             executor.Core = this;
    487             executor.JobId = job.Id;
    488             executor.CoresNeeded = job.CoresNeeded;
    489             executor.MemoryNeeded = job.MemoryNeeded;
    490             clientCom.LogMessage("Starting Executor for job " + job.Id);
    491             lock (executors) {
    492               if (executors.ContainsKey(job.Id)) {
    493                 throw new JobAlreadyExistsException(job.Id);
    494               }
    495               executors.Add(job.Id, executor);
    496             }
    497             startInAppDomainMutex.ReleaseMutex();
    498             released = true;
    499             semaphores[job.Id] = new Semaphore(0, 1);
    500             executor.Start(jobData.Data);
    501             semaphores[job.Id].Release();
    502           }
    503           catch (JobAlreadyExistsException e) {
    504             clientCom.LogMessage(string.Format("Job {0} has already been started. Job will be ignored", e.JobId));
    505           }
    506           catch (Exception exception) {
    507             clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + job.Id);
    508             clientCom.LogMessage("Error thrown is: " + exception.ToString());
    509 
    510             if (executors.ContainsKey(job.Id) && executors[job.Id].CurrentException != string.Empty) {
    511               wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);
    512             } else {
    513               wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
    514             }
    515             SlaveStatusInfo.JobsAborted++;
    516 
    517             KillAppDomain(job.Id);
    518           }
    519 
    520           if (!released)
    521             startInAppDomainMutex.ReleaseMutex();
    522         }
    523       heartbeatManager.AwakeHeartBeatThread();
    524     }
    525 
    526     public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    527     private void OnExceptionOccured(Exception e) {
    528       clientCom.LogMessage("Error: " + e.ToString());
    529       var handler = ExceptionOccured;
    530       if (handler != null) handler(this, new EventArgs<Exception>(e));
    531     }
    532 
    533     private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
    534       clientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
    535       KillAppDomain(new Guid(e.ExceptionObject.ToString()));
    536     }
    537 
    538     /// <summary>
    539373    /// Enqueues messages from the executor to the message queue.
    540374    /// This is necessary if the core thread has to execute certain actions, e.g.
     
    549383    }
    550384
    551     /// <summary>
    552     /// Kill a appdomain with a specific id.
    553     /// </summary>
    554     /// <param name="jobId">the GUID of the job</param>   
    555     public void KillAppDomain(Guid jobId) {
    556       if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
    557         EnqueueExecutorMessage<Guid>(KillAppDomain, jobId);
    558         return;
    559       }
    560 
    561       clientCom.LogMessage("Shutting down Appdomain for Job " + jobId);
    562       lock (executors) {
    563         try {
    564           if (executors.ContainsKey(jobId)) {
    565             executors[jobId].Dispose();
    566             executors.Remove(jobId);
    567           }
    568 
    569           if (appDomains.ContainsKey(jobId)) {
    570             appDomains[jobId].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
    571             int repeat = 5;
    572             while (repeat > 0) {
    573               try {
    574                 semaphores[jobId].WaitOne();
    575                 AppDomain.Unload(appDomains[jobId]);
    576                 semaphores[jobId].Dispose();
    577                 semaphores.Remove(jobId);
    578                 repeat = 0;
    579               }
    580               catch (CannotUnloadAppDomainException) {
    581                 clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
    582                 Thread.Sleep(1000);
    583                 repeat--;
    584                 if (repeat == 0) {
    585                   clientCom.LogMessage("Could not unload AppDomain, shutting down core...");
    586                   throw; // rethrow and let app crash
    587                 }
    588               }
    589             }
    590             appDomains.Remove(jobId);
    591           }
    592 
    593           PluginCache.Instance.DeletePluginsForJob(jobId);
    594           GC.Collect();
    595         }
    596         catch (Exception ex) {
    597           clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
    598         }
    599       }
    600       GC.Collect();
    601       clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
    602     }
    603 
    604     public override object InitializeLifetimeService() {
    605       return null; // avoid destruction of proxy object after 5 minutes
    606     }
    607 
    608     public int GetCoresNeeded() {
    609       lock (executors) {
    610         return executors.Sum(x => x.Value.CoresNeeded);
     385    public void RemoveSlaveJobFromList(Guid jobId) {
     386      lock (slaveJobs) {
     387        if (slaveJobs.ContainsKey(jobId)) {
     388          slaveJobs.Remove(jobId);
     389        }
    611390      }
    612391    }
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Executor.cs

    r6178 r6203  
    4040    private Semaphore pauseStopSem = new Semaphore(0, 1);
    4141    private Semaphore startJobSem = new Semaphore(0, 1);
     42    //make pause or stop wait until start is finished
     43    private Semaphore jobStartedSem = new Semaphore(0, 1);
     44
     45    public ExecutorQueue executorQueue;
    4246
    4347    public bool SendHeartbeatForExecutor { get; set; }
     
    7276    public Executor() {
    7377      SendHeartbeatForExecutor = true;
     78      executorQueue = new ExecutorQueue();
    7479    }
    7580
     
    8994        } else {
    9095          Job.Start();
    91           if (!startJobSem.WaitOne(TimeSpan.FromSeconds(15))) {
     96          if (!jobStartedSem.WaitOne(TimeSpan.FromSeconds(15))) {
    9297            throw new TimeoutException("Timeout when starting the job. JobStarted event was not fired.");
    9398          }
     99          jobStartedSem.Release();
    94100        }
    95101      }
     
    102108    public void Pause() {
    103109      SendHeartbeatForExecutor = false;
     110      // wait until job is started. if this does not happen, the Job is null an we give up
     111      jobStartedSem.WaitOne(TimeSpan.FromSeconds(15));
    104112      if (Job == null) {
    105113        currentException = new Exception("Pausing job " + this.JobId + ": Job is null");
    106         Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
     114        return;
    107115      }
    108116
     
    121129    public void Stop() {
    122130      SendHeartbeatForExecutor = false;
     131      // wait until job is started. if this does not happen, the Job is null an we give up
     132      jobStartedSem.WaitOne(TimeSpan.FromSeconds(15));
    123133      if (Job == null) {
    124134        currentException = new Exception("Stopping job " + this.JobId + ": Job is null");
    125         Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
    126135      }
    127136      wasJobAborted = true;
     
    176185      childJob.PluginsNeededIds = FindPluginsNeeded(e.Value);
    177186
    178       //TODO: is return value needed?
    179       WcfService.Instance.AddChildJob(this.JobId, childJob, childJobData);
     187      ExecutorMessage msg = new ExecutorMessage(ExecutorMessageType.NewChildJob);
     188      msg.MsgData = childJobData;
     189      msg.MsgJob = childJob;
     190
     191      executorQueue.AddMessage(msg);
    180192    }
    181193
     
    188200      jdata.JobId = this.JobId;
    189201
    190       Core.PauseWaitJob(jdata);
     202      ExecutorMessage msg = new ExecutorMessage(ExecutorMessageType.WaitForChildJobs);
     203      msg.MsgData = jdata;
     204      executorQueue.AddMessage(msg);
    191205    }
    192206
    193207    private void Job_DeleteChildJobs(object sender, EventArgs e) {
    194       WcfService.Instance.DeleteChildJobs(JobId);
     208      executorQueue.AddMessage(ExecutorMessageType.DeleteChildJobs);
    195209    }
    196210
     
    198212      HeuristicLab.Common.EventArgs<Exception> ex = (HeuristicLab.Common.EventArgs<Exception>)e;
    199213      currentException = ex.Value;
    200       Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);
    201214      Aborted = true;
     215
     216      executorQueue.AddMessage(ExecutorMessageType.JobFailed);
    202217    }
    203218
     
    208223      } else {
    209224        //it's a clean and finished job, so send it
    210         Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);
     225        executorQueue.AddMessage(ExecutorMessageType.JobStopped);
    211226      }
    212227    }
     
    216231        if (currentException == null) {
    217232          currentException = new Exception("Getting finished job " + this.JobId + ": Job is null");
    218         }
    219         Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
     233          return GetJob();
     234        }
    220235      }
    221236
     
    234249    }
    235250
    236 
    237251    public JobData GetPausedJob() {
    238252      if (Job.ExecutionState != HeuristicLab.Core.ExecutionState.Paused) {
     
    247261
    248262    void Job_JobStarted(object sender, EventArgs e) {
    249       startJobSem.Release();
     263      jobStartedSem.Release();
    250264    }
    251265
     
    255269      } else {
    256270        JobData jdata = new JobData();
    257         jdata.Data = PersistenceUtil.Serialize(Job);
     271        if (Job == null) {
     272          //send empty job and save exception
     273          jdata.Data = PersistenceUtil.Serialize(new JobData());
     274          if (currentException == null) {
     275            currentException = new Exception("Job with id " + this.JobId + " is null, sending empty job");
     276          }
     277        } else {
     278          jdata.Data = PersistenceUtil.Serialize(Job);
     279        }
    258280        jdata.JobId = JobId;
    259281        return jdata;
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/HeartbeatManager.cs

    r6112 r6203  
    8989              Heartbeat heartBeatData = new Heartbeat {
    9090                SlaveId = info.Id,
    91                 FreeCores = info.Cores.HasValue ? info.Cores.Value - ConfigManager.Instance.GetUsedCores() : 0,
     91                FreeCores = info.Cores.HasValue ? info.Cores.Value - SlaveStatusInfo.UsedCores : 0,
    9292                FreeMemory = ConfigManager.GetFreeMemory(),
    9393                JobProgress = ConfigManager.Instance.GetExecutionTimeOfAllJobs(),
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/HeuristicLab.Clients.Hive.Slave-3.4.csproj

    r6202 r6203  
    9898  <ItemGroup>
    9999    <Compile Include="ConfigManager.cs" />
     100    <Compile Include="Exceptions\JobAlreadyExistsException.cs" />
    100101    <Compile Include="Exceptions\JobNotFoundException.cs" />
    101102    <Compile Include="Exceptions\JobNotDataFoundException.cs" />
    102     <Compile Include="Exceptions\JobAlreadyExistsException.cs" />
     103    <Compile Include="ExecutorMessage.cs" />
     104    <Compile Include="ExecutorQueue.cs" />
    103105    <Compile Include="SlaveClientCom.cs" />
    104106    <Compile Include="Core.cs" />
     
    120122    <Compile Include="ServiceContracts\ISlaveCommunicationCallbacks.cs" />
    121123    <Compile Include="SlaveCommunicationService.cs" />
     124    <Compile Include="SlaveJob.cs" />
    122125    <Compile Include="SlaveStatusInfo.cs" />
    123126    <Compile Include="StatusCommons.cs" />
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/MessageQueue.cs

    r5599 r6203  
    5252    /// </summary>
    5353    /// <returns>nothing</returns>
    54     public override object InitializeLifetimeService() {
     54    /*public override object InitializeLifetimeService() {
    5555      return null;
    56     }
     56    } */
    5757
    5858    /// <summary>
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/SlaveStatusInfo.cs

    r6202 r6203  
    2424namespace HeuristicLab.Clients.Hive.SlaveCore {
    2525  public class SlaveStatusInfo {
    26     static public int JobsProcessed { get; set; }
    27     static public int JobsAborted { get; set; }
    28     static public int JobsFetched { get; set; }
     26    static private int jobsFinished; // everything went fine
     27    static private int jobsAborted;  // server sent stop or abort
     28    static private int jobsFetched;  // number of fetched jobs
     29    static private int jobsFailed;   // jobs that failed in the sandbox
     30    static private int usedCores;    // number of cores currently used
     31
    2932    static public DateTime LoginTime { get; set; }
     33    static private Object jobStatLock = new Object();
     34    static private Object coreLock = new Object();
     35
     36    static public int UsedCores {
     37      get {
     38        lock (coreLock) {
     39          return usedCores;
     40        }
     41      }
     42    }
     43
     44    static public int JobsFinished {
     45      get {
     46        lock (jobStatLock) {
     47          return jobsFinished;
     48        }
     49      }
     50    }
     51
     52    static public int JobsAborted {
     53      get {
     54        lock (jobStatLock) {
     55          return jobsAborted;
     56        }
     57      }
     58    }
     59
     60    static public int JobsFetched {
     61      get {
     62        lock (jobStatLock) {
     63          return jobsFetched;
     64        }
     65      }
     66    }
     67
     68    static public int JobsFailed {
     69      get {
     70        lock (jobStatLock) {
     71          return jobsFailed;
     72        }
     73      }
     74    }
     75
     76    public static void IncrementJobsFinished() {
     77      lock (jobStatLock) {
     78        jobsFinished++;
     79      }
     80    }
     81
     82    public static void IncrementJobsFailed() {
     83      lock (jobStatLock) {
     84        jobsFailed++;
     85      }
     86    }
     87
     88    public static void IncrementJobsAborted() {
     89      lock (jobStatLock) {
     90        jobsAborted++;
     91      }
     92    }
     93
     94    public static void IncrementJobsFetched() {
     95      lock (jobStatLock) {
     96        jobsFetched++;
     97      }
     98    }
     99
     100    public static void IncrementUsedCores(int val) {
     101      lock (coreLock) {
     102        usedCores += val;
     103      }
     104    }
     105
     106    public static void DecrementUsedCores(int val) {
     107      lock (coreLock) {
     108        usedCores -= val;
     109      }
     110    }
    30111  }
    31112}
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/StatusCommons.cs

    r6033 r6203  
    4343    public int JobsFetched { get; set; }
    4444    [DataMember]
    45     public int JobsDone { get; set; }
     45    public int JobsFinished { get; set; }
    4646    [DataMember]
    4747    public int JobsAborted { get; set; }
     48    [DataMember]
     49    public int JobsFailed { get; set; }
    4850    [DataMember]
    4951    public List<JobStatus> Jobs { get; set; }
    5052
    5153    public override string ToString() {
    52       return string.Format("Status: {0}, Fetched/Done/Aborted: {1},{2},{3}", Status, JobsFetched, JobsDone, JobsAborted);
     54      return string.Format("Status: {0}, Fetched/Finished/Aborted/Failed: {1},{2},{3},{4}", Status, JobsFetched, JobsFinished, JobsAborted, JobsFailed);
    5355    }
     56
     57
    5458  }
    5559}
Note: See TracChangeset for help on using the changeset viewer.