从文件中读取异步I/O.

use*_*678 3 c multithreading pthreads

我最近得到了多个项目的想法,都涉及从文件中读取IP地址.由于它们都应该能够处理大量主机,因此我尝试实现多线程或创建套接字池并从中选择() - 以实现某种形式的并发以获得更好的性能.在多个场合,从文件中读取似乎是提高性能的瓶颈.我理解它的方式,从具有fgets或类似的文件读取是一个同步,阻塞操作.因此,即使我成功实现了异步连接到多个主机的客户端,操作仍然是同步的,因为我一次只能从文件中读取一个地址.

    /* partially pseudo code */

/* getaddrinfo() stuff here */

while(fgets(ip, sizeof(ip), file) {
FD_ZERO(&readfds);
/* create n sockets here in a for loop */
for (i = 0; i < socket_num; i++) {
    if (newfd > fd[i]) newfd = fd[i];
    FD_SET(fd[i], &readfds);
}

/* here's where I think I should connect n sockets to n addresses from file
 * but I'm only getting one IP at a time from file, so I'm not sure how to connect to 
 * n addresses at once with fgets
 */

for (j = 0; j < socket_num; j++) {
        if ((connect(socket, ai->ai_addr, ai->ai_addrlen)) == -1)
        // error
        else { 
            freeaddrinfo(ai);       
        FD_SET(socket, &master);
            fdmax = socket;
            if (select(socket+1, &master, NULL, NULL, &tv) == -1);
        // error        
            if ((recvd = read(socket, banner, RECVD)) <= 0)
        // error
            if (FD_ISSET(socket, &master))
        // print success
        }
    /* clear sets and close sockets and stuff */
}
Run Code Online (Sandbox Code Playgroud)

我已经用注释指出了我的问题,但只是为了澄清:我不确定如何在从文件读取的多个目标服务器上执行异步I/O操作,因为从文件中读取条目似乎是严格同步的.我遇到了类似的多线程问题,成功程度略高.

    void *function_passed_to_pthread_create(void *opts) 
    { 
        while(fgets(ip_addr, sizeof(ip_addr), opts->file) {
            /* speak to ip_addr and get response */
    }
}

main()
{
    /* necessary stuff */
    for (i = 0; i < thread_num; i++) {
        pthread_create(&tasks, NULL, above_function, opts)
    }
    for (j = 0; j < thread_num; j++)
        /* join threads */
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

这似乎有效,但由于多个线程都在处理同一个文件,因此结果并不总是准确的.我想这是因为多个线程可以同时处理来自文件的相同地址.

我已经考虑将文件中的所有条目加载到数组/内存中,但如果文件特别大,我想可能会导致内存问题.最重要的是,我不确定无论如何都要做到这一点.

作为最后的说明; 如果我正在读取的文件恰好是一个包含大量IP的特别大的文件,那么我不相信任何一种解决方案都可以很好地扩展.虽然C有什么可能,所以我想有一些方法可以达到我希望的目的.

总结这篇文章; 我想在从文件中读取条目时,找到一种使用异步I/O或多线程来提高客户端应用程序性能的方法.

Sea*_*ema 5

有几个人在他们的评论中暗示了一个很好的解决方案,但可能更值得详细解释.该解决方案有相当多的细节,是相当复杂的代码,所以我打算用伪来解释我建议.

你在这里拥有的是一个经典的生产者/消费者问题的变体:你有一个产生数据的东西,以及许多试图消费这些数据的东西.在你的情况下,它必须是产生该数据的"单一事物",因为源文件的每一行的长度是未知的:你不能只是向前跳'n'字节并且不知何故在下一个IP上.一次只能有一个actor将读指针移向下一个未知位置\n,因此根据定义你有一个生产者.

攻击方法有三种:

  • 解决方案A涉及让每个线程从共享文件缓冲区中拉出一点,并在每次上次读取完成时启动异步(非阻塞)读取.有很多令人头痛的问题,因为它对文件系统和正在执行的工作之间的时序差异非常敏感:如果文件读取速度很慢,那么工作人员都将停止等待文件.如果工作人员很慢,文件阅读器将停止或填满内存,等待他们使用数据.这个解决方案可能是绝对最快的,但同样令人难以置信的同步代码可以解决大量的问题.除非你是线程专家(或非常聪明的滥用epoll_wait()),否则你可能不想走这条路.

  • 解决方案B有一个"主"线程,负责读取文件,并使用它读取的数据填充某种线程安全队列,每个队列条目有一个IP地址(一个字符串).每个工作线程只是尽可能快地使用队列条目,查询远程服务器然后请求另一个队列条目.这需要一点点关注,但通常比解决方案A更安全,特别是如果你使用其他人的队列实现.

  • 解决方案C非常讨厌,但你不应该把它解雇,取决于你正在做什么.这个解决方案只涉及使用类似Un*xsed命令(请参阅从给定开始和结束行号的文件获取一系列行)来提前将源文件切割成一堆"矮胖"源文件 - 例如,二十他们 然后你就可以并行使用一个非常简单的单线程程序运行20个副本&,每个程序位于文件的不同"切片"上.与一个小的shell脚本一起自动化它,这可以是满足许多需求的"足够好"的解决方案.


让我们仔细看看解决方案B - 具有线程安全队列的主线程.我将作弊并假设您可以构建一个工作队列实现(如果没有,有关于使用pthreads实现线程安全队列的StackOverflow文章:pthread同步阻塞队列).

在伪代码中,这个解决方案是这样的:

main()
{
    /* Create a queue. */
    queue = create_queue();

    /* Kick off the master thread to read the file, and give it the queue. */
    master_thread = pthread_create(master, queue);

    /* Kick off a bunch of workers with access to the queue. */
    for (i = 0; i < 20; i++) {
        worker_thread[i] = pthread_create(worker, queue);
    }

    /* Wait for everybody to finish. */
    pthread_join(master_thread);
    for (i = 0; i < 20; i++) {
        pthread_join(worker_thread[i]);
    }
}

void master(queue q)
{
    FILE *fp = fopen("ips.txt", "r");
    char buffer[BIGGER_THAN_ANY_IP];

    /* Inhale the file as fast as we can, and push each line we
       read onto the queue. */
    while (fgets(fp, buffer) != NULL) {
        char *next_ip = strdup(buffer);
        enqueue(q, next_ip);
    }

    /* Add some final messages in the queue to let the workers
       know that we're out of data.  There are *much* better ways
       of notifying them that we're "done", but in this case,
       pushing a bunch of NULLs equal to the number of threads is
       simple and probably good enough. */
    for (i = 0; i < 20; i++) {
        enqueue(q, NULL);
    }
}

void worker(queue q)
{
    char *ip;

    /* Inhale messages off the queue as fast as we can until
       we get a "NULL", which means that it's time to stop.
       The call to dequeue() *must* block if there's nothing
       in the queue; the call should only return NULL if the
       queue actually had NULL pushed into it. */
    while ((ip = dequeue(q)) != NULL) {

        /* Insert code to actually do the work here. */
        connect_and_send_and_receive_to(ip);
    }
}
Run Code Online (Sandbox Code Playgroud)

在一个真正的实现中有很多警告和细节(比如:我们如何实现队列,环形缓冲区或链表?如果文本不是所有的IP怎么办?如果char缓冲区不够大怎么办?怎么办?很多线程就足够了?我们如何处理文件或网络错误?malloc性能会成为瓶颈吗?如果队列变得太大会怎样?我们能不能更好地重叠网络I/O?).

但是,除了警告和细节之外,我上面提到的伪代码是一个很好的起点,你可以将它扩展为一个可行的解决方案.