In prepping for the next part of my OS journey (writing an init system!), I found a need to add pipelining to my shell. This entry is a quick jaunt into how that works. All the code we break down here can be found here.

What actually is a pipe?

Before we talk about a pipeline, it’s useful to know what a pipe is. Fundementally, a pipe is two File Descriptors (read: files, yes it’s a bit more complicated but bear with me), where what is written to one end of the pipe is readable in the other. Note that this isn’t a two way communication - one file descriptor is the input, and one is the output.

They can be created with the pipe(2) function in libc (which ultimately calls the mknod syscall), which returns the two file descriptors pertaining to the two ends of our pipe.

Pipes are useful as an Interprocess Communication (IPC) mechanism - if you delegate one end of the pipe to one process, and the other to another process, then those processes can communicate by sending data through the pipe.

A pipeline is simply a set of processes that use pipes to talk to one another. For example, if we have three processes, A, B, and C, a pipeline would involve A talking to B, and B talking to C through pipes.

Let’s implement one

Previously in our shell we implemented the ability to start a process. Processes were contained within a Process struct.

// A process that can be started and waited on.
#[derive(Debug)]
pub struct Process {
    pub argv: Vec<String>,
    pub state: ProcessState,
}

impl Process {
    /// Start the process in a new child process.
    pub fn start(&mut self) -> nix::Result<()> {
        ...
    }

    /// Waits for the process to be done.
    pub fn wait(&mut self) -> Result<(), WaitError> {
        ...
    }
}

Processes are created by the shell, using the parser to split up arguments, and then started with the start method. The shell then blocks until the process is finished by calling the wait command. To support pipelining, we’re gonna need to support creating a bunch of processes, and waiting on all of them.

We can start by defining a Pipeline struct:

/// The state of a pipeline of processes.
#[derive(Debug)]
pub enum PipelineState {
    Unstarted,
    // The process group ID of the pipeline.
    Running(Pid),
    Terminated,
}

/// A pipeline of processes.
pub struct Pipeline {
    pub processes: Vec<Process>,
    pub status: PipelineState,
}

A Pipeline is simply a collection of processes, and a status that defines whether the pipeline has been started or not. So how do we start a pipeline? Well, there’s a few steps:

  1. For every pair of processes, create a pipe, and set the stdout of the first process to the write end of the pipe, and the stdin of the second process to the read end of the pipe
  2. Start every process
  3. Wait for every process to exit

To abstract out the concept of stdin/stdout, I found it useful to create an IOTriple struct which contains each of those file descriptors, and defines how to create a pipe. The pipe method of an IOTriple simply calls the pipe libc command and returns two sets of stdin/out/errs, one for reading, and one for writing.

/// The standard input, output, and error file descriptors.
#[derive(Debug, Clone, Copy)]
pub struct IOTriple {
    pub stdin: i32,
    pub stdout: i32,
    pub stderr: i32,
}

impl IOTriple {
    /// Create a new pipe, and return the read and write ends of the pipe.
    /// The first element of the tuple is the read end of the pipe, which should be used to read from the pipe,
    /// and the second element is the write end of the pipe which should be used to write to the pipe.
    pub fn pipe(&self) -> Result<(IOTriple, IOTriple), nix::Error> {
        let (read, write) = pipe()?;

        // The read end of the pipe. Data from the write end can be read from the stdin of this triple
        let read = IOTriple {
            stdin: read,
            stdout: self.stdout,
            stderr: self.stderr,
        };

        // The write end of the pipe. Data written to the stdout of this triple will be readable from the read triple.
        let write = IOTriple {
            stdin: self.stdin,
            stdout: write,
            stderr: self.stderr,
        };

        Ok((read, write))
    }
}

We can then use that to start the pipeline:

pub fn execute(&mut self, triple: mut IOTriple) -> Result<(), WaitError> {
    // Split out the last process from the rest of the processes.
    let (last, rest) = self.processes.split_last_mut().expect("BUG: empty commands");

    // For every process that isn't the last, create a pipe start the process pointing to the write end
    // and store the read end for the next process in line.
    for process in rest.iter_mut() {
        let (read, write) = triple.pipe()?;
        process.start(write)?;

        triple = read;
    }

    // Finally, create the last process using the remaining triple (no need to pipe).
    last.start(triple)?;
}

