c# 中的 map-reduce-filter(map-reduce的简单探索)

左丘阳晖
2023-12-01

js 的 ES6 中提供了 map、reduce、filter 等方法;

那么我们在c#中似乎没看到呢,真的吗? are you kidding me?

 先看map

/// <summary>
/// Lazily projects every element of <paramref name="list"/> through
/// <paramref name="func"/> — a hand-rolled equivalent of LINQ's Select.
/// </summary>
static IEnumerable<TResult> Map<T, TResult>(Func<T, TResult> func, IEnumerable<T> list)
        {
            foreach (var element in list)
            {
                // Deferred: each result is produced only when the caller enumerates.
                yield return func(element);
            }
        }



        static void Main(string[] args)
        {
            var testList = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

            var mapResult = Map<int, int>(x => x + 2, testList).ToList<int>();

            foreach(var i in mapResult)
            {
                Console.WriteLine(i);
            }

            //然而,在我们的额linq 早就有了该方法;那就是我们的SELECT
            var result = testList.Select(obj => obj + 2).ToList<int>();

            Console.ReadLine();

 Reduce;其实,这个函数命名为reduce,我觉得很不习惯,明明就是:Aggregate; 为啥还要叫reduce呢.....

//还是比较挺有意思的;
        /// <summary>
        /// Folds <paramref name="list"/> into a single value, starting from
        /// <paramref name="acc"/> and applying <paramref name="func"/>(element, accumulator)
        /// for each element — a hand-rolled equivalent of LINQ's Aggregate with a seed.
        /// </summary>
        static T Reduce<T, U>(Func<U, T, T> func, IEnumerable<U> list, T acc)
        {
            var current = acc;
            foreach (var element in list)
            {
                // Each application's result becomes the accumulator for the next:
                // f(f(f(seed, a), b), c) ...
                current = func(element, current);
            }
            return current;
        }


        static void Main(string[] args)
        {
            var testList = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
            var reduceResult = Reduce<int, int>((x, y) => x + y, testList, 0);
            Console.WriteLine(reduceResult);


            //然而,我们的c# 中也是早有了该方法了滴呀;
            var linqReduce = Enumerable.Range(1, 10).Aggregate(0, (acc, x) => acc + x);

c# 中的filter ,这个就没啥好说的了,其实就是我们的Where;

总结:

Map = Select | Enumerable.Range(1, 10).Select(x => x + 2);
Reduce = Aggregate | Enumerable.Range(1, 10).Aggregate(0, (acc, x) => acc + x);
Filter = Where | Enumerable.Range(1, 10).Where(x => x % 2 == 0);

 

看到这里,难道你不会想到,某种高级的技术吗?

那就是google 的 mapReduce呀;

 看这:https://ayende.com/blog/4435/map-reduce-a-visual-explanation

 好文 推荐:http://www.justinshield.com/2011/06/mapreduce-in-c/

 

下面就是我们探索的代码:

 

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication80
{

    /// <summary>
    /// 今天大概就研究两个东西;
    /// linq =>map reduce 
    /// plinq
    /// 并行处理,还有我们actor模型的;当然中间少不了我们 go 语言的 csp;
    /// 递归
    /// </summary>
    class Program
    {

        /// <summary>
        /// Walks through several strategies for finding a target value in an array:
        /// (1) a plain serial scan, (2) a serial scan probing from both ends,
        /// (3) per-iteration threads, and (4) splitting the data so each half is
        /// processed by its own thread — a lead-in to map/reduce-style parallelism.
        /// </summary>
        static void PlinqInfo()
        {
            int[] arr = new int[10] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

            // Strategy 1: serial scan, one comparison per iteration.
            int target = 5;
            int len = arr.Length; // arrays expose Length directly; no need for LINQ's Count()
            int index = 0;
            for (int i = 0; i < len; i++)
            {
                if (arr[i] == target)
                {
                    index = i;
                }
            }

            // Strategy 2: still serial, but each iteration checks two positions —
            // one from the front and one from the back — halving the iteration count.
            // BUG FIX: the back probe must be arr[len - 1 - j]; the original used
            // arr[len - j], which reads arr[10] on the first pass and throws
            // IndexOutOfRangeException. The bound is also widened so the two
            // probes together cover the whole array.
            for (int j = 0; j < (len + 1) / 2; j++)
            {
                // step 1: probe from the front
                if (arr[j] == target)
                {
                    index = j;
                }
                // step 2: probe from the back
                if (arr[len - 1 - j] == target)
                {
                    index = len - 1 - j;
                }
            }

            // Strategy 3: two threads per iteration, one probing from each end, so the
            // comparisons run concurrently. The partial results still have to be
            // gathered afterwards — that gathering is the "reduce" step.
            // (A thread per comparison costs far more than the comparison itself;
            // this is purely illustrative. Note the original started the second
            // thread with the constant len - 1 every pass, probing the same slot.)
            for (int j = 0; j < (len + 1) / 2; j++)
            {
                int front = j;
                int back = len - 1 - j;

                // step 1: probe from the front
                new Thread(o =>
                {
                    int param = (int)o;
                    if (arr[param] == target)
                    {
                        index = param;
                    }
                }).Start(front);

                // step 2: probe from the back
                new Thread(o =>
                {
                    int param = (int)o;
                    if (arr[param] == target)
                    {
                        index = param;
                    }
                }).Start(back);
            }

            // Strategy 4: partition the data instead of the iterations.
            // BUG FIX: the original copied into arr1 twice (from source index 4 and 5),
            // leaving arr2 empty; copy the first half into arr1 and the second into arr2.
            int[] arr1 = new int[5];
            int[] arr2 = new int[5];
            Array.Copy(arr, 0, arr1, 0, 5);
            Array.Copy(arr, 5, arr2, 0, 5);

            // One worker per half; the original constructed the threads but never
            // started them.
            var worker1 = new Thread(o =>
            {
                // process arr1
            });
            var worker2 = new Thread(o =>
            {
                // process arr2
            });
            worker1.Start(null);
            worker2.Start(null);

            // After both workers finish, their partial results would be merged —
            // that final aggregation is exactly the "reduce" in map/reduce,
            // and the actor model is one way to coordinate such workers.
        }


        /// <summary>
        /// Placeholder for the most naive word-count approach; intentionally left
        /// unimplemented in the original article.
        /// </summary>
        static void BadCountWord()
        {

        }

        /// <summary>
        /// Single-threaded (serial) word-count baseline: splits each input line on
        /// spaces, tallies how often every word appears, and prints the totals as
        /// "word:count" lines.
        /// </summary>
        static void CountWord()
        {
            // Input is assumed to be cleanly space-delimited.
            string[] text = new string[] { "hello world", "hello every one", "say hello to everyone in the world", "fuck the world" };

            Dictionary<string, int> dic = new Dictionary<string, int>();
            foreach (string line in text)
            {
                foreach (string key in line.Split(' '))
                {
                    // TryGetValue avoids the ContainsKey + indexer double lookup;
                    // for a missing key it leaves count at default(int) == 0.
                    int count;
                    dic.TryGetValue(key, out count);
                    dic[key] = count + 1;
                }
            }

            // Print the final tally.
            foreach (var item in dic)
            {
                Console.WriteLine($"{item.Key}:{item.Value}");
            }

        }


        /// <summary>
        /// Word count done map/reduce style: each input line is mapped on its own
        /// task, then the partial dictionaries are merged pairwise in two reduce
        /// rounds. For data this small the coordination overhead outweighs any
        /// parallelism; the point is to mimic the shape of a real map/reduce pipeline.
        /// </summary>
        static void ParalleCountWord()
        {
            string[] text = new string[] { "hello world", "hello every one", "say hello to everyone in the world", "fuck the world" };

            // Split phase: one chunk of data per (assumed) CPU core.
            string[] chunk1 = text[0].Split(' ');
            string[] chunk2 = text[1].Split(' ');
            string[] chunk3 = text[2].Split(' ');
            string[] chunk4 = text[3].Split(' ');

            // Map phase: count the words of every chunk concurrently.
            Task<Dictionary<string, int>> map1 = Task.Run(() => MapNode(chunk1));
            Task<Dictionary<string, int>> map2 = Task.Run(() => MapNode(chunk2));
            Task<Dictionary<string, int>> map3 = Task.Run(() => MapNode(chunk3));
            Task<Dictionary<string, int>> map4 = Task.Run(() => MapNode(chunk4));
            Task.WaitAll(map1, map2, map3, map4);

            // Reduce round 1: merge the four partial results down to two.
            // Two reducer "nodes" are used deliberately, to model the multi-level
            // aggregation a real map/reduce job performs.
            Task<Dictionary<string, int>> merge12 = Task.Run(() => ReduceInfo(map1.Result, map2.Result));
            Task<Dictionary<string, int>> merge34 = Task.Run(() => ReduceInfo(map3.Result, map4.Result));
            Task.WaitAll(merge12, merge34);

            // Reduce round 2: merge the two partials into the final tally.
            Task<Dictionary<string, int>> finalMerge = Task.Run(() => ReduceInfo(merge12.Result, merge34.Result));

            foreach (var item in finalMerge.Result)
            {
                Console.WriteLine($"{item.Key}:{item.Value}");
            }

            // Note: map and reduce run back-to-back here, but in a real system the
            // two stages are independent and can overlap.
        }

        /// <summary>
        /// One independently runnable map unit: tallies word occurrences in a chunk.
        /// </summary>
        /// <param name="arr">The words to count.</param>
        /// <returns>A completed task holding word -> occurrence count.</returns>
        static Task<Dictionary<string, int>> MapNode(string[] arr)
        {
            Dictionary<string, int> dic = new Dictionary<string, int>();
            foreach (string key in arr)
            {
                // TryGetValue leaves count at 0 for an unseen word, so both the
                // "first occurrence" and "repeat" cases collapse into one store.
                // BUG FIX: the original wrote `dic[key] = dic[key]++;`, which
                // assigns the pre-increment value back and never advances the count.
                int count;
                dic.TryGetValue(key, out count);
                dic[key] = count + 1;
            }

            return Task.FromResult(dic);
        }

        /// <summary>
        /// Merges any number of partial word-count dictionaries into one, summing
        /// the counts of words that appear in more than one input. (This plays the
        /// reduce role; a GroupBy + Count would give the same totals, but the
        /// explicit merge mirrors how real map/reduce aggregates node results.)
        /// </summary>
        /// <param name="dics">Partial word -> count maps produced by map nodes.</param>
        /// <returns>A completed task holding the combined word -> count map.</returns>
        static Task<Dictionary<string, int>> ReduceInfo(params Dictionary<string, int>[] dics)
        {
            Dictionary<string, int> result = new Dictionary<string, int>();
            foreach (var partial in dics)
            {
                // Enumerate key/value pairs directly instead of Keys + indexer,
                // and use TryGetValue to avoid the ContainsKey double lookup.
                foreach (var pair in partial)
                {
                    int existing;
                    result.TryGetValue(pair.Key, out existing);
                    result[pair.Key] = existing + pair.Value;
                }
            }
            return Task.FromResult(result);
        }


        /// <summary>
        /// The same word count expressed with PLINQ: a parallel Select plays the map
        /// role (tagging every word with a weight of 1) and a parallel GroupBy plays
        /// the reduce role (collapsing duplicates and counting them).
        /// Going parallel is only sound here because each unit of work is
        /// independent and the result does not depend on processing order.
        /// </summary>
        static void PLinq()
        {
            string[] words = new string[] { "hello","world", "hello","every","one", "say","hello","to","everyone","in","the","world", "fuck","the","world" };

            // Map: in parallel, pair each word with an initial count of 1.
            var tagged = words.AsParallel().Select(w => new { key = w, value = 1 }).ToList();

            // Reduce: in parallel, group by the word and count each group's members.
            var totals = tagged.AsParallel()
                               .GroupBy(t => t.key, (word, group) => new { key = word, Cout = group.Count() })
                               .ToList();

            // Print the tally ("word:count" per line).
            foreach (var entry in totals)
            {
                Console.WriteLine($"{entry.key}:{entry.Cout}");
            }

        }


        /// <summary>
        /// Word count via ToLookup: grouping the words by themselves de-duplicates
        /// the keys, and each grouping's Count() is that word's frequency — the
        /// whole exercise collapses into a single group-by.
        /// </summary>
        static void PLinq2()
        {
            string[] words = new string[] { "hello", "world", "hello", "every", "one", "say", "hello", "to", "everyone", "in", "the", "world", "fuck", "the", "world" };

            // The lookup maps each distinct word to the collection of its occurrences.
            var byWord = words.ToLookup(w => w);

            // Print "word--count" for every distinct word.
            foreach (var grouping in byWord)
            {
                Console.WriteLine(grouping.Key + "--" + grouping.Count());
            }

        }

        /// <summary>
        /// GroupBy recipes over a small Package list, keyed by the first letter of
        /// the company name: per-group max/min weight, the raw group contents,
        /// group sizes, weight sums, and finally a per-group string aggregation
        /// (company + tracking number joined with "--"), which is printed.
        /// </summary>
        static void F()
        {
            // Sample data for the group-by experiments.
            List<Package> packages = new List<Package>
            {
                new Package { Company = "Coho Vineyard", Weight = 1, TrackingNumber = 89453312L },
                new Package { Company = "Lucerne Publishing", Weight = 2, TrackingNumber = 89112755L },
                new Package { Company = "Wingtip Toys", Weight = 3, TrackingNumber = 299456122L },
                new Package { Company = "Contoso Pharmaceuticals", Weight = 4, TrackingNumber = 670053128L },
                new Package { Company = "Wide World Importers", Weight = 5, TrackingNumber = 4665518773L }
            };

            // Heaviest package weight in each group.
            var resultMax = packages.GroupBy(p => p.Company[0], (key, group) => new { key = key, eleMax = group.Max(p => p.Weight) }).ToList();

            // Lightest package weight in each group.
            var resultMin = packages.GroupBy(p => p.Company[0], (key, group) => new { key = key, eleMin = group.Min(p => p.Weight) }).ToList();

            // The members of each group, unreduced.
            var resultList = packages.GroupBy(p => p.Company[0], (key, group) => new { key = key, eles = group }).ToList();

            // How many packages fall into each group.
            var resultSum = packages.GroupBy(p => p.Company[0], (key, group) => new { key = key, eleCount = group.Count() }).ToList();

            // Total weight per group.
            var resultSd = packages.GroupBy(p => p.Company[0], (key, group) => new { key = key, Sum = group.Sum(p => p.Weight) }).ToList();

            // Fold each group into one string: company + tracking number, "--" separated.
            var result = packages.GroupBy(p => p.Company[0], (key, group) => new { key = key, ele = group.Select(p => p.Company + p.TrackingNumber).Aggregate((acc, next) => acc + "--" + next) }).ToList();

            foreach (var o in result)
            {
                Console.WriteLine($"{o.key}-----{o.ele}");
            }

        }



        /// <summary>
        /// Entry point. Runs the ToLookup-based word count; the other experiments
        /// (CountWord, ParalleCountWord, PLinq, F — optionally timed with a
        /// Stopwatch to compare serial vs. parallel execution) can be swapped in
        /// here instead.
        /// </summary>
        static void Main(string[] args)
        {
            PLinq2();

            // Keep the console window open until the user presses Enter.
            Console.ReadLine();

        }
    }

    /// <summary>
    /// Sample shipment record used by the GroupBy/ToLookup demos.
    /// Auto-properties replace the original bare internal fields (same source-level
    /// usage for the object initializers and LINQ accessors in this file).
    /// </summary>
    internal class Package
    {
        /// <summary>Shipping company name; its first character is used as a group key.</summary>
        internal string Company { get; set; }

        /// <summary>Carrier tracking number.</summary>
        internal long TrackingNumber { get; set; }

        /// <summary>Package weight (units unspecified in the demo data).</summary>
        internal double Weight { get; set; }
    }
}

 

转载于:https://www.cnblogs.com/mc67/p/7391007.html

 类似资料: