Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Problems.GaussianProcessTuning/ILNumerics.2.14.4735.573/Misc/ILThreadPool.cs @ 9407

Last change on this file since 9407 was 9102, checked in by gkronber, 12 years ago

#1967: ILNumerics source for experimentation

File size: 8.7 KB
RevLine 
[9102]1///
2///    This file is part of ILNumerics Community Edition.
3///
4///    ILNumerics Community Edition - high performance computing for applications.
5///    Copyright (C) 2006 - 2012 Haymo Kutschbach, http://ilnumerics.net
6///
7///    ILNumerics Community Edition is free software: you can redistribute it and/or modify
8///    it under the terms of the GNU General Public License version 3 as published by
9///    the Free Software Foundation.
10///
11///    ILNumerics Community Edition is distributed in the hope that it will be useful,
12///    but WITHOUT ANY WARRANTY; without even the implied warranty of
13///    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14///    GNU General Public License for more details.
15///
16///    You should have received a copy of the GNU General Public License
17///    along with ILNumerics Community Edition. See the file License.txt in the root
18///    of your distribution package. If not, see <http://www.gnu.org/licenses/>.
19///
20///    In addition this software uses the following components and/or licenses:
21///
22///    =================================================================================
23///    The Open Toolkit Library License
24///   
25///    Copyright (c) 2006 - 2009 the Open Toolkit library.
26///   
27///    Permission is hereby granted, free of charge, to any person obtaining a copy
28///    of this software and associated documentation files (the "Software"), to deal
29///    in the Software without restriction, including without limitation the rights to
30///    use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
31///    the Software, and to permit persons to whom the Software is furnished to do
32///    so, subject to the following conditions:
33///
34///    The above copyright notice and this permission notice shall be included in all
35///    copies or substantial portions of the Software.
36///
37///    =================================================================================
38///   
39
40using System;
41using System.Collections.Generic;
42using System.Linq;
43using System.Text;
44using System.Threading;
45using System.Diagnostics;
46using System.Runtime.InteropServices;
47
namespace ILNumerics.Misc {
    /// <summary>
    /// Simple thread pool implementation - NOT THREAD SAFE !!
    /// </summary>
    /// <remarks>
    /// <para>The pool owns a fixed list of dedicated worker threads. Work items are not balanced
    /// between workers: the caller selects the target worker by index when queueing.</para>
    /// <para>Queueing and resizing are serialized through one static lock. Each worker additionally
    /// guards its own queue, because the worker dequeues concurrently with callers enqueueing.</para>
    /// </remarks>
    internal partial class ILThreadPool {

