ES6 brought methods like map, reduce, and filter to JavaScript;
so it might seem we don't have them in C#. Really? Are you kidding me?
First, map:
```csharp
static IEnumerable<TResult> Map<T, TResult>(Func<T, TResult> func, IEnumerable<T> list)
{
    foreach (var i in list)
    {
        yield return func(i);
    }
}

static void Main(string[] args)
{
    var testList = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    var mapResult = Map<int, int>(x => x + 2, testList).ToList();
    foreach (var i in mapResult)
    {
        Console.WriteLine(i);
    }

    // But LINQ has had this method all along: it is Select.
    var result = testList.Select(obj => obj + 2).ToList();
    Console.ReadLine();
}
```
Reduce. Honestly, the name reduce has never felt natural to me; this is plainly an Aggregate, so why call it reduce at all...
```csharp
// Still quite interesting:
static T Reduce<T, U>(Func<U, T, T> func, IEnumerable<U> list, T acc)
{
    foreach (var i in list)
    {
        // Each result becomes the accumulator for the next call: aggregation.
        // fn(x, y) => fn(fn(x, y), z) => fn(fn(fn(x, y), z), zz)
        acc = func(i, acc);
    }
    return acc;
}

static void Main(string[] args)
{
    var testList = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    var reduceResult = Reduce<int, int>((x, y) => x + y, testList, 0);
    Console.WriteLine(reduceResult);

    // And again, C# has long had this method:
    var linqReduce = Enumerable.Range(1, 10).Aggregate(0, (acc, x) => acc + x);
}
```
As for filter in C#, there is not much to say: it is simply Where.
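For symmetry with the hand-rolled Map and Reduce above, here is what a hand-rolled Filter might look like (my own sketch, not LINQ's actual Where implementation):

```csharp
static IEnumerable<T> Filter<T>(Func<T, bool> predicate, IEnumerable<T> list)
{
    foreach (var i in list)
    {
        // Only yield the elements the predicate accepts.
        if (predicate(i))
        {
            yield return i;
        }
    }
}

// Usage: keep the even numbers, equivalent to testList.Where(x => x % 2 == 0).
// var filterResult = Filter<int>(x => x % 2 == 0, testList).ToList();
```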
Summary:
Map = Select | Enumerable.Range(1, 10).Select(x => x + 2);
Reduce = Aggregate | Enumerable.Range(1, 10).Aggregate(0, (acc, x) => acc + x);
Filter = Where | Enumerable.Range(1, 10).Where(x => x % 2 == 0);
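And because each of these returns a sequence, they compose into a pipeline. For example (a small illustration added here, not from the original summary), summing the squares of the even numbers:

```csharp
var sumOfEvenSquares = Enumerable.Range(1, 10)
    .Where(x => x % 2 == 0)               // filter
    .Select(x => x * x)                   // map
    .Aggregate(0, (acc, x) => acc + x);   // reduce
// 4 + 16 + 36 + 64 + 100 = 220
```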
Reading this far, doesn't a certain well-known piece of technology come to mind?
That's Google's MapReduce.
See this: https://ayende.com/blog/4435/map-reduce-a-visual-explanation
Recommended read: http://www.justinshield.com/2011/06/mapreduce-in-c/
Below is the code from our exploration:
```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication80
{
    /// <summary>
    /// Roughly two things to study today:
    /// linq => map reduce
    /// plinq
    /// Parallel processing, plus the actor model; and of course Go's CSP sits
    /// somewhere in between.
    /// Recursion.
    /// </summary>
    class Program
    {
        static void PlinqInfo()
        {
            int[] arr = new int[10] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

            // Serial search:
            int target = 5;
            int len = arr.Count();
            int index = 0;
            for (int i = 0; i < len; i++)
            {
                if (arr[i] == target)
                {
                    index = i;
                }
            }

            // This is still a serial way of solving the problem:
            // search from both ends at once, one index from the front and one
            // from the back, halving the number of loop iterations.
            // We split the work and the loop count drops, but the body of each
            // iteration still executes serially.
            for (int j = 0; j < len / 2; j++)
            {
                // step 1
                if (arr[j] == target)
                {
                    index = j;
                }
                // step 2
                if (arr[len - 1 - j] == target)
                {
                    index = len - 1 - j;
                }
            }

            // Next optimization: in each iteration, run the two checks
            // concurrently. The matching happens in parallel, but at the end
            // we still have to tally the results: a reduce.
            // Each loop iteration spawns two threads:
            for (int j = 0; j < len / 2; j++)
            {
                // step 1
                new Thread(o =>
                {
                    int param = (int)o;
                    if (arr[param] == target)
                    {
                        index = param;
                    }
                }).Start(j);
                // step 2
                new Thread(o =>
                {
                    int param = (int)o;
                    if (arr[param] == target)
                    {
                        index = param;
                    }
                }).Start(len - 1 - j);
            }

            // Approach three: split the task itself;
            // or rather, split the data.
            int[] arr1 = new int[5];
            int[] arr2 = new int[5];
            Array.Copy(arr, 0, arr1, 0, 5);
            Array.Copy(arr, 5, arr2, 0, 5);
            // Split the array into two halves, then work on each in parallel:
            new Thread(o =>
            {
                // process arr1
            });
            new Thread(o =>
            {
                // process arr2
            });
            // When both halves are done, aggregate the results.
            // The key to parallelism is still splitting the task: task
            // splitting, concurrent processing, functional programming, and
            // communication between the parts -- the actor model.
            // The top-tier approach to concurrency is still the actor model.
        }

        /// <summary>
        /// This would be the clumsiest way to do it.
        /// </summary>
        static void BadCountWord()
        {
        }

        /// <summary>
        /// The most primitive way to count how often each word appears;
        /// Count Word.
        /// </summary>
        static void CountWord()
        {
            // Single-threaded, serial execution;
            // count the frequency of every word.
            String[] text = new String[] {
                "hello world",
                "hello every one",
                "say hello to everyone in the world",
                "fuck the world"
            };

            Dictionary<string, int> dic = new Dictionary<string, int>();
            int len = text.Length;
            for (int i = 0; i < len; i++)
            {
                // Assume the data is well-formed, with spaces between words.
                string[] temp = text[i].Split(' ');
                int tempLen = temp.Length;
                for (int j = 0; j < tempLen; j++)
                {
                    string key = temp[j];
                    if (!dic.ContainsKey(key))
                    {
                        dic.Add(key, 1);
                    }
                    else
                    {
                        // Read the old value, increment, write it back.
                        int value = dic[key];
                        value++;
                        dic[key] = value;
                    }
                }
            }

            // Print the final tallies.
            foreach (var item in dic)
            {
                Console.WriteLine($"{item.Key}:{item.Value}");
            }
        }

        /// <summary>
        /// Count the words concurrently; the result is quite good:
        /// one node processes one chunk of data.
        /// Looking at the whole process, though, for small data sets doing
        /// map reduce like this is clearly not worth it.
        /// </summary>
        static void ParalleCountWord()
        {
            string[] text = new string[] {
                "hello world",
                "hello every one",
                "say hello to everyone in the world",
                "fuck the world"
            };

            // Assume my machine has four CPUs (it actually does).
            // First split the task, which really means splitting the data:
            string[] task1 = text[0].Split(' ');
            string[] task2 = text[1].Split(' ');
            string[] task3 = text[2].Split(' ');
            string[] task4 = text[3].Split(' ');

            // If everything lives in the same scope we can of course write:
            //Task<Dictionary<string, int>> task1_Result = Task.Run(() => MapNode(task1));
            // or:
            //Task<Dictionary<string, int>> task1_Result = Task.Factory.StartNew(data => MapNode((string[])data), task1).Unwrap();

            // This is the map step, executed in parallel:
            Task<Dictionary<string, int>> task1_Result = Task.Run(() => MapNode(task1));
            Task<Dictionary<string, int>> task2_Result = Task.Run(() => MapNode(task2));
            Task<Dictionary<string, int>> task3_Result = Task.Run(() => MapNode(task3));
            Task<Dictionary<string, int>> task4_Result = Task.Run(() => MapNode(task4));

            // Then the reduce: tallying the final results.
            Task.WaitAll(task1_Result, task2_Result, task3_Result, task4_Result);
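            // (Added note, not in the original post: Task.WaitAll blocks the
            // calling thread until every map task finishes; in code that
            // should stay asynchronous you would typically
            // `await Task.WhenAll(...)` instead. Blocking is fine for a
            // console demo like this one.)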
            // Once all the map tasks have completed, tally the results.
            // Here we use two nodes for the reduce, mainly to simulate the
            // full map reduce pipeline: two rounds of aggregation.
            // Again, you will find that for small data this kind of
            // map reduce is simply not worth the overhead.

            // First reduce:
            Task<Dictionary<string, int>> reduce1_result_1_plus_2 =
                Task.Run(() => ReduceInfo(task1_Result.Result, task2_Result.Result));
            Task<Dictionary<string, int>> reduce2_result_3_plus_4 =
                Task.Run(() => ReduceInfo(task3_Result.Result, task4_Result.Result));
            Task.WaitAll(reduce1_result_1_plus_2, reduce2_result_3_plus_4);

            // Second and final reduce:
            Task<Dictionary<string, int>> finallyResult =
                Task.Run(() => ReduceInfo(reduce1_result_1_plus_2.Result, reduce2_result_3_plus_4.Result));
            var result = finallyResult.Result;
            foreach (var item in result)
            {
                Console.WriteLine($"{item.Key}:{item.Value}");
            }

            // Notice that the map and reduce phases above run one after the
            // other; in principle the map and reduce stages can be fully
            // decoupled from each other.
        }

        /// <summary>
        /// A unit of work that can run independently;
        /// each node returns its own partial result when done.
        /// </summary>
        static Task<Dictionary<string, int>> MapNode(string[] arr)
        {
            int len = arr.Length;
            Dictionary<string, int> dic = new Dictionary<string, int>();
            for (int i = 0; i < len; i++)
            {
                string key = arr[i];
                if (!dic.ContainsKey(key))
                {
                    dic.Add(key, 1);
                }
                else
                {
                    dic[key] = dic[key] + 1;
                }
            }
            return Task.FromResult(dic);
        }

        /// <summary>
        /// Now we need to aggregate the partial results.
        /// Do we spin up four more nodes just to tally them?
        /// In practice this aggregation is just a group by followed by a
        /// count, but here it is spelled out to simulate map reduce.
        /// </summary>
        static Task<Dictionary<string, int>> ReduceInfo(params Dictionary<string, int>[] dics)
        {
            Dictionary<string, int> result = new Dictionary<string, int>();
            int len = dics.Length;
            for (int i = 0; i < len; i++)
            {
                var temp = dics[i];
                foreach (var key in temp.Keys)
                {
                    if (!result.ContainsKey(key))
                    {
                        // First occurrence: insert the partial count.
                        result.Add(key, temp[key]);
                    }
                    else
                    {
                        // Merge: add the partial count to the running total.
                        result[key] = result[key] + temp[key];
                    }
                }
            }
            return Task.FromResult(result);
        }

        /// <summary>
        /// Next, the same thing with the built-in PLINQ.
        /// Things to consider before going parallel:
        /// can the task be split, i.e. can each piece run independently,
        /// and is the result independent of execution order?
        /// </summary>
        static void PLinq()
        {
            // Use the built-in PLINQ to express map reduce:
            string[] text = new string[] {
                "hello", "world",
                "hello", "every", "one",
                "say", "hello", "to", "everyone", "in", "the", "world",
                "fuck", "the", "world"
            };

            //// First tag every word with a weight of 1 (the first map),
            //// which gives us something we can aggregate over:
            //var result = text.Select(o => new { key = o, value = 1 }).ToList();
            //// Then group by:
            //var list = result.GroupBy(o => o.key, (key, elements) => new { key = key, Cout = elements.Count() }).ToList();
            //// And that simple pipeline already finishes the whole count:
            //foreach (var item in list)
            //{
            //    Console.WriteLine($"{item.key}:{item.Cout}");
            //}

            // For big data you would not map directly like that; go parallel:
            var mapReuslt = text.AsParallel().Select(o => new { key = o, value = 1 }).ToList(); // parallel map
            // How is group by implemented? Essentially: add each value to a
            // dictionary, incrementing the count when the key already exists.
            var list = mapReuslt.AsParallel().GroupBy(o => o.key, (key, elements) => new { key = key, Cout = elements.Count() }).ToList();
            // That is roughly how it executes, making full use of parallelism.
            foreach (var item in list)
            {
                Console.WriteLine($"{item.key}:{item.Cout}");
            }
        }

        /// <summary>
        /// And with that, the parallel version is done.
        /// In the end this is nothing more than a group by followed by a
        /// count; all that machinery for such a simple query.
        /// </summary>
        static void PLinq2()
        {
            string[] text = new string[] {
                "hello", "world",
                "hello", "every", "one",
                "say", "hello", "to", "everyone", "in", "the", "world",
                "fuck", "the", "world"
            };
            var re = text.ToLookup(o => o);
            // But what is the real logic behind group by? How does it dedupe
            // and tally the data to return exactly the result we want?
            foreach (var i in re)
            {
                Console.WriteLine(i.Key + "--" + i.Count());
            }

            //var result = text.ToLookup(key => key, value => 1).Select(o => new { key = o.Key, cout = o.Sum() }).ToList();
            //foreach (var i in result)
            //{
            //    Console.WriteLine(i.key + " " + i.cout);
            //}
        }
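        /// <summary>
        /// (A small sketch added for illustration; it is not in the original
        /// post.) This is the commented-out ToLookup variant above written
        /// out in full: map each word to the weight 1, then sum the weights
        /// per key.
        /// </summary>
        static void PLinq3()
        {
            string[] text = new string[] { "hello", "world", "hello", "the", "world" };
            var result = text.ToLookup(key => key, value => 1)
                             .Select(o => new { key = o.Key, count = o.Sum() })
                             .ToList();
            foreach (var i in result)
            {
                Console.WriteLine(i.key + " " + i.count);
            }
        }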
        static void F()
        {
            // Create a list of Packages to put into a Lookup data structure.
            List<Package> packages = new List<Package>
            {
                new Package { Company = "Coho Vineyard", Weight = 1, TrackingNumber = 89453312L },
                new Package { Company = "Lucerne Publishing", Weight = 2, TrackingNumber = 89112755L },
                new Package { Company = "Wingtip Toys", Weight = 3, TrackingNumber = 299456122L },
                new Package { Company = "Contoso Pharmaceuticals", Weight = 4, TrackingNumber = 670053128L },
                new Package { Company = "Wide World Importers", Weight = 5, TrackingNumber = 4665518773L }
            };

            // Create a Lookup to organize the packages. Use the first character of Company as the key value.
            // Select Company appended to TrackingNumber for each element value in the Lookup.
            var resultMax = packages.GroupBy(o => o.Company[0], (key, elements) => new { key = key, eleMax = elements.Max(o => o.Weight) }).ToList();   // group by, then the largest Weight per group
            var resultMin = packages.GroupBy(o => o.Company[0], (key, elements) => new { key = key, eleMin = elements.Min(o => o.Weight) }).ToList();   // group by, then the smallest Weight per group

            // The two queries above return only the max and min values; to
            // get the objects themselves, return the element collections:
            var resultList = packages.GroupBy(o => o.Company[0], (key, elements) => new { key = key, eles = elements }).ToList();                       // group by, returning each group's elements
            var resultSum = packages.GroupBy(o => o.Company[0], (key, elements) => new { key = key, eleCount = elements.Count() }).ToList();            // group by, then the row count per group
            var resultSd = packages.GroupBy(o => o.Company[0], (key, elements) => new { key = key, Sum = elements.Sum(o => o.Weight) }).ToList();       // group by, then the Weight total per group
            var result = packages.GroupBy(o => o.Company[0], (key, elements) => new { key = key, ele = elements.Select(obj => obj.Company + obj.TrackingNumber).Aggregate((acc, next) => acc + "--" + next) }).ToList(); // group by, then concatenate each group's values

            foreach (var o in result)
            {
                Console.WriteLine($"{o.key}-----{o.ele}");
            }

            //Lookup<char, string> lookup = (Lookup<char, string>)packages.ToLookup(
            //    p => Convert.ToChar(p.Company.Substring(0, 1)),
            //    p => p.Company + " " + p.TrackingNumber);
        }

        static void Main(string[] args)
        {
            PLinq2();
            //F();

            //string[] words = new string[] { "Able", "was", "I", "ere", "I", "saw", "Elba" };
            //string s = words.Aggregate((a, n) => a + " " + n);

            //IEnumerable<string> list = new List<string>() { "I", "want", "to", "fuck", "the", "life" };
            //// You can think of seed as the initial accumulator value;
            //// the final lambda runs one last map over the finished result.
            //// (For performance you would use a StringBuilder; this just
            //// shows the usage. Sum cannot do this!)
            //var result = list.Aggregate("[seed]", (acc, next) => acc + " " + next, o => o + " -- one final map over the finished result");
            //Console.WriteLine(result);

            //// Timing comparison:
            //Console.WriteLine("Plain serial method ----------");
            //Stopwatch sw = Stopwatch.StartNew();
            //sw.Restart();
            //CountWord();
            //Console.WriteLine($"Serial time: {sw.ElapsedMilliseconds}");
            //Console.WriteLine("map-reduce parallel method -----------");
            //sw.Restart();
            //ParalleCountWord();
            //Console.WriteLine($"map reduce parallel time: {sw.ElapsedMilliseconds}");
            //Console.WriteLine("PLinq version ---------------");
            //sw.Restart();
            //PLinq();
            //Console.WriteLine($"PLinq parallel time: {sw.ElapsedMilliseconds}");

            Console.ReadLine();
        }
    }

    internal class Package
    {
        internal string Company;
        internal long TrackingNumber;
        internal double Weight;
    }
}
```
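To tie this back to the two links above: the whole pattern collapses into a single PLINQ extension method. Here is a minimal sketch of that idea (my own summary of the pattern the justinshield post describes; the `MapReduce` name and signature are illustrative, not a framework API):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MapReduceExtensions
{
    // map: turn one input into zero or more intermediate items;
    // keySelector: choose the grouping key for each intermediate item;
    // reduce: collapse each group into zero or more final results.
    public static ParallelQuery<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
        this ParallelQuery<TSource> source,
        Func<TSource, IEnumerable<TMapped>> map,
        Func<TMapped, TKey> keySelector,
        Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce)
    {
        return source.SelectMany(map)
                     .GroupBy(keySelector)
                     .SelectMany(reduce);
    }
}

class WordCountDemo
{
    static void Main()
    {
        string[] lines =
        {
            "hello world",
            "hello every one",
            "say hello to everyone in the world"
        };

        // Word count as map reduce: map splits each line into words,
        // the key is the word itself, and reduce counts each group.
        var counts = lines.AsParallel().MapReduce(
            line => line.Split(' '),
            word => word,
            group => new[] { new { Word = group.Key, Count = group.Count() } });

        foreach (var c in counts)
        {
            Console.WriteLine($"{c.Word}: {c.Count}");
        }
    }
}
```

With that one extension method, the entire hand-rolled pipeline above (the four map tasks, the two reduce rounds, and the final merge) becomes a three-argument call, and PLINQ decides how to partition the work across cores.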