source: branches/thasling/DistributedGA/DistributedGA.Hive/P2PTask.cs @ 13972

Last change on this file since 13972 was 13972, checked in by thasling, 6 years ago

#2615:
improved log
made changes in data structure

File size: 9.0 KB
Line 
1using System;
2using System.Linq;
3using System.Threading;
4using System.Threading.Tasks;
5using DistributedGA.Core;
6using DistributedGA.Core.Domain;
7using DistributedGA.Core.Implementation;
8using DistributedGA.Core.Interface;
9using HeuristicLab.Common;
10using HeuristicLab.Core;
11using HeuristicLab.Data;
12using HeuristicLab.Optimization;
13using HeuristicLab.Parameters;
14using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
15
16namespace DistributedGA.Hive
17{
18    [StorableClass]
19    [Creatable(Category = "Haslinger's Very Special XO", Priority = 1000)]
20    public class P2PTask : ParameterizedNamedItem, IOptimizer
21    {
22
23        [Storable]
24        private DateTime startTime;
25        [Storable]
26        private RunCollection runCollection;
27
28        #region Constructors and Cloning
29        [StorableConstructor]
30        protected P2PTask(bool deserializing) { }
31        protected P2PTask(P2PTask original, Cloner cloner)
32            : base(original, cloner)
33        {
34            startTime = original.startTime;
35            runCollection = cloner.Clone(original.runCollection);
36        }
37        public P2PTask()
38        {
39            Name = "P2PTask";
40            Description = "P2PTask";
41            runCollection = new RunCollection();
42
43            Parameters.Add(new ValueParameter<StringValue>("LanIpPrefix", "", new StringValue("10.")));
44            Parameters.Add(new ValueParameter<StringValue>("ContactServerURL", "", new StringValue("net.tcp://10.42.1.150:9090/DistributedGA.ContactServer/ContactService")));
45            Parameters.Add(new ValueParameter<Log>("Log", "", new Log()));
46
47        }
48
49        [StorableHook(HookType.AfterDeserialization)]
50        protected virtual void AfterDeserialization()
51        {
52        }
53
54        public override IDeepCloneable Clone(Cloner cloner)
55        {
56            return new P2PTask(this, cloner);
57        }
58        #endregion
59
60        #region ITask Members
61        public ExecutionState ExecutionState { get; private set; }
62
63        public TimeSpan ExecutionTime { get; private set; }
64
65        public void Prepare()
66        {
67            Prepare(true);
68        }
69        public void Prepare(bool clearRuns)
70        {
71            // ignore
72            ExecutionState = HeuristicLab.Core.ExecutionState.Prepared;
73            OnExecutionStateChanged();
74            OnPrepared();
75        }
76
77        private CancellationTokenSource cts;
78        private ManualResetEvent stoppedEvent;
79
80        public void Start()
81        {
82            Task.Factory.StartNew(() =>
83            {
84                cts = new CancellationTokenSource();
85                stoppedEvent = new ManualResetEvent(false);
86                startTime = DateTime.Now;
87
88                ExecutionState = HeuristicLab.Core.ExecutionState.Started;
89                OnExecutionStateChanged();
90                OnStarted();
91
92                var log = ((Log)(Parameters["Log"].ActualValue));
93
94                try
95                {
96
97                    log.LogMessage("Starting peer...");
98                    IMessageHandler h = new PeerNetworkMessageHandler();
99                    var lanIpPrefix = ((StringValue)(Parameters["LanIpPrefix"].ActualValue)).Value;
100                    var contactServerUri = ((StringValue)(Parameters["ContactServerURL"].ActualValue)).Value;
101                    h.Init(lanIpPrefix, contactServerUri, "TEST", 10000, 100);
102                    PeerInfo pi = h.GetPeerInfo();
103                    log.LogMessage(string.Format("Peer is hostet at IP: {0} and port: {1}", pi.IpAddress, pi.Port));
104                    Thread.Sleep(1000 * 20);
105                    log.LogMessage("Current peers within network:");
106                    foreach (var item in h.GetCurrentNetwork())
107                    {
108                        log.LogMessage(string.Format("Peer at {0}:{1}", item.IpAddress, item.Port));
109                    }
110                    int i = 0;
111                    while (i < 10 && !cts.Token.IsCancellationRequested)
112                    {
113                        i++;
114                        Thread.Sleep(1000 * 10);
115                        var message = CreateMessage(pi, i);
116                        Console.WriteLine("Publishing messages...");
117                        //h.PublishDataToNetwork(message);
118                        Console.WriteLine("Messages published.");
119                        Console.WriteLine("Recieved messages:");
120                        foreach (var item in h.GetDataFromNetwork())
121                        {
122                            //log.LogMessage(string.Format("Message:{0}", GetString(item)));
123                        }
124                        ExecutionTime = DateTime.Now - startTime;
125                        OnExecutionTimeChanged();
126                    }
127                }
128                catch (Exception ex)
129                {
130                    log.LogMessage(ex.Message);
131                    log.LogMessage("press any key to continue...");
132                }
133
134                var run = new Run();
135                var results = new ResultCollection();
136
137                run.Results.Add("Execution Time", new TimeSpanValue(ExecutionTime));
138                run.Results.Add("Log", log);
139
140                runCollection.Add(run);
141
142                stoppedEvent.Set();
143                ExecutionState = HeuristicLab.Core.ExecutionState.Stopped;
144
145                OnExecutionStateChanged();
146                OnStopped();
147            });
148        }
149
150
151        public void Pause()
152        {
153            if (cts != null) cts.Cancel();
154            stoppedEvent.WaitOne();
155
156            ExecutionState = HeuristicLab.Core.ExecutionState.Paused;
157            OnExecutionStateChanged();
158
159            OnPaused();
160        }
161
162        public void Stop()
163        {
164            if (cts != null) cts.Cancel();
165            stoppedEvent.WaitOne();
166
167            ExecutionState = HeuristicLab.Core.ExecutionState.Stopped;
168            OnExecutionStateChanged();
169
170            OnStopped();
171        }
172
173        public event EventHandler Started;
174        protected virtual void OnStarted()
175        {
176            EventHandler handler = Started;
177            if (handler != null) handler(this, EventArgs.Empty);
178        }
179
180        public event EventHandler Stopped;
181        protected virtual void OnStopped()
182        {
183            EventHandler handler = Stopped;
184            if (handler != null) handler(this, EventArgs.Empty);
185        }
186
187        public event EventHandler Paused;
188        protected virtual void OnPaused()
189        {
190            EventHandler handler = Paused;
191            if (handler != null) handler(this, EventArgs.Empty);
192        }
193
194        public event EventHandler Prepared;
195        protected virtual void OnPrepared()
196        {
197            EventHandler handler = Prepared;
198            if (handler != null) handler(this, EventArgs.Empty);
199        }
200
201
202        public event EventHandler ComputeInParallelChanged;
203        protected virtual void OnComputeInParallelChanged()
204        {
205            EventHandler handler = ComputeInParallelChanged;
206            if (handler != null) handler(this, EventArgs.Empty);
207        }
208        #endregion
209
210        #region Events
211        public event EventHandler ExecutionTimeChanged;
212        protected virtual void OnExecutionTimeChanged()
213        {
214            EventHandler handler = ExecutionTimeChanged;
215            if (handler != null) handler(this, EventArgs.Empty);
216        }
217        public event EventHandler ExecutionStateChanged;
218        protected virtual void OnExecutionStateChanged()
219        {
220            EventHandler handler = ExecutionStateChanged;
221            if (handler != null) handler(this, EventArgs.Empty);
222        }
223        #endregion
224
225        public override string ToString()
226        {
227            return Name;
228        }
229
230
231
232        private static byte[][] CreateMessage(PeerInfo pi, int iterationNumber)
233        {
234            string msg1 = string.Concat("Message 1 from Peer ", pi.IpAddress, ":", pi.Port, " at iteration ", iterationNumber);
235            string msg2 = string.Concat("Message 2 from Peer ", pi.IpAddress, ":", pi.Port, " at iteration ", iterationNumber);
236            return new byte[][] { GetBytes(msg1), GetBytes(msg2) };
237        }
238
239        static byte[] GetBytes(string str)
240        {
241            byte[] bytes = new byte[str.Length * sizeof(char)];
242            System.Buffer.BlockCopy(str.ToCharArray(), 0, bytes, 0, bytes.Length);
243            return bytes;
244        }
245
246        static string GetString(byte[] bytes)
247        {
248            char[] chars = new char[bytes.Length / sizeof(char)];
249            System.Buffer.BlockCopy(bytes, 0, chars, 0, bytes.Length);
250            return new string(chars);
251        }
252
253        public System.Collections.Generic.IEnumerable<IOptimizer> NestedOptimizers
254        {
255            get { return Enumerable.Empty<IOptimizer>(); }
256        }
257
258        public RunCollection Runs
259        {
260            get { return runCollection; }
261        }
262
263        public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
264
265    }
266}
Note: See TracBrowser for help on using the repository browser.