1
2
3
4
5 package http2
6
7 import (
8 "fmt"
9 "math"
10 "sort"
11 )
12
13
// priorityDefaultWeight is the weight given to every newly created
// priorityNode. The sibling comparison adds one to a stored weight before
// using it, so this value corresponds to an effective weight of 16.
const priorityDefaultWeight = 15
15
16
// PriorityWriteSchedulerConfig configures a priority write scheduler created
// by NewPriorityWriteScheduler.
type PriorityWriteSchedulerConfig struct {
	// MaxClosedNodesInTree bounds how many closed streams are retained in the
	// priority tree. When it is greater than zero, CloseStream leaves the
	// closed node in the tree and records it in a list holding at most this
	// many entries; once the list is full the oldest entry is removed from
	// the tree to make room. Otherwise a node is removed from the tree as
	// soon as its stream is closed.
	MaxClosedNodesInTree int

	// MaxIdleNodesInTree bounds how many idle streams are retained in the
	// priority tree. When it is zero, AdjustStream ignores stream IDs it has
	// no node for. Otherwise AdjustStream creates an idle node for an unknown
	// stream whose ID is larger than every ID seen so far, and records it in
	// a list holding at most this many entries; once the list is full the
	// oldest entry is removed from the tree to make room.
	MaxIdleNodesInTree int

	// ThrottleOutOfOrderWrites enables write throttling. When set, the
	// scheduler starts with a write limit of 1024 that Pop applies to nodes
	// having an open non-root ancestor; the limit grows by 1024 after each
	// such write and is reset to 1024 after a write from a node without an
	// open ancestor. When unset, the limit starts at math.MaxInt32 and is
	// never reset.
	ThrottleOutOfOrderWrites bool
}
54
55
56
57
58 func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
59 if cfg == nil {
60
61
62 cfg = &PriorityWriteSchedulerConfig{
63 MaxClosedNodesInTree: 10,
64 MaxIdleNodesInTree: 10,
65 ThrottleOutOfOrderWrites: false,
66 }
67 }
68
69 ws := &priorityWriteScheduler{
70 nodes: make(map[uint32]*priorityNode),
71 maxClosedNodesInTree: cfg.MaxClosedNodesInTree,
72 maxIdleNodesInTree: cfg.MaxIdleNodesInTree,
73 enableWriteThrottle: cfg.ThrottleOutOfOrderWrites,
74 }
75 ws.nodes[0] = &ws.root
76 if cfg.ThrottleOutOfOrderWrites {
77 ws.writeThrottleLimit = 1024
78 } else {
79 ws.writeThrottleLimit = math.MaxInt32
80 }
81 return ws
82 }
83
// priorityNodeState is the lifecycle state of a priorityNode.
type priorityNodeState int

