在c中实现流水线操作.最好的方法是什么?(自己的linux shell)

Pat*_*ryk 3 c linux pipeline pipe mkfifo

我想不出有任何方法可以在c中实现真正有效的流水线操作.这就是我决定写在这里的原因.我不得不说,我明白管道/前叉/ mkfifo是如何工作的.我见过很多实现2-3个管道的例子.这很简单.我的问题开始了,当我必须实现shell时,管道计数是未知的.

我现在得到的东西:例如.

ls -al | tr a-z A-Z | tr A-Z a-z | tr a-z A-Z
Run Code Online (Sandbox Code Playgroud)

我把这样的线转换成这样的东西:

array[0] = {"ls", "-al", NULL"}
array[1] = {"tr", "a-z", "A-Z", NULL"}
array[2] = {"tr", "A-Z", "a-z", NULL"}
array[3] = {"tr", "a-z", "A-Z", NULL"}
Run Code Online (Sandbox Code Playgroud)

所以我可以使用

execvp(array[0],array)
Run Code Online (Sandbox Code Playgroud)

稍后的.

Untli现在,我相信一切都好.当我试图将这些函数输入/输出重定向到彼此时,问题开始了.

这就是我这样做的方式:

    mkfifo("queue", 0777);

    for (i = 0; i<= pipelines_count; i++) // eg. if there's 3 pipelines, there's 4 functions to execvp
    {
    int b = fork();             
    if (b == 0) // child
        {           
        int c = fork();

        if (c == 0) 
        // baby (younger than child) 
        // I use c process, to unblock desc_read and desc_writ for b process only
        // nothing executes in here
            {       
            if (i == 0) // 1st pipeline
                {
                int desc_read = open("queue", O_RDONLY);
                // dup2 here, so after closing there's still something that can read from 
                // from desc_read
                dup2(desc_read, 0); 
                close(desc_read);           
                }

            if (i == pipelines_count) // last pipeline
                {
                int desc_write = open("queue", O_WRONLY);
                dup2(desc_write, 0);
                close(desc_write);                              
                }

            if (i > 0 && i < pipelines_count) // pipeline somewhere inside
                {
                int desc_read = open("queue", O_RDONLY);
                int desc_write = open("queue", O_WRONLY);
                dup2(desc_write, 1);
                dup2(desc_read, 0);
                close(desc_write);
                close(desc_read);
                }               
            exit(0); // closing every connection between process c and pipeline             
            }
        else
        // b process here
        // in b process, i execvp commands
        {                       
        if (i == 0) // 1st pipeline (changing stdout only)
            {   
            int desc_write = open("queue", O_WRONLY);               
            dup2(desc_write, 1); // changing stdout -> pdesc[1]
            close(desc_write);                  
            }

        if (i == pipelines_count) // last pipeline (changing stdin only)
            {   
            int desc_read = open("queue", O_RDONLY);                                    
            dup2(desc_read, 0); // changing stdin -> pdesc[0]   
            close(desc_read);           
            }

        if (i > 0 && i < pipelines_count) // pipeline somewhere inside
            {               
            int desc_write = open("queue", O_WRONLY);       
            dup2(desc_write, 1); // changing stdout -> pdesc[1]
            int desc_read = open("queue", O_RDONLY);                            
            dup2(desc_read, 0); // changing stdin -> pdesc[0]
            close(desc_write);
            close(desc_read);                               
            }

        wait(NULL); // it wait's until, process c is death                      
        execvp(array[0],array);         
        }
        }
    else // parent (waits for 1 sub command to be finished)
        {       
        wait(NULL);
        }       
    }
Run Code Online (Sandbox Code Playgroud)

谢谢.

Wil*_*ris 5

Patryk,你为什么要使用fifo,而且管道的每个阶段都使用相同的fifo?

在我看来,你需要在每个阶段之间使用管道.所以流程将是这样的:

Shell             ls               tr                tr
-----             ----             ----              ----
pipe(fds);
fork();  
close(fds[0]);    close(fds[1]);
                  dup2(fds[0],0); 
                  pipe(fds);
                  fork();         
                  close(fds[0]);   close(fds[1]);  
                  dup2(fds[1],1);  dup2(fds[0],0);
                  exex(...);       pipe(fds);
                                   fork();     
                                   close(fds[0]);     etc
                                   dup2(fds[1],1);
                                   exex(...);  
Run Code Online (Sandbox Code Playgroud)

在每个分叉shell(close,dup2,pipe等)中运行的序列看起来像一个函数(获取所需进程的名称和参数).请注意,exec在每个调用之前,shell的分叉副本正在运行.

编辑:

Patryk:

Also, is my thinking correct? Shall it work like that? (pseudocode): 
start_fork(ls) -> end_fork(ls) -> start_fork(tr) -> end_fork(tr) -> 
start_fork(tr) -> end_fork(tr) 
Run Code Online (Sandbox Code Playgroud)

我不确定你的意思是start_fork和end_fork.您是否暗示lstr开始之前完成运行?这不是上图所示的真正含义.你的shell ls在开始之前不会等待完成tr.它开始的所有过程的按顺序配管,设置stdinstdout为每一个,以使工艺连接在一起,stdoutlsstdintr; stdouttrstdin了下tr.这就是dup2调用正在做的事情.

进程运行的顺序由操作系统(调度程序)决定,但显然如果tr运行并从空读取stdin它必须等待(阻塞),直到前面的进程将某些内容写入管道.很可能lstr从它读取之前可能会完成stdin,但同样可能它不会.例如,如果链中的第一个命令是连续运行并且沿途产生输出的东西,那么管道中的第二个命令将不时被安排到任何沿管道发送的任何内容.

希望能澄清一点:-)

  • 不,因为你已经重复了描述符 (2认同)