]> git.lizzy.rs Git - go-anidb.git/blob - udp/comm.go
5d8cd1eb0db0cbd274ed82e37ffc5c1e975cf9bd
[go-anidb.git] / udp / comm.go
1 // Low-level AniDB UDP API client library
2 //
3 // Implements the commands essential to setting up and tearing down an API connection,
4 // as well as an asynchronous layer. Throttles sends internally according to API spec.
5 //
6 // This library doesn't implement caching; beware since aggressive caching is an
7 // implementation requirement. Not doing so can get you banned.
8 package udp
9
10 import (
11         "bytes"
12         "compress/zlib"
13         "crypto/aes"
14         "crypto/cipher"
15         "crypto/md5"
16         "fmt"
17         "io"
18         "io/ioutil"
19         "log"
20         "net"
21         "sort"
22         "strings"
23         "sync"
24         "time"
25 )
26
// Network location of the AniDB UDP API endpoint; dial joins the two values
// into a "host:port" address.
const (
	// AniDBUDPServer is the host name of the API server.
	AniDBUDPServer = "api.anidb.net"

	// AniDBUDPPort is the UDP port of the API server.
	AniDBUDPPort = 9000
)
31
32 type AniDBUDP struct {
33         KeepAliveInterval time.Duration // Interval between keep-alive packets; only sent when PUSH notifications are enabled (default: 20 minutes)
34         Timeout           time.Duration // The time to wait before a packet is considered lost (default: 45 seconds)
35         Notifications     chan APIReply // Channel where PUSH notifications are sent to
36
37         session string
38
39         conn *net.UDPConn
40         ecb  *ecbState
41
42         counter uint16
43         ctrLock sync.Mutex
44
45         tagRouter  map[string]chan APIReply
46         routerLock sync.RWMutex
47
48         sendCh chan packet
49
50         breakRecv chan bool
51         breakSend chan bool
52
53         // notifyState *notifyState
54         pingTimer *time.Timer
55 }
56
57 // Creates and initializes the AniDBUDP struct
58 func NewAniDBUDP() *AniDBUDP {
59         c := &AniDBUDP{
60                 KeepAliveInterval: 20 * time.Minute,
61                 Timeout:           45 * time.Second,
62                 Notifications:     make(chan APIReply, 5),
63                 tagRouter:         make(map[string]chan APIReply),
64         }
65         return c
66 }
67
// ParamMap is a key-value list of parameters.
type ParamMap map[string]interface{}

