...

Source file src/golang.org/x/net/http2/pipe.go

Documentation: golang.org/x/net/http2

     1  // Copyright 2014 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package http2
     6  
     7  import (
     8  	"errors"
     9  	"io"
    10  	"sync"
    11  )
    12  
    13  // pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
    14  // io.Pipe except there are no PipeReader/PipeWriter halves, and the
    15  // underlying buffer is an interface. (io.Pipe is always unbuffered)
    16  type pipe struct {
    17  	mu       sync.Mutex
    18  	c        sync.Cond     // c.L lazily initialized to &p.mu
    19  	b        pipeBuffer    // nil when done reading
    20  	unread   int           // bytes unread when done
    21  	err      error         // read error once empty. non-nil means closed.
    22  	breakErr error         // immediate read error (caller doesn't see rest of b)
    23  	donec    chan struct{} // closed on error
    24  	readFn   func()        // optional code to run in Read before error
    25  }
    26  
    27  type pipeBuffer interface {
    28  	Len() int
    29  	io.Writer
    30  	io.Reader
    31  }
    32  
    33  // setBuffer initializes the pipe buffer.
    34  // It has no effect if the pipe is already closed.
    35  func (p *pipe) setBuffer(b pipeBuffer) {
    36  	p.mu.Lock()
    37  	defer p.mu.Unlock()
    38  	if p.err != nil || p.breakErr != nil {
    39  		return
    40  	}
    41  	p.b = b
    42  }
    43  
    44  func (p *pipe) Len() int {
    45  	p.mu.Lock()
    46  	defer p.mu.Unlock()
    47  	if p.b == nil {
    48  		return p.unread
    49  	}
    50  	return p.b.Len()
    51  }
    52  
    53  // Read waits until data is available and copies bytes
    54  // from the buffer into p.
    55  func (p *pipe) Read(d []byte) (n int, err error) {
    56  	p.mu.Lock()
    57  	defer p.mu.Unlock()
    58  	if p.c.L == nil {
    59  		p.c.L = &p.mu
    60  	}
    61  	for {
    62  		if p.breakErr != nil {
    63  			return 0, p.breakErr
    64  		}
    65  		if p.b != nil && p.b.Len() > 0 {
    66  			return p.b.Read(d)
    67  		}
    68  		if p.err != nil {
    69  			if p.readFn != nil {
    70  				p.readFn()     // e.g. copy trailers
    71  				p.readFn = nil // not sticky like p.err
    72  			}
    73  			p.b = nil
    74  			return 0, p.err
    75  		}
    76  		p.c.Wait()
    77  	}
    78  }
    79  
    80  var errClosedPipeWrite = errors.New("write on closed buffer")
    81  
    82  // Write copies bytes from p into the buffer and wakes a reader.
    83  // It is an error to write more data than the buffer can hold.
    84  func (p *pipe) Write(d []byte) (n int, err error) {
    85  	p.mu.Lock()
    86  	defer p.mu.Unlock()
    87  	if p.c.L == nil {
    88  		p.c.L = &p.mu
    89  	}
    90  	defer p.c.Signal()
    91  	if p.err != nil || p.breakErr != nil {
    92  		return 0, errClosedPipeWrite
    93  	}
    94  	return p.b.Write(d)
    95  }
    96  
    97  // CloseWithError causes the next Read (waking up a current blocked
    98  // Read if needed) to return the provided err after all data has been
    99  // read.
   100  //
   101  // The error must be non-nil.
   102  func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
   103  
   104  // BreakWithError causes the next Read (waking up a current blocked
   105  // Read if needed) to return the provided err immediately, without
   106  // waiting for unread data.
   107  func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
   108  
   109  // closeWithErrorAndCode is like CloseWithError but also sets some code to run
   110  // in the caller's goroutine before returning the error.
   111  func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
   112  
   113  func (p *pipe) closeWithError(dst *error, err error, fn func()) {
   114  	if err == nil {
   115  		panic("err must be non-nil")
   116  	}
   117  	p.mu.Lock()
   118  	defer p.mu.Unlock()
   119  	if p.c.L == nil {
   120  		p.c.L = &p.mu
   121  	}
   122  	defer p.c.Signal()
   123  	if *dst != nil {
   124  		// Already been done.
   125  		return
   126  	}
   127  	p.readFn = fn
   128  	if dst == &p.breakErr {
   129  		if p.b != nil {
   130  			p.unread += p.b.Len()
   131  		}
   132  		p.b = nil
   133  	}
   134  	*dst = err
   135  	p.closeDoneLocked()
   136  }
   137  
   138  // requires p.mu be held.
   139  func (p *pipe) closeDoneLocked() {
   140  	if p.donec == nil {
   141  		return
   142  	}
   143  	// Close if unclosed. This isn't racy since we always
   144  	// hold p.mu while closing.
   145  	select {
   146  	case <-p.donec:
   147  	default:
   148  		close(p.donec)
   149  	}
   150  }
   151  
   152  // Err returns the error (if any) first set by BreakWithError or CloseWithError.
   153  func (p *pipe) Err() error {
   154  	p.mu.Lock()
   155  	defer p.mu.Unlock()
   156  	if p.breakErr != nil {
   157  		return p.breakErr
   158  	}
   159  	return p.err
   160  }
   161  
   162  // Done returns a channel which is closed if and when this pipe is closed
   163  // with CloseWithError.
   164  func (p *pipe) Done() <-chan struct{} {
   165  	p.mu.Lock()
   166  	defer p.mu.Unlock()
   167  	if p.donec == nil {
   168  		p.donec = make(chan struct{})
   169  		if p.err != nil || p.breakErr != nil {
   170  			// Already hit an error.
   171  			p.closeDoneLocked()
   172  		}
   173  	}
   174  	return p.donec
   175  }
   176  

View as plain text