微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

C# Task 多任务:C# 扩展TaskScheduler实现独立线程池,支持多任务批量处理,互不干扰,无缝兼容Task

    为什么编写TaskSchedulerEx类?

    因为.NET认线程池只有一个线程池,如果某个批量任务一直占着大量线程,甚至耗尽认线程池,则会严重影响应用程序域中其它任务或批量任务的性能

     特点:

    1、使用独立线程池,线程池中线程分为核心线程和辅助线程,辅助线程会动态增加和释放,且总线程数不大于参数_maxThreadCount

    2、无缝兼容Task,使用上和Task一样,可以用它来实现异步,参见:C# async await 异步执行方法封装 替代 BackgroundWorker

    3、队列中尚未执行的任务可以取消

    4、通过扩展类TaskHelper实现任务分组

    5、和SmartThreadPool对比,优点是无缝兼容Task类,和Task类使用没有区别,因为它本身就是对Task、TaskScheduler的扩展,所以Task类的ContinueWith、WaitAll等方法它都支持,以及兼容async、await异步编程

    6、代码量相当精简,TaskSchedulerEx类只有260多行代码

    7、池中的线程数量会根据负载自动增减,支持,但没有SmartThreadPool智能,为了性能,使用了比较笨的方式实现,不知道大家有没有既智能,性能又高的方案,我有一个思路,在定时器中计算每个任务执行平均耗时,然后使用公式(线程数 = cpu核心数 * ( 本地计算时间 + 等待时间 ) / 本地计算时间)来计算最佳线程数,然后按最佳线程数来动态创建线程,但这个计算过程可能会牺牲性能

     对比SmartThreadPool:

    TaskSchedulerEx类代码(使用Semaphore实现):

using System;
 System.Collections.Concurrent;
 System.Collections.Generic;
 System.Linq;
 System.Runtime.InteropServices;
 System.Text;
 System.Threading;
 System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// TaskScheduler扩展
     每个实例都是独立线程池
    </summary>
    public class TaskSchedulerEx : TaskScheduler,Idisposable
    {
        #region 外部方法
        [DllImport("kernel32.dll",EntryPoint = SetProcessWorkingSetSize")]
        static extern int SetProcessWorkingSetSize(IntPtr process,int minSize,1)">int maxSize);
        #endregion

        #region 变量属性事件
        private ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
        private int _coreThreadCount = 0;
        int _maxThreadCount = int _auxiliaryThreadTimeOut = 20000; //辅助线程释放时间
        int _activeThreadCount = private System.Timers.Timer _timer;
        object _lockCreateTimer = new objectbool _run = trueprivate Semaphore _sem = nullint _semmaxCount = int.MaxValue; 可以同时授予的信号量的最大请求数
        int _semCount = 0; 可用信号量请求数
        int _runcount = 正在执行的和等待执行的任务数量

        <summary>
         活跃线程数
        </summary>
         ActiveThreadCount
        {
            get { return _activeThreadCount; }
        }

         核心线程数
         CoreThreadCount
        {
             _coreThreadCount; }
        }

         最大线程数
         MaxThreadCount
        {
             _maxThreadCount; }
        }
        #region 构造函数
         TaskScheduler扩展
         每个实例都是独立线程池
        </summary>
        <param name="coreThreadCount">核心线程数(大于或等于0,不宜过大)(如果是一次性使用,则设置为0比较合适)</param>
        <param name="maxThreadCount">最大线程数</param>
        public TaskSchedulerEx(int coreThreadCount = 10,1)">int maxThreadCount = 20)
        {
            _sem = new Semaphore(,_semmaxCount);
            _maxThreadCount = maxThreadCount;
            CreateCoreThreads(coreThreadCount);
        }
        #region override GetScheduledTasks
        protected override IEnumerable<Task> GetScheduledTasks()
        {
             _tasks;
        }
        #region override TryExecuteTaskInline
        override bool TryExecuteTaskInline(Task task,1)">bool taskwasprevIoUslyQueued)
        {
            return false;
        }
        #region override QueueTask
        void QueueTask(Task task)
        {
            _tasks.Enqueue(task);

            while (_semCount >= _semmaxCount) 信号量已满,等待
            {
                Thread.Sleep(1);
            }

            _sem.Release();
            Interlocked.Increment(ref _semCount);

            Interlocked.Increment( _runcount);
            if (_activeThreadCount < _maxThreadCount && _activeThreadCount < _runcount)
            {
                CreateThread();
            }
        }
        #region 资源释放
         资源释放
         队列中尚未执行的任务不再执行
         dispose()
        {
            _run = ;

            if (_timer != )
            {
                _timer.Stop();
                _timer.dispose();
                _timer = ;
            }

            while (_activeThreadCount > )
            {
                _sem.Release();
                Interlocked.Increment( _semCount);
            }
        }
        #region 创建核心线程池
         创建核心线程池
        void CreateCoreThreads(int? coreThreadCount = )
        {
            if (coreThreadCount != null) _coreThreadCount = coreThreadCount.Value;

            for (int i = 0; i < _coreThreadCount; i++)
            {
                Interlocked.Increment( _activeThreadCount);
                Thread thread = ;
                thread = new Thread(new ThreadStart(() =>
                {
                    Task task;
                    while (_run)
                    {
                        if (_tasks.TryDequeue(out task))
                        {
                            TryExecuteTask(task);
                            Interlocked.Decrement( _runcount);
                        }
                        else
                        {
                            _sem.WaitOne();
                            Interlocked.Decrement( _semCount);
                        }
                    }
                    Interlocked.Decrement( _activeThreadCount);
                    if (_activeThreadCount == )
                    {
                        GC.Collect();
                        GC.WaitForPendingFinalizers();
                        if (Environment.Osversion.Platform == PlatformID.Win32NT)
                        {
                            SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle,-1,-);
                        }
                    }
                }));
                thread.IsBackground = ;
                thread.Start();
            }
        }
        #region 创建辅助线程
         创建辅助线程
         CreateThread()
        {
            Interlocked.Increment( _activeThreadCount);
            Thread thread = ;
            thread = 
            {
                Task task;
                DateTime dt = DateTime.Now;
                while (_run && DateTime.Now.Subtract(dt).TotalMilliseconds < _auxiliaryThreadTimeOut)
                {
                     task))
                    {
                        TryExecuteTask(task);
                        Interlocked.Decrement( _runcount);
                        dt = DateTime.Now;
                    }
                    
                    {
                        _sem.WaitOne(_auxiliaryThreadTimeOut);
                        Interlocked.Decrement( _semCount);
                    }
                }
                Interlocked.Decrement( _activeThreadCount);
                if (_activeThreadCount == _coreThreadCount)
                {
                    GC.Collect();
                    GC.WaitForPendingFinalizers();
                     PlatformID.Win32NT)
                    {
                        SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle,1)">);
                    }
                }
            }));
            thread.IsBackground = ;
            thread.Start();
        }
        #region 全部取消
         全部取消
         取消队列中尚未执行的任务
         CancelAll()
        {
            Task tempTask;
            while (_tasks.TryDequeue( tempTask))
            {
                Interlocked.Decrement( _runcount);
            }
        }
        #endregion

    }
}
View Code

    TaskSchedulerEx类代码(使用AutoResetEvent实现):

