]> git.lizzy.rs Git - go-anidb.git/blob - udp/comm.go
anidb: Use a map instead of various fields that have to be switched
[go-anidb.git] / udp / comm.go
1 // Low-level AniDB UDP API client library
2 //
3 // Implements the commands essential to setting up and tearing down an API connection,
4 // as well as an asynchronous layer. Throttles sends internally according to API spec.
5 //
6 // This library doesn't implement caching; beware since aggressive caching is an
7 // implementation requirement. Not doing so can get you banned.
8 package udpapi
9
10 import (
11         "compress/zlib"
12         "fmt"
13         "io"
14         "net"
15         "sort"
16         "strings"
17         "sync"
18         "time"
19 )
20
// Endpoint of the AniDB UDP API; dial joins the two as "host:port".
const (
	// Host name of the API server.
	AniDBUDPServer = "api.anidb.net"
	// UDP port of the API server.
	AniDBUDPPort   = 9000
)
25
// AniDBUDP holds the state of one AniDB UDP API client: its tunables, the
// UDP connection, and the channels used by the send and receive loops.
//
// Use NewAniDBUDP to create one; the zero value has a nil tagRouter map and
// zero timeouts.
type AniDBUDP struct {
	// Interval between keep-alive packets; only sent when PUSH notifications are enabled (default: 20 minutes)
	KeepAliveInterval time.Duration

	// The time to wait before a packet is considered lost (default: 45 seconds)
	Timeout time.Duration

	// Channel where PUSH notifications are sent to
	Notifications chan APIReply

	// Session key; when non-empty SendRecv adds it to every request as "s".
	session string

	// UDP connection to the API server, created lazily by dial.
	conn *net.UDPConn
	// Passed to makePacket when building outgoing packets.
	// NOTE(review): presumably encryption state -- its type is defined elsewhere.
	ecb  *ecbState

	// Source of the numeric part of request tags ("T<counter>"); guarded by ctrLock.
	counter uint16
	ctrLock sync.Mutex

	// Maps the tag of each in-flight request to the channel awaiting its
	// reply; guarded by routerLock.
	tagRouter  map[string]chan APIReply
	routerLock sync.RWMutex

	// Queue of outgoing packets drained by sendLoop.
	sendCh chan packet

	// Stop signals for recvLoop and sendLoop. A loop that receives on its
	// channel sends a value back on it before returning.
	breakRecv chan bool
	breakSend chan bool

	// notifyState *notifyState
	// Keep-alive timer; created by recvLoop on reply code 270 and set back to
	// nil on reply code 370.
	pingTimer *time.Timer
}
55
56 // Creates and initializes the AniDBUDP struct
57 func NewAniDBUDP() *AniDBUDP {
58         c := &AniDBUDP{
59                 KeepAliveInterval: 20 * time.Minute,
60                 Timeout:           45 * time.Second,
61                 Notifications:     make(chan APIReply, 5),
62                 tagRouter:         make(map[string]chan APIReply),
63         }
64         return c
65 }
66
// ParamMap is a key-value list of request parameters.
type ParamMap map[string]interface{}

