...

Source file src/os/pipe_test.go

Documentation: os

     1  // Copyright 2015 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Test broken pipes on Unix systems.
     6  //
     7  //go:build !plan9 && !js && !wasip1
     8  
     9  package os_test
    10  
    11  import (
    12  	"bufio"
    13  	"bytes"
    14  	"fmt"
    15  	"internal/testenv"
    16  	"io"
    17  	"io/fs"
    18  	"os"
    19  	"os/exec"
    20  	"os/signal"
    21  	"runtime"
    22  	"strconv"
    23  	"strings"
    24  	"sync"
    25  	"syscall"
    26  	"testing"
    27  	"time"
    28  )
    29  
    30  func TestEPIPE(t *testing.T) {
    31  	// This test cannot be run in parallel because of a race similar
    32  	// to the one reported in https://go.dev/issue/22315.
    33  	//
    34  	// Even though the pipe is opened with O_CLOEXEC, if another test forks in
    35  	// between the call to os.Pipe and the call to r.Close, that child process can
    36  	// retain an open copy of r's file descriptor until it execs. If one of our
    37  	// Write calls occurs during that interval it can spuriously succeed,
    38  	// buffering the write to the child's copy of the pipe (even though the child
    39  	// will not actually read the buffered bytes).
    40  
    41  	r, w, err := os.Pipe()
    42  	if err != nil {
    43  		t.Fatal(err)
    44  	}
    45  	if err := r.Close(); err != nil {
    46  		t.Fatal(err)
    47  	}
    48  
    49  	expect := syscall.EPIPE
    50  	if runtime.GOOS == "windows" {
    51  		// 232 is Windows error code ERROR_NO_DATA, "The pipe is being closed".
    52  		expect = syscall.Errno(232)
    53  	}
    54  	// Every time we write to the pipe we should get an EPIPE.
    55  	for i := 0; i < 20; i++ {
    56  		_, err = w.Write([]byte("hi"))
    57  		if err == nil {
    58  			t.Fatal("unexpected success of Write to broken pipe")
    59  		}
    60  		if pe, ok := err.(*fs.PathError); ok {
    61  			err = pe.Err
    62  		}
    63  		if se, ok := err.(*os.SyscallError); ok {
    64  			err = se.Err
    65  		}
    66  		if err != expect {
    67  			t.Errorf("iteration %d: got %v, expected %v", i, err, expect)
    68  		}
    69  	}
    70  }
    71  
    72  func TestStdPipe(t *testing.T) {
    73  	switch runtime.GOOS {
    74  	case "windows":
    75  		t.Skip("Windows doesn't support SIGPIPE")
    76  	}
    77  
    78  	if os.Getenv("GO_TEST_STD_PIPE_HELPER") != "" {
    79  		if os.Getenv("GO_TEST_STD_PIPE_HELPER_SIGNAL") != "" {
    80  			signal.Notify(make(chan os.Signal, 1), syscall.SIGPIPE)
    81  		}
    82  		switch os.Getenv("GO_TEST_STD_PIPE_HELPER") {
    83  		case "1":
    84  			os.Stdout.Write([]byte("stdout"))
    85  		case "2":
    86  			os.Stderr.Write([]byte("stderr"))
    87  		case "3":
    88  			if _, err := os.NewFile(3, "3").Write([]byte("3")); err == nil {
    89  				os.Exit(3)
    90  			}
    91  		default:
    92  			panic("unrecognized value for GO_TEST_STD_PIPE_HELPER")
    93  		}
    94  		// For stdout/stderr, we should have crashed with a broken pipe error.
    95  		// The caller will be looking for that exit status,
    96  		// so just exit normally here to cause a failure in the caller.
    97  		// For descriptor 3, a normal exit is expected.
    98  		os.Exit(0)
    99  	}
   100  
   101  	testenv.MustHaveExec(t)
   102  	// This test cannot be run in parallel due to the same race as for TestEPIPE.
   103  	// (We expect a write to a closed pipe can fail, but a concurrent fork of a
   104  	// child process can cause the pipe to unexpectedly remain open.)
   105  
   106  	r, w, err := os.Pipe()
   107  	if err != nil {
   108  		t.Fatal(err)
   109  	}
   110  	if err := r.Close(); err != nil {
   111  		t.Fatal(err)
   112  	}
   113  	// Invoke the test program to run the test and write to a closed pipe.
   114  	// If sig is false:
   115  	// writing to stdout or stderr should cause an immediate SIGPIPE;
   116  	// writing to descriptor 3 should fail with EPIPE and then exit 0.
   117  	// If sig is true:
   118  	// all writes should fail with EPIPE and then exit 0.
   119  	for _, sig := range []bool{false, true} {
   120  		for dest := 1; dest < 4; dest++ {
   121  			cmd := testenv.Command(t, os.Args[0], "-test.run", "TestStdPipe")
   122  			cmd.Stdout = w
   123  			cmd.Stderr = w
   124  			cmd.ExtraFiles = []*os.File{w}
   125  			cmd.Env = append(os.Environ(), fmt.Sprintf("GO_TEST_STD_PIPE_HELPER=%d", dest))
   126  			if sig {
   127  				cmd.Env = append(cmd.Env, "GO_TEST_STD_PIPE_HELPER_SIGNAL=1")
   128  			}
   129  			if err := cmd.Run(); err == nil {
   130  				if !sig && dest < 3 {
   131  					t.Errorf("unexpected success of write to closed pipe %d sig %t in child", dest, sig)
   132  				}
   133  			} else if ee, ok := err.(*exec.ExitError); !ok {
   134  				t.Errorf("unexpected exec error type %T: %v", err, err)
   135  			} else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
   136  				t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
   137  			} else if ws.Signaled() && ws.Signal() == syscall.SIGPIPE {
   138  				if sig || dest > 2 {
   139  					t.Errorf("unexpected SIGPIPE signal for descriptor %d sig %t", dest, sig)
   140  				}
   141  			} else {
   142  				t.Errorf("unexpected exit status %v for descriptor %d sig %t", err, dest, sig)
   143  			}
   144  		}
   145  	}
   146  
   147  	// Test redirecting stdout but not stderr.  Issue 40076.
   148  	cmd := testenv.Command(t, os.Args[0], "-test.run", "TestStdPipe")
   149  	cmd.Stdout = w
   150  	var stderr bytes.Buffer
   151  	cmd.Stderr = &stderr
   152  	cmd.Env = append(cmd.Environ(), "GO_TEST_STD_PIPE_HELPER=1")
   153  	if err := cmd.Run(); err == nil {
   154  		t.Errorf("unexpected success of write to closed stdout")
   155  	} else if ee, ok := err.(*exec.ExitError); !ok {
   156  		t.Errorf("unexpected exec error type %T: %v", err, err)
   157  	} else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
   158  		t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
   159  	} else if !ws.Signaled() || ws.Signal() != syscall.SIGPIPE {
   160  		t.Errorf("unexpected exit status %v for write to closed stdout", err)
   161  	}
   162  	if output := stderr.Bytes(); len(output) > 0 {
   163  		t.Errorf("unexpected output on stderr: %s", output)
   164  	}
   165  }
   166  
   167  func testClosedPipeRace(t *testing.T, read bool) {
   168  	// This test cannot be run in parallel due to the same race as for TestEPIPE.
   169  	// (We expect a write to a closed pipe can fail, but a concurrent fork of a
   170  	// child process can cause the pipe to unexpectedly remain open.)
   171  
   172  	limit := 1
   173  	if !read {
   174  		// Get the amount we have to write to overload a pipe
   175  		// with no reader.
   176  		limit = 131073
   177  		if b, err := os.ReadFile("/proc/sys/fs/pipe-max-size"); err == nil {
   178  			if i, err := strconv.Atoi(strings.TrimSpace(string(b))); err == nil {
   179  				limit = i + 1
   180  			}
   181  		}
   182  		t.Logf("using pipe write limit of %d", limit)
   183  	}
   184  
   185  	r, w, err := os.Pipe()
   186  	if err != nil {
   187  		t.Fatal(err)
   188  	}
   189  	defer r.Close()
   190  	defer w.Close()
   191  
   192  	// Close the read end of the pipe in a goroutine while we are
   193  	// writing to the write end, or vice-versa.
   194  	go func() {
   195  		// Give the main goroutine a chance to enter the Read or
   196  		// Write call. This is sloppy but the test will pass even
   197  		// if we close before the read/write.
   198  		time.Sleep(20 * time.Millisecond)
   199  
   200  		var err error
   201  		if read {
   202  			err = r.Close()
   203  		} else {
   204  			err = w.Close()
   205  		}
   206  		if err != nil {
   207  			t.Error(err)
   208  		}
   209  	}()
   210  
   211  	b := make([]byte, limit)
   212  	if read {
   213  		_, err = r.Read(b[:])
   214  	} else {
   215  		_, err = w.Write(b[:])
   216  	}
   217  	if err == nil {
   218  		t.Error("I/O on closed pipe unexpectedly succeeded")
   219  	} else if pe, ok := err.(*fs.PathError); !ok {
   220  		t.Errorf("I/O on closed pipe returned unexpected error type %T; expected fs.PathError", pe)
   221  	} else if pe.Err != fs.ErrClosed {
   222  		t.Errorf("got error %q but expected %q", pe.Err, fs.ErrClosed)
   223  	} else {
   224  		t.Logf("I/O returned expected error %q", err)
   225  	}
   226  }
   227  
   228  func TestClosedPipeRaceRead(t *testing.T) {
   229  	testClosedPipeRace(t, true)
   230  }
   231  
   232  func TestClosedPipeRaceWrite(t *testing.T) {
   233  	testClosedPipeRace(t, false)
   234  }
   235  
   236  // Issue 20915: Reading on nonblocking fd should not return "waiting
   237  // for unsupported file type." Currently it returns EAGAIN; it is
   238  // possible that in the future it will simply wait for data.
   239  func TestReadNonblockingFd(t *testing.T) {
   240  	switch runtime.GOOS {
   241  	case "windows":
   242  		t.Skip("Windows doesn't support SetNonblock")
   243  	}
   244  	if os.Getenv("GO_WANT_READ_NONBLOCKING_FD") == "1" {
   245  		fd := syscallDescriptor(os.Stdin.Fd())
   246  		syscall.SetNonblock(fd, true)
   247  		defer syscall.SetNonblock(fd, false)
   248  		_, err := os.Stdin.Read(make([]byte, 1))
   249  		if err != nil {
   250  			if perr, ok := err.(*fs.PathError); !ok || perr.Err != syscall.EAGAIN {
   251  				t.Fatalf("read on nonblocking stdin got %q, should have gotten EAGAIN", err)
   252  			}
   253  		}
   254  		os.Exit(0)
   255  	}
   256  
   257  	testenv.MustHaveExec(t)
   258  	t.Parallel()
   259  
   260  	r, w, err := os.Pipe()
   261  	if err != nil {
   262  		t.Fatal(err)
   263  	}
   264  	defer r.Close()
   265  	defer w.Close()
   266  	cmd := testenv.Command(t, os.Args[0], "-test.run=^"+t.Name()+"$")
   267  	cmd.Env = append(cmd.Environ(), "GO_WANT_READ_NONBLOCKING_FD=1")
   268  	cmd.Stdin = r
   269  	output, err := cmd.CombinedOutput()
   270  	t.Logf("%s", output)
   271  	if err != nil {
   272  		t.Errorf("child process failed: %v", err)
   273  	}
   274  }
   275  
   276  func TestCloseWithBlockingReadByNewFile(t *testing.T) {
   277  	t.Parallel()
   278  
   279  	var p [2]syscallDescriptor
   280  	err := syscall.Pipe(p[:])
   281  	if err != nil {
   282  		t.Fatal(err)
   283  	}
   284  	// os.NewFile returns a blocking mode file.
   285  	testCloseWithBlockingRead(t, os.NewFile(uintptr(p[0]), "reader"), os.NewFile(uintptr(p[1]), "writer"))
   286  }
   287  
   288  func TestCloseWithBlockingReadByFd(t *testing.T) {
   289  	t.Parallel()
   290  
   291  	r, w, err := os.Pipe()
   292  	if err != nil {
   293  		t.Fatal(err)
   294  	}
   295  	// Calling Fd will put the file into blocking mode.
   296  	_ = r.Fd()
   297  	testCloseWithBlockingRead(t, r, w)
   298  }
   299  
   300  // Test that we don't let a blocking read prevent a close.
   301  func testCloseWithBlockingRead(t *testing.T, r, w *os.File) {
   302  	var (
   303  		enteringRead = make(chan struct{})
   304  		done         = make(chan struct{})
   305  	)
   306  	go func() {
   307  		var b [1]byte
   308  		close(enteringRead)
   309  		_, err := r.Read(b[:])
   310  		if err == nil {
   311  			t.Error("I/O on closed pipe unexpectedly succeeded")
   312  		}
   313  
   314  		if pe, ok := err.(*fs.PathError); ok {
   315  			err = pe.Err
   316  		}
   317  		if err != io.EOF && err != fs.ErrClosed {
   318  			t.Errorf("got %v, expected EOF or closed", err)
   319  		}
   320  		close(done)
   321  	}()
   322  
   323  	// Give the goroutine a chance to enter the Read
   324  	// or Write call. This is sloppy but the test will
   325  	// pass even if we close before the read/write.
   326  	<-enteringRead
   327  	time.Sleep(20 * time.Millisecond)
   328  
   329  	if err := r.Close(); err != nil {
   330  		t.Error(err)
   331  	}
   332  	// r.Close has completed, but since we assume r is in blocking mode that
   333  	// probably didn't unblock the call to r.Read. Close w to unblock it.
   334  	w.Close()
   335  	<-done
   336  }
   337  
   338  func TestPipeEOF(t *testing.T) {
   339  	t.Parallel()
   340  
   341  	r, w, err := os.Pipe()
   342  	if err != nil {
   343  		t.Fatal(err)
   344  	}
   345  
   346  	testPipeEOF(t, r, w)
   347  }
   348  
   349  // testPipeEOF tests that when the write side of a pipe or FIFO is closed,
   350  // a blocked Read call on the reader side returns io.EOF.
   351  //
   352  // This scenario previously failed to unblock the Read call on darwin.
   353  // (See https://go.dev/issue/24164.)
   354  func testPipeEOF(t *testing.T, r io.ReadCloser, w io.WriteCloser) {
   355  	// parkDelay is an arbitrary delay we wait for a pipe-reader goroutine to park
   356  	// before issuing the corresponding write. The test should pass no matter what
   357  	// delay we use, but with a longer delay is has a higher chance of detecting
   358  	// poller bugs.
   359  	parkDelay := 10 * time.Millisecond
   360  	if testing.Short() {
   361  		parkDelay = 100 * time.Microsecond
   362  	}
   363  	writerDone := make(chan struct{})
   364  	defer func() {
   365  		if err := r.Close(); err != nil {
   366  			t.Errorf("error closing reader: %v", err)
   367  		}
   368  		<-writerDone
   369  	}()
   370  
   371  	write := make(chan int, 1)
   372  	go func() {
   373  		defer close(writerDone)
   374  
   375  		for i := range write {
   376  			time.Sleep(parkDelay)
   377  			_, err := fmt.Fprintf(w, "line %d\n", i)
   378  			if err != nil {
   379  				t.Errorf("error writing to fifo: %v", err)
   380  				return
   381  			}
   382  		}
   383  
   384  		time.Sleep(parkDelay)
   385  		if err := w.Close(); err != nil {
   386  			t.Errorf("error closing writer: %v", err)
   387  		}
   388  	}()
   389  
   390  	rbuf := bufio.NewReader(r)
   391  	for i := 0; i < 3; i++ {
   392  		write <- i
   393  		b, err := rbuf.ReadBytes('\n')
   394  		if err != nil {
   395  			t.Fatal(err)
   396  		}
   397  		t.Logf("%s\n", bytes.TrimSpace(b))
   398  	}
   399  
   400  	close(write)
   401  	b, err := rbuf.ReadBytes('\n')
   402  	if err != io.EOF || len(b) != 0 {
   403  		t.Errorf(`ReadBytes: %q, %v; want "", io.EOF`, b, err)
   404  	}
   405  }
   406  
   407  // Issue 24481.
   408  func TestFdRace(t *testing.T) {
   409  	// This test starts 100 simultaneous goroutines, which could bury a more
   410  	// interesting stack if this or some other test happens to panic. It is also
   411  	// nearly instantaneous, so any latency benefit from running it in parallel
   412  	// would be minimal.
   413  
   414  	r, w, err := os.Pipe()
   415  	if err != nil {
   416  		t.Fatal(err)
   417  	}
   418  	defer r.Close()
   419  	defer w.Close()
   420  
   421  	var wg sync.WaitGroup
   422  	call := func() {
   423  		defer wg.Done()
   424  		w.Fd()
   425  	}
   426  
   427  	const tries = 100
   428  	for i := 0; i < tries; i++ {
   429  		wg.Add(1)
   430  		go call()
   431  	}
   432  	wg.Wait()
   433  }
   434  
   435  func TestFdReadRace(t *testing.T) {
   436  	t.Parallel()
   437  
   438  	r, w, err := os.Pipe()
   439  	if err != nil {
   440  		t.Fatal(err)
   441  	}
   442  	defer r.Close()
   443  	defer w.Close()
   444  
   445  	const count = 10
   446  
   447  	c := make(chan bool, 1)
   448  	var wg sync.WaitGroup
   449  	wg.Add(1)
   450  	go func() {
   451  		defer wg.Done()
   452  		var buf [count]byte
   453  		r.SetReadDeadline(time.Now().Add(time.Minute))
   454  		c <- true
   455  		if _, err := r.Read(buf[:]); os.IsTimeout(err) {
   456  			t.Error("read timed out")
   457  		}
   458  	}()
   459  
   460  	wg.Add(1)
   461  	go func() {
   462  		defer wg.Done()
   463  		<-c
   464  		// Give the other goroutine a chance to enter the Read.
   465  		// It doesn't matter if this occasionally fails, the test
   466  		// will still pass, it just won't test anything.
   467  		time.Sleep(10 * time.Millisecond)
   468  		r.Fd()
   469  
   470  		// The bug was that Fd would hang until Read timed out.
   471  		// If the bug is fixed, then writing to w and closing r here
   472  		// will cause the Read to exit before the timeout expires.
   473  		w.Write(make([]byte, count))
   474  		r.Close()
   475  	}()
   476  
   477  	wg.Wait()
   478  }
   479  

View as plain text