That seems to work, but there’s a curious behaviour. If we take a simple pipeline, it seems to hang:

$ echo test | cat
test
... Hangs

That’s a bit strange, right? When echo exits its file descriptors should be cleaned up, which should result in cat seeing an end of file (EOF) and exiting. We’re missing something… Oh! When we fork a child process it inherits the shells file descriptors, but if the shell doesn’t close them, then they’ll stay open when the child exits. So we need to close the pipes in the shell, to make sure the pipe only belongs to the child process. Let’s add this to our loop:

// Close any pipe file descriptors, because they've been moved into the child process.
close(write.stdin)?;
close(write.stdout)?;
close(write.stderr)?;

And that seems to work! We can run our command now, and it exits without a problem:

$ echo test | cat
test
$

Waiting

The other problem in our pipeline is how we can wait for every process in the pipeline to finish. In our one process world, this is simple - we can use the waitpid systemcall to wait for the process to finish, by supplying the process ID of the process to that systemcall. What we need to do here is to wait on multiple process IDs. Hrmm…

So it turns out, there’s multiple variants of the wait call. There’s waitpid that we’re using, but there’s also waitid (note the missing p). If we read the docs for waitid, we find:

waitid()
The waitid() system call (available since Linux 2.6.9) provides more precise control over which child state changes to wait for.

The idtype and id arguments select the child(ren) to wait for, as follows:
...
idtype == P_PGID
    Wait for any child whose process group ID matches id. 

So what we need to do is put our pipeline processes in a “process group”, and then wait on that process group ID. Because unix likes its acronyms, we call a process group ID a “pgid”, and we can set the pgid of a process with the setpgid command. From those docs:

if pgid is 0, the process ID of the indicated process shall be used.

So, in the general case, pgid’s are basically process IDs. So our logic here is going to be to set every process in the pipeline’s pgid to the pid of the first process. To do that, we’ll need to store the pid of the first process, and then pass it into the start method of every other process. Let’s update start first:

 /// Start the process in a new child process.
pub fn start(&mut self, pgid: Option<Pid>, triple: IOTriple) -> nix::Result<()> {
    unsafe {
        match fork() {
            Ok(ForkResult::Parent { child }) => {
                if let Some(pgid) = pgid {
                    setpgid(child, pgid)?;
                } else {
                    setpgid(child, child)?;
                }
                self.state = ProcessState::Running(child);
            }
...

So start now takes an optional pgid. If that is set, then we put the child process into that process group. If it isn’t then we make a new process group with the PID of the child process. Using that logic, the first process in the pipeline will create its own process group, and then every other process will join it. On the pipeline side, this looks like:

pub fn execute(&mut self, triple: mut IOTriple) -> Result<(), WaitError> {
...
    let (read, write) = triple.pipe()?;
    command.start(pgid, write)?;

    // The process group ID of the pipeline will be the pid of the first process in the pipeline.
    if pgid.is_none() {
        match command.state {
            ProcessState::Running(pid) => pgid = Some(pid),
            _ => return Err(WaitError::NotRunning),
        }
    }
...
}

Finally, now that we have a process group, we can call waitid to wait on any of the child processes in our pipeline:

pub fn wait(&mut self) -> Result<(), WaitError> {
    let pgid = match self.status {
        PipelineState::Running(pgid) => pgid,
        _ => return Err(WaitError::NotRunning),
    };

    // Wait for any of the children to 
    if let Some(pid) = waitid(Id::PGid(pgid), WaitPidFlag::__WALL | WaitPidFlag::WEXITED)?.pid() {
        // Mark the child as finished.  
    }

    Ok(())
}

If we call wait until we get an ECHILD (which indicates we have no more children to wait on), then that’s our pipelining finished!

Wrapping up

So we now have pipelining in our shell - that’s pretty neat! It came in three real steps:

  • Chain processes together with pipes
  • Move all the processes into a process group
  • Wait on all the processes using that process group

None of which I’d really done before. It’s been super interesting to learn about these lower level concepts, and I hope folks are enjoying learning along!

I'm on Twitter: @sinkingpoint and BlueSky: @colindou.ch. Come yell at me!