Changeset 11082


Ignore:
Timestamp:
07/03/14 16:03:14 (5 years ago)
Author:
ascheibe
Message:

#2153

  • added a new method HandleStartStopPauseError in Executor so that errors while starting, stopping or pausing a task are all handled in the same way
  • added timeouts for semaphores so that failed tasks or tasks with endless loops don't block the slave
  • removed the ExceptionOccured events from Executor/SlaveTask/TaskManager; TaskFailed is used instead
  • removed another ExceptionOccured event in HeartbeatManager that was never used
Location:
trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs

    r9456 r11082  
    204204      TS.Task.Factory.StartNew(HandleCalculateTask, jobId)
    205205      .ContinueWith((t) => {
    206         SlaveStatusInfo.IncrementExceptionOccured();
     206        SlaveStatusInfo.IncrementTasksFailed();
    207207        SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
    208208      }, TaskContinuationOptions.OnlyOnFaulted);
     
    212212      TS.Task.Factory.StartNew(HandleStopTask, jobId)
    213213       .ContinueWith((t) => {
    214          SlaveStatusInfo.IncrementExceptionOccured();
     214         SlaveStatusInfo.IncrementTasksFailed();
    215215         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
    216216       }, TaskContinuationOptions.OnlyOnFaulted);
     
    220220      TS.Task.Factory.StartNew(HandlePauseTask, jobId)
    221221       .ContinueWith((t) => {
    222          SlaveStatusInfo.IncrementExceptionOccured();
     222         SlaveStatusInfo.IncrementTasksFailed();
    223223         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
    224224       }, TaskContinuationOptions.OnlyOnFaulted);
     
    228228      TS.Task.Factory.StartNew(HandleAbortTask, jobId)
    229229       .ContinueWith((t) => {
    230          SlaveStatusInfo.IncrementExceptionOccured();
     230         SlaveStatusInfo.IncrementTasksFailed();
    231231         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
    232232       }, TaskContinuationOptions.OnlyOnFaulted);
     
    328328      this.taskManager.TaskStopped += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskStopped);
    329329      this.taskManager.TaskFailed += new EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>>(taskManager_TaskFailed);
    330       this.taskManager.ExceptionOccured += new EventHandler<EventArgs<SlaveTask, Exception>>(taskManager_ExceptionOccured);
    331330      this.taskManager.TaskAborted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskAborted);
    332331    }
     
    391390      }
    392391      catch (TaskNotFoundException ex) {
    393         SlaveStatusInfo.IncrementExceptionOccured();
     392        SlaveStatusInfo.IncrementTasksFailed();
    394393        SlaveClientCom.Instance.LogMessage(ex.ToString());
    395394      }
    396395      catch (Exception ex) {
    397         SlaveStatusInfo.IncrementExceptionOccured();
     396        SlaveStatusInfo.IncrementTasksFailed();
    398397        SlaveClientCom.Instance.LogMessage(ex.ToString());
    399398      }
    400     }
    401 
    402     private void taskManager_ExceptionOccured(object sender, EventArgs<SlaveTask, Exception> e) {
    403       SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
    404       SlaveStatusInfo.IncrementExceptionOccured();
    405       heartbeatManager.AwakeHeartBeatThread();
    406       SlaveClientCom.Instance.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, e.Value2.ToString()));
    407       wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, e.Value2.ToString());
    408399    }
    409400
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Executor.cs

    r9456 r11082  
    8585        task.Start();
    8686        if (!startTaskSem.WaitOne(Settings.Default.ExecutorSemTimeouts)) {
    87           taskDataInvalid = true;
    8887          throw new TimeoutException("Timeout when starting the task. TaskStarted event was not fired.");
    8988        }
    9089      }
    9190      catch (Exception e) {
    92         this.CurrentException = e;
    93         taskDataInvalid = true;
    94         Task_TaskFailed(this, new EventArgs<Exception>(e));
    95       } finally {
     91        HandleStartStopPauseError(e);
     92      }
     93      finally {
    9694        taskStartedSem.Set();
    9795      }
     
    103101      taskStartedSem.WaitOne(Settings.Default.ExecutorSemTimeouts);
    104102      if (task == null) {
    105         CurrentException = new Exception("Pausing task " + this.TaskId + ": Task is null");
    106         executorQueue.AddMessage(ExecutorMessageType.ExceptionOccured);
     103        HandleStartStopPauseError(new Exception("Pausing task " + this.TaskId + ": Task is null"));
    107104        return;
    108105      }
     
    112109          task.Pause();
    113110          //we need to block the pause...
    114           pauseStopSem.WaitOne();
     111          if (!pauseStopSem.WaitOne(Settings.Default.ExecutorSemTimeouts)) {
     112            throw new Exception("Pausing task " + this.TaskId + " timed out.");
     113          }
    115114        }
    116115        catch (Exception ex) {
    117           CurrentException = new Exception("Error pausing task " + this.TaskId + ": " + ex.ToString());
    118           executorQueue.AddMessage(ExecutorMessageType.ExceptionOccured);
     116          HandleStartStopPauseError(ex);
    119117        }
    120118      }
     
    125123      // wait until task is started. if this does not happen, the Task is null an we give up
    126124      taskStartedSem.WaitOne(Settings.Default.ExecutorSemTimeouts);
     125      wasTaskAborted = true;
     126
    127127      if (task == null) {
    128         CurrentException = new Exception("Stopping task " + this.TaskId + ": Task is null");
    129         executorQueue.AddMessage(ExecutorMessageType.ExceptionOccured);
    130       }
    131       wasTaskAborted = true;
     128        HandleStartStopPauseError(new Exception("Stopping task " + this.TaskId + ": Task is null"));
     129        return;
     130      }
    132131
    133132      if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) {
    134133        try {
    135134          task.Stop();
    136           pauseStopSem.WaitOne();
     135          if (!pauseStopSem.WaitOne(Settings.Default.ExecutorSemTimeouts)) {
     136            throw new Exception("Stopping task " + this.TaskId + " timed out.");
     137          }
    137138        }
    138139        catch (Exception ex) {
    139           CurrentException = new Exception("Error stopping task " + this.TaskId + ": " + ex.ToString());
    140           executorQueue.AddMessage(ExecutorMessageType.ExceptionOccured);
     140          HandleStartStopPauseError(ex);
    141141        }
    142142      }
     
    190190      if (task != null && task.ExecutionState == ExecutionState.Started) {
    191191        throw new InvalidStateException("Task is still running");
     192      }
     193
     194      TaskData taskData = null;
     195      if (task == null) {
     196        if (CurrentException == null) {
     197          CurrentException = new Exception("Task with id " + this.TaskId + " is null, sending empty task");
     198        }
    192199      } else {
    193         TaskData taskData = new TaskData();
    194         if (task == null) {
    195           //send empty task and save exception
    196           taskData.Data = PersistenceUtil.Serialize(new TaskData());
    197           if (CurrentException == null) {
    198             CurrentException = new Exception("Task with id " + this.TaskId + " is null, sending empty task");
    199           }
    200         } else {
    201           taskData.Data = PersistenceUtil.Serialize(task);
    202         }
     200        taskData = new TaskData();
     201        taskData.Data = PersistenceUtil.Serialize(task);
    203202        taskData.TaskId = TaskId;
    204         return taskData;
    205       }
     203      }
     204      return taskData;
    206205    }
    207206
     
    211210      task = null;
    212211    }
     212
     213    private void HandleStartStopPauseError(Exception e) {
     214      taskDataInvalid = true;
     215      Task_TaskFailed(this, new EventArgs<Exception>(e));
     216    }
    213217  }
    214218}
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/ExecutorMessage.cs

    r9456 r11082  
    3030    TaskStopped,
    3131    TaskFailed,
    32     StopExecutorMonitoringThread,
    33     ExceptionOccured
     32    StopExecutorMonitoringThread
    3433  }
    3534
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Manager/ConfigManager.cs

    r9456 r11082  
    117117      st.JobsFetched = SlaveStatusInfo.TasksFetched;
    118118      st.JobsFailed = SlaveStatusInfo.TasksFailed;
    119       st.ExceptionsOccured = SlaveStatusInfo.ExceptionsOccured;
    120119
    121120      st.Jobs = jobManager.GetExecutionTimes().Select(x => new TaskStatus { TaskId = x.Key, ExecutionTime = x.Value }).ToList();
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Manager/HeartbeatManager.cs

    r9456 r11082  
    2424using System.Threading;
    2525using HeuristicLab.Clients.Hive.SlaveCore.Properties;
    26 using HeuristicLab.Common;
    2726
    2827namespace HeuristicLab.Clients.Hive.SlaveCore {
     
    115114              if (msgs == null) {
    116115                SlaveClientCom.Instance.LogMessage("Error getting response from HB");
    117                 OnExceptionOccured(new Exception("Error getting response from HB"));
    118116              } else {
    119117                SlaveClientCom.Instance.LogMessage("HB Response received (" + msgs.Count + "): ");
     
    126124        catch (Exception e) {
    127125          SlaveClientCom.Instance.LogMessage("Heartbeat thread failed: " + e.ToString());
    128           OnExceptionOccured(e);
    129126        }
    130127        waitHandle.WaitOne(this.interval);
     
    132129      SlaveClientCom.Instance.LogMessage("Heartbeat thread stopped");
    133130    }
    134 
    135     #region Eventhandler
    136     public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    137     private void OnExceptionOccured(Exception e) {
    138       var handler = ExceptionOccured;
    139       if (handler != null) handler(this, new EventArgs<Exception>(e));
    140     }
    141     #endregion
    142131  }
    143132}
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Manager/TaskManager.cs

    r9456 r11082  
    7171      try {
    7272        if (slaveTasks.ContainsKey(task.Id)) {
    73           SlaveStatusInfo.IncrementExceptionOccured();
     73          SlaveStatusInfo.IncrementTasksFailed();
    7474          throw new TaskAlreadyRunningException(task.Id);
    7575        } else {
     
    185185      slaveTask.TaskStopped += new EventHandler<EventArgs<Guid>>(slaveTask_TaskStopped);
    186186      slaveTask.TaskFailed += new EventHandler<EventArgs<Guid, Exception>>(slaveTask_TaskFailed);
    187       slaveTask.ExceptionOccured += new EventHandler<EventArgs<Guid, Exception>>(slaveTask_ExceptionOccured);
    188187    }
    189188
     
    193192      slaveTask.TaskStopped -= new EventHandler<EventArgs<Guid>>(slaveTask_TaskStopped);
    194193      slaveTask.TaskFailed -= new EventHandler<EventArgs<Guid, Exception>>(slaveTask_TaskFailed);
    195       slaveTask.ExceptionOccured -= new EventHandler<EventArgs<Guid, Exception>>(slaveTask_ExceptionOccured);
    196194    }
    197195
     
    220218      try {
    221219        taskData = slaveTask.GetTaskData();
    222         if (taskData == null) throw new SerializationException();
    223220        SlaveStatusInfo.IncrementTasksFinished();
    224221        OnTaskPaused(slaveTask, taskData);
     
    243240      try {
    244241        taskData = slaveTask.GetTaskData();
    245         if (taskData == null) throw new SerializationException();
    246242        SlaveStatusInfo.IncrementTasksFinished();
    247243        OnTaskStopped(slaveTask, taskData);
     
    266262      try {
    267263        taskData = slaveTask.GetTaskData();
    268         if (taskData == null) throw new SerializationException();
    269264      }
    270265      catch { /* taskData will be null */ }
    271266      SlaveStatusInfo.IncrementTasksFailed();
    272267      OnTaskFailed(slaveTask, taskData, e.Value2);
    273     }
    274 
    275     private void slaveTask_ExceptionOccured(object sender, EventArgs<Guid, Exception> e) {
    276       SlaveTask slaveTask;
    277       slaveTasksLocker.EnterUpgradeableReadLock();
    278       try {
    279         slaveTask = slaveTasks[e.Value];
    280         RemoveSlaveTask(e.Value, slaveTask);
    281       }
    282       finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
    283 
    284       SlaveStatusInfo.IncrementExceptionOccured();
    285       OnExceptionOccured(slaveTask, e.Value2);
    286268    }
    287269    #endregion
     
    312294    }
    313295
    314     public event EventHandler<EventArgs<SlaveTask, Exception>> ExceptionOccured;
    315     private void OnExceptionOccured(SlaveTask slaveTask, Exception exception) {
    316       var handler = ExceptionOccured;
    317       if (handler != null) handler(this, new EventArgs<SlaveTask, Exception>(slaveTask, exception));
    318     }
    319 
    320296    public event EventHandler<EventArgs<SlaveTask>> TaskAborted;
    321297    private void OnTaskAborted(SlaveTask slaveTask) {
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/SlaveStatusInfo.cs

    r9456 r11082  
    3131    private static int tasksAborted;  // server sent abort
    3232    private static int tasksFailed;   // tasks that failed in the sandbox
    33     private static int exceptionsOccured; // number jobs failed caused by the business logic, not a faulted task
    3433    private static int usedCores;    // number of cores currently used
    3534
     
    6059    }
    6160
    62     public static int ExceptionsOccured {
    63       get { return exceptionsOccured; }
    64     }
    65 
    6661    public static void IncrementTasksStarted() {
    6762      Interlocked.Increment(ref tasksStarted);
     
    8479    }
    8580
    86     public static void IncrementExceptionOccured() {
    87       Interlocked.Increment(ref exceptionsOccured);
    88     }
    89 
    9081    public static void IncrementUsedCores(int val) {
    9182      Interlocked.Add(ref usedCores, val);
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/SlaveTask.cs

    r9456 r11082  
    4343    public Guid TaskId { get; private set; }
    4444    public bool IsPrepared { get; private set; }
     45    private TaskData originalTaskData;
    4546
    4647    private int coresNeeded;
     
    7475      try {
    7576        this.TaskId = task.Id;
     77        originalTaskData = taskData;
    7678        Prepare(task);
    7779        StartTaskInAppDomain(taskData);
     
    143145        while (repeat > 0) {
    144146          try {
    145             waitForStartBeforeKillSem.WaitOne();
     147            waitForStartBeforeKillSem.WaitOne(Settings.Default.ExecutorSemTimeouts);
    146148            AppDomain.Unload(appDomain);
    147149            waitForStartBeforeKillSem.Dispose();
     
    164166    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
    165167      DisposeAppDomain();
    166       OnExceptionOccured(new Exception("Unhandled exception: " + e.ExceptionObject.ToString()));
     168      OnTaskFailed(new Exception("Unhandled exception: " + e.ExceptionObject.ToString()));
    167169    }
    168170
     
    171173      try {
    172174        data = executor.GetTaskData();
     175        //this means that there was a problem executing the task
     176        if (data == null) return originalTaskData;
    173177      }
    174178      catch (Exception ex) {
     
    233237            executorMonitoringRun = false;
    234238            break;
    235 
    236           case ExecutorMessageType.ExceptionOccured:
    237             executorMonitoringRun = false;
    238             DisposeAppDomain();
    239             if (executor.CurrentException != null) {
    240               OnExceptionOccured(executor.CurrentException);
    241             } else {
    242               OnExceptionOccured(new Exception(string.Format("Unknow exception occured in Executor for task {0}", TaskId)));
    243             }
    244             break;
    245239        }
    246240      }
     
    277271      if (handler != null) handler(this, new EventArgs<Guid, Exception>(this.TaskId, exception));
    278272    }
    279 
    280     public event EventHandler<EventArgs<Guid, Exception>> ExceptionOccured;
    281     private void OnExceptionOccured(Exception exception) {
    282       var handler = ExceptionOccured;
    283       if (handler != null) handler(this, new EventArgs<Guid, Exception>(this.TaskId, exception));
    284     }
    285273  }
    286274}
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/StatusCommons.cs

    r9456 r11082  
    5151    public int JobsFailed { get; set; }
    5252    [DataMember]
    53     public int ExceptionsOccured { get; set; }
    54     [DataMember]
    5553    public List<TaskStatus> Jobs { get; set; }
    5654    [DataMember]
     
    5856
    5957    public override string ToString() {
    60       return string.Format("Status: {0}, Fetc,Start,Fin,Abor,Fail,Exc: {1},{2},{3},{4},{5},{6}",
    61         Status, JobsFetched, JobsStarted, JobsFinished, JobsAborted, JobsFailed, ExceptionsOccured);
     58      return string.Format("Status: {0}, Fetc,Start,Fin,Abor,Fail,Exc: {1},{2},{3},{4},{5}",
     59        Status, JobsFetched, JobsStarted, JobsFinished, JobsAborted, JobsFailed);
    6260    }
    6361  }
Note: See TracChangeset for help on using the changeset viewer.