StObj2.inarg2 = "String Param2 of task 2"
'进程中只有一个Threadpool,且成员多为共享,故不必单独为其实例化对象,直接用类。
ThreadPool.QueueUserWorkItem(New Threading.WaitCallback(AddressOf Taskl), StObj1)
ThreadPool.QueueUserWorkItem(New Threading.WaitCallback(AddressOf Task2), StObj2)
Console.Read()
End Sub
End Module
说明:本例代码简单,理解较难。
Imports System.Threading
Imports System.Text
Namespace GenThreadPool '通用线程池管理
Public Interface IThreadPool 'ThreadPool接口,用于GenThreadPoolImpl类
Sub AddJob(jobToRun As Thread) '添加作业
Function GetStats() As Stats '获取状态
End Interface
Public Class GenThreadPoolImpl
Implements IThreadPool
Private m_maxThreads As Integer '最多线程
Private m_minThreads As Integer '最少线程
Private m_maxIdleTime As Integer '最长空闲时间(超时删除对应线程)
Private Shared m_debug As Boolean '是否调试
Private m_pendingJobs As ArrayList '等待的作业量(数组形式,以便列队进入线程池)
Private m_availableThreads As ArrayList '可用线程数
Public Property PendingJobs() As ArrayList
Get
Return m_pendingJobs
End Get
Set
m_pendingJobs = Value
End Set
End Property
Public Property AvailableThreads() As ArrayList
Get
Return m_availableThreads
End Get
Set
m_availableThreads = Value
End Set
End Property
Public Property Debug() As Boolean
Get
Return m_debug
End Get
Set
m_debug = Value
End Set
End Property
Public Property MaxIdleTime() As Integer
Get
Return m_maxIdleTime
End Get
Set
m_maxIdleTime = Value
End Set
End Property
Public Property MaxThreads() As Integer
Get
Return m_maxThreads
End Get
Set
m_maxThreads = Value
End Set
End Property
Public Property MinThreads() As Integer
Get
Return m_minThreads
End Get
Set
m_minThreads = Value
End Set
End Property
Public Sub New() '默认构造。只允许1个线程在池中,0.3秒后销毁
m_maxThreads = 1
m_minThreads = 0
m_maxIdleTime = 300
m_pendingJobs = ArrayList.Synchronized(New ArrayList) '对数组同步包装(仍为数组),因为这样上
m_availableThreads = ArrayList.Synchronized(New ArrayList) '锁才线程安全,以防其它线程同时进行修改。
m_debug = False
End Sub
Public Sub New(ByVal maxThreads As Integer, ByVal minThreads As Integer, ByVal maxIdleTime As Integer)
'构造函数,用3个参数实例化
m_maxThreads = maxThreads
m_minThreads = minThreads
m_maxIdleTime = maxIdleTime
m_pendingJobs = ArrayList.Synchronized(New ArrayList)
m_availableThreads = ArrayList.Synchronized(New ArrayList)
m_debug = False
InitAvailableThreads()
End Sub
Private Sub InitAvailableThreads() '初始化线程池,分别进入池中
If m_maxThreads > 0 Then
For i As Integer = 1 To m_maxThreads
Dim t As New Thread(AddressOf (New GenPool(Me, Me)).Run)
Dim e As New ThreadElement(t)
e.Idle = True
m_availableThreads.Add(e)
Next
End If
End Sub
Public Sub New(ByVal maxThreads As Integer, ByVal minThreads As Integer, ByVal maxIdleTime As Integer, ByVal debug As Boolean)
'构造函数,用4个参数实例化,增加调试参数
m_maxThreads = maxThreads
m_minThreads = minThreads
m_maxIdleTime = maxIdleTime
m_pendingJobs = ArrayList.Synchronized(New ArrayList)
m_availableThreads = ArrayList.Synchronized(New ArrayList)
m_debug = debug
InitAvailableThreads()
End Sub
Public Sub AddJob(ByVal job As Thread) Implements IThreadPool.AddJob '向池中添加作业
If job Is Nothing Then
Return '作业不存在,退出
End If
SyncLock Me '锁定GenThreadPoolImpl,防止其它线程来添加或删除作业
m_pendingJobs.Add(job) '将作业添加到 ArrayList 的结尾处。
Dim index As Integer = FindFirstIdleThread() '空闲可用线程的索引
If m_debug Then
Console.WriteLine("First Idle Thread Is " & index.ToString)
End If
If index = -1 Then '-1无空闲线程,故需创建新的线程
If m_maxThreads = -1 Or m_availableThreads.Count < m_maxThreads Then
'池中无线程,或在用(有效)线程还未达最大线程数限制--->创建线程
If m_debug Then
Console.WriteLine("Creating a New thread")
End If
Dim t As New Thread(AddressOf (New GenPool(Me, Me)).Run)
Dim e As New ThreadElement(t) '帮助类,提供线程额外属性
e.Idle = False
e.GetMyThread.Start() '线程添加到ArrayList数组前先激发
Try '添加
m_availableThreads.Add(e)
Catch ex As OutOfMemoryException
Console.WriteLine("Out Of memory: " & ex.ToString)
Thread.Sleep(3000)
m_availableThreads.Add(e)
Console.WriteLine("Added Job again")
End Try
Return
End If
If m_debug Then
Console.WriteLine("No Threads Available...” & GetStats.ToString)
End If
Else '池中找到有空的线程
Try
If m_debug Then
Console.WriteLine("Using an existing thread...")
End If
CType(m_availableThreads(index), ThreadElement).Idle = False '标注忙碌
SyncLock CType(m_availableThreads(index), ThreadElement).GetMyThread()
Monitor.Pulse(CType(m_availableThreads(index), ThreadElement).GetMyThread())
End SyncLock
Catch ex As Exception
Console.WriteLine(("Error while reusing thread " & ex.Message))
If m_debug Then
Console.WriteLine("Value of index Is " & index.ToString)
Console.WriteLine("Size of available threads Is " & m_availableThreads.Count.ToString)
Console.WriteLine("Available Threads Is " & m_availableThreads.IsSynchronized.ToString)
End If
End Try
End If
End SyncLock
End Sub
Public Function GetStats() As Stats Implements IThreadPool.GetStats '池中状态
Dim statsInstance As New Stats()
statsInstance.MaxThreads = m_maxThreads
statsInstance.MinThreads = m_minThreads
statsInstance.MaxIdleTime = m_maxIdleTime '最大空闲时间
statsInstance.PendingJobs = m_pendingJobs.Count '等待的作业量
statsInstance.NumThreads = m_availableThreads.Count '有效线程数
statsInstance.JobsInProgress = m_availableThreads.Count - FindIdleThreadCount() '正在处理的作业量
Return statsInstance
End Function
Public Function FindIdleThreadCount() As Integer '遍历有效线程,返回空闲线程数
Dim idleThreads As Integer = 0
For i As Integer = 0 To m_availableThreads.Count - 1
If CType(m_availableThreads(i), ThreadElement).Idle Then '根据帮助类中的空闲标志Idle来统计
idleThreads += 1
End If
Next
Return idleThreads
End Function
Public Function FindFirstIdleThread() As Integer '遍历,找 到第一个空闲线程,就立即返回其索引
For i As Integer = 0 To m_availableThreads.Count - 1
If CType(m_availableThreads(i), ThreadElement).Idle Then
Return i
End If
Next
Return -1
End Function
Public Function FindThread() As Integer '查找当前线程的位置(索引号),失败为-1(说明池中无线程)
For i As Integer = 0 To m_availableThreads.Count - 1
If CType(m_availableThreads(i), ThreadElement).GetMyThread.Equals(Thread.CurrentThread) Then
Return i
End If
Next
Return -1
End Function
Public Sub RemoveThread() '移除线程
For i As Integer = 0 To m_availableThreads.Count - 1
If CType(m_availableThreads(i), ThreadElement).GetMyThread.Equals(Thread.CurrentThread) Then
m_availableThreads.RemoveAt(i)
Exit Sub
End If
Next
End Sub
End Class
Public Class GenPool '执行线程类(执行完毕后,过了指定超时时限,将自动从池中删除)
Private m_lock As Object '锁定对象
Private m_gn As GenThreadPoolImpl
Public Sub New(lock_ As Object, gn As GenThreadPoolImpl)
m_lock = lock_
m_gn = gn
End Sub
Public Sub Run() '循环运行并检测是否过期
Dim job As Thread
While True '无限循环,一直检查池中状态
While True
SyncLock m_lock
If m_gn.PendingJobs.Count = 0 Then '后续无作业进来
Dim index As Integer = m_gn.FindThread() '取当前线程索引号
If index = -1 Then '无作业,池中也无线程,退出
Exit Sub
End If '无作业,新的线程设为空闲
CType(m_gn.AvailableThreads(index), ThreadElement).Idle = True
Exit While
End If
job = CType(m_gn.PendingJobs(0), Thread) '有作业,取出(从原作业数组中删除)
m_gn.PendingJobs.RemoveAt(0)
End SyncLock
job.Start() '作业执行启动
End While
Try '无后续作业
SyncLock Me
If m_gn.MaxIdleTime = -1 Then '池中无空闲线程,阻塞等待
Monitor.Wait(Me)
Else
Monitor.Wait(Me, m_gn.MaxIdleTime)
End If
End SyncLock
Catch
End Try
SyncLock m_lock
If m_gn.PendingJobs.Count = 0 Then '无等待的作业(没有新的作业进来)
If m_gn.MinThreads <> -1 And m_gn.AvailableThreads.Count > m_gn.MinThreads Then
m_gn.RemoveThread() '池中线程不空,且有效线程大于最小线程,删除线程
Return
End If
End If
End SyncLock
End While
End Sub
End Class
Public Class ThreadElement '对应线程的线程帮助类(以例为之设计空闲标志,获取引用)
Private m_idle As Boolean '空闲线程标志
Private m_thread As Thread
Public Sub New(th As Thread)
m_thread = th
m_idle = True '初始化即为空闲
End Sub
Public Property Idle() As Boolean '设置或获取线程空闲标志
Get
Return m_idle
End Get
Set
m_idle = Value
End Set
End Property
Public Function GetMyThread() As Thread '取得原线程
Return m_thread
End Function
End Class
Public Structure Stats '状态统计
Public MaxThreads As Integer
Public MinThreads As Integer
Public MaxIdleTime As Integer
Public NumThreads As Integer
Public PendingJobs As Integer '列队等待的作业量
Public JobsInProgress As Integer '正在处理的作业量
Public Overrides Function ToString() As String '提取状态
Dim sb As New StringBuilder("MaxThreads = ", 107) '容量大小107字符
sb.Append(MaxThreads)
sb.Append(ControlChars.Lf & "MinThreads=" & MinThreads)
sb.Append(ControlChars.Lf & "MaxIdleTime=" & MaxIdleTime)
sb.Append(ControlChars.Lf & "PendingJobs=" & PendingJobs)
sb.Append(ControlChars.Lf & "JobsInProgress=" & JobsInProgress)
Return sb.ToString
End Function
End Structure
End Namespace
然后,完成主程序代码,用来测试:
Imports System.Threading
Namespace TestGenThreadPool
Public Class TestPerformance
Public count As Integer
Private m_lock As New Object()
Public Sub New(pool As GenThreadPool.IThreadPool, times As Integer)
Console.WriteLine("Performance using Pool [in ms]: ")
count = 0
Dim start As Long = Now.Millisecond
Console.WriteLine("Start Time For Job Is " & Now)
Dim i As Integer
For i = 0 To times - 1
Dim tl As New Thread(AddressOf (New Job(Me)).Run)
pool.AddJob(tl)
Next
While True
SyncLock m_lock
If count = times Then
Exit While
End If
End SyncLock
Try
Thread.Sleep(5000)
Catch
End Try
End While
Console.WriteLine(" " & (Now.Millisecond - start).ToString)
Console.WriteLine("End Time for Job is " & Now.ToString)
Console.WriteLine("Performance using no Pool [in ms]: ")
count = 0
start = Now.Millisecond
Console.WriteLine("Start Time for JobThread is " & Now.ToString)
For i = 0 To times - 1
Dim jt As New Thread(AddressOf (New JobThread(Me)).Run)
jt.Start()
Next
While True
SyncLock m_lock
If count = times Then
Exit While
End If
End SyncLock
Try
Thread.Sleep(5000)
Catch
End Try
End While
Console.WriteLine(" " & (Now.Millisecond - start).ToString())
Console.WriteLine("End Time for JobThread is ” & Now.ToString)
End Sub
NotInheritable Class JobThread
Private m_lock As New Object()
Private tpf As TestPerformance
Public Sub New(tpf_ As TestPerformance)
tpf = tpf_
End Sub
Public Sub Run()
SyncLock m_lock
tpf.count += 1
End SyncLock
End Sub
End Class
NotInheritable Class Job
Private m_lock As New Object()
Private tpf As TestPerformance
Public Sub New(tpf_ As TestPerformance)
tpf = tpf_
End Sub
Public Sub Run()
SyncLock m_lock
tpf.count += 1
End SyncLock
End Sub
End Class
End Class
Class TestPool
Private Shared i As Integer = 0
Private j As Integer = 0
Public Sub Run()
i += 1
j = i
Console.WriteLine("Value of i in run is {0} ", j)
End Sub
Public Shared Sub Main(args() As String)
Dim tp = New GenThreadPool.GenThreadPoolImpl(1000, 1000, 300, True)
Dim i As Integer
For i = 0 To 99 '添加作业到线程池管理器
Dim td1 As New TestPool
Dim t1 As New Thread(AddressOf td1.Run)
Dim td2 As New TestPool
Dim t2 As New Thread(AddressOf td2.Run)
Dim td3 As New TestPool
Dim t3 As New Thread(AddressOf td3.Run)
Dim td4 As New TestPool
Dim t4 As New Thread(AddressOf td4.Run)
Dim td5 As New TestPool
Dim t5 As New Thread(AddressOf td5.Run)
Dim td6 As New TestPool
Dim t6 As New Thread(AddressOf td6.Run)
Dim td7 As New TestPool
Dim t7 As New Thread(AddressOf td7.Run)
Dim td8 As New TestPool
Dim t8 As New Thread(AddressOf td8.Run)
Dim td9 As New TestPool
Dim t9 As New Thread(AddressOf td9.Run)
Dim td10 As New TestPool
Dim t10 As New Thread(AddressOf td10.Run)
Dim td11 As New TestPool
Dim t11 As New Thread(AddressOf td11.Run)
tp.AddJob(t1)
tp.AddJob(t2)
tp.AddJob(t3)
tp.AddJob(t4)
tp.AddJob(t5)
tp.AddJob(t6)
tp.AddJob(t7)
tp.AddJob(t8)
tp.AddJob(t9)
tp.AddJob(t10)
tp.AddJob(t11)
Next
Dim td12 As New TestPool
Dim t12 As New Thread(AddressOf td12.Run)
tp.AddJob(t12)
Dim p As New TestPerformance(tp, 1000)
End Sub
End Class
End Namespace
结果较长,同时播放视频时,就可明显感觉到视频有点卡顿,说明CPU有点忙不过来了:)