一、逻辑
#region 九大block
#region 一、bufferblock
/**
1. bufferblock
最基础Block
遵循先入先出
Post 添加数据
Recieve 异步处理
*/
private BufferBlock<int> m_buffer = new BufferBlock<int>();
private void Producer() {
Random random = new Random();
while (true) {
int item = random.Next(10, 50);
Console.WriteLine("P:" + item);
m_buffer.Post(item);
Thread.Sleep(100);
}
}
private void Consumer() {
while (true)
{
int item = m_buffer.Receive();
Console.WriteLine("C:" + item);
Thread.Sleep(100);
}
}
public void TestBufferBlock() {
var p = Task.Factory.StartNew(Producer);
var c = Task.Factory.StartNew(Consumer);
Task.WaitAll(p, c);
}
#endregion
#region 二、ActionBlock
/**
2. ActionBlock
允许使用委托
Action(T):需等待委托结束
Func<TInput, Task>:不等待委托结束
默认FIFO、并行处理任务为1
*/
private ActionBlock<int> m_action = new ActionBlock<int>((i) => {
Thread.Sleep(1000);
Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism=3});
private ActionBlock<Func<int,int>> m_action2 = new ActionBlock<Func<int, int>>((i) => {
Thread.Sleep(1000);
Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 3 });
public void TestActionBlock()
{
var p = Task.Factory.StartNew(()=> {
for (int i = 0; i < 10; i++)
{
m_action.Post(i);
if (i == 8) {
m_action.Complete(); // 控制生命周期
}
}
Console.WriteLine("Post finished");
});
Task.WaitAll();
m_action.Completion.Wait(); // 等待ActionBlock处理结束
Console.WriteLine("m_action.Completion finished");
}
public void TestActionBlock2()
{
var p = Task.Factory.StartNew(() => {
for (int i = 0; i < 10; i++)
{
m_action.Post(i);
if (i == 8)
{
m_action.Complete(); // 控制生命周期
}
}
Console.WriteLine("Post finished");
});
Task.WaitAll();
m_action.Completion.Wait();
Console.WriteLine("m_action.Completion finished");
}
#endregion
#region 三、TransformBlock
/***
* 3.TransformBlock
* 常常在数据流中充当数据转换处理的功能
* 内部维护了2个Queue
* InputQueue:存储输入的数据
* OutputQueue:通过Transform处理以后的数据则放在OutputQueue
* 通过Receive方法来阻塞的一个一个获取OutputQueue中的数据
* Completion.Wait()方法只有在OutputQueue中的数据为0的时候才会返回
*/
private TransformBlock<int, int> tbUrl = new TransformBlock<int, int>((i) =>
{
Thread.Sleep(500);
Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
return i;
});
public void TestTransformBlock()
{
for (int i = 0; i < 10; i++) {
tbUrl.Post(i);
}
while (!tbUrl.Completion.IsCompleted) {
int i = tbUrl.Receive();
Console.WriteLine(i+"处理完成");
}
}
#endregion
#region 四、TransformManyBlock
/**
* 4. TransformManyBlock
* 一个输入返回多个输出
* 其他与TransformBlock一致
*/
TransformManyBlock<int, int> tmb = new TransformManyBlock<int, int>((i) => { return new int[] { i, i + 1 }; });
ActionBlock<int> ab = new ActionBlock<int>((i) => Console.WriteLine(i));
public void TestTransformManyBlock()
{
tmb.LinkTo(ab);
for (int i = 0; i < 4; i++)
{
tmb.Post(i);
}
Console.WriteLine("Finished post");
}
#endregion
#region 五、BroadcastBlock
/**
* 5. BroadcastBlock
* 所有与其相连的block均可以收到副本
* post新的数据将会覆盖旧的数据
*/
BroadcastBlock<int> bb = new BroadcastBlock<int>((i) => { return i; });
ActionBlock<int> displayBlock = new ActionBlock<int>((i) => Console.WriteLine("Displayed " + i));
ActionBlock<int> saveBlock = new ActionBlock<int>((i) => Console.WriteLine("Saved " + i));
ActionBlock<int> sendBlock = new ActionBlock<int>((i) => Console.WriteLine("Sent " + i));
public void TestBroadcastBlock()
{
bb.LinkTo(displayBlock);
bb.LinkTo(saveBlock);
bb.LinkTo(sendBlock);
for (int i = 0; i < 4; i++)
{
bb.Post(i);
}
Console.WriteLine("Post finished");
}
#endregion
#region 六、WriteOnceBlock
/**
* 6. WriteOnceBlock
* 最简单的block
* 最多只能存储一个数据
* 数据发送后不能被删除或者被新来的数据替换
* 所有接收者均会收到这个数据的备份
*/
WriteOnceBlock<int> wb = new WriteOnceBlock<int>((i) => { return i; });
ActionBlock<int> displayBlock2 = new ActionBlock<int>((i) => Console.WriteLine("Displayed " + i));
ActionBlock<int> saveBlock2 = new ActionBlock<int>((i) => Console.WriteLine("Saved " + i));
ActionBlock<int> sendBlock2 = new ActionBlock<int>((i) => Console.WriteLine("Sent " + i));
public void TestWriteOnceBlock()
{
wb.LinkTo(displayBlock2);
wb.LinkTo(saveBlock2);
wb.LinkTo(sendBlock2);
for (int i = 0; i < 4; i++)
{
wb.Post(i);
}
Console.WriteLine("Post finished");
}
#endregion
#region 七、BatchBlock
/**
* 7. WriteOnceBlock
* 打包单个数据的功能
* 即将数据累计到需要的数目再返回数据
* 但是调用Complete会直接返回所有数据,不管是否存满
*/
BatchBlock<int> bb1 = new BatchBlock<int>(3);
ActionBlock<int[]> ab1 = new ActionBlock<int[]>((i) =>
{
string s = string.Empty;
foreach (int m in i)
{
s += m + " ";
}
Console.WriteLine(s);
});
public void TestBatchBlock()
{
bb1.LinkTo(ab1);
for (int i = 0; i < 10; i++)
{
bb1.Post(i);
}
bb1.Complete();
Console.WriteLine("Finished post");
}
#endregion
#region 八、JoinBlock
/**
* 8. JoinBlock
* 当多个输入都满足时,才会有新的输出
*/
JoinBlock<int, string> jb = new JoinBlock<int, string>();
ActionBlock<Tuple<int, string>> ab2 = new ActionBlock<Tuple<int, string>>((i) =>
{
Console.WriteLine(i.Item1 + " " + i.Item2);
});
public void TestJoinBlock()
{
jb.LinkTo(ab2);
for (int i = 0; i < 5; i++)
{
jb.Target1.Post(i);
}
for (int i = 5; i > 0; i--)
{
Thread.Sleep(1000);
jb.Target2.Post(i.ToString());
}
Console.WriteLine("Finished post");
}
#endregion
#region 九、BatchedJoinBlock
/**
* 9. BatchedJoinBlock
* Batch与Join的组合
* 只要Target1和Target2的数目加起来等于设定的数目就会有新的输出
*/
BatchedJoinBlock<int, string> bjb = new BatchedJoinBlock<int, string>(3);
ActionBlock<Tuple<IList<int>, IList<string>>> ab3 = new ActionBlock<Tuple<IList<int>, IList<string>>>((i) =>
{
Console.WriteLine("-----------------------------");
foreach (int m in i.Item1)
{
Console.WriteLine(m);
};
foreach (string s in i.Item2)
{
Console.WriteLine(s);
};
});
public void TestBatchedJoinBlock()
{
bjb.LinkTo(ab3);
for (int i = 0; i < 5; i++)
{
bjb.Target1.Post(i);
}
for (int i = 5; i > 0; i--)
{
bjb.Target2.Post(i.ToString());
}
Console.WriteLine("Finished post");
}
#endregion
#endregion
二、测试
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Demo
{
internal class Program
{
static void Main(string[] args)
{
TPLDemo demo = new TPLDemo();
//demo.TestBufferBlock();
//demo.TestActionBlock();
//demo.TestActionBlock2();
//demo.TestTransformBlock();
//demo.TestTransformManyBlock();
//demo.TestBroadcastBlock();
//demo.TestWriteOnceBlock();
//demo.TestBatchBlock();
//demo.TestJoinBlock();
demo.TestBatchedJoinBlock();
Console.ReadLine();
}
}
}