.NET 공식(?) 라이브러리중 데이터 병렬 처리를 다룰 수 있도록 제공하는 Dataflow Task 라이브러리가 있습니다. (TPL Dataflow 라이브러리)
TPL Dataflow는 .NET 기본 API에 포함되지 않고 NuGet 패키지로 설치해서 사용할 수 있습니다.
NuGet - System.Threading.Tasks.Dataflow
TPL Dataflow MS Doc
이번 글에선 TPL Dataflow에서 제공하는 System.Threading.Tasks.Dataflow.BroadcastBlock<T> 클래스에 대해 알아보고
간단한 샘플 예제를 작성해 보겠습니다.
이 글에서 다루는 코드는 다음 Repository에서 확인할 수 있습니다.
Code_check - TPL_DataFlow
TPL Dataflow는 스트림형태의 데이터 메세지를 쉽게 구현할 수 있습니다. TPL Dataflow에서 메세지(데이터)는 Block단위로 처리되고 이 Block을 조작할 수 있는 기능들이 여러가지로 제공되고 있습니다.
System.Threading.Tasks.Dataflow.Post<TInput>() 메서드 및 System.Threading.Tasks.Dataflow.SendAsync<TInput>() 메서드로
동기 또는 비동기 형식으로 메세지를 Block에 추가할 수 있습니다.
TPL Dataflow에서 사용되는 Block은 여러 종류가 있는데
System.Threading.Tasks.Dataflow.ActionBlock<TInput> 클래스
System.Threading.Tasks.Dataflow.BufferBlock<T> 클래스
System.Threading.Tasks.Dataflow.TransformBlock<TInput, TOutput> 클래스
System.Threading.Tasks.Dataflow.BroadcastBlock<TInput, TOutput> 클래스 등이 있습니다.
그리고 각 Block들은 서로 System.Threading.Tasks.Dataflow.DataflowBlock.LinkTo() 메서드로 메세지 흐름을 서로 연결할 수 있습니다.
Synchronization Blocks
기본적으로 각 Block들은 다른 Block들과 독립적으로 처리됩니다. 즉 각 개별 Block에서 다른 처리를 할 수 있고 이렇게 Block은 병렬 처리를 제공합니다.
동기화가 필요한 처리인 경우 Block을 생성할때 System.Threading.Tasks.Dataflow.DataflowBlockOptions.TaskScheduler 속성을 지정할 수 있습니다.
이 속성으로 어떤 컨텍스트에서 실행될지 설정할 수 있습니다.
만약 System.Threading.Tasks.TaskScheduler.FromCurrentSynchronizationContext() 메서드로 현재의 스레드 컨텍스트에서
처리되도록 한다면 동기화 처리가 가능합니다.
ActionBlock
System.Threading.Tasks.Dataflow.ActionBlock<TInput> 클래스는 수신되는 각 메세지에 대응할 수 있는 콜백을 지원합니다.
메세지들을 foreach로 순차적 처리하는 것 처럼 처리할 수 있습니다.
TransformBlock
System.Threading.Tasks.Dataflow.TransformBlock<TInput, TOutput> 클래스는 System.Threading.Tasks.Dataflow.ActionBlock<TInput>() 클래스와 역할이 동일하지만
데이터 입력과 데이터 출력 부분을 동시에 받아서 입력 데이터 기준으로 출력 데이터를 가공해서 새로운 데이터로 출력 할 수 있습니다.
즉 System.Threading.Tasks.Dataflow.ActionBlock<TInput> 클래스는 수신된 데이터를 그대로 사용해야 하지만 System.Threading.Tasks.Dataflow.TransformBlock<TInput, TOutput>() 클래스는 수신된 데이터를 가공해서
새로운 데이터 타입으로 출력이 가능합니다.
BroadcastBlock
System.Threading.Tasks.Dataflow.BroadcastBlock<TInput, TOutput> 클래스는 수신된 데이터를 복사하고 자신의 Block에 연결된 모든 Block에게 전달 합니다.
만약 System.Threading.Tasks.Dataflow.BroadcastBlock<TInput, TOutput> 에 System.Threading.Tasks.Dataflow.TransformBlock<TInput, TOutput>() 와 System.Threading.Tasks.Dataflow.ActionBlock<TInput>() 가 연결되어 있다면
모든 Block에게 복사된 메세지를 전달할 수 있습니다.
Completing Blocks
온전히 Dataflow 처리가 완료 되었다면 System.Threading.Tasks.Dataflow.IDataflowBlock.Complete() 메서드로 완료 처리 할 수 있습니다.
Dataflow가 오류 없이 완료 되었는지 확인은 System.Threading.Tasks.Dataflow.IDataflowBlock.Completion 속성으로 확인 가능합니다.
해당 속성은 System.Threading.Tasks.Task 클래스를 반환하는데 System.Threading.Tasks.Wait() 메서드로
작업을 완료를 대기할때 오류가 발생되는 경우 try - catch 예외 처리로 확인할 수 있습니다.
_stockInfoBroadCaster.Complete();
try
{
_stockInfoBroadCaster.Completion.Wait();
}
catch (Exception ex)
{
// 데이터흐름 완료 오류
}
Example [WPF - StockInfo]
다음 예제는 TPL Dataflow 라이브러리를 사용해서 간단한 주식 정보 데이터를 받아 화면에 출력하는 예제 입니다.
System.Threading.Tasks.Dataflow.BroadcastBlock<TInput, TOutput> 에 System.Threading.Tasks.Dataflow.ActionBlock<TInput> 을 연결하고
System.Threading.Tasks.Dataflow.TransformBlock<TInput, TOutput> 으로 메세지를 가공해서 또 다른 System.Threading.Tasks.Dataflow.ActionBlock<TInput> 으로 연결시켜서
최종적으로 화면에 출력하게 되는 예제 입니다.
[Block 생성 및 연결 부분]
private BroadcastBlock<List<StockInfoModel>> _broadcast;
public MainViewModel()
{
// 브로드캐스트 생성
_broadcast = new BroadcastBlock<List<StockInfoModel>>(null);
}
private async Task MonitoringExecute(object param)
{
// 브로드캐스팅된 블록(주식정보)을 수신받아
// 정보 표시 ViewModel에 데이터 전달 역할을 하는 블록 생성
ActionBlock<List<StockInfoModel>> updateStockInfoStart = this.UpdateStockInfoStart();
// 브로드캐스트에 ActionBlock 연결
_broadcast.LinkTo(updateStockInfoStart);
// 브로드캐스트에 추가 블록을 연결할 수 있다.
ActionBlock<List<StockInfoModel>> foo = Foo();
_broadcast.LinkTo(foo);
// 브로드캐스팅으로 데이터를 수신받고
// 가공된 데이터를 ViewModel에 전달 역할을 하는 블록 생성
ActionBlock<Tuple<double, double>> updateChangeCalc = this.UpdateChangeCalc();
var changeCalc = new TransformBlock<List<StockInfoModel>, Tuple<double, double>>(p =>
{
var stockInfo1_currPrice = p[0].CurrentPrice;
var stockInfo2_currPrice = p[1].CurrentPrice;
double percentChangeByItem1 = (stockInfo1_currPrice - StockInfo1VM.PreviousPrice) * 100 / StockInfo1VM.PreviousPrice;
double percentChangeByItem2 = (stockInfo2_currPrice - StockInfo2VM.PreviousPrice) * 100 / StockInfo2VM.PreviousPrice;
StockInfo1VM.PreviousPrice = stockInfo1_currPrice;
StockInfo2VM.PreviousPrice = stockInfo2_currPrice;
return new Tuple<double, double>(percentChangeByItem1, percentChangeByItem2);
});
// 브로드캐스트에 TransformBlock 연결
_broadcast.LinkTo(changeCalc);
// TransformBlock에 ActionBlock 연결
changeCalc.LinkTo(updateChangeCalc);
// 브로드캐스팅 시작
StockInfoBroadCast stockInfoBroadCast = new(_broadcast);
await stockInfoBroadCast.RealTimeData();
}
private ActionBlock<List<StockInfoModel>> UpdateStockInfoStart()
{
return new ActionBlock<List<StockInfoModel>>(p => {
StockInfo1VM.UpdateStockInfo(p[0]);
StockInfo2VM.UpdateStockInfo(p[1]);
},
new ExecutionDataflowBlockOptions()
{
TaskScheduler =
TaskScheduler.FromCurrentSynchronizationContext()
});
}
private ActionBlock<List<StockInfoModel>> Foo()
{
return new ActionBlock<List<StockInfoModel>>(p => { /* do something */ },
new ExecutionDataflowBlockOptions()
{
TaskScheduler =
TaskScheduler.FromCurrentSynchronizationContext()
});
}
private ActionBlock<Tuple<double, double>> UpdateChangeCalc()
{
return new ActionBlock<Tuple<double, double>>(p =>
{
StockInfo1VM.PercentChange = p.Item1;
StockInfo2VM.PercentChange = p.Item2;
},
new ExecutionDataflowBlockOptions()
{
TaskScheduler =
TaskScheduler.FromCurrentSynchronizationContext()
});
}
[StockInfoBroadCast.cs]
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using TPL_DataFlowBroadCasting_Example.Models;
namespace TPL_DataFlowBroadCasting_Example;
public class StockInfoBroadCast
{
private readonly BroadcastBlock<List<StockInfoModel>> _stockInfoBroadCaster;
public StockInfoBroadCast(BroadcastBlock<List<StockInfoModel>> broadcaster)
{
_stockInfoBroadCaster = broadcaster;
}
public async Task RealTimeData()
{
var random1 = new Random();
var random2 = new Random();
StockInfoModel stockInfo1 = new()
{
StockName = "삼성전자",
StartPrice = random1.Next(1000, 10000)
};
StockInfoModel stockInfo2 = new()
{
StockName = "삼성전자우",
StartPrice = random2.Next(1000, 10000)
};
for (int i = 0; i < 100; i++)
{
await Task.Delay(1200);
stockInfo1.CurrentPrice = random1.Next(1000, 10000);
stockInfo2.CurrentPrice = random2.Next(1000, 10000);
await _stockInfoBroadCaster.SendAsync(new() { stockInfo1, stockInfo2 });
}
_stockInfoBroadCaster.Complete();
try
{
_stockInfoBroadCaster.Completion.Wait();
}
catch (Exception ex)
{
// 데이터흐름 완료 오류
}
}
}
[예제 프로그램 화면]
위 코드는 다음 Repository에서 확인할 수 있습니다.
Code_check - TPL_DataFlow