...

Source file src/runtime/chan.go

Documentation: runtime

     1  // Copyright 2014 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package runtime
     6  
     7  // This file contains the implementation of Go channels.
     8  
     9  // Invariants:
    10  //  At least one of c.sendq and c.recvq is empty,
    11  //  except for the case of an unbuffered channel with a single goroutine
    12  //  blocked on it for both sending and receiving using a select statement,
    13  //  in which case the length of c.sendq and c.recvq is limited only by the
    14  //  size of the select statement.
    15  //
    16  // For buffered channels, also:
    17  //  c.qcount > 0 implies that c.recvq is empty.
    18  //  c.qcount < c.dataqsiz implies that c.sendq is empty.
    19  
    20  import (
    21  	"internal/abi"
    22  	"runtime/internal/atomic"
    23  	"runtime/internal/math"
    24  	"unsafe"
    25  )
    26  
    27  const (
    28  	maxAlign  = 8
    29  	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
    30  	debugChan = false
    31  )
    32  
    33  type hchan struct {
    34  	qcount   uint           // total data in the queue
    35  	dataqsiz uint           // size of the circular queue
    36  	buf      unsafe.Pointer // points to an array of dataqsiz elements
    37  	elemsize uint16
    38  	closed   uint32
    39  	elemtype *_type // element type
    40  	sendx    uint   // send index
    41  	recvx    uint   // receive index
    42  	recvq    waitq  // list of recv waiters
    43  	sendq    waitq  // list of send waiters
    44  
    45  	// lock protects all fields in hchan, as well as several
    46  	// fields in sudogs blocked on this channel.
    47  	//
    48  	// Do not change another G's status while holding this lock
    49  	// (in particular, do not ready a G), as this can deadlock
    50  	// with stack shrinking.
    51  	lock mutex
    52  }
    53  
    54  type waitq struct {
    55  	first *sudog
    56  	last  *sudog
    57  }
    58  
    59  //go:linkname reflect_makechan reflect.makechan
    60  func reflect_makechan(t *chantype, size int) *hchan {
    61  	return makechan(t, size)
    62  }
    63  
    64  func makechan64(t *chantype, size int64) *hchan {
    65  	if int64(int(size)) != size {
    66  		panic(plainError("makechan: size out of range"))
    67  	}
    68  
    69  	return makechan(t, int(size))
    70  }
    71  
    72  func makechan(t *chantype, size int) *hchan {
    73  	elem := t.Elem
    74  
    75  	// compiler checks this but be safe.
    76  	if elem.Size_ >= 1<<16 {
    77  		throw("makechan: invalid channel element type")
    78  	}
    79  	if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
    80  		throw("makechan: bad alignment")
    81  	}
    82  
    83  	mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
    84  	if overflow || mem > maxAlloc-hchanSize || size < 0 {
    85  		panic(plainError("makechan: size out of range"))
    86  	}
    87  
    88  	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    89  	// buf points into the same allocation, elemtype is persistent.
    90  	// SudoG's are referenced from their owning thread so they can't be collected.
    91  	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    92  	var c *hchan
    93  	switch {
    94  	case mem == 0:
    95  		// Queue or element size is zero.
    96  		c = (*hchan)(mallocgc(hchanSize, nil, true))
    97  		// Race detector uses this location for synchronization.
    98  		c.buf = c.raceaddr()
    99  	case elem.PtrBytes == 0:
   100  		// Elements do not contain pointers.
   101  		// Allocate hchan and buf in one call.
   102  		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
   103  		c.buf = add(unsafe.Pointer(c), hchanSize)
   104  	default:
   105  		// Elements contain pointers.
   106  		c = new(hchan)
   107  		c.buf = mallocgc(mem, elem, true)
   108  	}
   109  
   110  	c.elemsize = uint16(elem.Size_)
   111  	c.elemtype = elem
   112  	c.dataqsiz = uint(size)
   113  	lockInit(&c.lock, lockRankHchan)
   114  
   115  	if debugChan {
   116  		print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
   117  	}
   118  	return c
   119  }
   120  
   121  // chanbuf(c, i) is pointer to the i'th slot in the buffer.
   122  func chanbuf(c *hchan, i uint) unsafe.Pointer {
   123  	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
   124  }
   125  
   126  // full reports whether a send on c would block (that is, the channel is full).
   127  // It uses a single word-sized read of mutable state, so although
   128  // the answer is instantaneously true, the correct answer may have changed
   129  // by the time the calling function receives the return value.
   130  func full(c *hchan) bool {
   131  	// c.dataqsiz is immutable (never written after the channel is created)
   132  	// so it is safe to read at any time during channel operation.
   133  	if c.dataqsiz == 0 {
   134  		// Assumes that a pointer read is relaxed-atomic.
   135  		return c.recvq.first == nil
   136  	}
   137  	// Assumes that a uint read is relaxed-atomic.
   138  	return c.qcount == c.dataqsiz
   139  }
   140  
   141  // entry point for c <- x from compiled code.
   142  //
   143  //go:nosplit
   144  func chansend1(c *hchan, elem unsafe.Pointer) {
   145  	chansend(c, elem, true, getcallerpc())
   146  }
   147  
   148  /*
   149   * generic single channel send/recv
   150   * If block is not nil,
   151   * then the protocol will not
   152   * sleep but return if it could
   153   * not complete.
   154   *
   155   * sleep can wake up with g.param == nil
   156   * when a channel involved in the sleep has
   157   * been closed.  it is easiest to loop and re-run
   158   * the operation; we'll see that it's now closed.
   159   */
   160  func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   161  	if c == nil {
   162  		if !block {
   163  			return false
   164  		}
   165  		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
   166  		throw("unreachable")
   167  	}
   168  
   169  	if debugChan {
   170  		print("chansend: chan=", c, "\n")
   171  	}
   172  
   173  	if raceenabled {
   174  		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
   175  	}
   176  
   177  	// Fast path: check for failed non-blocking operation without acquiring the lock.
   178  	//
   179  	// After observing that the channel is not closed, we observe that the channel is
   180  	// not ready for sending. Each of these observations is a single word-sized read
   181  	// (first c.closed and second full()).
   182  	// Because a closed channel cannot transition from 'ready for sending' to
   183  	// 'not ready for sending', even if the channel is closed between the two observations,
   184  	// they imply a moment between the two when the channel was both not yet closed
   185  	// and not ready for sending. We behave as if we observed the channel at that moment,
   186  	// and report that the send cannot proceed.
   187  	//
   188  	// It is okay if the reads are reordered here: if we observe that the channel is not
   189  	// ready for sending and then observe that it is not closed, that implies that the
   190  	// channel wasn't closed during the first observation. However, nothing here
   191  	// guarantees forward progress. We rely on the side effects of lock release in
   192  	// chanrecv() and closechan() to update this thread's view of c.closed and full().
   193  	if !block && c.closed == 0 && full(c) {
   194  		return false
   195  	}
   196  
   197  	var t0 int64
   198  	if blockprofilerate > 0 {
   199  		t0 = cputicks()
   200  	}
   201  
   202  	lock(&c.lock)
   203  
   204  	if c.closed != 0 {
   205  		unlock(&c.lock)
   206  		panic(plainError("send on closed channel"))
   207  	}
   208  
   209  	if sg := c.recvq.dequeue(); sg != nil {
   210  		// Found a waiting receiver. We pass the value we want to send
   211  		// directly to the receiver, bypassing the channel buffer (if any).
   212  		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
   213  		return true
   214  	}
   215  
   216  	if c.qcount < c.dataqsiz {
   217  		// Space is available in the channel buffer. Enqueue the element to send.
   218  		qp := chanbuf(c, c.sendx)
   219  		if raceenabled {
   220  			racenotify(c, c.sendx, nil)
   221  		}
   222  		typedmemmove(c.elemtype, qp, ep)
   223  		c.sendx++
   224  		if c.sendx == c.dataqsiz {
   225  			c.sendx = 0
   226  		}
   227  		c.qcount++
   228  		unlock(&c.lock)
   229  		return true
   230  	}
   231  
   232  	if !block {
   233  		unlock(&c.lock)
   234  		return false
   235  	}
   236  
   237  	// Block on the channel. Some receiver will complete our operation for us.
   238  	gp := getg()
   239  	mysg := acquireSudog()
   240  	mysg.releasetime = 0
   241  	if t0 != 0 {
   242  		mysg.releasetime = -1
   243  	}
   244  	// No stack splits between assigning elem and enqueuing mysg
   245  	// on gp.waiting where copystack can find it.
   246  	mysg.elem = ep
   247  	mysg.waitlink = nil
   248  	mysg.g = gp
   249  	mysg.isSelect = false
   250  	mysg.c = c
   251  	gp.waiting = mysg
   252  	gp.param = nil
   253  	c.sendq.enqueue(mysg)
   254  	// Signal to anyone trying to shrink our stack that we're about
   255  	// to park on a channel. The window between when this G's status
   256  	// changes and when we set gp.activeStackChans is not safe for
   257  	// stack shrinking.
   258  	gp.parkingOnChan.Store(true)
   259  	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
   260  	// Ensure the value being sent is kept alive until the
   261  	// receiver copies it out. The sudog has a pointer to the
   262  	// stack object, but sudogs aren't considered as roots of the
   263  	// stack tracer.
   264  	KeepAlive(ep)
   265  
   266  	// someone woke us up.
   267  	if mysg != gp.waiting {
   268  		throw("G waiting list is corrupted")
   269  	}
   270  	gp.waiting = nil
   271  	gp.activeStackChans = false
   272  	closed := !mysg.success
   273  	gp.param = nil
   274  	if mysg.releasetime > 0 {
   275  		blockevent(mysg.releasetime-t0, 2)
   276  	}
   277  	mysg.c = nil
   278  	releaseSudog(mysg)
   279  	if closed {
   280  		if c.closed == 0 {
   281  			throw("chansend: spurious wakeup")
   282  		}
   283  		panic(plainError("send on closed channel"))
   284  	}
   285  	return true
   286  }
   287  
   288  // send processes a send operation on an empty channel c.
   289  // The value ep sent by the sender is copied to the receiver sg.
   290  // The receiver is then woken up to go on its merry way.
   291  // Channel c must be empty and locked.  send unlocks c with unlockf.
   292  // sg must already be dequeued from c.
   293  // ep must be non-nil and point to the heap or the caller's stack.
   294  func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   295  	if raceenabled {
   296  		if c.dataqsiz == 0 {
   297  			racesync(c, sg)
   298  		} else {
   299  			// Pretend we go through the buffer, even though
   300  			// we copy directly. Note that we need to increment
   301  			// the head/tail locations only when raceenabled.
   302  			racenotify(c, c.recvx, nil)
   303  			racenotify(c, c.recvx, sg)
   304  			c.recvx++
   305  			if c.recvx == c.dataqsiz {
   306  				c.recvx = 0
   307  			}
   308  			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   309  		}
   310  	}
   311  	if sg.elem != nil {
   312  		sendDirect(c.elemtype, sg, ep)
   313  		sg.elem = nil
   314  	}
   315  	gp := sg.g
   316  	unlockf()
   317  	gp.param = unsafe.Pointer(sg)
   318  	sg.success = true
   319  	if sg.releasetime != 0 {
   320  		sg.releasetime = cputicks()
   321  	}
   322  	goready(gp, skip+1)
   323  }
   324  
   325  // Sends and receives on unbuffered or empty-buffered channels are the
   326  // only operations where one running goroutine writes to the stack of
   327  // another running goroutine. The GC assumes that stack writes only
   328  // happen when the goroutine is running and are only done by that
   329  // goroutine. Using a write barrier is sufficient to make up for
   330  // violating that assumption, but the write barrier has to work.
   331  // typedmemmove will call bulkBarrierPreWrite, but the target bytes
   332  // are not in the heap, so that will not help. We arrange to call
   333  // memmove and typeBitsBulkBarrier instead.
   334  
   335  func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
   336  	// src is on our stack, dst is a slot on another stack.
   337  
   338  	// Once we read sg.elem out of sg, it will no longer
   339  	// be updated if the destination's stack gets copied (shrunk).
   340  	// So make sure that no preemption points can happen between read & use.
   341  	dst := sg.elem
   342  	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
   343  	// No need for cgo write barrier checks because dst is always
   344  	// Go memory.
   345  	memmove(dst, src, t.Size_)
   346  }
   347  
   348  func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
   349  	// dst is on our stack or the heap, src is on another stack.
   350  	// The channel is locked, so src will not move during this
   351  	// operation.
   352  	src := sg.elem
   353  	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
   354  	memmove(dst, src, t.Size_)
   355  }
   356  
   357  func closechan(c *hchan) {
   358  	if c == nil {
   359  		panic(plainError("close of nil channel"))
   360  	}
   361  
   362  	lock(&c.lock)
   363  	if c.closed != 0 {
   364  		unlock(&c.lock)
   365  		panic(plainError("close of closed channel"))
   366  	}
   367  
   368  	if raceenabled {
   369  		callerpc := getcallerpc()
   370  		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
   371  		racerelease(c.raceaddr())
   372  	}
   373  
   374  	c.closed = 1
   375  
   376  	var glist gList
   377  
   378  	// release all readers
   379  	for {
   380  		sg := c.recvq.dequeue()
   381  		if sg == nil {
   382  			break
   383  		}
   384  		if sg.elem != nil {
   385  			typedmemclr(c.elemtype, sg.elem)
   386  			sg.elem = nil
   387  		}
   388  		if sg.releasetime != 0 {
   389  			sg.releasetime = cputicks()
   390  		}
   391  		gp := sg.g
   392  		gp.param = unsafe.Pointer(sg)
   393  		sg.success = false
   394  		if raceenabled {
   395  			raceacquireg(gp, c.raceaddr())
   396  		}
   397  		glist.push(gp)
   398  	}
   399  
   400  	// release all writers (they will panic)
   401  	for {
   402  		sg := c.sendq.dequeue()
   403  		if sg == nil {
   404  			break
   405  		}
   406  		sg.elem = nil
   407  		if sg.releasetime != 0 {
   408  			sg.releasetime = cputicks()
   409  		}
   410  		gp := sg.g
   411  		gp.param = unsafe.Pointer(sg)
   412  		sg.success = false
   413  		if raceenabled {
   414  			raceacquireg(gp, c.raceaddr())
   415  		}
   416  		glist.push(gp)
   417  	}
   418  	unlock(&c.lock)
   419  
   420  	// Ready all Gs now that we've dropped the channel lock.
   421  	for !glist.empty() {
   422  		gp := glist.pop()
   423  		gp.schedlink = 0
   424  		goready(gp, 3)
   425  	}
   426  }
   427  
   428  // empty reports whether a read from c would block (that is, the channel is
   429  // empty).  It uses a single atomic read of mutable state.
   430  func empty(c *hchan) bool {
   431  	// c.dataqsiz is immutable.
   432  	if c.dataqsiz == 0 {
   433  		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
   434  	}
   435  	return atomic.Loaduint(&c.qcount) == 0
   436  }
   437  
   438  // entry points for <- c from compiled code.
   439  //
   440  //go:nosplit
   441  func chanrecv1(c *hchan, elem unsafe.Pointer) {
   442  	chanrecv(c, elem, true)
   443  }
   444  
   445  //go:nosplit
   446  func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
   447  	_, received = chanrecv(c, elem, true)
   448  	return
   449  }
   450  
   451  // chanrecv receives on channel c and writes the received data to ep.
   452  // ep may be nil, in which case received data is ignored.
   453  // If block == false and no elements are available, returns (false, false).
   454  // Otherwise, if c is closed, zeros *ep and returns (true, false).
   455  // Otherwise, fills in *ep with an element and returns (true, true).
   456  // A non-nil ep must point to the heap or the caller's stack.
   457  func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   458  	// raceenabled: don't need to check ep, as it is always on the stack
   459  	// or is new memory allocated by reflect.
   460  
   461  	if debugChan {
   462  		print("chanrecv: chan=", c, "\n")
   463  	}
   464  
   465  	if c == nil {
   466  		if !block {
   467  			return
   468  		}
   469  		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
   470  		throw("unreachable")
   471  	}
   472  
   473  	// Fast path: check for failed non-blocking operation without acquiring the lock.
   474  	if !block && empty(c) {
   475  		// After observing that the channel is not ready for receiving, we observe whether the
   476  		// channel is closed.
   477  		//
   478  		// Reordering of these checks could lead to incorrect behavior when racing with a close.
   479  		// For example, if the channel was open and not empty, was closed, and then drained,
   480  		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
   481  		// we use atomic loads for both checks, and rely on emptying and closing to happen in
   482  		// separate critical sections under the same lock.  This assumption fails when closing
   483  		// an unbuffered channel with a blocked send, but that is an error condition anyway.
   484  		if atomic.Load(&c.closed) == 0 {
   485  			// Because a channel cannot be reopened, the later observation of the channel
   486  			// being not closed implies that it was also not closed at the moment of the
   487  			// first observation. We behave as if we observed the channel at that moment
   488  			// and report that the receive cannot proceed.
   489  			return
   490  		}
   491  		// The channel is irreversibly closed. Re-check whether the channel has any pending data
   492  		// to receive, which could have arrived between the empty and closed checks above.
   493  		// Sequential consistency is also required here, when racing with such a send.
   494  		if empty(c) {
   495  			// The channel is irreversibly closed and empty.
   496  			if raceenabled {
   497  				raceacquire(c.raceaddr())
   498  			}
   499  			if ep != nil {
   500  				typedmemclr(c.elemtype, ep)
   501  			}
   502  			return true, false
   503  		}
   504  	}
   505  
   506  	var t0 int64
   507  	if blockprofilerate > 0 {
   508  		t0 = cputicks()
   509  	}
   510  
   511  	lock(&c.lock)
   512  
   513  	if c.closed != 0 {
   514  		if c.qcount == 0 {
   515  			if raceenabled {
   516  				raceacquire(c.raceaddr())
   517  			}
   518  			unlock(&c.lock)
   519  			if ep != nil {
   520  				typedmemclr(c.elemtype, ep)
   521  			}
   522  			return true, false
   523  		}
   524  		// The channel has been closed, but the channel's buffer have data.
   525  	} else {
   526  		// Just found waiting sender with not closed.
   527  		if sg := c.sendq.dequeue(); sg != nil {
   528  			// Found a waiting sender. If buffer is size 0, receive value
   529  			// directly from sender. Otherwise, receive from head of queue
   530  			// and add sender's value to the tail of the queue (both map to
   531  			// the same buffer slot because the queue is full).
   532  			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
   533  			return true, true
   534  		}
   535  	}
   536  
   537  	if c.qcount > 0 {
   538  		// Receive directly from queue
   539  		qp := chanbuf(c, c.recvx)
   540  		if raceenabled {
   541  			racenotify(c, c.recvx, nil)
   542  		}
   543  		if ep != nil {
   544  			typedmemmove(c.elemtype, ep, qp)
   545  		}
   546  		typedmemclr(c.elemtype, qp)
   547  		c.recvx++
   548  		if c.recvx == c.dataqsiz {
   549  			c.recvx = 0
   550  		}
   551  		c.qcount--
   552  		unlock(&c.lock)
   553  		return true, true
   554  	}
   555  
   556  	if !block {
   557  		unlock(&c.lock)
   558  		return false, false
   559  	}
   560  
   561  	// no sender available: block on this channel.
   562  	gp := getg()
   563  	mysg := acquireSudog()
   564  	mysg.releasetime = 0
   565  	if t0 != 0 {
   566  		mysg.releasetime = -1
   567  	}
   568  	// No stack splits between assigning elem and enqueuing mysg
   569  	// on gp.waiting where copystack can find it.
   570  	mysg.elem = ep
   571  	mysg.waitlink = nil
   572  	gp.waiting = mysg
   573  	mysg.g = gp
   574  	mysg.isSelect = false
   575  	mysg.c = c
   576  	gp.param = nil
   577  	c.recvq.enqueue(mysg)
   578  	// Signal to anyone trying to shrink our stack that we're about
   579  	// to park on a channel. The window between when this G's status
   580  	// changes and when we set gp.activeStackChans is not safe for
   581  	// stack shrinking.
   582  	gp.parkingOnChan.Store(true)
   583  	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
   584  
   585  	// someone woke us up
   586  	if mysg != gp.waiting {
   587  		throw("G waiting list is corrupted")
   588  	}
   589  	gp.waiting = nil
   590  	gp.activeStackChans = false
   591  	if mysg.releasetime > 0 {
   592  		blockevent(mysg.releasetime-t0, 2)
   593  	}
   594  	success := mysg.success
   595  	gp.param = nil
   596  	mysg.c = nil
   597  	releaseSudog(mysg)
   598  	return true, success
   599  }
   600  
   601  // recv processes a receive operation on a full channel c.
   602  // There are 2 parts:
   603  //  1. The value sent by the sender sg is put into the channel
   604  //     and the sender is woken up to go on its merry way.
   605  //  2. The value received by the receiver (the current G) is
   606  //     written to ep.
   607  //
   608  // For synchronous channels, both values are the same.
   609  // For asynchronous channels, the receiver gets its data from
   610  // the channel buffer and the sender's data is put in the
   611  // channel buffer.
   612  // Channel c must be full and locked. recv unlocks c with unlockf.
   613  // sg must already be dequeued from c.
   614  // A non-nil ep must point to the heap or the caller's stack.
   615  func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   616  	if c.dataqsiz == 0 {
   617  		if raceenabled {
   618  			racesync(c, sg)
   619  		}
   620  		if ep != nil {
   621  			// copy data from sender
   622  			recvDirect(c.elemtype, sg, ep)
   623  		}
   624  	} else {
   625  		// Queue is full. Take the item at the
   626  		// head of the queue. Make the sender enqueue
   627  		// its item at the tail of the queue. Since the
   628  		// queue is full, those are both the same slot.
   629  		qp := chanbuf(c, c.recvx)
   630  		if raceenabled {
   631  			racenotify(c, c.recvx, nil)
   632  			racenotify(c, c.recvx, sg)
   633  		}
   634  		// copy data from queue to receiver
   635  		if ep != nil {
   636  			typedmemmove(c.elemtype, ep, qp)
   637  		}
   638  		// copy data from sender to queue
   639  		typedmemmove(c.elemtype, qp, sg.elem)
   640  		c.recvx++
   641  		if c.recvx == c.dataqsiz {
   642  			c.recvx = 0
   643  		}
   644  		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   645  	}
   646  	sg.elem = nil
   647  	gp := sg.g
   648  	unlockf()
   649  	gp.param = unsafe.Pointer(sg)
   650  	sg.success = true
   651  	if sg.releasetime != 0 {
   652  		sg.releasetime = cputicks()
   653  	}
   654  	goready(gp, skip+1)
   655  }
   656  
   657  func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
   658  	// There are unlocked sudogs that point into gp's stack. Stack
   659  	// copying must lock the channels of those sudogs.
   660  	// Set activeStackChans here instead of before we try parking
   661  	// because we could self-deadlock in stack growth on the
   662  	// channel lock.
   663  	gp.activeStackChans = true
   664  	// Mark that it's safe for stack shrinking to occur now,
   665  	// because any thread acquiring this G's stack for shrinking
   666  	// is guaranteed to observe activeStackChans after this store.
   667  	gp.parkingOnChan.Store(false)
   668  	// Make sure we unlock after setting activeStackChans and
   669  	// unsetting parkingOnChan. The moment we unlock chanLock
   670  	// we risk gp getting readied by a channel operation and
   671  	// so gp could continue running before everything before
   672  	// the unlock is visible (even to gp itself).
   673  	unlock((*mutex)(chanLock))
   674  	return true
   675  }
   676  
   677  // compiler implements
   678  //
   679  //	select {
   680  //	case c <- v:
   681  //		... foo
   682  //	default:
   683  //		... bar
   684  //	}
   685  //
   686  // as
   687  //
   688  //	if selectnbsend(c, v) {
   689  //		... foo
   690  //	} else {
   691  //		... bar
   692  //	}
   693  func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
   694  	return chansend(c, elem, false, getcallerpc())
   695  }
   696  
   697  // compiler implements
   698  //
   699  //	select {
   700  //	case v, ok = <-c:
   701  //		... foo
   702  //	default:
   703  //		... bar
   704  //	}
   705  //
   706  // as
   707  //
   708  //	if selected, ok = selectnbrecv(&v, c); selected {
   709  //		... foo
   710  //	} else {
   711  //		... bar
   712  //	}
   713  func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
   714  	return chanrecv(c, elem, false)
   715  }
   716  
   717  //go:linkname reflect_chansend reflect.chansend0
   718  func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
   719  	return chansend(c, elem, !nb, getcallerpc())
   720  }
   721  
   722  //go:linkname reflect_chanrecv reflect.chanrecv
   723  func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
   724  	return chanrecv(c, elem, !nb)
   725  }
   726  
   727  //go:linkname reflect_chanlen reflect.chanlen
   728  func reflect_chanlen(c *hchan) int {
   729  	if c == nil {
   730  		return 0
   731  	}
   732  	return int(c.qcount)
   733  }
   734  
   735  //go:linkname reflectlite_chanlen internal/reflectlite.chanlen
   736  func reflectlite_chanlen(c *hchan) int {
   737  	if c == nil {
   738  		return 0
   739  	}
   740  	return int(c.qcount)
   741  }
   742  
   743  //go:linkname reflect_chancap reflect.chancap
   744  func reflect_chancap(c *hchan) int {
   745  	if c == nil {
   746  		return 0
   747  	}
   748  	return int(c.dataqsiz)
   749  }
   750  
   751  //go:linkname reflect_chanclose reflect.chanclose
   752  func reflect_chanclose(c *hchan) {
   753  	closechan(c)
   754  }
   755  
   756  func (q *waitq) enqueue(sgp *sudog) {
   757  	sgp.next = nil
   758  	x := q.last
   759  	if x == nil {
   760  		sgp.prev = nil
   761  		q.first = sgp
   762  		q.last = sgp
   763  		return
   764  	}
   765  	sgp.prev = x
   766  	x.next = sgp
   767  	q.last = sgp
   768  }
   769  
   770  func (q *waitq) dequeue() *sudog {
   771  	for {
   772  		sgp := q.first
   773  		if sgp == nil {
   774  			return nil
   775  		}
   776  		y := sgp.next
   777  		if y == nil {
   778  			q.first = nil
   779  			q.last = nil
   780  		} else {
   781  			y.prev = nil
   782  			q.first = y
   783  			sgp.next = nil // mark as removed (see dequeueSudoG)
   784  		}
   785  
   786  		// if a goroutine was put on this queue because of a
   787  		// select, there is a small window between the goroutine
   788  		// being woken up by a different case and it grabbing the
   789  		// channel locks. Once it has the lock
   790  		// it removes itself from the queue, so we won't see it after that.
   791  		// We use a flag in the G struct to tell us when someone
   792  		// else has won the race to signal this goroutine but the goroutine
   793  		// hasn't removed itself from the queue yet.
   794  		if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) {
   795  			continue
   796  		}
   797  
   798  		return sgp
   799  	}
   800  }
   801  
   802  func (c *hchan) raceaddr() unsafe.Pointer {
   803  	// Treat read-like and write-like operations on the channel to
   804  	// happen at this address. Avoid using the address of qcount
   805  	// or dataqsiz, because the len() and cap() builtins read
   806  	// those addresses, and we don't want them racing with
   807  	// operations like close().
   808  	return unsafe.Pointer(&c.buf)
   809  }
   810  
   811  func racesync(c *hchan, sg *sudog) {
   812  	racerelease(chanbuf(c, 0))
   813  	raceacquireg(sg.g, chanbuf(c, 0))
   814  	racereleaseg(sg.g, chanbuf(c, 0))
   815  	raceacquire(chanbuf(c, 0))
   816  }
   817  
   818  // Notify the race detector of a send or receive involving buffer entry idx
   819  // and a channel c or its communicating partner sg.
   820  // This function handles the special case of c.elemsize==0.
   821  func racenotify(c *hchan, idx uint, sg *sudog) {
   822  	// We could have passed the unsafe.Pointer corresponding to entry idx
   823  	// instead of idx itself.  However, in a future version of this function,
   824  	// we can use idx to better handle the case of elemsize==0.
   825  	// A future improvement to the detector is to call TSan with c and idx:
   826  	// this way, Go will continue to not allocating buffer entries for channels
   827  	// of elemsize==0, yet the race detector can be made to handle multiple
   828  	// sync objects underneath the hood (one sync object per idx)
   829  	qp := chanbuf(c, idx)
   830  	// When elemsize==0, we don't allocate a full buffer for the channel.
   831  	// Instead of individual buffer entries, the race detector uses the
   832  	// c.buf as the only buffer entry.  This simplification prevents us from
   833  	// following the memory model's happens-before rules (rules that are
   834  	// implemented in racereleaseacquire).  Instead, we accumulate happens-before
   835  	// information in the synchronization object associated with c.buf.
   836  	if c.elemsize == 0 {
   837  		if sg == nil {
   838  			raceacquire(qp)
   839  			racerelease(qp)
   840  		} else {
   841  			raceacquireg(sg.g, qp)
   842  			racereleaseg(sg.g, qp)
   843  		}
   844  	} else {
   845  		if sg == nil {
   846  			racereleaseacquire(qp)
   847  		} else {
   848  			racereleaseacquireg(sg.g, qp)
   849  		}
   850  	}
   851  }
   852  

View as plain text