        [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
        static extern bool GetProcessAffinityMask(IntPtr currentProcess, ref Int64 lpProcessAffinityMask, ref Int64 lpSystemAffinityMask);
        [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
        static extern int GetCurrentThreadId();
        [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
        public static extern int GetCurrentProcessorNumber();

        /// <summary>Affinity mask applied to the thread constructing the pool when thread affinity is enabled.</summary>
        private const long ConstructingThreadAffinityMask = 8;

        /// <summary>Affinity mask applied to every worker thread when thread affinity is enabled.</summary>
        /// <remarks>
        /// NOTE(review): all workers share this single mask; the original code carried a
        /// "TODO: IMPROVE!!" marker here. The value is kept unchanged on purpose.
        /// </remarks>
        private const long WorkerThreadAffinityMask = 1;

        // Declared ahead of s_pool so that the lock object already exists while the pool
        // constructor runs (static initializers in this file execute in textual order).
        private static object s_lock = new object();
        private static ILPerformanceCounter s_performanceCounters;
        private static ILThreadPool s_pool = new ILThreadPool(Settings.MaxNumberThreads - 1);

        /// <summary>
        /// The shared pool instance, created with <c>Settings.MaxNumberThreads - 1</c> workers.
        /// </summary>
        public static ILThreadPool Pool {
            get {
                return s_pool;
            }
        }

        /// <summary>
        /// Queues a work item on one specific worker of the shared pool.
        /// </summary>
        /// <param name="id">Index of the worker thread which is to execute the item.</param>
        /// <param name="action">Delegate to execute on the worker thread.</param>
        /// <param name="data">Argument handed to <paramref name="action"/>.</param>
        /// <returns>Always <c>true</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="id"/> does not address an existing worker.</exception>
        internal static bool QueueUserWorkItem(int id, Action<object> action, object data) {
            // Fail on the calling thread; a null delegate would otherwise blow up later on the worker.
            if (action == null)
                throw new ArgumentNullException("action");
            lock (s_lock) {
                ILThreadPoolWorkItem workItem = new ILThreadPoolWorkItem { Action = action, Data = data };
                s_pool.m_threads[id].Queue(workItem);
                return true;
            }
        }

        private int m_maxNumberThreads;

        /// <summary>
        /// Number of worker threads held by this pool. Setting the value ends surplus workers
        /// (highest index first) or starts additional ones.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">The assigned value is negative.</exception>
        /// <exception cref="Exceptions.ILInvalidOperationException">A worker to be removed still has pending work items.</exception>
        public int MaxNumberThreads {
            get { return m_maxNumberThreads; }
            set {
                if (value < 0)
                    throw new ArgumentOutOfRangeException("value", "the number of threads must not be negative");
                lock (s_lock) {
                    while (m_maxNumberThreads > value) {
                        int last = m_maxNumberThreads - 1;
                        // End() throws if work is pending; counter and list are only touched
                        // afterwards, so a failure leaves both consistent.
                        m_threads[last].End();
                        m_threads.RemoveAt(last);
                        m_maxNumberThreads = last;
                    }
                    while (m_maxNumberThreads < value) {
                        m_threads.Add(new ILThread(m_maxNumberThreads));
                        m_maxNumberThreads++;
                    }
                }
            }
        }
        private List<ILThread> m_threads = new List<ILThread>();

        /// <summary>
        /// Creates a pool and immediately starts its worker threads.
        /// </summary>
        /// <param name="maxThreads">Number of worker threads to start.</param>
        public ILThreadPool(int maxThreads) {

            m_maxNumberThreads = maxThreads;
            for (int i = 0; i < m_maxNumberThreads; i++) {
                m_threads.Add(new ILThread(i));
            }
            if (Settings.s_useThreadAffinity) {
                PinCurrentThread(ConstructingThreadAffinityMask);
                Thread.Sleep(10);
                Thread.BeginThreadAffinity();
            }
            if (Settings.s_measurePerformanceAtRuntime) {
                s_performanceCounters = new ILPerformanceCounter();
                s_performanceCounters.PCcurNumberThreadsSet(maxThreads);
            }
        }

        /// <summary>
        /// Spins until <paramref name="workerCount"/> is no longer positive.
        /// </summary>
        /// <param name="workerCount">Counter of outstanding work items; expected to be decreased by other threads.</param>
        /// <remarks>
        /// If runtime performance measurement is enabled and a counter object exists, the elapsed
        /// <see cref="Stopwatch"/> ticks are reported to it.
        /// </remarks>
        public static void Wait4Workers(ref int workerCount) {
            long ticksStart = Stopwatch.GetTimestamp();
            // The counter is modified by other threads: read it with volatile semantics so the
            // loop is guaranteed to observe the update.
            while (Thread.VolatileRead(ref workerCount) > 0) {
                Thread.SpinWait(8);
            }
            // The counters only exist if measuring was enabled when the pool was constructed.
            ILPerformanceCounter counters = s_performanceCounters;
            if (Settings.s_measurePerformanceAtRuntime && counters != null)
                counters.PCwaited4ThreadsSet(Stopwatch.GetTimestamp() - ticksStart);
        }

        /// <summary>
        /// Restricts the calling OS thread to the processors selected by <paramref name="affinityMask"/>.
        /// </summary>
        /// <param name="affinityMask">Processor bit mask assigned to <see cref="ProcessThread.ProcessorAffinity"/>.</param>
        private static void PinCurrentThread(long affinityMask) {
            int osThreadId = GetCurrentThreadId();
            using (Process process = Process.GetCurrentProcess()) {
                foreach (ProcessThread candidate in process.Threads) {
                    if (candidate.Id == osThreadId) {
                        candidate.ProcessorAffinity = (IntPtr)affinityMask;
                        break;
                    }
                }
            }
        }

        /// <summary>
        /// A unit of work: a delegate plus the argument it is invoked with.
        /// </summary>
        public struct ILThreadPoolWorkItem {
            public Action<object> Action;
            public object Data;
        }

        /// <summary>
        /// One dedicated worker: a background thread draining its own work item queue.
        /// </summary>
        private class ILThread {
            private readonly int m_index;
            private readonly Thread m_thread;
            private readonly AutoResetEvent m_resetEvent;
            private readonly Queue<ILThreadPoolWorkItem> m_queue;
            // Guards m_queue: Queue<T> is not safe for concurrent Enqueue / Dequeue.
            private readonly object m_queueLock = new object();
            private volatile bool m_ending;

            /// <summary>
            /// Creates the worker and starts its thread (background, above normal priority).
            /// </summary>
            /// <param name="index">Position of this worker inside the pool.</param>
            internal ILThread(int index) {
                m_index = index;
                m_ending = false;
                m_resetEvent = new AutoResetEvent(false);
                m_queue = new Queue<ILThreadPoolWorkItem>();
                m_thread = new Thread(WorkerFunc);
                m_thread.Priority = ThreadPriority.AboveNormal;
                m_thread.IsBackground = true;
                m_thread.Start();
            }

            private int Index {
                get {
                    return m_index;
                }
            }

            /// <summary>
            /// Asks the worker thread to leave its loop.
            /// </summary>
            /// <exception cref="Exceptions.ILInvalidOperationException">Work items are still pending in the queue.</exception>
            public void End() {
                lock (s_lock) {
                    lock (m_queueLock) {
                        if (m_queue.Count > 0)
                            throw new Exceptions.ILInvalidOperationException("invalid attempt to remove a thread with pending work items");
                    }
                    m_ending = true;
                    // Wake the worker so it notices the flag.
                    m_resetEvent.Set();
                }
            }

            /// <summary>
            /// Appends a work item to this worker's queue and wakes the worker.
            /// </summary>
            /// <param name="workItem">The item to execute.</param>
            public void Queue(ILThreadPoolWorkItem workItem) {
                lock (m_queueLock) {
                    m_queue.Enqueue(workItem);
                }
                m_resetEvent.Set();
            }

            /// <summary>
            /// Removes the next pending work item, if there is one.
            /// </summary>
            /// <param name="workItem">Receives the dequeued item, or the default value if the queue was empty.</param>
            /// <returns><c>true</c> if an item was dequeued.</returns>
            private bool TryDequeue(out ILThreadPoolWorkItem workItem) {
                lock (m_queueLock) {
                    if (m_queue.Count > 0) {
                        workItem = m_queue.Dequeue();
                        return true;
                    }
                }
                workItem = default(ILThreadPoolWorkItem);
                return false;
            }

            /// <summary>
            /// Thread entry point: waits for the wake-up signal, drains the queue, repeats until ended.
            /// </summary>
            private void WorkerFunc() {
                if (Settings.s_useThreadAffinity) {
                    PinCurrentThread(WorkerThreadAffinityMask);
                    Thread.Sleep(10);
                }
                while (true) {
                    m_resetEvent.WaitOne();
                    ILThreadPoolWorkItem workItem;
                    // The item runs outside of the queue lock, so producers are never blocked by it.
                    while (TryDequeue(out workItem)) {
                        workItem.Action(workItem.Data);
                    }
                    if (m_ending) break;
                }
            }
        }

    }
}
Note: See TracBrowser for help on using the repository browser.