private AutoResetEvent _evt = new AutoResetEvent();

        )
        {
            _maxThreadCount = QueueTask(Task task)
        {
            CreateTimer();
            _tasks.Enqueue(task);
            _evt.Set();
        }
        )
            {
                _evt.Set();
            }
        }
         task))
                        {
                            TryExecuteTask(task);
                        }
                        
                        {
                            _evt.WaitOne();
                        }
                    }
                    Interlocked.Decrement( task))
                    {
                        TryExecuteTask(task);
                        dt =
                    {
                        _evt.WaitOne(_auxiliaryThreadTimeOut);
                    }
                }
                Interlocked.Decrement(#region 创建定时器
         CreateTimer()
        {
            if (_timer == null) _timer不为空时,跳过,不走lock,提升性能
            {
                if (_activeThreadCount >= _coreThreadCount && _activeThreadCount < _maxThreadCount) 活跃线程数达到最大线程数时,跳过,不走lock,提升性能
                {
                    lock (_lockCreateTimer)
                    {
                        )
                        {
                            _timer = new System.Timers.Timer();
                            _timer.Interval = _coreThreadCount == 0 ? 1 : 500;
                            _timer.Elapsed += (s,e) =>
                            {
                                if (_activeThreadCount >= _coreThreadCount && _activeThreadCount < _maxThreadCount)
                                {
                                    if (_tasks.Count > )
                                    {
                                        if (_timer.Interval != 20) _timer.Interval = ;
                                        CreateThread();
                                    }
                                    
                                    {
                                        500) _timer.Interval = ;
                                    }
                                }
                                
                                {
                                    )
                                    {
                                        _timer.Stop();
                                        _timer.dispose();
                                        _timer = ;
                                    }
                                }
                            };
                            _timer.Start();
                        }
                    }
                }
            }
        }
         tempTask)) { }
        }
        

    }
}
View Code

    RunHelper类代码

 线程工具类
     RunHelper
    {
        #region 变量属性事件

        #region 线程中执行
         线程中执行
        static Task Run(this TaskScheduler scheduler,Action<object> doWork,1)">object arg = null,Action<Exception> errorAction = return Task.Factory.StartNew((obj) =>
            {
                try
                {
                    doWork(obj);
                }
                catch (Exception ex)
                {
                    if (errorAction != ) errorAction(ex);
                    LogUtil.Error(ex,ThreadUtil.Run错误);
                }
            },arg,CancellationToken.None,TaskCreationoptions.None,scheduler);
        }
        return Task.Factory.StartNew(() =>
                {
                    doWork();
                }
                static Task<T> Run<T>(object,T> doWork,1)">return Task.Factory.StartNew<T>((obj) =>
                {
                     doWork(obj);
                }
                );
                    default(T);
                }
            },Func<T> doWork,1)">return Task.Factory.StartNew<T>(() => doWork();
                }
                async Task<T> RunAsync<T>(await Task.Factory.StartNew<T>((obj) =>await Task.Factory.StartNew<T>(() =>

    }
}
View Code

    TaskHelper扩展类:

 Task帮助类基类
     TaskHelper
    {
        #region 变量
         处理器数
        int _processorCount = Environment.ProcessorCount;
        #region UI任务
        static TaskScheduler _UITask;
         UI任务(2-4个线程)
         TaskScheduler UITask
        {
            getif (_UITask == null) _UITask = new TaskSchedulerEx(2,4);
                 _UITask;
            }
        }
        #region 菜单任务
         TaskScheduler _MenuTask;
         菜单任务(2-4个线程)
         TaskScheduler MenuTask
        {
            if (_MenuTask == null) _MenuTask =  _MenuTask;
            }
        }
        #region 计算任务
         TaskScheduler _CalcTask;
         计算任务(线程数:处理器数*2)
         TaskScheduler CalcTask
        {
            if (_CalcTask == null) _CalcTask = new LimitedTaskScheduler(_processorCount * 2 _CalcTask;
            }
        }
        #region 网络请求
         TaskScheduler _RequestTask;
         网络请求(8-32个线程)
         TaskScheduler RequestTask
        {
            if (_RequestTask == null) _RequestTask = 8,1)">32 _RequestTask;
            }
        }
        #region 数据库任务
         TaskScheduler _DBTask;
         数据库任务(8-32个线程)
         TaskScheduler DBTask
        {
            if (_DBTask == null) _DBTask =  _DBTask;
            }
        }
        #region IO任务
         TaskScheduler _IOTask;
         IO任务(8-32个线程)
         TaskScheduler IOTask
        {
            if (_IOTask == null) _IOTask =  _IOTask;
            }
        }
        #region 首页任务
         TaskScheduler _MainPageTask;
         首页任务(8-32个线程)
         TaskScheduler MainPageTask
        {
            if (_MainPageTask == null) _MainPageTask =  _MainPageTask;
            }
        }
        #region 图片加载任务
         TaskScheduler _LoadImageTask;
         图片加载任务(8-32个线程)
         TaskScheduler LoadImageTask
        {
            if (_LoadImageTask == null) _LoadImageTask =  _LoadImageTask;
            }
        }
        #region 浏览器任务
         TaskScheduler _browserTask;
         浏览器任务(2-4个线程)
         TaskScheduler browserTask
        {
            if (_browserTask == null) _browserTask =  _browserTask;
            }
        }
        

    }
}
View Code

    Form1.cs测试代码

 System.ComponentModel;
 System.Data;
 System.Drawing;
 System.Management;
 System.Reflection;
 System.Threading.Tasks;
 System.Windows.Forms;
 Utils;

 test
{
    partial  Form1 : Form
    {
        private TaskSchedulerEx _taskSchedulerEx = private TaskSchedulerEx _taskSchedulerExSmall = private TaskSchedulerEx _task = ;

        public Form1()
        {
            InitializeComponent();
            _taskSchedulerEx = 50,1)">);
            _taskSchedulerExSmall = 5,1)">50);
            _task = 10);
        }

        void Form1_Load( sender,EventArgs e)
        {

        }

         模拟大量网络请求任务
        void button1_Click(200000,1)">1000,1)">);
        }

         模拟cpu密集型任务
        void button2_Click(100000,1)">2000,1)">void button3_Click(100,1)">void button4_Click( 模拟任务
        <param name="scheduler">scheduler<param name="taskCount">任务数量<param name="logCount">每隔多少条数据打一个日志<param name="delay">模拟延迟或耗时(毫秒)void DoTask(TaskSchedulerEx scheduler,1)">int taskCount,1)">int logCount,1)"> delay)
        {
            _task.Run(() =>
            {
                Log(开始);
                DateTime dt = DateTime.Now;
                List<Task> taskList = new List<Task>();
                1; i <= taskCount; i++)
                {
                    Task task = scheduler.Run((obj) =>
                    {
                        var k = ()obj;
                        Thread.Sleep(delay); 模拟延迟或耗时
                        if (k % logCount == )
                        {
                            Log(最大线程数:" + scheduler.MaxThreadCount +  核心线程数:" + scheduler.CoreThreadCount +  活跃线程数:" + scheduler.ActiveThreadCount.ToString().PadLeft(4,1)">' ') +  处理数/总数:" + k +  / " + taskCount);
                        }
                    },i,(ex) =>
                    {
                        Log(ex.Message);
                    });
                    taskList.Add(task);
                }
                Task.WaitAll(taskList.ToArray());
                double d = DateTime.Now.Subtract(dt).TotalSeconds;
                Log(完成,耗时:" + d + );
            });
        }

        void Form1_FormClosed(if (_taskSchedulerEx != )
            {
                _taskSchedulerEx.dispose(); 释放资源
                _taskSchedulerEx = ;
            }
        }
    }
}
View Code

     测试截图:

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