Changeset 4111 for branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Engine/3.3/HiveEngine.cs
- Timestamp:
- 07/27/10 14:20:42 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Engine/3.3/HiveEngine.cs
r4107 r4111 70 70 71 71 //public OperatorGraph OperatorGraph { 72 // get { return job.Engine.OperatorGraph; }72 // get { return currentJob.Engine.; } 73 73 //} 74 74 … … 76 76 // get { return job.Engine.GlobalScope; } 77 77 //} 78 79 public TimeSpan ExecutionTime {80 get { return job.Engine.ExecutionTime; }81 }82 78 83 79 public ThreadPriority Priority { … … 86 82 } 87 83 88 //public bool Running { 89 // get { return job.Engine.ExecutionState == Core.ExecutionState.Started; } 90 //} 91 92 //public bool Canceled { 93 // get { return job.Engine.Canceled; } 94 //} 95 96 //public bool Terminated { 97 // get { return job.Engine.Terminated; } 98 //} 99 100 public ExecutionState ExecutionState { 101 get { return job.Engine.ExecutionState; } 102 } 103 104 protected override void ProcessNextOperation() { 105 var jobObj = CreateJobObj(); 84 public new void Prepare(IOperation initialOperation) { 85 this.job.Prepare(initialOperation); 86 OnPrepared(); 87 } 88 89 public override void Start() { 90 SerializedJob jobObj = CreateSerializedJob(); 106 91 IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl); 107 92 108 93 List<string> groups = new List<string>(); 109 if (!String. Empty.Equals(RessourceIds)) {94 if (!String.IsNullOrEmpty(RessourceIds)) { 110 95 groups.AddRange(RessourceIds.Split(';')); 111 96 } 112 97 113 /*if(!String.Empty.Equals(RessourceIds)) { 114 String[] ids = RessourceIds.Split(';'); 115 foreach (string sid in ids) { 116 try { 117 System.Guid gid = new Guid(sid); 118 jobObj.JobInfo.AssignedResourceIds.Add(gid); 119 } catch(Exception ex) { 120 121 } 122 } 123 } */ 124 125 int loops = 1; 126 127 Int32.TryParse(MultiSubmitCount, out loops); 128 if (loops == 0) 129 loops = 1; 130 131 for (int i = 0; i < loops; i++) { 132 ResponseObject<Contracts.BusinessObjects.JobDto> res = executionEngineFacade.AddJobWithGroupStrings(jobObj, groups); 133 jobId = res.Obj.Id; 134 } 98 ResponseObject<JobDto> res = executionEngineFacade.AddJobWithGroupStrings(jobObj, groups); 99 jobId = res.Obj.Id; 135 100 136 101 StartResultPollingThread(); … … 165 130 job = restoredJob; 166 131 132 // [chn] how to show the View in 3.3? 167 133 //ControlManager.Manager.ShowControl(job.Engine.CreateView()); 168 134 //OnChanged(); … … 174 140 } 175 141 176 private HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob CreateJobObj() { 177 HeuristicLab.Hive.Contracts.BusinessObjects.JobDto jobObj = new HeuristicLab.Hive.Contracts.BusinessObjects.JobDto(); 178 179 // create xml document with <Root> as root-node and the persisted job as child 142 public override void Pause() { 143 base.Pause(); 144 if (job != null) { 145 IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl); 146 executionEngineFacade.AbortJob(jobId); 147 } 148 } 149 150 public override void Stop() { 151 base.Stop(); 152 if (job != null) { 153 IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl); 154 executionEngineFacade.AbortJob(jobId); 155 } 156 } 157 158 /// <summary> 159 /// Creates a SerializedJob Object which contains the currentJob as binary array 160 /// and some metainfo about that job. 161 /// </summary> 162 private SerializedJob CreateSerializedJob() { 163 JobDto jobDto = new JobDto(); 164 165 // serialize current job 180 166 MemoryStream memStream = new MemoryStream(); 181 167 XmlGenerator.Serialize(job, memStream); 182 168 183 169 // convert memStream to byte-array 184 HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob executableJobObj = new HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob();185 executableJobObj.JobInfo = job Obj;170 SerializedJob executableJobObj = new SerializedJob(); 171 executableJobObj.JobInfo = jobDto; 186 172 executableJobObj.SerializedJobData = memStream.ToArray(); 187 173 … … 195 181 }).ToList(); 196 182 197 job Obj.CoresNeeded = 1;198 job Obj.PluginsNeeded = pluginsNeeded;199 job Obj.State = HeuristicLab.Hive.Contracts.BusinessObjects.State.offline;183 jobDto.CoresNeeded = 1; 184 jobDto.PluginsNeeded = pluginsNeeded; 185 jobDto.State = State.offline; 200 186 return executableJobObj; 201 187 } 202 188 203 /// <summary>204 /// Returns a list of plugins in which the type itself and all members205 /// of the type are declared. Objectgraph is searched recursively.206 /// </summary>207 private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {208 HashSet<Type> types = new HashSet<Type>();209 FindTypes(type, types, "HeuristicLab.");210 IEnumerable<IPluginDescription> plugins = from t in types211 select ApplicationManager.Manager.GetDeclaringPlugin(t);212 return plugins;213 }214 215 /// <summary>216 /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart217 /// </summary>218 /// <param name="type">the type to be searched</param>219 /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>220 /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>221 private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {222 223 // search is not performed on attributes224 225 if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) {226 types.Add(type);227 228 // interfaces229 foreach (Type t in type.GetInterfaces()) {230 FindTypes(t, types, namespaceStart);231 }232 233 // events234 foreach (EventInfo info in type.GetEvents()) {235 FindTypes(info.EventHandlerType, types, namespaceStart);236 FindTypes(info.DeclaringType, types, namespaceStart);237 }238 239 // properties240 foreach (PropertyInfo info in type.GetProperties()) {241 FindTypes(info.PropertyType, types, namespaceStart);242 }243 244 // fields245 foreach (FieldInfo info in type.GetFields()) {246 FindTypes(info.FieldType, types, namespaceStart);247 }248 249 // constructors : maybe constructors them out (?)250 foreach (ConstructorInfo info in type.GetConstructors()) {251 foreach (ParameterInfo paramInfo in info.GetParameters()) {252 FindTypes(paramInfo.ParameterType, types, namespaceStart);253 }254 }255 256 // methods257 foreach (MethodInfo info in type.GetMethods()) {258 foreach (ParameterInfo paramInfo in info.GetParameters()) {259 FindTypes(paramInfo.ParameterType, types, namespaceStart);260 }261 FindTypes(info.ReturnType, types, namespaceStart);262 }263 }264 }265 189 266 190 //public event EventHandler Initialized; … … 321 245 //} 322 246 323 //public void Reset() { 324 // abortRequested = false; 325 326 // throw new NotImplementedException("[chn] how to reset Engine?"); 327 // //job.Engine.Reset(); 328 329 // jobId = Guid.NewGuid(); 330 // OnInitialized(); 331 //} 332 333 //public void RequestSnapshot() { 334 // IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl); 335 // int retryCount = 0; 336 // ResponseObject<SerializedJob> response; 337 // lock (locker) { 338 // Logger.Debug("HiveEngine: Abort - RequestSnapshot"); 339 // Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId); 340 // if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) { 341 // // job is finished already 342 // Logger.Debug("HiveEngine: Abort - GetLastResult(false)"); 343 // response = executionEngineFacade.GetLastSerializedResult(jobId, false, false); 344 // Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success); 345 // } else { 346 // // server sent snapshot request to client 347 // // poll until snapshot is ready 348 // do { 349 // Thread.Sleep(SNAPSHOT_POLLING_INTERVAL_MS); 350 // Logger.Debug("HiveEngine: Abort - GetLastResult(true)"); 351 // response = executionEngineFacade.GetLastSerializedResult(jobId, false, true); 352 // Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success); 353 // retryCount++; 354 // // loop while 355 // // 1. problem with communication with server 356 // // 2. job result not yet ready 357 // } while ( 358 // (retryCount < MAX_SNAPSHOT_RETRIES) && ( 359 // !response.Success || 360 // response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE) 361 // ); 362 // } 363 // } 364 // SerializedJob jobResult = response.Obj; 365 // if (jobResult != null) { 366 // Logger.Debug("HiveEngine: Results-polling - Got result!"); 367 368 // //job = (Job)PersistenceManager.RestoreFromGZip(jobResult.SerializedJobData); 369 // //ControlManager.Manager.ShowControl(job.Engine.CreateView()); 370 // throw new NotImplementedException("TODO[chn]use persistency-3.3"); 371 // } 372 // //HiveLogger.Debug("HiveEngine: Results-polling - Exception!"); 373 // //Exception ex = new Exception(response.Obj.Exception.Message); 374 // //ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 375 //} 376 377 //public override System.Xml.XmlNode GetXmlNode(string name, System.Xml.XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) { 378 // XmlNode node = base.GetXmlNode(name, document, persistedObjects); 379 // XmlAttribute attr = document.CreateAttribute("HiveServerUrl"); 380 // attr.Value = HiveServerUrl; 381 // node.Attributes.Append(attr); 382 // node.AppendChild(PersistenceManager.Persist("Job", job, document, persistedObjects)); 383 // return node; 384 //} 385 386 //public override void Populate(System.Xml.XmlNode node, IDictionary<Guid, IStorable> restoredObjects) { 387 // base.Populate(node, restoredObjects); 388 // HiveServerUrl = node.Attributes["HiveServerUrl"].Value; 389 // job = (Job)PersistenceManager.Restore(node.SelectSingleNode("Job"), restoredObjects); 390 //} 391 392 247 public void RequestSnapshot() { 248 if (job != null) { 249 IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl); 250 int retryCount = 0; 251 ResponseObject<SerializedJob> response; 252 lock (locker) { 253 Logger.Debug("HiveEngine: Abort - RequestSnapshot"); 254 Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId); 255 if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) { 256 // job is finished already 257 Logger.Debug("HiveEngine: Abort - GetLastResult(false)"); 258 response = executionEngineFacade.GetLastSerializedResult(jobId, false, false); 259 Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success); 260 } else { 261 // server sent snapshot request to client 262 // poll until snapshot is ready 263 do { 264 Thread.Sleep(SNAPSHOT_POLLING_INTERVAL_MS); 265 Logger.Debug("HiveEngine: Abort - GetLastResult(true)"); 266 response = executionEngineFacade.GetLastSerializedResult(jobId, false, true); 267 Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success); 268 retryCount++; 269 // loop while 270 // 1. problem with communication with server 271 // 2. job result not yet ready 272 } while ( 273 (retryCount < MAX_SNAPSHOT_RETRIES) && ( 274 !response.Success || 275 response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE) 276 ); 277 } 278 279 } 280 SerializedJob jobResult = response.Obj; 281 if (jobResult != null) { 282 Logger.Debug("HiveEngine: Results-polling - Got result!"); 283 284 job = XmlParser.Deserialize<Job>(new MemoryStream(jobResult.SerializedJobData)); 285 286 throw new NotImplementedException("[chn] how to create a view in 3.3 and why should i do this here? shouldnt the caller of this method receive a result and decide what to do?"); 287 //ControlManager.Manager.ShowControl(job.Engine.CreateView()); 288 } 289 } 290 //HiveLogger.Debug("HiveEngine: Results-polling - Exception!"); 291 //Exception ex = new Exception(response.Obj.Exception.Message); 292 //ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 293 } 294 295 protected override void ProcessNextOperation() { 296 throw new NotSupportedException(); 297 } 298 299 #region Required Plugin Search 300 /// <summary> 301 /// Returns a list of plugins in which the type itself and all members 302 /// of the type are declared. Objectgraph is searched recursively. 303 /// </summary> 304 private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) { 305 HashSet<Type> types = new HashSet<Type>(); 306 FindTypes(type, types, "HeuristicLab."); 307 return GetDeclaringPlugins(types); 308 } 309 310 /// <summary> 311 /// Returns the plugins (including dependencies) in which the given types are declared 312 /// </summary> 313 private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) { 314 HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>(); 315 foreach (Type t in types) { 316 FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins); 317 } 318 return plugins; 319 } 320 321 /// <summary> 322 /// Finds the dependencies of the given plugin and adds it to the plugins hashset. 323 /// Also searches the dependencies recursively. 324 /// </summary> 325 private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) { 326 if (!plugins.Contains(plugin)) { 327 plugins.Add(plugin); 328 foreach (IPluginDescription dependency in plugin.Dependencies) { 329 FindDeclaringPlugins(dependency, plugins); 330 } 331 } 332 } 333 334 /// <summary> 335 /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart 336 /// Be aware that search is not performed on attributes 337 /// </summary> 338 /// <param name="type">the type to be searched</param> 339 /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param> 340 /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param> 341 private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) { 342 if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) { 343 types.Add(type); 344 345 // constructors 346 foreach (ConstructorInfo info in type.GetConstructors()) { 347 foreach (ParameterInfo paramInfo in info.GetParameters()) { 348 FindTypes(paramInfo.ParameterType, types, namespaceStart); 349 } 350 } 351 352 // interfaces 353 foreach (Type t in type.GetInterfaces()) { 354 FindTypes(t, types, namespaceStart); 355 } 356 357 // events 358 foreach (EventInfo info in type.GetEvents()) { 359 FindTypes(info.EventHandlerType, types, namespaceStart); 360 FindTypes(info.DeclaringType, types, namespaceStart); 361 } 362 363 // properties 364 foreach (PropertyInfo info in type.GetProperties()) { 365 FindTypes(info.PropertyType, types, namespaceStart); 366 } 367 368 // fields 369 foreach (FieldInfo info in type.GetFields()) { 370 FindTypes(info.FieldType, types, namespaceStart); 371 } 372 373 // methods 374 foreach (MethodInfo info in type.GetMethods()) { 375 foreach (ParameterInfo paramInfo in info.GetParameters()) { 376 FindTypes(paramInfo.ParameterType, types, namespaceStart); 377 } 378 FindTypes(info.ReturnType, types, namespaceStart); 379 } 380 } 381 } 382 #endregion 393 383 } 394 384 }
Note: See TracChangeset
for help on using the changeset viewer.