1 // Copyright 2018 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package poll 6 7 import ( 8 "internal/syscall/unix" 9 "runtime" 10 "sync" 11 "syscall" 12 "unsafe" 13 ) 14 15 const ( 16 // spliceNonblock doesn't make the splice itself necessarily nonblocking 17 // (because the actual file descriptors that are spliced from/to may block 18 // unless they have the O_NONBLOCK flag set), but it makes the splice pipe 19 // operations nonblocking. 20 spliceNonblock = 0x2 21 22 // maxSpliceSize is the maximum amount of data Splice asks 23 // the kernel to move in a single call to splice(2). 24 // We use 1MB as Splice writes data through a pipe, and 1MB is the default maximum pipe buffer size, 25 // which is determined by /proc/sys/fs/pipe-max-size. 26 maxSpliceSize = 1 << 20 27 ) 28 29 // Splice transfers at most remain bytes of data from src to dst, using the 30 // splice system call to minimize copies of data from and to userspace. 31 // 32 // Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer. 33 // src and dst must both be stream-oriented sockets. 34 // 35 // If err != nil, sc is the system call which caused the error. 36 func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) { 37 p, sc, err := getPipe() 38 if err != nil { 39 return 0, false, sc, err 40 } 41 defer putPipe(p) 42 var inPipe, n int 43 for err == nil && remain > 0 { 44 max := maxSpliceSize 45 if int64(max) > remain { 46 max = int(remain) 47 } 48 inPipe, err = spliceDrain(p.wfd, src, max) 49 // The operation is considered handled if splice returns no 50 // error, or an error other than EINVAL. An EINVAL means the 51 // kernel does not support splice for the socket type of src. 52 // The failed syscall does not consume any data so it is safe 53 // to fall back to a generic copy. 54 // 55 // spliceDrain should never return EAGAIN, so if err != nil, 56 // Splice cannot continue. 57 // 58 // If inPipe == 0 && err == nil, src is at EOF, and the 59 // transfer is complete. 60 handled = handled || (err != syscall.EINVAL) 61 if err != nil || inPipe == 0 { 62 break 63 } 64 p.data += inPipe 65 66 n, err = splicePump(dst, p.rfd, inPipe) 67 if n > 0 { 68 written += int64(n) 69 remain -= int64(n) 70 p.data -= n 71 } 72 } 73 if err != nil { 74 return written, handled, "splice", err 75 } 76 return written, true, "", nil 77 } 78 79 // spliceDrain moves data from a socket to a pipe. 80 // 81 // Invariant: when entering spliceDrain, the pipe is empty. It is either in its 82 // initial state, or splicePump has emptied it previously. 83 // 84 // Given this, spliceDrain can reasonably assume that the pipe is ready for 85 // writing, so if splice returns EAGAIN, it must be because the socket is not 86 // ready for reading. 87 // 88 // If spliceDrain returns (0, nil), src is at EOF. 89 func spliceDrain(pipefd int, sock *FD, max int) (int, error) { 90 if err := sock.readLock(); err != nil { 91 return 0, err 92 } 93 defer sock.readUnlock() 94 if err := sock.pd.prepareRead(sock.isFile); err != nil { 95 return 0, err 96 } 97 for { 98 // In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here, 99 // because it could return EAGAIN ceaselessly when the write end of the pipe is full, 100 // but this shouldn't be a concern here, since the pipe buffer must be sufficient for 101 // this data transmission on the basis of the workflow in Splice. 102 n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock) 103 if err == syscall.EINTR { 104 continue 105 } 106 if err != syscall.EAGAIN { 107 return n, err 108 } 109 if sock.pd.pollable() { 110 if err := sock.pd.waitRead(sock.isFile); err != nil { 111 return n, err 112 } 113 } 114 } 115 } 116 117 // splicePump moves all the buffered data from a pipe to a socket. 118 // 119 // Invariant: when entering splicePump, there are exactly inPipe 120 // bytes of data in the pipe, from a previous call to spliceDrain. 121 // 122 // By analogy to the condition from spliceDrain, splicePump 123 // only needs to poll the socket for readiness, if splice returns 124 // EAGAIN. 125 // 126 // If splicePump cannot move all the data in a single call to 127 // splice(2), it loops over the buffered data until it has written 128 // all of it to the socket. This behavior is similar to the Write 129 // step of an io.Copy in userspace. 130 func splicePump(sock *FD, pipefd int, inPipe int) (int, error) { 131 if err := sock.writeLock(); err != nil { 132 return 0, err 133 } 134 defer sock.writeUnlock() 135 if err := sock.pd.prepareWrite(sock.isFile); err != nil { 136 return 0, err 137 } 138 written := 0 139 for inPipe > 0 { 140 // In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here, 141 // because it could return EAGAIN ceaselessly when the read end of the pipe is empty, 142 // but this shouldn't be a concern here, since the pipe buffer must contain inPipe size of 143 // data on the basis of the workflow in Splice. 144 n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock) 145 if err == syscall.EINTR { 146 continue 147 } 148 // Here, the condition n == 0 && err == nil should never be 149 // observed, since Splice controls the write side of the pipe. 150 if n > 0 { 151 inPipe -= n 152 written += n 153 continue 154 } 155 if err != syscall.EAGAIN { 156 return written, err 157 } 158 if sock.pd.pollable() { 159 if err := sock.pd.waitWrite(sock.isFile); err != nil { 160 return written, err 161 } 162 } 163 } 164 return written, nil 165 } 166 167 // splice wraps the splice system call. Since the current implementation 168 // only uses splice on sockets and pipes, the offset arguments are unused. 169 // splice returns int instead of int64, because callers never ask it to 170 // move more data in a single call than can fit in an int32. 171 func splice(out int, in int, max int, flags int) (int, error) { 172 n, err := syscall.Splice(in, nil, out, nil, max, flags) 173 return int(n), err 174 } 175 176 type splicePipeFields struct { 177 rfd int 178 wfd int 179 data int 180 } 181 182 type splicePipe struct { 183 splicePipeFields 184 185 // We want to use a finalizer, so ensure that the size is 186 // large enough to not use the tiny allocator. 187 _ [24 - unsafe.Sizeof(splicePipeFields{})%24]byte 188 } 189 190 // splicePipePool caches pipes to avoid high-frequency construction and destruction of pipe buffers. 191 // The garbage collector will free all pipes in the sync.Pool periodically, thus we need to set up 192 // a finalizer for each pipe to close its file descriptors before the actual GC. 193 var splicePipePool = sync.Pool{New: newPoolPipe} 194 195 func newPoolPipe() any { 196 // Discard the error which occurred during the creation of pipe buffer, 197 // redirecting the data transmission to the conventional way utilizing read() + write() as a fallback. 198 p := newPipe() 199 if p == nil { 200 return nil 201 } 202 runtime.SetFinalizer(p, destroyPipe) 203 return p 204 } 205 206 // getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from the cache. 207 // 208 // Note that it may fail to create a new pipe buffer by newPipe(), in which case getPipe() will return a generic error 209 // and system call name splice in a string as the indication. 210 func getPipe() (*splicePipe, string, error) { 211 v := splicePipePool.Get() 212 if v == nil { 213 return nil, "splice", syscall.EINVAL 214 } 215 return v.(*splicePipe), "", nil 216 } 217 218 func putPipe(p *splicePipe) { 219 // If there is still data left in the pipe, 220 // then close and discard it instead of putting it back into the pool. 221 if p.data != 0 { 222 runtime.SetFinalizer(p, nil) 223 destroyPipe(p) 224 return 225 } 226 splicePipePool.Put(p) 227 } 228 229 // newPipe sets up a pipe for a splice operation. 230 func newPipe() *splicePipe { 231 var fds [2]int 232 if err := syscall.Pipe2(fds[:], syscall.O_CLOEXEC|syscall.O_NONBLOCK); err != nil { 233 return nil 234 } 235 236 // Splice will loop writing maxSpliceSize bytes from the source to the pipe, 237 // and then write those bytes from the pipe to the destination. 238 // Set the pipe buffer size to maxSpliceSize to optimize that. 239 // Ignore errors here, as a smaller buffer size will work, 240 // although it will require more system calls. 241 unix.Fcntl(fds[0], syscall.F_SETPIPE_SZ, maxSpliceSize) 242 243 return &splicePipe{splicePipeFields: splicePipeFields{rfd: fds[0], wfd: fds[1]}} 244 } 245 246 // destroyPipe destroys a pipe. 247 func destroyPipe(p *splicePipe) { 248 CloseFunc(p.rfd) 249 CloseFunc(p.wfd) 250 } 251