Free cookie consent management tool by TermsFeed Policy Generator

source: branches/thasling/DistributedGA/DistributedGA.Hive/P2PTask.cs @ 14906

Last change on this file since 14906 was 13972, checked in by thasling, 8 years ago

#2615:
improved log
made changes in data structure

File size: 9.0 KB
RevLine 
[13556]1using System;
2using System.Linq;
3using System.Threading;
4using System.Threading.Tasks;
5using DistributedGA.Core;
6using DistributedGA.Core.Domain;
7using DistributedGA.Core.Implementation;
8using DistributedGA.Core.Interface;
9using HeuristicLab.Common;
10using HeuristicLab.Core;
11using HeuristicLab.Data;
12using HeuristicLab.Optimization;
13using HeuristicLab.Parameters;
14using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
15
[13887]16namespace DistributedGA.Hive
17{
18    [StorableClass]
19    [Creatable(Category = "Haslinger's Very Special XO", Priority = 1000)]
20    public class P2PTask : ParameterizedNamedItem, IOptimizer
21    {
[13556]22
[13887]23        [Storable]
24        private DateTime startTime;
25        [Storable]
26        private RunCollection runCollection;
[13557]27
[13887]28        #region Constructors and Cloning
29        [StorableConstructor]
30        protected P2PTask(bool deserializing) { }
31        protected P2PTask(P2PTask original, Cloner cloner)
32            : base(original, cloner)
33        {
34            startTime = original.startTime;
35            runCollection = cloner.Clone(original.runCollection);
36        }
37        public P2PTask()
38        {
39            Name = "P2PTask";
40            Description = "P2PTask";
41            runCollection = new RunCollection();
[13556]42
[13887]43            Parameters.Add(new ValueParameter<StringValue>("LanIpPrefix", "", new StringValue("10.")));
44            Parameters.Add(new ValueParameter<StringValue>("ContactServerURL", "", new StringValue("net.tcp://10.42.1.150:9090/DistributedGA.ContactServer/ContactService")));
45            Parameters.Add(new ValueParameter<Log>("Log", "", new Log()));
[13556]46
[13887]47        }
[13556]48
[13887]49        [StorableHook(HookType.AfterDeserialization)]
50        protected virtual void AfterDeserialization()
51        {
52        }
[13556]53
[13887]54        public override IDeepCloneable Clone(Cloner cloner)
55        {
56            return new P2PTask(this, cloner);
57        }
58        #endregion
[13556]59
[13887]60        #region ITask Members
61        public ExecutionState ExecutionState { get; private set; }
[13556]62
[13887]63        public TimeSpan ExecutionTime { get; private set; }
[13556]64
[13887]65        public void Prepare()
66        {
67            Prepare(true);
68        }
69        public void Prepare(bool clearRuns)
70        {
71            // ignore
72            ExecutionState = HeuristicLab.Core.ExecutionState.Prepared;
73            OnExecutionStateChanged();
74            OnPrepared();
75        }
[13556]76
[13887]77        private CancellationTokenSource cts;
78        private ManualResetEvent stoppedEvent;
[13556]79
[13887]80        public void Start()
81        {
82            Task.Factory.StartNew(() =>
83            {
84                cts = new CancellationTokenSource();
85                stoppedEvent = new ManualResetEvent(false);
86                startTime = DateTime.Now;
[13556]87
[13887]88                ExecutionState = HeuristicLab.Core.ExecutionState.Started;
89                OnExecutionStateChanged();
90                OnStarted();
[13556]91
[13887]92                var log = ((Log)(Parameters["Log"].ActualValue));
[13556]93
[13887]94                try
95                {
[13557]96
[13887]97                    log.LogMessage("Starting peer...");
98                    IMessageHandler h = new PeerNetworkMessageHandler();
99                    var lanIpPrefix = ((StringValue)(Parameters["LanIpPrefix"].ActualValue)).Value;
100                    var contactServerUri = ((StringValue)(Parameters["ContactServerURL"].ActualValue)).Value;
[13956]101                    h.Init(lanIpPrefix, contactServerUri, "TEST", 10000, 100);
[13887]102                    PeerInfo pi = h.GetPeerInfo();
103                    log.LogMessage(string.Format("Peer is hostet at IP: {0} and port: {1}", pi.IpAddress, pi.Port));
104                    Thread.Sleep(1000 * 20);
105                    log.LogMessage("Current peers within network:");
106                    foreach (var item in h.GetCurrentNetwork())
107                    {
108                        log.LogMessage(string.Format("Peer at {0}:{1}", item.IpAddress, item.Port));
109                    }
110                    int i = 0;
111                    while (i < 10 && !cts.Token.IsCancellationRequested)
112                    {
113                        i++;
114                        Thread.Sleep(1000 * 10);
115                        var message = CreateMessage(pi, i);
116                        Console.WriteLine("Publishing messages...");
[13972]117                        //h.PublishDataToNetwork(message);
[13887]118                        Console.WriteLine("Messages published.");
119                        Console.WriteLine("Recieved messages:");
120                        foreach (var item in h.GetDataFromNetwork())
121                        {
[13972]122                            //log.LogMessage(string.Format("Message:{0}", GetString(item)));
[13887]123                        }
124                        ExecutionTime = DateTime.Now - startTime;
125                        OnExecutionTimeChanged();
126                    }
127                }
128                catch (Exception ex)
129                {
130                    log.LogMessage(ex.Message);
131                    log.LogMessage("press any key to continue...");
132                }
[13557]133
[13887]134                var run = new Run();
135                var results = new ResultCollection();
[13557]136
[13887]137                run.Results.Add("Execution Time", new TimeSpanValue(ExecutionTime));
138                run.Results.Add("Log", log);
[13556]139
[13887]140                runCollection.Add(run);
[13556]141
[13887]142                stoppedEvent.Set();
143                ExecutionState = HeuristicLab.Core.ExecutionState.Stopped;
[13556]144
[13887]145                OnExecutionStateChanged();
146                OnStopped();
147            });
148        }
[13556]149
150
[13887]151        public void Pause()
152        {
153            if (cts != null) cts.Cancel();
154            stoppedEvent.WaitOne();
[13556]155
[13887]156            ExecutionState = HeuristicLab.Core.ExecutionState.Paused;
157            OnExecutionStateChanged();
[13556]158
[13887]159            OnPaused();
160        }
[13556]161
[13887]162        public void Stop()
163        {
164            if (cts != null) cts.Cancel();
165            stoppedEvent.WaitOne();
[13556]166
[13887]167            ExecutionState = HeuristicLab.Core.ExecutionState.Stopped;
168            OnExecutionStateChanged();
[13556]169
[13887]170            OnStopped();
171        }
[13556]172
[13887]173        public event EventHandler Started;
174        protected virtual void OnStarted()
175        {
176            EventHandler handler = Started;
177            if (handler != null) handler(this, EventArgs.Empty);
178        }
[13556]179
[13887]180        public event EventHandler Stopped;
181        protected virtual void OnStopped()
182        {
183            EventHandler handler = Stopped;
184            if (handler != null) handler(this, EventArgs.Empty);
185        }
[13556]186
[13887]187        public event EventHandler Paused;
188        protected virtual void OnPaused()
189        {
190            EventHandler handler = Paused;
191            if (handler != null) handler(this, EventArgs.Empty);
192        }
[13556]193
[13887]194        public event EventHandler Prepared;
195        protected virtual void OnPrepared()
196        {
197            EventHandler handler = Prepared;
198            if (handler != null) handler(this, EventArgs.Empty);
199        }
[13556]200
201
[13887]202        public event EventHandler ComputeInParallelChanged;
203        protected virtual void OnComputeInParallelChanged()
204        {
205            EventHandler handler = ComputeInParallelChanged;
206            if (handler != null) handler(this, EventArgs.Empty);
207        }
208        #endregion
[13556]209
[13887]210        #region Events
211        public event EventHandler ExecutionTimeChanged;
212        protected virtual void OnExecutionTimeChanged()
213        {
214            EventHandler handler = ExecutionTimeChanged;
215            if (handler != null) handler(this, EventArgs.Empty);
216        }
217        public event EventHandler ExecutionStateChanged;
218        protected virtual void OnExecutionStateChanged()
219        {
220            EventHandler handler = ExecutionStateChanged;
221            if (handler != null) handler(this, EventArgs.Empty);
222        }
223        #endregion
[13556]224
[13887]225        public override string ToString()
226        {
227            return Name;
228        }
[13556]229
230
231
[13887]232        private static byte[][] CreateMessage(PeerInfo pi, int iterationNumber)
233        {
234            string msg1 = string.Concat("Message 1 from Peer ", pi.IpAddress, ":", pi.Port, " at iteration ", iterationNumber);
235            string msg2 = string.Concat("Message 2 from Peer ", pi.IpAddress, ":", pi.Port, " at iteration ", iterationNumber);
236            return new byte[][] { GetBytes(msg1), GetBytes(msg2) };
237        }
[13556]238
[13887]239        static byte[] GetBytes(string str)
240        {
241            byte[] bytes = new byte[str.Length * sizeof(char)];
242            System.Buffer.BlockCopy(str.ToCharArray(), 0, bytes, 0, bytes.Length);
243            return bytes;
244        }
[13556]245
[13887]246        static string GetString(byte[] bytes)
247        {
248            char[] chars = new char[bytes.Length / sizeof(char)];
249            System.Buffer.BlockCopy(bytes, 0, chars, 0, bytes.Length);
250            return new string(chars);
[13556]251        }
[13887]252
253        public System.Collections.Generic.IEnumerable<IOptimizer> NestedOptimizers
254        {
255            get { return Enumerable.Empty<IOptimizer>(); }
[13556]256        }
[13887]257
258        public RunCollection Runs
259        {
260            get { return runCollection; }
[13556]261        }
262
[13887]263        public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
[13556]264
265    }
266}
Note: See TracBrowser for help on using the repository browser.