Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
05/16/11 21:19:34 (13 years ago)
Author:
ascheibe
Message:

#1233

  • dropped Executor's dependency on Core
  • enabled sandboxing
  • moved most parts of Job handling from Core to SlaveJob to simplify locking
  • optimized how UsedCores is handled
  • SlaveStatusInfo is now thread-safe and counts jobs more correctly
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs

    r6202 r6203  
    2323using System.Collections.Generic;
    2424using System.Diagnostics;
    25 using System.IO;
    26 using System.Linq;
    2725using System.ServiceModel;
    2826using System.Threading;
     
    4543    public static ILog Log { get; set; }
    4644
    47     private Dictionary<Guid, Executor> executors = new Dictionary<Guid, Executor>();
    48     private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    49 
    50     // signalizes if the Executor.Start method has properly finished. only then the appdomain may be unloaded
    51     private Dictionary<Guid, Semaphore> semaphores = new Dictionary<Guid, Semaphore>();
     45    private Dictionary<Guid, SlaveJob> slaveJobs = new Dictionary<Guid, SlaveJob>();
    5246
    5347    private WcfService wcfService;
    54     private HeartbeatManager heartbeatManager;
    55     private int coreThreadId;
     48    private static HeartbeatManager heartbeatManager;
     49    public static HeartbeatManager HBManager { get { return heartbeatManager; } }
    5650
    5751    private ISlaveCommunication clientCom;
    5852    private ServiceHost slaveComm;
    5953
    60     public Dictionary<Guid, Executor> Executors {
    61       get { return executors; }
     54    public Dictionary<Guid, SlaveJob> SlaveJobs {
     55      get { return slaveJobs; }
    6256    }
    6357
     
    6862    /// </summary>
    6963    public void Start() {
    70       coreThreadId = Thread.CurrentThread.ManagedThreadId;
    7164      abortRequested = false;
    7265
     
    160153            Task.Factory.StartNew((jobIdObj) => {
    161154              Guid jobId = (Guid)jobIdObj;
    162               Job job = wcfService.GetJob(jobId);
    163               if (job == null) throw new JobNotFoundException(jobId);
    164               JobData jobData = wcfService.GetJobData(job.Id);
    165               if (jobData == null) throw new JobDataNotFoundException(jobId);
    166               SlaveStatusInfo.JobsFetched++;
    167               job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
    168               if (job == null) throw new JobNotFoundException(jobId);
    169               StartJobInAppDomain(job, jobData);
     155              SlaveJob newJob = new SlaveJob(this);
     156              bool start = true;
     157
     158              lock (slaveJobs) {
     159                if (slaveJobs.ContainsKey(jobId)) {
     160                  start = false;
     161                  clientCom.LogMessage(string.Format("Job with id {0} already exists. Start aborted.", jobId));
     162                } else {
     163                  slaveJobs.Add(jobId, newJob);
     164                }
     165              }
     166
     167              if (start) {
     168                newJob.CalculateJob(jobId);
     169              }
    170170            }, container.JobId)
    171171            .ContinueWith((t) => {
     
    173173              clientCom.LogMessage(t.Exception.ToString());
    174174              wcfService.UpdateJobState(container.JobId, JobState.Failed, t.Exception.ToString());
    175               SlaveStatusInfo.JobsAborted++;
     175              SlaveStatusInfo.IncrementJobsFailed();
    176176            }, TaskContinuationOptions.OnlyOnFaulted);
    177177            break;
     
    189189            break;
    190190          case MessageContainer.MessageType.AbortJob:
    191             SlaveStatusInfo.JobsAborted++;
    192             KillAppDomain(container.JobId);
     191            SlaveStatusInfo.IncrementJobsAborted(); //TODO: move to a sane place
     192
     193            Task.Factory.StartNew((jobIdObj) => {
     194              Guid jobId = (Guid)jobIdObj;
     195              bool abort = true;
     196              SlaveJob sj = null;
     197
     198              lock (slaveJobs) {
     199                if (!slaveJobs.ContainsKey(jobId)) {
     200                  clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. Abort aborted.", jobId));
     201                  abort = false;
     202                } else {
     203                  sj = slaveJobs[jobId];
     204                }
     205              }
     206              if (abort && !sj.Finished) {
     207                sj.KillAppDomain();
     208              }
     209            }, container.JobId)
     210             .ContinueWith((t) => {
     211               // handle exception of task
     212               clientCom.LogMessage(t.Exception.ToString());
     213             }, TaskContinuationOptions.OnlyOnFaulted);
    193214            break;
    194215          case MessageContainer.MessageType.StopJob:
    195             DoStopJob(container.JobId);
     216            Task.Factory.StartNew((jobIdObj) => {
     217              Guid jobId = (Guid)jobIdObj;
     218              bool stop = true;
     219              SlaveJob sj = null;
     220
     221              lock (slaveJobs) {
     222                if (!slaveJobs.ContainsKey(jobId)) {
     223                  clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. Stop aborted.", jobId));
     224                  stop = false;
     225                } else {
     226                  sj = slaveJobs[jobId];
     227                }
     228              }
     229              if (stop && !sj.Finished) {
     230                sj.StopJob();
     231              }
     232            }, container.JobId)
     233             .ContinueWith((t) => {
     234               // handle exception of task
     235               clientCom.LogMessage(t.Exception.ToString());
     236             }, TaskContinuationOptions.OnlyOnFaulted);
    196237            break;
    197238          case MessageContainer.MessageType.PauseJob:
    198             DoPauseJob(container.JobId);
     239            Task.Factory.StartNew((jobIdObj) => {
     240              Guid jobId = (Guid)jobIdObj;
     241              bool pause = true;
     242              SlaveJob sj = null;
     243
     244              lock (slaveJobs) {
     245                if (!slaveJobs.ContainsKey(jobId)) {
     246                  clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. Pause aborted.", jobId));
     247                  pause = false;
     248                } else {
     249                  sj = slaveJobs[jobId];
     250                }
     251              }
     252              if (pause && !sj.Finished) {
     253                sj.PauseJob();
     254              }
     255            }, container.JobId)
     256             .ContinueWith((t) => {
     257               // handle exception of task
     258               clientCom.LogMessage(t.Exception.ToString());
     259             }, TaskContinuationOptions.OnlyOnFaulted);
    199260            break;
    200261          case MessageContainer.MessageType.Restart:
     
    213274    }
    214275
    215     private void DoPauseJob(Guid jobId) {
    216       if (!executors.ContainsKey(jobId)) {
    217         clientCom.LogMessage("DoPauseJob: Can't find job with id " + jobId);
    218       } else {
    219         Job job = wcfService.GetJob(jobId);
    220 
    221         if (job != null && executors.ContainsKey(job.Id)) {
    222           executors[job.Id].Pause();
    223           JobData sJob = executors[job.Id].GetPausedJob();
    224           job.ExecutionTime = executors[job.Id].ExecutionTime;
    225 
    226           try {
    227             if (executors[job.Id].CurrentException != string.Empty) {
    228               wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);
    229               SlaveStatusInfo.JobsAborted++;
    230             } else {
    231               SlaveStatusInfo.JobsProcessed++;
    232             }
    233             clientCom.LogMessage("Sending the paused job with id: " + job.Id);
    234             wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
     276    /// <summary>
     277    /// aborts all running jobs, no results are sent back
     278    /// </summary>
     279    private void DoAbortAll() {
     280      lock (slaveJobs) {
     281        foreach (SlaveJob sj in slaveJobs.Values) {
     282          if (!sj.Finished) {
     283            sj.KillAppDomain();
    235284          }
    236           catch (Exception e) {
    237             clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
    238           }
    239           finally {
    240             KillAppDomain(job.Id); // kill app-domain in every case         
    241           }
    242         }
    243       }
    244     }
    245 
    246     private void DoStopJob(Guid jobId) {
    247       if (!executors.ContainsKey(jobId)) {
    248         clientCom.LogMessage("DoStopJob: Can't find job with id " + jobId);
    249       } else {
    250         Job job = wcfService.GetJob(jobId);
    251 
    252         if (job != null) {
    253           executors[job.Id].Stop();
    254           JobData sJob = executors[job.Id].GetFinishedJob();
    255           job.ExecutionTime = executors[job.Id].ExecutionTime;
    256 
    257           try {
    258             if (executors[job.Id].CurrentException != string.Empty) {
    259               wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);
    260             }
    261             SlaveStatusInfo.JobsAborted++;
    262 
    263             clientCom.LogMessage("Sending the stopped job with id: " + job.Id);
    264             wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Aborted);
    265           }
    266           catch (Exception e) {
    267             clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
    268           }
    269           finally {
    270             KillAppDomain(job.Id); // kill app-domain in every case         
    271           }
    272         }
    273       }
    274     }
    275 
    276     /// <summary>
    277     /// aborts all running jobs, no results are sent back
    278     /// </summary>
    279     private void DoAbortAll() {
    280       List<Guid> jobIds;
    281       lock (executors) {
    282         jobIds = new List<Guid>(executors.Keys);
    283       }
    284       foreach (Guid jobId in jobIds) {
    285         KillAppDomain(jobId);
     285        }
    286286      }
    287287      clientCom.LogMessage("Aborted all jobs!");
     
    294294      clientCom.LogMessage("Pause all received");
    295295
    296       //copy guids because there will be removed items from 'Jobs'
    297       List<Guid> jobIds;
    298       lock (executors) {
    299         jobIds = new List<Guid>(Executors.Keys);
    300       }
    301 
    302       foreach (Guid jobId in jobIds) {
    303         DoPauseJob(jobId);
     296      lock (slaveJobs) {
     297        foreach (SlaveJob sj in slaveJobs.Values) {
     298          if (!sj.Finished) {
     299            sj.PauseJob();
     300          }
     301        }
    304302      }
    305303    }
     
    311309      clientCom.LogMessage("Stop all received");
    312310
    313       //copy guids because there will be removed items from 'Jobs'
    314       List<Guid> jobIds;
    315       lock (executors) {
    316         jobIds = new List<Guid>(executors.Keys);
    317       }
    318 
    319       foreach (Guid jobId in jobIds) {
    320         DoStopJob(jobId);
     311      lock (slaveJobs) {
     312        foreach (SlaveJob sj in slaveJobs.Values) {
     313          if (!sj.Finished) {
     314            sj.StopJob();
     315          }
     316        }
    321317      }
    322318    }
     
    341337      clientCom.LogMessage("Logging out");
    342338
    343 
    344       lock (executors) {
    345         clientCom.LogMessage("executors locked");
    346         foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
    347           clientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
    348           appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
    349           AppDomain.Unload(kvp.Value);
    350         }
    351       }
     339      DoAbortAll();
     340
    352341      WcfService.Instance.Disconnect();
    353342      clientCom.Shutdown();
     
    382371
    383372    /// <summary>
    384     /// Pauses a job, which means sending it to the server and killing it locally;
    385     /// atm only used when executor is waiting for child jobs
    386     /// </summary>
    387     public void PauseWaitJob(JobData data) {
    388       if (!Executors.ContainsKey(data.JobId)) {
    389         clientCom.LogMessage("Can't find job with id " + data.JobId);
    390       } else {
    391         Job job = wcfService.GetJob(data.JobId);
    392         wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
    393         wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
    394       }
    395       KillAppDomain(data.JobId);
    396     }
    397 
    398     /// <summary>
    399     /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
    400     /// once the connection gets reestablished, the job gets submitted
    401     /// </summary>
    402     public void SendFinishedJob(Guid jobId) {
    403       try {
    404         clientCom.LogMessage("Getting the finished job with id: " + jobId);
    405         if (!executors.ContainsKey(jobId)) {
    406           clientCom.LogMessage("Executor doesn't exist");
    407           return;
    408         }
    409         if (!executors.ContainsKey(jobId)) {
    410           clientCom.LogMessage("Job doesn't exist");
    411           return;
    412         }
    413         Job job = wcfService.GetJob(jobId);
    414         job.ExecutionTime = executors[jobId].ExecutionTime;
    415 
    416         if (executors[jobId].Aborted) {
    417           SlaveStatusInfo.JobsAborted++;
    418         } else {
    419           SlaveStatusInfo.JobsProcessed++;
    420         }
    421 
    422         if (executors[jobId].CurrentException != string.Empty) {
    423           wcfService.UpdateJobState(jobId, JobState.Failed, executors[jobId].CurrentException);
    424         }
    425 
    426         JobData sJob = executors[jobId].GetFinishedJob();
    427         try {
    428           clientCom.LogMessage("Sending the finished job with id: " + jobId);
    429           wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
    430         }
    431         catch (Exception e) {
    432           clientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
    433         }
    434         finally {
    435           KillAppDomain(jobId);
    436           heartbeatManager.AwakeHeartBeatThread();
    437         }
    438       }
    439       catch (Exception e) {
    440         OnExceptionOccured(e);
    441       }
    442     }
    443 
    444     private static object startInAppDomainLocker = new object();
    445     private static Mutex startInAppDomainMutex = new Mutex();
    446     /// <summary>
    447     /// A new Job from the wcfService has been received and will be started within a AppDomain.
    448     /// </summary>   
    449     private void StartJobInAppDomain(Job job, JobData jobData) {
    450       clientCom.LogMessage("Received new job with id " + job.Id);
    451       clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
    452 
    453       startInAppDomainMutex.WaitOne(); // mutex is used instead of lock to be able to release it before executor.Start() is called (which may take long time)
    454       bool released = false;
    455 
    456         if (executors.ContainsKey(job.Id)) {
    457           clientCom.LogMessage("Job with key " + job.Id + " already exists. Job will be ignored.");
    458           return;
    459         }
    460 
    461         String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, job.Id.ToString());
    462         bool pluginsPrepared = false;
    463         string configFileName = string.Empty;
    464 
    465         try {
    466           PluginCache.Instance.PreparePlugins(job, out configFileName);
    467           clientCom.LogMessage("Plugins fetched for job " + job.Id);
    468           pluginsPrepared = true;
    469         }
    470         catch (Exception exception) {
    471           clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", job.Id, exception));
    472           wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
    473           SlaveStatusInfo.JobsAborted++;
    474         }
    475 
    476         if (pluginsPrepared) {
    477           try {
    478             //TODO: switch back to unprivileged sandbox
    479             AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitPrivilegedSandbox(job.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
    480             appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
    481             Executor executor;
    482             appDomains.Add(job.Id, appDomain);
    483             clientCom.LogMessage("Creating AppDomain");
    484             executor = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
    485             clientCom.LogMessage("Created AppDomain");
    486             executor.Core = this;
    487             executor.JobId = job.Id;
    488             executor.CoresNeeded = job.CoresNeeded;
    489             executor.MemoryNeeded = job.MemoryNeeded;
    490             clientCom.LogMessage("Starting Executor for job " + job.Id);
    491             lock (executors) {
    492               if (executors.ContainsKey(job.Id)) {
    493                 throw new JobAlreadyExistsException(job.Id);
    494               }
    495               executors.Add(job.Id, executor);
    496             }
    497             startInAppDomainMutex.ReleaseMutex();
    498             released = true;
    499             semaphores[job.Id] = new Semaphore(0, 1);
    500             executor.Start(jobData.Data);
    501             semaphores[job.Id].Release();
    502           }
    503           catch (JobAlreadyExistsException e) {
    504             clientCom.LogMessage(string.Format("Job {0} has already been started. Job will be ignored", e.JobId));
    505           }
    506           catch (Exception exception) {
    507             clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + job.Id);
    508             clientCom.LogMessage("Error thrown is: " + exception.ToString());
    509 
    510             if (executors.ContainsKey(job.Id) && executors[job.Id].CurrentException != string.Empty) {
    511               wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);
    512             } else {
    513               wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
    514             }
    515             SlaveStatusInfo.JobsAborted++;
    516 
    517             KillAppDomain(job.Id);
    518           }
    519 
    520           if (!released)
    521             startInAppDomainMutex.ReleaseMutex();
    522         }
    523       heartbeatManager.AwakeHeartBeatThread();
    524     }
    525 
    526     public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    527     private void OnExceptionOccured(Exception e) {
    528       clientCom.LogMessage("Error: " + e.ToString());
    529       var handler = ExceptionOccured;
    530       if (handler != null) handler(this, new EventArgs<Exception>(e));
    531     }
    532 
    533     private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
    534       clientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
    535       KillAppDomain(new Guid(e.ExceptionObject.ToString()));
    536     }
    537 
    538     /// <summary>
    539373    /// Enqueues messages from the executor to the message queue.
    540374    /// This is necessary if the core thread has to execute certain actions, e.g.
     
    549383    }
    550384
    551     /// <summary>
    552     /// Kill a appdomain with a specific id.
    553     /// </summary>
    554     /// <param name="jobId">the GUID of the job</param>   
    555     public void KillAppDomain(Guid jobId) {
    556       if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
    557         EnqueueExecutorMessage<Guid>(KillAppDomain, jobId);
    558         return;
    559       }
    560 
    561       clientCom.LogMessage("Shutting down Appdomain for Job " + jobId);
    562       lock (executors) {
    563         try {
    564           if (executors.ContainsKey(jobId)) {
    565             executors[jobId].Dispose();
    566             executors.Remove(jobId);
    567           }
    568 
    569           if (appDomains.ContainsKey(jobId)) {
    570             appDomains[jobId].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
    571             int repeat = 5;
    572             while (repeat > 0) {
    573               try {
    574                 semaphores[jobId].WaitOne();
    575                 AppDomain.Unload(appDomains[jobId]);
    576                 semaphores[jobId].Dispose();
    577                 semaphores.Remove(jobId);
    578                 repeat = 0;
    579               }
    580               catch (CannotUnloadAppDomainException) {
    581                 clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
    582                 Thread.Sleep(1000);
    583                 repeat--;
    584                 if (repeat == 0) {
    585                   clientCom.LogMessage("Could not unload AppDomain, shutting down core...");
    586                   throw; // rethrow and let app crash
    587                 }
    588               }
    589             }
    590             appDomains.Remove(jobId);
    591           }
    592 
    593           PluginCache.Instance.DeletePluginsForJob(jobId);
    594           GC.Collect();
    595         }
    596         catch (Exception ex) {
    597           clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
    598         }
    599       }
    600       GC.Collect();
    601       clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
    602     }
    603 
    604     public override object InitializeLifetimeService() {
    605       return null; // avoid destruction of proxy object after 5 minutes
    606     }
    607 
    608     public int GetCoresNeeded() {
    609       lock (executors) {
    610         return executors.Sum(x => x.Value.CoresNeeded);
     385    public void RemoveSlaveJobFromList(Guid jobId) {
     386      lock (slaveJobs) {
     387        if (slaveJobs.ContainsKey(jobId)) {
     388          slaveJobs.Remove(jobId);
     389        }
    611390      }
    612391    }
Note: See TracChangeset for help on using the changeset viewer.