...
1
2
3
4 package codec
5
6 import (
7 "bufio"
8 "errors"
9 "io"
10 "net/rpc"
11 )
12
13 var (
14 errRpcIsClosed = errors.New("rpc - connection has been closed")
15 errRpcNoConn = errors.New("rpc - no connection")
16
17 rpcSpaceArr = [1]byte{' '}
18 )
19
20
21 type Rpc interface {
22 ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
23 ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
24 }
25
26
27 type RPCOptions struct {
28
29
30
31
32
33 RPCNoBuffer bool
34 }
35
36
37 type rpcCodec struct {
38 c io.Closer
39 r io.Reader
40 w io.Writer
41 f ioFlusher
42
43 dec *Decoder
44 enc *Encoder
45 h Handle
46
47 cls atomicClsErr
48 }
49
50 func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
51 return newRPCCodec2(conn, conn, conn, h)
52 }
53
54 func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
55 bh := h.getBasicHandle()
56
57
58
59 f, ok := w.(ioFlusher)
60 if !bh.RPCNoBuffer {
61 if bh.WriterBufferSize <= 0 {
62 if !ok {
63 bw := bufio.NewWriter(w)
64 f, w = bw, bw
65 }
66 }
67 if bh.ReaderBufferSize <= 0 {
68 if _, ok = w.(ioBuffered); !ok {
69 r = bufio.NewReader(r)
70 }
71 }
72 }
73 return rpcCodec{
74 c: c,
75 w: w,
76 r: r,
77 f: f,
78 h: h,
79 enc: NewEncoder(w, h),
80 dec: NewDecoder(r, h),
81 }
82 }
83
84 func (c *rpcCodec) write(obj ...interface{}) (err error) {
85 err = c.ready()
86 if err != nil {
87 return
88 }
89 if c.f != nil {
90 defer func() {
91 flushErr := c.f.Flush()
92 if flushErr != nil && err == nil {
93 err = flushErr
94 }
95 }()
96 }
97
98 for _, o := range obj {
99 err = c.enc.Encode(o)
100 if err != nil {
101 return
102 }
103
104
105
106 if c.h.isJson() {
107 _, err = c.w.Write(rpcSpaceArr[:])
108 if err != nil {
109 return
110 }
111 }
112 }
113 return
114 }
115
116 func (c *rpcCodec) read(obj interface{}) (err error) {
117 err = c.ready()
118 if err == nil {
119
120 if obj == nil {
121
122 err = c.dec.swallowErr()
123 } else {
124 err = c.dec.Decode(obj)
125 }
126 }
127 return
128 }
129
130 func (c *rpcCodec) Close() (err error) {
131 if c.c != nil {
132 cls := c.cls.load()
133 if !cls.closed {
134 cls.err = c.c.Close()
135 cls.closed = true
136 c.cls.store(cls)
137 }
138 err = cls.err
139 }
140 return
141 }
142
143 func (c *rpcCodec) ready() (err error) {
144 if c.c == nil {
145 err = errRpcNoConn
146 } else {
147 cls := c.cls.load()
148 if cls.closed {
149 if err = cls.err; err == nil {
150 err = errRpcIsClosed
151 }
152 }
153 }
154 return
155 }
156
157 func (c *rpcCodec) ReadResponseBody(body interface{}) error {
158 return c.read(body)
159 }
160
161
162
163 type goRpcCodec struct {
164 rpcCodec
165 }
166
167 func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
168 return c.write(r, body)
169 }
170
171 func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
172 return c.write(r, body)
173 }
174
175 func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
176 return c.read(r)
177 }
178
179 func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
180 return c.read(r)
181 }
182
183 func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
184 return c.read(body)
185 }
186
187
188
189
190
191 type goRpc struct{}
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226 var GoRpc goRpc
227
228 func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
229 return &goRpcCodec{newRPCCodec(conn, h)}
230 }
231
232 func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
233 return &goRpcCodec{newRPCCodec(conn, h)}
234 }
235
View as plain text