Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/19/10 09:17:24 (14 years ago)
Author:
kgrading
Message:

added minor speedups and better transaction handling to the server (#828)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/3.2/sources/HeuristicLab.Hive.Server.Core/3.2/JobManager.cs

    r3578 r3931  
    11#region License Information
     2
    23/* HeuristicLab
    34 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
     
    1819 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
    1920 */
     21
    2022#endregion
    2123
     
    3840
    3941namespace HeuristicLab.Hive.Server.Core {
    40   class JobManager: IJobManager, IInternalJobManager {
    41 
     42  internal class JobManager : IJobManager, IInternalJobManager {
    4243    //ISessionFactory factory;
    43     ILifecycleManager lifecycleManager;
     44    private ILifecycleManager lifecycleManager;
    4445
    4546    #region IJobManager Members
     
    5354    }
    5455
    55     private JobDto GetLastJobResult(Guid jobId) {     
     56    private JobDto GetLastJobResult(Guid jobId) {
    5657      return DaoLocator.JobDao.FindById(jobId);
    5758    }
    5859
    59     public void ResetJobsDependingOnResults(JobDto job) {
    60       Logger.Info("Setting job " + job.Id + " offline");
    61       if (job != null) {
    62         DaoLocator.JobDao.SetJobOffline(job);
    63       }
    64     }
    65          
    66 
    67     void checkForDeadJobs() {
     60
     61    private void checkForDeadJobs() {
    6862      Logger.Info("Searching for dead Jobs");
    69       using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
     63      using (
     64        TransactionScope scope = new TransactionScope(TransactionScopeOption.Required,
     65                                                      new TransactionOptions
     66                                                      {IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE})) {
    7067        List<JobDto> allJobs = new List<JobDto>(DaoLocator.JobDao.FindAll());
    7168        foreach (JobDto curJob in allJobs) {
    72           if (curJob.State != State.calculating) {
    73             ResetJobsDependingOnResults(curJob);
     69          if (curJob.State != State.calculating && curJob.State != State.finished) {
     70            DaoLocator.JobDao.SetJobOffline(curJob);
    7471          }
    7572        }
     
    7976    }
    8077
    81     void lifecycleManager_OnStartup(object sender, EventArgs e) {
     78    private void lifecycleManager_OnStartup(object sender, EventArgs e) {
     79      Logger.Info("Startup Event Fired, Checking DB for consistency");
    8280      checkForDeadJobs();
    83     }
    84 
    85     void lifecycleManager_OnShutdown(object sender, EventArgs e) {
     81      Logger.Info("Startup Event Done");
     82    }
     83
     84    private void lifecycleManager_OnShutdown(object sender, EventArgs e) {
     85      Logger.Info("Startup Event Fired, Checking DB for consistency");
    8686      checkForDeadJobs();
     87      Logger.Info("Startup Event Done");
    8788    }
    8889
     
    9293    /// <returns></returns>
    9394    public ResponseList<JobDto> GetAllJobs() {
    94          ResponseList<JobDto> response = new ResponseList<JobDto>();
    95 
    96          response.List = new List<JobDto>(DaoLocator.JobDao.FindAll());
    97          response.Success = true;
    98          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ALL_JOBS;
    99 
    100          return response;
     95      ResponseList<JobDto> response = new ResponseList<JobDto>();
     96
     97      response.List = new List<JobDto>(DaoLocator.JobDao.FindAll());
     98      response.Success = true;
     99      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ALL_JOBS;
     100
     101      return response;
    101102    }
    102103
     
    105106      response.List = new List<JobDto>(DaoLocator.JobDao.FindWithLimitations(jobState, offset, count));
    106107      return response;
    107     } 
     108    }
    108109
    109110    /// <summary>
     
    112113    /// <param name="jobId"></param>
    113114    /// <returns></returns>
    114     public Stream GetJobStreamById(Guid jobId) {           
     115    public Stream GetJobStreamById(Guid jobId) {
    115116      return DaoLocator.JobDao.GetSerializedJobStream(jobId);
    116      
    117117    }
    118118
     
    122122    /// <returns></returns>
    123123    public ResponseObject<JobDto> GetJobById(Guid jobId) {
    124         ResponseObject<JobDto> response = new ResponseObject<JobDto>();
     124      ResponseObject<JobDto> response = new ResponseObject<JobDto>();
    125125
    126126      response.Obj = DaoLocator.JobDao.FindById(jobId);
    127         if (response.Obj != null) {
    128           response.Success = true;
    129           response.StatusMessage = ApplicationConstants.RESPONSE_JOB_GET_JOB_BY_ID;
    130         } else {
    131           response.Success = false;
    132           response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
    133         }
    134 
    135         return response;
    136       }
     127      if (response.Obj != null) {
     128        response.Success = true;
     129        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_GET_JOB_BY_ID;
     130      }
     131      else {
     132        response.Success = false;
     133        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
     134      }
     135
     136      return response;
     137    }
    137138
    138139    public ResponseObject<JobDto> GetJobByIdWithDetails(Guid jobId) {
     
    144145
    145146        job.Obj.Client = DaoLocator.ClientDao.GetClientForJob(jobId);
    146       } else {
     147      }
     148      else {
    147149        job.Success = false;
    148150        job.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
    149       }     
     151      }
    150152      return job;
    151153    }
     
    154156      IClientGroupDao cgd = DaoLocator.ClientGroupDao;
    155157      foreach (string res in resources) {
    156         foreach(ClientGroupDto cg in cgd.FindByName(res)) {
     158        foreach (ClientGroupDto cg in cgd.FindByName(res)) {
    157159          job.JobInfo.AssignedResourceIds.Add(cg.Id);
    158160        }
     
    167169    /// <returns></returns>
    168170    public ResponseObject<JobDto> AddNewJob(SerializedJob job) {
    169         ResponseObject<JobDto> response = new ResponseObject<JobDto>();
    170 
    171         if (job != null && job.JobInfo != null) {
    172           if (job.JobInfo.State != State.offline) {
    173             response.Success = false;
    174             response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOBSTATE_MUST_BE_OFFLINE;
    175             return response;
    176           }
    177           if (job.JobInfo.Id != Guid.Empty) {
    178             response.Success = false;
    179             response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ID_MUST_NOT_BE_SET;
    180             return response;
    181           }
    182           if (job.SerializedJobData == null) {
    183             response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
    184             response.Success = false;
    185             return response;
    186           }
    187 
    188           job.JobInfo.DateCreated = DateTime.Now;
    189           DaoLocator.JobDao.InsertWithAttachedJob(job);
    190           DaoLocator.PluginInfoDao.InsertPluginDependenciesForJob(job.JobInfo);
    191          
    192           response.Success = true;
    193           response.Obj = job.JobInfo;
    194           response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_ADDED;
    195         } else {
     171      ResponseObject<JobDto> response = new ResponseObject<JobDto>();
     172
     173      if (job != null && job.JobInfo != null) {
     174        if (job.JobInfo.State != State.offline) {
    196175          response.Success = false;
     176          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOBSTATE_MUST_BE_OFFLINE;
     177          return response;
     178        }
     179        if (job.JobInfo.Id != Guid.Empty) {
     180          response.Success = false;
     181          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ID_MUST_NOT_BE_SET;
     182          return response;
     183        }
     184        if (job.SerializedJobData == null) {
    197185          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
    198         }
    199 
     186          response.Success = false;
     187          return response;
     188        }
     189
     190        job.JobInfo.DateCreated = DateTime.Now;
     191        DaoLocator.JobDao.InsertWithAttachedJob(job);
     192        DaoLocator.PluginInfoDao.InsertPluginDependenciesForJob(job.JobInfo);
     193
     194        response.Success = true;
     195        response.Obj = job.JobInfo;
     196        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_ADDED;
     197      }
     198      else {
     199        response.Success = false;
     200        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
     201      }
     202
     203      return response;
     204    }
     205
     206    /// <summary>
     207    /// Removes a job from the database
     208    /// </summary>
     209    /// <param name="jobId"></param>
     210    /// <returns></returns>
     211    public Response RemoveJob(Guid jobId) {
     212      Response response = new Response();
     213
     214      JobDto job = DaoLocator.JobDao.FindById(jobId);
     215      if (job == null) {
     216        response.Success = false;
     217        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
    200218        return response;
    201219      }
    202     /*  finally {
    203         if (session != null)
    204           session.EndSession();
    205       }
    206     } */
    207 
    208     /// <summary>
    209     /// Removes a job from the database
    210     /// </summary>
    211     /// <param name="jobId"></param>
    212     /// <returns></returns>
    213     public Response RemoveJob(Guid jobId) {
    214         Response response = new Response();
    215 
    216       JobDto job = DaoLocator.JobDao.FindById(jobId);
    217         if (job == null) {
    218           response.Success = false;
    219           response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
    220           return response;
    221         }
    222         DaoLocator.JobDao.Delete(job);
    223         response.Success = false;
    224         response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_REMOVED;
    225 
    226         return response;
    227       }
     220      DaoLocator.JobDao.Delete(job);
     221      response.Success = false;
     222      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_REMOVED;
     223
     224      return response;
     225    }
    228226
    229227    public ResponseObject<JobDto> GetLastJobResultOf(Guid jobId) {
     
    231229        new ResponseObject<JobDto>();
    232230
    233        result.Obj =
    234          GetLastJobResult(jobId);
    235        result.Success =
    236          result.Obj != null;
    237 
    238        return result;
     231      result.Obj =
     232        GetLastJobResult(jobId);
     233      result.Success =
     234        result.Obj != null;
     235
     236      return result;
    239237    }
    240238
    241239    public ResponseObject<SerializedJob>
    242240      GetLastSerializedJobResultOf(Guid jobId, bool requested) {
    243 
    244         ResponseObject<SerializedJob> response =
    245           new ResponseObject<SerializedJob>();
     241      ResponseObject<SerializedJob> response =
     242        new ResponseObject<SerializedJob>();
    246243
    247244      JobDto job = DaoLocator.JobDao.FindById(jobId);
    248         if (requested && (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent)) {
    249           response.Success = true;
    250           response.StatusMessage = ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE;
    251          
    252           //tx.Commit();
    253          
    254           return response;
    255         }
    256 
    257         /*JobResult lastResult =
    258           jobResultsAdapter.GetLastResultOf(job.Id);*/
    259 
    260         //if (lastResult != null) {
    261           response.Success = true;
    262           response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT;
    263           response.Obj = new SerializedJob();
    264           response.Obj.JobInfo = job;
    265           response.Obj.SerializedJobData =
    266             DaoLocator.JobDao.GetBinaryJobFile(jobId);
    267         //} else {
    268         //  response.Success = false;
    269         //}
    270 
    271         //tx.Commit();
     245      if (requested && (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent)) {
     246        response.Success = true;
     247        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE;
     248
    272249        return response;
    273250      }
    274       /*catch (Exception ex) {
    275         if (tx != null)
    276           tx.Rollback();
    277         throw ex;
    278       }
    279       finally {
    280         if (session != null)
    281           session.EndSession();
    282       }
    283     }    */
     251
     252      response.Success = true;
     253      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT;
     254      response.Obj = new SerializedJob();
     255      response.Obj.JobInfo = job;
     256      response.Obj.SerializedJobData =
     257        DaoLocator.JobDao.GetBinaryJobFile(jobId);
     258      return response;
     259    }
    284260
    285261
    286262    public Response RequestSnapshot(Guid jobId) {
    287      // ISession session = factory.GetSessionForCurrentThread();
    288263      Response response = new Response();
    289      
    290      /* try {
    291         IJobAdapter jobAdapter = session.GetDataAdapter<JobDto, IJobAdapter>();*/
    292 
    293         JobDto job = DaoLocator.JobDao.FindById(jobId);
    294         if (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent) {
    295           response.Success = true;
    296           response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_ALLREADY_SET;
    297           return response; // no commit needed
    298         }
    299         if (job.State != State.calculating) {
    300           response.Success = false;
    301           response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
    302           return response; // no commit needed
    303         }
    304         // job is in correct state
    305         job.State = State.requestSnapshot;
    306         DaoLocator.JobDao.Update(job);
    307 
    308         response.Success = true;
    309         response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_SET;
    310 
     264
     265
     266      JobDto job = DaoLocator.JobDao.FindById(jobId);
     267      if (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent) {
     268        response.Success = true;
     269        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_ALLREADY_SET;
    311270        return response;
    312271      }
    313     /*  finally {
    314         if (session != null)
    315           session.EndSession();
    316       }
    317     } */
     272      if (job.State != State.calculating) {
     273        response.Success = false;
     274        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
     275        return response;
     276      }
     277      // job is in correct state
     278      job.State = State.requestSnapshot;
     279      DaoLocator.JobDao.Update(job);
     280
     281      response.Success = true;
     282      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_SET;
     283
     284      return response;
     285    }
    318286
    319287    public Response AbortJob(Guid jobId) {
    320       //ISession session = factory.GetSessionForCurrentThread();
    321288      Response response = new Response();
    322289
    323     /*  try {
    324         IJobAdapter jobAdapter = session.GetDataAdapter<JobDto, IJobAdapter>();*/
    325 
    326 //        JobDto job = jobAdapter.GetById(jobId);
    327290      JobDto job = DaoLocator.JobDao.FindById(jobId);
    328         if (job == null) {
    329           response.Success = false;
    330           response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
    331           return response; // no commit needed
    332         }
    333         if (job.State == State.abort) {
    334           response.Success = true;
    335           response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_ALLREADY_SET;
    336           return response; // no commit needed
    337         }
    338         if (job.State != State.calculating && job.State != State.requestSnapshot && job.State != State.requestSnapshotSent) {
    339           response.Success = false;
    340           response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
    341           return response; // no commit needed
    342         }
    343         // job is in correct state
    344         job.State = State.abort;
     291      if (job == null) {
     292        response.Success = false;
     293        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
     294        return response; // no commit needed
     295      }
     296      if (job.State == State.abort) {
     297        response.Success = true;
     298        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_ALLREADY_SET;
     299        return response; // no commit needed
     300      }
     301      if (job.State != State.calculating && job.State != State.requestSnapshot && job.State != State.requestSnapshotSent) {
     302        response.Success = false;
     303        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
     304        return response; // no commit needed
     305      }
     306      // job is in correct state
     307      job.State = State.abort;
    345308      DaoLocator.JobDao.Update(job);
    346309
    347         response.Success = true;
    348         response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_SET;
    349 
    350         return response;
    351       }
    352       /*finally {
    353         if (session != null)
    354           session.EndSession();
    355       }
    356     }   */
     310      response.Success = true;
     311      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_SET;
     312
     313      return response;
     314    }
    357315
    358316    public ResponseList<JobResult> GetAllJobResults(Guid jobId) {
     
    381339
    382340    public ResponseList<JobDto> GetJobsByProject(Guid projectId) {
    383       return null;     
    384     }
     341      return null;
     342    }
     343
     344    public byte[] GetSerializedJobDataById(Guid jobId) {
     345      return DaoLocator.JobDao.GetBinaryJobFile(jobId);
     346    }
     347
     348    public void SetSerializedJobDataById(Guid jobId, byte[] data) {
     349      DaoLocator.JobDao.SetBinaryJobFile(jobId, data);
     350    }
     351
    385352    #endregion
    386353  }
Note: See TracChangeset for help on using the changeset viewer.