// String returns a query-like representation of the ParamMap: one "key=value"
// pair per entry, ordered by key and joined with "&". Values are rendered
// with fmt.Sprint; an empty or nil map yields the empty string.
func (m ParamMap) String() string {
	names := make([]string, 0, len(m))
	for name := range m {
		names = append(names, name)
	}
	// Map iteration order is random; sorting makes the output deterministic.
	sort.Strings(names)

	pairs := make([]string, len(names))
	for i, name := range names {
		pairs[i] = name + "=" + fmt.Sprint(m[name])
	}
	return strings.Join(pairs, "&")
}
85
86 // Sends the requested query to the AniDB UDP API server.
87 //
88 // Returns a channel through which the eventual reply is sent.
89 //
90 // See http://wiki.anidb.net/w/UDP_API_Definition for the defined commands.
91 func (a *AniDBUDP) SendRecv(command string, args ParamMap) <-chan APIReply {
92         a.ctrLock.Lock()
93         tag := fmt.Sprintf("T%d", a.counter)
94         a.counter++
95         a.ctrLock.Unlock()
96
97         args["tag"] = tag
98         if a.session != "" {
99                 args["s"] = a.session
100         }
101
102         if err := a.dial(); err != nil {
103                 ch <- newErrorWrapper(err)
104                 close(ch)
105                 return ch
106         }
107
108         ch := make(chan APIReply, 1)
109
110         a.routerLock.Lock()
111         a.tagRouter[tag] = ch
112         a.routerLock.Unlock()
113
114         reply := make(chan APIReply, 1)
115         go func() {
116                 <-a.send(command, args)
117                 timeout := time.After(a.Timeout)
118
119                 select {
120                 case <-timeout:
121                         a.routerLock.Lock()
122                         delete(a.tagRouter, tag)
123                         a.routerLock.Unlock()
124                         close(ch)
125
126                         reply <- TimeoutError
127                         close(reply)
128
129                         log.Println("!!! Timeout")
130                 case r := <-ch:
131                         a.routerLock.Lock()
132                         delete(a.tagRouter, tag)
133                         a.routerLock.Unlock()
134                         close(ch)
135
136                         reply <- r
137                         close(reply)
138                 }
139         }()
140         return reply
141 }
142
143 var laddr, _ = net.ResolveUDPAddr("udp4", "0.0.0.0:0")
144
145 func (a *AniDBUDP) dial() (err error) {
146         if a.conn != nil {
147                 return nil
148         }
149
150         srv := fmt.Sprintf("%s:%d", AniDBUDPServer, AniDBUDPPort)
151         if raddr, err := net.ResolveUDPAddr("udp4", srv); err != nil {
152                 return err
153         } else {
154                 a.conn, err = net.DialUDP("udp4", laddr, raddr)
155
156                 if a.breakSend != nil {
157                         a.breakSend <- true
158                         <-a.breakSend
159                 } else {
160                         a.breakSend = make(chan bool)
161                 }
162                 a.sendCh = make(chan packet, 10)
163                 go a.sendLoop()
164
165                 if a.breakRecv != nil {
166                         a.breakRecv <- true
167                         <-a.breakRecv
168                 } else {
169                         a.breakRecv = make(chan bool)
170                 }
171                 go a.recvLoop()
172         }
173         return err
174 }
175
176 func (a *AniDBUDP) send(command string, args ParamMap) chan bool {
177         str := command
178         arg := args.String()
179         if len(arg) > 0 {
180                 str = strings.Join([]string{command, arg}, " ")
181         }
182         log.Println(">>>", str)
183
184         p := makePacket([]byte(str), a.ecb)
185
186         sendPacket(p, a.sendCh)
187 }
188
// packet is the unit exchanged with the send and receive loops.
type packet struct {
	// b is the raw payload: written to the connection by sendLoop, or
	// obtained from getPacket by recvLoop.
	b []byte
	// err is the error reported by getPacket for a received packet.
	err error
	// sent is signalled by sendLoop after the payload has been written.
	sent chan bool
}
194
195 func (a *AniDBUDP) sendLoop() {
196         for {
197                 select {
198                 case <-a.breakSend:
199                         a.breakSend <- true
200                         return
201                 case pkt := <-a.sendCh:
202                         a.conn.Write(pkt.b)
203
204                         // send twice: once for confirming with the queue,
205                         // again for timeout calculations
206                         for i := 0; i < 2; i++ {
207                                 pkt.sent <- true
208                         }
209                 }
210         }
211 }
212
213 func (a *AniDBUDP) recvLoop() {
214         pkt := make(chan packet, 1)
215         brk := make(chan bool)
216         go func() {
217                 for {
218                         select {
219                         case <-brk:
220                                 brk <- true
221                                 return
222                         default:
223                                 b, err := getPacket(a.conn, a.ecb)
224                                 pkt <- packet{b: b, err: err}
225                         }
226                 }
227         }()
228
229         var pingTimer <-chan time.Time
230
231         for {
232                 if a.pingTimer != nil {
233                         pingTimer = a.pingTimer.C
234                 }
235
236                 select {
237                 case <-a.breakRecv:
238                         brk <- true
239                         <-brk
240                         a.breakRecv <- true
241                         return
242                 case <-pingTimer:
243                         go func() {
244                                 if a.KeepAliveInterval >= 30*time.Minute {
245                                         if (<-a.Uptime()).Error() != nil {
246                                                 return
247                                         }
248                                 } else if (<-a.Ping()).Error() != nil {
249                                         return
250                                 }
251                                 a.pingTimer.Reset(a.KeepAliveInterval)
252                         }()
253                 case p := <-pkt:
254                         b, err := p.b, p.err
255
256                         if err != nil && err != io.EOF && err != zlib.ErrChecksum {
257                                 // can UDP recv even raise other errors?
258                                 panic("UDP recv: " + err.Error())
259                         }
260
261                         if r := newGenericReply(b); r != nil {
262                                 if a.pingTimer != nil {
263                                         a.pingTimer.Reset(a.KeepAliveInterval)
264                                 }
265
266                                 if err == zlib.ErrChecksum {
267                                         r.truncated = true
268                                 }
269
270                                 a.routerLock.RLock()
271                                 if ch, ok := a.tagRouter[r.Tag()]; ok {
272
273                                         log.Println("<<<", string(b))
274                                         ch <- r
275                                 } else {
276                                         c := r.Code()
277                                         if c >= 720 && c < 799 {
278                                                 // notices that need PUSHACK
279                                                 id := strings.Fields(r.Text())[0]
280                                                 a.send("PUSHACK", paramMap{"nid": id})
281
282                                                 a.Notifications <- r
283                                         } else if c == 799 {
284                                                 // notice that doesn't need PUSHACK
285                                                 a.Notifications <- r
286                                         } else if c == 270 {
287                                                 // PUSH enabled
288                                                 if a.pingTimer == nil {
289                                                         a.pingTimer = time.NewTimer(a.KeepAliveInterval)
290                                                 }
291                                         } else if c == 370 {
292                                                 // PUSH disabled
293                                                 a.pingTimer = nil
294                                         } else if c == 701 || c == 702 {
295                                                 // PUSHACK ACK, no need to route
296                                         } else if c == 281 || c == 282 || c == 381 || c == 382 {
297                                                 // NOTIFYACK reply, ignore
298                                         } else {
299                                                 // untagged error, broadcast to all
300                                                 log.Println("<!<", string(b))
301                                                 for _, ch := range a.tagRouter {
302                                                         ch <- r
303                                                 }
304                                         }
305                                 }
306                                 a.routerLock.RUnlock()
307                         }
308                 }
309         }
310 }