...
1
16
17 package decoder
18
19 import (
20 `bytes`
21 `io`
22 `sync`
23
24 `github.com/bytedance/sonic/internal/native`
25 `github.com/bytedance/sonic/internal/native/types`
26 `github.com/bytedance/sonic/internal/rt`
27 `github.com/bytedance/sonic/option`
28 )
29
30 var (
31 minLeftBufferShift uint = 1
32 )
33
34
35 type StreamDecoder struct {
36 r io.Reader
37 buf []byte
38 scanp int
39 scanned int64
40 err error
41 Decoder
42 }
43
44 var bufPool = sync.Pool{
45 New: func () interface{} {
46 return make([]byte, 0, option.DefaultDecoderBufferSize)
47 },
48 }
49
50
51
52
53 func NewStreamDecoder(r io.Reader) *StreamDecoder {
54 return &StreamDecoder{r : r}
55 }
56
57
58
59
60
61 func (self *StreamDecoder) Decode(val interface{}) (err error) {
62
63 if self.More() {
64
65 var s = self.scanp
66 try_skip:
67 var e = len(self.buf)
68
69 var src = rt.Mem2Str(self.buf[s:e])
70
71
72
73
74
75
76 var x = 0;
77 if y := native.SkipOneFast(&src, &x); y < 0 {
78 if self.readMore() {
79
80 goto try_skip
81 } else {
82
83 err = SyntaxError{e, self.s, types.ParsingError(-s), ""}
84 self.setErr(err)
85 return
86 }
87 } else {
88 s = y + s
89 e = x + s
90 }
91
92
93
94 self.Decoder.Reset(string(self.buf[s:e]))
95 err = self.Decoder.Decode(val)
96 if err != nil {
97 self.setErr(err)
98 return
99 }
100
101 self.scanp = e
102 _, empty := self.scan()
103 if empty {
104
105
106 mem := self.buf
107 self.buf = nil
108 bufPool.Put(mem[:0])
109 } else {
110
111
112 n := copy(self.buf, self.buf[self.scanp:])
113 self.buf = self.buf[:n]
114 }
115
116 self.scanned += int64(self.scanp)
117 self.scanp = 0
118 }
119
120 return self.err
121 }
122
123
124
125 func (self *StreamDecoder) InputOffset() int64 {
126
127 return self.scanned + int64(self.scanp)
128 }
129
130
131
132 func (self *StreamDecoder) Buffered() io.Reader {
133 return bytes.NewReader(self.buf[self.scanp:])
134 }
135
136
137
138 func (self *StreamDecoder) More() bool {
139 if self.err != nil {
140 return false
141 }
142 c, err := self.peek()
143 return err == nil && c != ']' && c != '}'
144 }
145
146
147
148 func (self *StreamDecoder) readMore() bool {
149 if self.err != nil {
150 return false
151 }
152
153 var err error
154 var n int
155 for {
156
157 l := len(self.buf)
158 realloc(&self.buf)
159
160 n, err = self.r.Read(self.buf[l:cap(self.buf)])
161 self.buf = self.buf[: l+n]
162
163 self.scanp = l
164 _, empty := self.scan()
165 if !empty {
166 return true
167 }
168
169
170 if err != nil {
171 self.setErr(err)
172 return false
173 }
174 }
175 }
176
177 func (self *StreamDecoder) setErr(err error) {
178 self.err = err
179 mem := self.buf[:0]
180 self.buf = nil
181 bufPool.Put(mem)
182 }
183
184 func (self *StreamDecoder) peek() (byte, error) {
185 var err error
186 for {
187 c, empty := self.scan()
188 if !empty {
189 return byte(c), nil
190 }
191
192 if err != nil {
193 self.setErr(err)
194 return 0, err
195 }
196 err = self.refill()
197 }
198 }
199
200 func (self *StreamDecoder) scan() (byte, bool) {
201 for i := self.scanp; i < len(self.buf); i++ {
202 c := self.buf[i]
203 if isSpace(c) {
204 continue
205 }
206 self.scanp = i
207 return c, false
208 }
209 return 0, true
210 }
211
212 func isSpace(c byte) bool {
213 return types.SPACE_MASK & (1 << c) != 0
214 }
215
216 func (self *StreamDecoder) refill() error {
217
218
219 if self.scanp > 0 {
220 self.scanned += int64(self.scanp)
221 n := copy(self.buf, self.buf[self.scanp:])
222 self.buf = self.buf[:n]
223 self.scanp = 0
224 }
225
226
227 realloc(&self.buf)
228
229
230 n, err := self.r.Read(self.buf[len(self.buf):cap(self.buf)])
231 self.buf = self.buf[0 : len(self.buf)+n]
232
233 return err
234 }
235
236 func realloc(buf *[]byte) bool {
237 l := uint(len(*buf))
238 c := uint(cap(*buf))
239 if c == 0 {
240
241 *buf = bufPool.Get().([]byte)
242 return true
243 }
244 if c - l <= c >> minLeftBufferShift {
245
246 e := l+(l>>minLeftBufferShift)
247 if e <= c {
248 e = c*2
249 }
250 tmp := make([]byte, l, e)
251 copy(tmp, *buf)
252 *buf = tmp
253 return true
254 }
255 return false
256 }
257
258
View as plain text