]> git.lizzy.rs Git - go-anidb.git/blob - udp/comm.go
1017c69006aebac83b6da80c0cf6fc0aac76ad5c
[go-anidb.git] / udp / comm.go
1 // Low-level AniDB UDP API client library
2 //
3 // Implements the commands essential to setting up and tearing down an API connection,
4 // as well as an asynchronous layer. Throttles sends internally according to API spec.
5 //
6 // This library doesn't implement caching; beware since aggressive caching is an
7 // implementation requirement. Not doing so can get you banned.
8 package udpapi
9
10 import (
11         "compress/zlib"
12         "fmt"
13         "io"
14         "log"
15         "net"
16         "sort"
17         "strings"
18         "sync"
19         "time"
20 )
21
// Endpoint of the AniDB UDP API.
const (
	// AniDBUDPServer is the host name of the API server.
	AniDBUDPServer = "api.anidb.net"
	// AniDBUDPPort is the UDP port the API server is contacted on.
	AniDBUDPPort = 9000
)
26
27 type AniDBUDP struct {
28         // Interval between keep-alive packets; only sent when PUSH notifications are enabled (default: 20 minutes)
29         KeepAliveInterval time.Duration
30
31         // The time to wait before a packet is considered lost (default: 45 seconds)
32         Timeout time.Duration
33
34         // Channel where PUSH notifications are sent to
35         Notifications chan APIReply
36
37         session string
38
39         conn *net.UDPConn
40         ecb  *ecbState
41
42         counter uint16
43         ctrLock sync.Mutex
44
45         tagRouter  map[string]chan APIReply
46         routerLock sync.RWMutex
47
48         sendCh chan packet
49
50         breakRecv chan bool
51         breakSend chan bool
52
53         // notifyState *notifyState
54         pingTimer *time.Timer
55 }
56
57 // Creates and initializes the AniDBUDP struct
58 func NewAniDBUDP() *AniDBUDP {
59         c := &AniDBUDP{
60                 KeepAliveInterval: 20 * time.Minute,
61                 Timeout:           45 * time.Second,
62                 Notifications:     make(chan APIReply, 5),
63                 tagRouter:         make(map[string]chan APIReply),
64         }
65         return c
66 }
67
// ParamMap is a key-value list of command parameters.
type ParamMap map[string]interface{}

