...
  
  
     1  
     2  
     3  
     4  
     5  package http2
     6  
     7  import (
     8  	"fmt"
     9  	"math"
    10  )
    11  
    12  type roundRobinWriteScheduler struct {
    13  	
    14  	control writeQueue
    15  
    16  	
    17  	streams map[uint32]*writeQueue
    18  
    19  	
    20  	
    21  	head *writeQueue
    22  
    23  	
    24  	queuePool writeQueuePool
    25  }
    26  
    27  
    28  
    29  
    30  
    31  
    32  func newRoundRobinWriteScheduler() WriteScheduler {
    33  	ws := &roundRobinWriteScheduler{
    34  		streams: make(map[uint32]*writeQueue),
    35  	}
    36  	return ws
    37  }
    38  
    39  func (ws *roundRobinWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
    40  	if ws.streams[streamID] != nil {
    41  		panic(fmt.Errorf("stream %d already opened", streamID))
    42  	}
    43  	q := ws.queuePool.get()
    44  	ws.streams[streamID] = q
    45  	if ws.head == nil {
    46  		ws.head = q
    47  		q.next = q
    48  		q.prev = q
    49  	} else {
    50  		
    51  		
    52  		q.prev = ws.head.prev
    53  		q.next = ws.head
    54  		q.prev.next = q
    55  		q.next.prev = q
    56  	}
    57  }
    58  
    59  func (ws *roundRobinWriteScheduler) CloseStream(streamID uint32) {
    60  	q := ws.streams[streamID]
    61  	if q == nil {
    62  		return
    63  	}
    64  	if q.next == q {
    65  		
    66  		ws.head = nil
    67  	} else {
    68  		q.prev.next = q.next
    69  		q.next.prev = q.prev
    70  		if ws.head == q {
    71  			ws.head = q.next
    72  		}
    73  	}
    74  	delete(ws.streams, streamID)
    75  	ws.queuePool.put(q)
    76  }
    77  
    78  func (ws *roundRobinWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {}
    79  
    80  func (ws *roundRobinWriteScheduler) Push(wr FrameWriteRequest) {
    81  	if wr.isControl() {
    82  		ws.control.push(wr)
    83  		return
    84  	}
    85  	q := ws.streams[wr.StreamID()]
    86  	if q == nil {
    87  		
    88  		
    89  		
    90  		if wr.DataSize() > 0 {
    91  			panic("add DATA on non-open stream")
    92  		}
    93  		ws.control.push(wr)
    94  		return
    95  	}
    96  	q.push(wr)
    97  }
    98  
    99  func (ws *roundRobinWriteScheduler) Pop() (FrameWriteRequest, bool) {
   100  	
   101  	if !ws.control.empty() {
   102  		return ws.control.shift(), true
   103  	}
   104  	if ws.head == nil {
   105  		return FrameWriteRequest{}, false
   106  	}
   107  	q := ws.head
   108  	for {
   109  		if wr, ok := q.consume(math.MaxInt32); ok {
   110  			ws.head = q.next
   111  			return wr, true
   112  		}
   113  		q = q.next
   114  		if q == ws.head {
   115  			break
   116  		}
   117  	}
   118  	return FrameWriteRequest{}, false
   119  }
   120  
View as plain text