The practice of using the System.Threading space when writing multi-threaded applications in .NET.

Recently, we have to write “small” servers for multi-threaded processing of relatively small amounts of data. I want to share with the habrasociety a certain trick in writing such applications.

All tasks can be formalized into these 3 points:
1. There is a data set.
2. This data needs to be processed (it doesn’t matter how and what to process, the main thing is to do it in parallel).
3. Data is constantly coming in new.
To illustrate individual points, let's solve the problem of fetching data from many RSS feeds.
I cited the solution to this problem in the following code, which you can simply copy and run, code with comments, which I hope can easily describe individual moments.

using System;
using System.Collections. Generic ;
using System.Text;
using System.Threading;
using System.Net;
using System.IO;

namespace ConsoleApplication1
{
class Program
{
internal class RSSServer
{
//Главный поток для выборки данных на обработку
private Thread _server;

//Переменная контролирующая главный цикл выборки данных
private bool _start = false ;

//Результирующий список с данными RSS каналов
public List < string > ResultData
{
get ;
set ;
}

#region Счетчики для контроля установки событий

private int _countAll = 0; //Количество выбранных данных для обработки
private int _countEnd = 0; //Количество обработанных данных в текущей выборке

#endregion

//Событие с ручным сбросом - самый главный элемент для синхронизации потока выборки новых данных и обработки уже имеющихся
private ManualResetEvent _mre = new ManualResetEvent( false );

/// <summary>
/// Этот метод будет работать в главном потоке
/// </summary>
private void GetTask()
{
string [] _rssURLs;

/* Используем "бесконечный" цикл, этот прием наоболее оптимален в таких задачах */
while (_start)
{
_rssURLs = GetURLs(); //Получаем данные из источника
if (_rssURLs.Length > 0) //Проверка на то, что данные для обработки были получены
{
_mre.Reset(); //Сбрасываем событие, теперь пока оно не будет установлено, поток заснет при вызове метода WaitOne
_countAll = _rssURLs.Length; //Здесь нам сихронизация не нужна, т.к. значение будет изменяться только в этом потоке
_countEnd = 0; //Здесь тоже не синхронизируем переменную, т.к. она будет установлена в 0, уже после того как все остальные потоки ее меняющие уже закончат работу
ResultData = new List < string >(_countAll);
foreach ( string s in _rssURLs) //Начинаем обработку данных каждый экземпляр отдельно.
{
ProccessingRSS(s);
}
_mre.WaitOne(); // Теперь ждем до тех пор, пока у нас не обработаются элементы данных
foreach ( string x in ResultData)
{
Console .WriteLine(x);
}
/* В этом примере принудительно прерывается выполнение цикла,
* т.к. иначе данные в ResultData будут просто перезаписываться многократно
*
*/
break ;
}
else
{
/* Здесь используется небольшая задержка в случае, если данных для обработки еще не поступило.
* 200 милисекунд здесь выбрано произвольно, если у вас данных не будет продолжительное время, то конструкция выше
* просто начнет нагружать процессоры практически на 50%, если не больше. Можете убедится убрав строчку ниже.
* В реальности данная задержка должна вычисляться в зависимости от различных параметров, я обычно использую алгоритм
* который после каждого пустого запроса ждет данных на 1 секунду дольше, это ограничено сверху определенным числом,
* которое либо устанавливается через параметры (предпочтительный вариант), либо сразу зашивается в код.
*/
Thread.Sleep(200);
}

}
}

private void ProccessingRSS( string _rssURL)
{
/*
* Вот здесь начинается самое интересное. Для обработки каждой порции данных используется встроенный пул потоков.
*/
ThreadPool.QueueUserWorkItem( new WaitCallback(ProcRSS), _rssURL);
}

private void ProcRSS( object rss)
{
//Весь метод помещаем в try
try
{
string _rss = ( string )rss;
HttpWebRequest hwr = (HttpWebRequest)HttpWebRequest.Create(_rss);
HttpWebResponse hwrr = (HttpWebResponse)hwr.GetResponse();
string response = ( new StreamReader(hwrr.GetResponseStream())).ReadToEnd();
//Обязательно блокируем объект, в который будет помещаться результат
lock (ResultData)
{
ResultData.Add(response);
}
}
catch
{
//Игнорируем все исключения (в реальных задачах этого делать нельзя)
}
finally
{
Interlocked.Increment( ref _countEnd); //Увеличиваем переменную, специальным методом, т.к. она будет увеличиваться в различных потоках
if (_countEnd >= _countAll) //Если это был последний обрабатываемый поток, то устанавливаем событие, и в главном потоке начинается новая выборка данных
_mre.Set();
}
}

private string [] GetURLs()
{
return new string [2] { "http://habrahabr.ru/rss/" , "http://www.cbr.ru/scripts/RssPress.asp" }; //Источник данных неважен, для примера это просто массив адресов.
}

public void StartServer()
{
_server = new Thread( new ThreadStart(GetTask));

//Если у потока будет установлено это свойство в true, то это поток завершится,
//если завершится работа главного потока, иначе возможно, что поток продолжит свою работу,
//даже если приложени будет "якобы" закрыто
_server.IsBackground = true ;

//Это имя будет видно в отладчике,
//поэтому рекомендую всегда давать имя потокам, которые создаются явно
_server.Name = "GetTaskThread" ;

//Если поток еще не стартовал, то стартуем его
if ((_server.ThreadState & ThreadState.Unstarted) == ThreadState.Unstarted)
{
_start = true ; //Устанавливаем свойство в true, что бы выборка данных была постоянной
_server.Start();
}
}

public void StopServer()
{
_start = false ;
}
}

static void Main( string [] args)
{
RSSServer _rServer = new RSSServer();
_rServer.StartServer();
Console .Read();
_rServer.StopServer();
}
}
}

* This source code was highlighted with Source Code Highlighter .


This article is aimed at those people who still do not fully understand how to work with the System.Threading space. If you have any questions, ask them in the comments, I will try to answer.