Changeset 6388 for branches/MPI/HeuristicLab.MPIEngine/3.3/MPIEngine.cs
- Timestamp:
- 06/08/11 11:07:33 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/MPI/HeuristicLab.MPIEngine/3.3/MPIEngine.cs
r6354 r6388 31 31 using HeuristicLab.Optimization; 32 32 using System.Linq; 33 using Microsoft.Hpc.Scheduler; 34 using System.ServiceModel; 35 using HeuristicLab.MPIAlgorithmRunner; 36 using HeuristicLab.Operators.MPISupport; 33 37 34 38 namespace HeuristicLab.MPIEngine { … … 49 53 } 50 54 51 private string algFile;55 private IAlgorithm algorithm; 52 56 53 57 public override void Start() { … … 64 68 alg.Engine = new SequentialEngine.SequentialEngine(); 65 69 66 algFile = Path.GetTempFileName(); 67 XmlGenerator.Serialize(alg, algFile); 70 algorithm = alg; 68 71 } 69 72 … … 77 80 IScope globalScope = context.Scope; 78 81 79 string resultFile = Path.GetTempFileName(); 80 ItemList<ResultCollection> empty = new ItemList<ResultCollection>(); 81 XmlGenerator.Serialize(empty, resultFile); 82 string exec = @"mpiexec"; 83 string args = @"-c 3 C:\public\MPISupport\HeuristicLab.MPIAlgorithmRunner-3.3.exe "; 82 84 83 string exec = @"C:\Program Files\Microsoft Compute Cluster Pack\Bin\mpiexec.exe";84 s tring args = @"-n 3 HeuristicLab.MPIAlgorithmRunner-3.3.exe " + algFile + " " + resultFile + " 5000";85 IScheduler scheduler = new Scheduler(); 86 scheduler.Connect("blade00.hpc.fh-hagenberg.at"); 85 87 86 System.Threading.Thread pollThread = new System.Threading.Thread(delegate(object state) { 87 while (true) { 88 System.Threading.Thread.Sleep(5000); 88 ISchedulerJob job = scheduler.CreateJob(); 89 job.Name = "HeuristicLab.MPIEngine"; 90 job.RequestedNodes.Add("BLADE03"); 91 ISchedulerTask task = job.CreateTask(); 92 task.Name = "HeuristicLab.MPIAlgorithmRunner"; 93 task.CommandLine = exec + " " + args; 94 task.StdOutFilePath = "stdout.txt"; 95 task.StdErrFilePath = "stderr.txt"; 96 task.MinimumNumberOfCores = task.MaximumNumberOfCores = 3; 97 job.AddTask(task); 89 98 90 lock (resultFile) { 91 object results = XmlParser.Deserialize(resultFile); 92 ResultCollection resultCollection = (globalScope.Variables["Results"].Value as ResultCollection); 99 scheduler.SubmitJob(job, @"HPC\svonolfe", @"Vunkopf!3"); 93 100 94 if (resultCollection != null){95 if (!resultCollection.ContainsKey("MPIResults"))96 resultCollection.Add(new Result("MPIResults", results as IItem));101 try { 102 string address = null; 103 int timeout = 10; 97 104 98 resultCollection["MPIResults"].Value = results as IItem; 105 while (address == null && timeout > 0) { 106 ISchedulerJob schedulerJob = scheduler.OpenJob(job.Id); 107 if (schedulerJob != null) { 108 INameValueCollection properties = schedulerJob.GetCustomProperties(); 109 NameValue item = properties.FirstOrDefault(i => i.Name == "address"); 110 if (item != null) { 111 address = item.Value; 112 } else { 113 System.Threading.Thread.Sleep(1000); 114 timeout--; 99 115 } 100 101 116 } 102 117 } 103 });104 pollThread.Start();105 118 106 Process p = Process.Start(exec, args); 107 p.WaitForExit(); 119 if (address == null) { 120 throw new Exception("A timeout occurred when starting the MPIAlgorithmRunner"); 121 } 108 122 109 pollThread.Abort(); 110 File.Delete(algFile); 111 File.Delete(resultFile); 123 NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None); 124 ChannelFactory<IAlgorithmBroker> factory = new ChannelFactory<IAlgorithmBroker>(netTCPBinding, address); 125 IAlgorithmBroker proxy = factory.CreateChannel(); 126 127 proxy.TransmitAlgorithm(new MPITransportWrapper<IAlgorithm>(algorithm)); 128 129 while (!proxy.IsAlgorithmTerminated()) { 130 ItemList<ResultCollection> results = proxy.GetResults().InnerItem; 131 132 ResultCollection resultCollection = (globalScope.Variables["Results"].Value as ResultCollection); 133 134 if (resultCollection != null && results != null) { 135 if (!resultCollection.ContainsKey("MPIResults")) 136 resultCollection.Add(new Result("MPIResults", results)); 137 138 resultCollection["MPIResults"].Value = results; 139 } 140 141 System.Threading.Thread.Sleep(5000); 142 } 143 } 144 catch (Exception e) { 145 scheduler.CancelJob(job.Id, "Exception: " + e.Message); 146 throw e; 147 } 112 148 } 113 149 }
Note: See TracChangeset
for help on using the changeset viewer.