const (
	// priorityNodeOpen marks a node whose stream has been opened via
	// OpenStream and not yet closed.
	priorityNodeOpen priorityNodeState = iota
	// priorityNodeClosed marks a node whose stream was closed via CloseStream
	// but which may still be retained in the tree.
	priorityNodeClosed
	// priorityNodeIdle marks a node created by AdjustStream for a stream that
	// has not been opened yet.
	priorityNodeIdle
)
91
92
93
94
// priorityNode is one node of the priority tree. Each node owns a queue of
// pending writes and is linked to its parent, its children, and its siblings.
type priorityNode struct {
	q            writeQueue        // pending writes queued on this node
	id           uint32            // stream ID; the root is registered under ID 0
	weight       uint8             // stored weight; the sibling comparison uses weight+1
	state        priorityNodeState // open, closed, or idle
	bytes        int64             // bytes accounted to this node via addBytes
	subtreeBytes int64             // bytes accounted to this node and all of its descendants

	// These links form the tree structure.
	parent     *priorityNode // nil for the root and for detached nodes
	kids       *priorityNode // head of the doubly-linked list of children
	prev, next *priorityNode // neighbours in the parent's list of children
}
108
109 func (n *priorityNode) setParent(parent *priorityNode) {
110 if n == parent {
111 panic("setParent to self")
112 }
113 if n.parent == parent {
114 return
115 }
116
117 if parent := n.parent; parent != nil {
118 if n.prev == nil {
119 parent.kids = n.next
120 } else {
121 n.prev.next = n.next
122 }
123 if n.next != nil {
124 n.next.prev = n.prev
125 }
126 }
127
128
129
130 n.parent = parent
131 if parent == nil {
132 n.next = nil
133 n.prev = nil
134 } else {
135 n.next = parent.kids
136 n.prev = nil
137 if n.next != nil {
138 n.next.prev = n
139 }
140 parent.kids = n
141 }
142 }
143
144 func (n *priorityNode) addBytes(b int64) {
145 n.bytes += b
146 for ; n != nil; n = n.parent {
147 n.subtreeBytes += b
148 }
149 }
150
151
152
153
154
155
156
157 func (n *priorityNode) walkReadyInOrder(openParent bool, tmp *[]*priorityNode, f func(*priorityNode, bool) bool) bool {
158 if !n.q.empty() && f(n, openParent) {
159 return true
160 }
161 if n.kids == nil {
162 return false
163 }
164
165
166
167 if n.id != 0 {
168 openParent = openParent || (n.state == priorityNodeOpen)
169 }
170
171
172
173
174 w := n.kids.weight
175 needSort := false
176 for k := n.kids.next; k != nil; k = k.next {
177 if k.weight != w {
178 needSort = true
179 break
180 }
181 }
182 if !needSort {
183 for k := n.kids; k != nil; k = k.next {
184 if k.walkReadyInOrder(openParent, tmp, f) {
185 return true
186 }
187 }
188 return false
189 }
190
191
192
193 *tmp = (*tmp)[:0]
194 for n.kids != nil {
195 *tmp = append(*tmp, n.kids)
196 n.kids.setParent(nil)
197 }
198 sort.Sort(sortPriorityNodeSiblings(*tmp))
199 for i := len(*tmp) - 1; i >= 0; i-- {
200 (*tmp)[i].setParent(n)
201 }
202 for k := n.kids; k != nil; k = k.next {
203 if k.walkReadyInOrder(openParent, tmp, f) {
204 return true
205 }
206 }
207 return false
208 }
209
210 type sortPriorityNodeSiblings []*priorityNode
211
212 func (z sortPriorityNodeSiblings) Len() int { return len(z) }
213 func (z sortPriorityNodeSiblings) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
214 func (z sortPriorityNodeSiblings) Less(i, k int) bool {
215
216
217 wi, bi := float64(z[i].weight+1), float64(z[i].subtreeBytes)
218 wk, bk := float64(z[k].weight+1), float64(z[k].subtreeBytes)
219 if bi == 0 && bk == 0 {
220 return wi >= wk
221 }
222 if bk == 0 {
223 return false
224 }
225 return bi/bk <= wi/wk
226 }
227
// priorityWriteScheduler schedules frame writes using a tree of
// priorityNodes, one per known stream plus a root.
type priorityWriteScheduler struct {
	// root is the root of the priority tree and is registered in nodes under
	// stream ID 0. Control frames, and frames for streams without a node,
	// are queued here.
	root priorityNode

	// nodes maps a stream ID to its node in the tree.
	nodes map[uint32]*priorityNode

	// maxID is the largest stream ID passed to OpenStream or used by
	// AdjustStream to create an idle node.
	maxID uint32

	// closedNodes and idleNodes list the closed and idle nodes retained in
	// the tree, oldest first. When a list reaches its maximum size the
	// oldest node is removed from the tree before a new one is appended.
	closedNodes, idleNodes []*priorityNode

	// Settings copied from PriorityWriteSchedulerConfig, plus the current
	// throttle limit that Pop adjusts as it runs.
	maxClosedNodesInTree int
	maxIdleNodesInTree   int
	writeThrottleLimit   int32
	enableWriteThrottle  bool

	// tmp is scratch space reused by walkReadyInOrder when sorting siblings.
	tmp []*priorityNode

	// queuePool supplies write queues for new nodes and takes them back when
	// a stream is closed.
	queuePool writeQueuePool
}
256
257 func (ws *priorityWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
258
259 if curr := ws.nodes[streamID]; curr != nil {
260 if curr.state != priorityNodeIdle {
261 panic(fmt.Sprintf("stream %d already opened", streamID))
262 }
263 curr.state = priorityNodeOpen
264 return
265 }
266
267
268
269
270
271 parent := ws.nodes[options.PusherID]
272 if parent == nil {
273 parent = &ws.root
274 }
275 n := &priorityNode{
276 q: *ws.queuePool.get(),
277 id: streamID,
278 weight: priorityDefaultWeight,
279 state: priorityNodeOpen,
280 }
281 n.setParent(parent)
282 ws.nodes[streamID] = n
283 if streamID > ws.maxID {
284 ws.maxID = streamID
285 }
286 }
287
288 func (ws *priorityWriteScheduler) CloseStream(streamID uint32) {
289 if streamID == 0 {
290 panic("violation of WriteScheduler interface: cannot close stream 0")
291 }
292 if ws.nodes[streamID] == nil {
293 panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
294 }
295 if ws.nodes[streamID].state != priorityNodeOpen {
296 panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
297 }
298
299 n := ws.nodes[streamID]
300 n.state = priorityNodeClosed
301 n.addBytes(-n.bytes)
302
303 q := n.q
304 ws.queuePool.put(&q)
305 n.q.s = nil
306 if ws.maxClosedNodesInTree > 0 {
307 ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n)
308 } else {
309 ws.removeNode(n)
310 }
311 }
312
313 func (ws *priorityWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {
314 if streamID == 0 {
315 panic("adjustPriority on root")
316 }
317
318
319
320
321 n := ws.nodes[streamID]
322 if n == nil {
323 if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
324 return
325 }
326 ws.maxID = streamID
327 n = &priorityNode{
328 q: *ws.queuePool.get(),
329 id: streamID,
330 weight: priorityDefaultWeight,
331 state: priorityNodeIdle,
332 }
333 n.setParent(&ws.root)
334 ws.nodes[streamID] = n
335 ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n)
336 }
337
338
339
340 parent := ws.nodes[priority.StreamDep]
341 if parent == nil {
342 n.setParent(&ws.root)
343 n.weight = priorityDefaultWeight
344 return
345 }
346
347
348 if n == parent {
349 return
350 }
351
352
353
354
355
356
357
358
359 for x := parent.parent; x != nil; x = x.parent {
360 if x == n {
361 parent.setParent(n.parent)
362 break
363 }
364 }
365
366
367
368
369 if priority.Exclusive {
370 k := parent.kids
371 for k != nil {
372 next := k.next
373 if k != n {
374 k.setParent(n)
375 }
376 k = next
377 }
378 }
379
380 n.setParent(parent)
381 n.weight = priority.Weight
382 }
383
384 func (ws *priorityWriteScheduler) Push(wr FrameWriteRequest) {
385 var n *priorityNode
386 if wr.isControl() {
387 n = &ws.root
388 } else {
389 id := wr.StreamID()
390 n = ws.nodes[id]
391 if n == nil {
392
393
394
395 if wr.DataSize() > 0 {
396 panic("add DATA on non-open stream")
397 }
398 n = &ws.root
399 }
400 }
401 n.q.push(wr)
402 }
403
404 func (ws *priorityWriteScheduler) Pop() (wr FrameWriteRequest, ok bool) {
405 ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNode, openParent bool) bool {
406 limit := int32(math.MaxInt32)
407 if openParent {
408 limit = ws.writeThrottleLimit
409 }
410 wr, ok = n.q.consume(limit)
411 if !ok {
412 return false
413 }
414 n.addBytes(int64(wr.DataSize()))
415
416
417
418 if openParent {
419 ws.writeThrottleLimit += 1024
420 if ws.writeThrottleLimit < 0 {
421 ws.writeThrottleLimit = math.MaxInt32
422 }
423 } else if ws.enableWriteThrottle {
424 ws.writeThrottleLimit = 1024
425 }
426 return true
427 })
428 return wr, ok
429 }
430
431 func (ws *priorityWriteScheduler) addClosedOrIdleNode(list *[]*priorityNode, maxSize int, n *priorityNode) {
432 if maxSize == 0 {
433 return
434 }
435 if len(*list) == maxSize {
436
437 ws.removeNode((*list)[0])
438 x := (*list)[1:]
439 copy(*list, x)
440 *list = (*list)[:len(x)]
441 }
442 *list = append(*list, n)
443 }
444
445 func (ws *priorityWriteScheduler) removeNode(n *priorityNode) {
446 for k := n.kids; k != nil; k = k.next {
447 k.setParent(n.parent)
448 }
449 n.setParent(nil)
450 delete(ws.nodes, n.id)
451 }
452
View as plain text