Multithreading on a foreach loop?

lor*_*tyx 4 foreach multithreading c#-4.0

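I want to process some data. I have about 25k items in a dictionary. In a foreach loop, I query the database to get the results for each item, and the results are added as the dictionary's values.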

    foreach (KeyValuePair<string, Type> pair in allPeople)
    {
        MySqlCommand comd = new MySqlCommand("SELECT * FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", con);
        MySqlDataReader reader2 = comd.ExecuteReader();
        Dictionary<string, Dictionary<int, Log>> allViews = new Dictionary<string, Dictionary<int, Log>>();
        while (reader2.Read())
        {
            if (!allViews.ContainsKey(reader2.GetString("src")))
            {
                allViews.Add(reader2.GetString("src"), reader2.GetInt32("time"));
            }
        }
        reader2.Close();
        reader2.Dispose();
        allPeople[pair.Key].View = allViews;
    }

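I'd like to make this faster with multithreading. I have 8 threads available and CPU usage is around 13%. I just don't know whether it will work, since it depends on the MySQL server. On the other hand, maybe 8 threads could open 8 DB connections and therefore be faster.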

Anyway, if multithreading would help in my case, how do I do it? oO I've never used (multiple) threads, so any help would be great :D

Pet*_*ham 5

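MySqlDataReader is stateful: you call Read() on it and it moves to the next row, so each thread needs its own reader, and you need to write the queries so that each one fetches different values. That probably isn't too hard, since you naturally have many queries for different pair.Key values.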

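You also need either a temporary dictionary per thread, which you merge afterwards, or a lock to prevent concurrent modification of the dictionary.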

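The above assumes that MySQL will let a single connection execute concurrent queries; otherwise you may need multiple connections as well. In practice, MySQL Connector/NET does not: a connection can have only one open DataReader at a time, and MySqlConnection is not thread-safe, so plan on one connection per thread.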
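One way to get the "temporary dictionary per thread, then merge" approach without managing threads yourself is the localInit/localFinally overload of Parallel.ForEach (.NET 4 and later). This is only an illustrative sketch: FakeQuery is a hypothetical stand-in for the per-key database query, and plain strings stand in for the dictionary keys. Each worker fills a private dictionary with no locking; the lock is taken only when a worker's partial results are merged at the end.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class LocalDictionaries
{
    // Hypothetical stand-in for the per-key database query.
    static int FakeQuery(string key) { return key.Length; }

    // Each worker task accumulates into its own Dictionary (localInit), with no lock
    // inside the loop body; localFinally merges each partial result into the shared
    // dictionary under a lock, once per worker task rather than once per item.
    public static Dictionary<string, int> Run(IEnumerable<string> keys)
    {
        var merged = new Dictionary<string, int>();
        object mergeLock = new object();

        Parallel.ForEach(
            keys,
            () => new Dictionary<string, int>(),          // localInit: one per worker
            (key, loopState, local) =>                     // body: touches only "local"
            {
                local[key] = FakeQuery(key);
                return local;
            },
            local =>                                       // localFinally: merge under lock
            {
                lock (mergeLock)
                {
                    foreach (var kv in local)
                        merged[kv.Key] = kv.Value;
                }
            });

        return merged;
    }
}
```

The alternative mentioned above, a single shared dictionary with a lock around every insert, is simpler but makes every iteration contend for the same lock.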

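First, I'd see what happens if you ask the database for only the data you need ("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src") and use GetString(0) and GetInt32(1) instead of looking up src and time by name. Also, get each value from the result only once.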
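A small variation on the same idea: instead of hard-coding ordinals 0 and 1, look each ordinal up once with GetOrdinal before the loop, then read each value once per row. MySqlDataReader implements IDataReader, but so that this sketch runs without a MySQL server, it uses an in-memory DataTable with made-up rows as a stand-in for the `logs` query result.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

static class ReadOnceDemo
{
    // Works with any IDataReader (MySqlDataReader included): the column ordinals are
    // looked up once, and each value is fetched from the reader once per row.
    public static Dictionary<string, int> ReadViews(IDataReader reader)
    {
        int srcOrdinal = reader.GetOrdinal("src");
        int timeOrdinal = reader.GetOrdinal("time");
        var views = new Dictionary<string, int>();

        while (reader.Read())
        {
            string src = reader.GetString(srcOrdinal);
            int time = reader.GetInt32(timeOrdinal);

            if (!views.ContainsKey(src))
                views.Add(src, time); // keeps the first time seen for each src
        }
        return views;
    }

    // In-memory table standing in for the query result (hypothetical sample data).
    public static DataTable SampleLogs()
    {
        var table = new DataTable();
        table.Columns.Add("src", typeof(string));
        table.Columns.Add("time", typeof(int));
        table.Rows.Add("/index", 100);
        table.Rows.Add("/about", 200);
        table.Rows.Add("/index", 300);
        return table;
    }
}
```

Usage: `using (var reader = ReadOnceDemo.SampleLogs().CreateDataReader()) { var views = ReadOnceDemo.ReadViews(reader); }`.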

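I'm also not sure about the logic: you aren't ordering the log events by time, so whichever one is returned first (and therefore stored in the dictionary) could be any of them. With GROUP BY src and no aggregate on time, MySQL is free to return the time from any row in each group, and servers running with the ONLY_FULL_GROUP_BY SQL mode (the default since MySQL 5.7.5) reject the query outright.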
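If you want a specific time rather than an arbitrary one, make that choice explicit. A sketch, assuming "earliest" is what you want: keep the minimum time per src on the client, which gives the same answer whatever order the rows arrive in. The rows here are plain (src, time) pairs rather than a live reader.

```csharp
using System;
using System.Collections.Generic;

static class EarliestView
{
    // Keeps the smallest time seen for each src, so the result no longer depends on
    // the order in which rows happen to arrive. A server-side equivalent would be:
    // SELECT src, MIN(time) FROM logs WHERE IP = @ip GROUP BY src
    public static Dictionary<string, int> Earliest(IEnumerable<KeyValuePair<string, int>> rows)
    {
        var earliest = new Dictionary<string, int>();
        foreach (var row in rows)
        {
            int current;
            if (!earliest.TryGetValue(row.Key, out current) || row.Value < current)
                earliest[row.Key] = row.Value;
        }
        return earliest;
    }
}
```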

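Something along the lines of this logic, where each of the N threads only works on every Nth pair, each thread has its own reader, and nothing in allPeople itself actually changes, only properties of the values in allPeople: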

    private void RunSubQuery(Dictionary<string, Type> allPeople, MySqlConnection con, int threadNumber, int threadCount)
    {
        int hoppity = 0; // position of the current pair; used to hop over the keys not processed by this thread

        foreach (var pair in allPeople)
        {
            // thread threadNumber processes every threadCount-th key, starting at position threadNumber
            if ((hoppity % threadCount) == threadNumber)
            {
                // con must belong to this thread alone: MySqlConnection is not thread-safe
                // and allows only one open reader at a time
                using (MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = @ip GROUP BY src", con))
                {
                    // a parameter instead of string concatenation avoids quoting problems and SQL injection
                    comd.Parameters.AddWithValue("@ip", pair.Key);

                    using (MySqlDataReader reader = comd.ExecuteReader())
                    {
                        var allViews = new Dictionary<string, Dictionary<int, Log>>();

                        while (reader.Read())
                        {
                            string src = reader.GetString(0);
                            int time = reader.GetInt32(1);

                            // do whatever to allViews with src and time
                        }

                        // no thread will be modifying the same pair.Value, so this is safe
                        pair.Value.View = allViews;
                    }
                }
            }

            ++hoppity;
        }
    }

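This isn't tested: I don't have MySQL on this machine, nor your database or the other types you're using. It's also procedural (the way you'd do it in Fortran with OpenMPI) rather than wrapping everything up in task objects.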
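To check that the hoppity scheme hands every pair to exactly one thread, here is a self-contained sketch with no database involved. CountVisits is a hypothetical helper for illustration: it runs threadCount threads over itemCount positions, applies the same `position % threadCount == threadNumber` test, and counts how often each position is processed.

```csharp
using System;
using System.Threading;

static class RoundRobin
{
    // Mirrors the hoppity logic of RunSubQuery: worker threadNumber handles the
    // positions where position % threadCount == threadNumber.
    // Returns how many times each position was processed (1 each if the scheme is right).
    public static int[] CountVisits(int itemCount, int threadCount)
    {
        var visits = new int[itemCount];
        var threads = new Thread[threadCount];

        for (int t = 0; t < threadCount; ++t)
        {
            int threadNumber = t; // per-iteration copy, captured by the lambda below
            threads[t] = new Thread(() =>
            {
                for (int hoppity = 0; hoppity < itemCount; ++hoppity)
                {
                    if (hoppity % threadCount == threadNumber)
                        Interlocked.Increment(ref visits[hoppity]);
                }
            });
            threads[t].Start();
        }

        foreach (var thread in threads) thread.Join();
        return visits;
    }
}
```

Every thread still walks the whole sequence; it just skips the positions that belong to the others, which is cheap compared to a database round trip.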

You could start the threads like this:

    void RunQuery(Dictionary<string, Type> allPeople, string connectionString)
    {
        lock (allPeople)
        {
            const int threadCount = 8; // the number of threads

            // if it takes 18 seconds currently and you're not at .net 4 yet, then you may as well create
            // the threads here as any saving of using a pool will not matter against 18 seconds
            //
            // it could be more efficient to use a pool so that each thread takes a pair off of 
            // a queue, as doing it this way means that each thread has the same number of pairs to process,
            // and some pairs might take longer than others
            Thread[] threads = new Thread[threadCount];

            for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber)
            {
                // copy the loop variable: a lambda capturing threadNumber directly would read the
                // shared loop variable whenever the thread runs, so some keys could be processed
                // twice and others not at all
                int myThreadNumber = threadNumber;

                threads[threadNumber] = new Thread(() =>
                {
                    // each thread opens its own connection, since MySqlConnection is not thread-safe
                    using (var connection = new MySqlConnection(connectionString))
                    {
                        connection.Open();
                        RunSubQuery(allPeople, connection, myThreadNumber, threadCount);
                    }
                });
                threads[threadNumber].Start();
            }

            // wait for all threads to finish
            for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber)
            {
                threads[threadNumber].Join();
            }
        }
    }

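The extra lock on allPeople, held until all the threads have returned, may or may not be needed; I'm not quite sure. Any object would do as the lock.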
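The comment in RunQuery notes that a pool, where each thread takes the next pair off a queue, balances the load better than giving every thread a fixed share of the pairs. A minimal sketch of that idea with ConcurrentQueue (.NET 4 and later); `query` is a hypothetical stand-in for the per-key database call, and a real version would still need one connection per thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

static class QueueWorkers
{
    // All threads pull the next key from one shared queue, so a thread that gets
    // quick keys simply ends up processing more of them.
    public static ConcurrentDictionary<string, int> Run(
        IEnumerable<string> keys, int threadCount, Func<string, int> query)
    {
        var queue = new ConcurrentQueue<string>(keys);
        var results = new ConcurrentDictionary<string, int>();
        var threads = new Thread[threadCount];

        for (int t = 0; t < threadCount; ++t)
        {
            threads[t] = new Thread(() =>
            {
                string key;
                while (queue.TryDequeue(out key)) // each thread stops once the queue is empty
                    results[key] = query(key);
            });
            threads[t].Start();
        }

        foreach (var thread in threads) thread.Join();
        return results;
    }
}
```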

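None of this guarantees any performance gain: it may be that the MySQL library is single-threaded, though the server can certainly handle multiple connections. Measure with different numbers of threads.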
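A rough harness for that measurement, assuming you swap the simulated work for the real per-key query: ParallelOptions.MaxDegreeOfParallelism caps how many iterations run at once, and Stopwatch times each run. In the usage example, Thread.Sleep only imitates waiting on a database round trip, so the timings it produces say nothing about your server.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

static class ThreadCountBenchmark
{
    // Runs the same workload once per entry in threadCounts, each time limiting
    // Parallel.For to that many concurrent iterations, and records the elapsed time.
    public static Dictionary<int, TimeSpan> Measure(int itemCount, int[] threadCounts, Action<int> work)
    {
        var timings = new Dictionary<int, TimeSpan>();
        foreach (int threads in threadCounts)
        {
            var options = new ParallelOptions { MaxDegreeOfParallelism = threads };
            var stopwatch = Stopwatch.StartNew();
            Parallel.For(0, itemCount, options, i => work(i));
            stopwatch.Stop();
            timings[threads] = stopwatch.Elapsed;
        }
        return timings;
    }
}
```

Usage: `var timings = ThreadCountBenchmark.Measure(100, new[] { 1, 2, 4, 8, 16 }, i => Thread.Sleep(10));`, then print each entry. With real queries, expect the gains to flatten (or reverse) once the server or network becomes the bottleneck.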


If you're on .NET 4, you don't have to mess around with creating threads or hopping over the items you're not working on:

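    // this time using .NET 4 Parallel.ForEach; MySqlConnection is not thread-safe,
    // so each iteration opens its own connection (connection pooling keeps this cheap)
    static void RunQuery(Dictionary<string, Type> allPeople, string connectionString)
    {
        Parallel.ForEach(allPeople, pair =>
        {
            using (var connection = new MySqlConnection(connectionString))
            {
                connection.Open();
                RunPairQuery(pair, connection);
            }
        });
    }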

    private static void RunPairQuery(KeyValuePair<string, Type> pair, MySqlConnection connection)
    {
        using (MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = @ip GROUP BY src", connection))
        {
            // a parameter instead of string concatenation avoids quoting problems and SQL injection
            comd.Parameters.AddWithValue("@ip", pair.Key);

            using (MySqlDataReader reader = comd.ExecuteReader())
            {
                var allViews = new Dictionary<string, Dictionary<int, Log>>();

                while (reader.Read())
                {
                    string src = reader.GetString(0);
                    int time = reader.GetInt32(1);

                    // do whatever to allViews with src and time
                }

                // no iteration will be modifying the same pair.Value, so this is safe
                pair.Value.View = allViews;
            }
        }
    }
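Why is `pair.Value.View = allViews` safe inside Parallel.ForEach? The loop only reads the dictionary, and each iteration writes to a different value object. The sketch below shows the same pattern with no database: Person is a hypothetical stand-in for the question's value type, and FakeViewsFor stands in for the per-IP query.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's value type.
class Person
{
    public Dictionary<string, int> View { get; set; }
}

static class ParallelValueUpdate
{
    // Stand-in for the per-IP query; every call builds and returns its own dictionary.
    static Dictionary<string, int> FakeViewsFor(string ip)
    {
        return new Dictionary<string, int> { { "/index", ip.Length } };
    }

    // The dictionary itself is only enumerated, never modified; each iteration sets
    // the View property of a different Person, so no two iterations share state.
    public static void FillViews(Dictionary<string, Person> allPeople)
    {
        Parallel.ForEach(allPeople, pair =>
        {
            pair.Value.View = FakeViewsFor(pair.Key);
        });
    }
}
```

Adding or removing keys in allPeople from inside the loop, by contrast, would not be safe, because Dictionary does not support concurrent writes.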