// String renders the map in a query-like form: "key=value" pairs, ordered
// by key and joined with "&". Values are formatted with fmt.Sprint.
// An empty map yields the empty string.
func (m ParamMap) String() string {
	names := make([]string, 0, len(m))
	for name := range m {
		names = append(names, name)
	}
	// Map iteration order is random; sort for a deterministic result.
	sort.Strings(names)

	pairs := make([]string, len(names))
	for i, name := range names {
		pairs[i] = name + "=" + fmt.Sprint(m[name])
	}
	return strings.Join(pairs, "&")
}
85
86 // Sends the requested query to the AniDB UDP API server.
87 //
88 // Returns a channel through which the eventual reply is sent.
89 //
90 // See http://wiki.anidb.net/w/UDP_API_Definition for the defined commands.
91 func (a *AniDBUDP) SendRecv(command string, args ParamMap) <-chan APIReply {
92         a.ctrLock.Lock()
93         tag := fmt.Sprintf("T%d", a.counter)
94         a.counter++
95         a.ctrLock.Unlock()
96
97         args["tag"] = tag
98         if a.session != "" {
99                 args["s"] = a.session
100         }
101         for k, v := range args {
102                 s := fmt.Sprint(v)
103                 s = strings.Replace(s, "\n", "<br/>", -1)
104                 args[k] = strings.Replace(s, "&", "&amp;", -1)
105         }
106
107         ch := make(chan APIReply, 1)
108
109         if err := a.dial(); err != nil {
110                 ch <- newErrorWrapper(err)
111                 close(ch)
112                 return ch
113         }
114
115         a.routerLock.Lock()
116         a.tagRouter[tag] = ch
117         a.routerLock.Unlock()
118
119         reply := make(chan APIReply, 1)
120         go func() {
121                 <-a.send(command, args)
122                 timeout := time.After(a.Timeout)
123
124                 select {
125                 case <-timeout:
126                         a.routerLock.Lock()
127                         delete(a.tagRouter, tag)
128                         a.routerLock.Unlock()
129                         close(ch)
130
131                         reply <- newErrorWrapper(TimeoutError)
132                         close(reply)
133
134                         log.Println("!!! Timeout")
135                 case r := <-ch:
136                         a.routerLock.Lock()
137                         delete(a.tagRouter, tag)
138                         a.routerLock.Unlock()
139                         close(ch)
140
141                         reply <- r
142                         close(reply)
143                 }
144         }()
145         return reply
146 }
147
148 var laddr, _ = net.ResolveUDPAddr("udp4", "0.0.0.0:0")
149
150 func (a *AniDBUDP) dial() (err error) {
151         if a.conn != nil {
152                 return nil
153         }
154
155         srv := fmt.Sprintf("%s:%d", AniDBUDPServer, AniDBUDPPort)
156         if raddr, err := net.ResolveUDPAddr("udp4", srv); err != nil {
157                 return err
158         } else {
159                 a.conn, err = net.DialUDP("udp4", laddr, raddr)
160
161                 if a.breakSend != nil {
162                         a.breakSend <- true
163                         <-a.breakSend
164                 } else {
165                         a.breakSend = make(chan bool)
166                 }
167                 a.sendCh = make(chan packet, 10)
168                 go a.sendLoop()
169
170                 if a.breakRecv != nil {
171                         a.breakRecv <- true
172                         <-a.breakRecv
173                 } else {
174                         a.breakRecv = make(chan bool)
175                 }
176                 go a.recvLoop()
177         }
178         return err
179 }
180
181 func (a *AniDBUDP) send(command string, args ParamMap) chan bool {
182         str := command
183         arg := args.String()
184         if len(arg) > 0 {
185                 str = strings.Join([]string{command, arg}, " ")
186         }
187         log.Println(">>>", str)
188
189         p := makePacket([]byte(str), a.ecb)
190
191         return sendPacket(p, a.sendCh)
192 }
193
194 func (a *AniDBUDP) sendLoop() {
195         for {
196                 select {
197                 case <-a.breakSend:
198                         a.breakSend <- true
199                         return
200                 case pkt := <-a.sendCh:
201                         a.conn.Write(pkt.b)
202
203                         // send twice: once for confirming with the queue,
204                         // again for timeout calculations
205                         for i := 0; i < 2; i++ {
206                                 pkt.sent <- true
207                         }
208                 }
209         }
210 }
211
212 func (a *AniDBUDP) recvLoop() {
213         pkt := make(chan packet, 1)
214         brk := make(chan bool)
215         go func() {
216                 for {
217                         select {
218                         case <-brk:
219                                 brk <- true
220                                 return
221                         default:
222                                 b, err := a.getPacket()
223                                 pkt <- packet{b: b, err: err}
224                         }
225                 }
226         }()
227
228         var pingTimer <-chan time.Time
229
230         for {
231                 if a.pingTimer != nil {
232                         pingTimer = a.pingTimer.C
233                 }
234
235                 select {
236                 case <-a.breakRecv:
237                         brk <- true
238                         <-brk
239                         a.breakRecv <- true
240                         return
241                 case <-pingTimer:
242                         go func() {
243                                 if a.KeepAliveInterval >= 30*time.Minute {
244                                         if (<-a.Uptime()).Error() != nil {
245                                                 return
246                                         }
247                                 } else if (<-a.Ping()).Error() != nil {
248                                         return
249                                 }
250                                 a.pingTimer.Reset(a.KeepAliveInterval)
251                         }()
252                 case p := <-pkt:
253                         b, err := p.b, p.err
254
255                         if err != nil && err != io.EOF && err != zlib.ErrChecksum {
256                                 // can UDP recv even raise other errors?
257                                 panic("UDP recv: " + err.Error())
258                         }
259
260                         if r := newGenericReply(b); r != nil {
261                                 if a.pingTimer != nil {
262                                         a.pingTimer.Reset(a.KeepAliveInterval)
263                                 }
264
265                                 if err == zlib.ErrChecksum {
266                                         r.truncated = true
267                                 }
268
269                                 a.routerLock.RLock()
270                                 if ch, ok := a.tagRouter[r.Tag()]; ok {
271                                         log.Println("<<<", string(b))
272                                         ch <- r
273                                 } else {
274                                         c := r.Code()
275                                         if c >= 720 && c < 799 {
276                                                 // notices that need PUSHACK
277                                                 id := strings.Fields(r.Text())[0]
278                                                 a.send("PUSHACK", ParamMap{"nid": id})
279
280                                                 a.Notifications <- r
281                                         } else if c == 799 {
282                                                 // notice that doesn't need PUSHACK
283                                                 a.Notifications <- r
284                                         } else if c == 270 {
285                                                 // PUSH enabled
286                                                 if a.pingTimer == nil {
287                                                         a.pingTimer = time.NewTimer(a.KeepAliveInterval)
288                                                 }
289                                         } else if c == 370 {
290                                                 // PUSH disabled
291                                                 a.pingTimer = nil
292                                         } else if c == 701 || c == 702 {
293                                                 // PUSHACK ACK, no need to route
294                                         } else if c == 281 || c == 282 || c == 381 || c == 382 {
295                                                 // NOTIFYACK reply, ignore
296                                         } else {
297                                                 // untagged error, broadcast to all
298                                                 log.Println("<!<", string(b))
299                                                 for _, ch := range a.tagRouter {
300                                                         ch <- r
301                                                 }
302                                         }
303                                 }
304                                 a.routerLock.RUnlock()
305                         }
306                 }
307         }
308 }