...
1
2
3
4
5 package http2
6
7 import (
8 "errors"
9 "io"
10 "sync"
11 )
12
13
14
15
16 type pipe struct {
17 mu sync.Mutex
18 c sync.Cond
19 b pipeBuffer
20 unread int
21 err error
22 breakErr error
23 donec chan struct{}
24 readFn func()
25 }
26
27 type pipeBuffer interface {
28 Len() int
29 io.Writer
30 io.Reader
31 }
32
33
34
35 func (p *pipe) setBuffer(b pipeBuffer) {
36 p.mu.Lock()
37 defer p.mu.Unlock()
38 if p.err != nil || p.breakErr != nil {
39 return
40 }
41 p.b = b
42 }
43
44 func (p *pipe) Len() int {
45 p.mu.Lock()
46 defer p.mu.Unlock()
47 if p.b == nil {
48 return p.unread
49 }
50 return p.b.Len()
51 }
52
53
54
55 func (p *pipe) Read(d []byte) (n int, err error) {
56 p.mu.Lock()
57 defer p.mu.Unlock()
58 if p.c.L == nil {
59 p.c.L = &p.mu
60 }
61 for {
62 if p.breakErr != nil {
63 return 0, p.breakErr
64 }
65 if p.b != nil && p.b.Len() > 0 {
66 return p.b.Read(d)
67 }
68 if p.err != nil {
69 if p.readFn != nil {
70 p.readFn()
71 p.readFn = nil
72 }
73 p.b = nil
74 return 0, p.err
75 }
76 p.c.Wait()
77 }
78 }
79
80 var errClosedPipeWrite = errors.New("write on closed buffer")
81
82
83
84 func (p *pipe) Write(d []byte) (n int, err error) {
85 p.mu.Lock()
86 defer p.mu.Unlock()
87 if p.c.L == nil {
88 p.c.L = &p.mu
89 }
90 defer p.c.Signal()
91 if p.err != nil || p.breakErr != nil {
92 return 0, errClosedPipeWrite
93 }
94 return p.b.Write(d)
95 }
96
97
98
99
100
101
102 func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
103
104
105
106
107 func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
108
109
110
111 func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
112
113 func (p *pipe) closeWithError(dst *error, err error, fn func()) {
114 if err == nil {
115 panic("err must be non-nil")
116 }
117 p.mu.Lock()
118 defer p.mu.Unlock()
119 if p.c.L == nil {
120 p.c.L = &p.mu
121 }
122 defer p.c.Signal()
123 if *dst != nil {
124
125 return
126 }
127 p.readFn = fn
128 if dst == &p.breakErr {
129 if p.b != nil {
130 p.unread += p.b.Len()
131 }
132 p.b = nil
133 }
134 *dst = err
135 p.closeDoneLocked()
136 }
137
138
139 func (p *pipe) closeDoneLocked() {
140 if p.donec == nil {
141 return
142 }
143
144
145 select {
146 case <-p.donec:
147 default:
148 close(p.donec)
149 }
150 }
151
152
153 func (p *pipe) Err() error {
154 p.mu.Lock()
155 defer p.mu.Unlock()
156 if p.breakErr != nil {
157 return p.breakErr
158 }
159 return p.err
160 }
161
162
163
164 func (p *pipe) Done() <-chan struct{} {
165 p.mu.Lock()
166 defer p.mu.Unlock()
167 if p.donec == nil {
168 p.donec = make(chan struct{})
169 if p.err != nil || p.breakErr != nil {
170
171 p.closeDoneLocked()
172 }
173 }
174 return p.donec
175 }
176
View as plain text