Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
08/27/10 08:35:43 (14 years ago)
Author:
cneumuel
Message:

added authorizationManager which checks for permission to specific jobs (#1168)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs

    r4316 r4333  
    6767    private Semaphore fetchJobSemaphore = new Semaphore(2, 2);
    6868
     69    private static object pendingOptimizerMappingsLocker = new object();
     70
    6971    private bool stopResultsPollingPending = false;
    7072
     
    197199      clone.pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
    198200
    199       foreach (var pair in this.pendingOptimizersByJobId)
    200         clone.pendingOptimizersByJobId[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);
    201 
    202       foreach (var pair in this.parentOptimizersByPendingOptimizer)
    203         clone.parentOptimizersByPendingOptimizer[(IOptimizer)cloner.Clone(pair.Key)] = (IOptimizer)cloner.Clone(pair.Value);
    204 
     201      lock (pendingOptimizerMappingsLocker) {
     202        foreach (var pair in this.pendingOptimizersByJobId)
     203          clone.pendingOptimizersByJobId[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);
     204
     205        foreach (var pair in this.parentOptimizersByPendingOptimizer)
     206          clone.parentOptimizersByPendingOptimizer[(IOptimizer)cloner.Clone(pair.Key)] = (IOptimizer)cloner.Clone(pair.Value);
     207      }
    205208      clone.log = (ILog)cloner.Clone(log);
    206209      clone.stopPending = this.stopPending;
     
    243246      if (experiment != null) {
    244247        StopResultPolling();
    245         lock (pendingOptimizersByJobId) {
     248        lock (pendingOptimizerMappingsLocker) {
    246249          pendingOptimizersByJobId.Clear();
    247         }
    248         parentOptimizersByPendingOptimizer.Clear();
     250          parentOptimizersByPendingOptimizer.Clear();
     251        }
    249252        lock (jobItems) {
    250253          jobItems.Clear();
     
    275278
    276279          IEnumerable<string> groups = ResourceGroups;
    277 
    278           foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) {
    279             SerializedJob serializedJob = CreateSerializedJob(optimizer);
    280             ResponseObject<JobDto> response = clientFacade.AddJobWithGroupStrings(serializedJob, groups);
    281             lock (pendingOptimizersByJobId) {
     280          lock (pendingOptimizerMappingsLocker) {
     281            foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) {
     282              SerializedJob serializedJob = CreateSerializedJob(optimizer);
     283              ResponseObject<JobDto> response = clientFacade.AddJobWithGroupStrings(serializedJob, groups);
    282284              pendingOptimizersByJobId.Add(response.Obj.Id, optimizer);
     285
     286              JobItem jobItem = new JobItem() {
     287                JobDto = response.Obj,
     288                LatestSnapshot = null,
     289                Optimizer = optimizer
     290              };
     291              lock (jobItems) {
     292                jobItems.Add(jobItem);
     293              }
     294              LogMessage(jobItem.JobDto.Id, "Job sent to Hive");
    283295            }
    284 
    285             JobItem jobItem = new JobItem() {
    286               JobDto = response.Obj,
    287               LatestSnapshot = null,
    288               Optimizer = optimizer
    289             };
    290             lock (jobItems) {
    291               jobItems.Add(jobItem);
    292             }
    293             LogMessage(jobItem.JobDto.Id, "Job sent to Hive");
    294296          }
    295297        }
     
    396398
    397399    private bool NoMorePendingOptimizers() {
    398       lock (pendingOptimizersByJobId) {
     400      lock (pendingOptimizerMappingsLocker) {
    399401        return pendingOptimizersByJobId.Count == 0;
    400402      }
     
    408410    /// <param name="jobId"></param>
    409411    private void DisposeOptimizerMappings(Guid jobId) {
    410       lock (pendingOptimizersByJobId) {
    411         LogMessage(jobId, "Disposing Optimizer Mappings");
     412      LogMessage(jobId, "Disposing Optimizer Mappings");
     413      lock (pendingOptimizerMappingsLocker) {
    412414        parentOptimizersByPendingOptimizer.Remove(pendingOptimizersByJobId[jobId]);
    413415        pendingOptimizersByJobId.Remove(jobId);
     
    436438    private void JobItem_JobStateChanged(object sender, EventArgs e) {
    437439      JobItem jobItem = (JobItem)sender;
     440
    438441      Thread t = new Thread(() => {
    439         if (jobItem.State == JobState.Finished) {
    440           FetchAndUpdateJob(jobItem.JobDto.Id);
    441           DisposeOptimizerMappings(jobItem.JobDto.Id);
    442         } else if (jobItem.State == JobState.Failed) {
    443           DisposeOptimizerMappings(jobItem.JobDto.Id);
    444         }
    445 
    446         if (NoMorePendingOptimizers()) {
    447           StopResultPolling();
    448           this.ExecutionState = Core.ExecutionState.Stopped;
    449           OnStopped();
     442        try {
     443          if (jobItem.State == JobState.Finished) {
     444            FetchAndUpdateJob(jobItem.JobDto.Id);
     445            DisposeOptimizerMappings(jobItem.JobDto.Id);
     446          } else if (jobItem.State == JobState.Failed) {
     447            DisposeOptimizerMappings(jobItem.JobDto.Id);
     448          }
     449
     450          if (NoMorePendingOptimizers()) {
     451            StopResultPolling();
     452            this.ExecutionState = Core.ExecutionState.Stopped;
     453            OnStopped();
     454          }
     455        }
     456        catch (Exception ex) {
     457          Logger.Error("JobItem_JobStateChanged failed badly: " + ex.Message);
     458          LogMessage("JobItem_JobStateChanged failed badly: " + ex.Message);
    450459        }
    451460      });
     
    457466    /// </summary>
    458467    private void FetchAndUpdateJob(Guid jobId) {
     468      bool tryagain = false;
    459469      LogMessage(jobId, "FetchAndUpdateJob started");
    460       IClientFacade clientFacade = CreateStreamedClientFacade();
    461       IOptimizer originalOptimizer;
    462       lock (pendingOptimizersByJobId) {
    463         originalOptimizer = pendingOptimizersByJobId[jobId];
    464       }
    465 
    466       fetchJobSemaphore.WaitOne();
    467       ResponseObject<SerializedJob> jobResponse = clientFacade.GetLastSerializedResult(jobId, false, false);
    468       ServiceLocator.DisposeClientFacade(clientFacade);
    469       IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData));
    470       IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
    471 
    472       ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer);
    473       fetchJobSemaphore.Release();
    474       LogMessage(jobId, "FetchAndUpdateJob ended");
     470      if (fetchJobSemaphore.WaitOne(new TimeSpan(0, 2, 0))) {
     471        IClientFacade clientFacade = null;
     472        try {
     473          clientFacade = CreateStreamedClientFacade();
     474          IOptimizer originalOptimizer;
     475          originalOptimizer = pendingOptimizersByJobId[jobId];
     476
     477          ResponseObject<SerializedJob> jobResponse = clientFacade.GetLastSerializedResult(jobId, false, false);
     478          IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData));
     479          IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
     480          ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer);
     481          LogMessage(jobId, "FetchAndUpdateJob ended");
     482        }
     483        catch (Exception e) {
     484          LogMessage(jobId, "FetchAndUpdateJob failed: " + e.Message + ". Will try again!");
     485          tryagain = true;
     486        }
     487        finally {
     488          ServiceLocator.DisposeClientFacade(clientFacade);
     489          fetchJobSemaphore.Release();
     490        }
     491      } else {
     492        LogMessage(jobId, "FetchAndUpdateJob timed out. Will try again!");
     493        tryagain = true;
     494      }
     495
     496      if (tryagain) {
     497        FetchAndUpdateJob(jobId);
     498      }
    475499    }
    476500
     
    509533        PluginsNeeded = pluginsNeeded,
    510534        State = JobState.Offline,
    511         MemoryNeeded = 0,
    512         UserId = Guid.Empty // [chn] set real userid here!
     535        MemoryNeeded = 0
    513536      };
    514537
     
    537560    public void StopResultPolling() {
    538561      this.stopResultsPollingPending = true;
    539       resultPollingThread.Interrupt();
     562      if (resultPollingThread != null && resultPollingThread.ThreadState == System.Threading.ThreadState.WaitSleepJoin) {
     563        resultPollingThread.Interrupt();
     564      }
    540565      this.stopResultsPollingPending = false;
    541566    }
     
    581606          // thread has been interuppted
    582607        }
     608        catch (Exception e) {
     609          LogMessage("Result Polling Thread failed badly: " + e.Message);
     610          Logger.Error("Result Polling Thread failed badly: " + e.Message);
     611        }
    583612        finally {
    584613          this.IsPollingResults = false;
     
    598627    public void RequestSnapshot(Guid jobId) {
    599628      Thread t = new Thread(() => {
    600         IClientFacade clientFacade = CreateStreamedClientFacade();
     629        IClientFacade clientFacade = null;
    601630        try {
     631          clientFacade = CreateStreamedClientFacade();
     632
    602633          ResponseObject<SerializedJob> response;
    603634          int retryCount = 0;
     
    633664          }
    634665        }
     666        catch (Exception e) {
     667          LogMessage("RequestSnapshot Thread failed badly: " + e.Message);
     668          Logger.Error("RequestSnapshot Thread failed badly: " + e.Message);
     669        }
    635670        finally {
    636671          ServiceLocator.DisposeClientFacade(clientFacade);
     
    885920          clientFacade = ServiceLocator.CreateClientFacade(Settings.Default.HiveServerIp);
    886921
    887         } catch (EndpointNotFoundException exception) {
     922        }
     923        catch (EndpointNotFoundException exception) {
    888924          LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
    889925          Thread.Sleep(resultPollingIntervalMs);
     
    899935          //clientFacade = ServiceLocator.CreateStreamedClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientStreamedServiceName));
    900936          clientFacade = ServiceLocator.CreateStreamedClientFacade(Settings.Default.HiveServerIp);
    901         } catch (EndpointNotFoundException exception) {
     937        }
     938        catch (EndpointNotFoundException exception) {
    902939          LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
    903940          Thread.Sleep(resultPollingIntervalMs);
Note: See TracChangeset for help on using the changeset viewer.