// String renders the map as a query-like string: "key=value" pairs joined by
// "&", ordered by key so the output is deterministic. Values are formatted
// with fmt.Sprint. An empty or nil map yields "".
func (m ParamMap) String() string {
	pairs := make([]string, 0, len(m))
	for name := range m {
		pairs = append(pairs, name)
	}
	sort.Strings(pairs)

	// Turn each sorted key into its "key=value" form in place.
	for i, name := range pairs {
		pairs[i] = name + "=" + fmt.Sprint(m[name])
	}
	return strings.Join(pairs, "&")
}
84
85 // Sends the requested query to the AniDB UDP API server.
86 //
87 // Returns a channel through which the eventual reply is sent.
88 //
89 // See http://wiki.anidb.net/w/UDP_API_Definition for the defined commands.
90 func (a *AniDBUDP) SendRecv(command string, args ParamMap) <-chan APIReply {
91         a.ctrLock.Lock()
92         tag := fmt.Sprintf("T%d", a.counter)
93         a.counter++
94         a.ctrLock.Unlock()
95
96         args["tag"] = tag
97         if a.session != "" {
98                 args["s"] = a.session
99         }
100         for k, v := range args {
101                 s := fmt.Sprint(v)
102                 s = strings.Replace(s, "\n", "<br/>", -1)
103                 args[k] = strings.Replace(s, "&", "&amp;", -1)
104         }
105
106         ch := make(chan APIReply, 1)
107
108         if err := a.dial(); err != nil {
109                 ch <- newErrorWrapper(err)
110                 close(ch)
111                 return ch
112         }
113
114         a.routerLock.Lock()
115         a.tagRouter[tag] = ch
116         a.routerLock.Unlock()
117
118         reply := make(chan APIReply, 1)
119         go func() {
120                 <-a.send(command, args)
121                 timeout := time.After(a.Timeout)
122
123                 select {
124                 case <-timeout:
125                         a.routerLock.Lock()
126                         delete(a.tagRouter, tag)
127                         a.routerLock.Unlock()
128                         close(ch)
129
130                         reply <- newErrorWrapper(TimeoutError)
131                         close(reply)
132                 case r := <-ch:
133                         a.routerLock.Lock()
134                         delete(a.tagRouter, tag)
135                         a.routerLock.Unlock()
136                         close(ch)
137
138                         reply <- r
139                         close(reply)
140                 }
141         }()
142         return reply
143 }
144
// laddr is the local address dial binds the UDP socket to: the IPv4 wildcard
// address with port 0 (presumably so the system picks the port). The resolve
// error is discarded here.
var laddr, _ = net.ResolveUDPAddr("udp4", "0.0.0.0:0")
146
147 func (a *AniDBUDP) dial() (err error) {
148         if a.conn != nil {
149                 return nil
150         }
151
152         srv := fmt.Sprintf("%s:%d", AniDBUDPServer, AniDBUDPPort)
153         if raddr, err := net.ResolveUDPAddr("udp4", srv); err != nil {
154                 return err
155         } else {
156                 a.conn, err = net.DialUDP("udp4", laddr, raddr)
157
158                 if a.breakSend != nil {
159                         a.breakSend <- true
160                         <-a.breakSend
161                 } else {
162                         a.breakSend = make(chan bool)
163                 }
164                 a.sendCh = make(chan packet, 10)
165                 go a.sendLoop()
166
167                 if a.breakRecv != nil {
168                         a.breakRecv <- true
169                         <-a.breakRecv
170                 } else {
171                         a.breakRecv = make(chan bool)
172                 }
173                 go a.recvLoop()
174         }
175         return err
176 }
177
178 func (a *AniDBUDP) send(command string, args ParamMap) chan bool {
179         str := command
180         arg := args.String()
181         if len(arg) > 0 {
182                 str = strings.Join([]string{command, arg}, " ")
183         }
184
185         p := makePacket([]byte(str), a.ecb)
186
187         return sendPacket(p, a.sendCh)
188 }
189
190 func (a *AniDBUDP) sendLoop() {
191         for {
192                 select {
193                 case <-a.breakSend:
194                         a.breakSend <- true
195                         return
196                 case pkt := <-a.sendCh:
197                         a.conn.Write(pkt.b)
198
199                         // send twice: once for confirming with the queue,
200                         // again for timeout calculations
201                         for i := 0; i < 2; i++ {
202                                 pkt.sent <- true
203                         }
204                 }
205         }
206 }
207
208 func (a *AniDBUDP) recvLoop() {
209         pkt := make(chan packet, 1)
210         brk := make(chan bool)
211         go func() {
212                 for {
213                         select {
214                         case <-brk:
215                                 brk <- true
216                                 return
217                         default:
218                                 b, err := a.getPacket()
219                                 pkt <- packet{b: b, err: err}
220                         }
221                 }
222         }()
223
224         var pingTimer <-chan time.Time
225
226         for {
227                 if a.pingTimer != nil {
228                         pingTimer = a.pingTimer.C
229                 }
230
231                 select {
232                 case <-a.breakRecv:
233                         brk <- true
234                         <-brk
235                         a.breakRecv <- true
236                         return
237                 case <-pingTimer:
238                         go func() {
239                                 if a.KeepAliveInterval >= 30*time.Minute {
240                                         if (<-a.Uptime()).Error() != nil {
241                                                 return
242                                         }
243                                 } else if (<-a.Ping()).Error() != nil {
244                                         return
245                                 }
246                                 a.pingTimer.Reset(a.KeepAliveInterval)
247                         }()
248                 case p := <-pkt:
249                         b, err := p.b, p.err
250
251                         if err != nil && err != io.EOF && err != zlib.ErrChecksum {
252                                 // can UDP recv even raise other errors?
253                                 panic("UDP recv: " + err.Error())
254                         }
255
256                         if r := newGenericReply(b); r != nil {
257                                 if a.pingTimer != nil {
258                                         a.pingTimer.Reset(a.KeepAliveInterval)
259                                 }
260
261                                 if err == zlib.ErrChecksum {
262                                         r.truncated = true
263                                 }
264
265                                 a.routerLock.RLock()
266                                 if ch, ok := a.tagRouter[r.Tag()]; ok {
267                                         ch <- r
268                                 } else {
269                                         c := r.Code()
270                                         if c >= 720 && c < 799 {
271                                                 // notices that need PUSHACK
272                                                 id := strings.Fields(r.Text())[0]
273                                                 a.send("PUSHACK", ParamMap{"nid": id})
274
275                                                 a.Notifications <- r
276                                         } else if c == 799 {
277                                                 // notice that doesn't need PUSHACK
278                                                 a.Notifications <- r
279                                         } else if c == 270 {
280                                                 // PUSH enabled
281                                                 if a.pingTimer == nil {
282                                                         a.pingTimer = time.NewTimer(a.KeepAliveInterval)
283                                                 }
284                                         } else if c == 370 {
285                                                 // PUSH disabled
286                                                 a.pingTimer = nil
287                                         } else if c == 701 || c == 702 {
288                                                 // PUSHACK ACK, no need to route
289                                         } else if c == 281 || c == 282 || c == 381 || c == 382 {
290                                                 // NOTIFYACK reply, ignore
291                                         } else {
292                                                 // untagged error, broadcast to all
293                                                 for _, ch := range a.tagRouter {
294                                                         ch <- r
295                                                 }
296                                         }
297                                 }
298                                 a.routerLock.RUnlock()
299                         }
300                 }
301         }
302 }