]> git.lizzy.rs Git - go-anidb.git/commitdiff
anidb: New caching mechanism
authorDiogo Franco (Kovensky) <diogomfranco@gmail.com>
Sun, 14 Jul 2013 23:16:42 +0000 (20:16 -0300)
committerDiogo Franco (Kovensky) <diogomfranco@gmail.com>
Sun, 14 Jul 2013 23:16:42 +0000 (20:16 -0300)
Refactors the various parts of the code that interacted with the cache
to use the new cache. Moves the intent system to be decoupled from the
cache.

Caches requests made with invalid keys, avoids requerying the API for
InvalidKeyCacheDuration (1 hour).

Untested on !windows, since that uses a separate file locking mechanism.

15 files changed:
animecache.go
cache.go [new file with mode: 0644]
cache_test.go [new file with mode: 0644]
caches.go [deleted file]
cachetemplate.go [deleted file]
episodecache.go
filecache.go
flock.go [new file with mode: 0644]
flock_other.go [new file with mode: 0644]
flock_windows.go [new file with mode: 0644]
groupcache.go
intent.go [new file with mode: 0644]
misc.go
titlecache.go
udp.go

index e9910c553524548a09db8497a92903afe1cb03c9..948d257906c0c016dac550834a5b0a37f6f02638 100644 (file)
@@ -14,6 +14,7 @@ import (
 
 func init() {
        gob.RegisterName("*github.com/Kovensky/go-anidb.Anime", &Anime{})
+       gob.RegisterName("github.com/Kovensky/go-anidb.AID", AID(0))
 }
 
 func (a *Anime) Touch() {
@@ -33,10 +34,17 @@ func (a *Anime) IsStale() bool {
 // Unique Anime IDentifier.
 type AID int
 
+// make AID Cacheable
+func (e AID) Touch()        {}
+func (e AID) IsStale() bool { return false }
+
 // Returns a cached Anime. Returns nil if there is no cached Anime with this AID.
 func (aid AID) Anime() *Anime {
-       a, _ := caches.Get(animeCache).Get(int(aid)).(*Anime)
-       return a
+       var a Anime
+       if cache.Get(&a, "aid", aid) == nil {
+               return &a
+       }
+       return nil
 }
 
 type httpAnimeResponse struct {
@@ -50,19 +58,23 @@ type httpAnimeResponse struct {
 //
 // Note: This can take at least 4 seconds during heavy traffic.
 func (adb *AniDB) AnimeByID(aid AID) <-chan *Anime {
+       keys := []cacheKey{"aid", aid}
        ch := make(chan *Anime, 1)
 
-       anime := aid.Anime()
-       if !anime.IsStale() {
-               ch <- anime
-               close(ch)
+       ic := make(chan Cacheable, 1)
+       go func() { ch <- (<-ic).(*Anime); close(ch) }()
+       if intentMap.Intent(ic, keys...) {
                return ch
        }
 
-       ac := caches.Get(animeCache)
-       ic := make(chan Cacheable, 1)
-       go func() { ch <- (<-ic).(*Anime); close(ch) }()
-       if ac.Intent(int(aid), ic) {
+       if !cache.CheckValid(keys...) {
+               intentMap.Notify((*Anime)(nil), keys...)
+               return ch
+       }
+
+       anime := aid.Anime()
+       if !anime.IsStale() {
+               intentMap.Notify(anime, keys...)
                return ch
        }
 
@@ -98,6 +110,11 @@ func (adb *AniDB) AnimeByID(aid AID) <-chan *Anime {
                                        break Loop
                                }
                                if a := anime.populateFromHTTP(resp.anime); a == nil {
+                                       // HTTP ok but parsing not ok
+                                       if anime.PrimaryTitle == "" {
+                                               cache.MarkInvalid(keys...)
+                                       }
+
                                        ok = false
                                        break Loop
                                } else {
@@ -105,18 +122,21 @@ func (adb *AniDB) AnimeByID(aid AID) <-chan *Anime {
                                }
                                httpChan = nil
                        case reply := <-udpChan:
-                               anime.Incomplete = !anime.populateFromUDP(reply)
+                               if reply.Code() == 330 {
+                                       cache.MarkInvalid(keys...)
+                               } else {
+                                       anime.Incomplete = !anime.populateFromUDP(reply)
+                               }
                                udpChan = nil
                        }
                }
                if anime.PrimaryTitle != "" {
                        if ok {
-                               ac.Set(int(aid), anime)
-                       } else {
-                               ac.Flush(int(aid), anime)
+                               cache.Set(anime, keys...)
                        }
+                       intentMap.Notify(anime, keys...)
                } else {
-                       ac.Set(int(aid), (*Anime)(nil))
+                       intentMap.Notify((*Anime)(nil), keys...)
                }
        }()
        return ch
@@ -148,12 +168,6 @@ func (a *Anime) populateFromHTTP(reply httpapi.Anime) *Anime {
        for _, title := range reply.Titles {
                switch title.Type {
                case "main":
-                       if a.PrimaryTitle != "" {
-                               // We assume there's only ever one "main" title
-                               panic(
-                                       fmt.Sprintf("PrimaryTitle %q already set, new PrimaryTitle %q received!",
-                                               a.PrimaryTitle, title.Title))
-                       }
                        a.PrimaryTitle = title.Title
                case "official":
                        if a.OfficialTitles == nil {
diff --git a/cache.go b/cache.go
new file mode 100644 (file)
index 0000000..c22199f
--- /dev/null
+++ b/cache.go
@@ -0,0 +1,269 @@
+package anidb
+
+import (
+       "bytes"
+       "compress/gzip"
+       "encoding/gob"
+       "errors"
+       "fmt"
+       "io"
+       "log"
+       "os"
+       "path"
+       "reflect"
+       "regexp"
+       "sync"
+       "time"
+)
+
+var _ log.Logger
+
+type Cacheable interface {
+       // Updates the last modified time
+       Touch()
+       // Returns true if the Cacheable is nil, or if the last modified time is too old.
+       IsStale() bool
+}
+
+func init() {
+       gob.RegisterName("*github.com/Kovensky/go-anidb.invalidKeyCache", &invalidKeyCache{})
+}
+
+type invalidKeyCache struct{ time.Time }
+
+func (c *invalidKeyCache) Touch() {
+       c.Time = time.Now()
+}
+func (c *invalidKeyCache) IsStale() bool {
+       return time.Now().Sub(c.Time) > InvalidKeyCacheDuration
+}
+
+type cacheDir struct {
+       *sync.RWMutex
+
+       CacheDir string
+}
+
+func init() {
+       SetCacheDir(path.Join(os.TempDir(), "anidb", "cache"))
+}
+
+var cache cacheDir
+
+func SetCacheDir(path string) (err error) {
+       m := cache.RWMutex
+       if m == nil {
+               m = &sync.RWMutex{}
+               cache.RWMutex = m
+       }
+       cache.Lock()
+
+       if err = os.MkdirAll(path, 0755|os.ModeDir); err != nil {
+               cache.Unlock()
+               return err
+       }
+
+       cache = cacheDir{
+               RWMutex:  m,
+               CacheDir: path,
+       }
+
+       cache.Unlock()
+       RefreshTitles()
+       return nil
+}
+
+func GetCacheDir() (path string) {
+       cache.RLock()
+       defer cache.RUnlock()
+
+       return cache.CacheDir
+}
+
+type cacheKey interface{}
+
+// All "bad characters" that can't go in Windows paths.
+// It's a superset of the "bad characters" on other OSes, so this works.
+var badPath = regexp.MustCompile(`[\\/:\*\?\"<>\|]`)
+
+func stringify(stuff ...cacheKey) []string {
+       ret := make([]string, len(stuff))
+       for i := range stuff {
+               s := fmt.Sprint(stuff[i])
+               ret[i] = badPath.ReplaceAllLiteralString(s, "_")
+       }
+       return ret
+}
+
+// Each key but the last is treated as a directory.
+// The last key is treated as a regular file.
+//
+// This also means that cache keys that are file-backed
+// cannot have subkeys.
+func cachePath(keys ...cacheKey) string {
+       parts := append([]string{GetCacheDir()}, stringify(keys...)...)
+       p := path.Join(parts...)
+       return p
+}
+
+// Opens the file that backs the specified keys.
+func (c *cacheDir) Open(keys ...cacheKey) (fh *os.File, err error) {
+       subItem := cachePath(keys...)
+       return os.Open(subItem)
+}
+
+// Creates a new file to back the specified keys.
+func (c *cacheDir) Create(keys ...cacheKey) (fh *os.File, err error) {
+       subItem := cachePath(keys...)
+       subDir := path.Dir(subItem)
+
+       if err = os.MkdirAll(subDir, 0755|os.ModeDir); err != nil {
+               return nil, err
+       }
+       return os.Create(subItem)
+}
+
+// Deletes the file that backs the specified keys.
+func (c *cacheDir) Delete(keys ...cacheKey) (err error) {
+       return os.Remove(cachePath(keys...))
+}
+
+// Deletes the specified key and all subkeys.
+func (c *cacheDir) DeleteAll(keys ...cacheKey) (err error) {
+       return os.RemoveAll(cachePath(keys...))
+}
+
+func (c *cacheDir) Get(v Cacheable, keys ...cacheKey) (err error) {
+       defer func() {
+               log.Println("Got entry", keys, "(error", err, ")")
+       }()
+       flock := lockFile(cachePath(keys...))
+       flock.Lock()
+       defer flock.Unlock()
+
+       fh, err := c.Open(keys...)
+       if err != nil {
+               return err
+       }
+       defer func() {
+               if e := fh.Close(); err == nil {
+                       err = e
+               }
+       }()
+
+       val := reflect.ValueOf(v)
+       if k := val.Kind(); k == reflect.Ptr || k == reflect.Interface {
+               val = val.Elem()
+       }
+       if !val.CanSet() {
+               // panic because this is an internal coding mistake
+               panic("(*cacheDir).Get(): given Cacheable is not setable")
+       }
+       gz, err := gzip.NewReader(fh)
+       if err != nil {
+               return err
+       }
+       defer func() {
+               if e := gz.Close(); err == nil {
+                       err = e
+               }
+       }()
+
+       // defer func() {
+       //      if err == io.EOF {
+       //              err = nil
+       //      }
+       // }()
+
+       switch f := gz.Header.Comment; f {
+       case "encoding/gob":
+               dec := gob.NewDecoder(gz)
+               err = dec.Decode(v)
+       default:
+               return errors.New(fmt.Sprintf("Cached data (format %q) is not in a known format", f))
+       }
+
+       return
+}
+
+func (c *cacheDir) Set(v Cacheable, keys ...cacheKey) (n int64, err error) {
+       if v := reflect.ValueOf(v); !v.IsValid() {
+               panic("reflect.ValueOf() returned invaled value")
+       } else if k := v.Kind(); k == reflect.Ptr || k == reflect.Interface {
+               if v.IsNil() {
+                       return // no point in saving nil
+               }
+       }
+       defer func() {
+               log.Println("Set entry", keys, "(error", err, ")")
+       }()
+
+       // First we encode to memory -- we don't want to create/truncate a file and put bad data in it.
+       buf := bytes.Buffer{}
+       gz, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
+       if err != nil {
+               return 0, err
+       }
+       gz.Header.Comment = "encoding/gob"
+
+       // it doesn't matter if the caller doesn't see this,
+       // the important part is that the cache does.
+       v.Touch()
+
+       enc := gob.NewEncoder(gz)
+       err = enc.Encode(v)
+
+       if e := gz.Close(); err == nil {
+               err = e
+       }
+
+       if err != nil {
+               return 0, err
+       }
+
+       // We have good data, time to actually put it in the cache
+       flock := lockFile(cachePath(keys...))
+       flock.Lock()
+       defer flock.Unlock()
+
+       fh, err := c.Create(keys...)
+       if err != nil {
+               return 0, err
+       }
+       defer func() {
+               if e := fh.Close(); err == nil {
+                       err = e
+               }
+       }()
+       n, err = io.Copy(fh, &buf)
+       return
+}
+
+// Checks if the given keys are not marked as invalid.
+//
+// If the key was marked as invalid but is no longer considered
+// so, deletes the invalid marker.
+func (c *cacheDir) CheckValid(keys ...cacheKey) bool {
+       invKeys := append([]cacheKey{"invalid"}, keys...)
+       inv := invalidKeyCache{}
+
+       if cache.Get(&inv, invKeys...) == nil {
+               if inv.IsStale() {
+                       cache.Delete(invKeys...)
+               } else {
+                       return false
+               }
+       }
+       return true
+}
+
+// Deletes the given keys and marks them as invalid.
+//
+// They are considered invalid for InvalidKeyCacheDuration.
+func (c *cacheDir) MarkInvalid(keys ...cacheKey) error {
+       invKeys := append([]cacheKey{"invalid"}, keys...)
+
+       cache.Delete(keys...)
+       _, err := cache.Set(&invalidKeyCache{}, invKeys...)
+       return err
+}
diff --git a/cache_test.go b/cache_test.go
new file mode 100644 (file)
index 0000000..41c49b3
--- /dev/null
@@ -0,0 +1,82 @@
+package anidb
+
+import (
+       "encoding/gob"
+       "os"
+       "path"
+       "reflect"
+       "testing"
+)
+
+type stringifyVec struct {
+       result []string
+       data   []interface{}
+}
+
+func TestStringify(T *testing.T) {
+       T.Parallel()
+
+       vec := []stringifyVec{
+               stringifyVec{[]string{"a"}, []interface{}{"a"}},
+       }
+       for i, v := range vec {
+               str := stringify(v.data...)
+               if !reflect.DeepEqual(v.result, str) {
+                       T.Errorf("Vector #%d: Expected %v, got %v", i+1, v.result, str)
+               }
+       }
+}
+
+type cachePathVec struct {
+       path string
+       data []interface{}
+}
+
+var testDir = path.Join(os.TempDir(), "testing", "anidb")
+
+func init() { SetCacheDir(testDir) }
+
+func TestCachePath(T *testing.T) {
+       T.Parallel()
+
+       vec := []cachePathVec{
+               cachePathVec{path.Join(testDir, "a"), []interface{}{"a"}},
+               cachePathVec{path.Join(testDir, "b", "c", "d"), []interface{}{"b", "c", "d"}},
+       }
+       for i, v := range vec {
+               str := cachePath(v.data...)
+
+               if v.path != str {
+                       T.Errorf("Vector #%d: Expected %v, got %v", i+1, v.path, str)
+               }
+       }
+}
+
+type testString string
+
+func (_ testString) Touch()        {}
+func (_ testString) IsStale() bool { return false }
+
+func init() {
+       gob.Register(testString(""))
+}
+
+func TestCacheRoundtrip(T *testing.T) {
+       T.Parallel()
+
+       test := testString("some string")
+       _, err := cache.Set(test, "test", "string")
+       if err != nil {
+               T.Fatalf("Error storing: %v", err)
+       }
+
+       var t2 testString
+       err = cache.Get(&t2, "test", "string")
+       if err != nil {
+               T.Errorf("Error reading: %v", err)
+       }
+
+       if test != t2 {
+               T.Errorf("Expected %q, got %q", test, t2)
+       }
+}
diff --git a/caches.go b/caches.go
deleted file mode 100644 (file)
index ddb38c2..0000000
--- a/caches.go
+++ /dev/null
@@ -1,304 +0,0 @@
-package anidb
-
-import (
-       "bytes"
-       "compress/gzip"
-       "encoding/gob"
-       "errors"
-       "io"
-       "log"
-       "os"
-       "reflect"
-       "sync"
-)
-
-// Loads caches from the given path.
-func LoadCachesFromFile(f string) (err error) {
-       fh, err := os.Open(f)
-
-       if err != nil {
-               return err
-       }
-       defer fh.Close()
-       return LoadCaches(fh)
-}
-
-const cacheMajorVersion = 0
-const cacheMinorVersion = 0
-
-type cacheDataVer struct {
-       ver  int
-       data interface{}
-}
-
-type lockable interface {
-       Lock()
-       Unlock()
-}
-
-type rlockable interface {
-       lockable
-       RLock()
-       RUnlock()
-}
-
-func getLocks() []lockable {
-       return []lockable{
-               // caches is special-cased
-               &eidAidLock,
-               &ed2kFidLock,
-               &banTimeLock,
-               &titlesFileDataLock,
-       }
-}
-
-func getCacheData() []cacheDataVer {
-       return []cacheDataVer{
-               cacheDataVer{0, &titlesFileData},
-               cacheDataVer{0, &caches.Caches},
-               cacheDataVer{0, &eidAidMap},
-               cacheDataVer{0, &ed2kFidMap},
-               cacheDataVer{0, &banTime}}
-}
-
-// Loads caches from the given io.Reader.
-func LoadCaches(r io.Reader) (err error) {
-       defer func() { log.Println("Loaded with error", err) }()
-
-       caches.LockAll()        // no defer UnlockAll -- the mutexes get reset
-       defer caches.m.Unlock() // but we need to unlock this
-       for _, lock := range getLocks() {
-               lock.Lock()
-               defer lock.Unlock()
-       }
-
-       // make sure caches' mutexes are reset even on a decoding failure
-       defer func() {
-               for _, cache := range caches.Caches {
-                       cache.m = sync.RWMutex{}
-               }
-       }()
-
-       gz, err := gzip.NewReader(r)
-       if err != nil {
-               return err
-       }
-
-       dec := gob.NewDecoder(gz)
-       version := 0
-
-       if err = dec.Decode(&version); err != nil {
-               return err
-       }
-
-       if version != cacheMajorVersion {
-               return errors.New("Cache major version mismatch")
-       }
-
-       defer func() {
-               titlesDB.LoadDB(bytes.NewReader(titlesFileData))
-
-               for _, cache := range caches.Caches {
-                       cache.intent = make(map[int][]chan Cacheable)
-               }
-       }()
-
-       version = 0
-       for _, v := range append([]cacheDataVer{
-               cacheDataVer{0, &version}}, getCacheData()...) {
-               if v.ver > version {
-                       break
-               }
-               if err = dec.Decode(v.data); err != nil {
-                       return err
-               }
-       }
-
-       if version != cacheMinorVersion {
-               return errors.New("Cache minor version mismatch")
-       }
-       return nil
-}
-
-// Saves caches to the given path.
-func DumpCachesToFile(f string) (err error) {
-       fh, err := os.Create(f)
-       if err != nil {
-               return err
-       }
-       defer fh.Close()
-       return DumpCaches(fh)
-}
-
-// Saves caches to the given io.Writer.
-//
-// The cache is a gzipped, versioned gob of the various internal
-// caches.
-func DumpCaches(w io.Writer) (err error) {
-       defer func() { log.Println("Dumped with error", err) }()
-
-       caches.RLockAll()
-       defer caches.RUnlockAll()
-       for _, lock := range getLocks() {
-               if l, ok := lock.(rlockable); ok {
-                       l.RLock()
-                       defer l.RUnlock()
-               } else {
-                       lock.Lock()
-                       defer lock.Unlock()
-               }
-       }
-
-       gz, err := gzip.NewWriterLevel(w, gzip.BestCompression)
-       if err != nil {
-               return err
-       }
-       defer gz.Close()
-
-       enc := gob.NewEncoder(gz)
-
-       for _, v := range append([]cacheDataVer{
-               cacheDataVer{0, cacheMajorVersion},
-               cacheDataVer{0, cacheMinorVersion},
-       }, getCacheData()...) {
-               if err = enc.Encode(v.data); err != nil {
-                       return err
-               }
-       }
-
-       return nil
-}
-
-type Cacheable interface {
-       // Updates the last modified time
-       Touch()
-       // Returns true if the Cacheable is nil, or if the last modified time is too old.
-       IsStale() bool
-}
-
-var caches = initCacheMap()
-
-type cacheMap struct {
-       m      sync.RWMutex
-       Caches map[cacheType]*baseCache
-}
-
-type cacheType int
-
-const (
-       animeCache = cacheType(iota)
-       episodeCache
-       groupCache
-       fileCache
-)
-
-func initCacheMap() *cacheMap {
-       return &cacheMap{
-               Caches: map[cacheType]*baseCache{
-                       animeCache:   newBaseCache(),
-                       episodeCache: newBaseCache(),
-                       groupCache:   newBaseCache(),
-                       fileCache:    newBaseCache(),
-               },
-       }
-}
-
-func (c *cacheMap) Get(typ cacheType) *baseCache {
-       c.m.RLock()
-       defer c.m.RUnlock()
-
-       return c.Caches[typ]
-}
-
-func (c *cacheMap) LockAll() {
-       c.m.Lock()
-
-       for _, cache := range c.Caches {
-               cache.m.Lock()
-       }
-}
-func (c *cacheMap) UnlockAll() {
-       c.m.Unlock()
-
-       for _, cache := range c.Caches {
-               cache.m.Unlock()
-       }
-}
-
-func (c *cacheMap) RLockAll() {
-       c.m.RLock()
-
-       for _, cache := range c.Caches {
-               cache.m.RLock()
-       }
-}
-func (c *cacheMap) RUnlockAll() {
-       c.m.RUnlock()
-
-       for _, cache := range c.Caches {
-               cache.m.RUnlock()
-       }
-}
-
-type baseCache struct {
-       m      sync.RWMutex
-       Cache  map[int]Cacheable
-       intent map[int][]chan Cacheable
-}
-
-func newBaseCache() *baseCache {
-       return &baseCache{
-               Cache:  make(map[int]Cacheable),
-               intent: make(map[int][]chan Cacheable),
-       }
-}
-
-func (c *baseCache) Get(id int) Cacheable {
-       c.m.RLock()
-       defer c.m.RUnlock()
-
-       return c.Cache[id]
-}
-
-// Sends the Cacheable to all channels that registered
-// Intent and clears the Intent list.
-func (c *baseCache) Flush(id int, v Cacheable) {
-       c.m.Lock()
-       defer c.m.Unlock()
-
-       c._flush(id, v)
-}
-
-func (c *baseCache) _flush(id int, v Cacheable) {
-       for _, ch := range c.intent[id] {
-               ch <- v
-               close(ch)
-       }
-       delete(c.intent, id)
-}
-
-// Caches if v is not nil and then Flushes the Intents.
-func (c *baseCache) Set(id int, v Cacheable) {
-       c.m.Lock()
-       defer c.m.Unlock()
-
-       if !reflect.ValueOf(v).IsNil() {
-               v.Touch()
-               c.Cache[id] = v
-       }
-
-       c._flush(id, v)
-}
-
-// Register the Intent to get the cache data for this id when
-// it's available. Returns false if the caller was the first
-// to register it.
-func (c *baseCache) Intent(id int, ch chan Cacheable) (ok bool) {
-       c.m.Lock()
-       defer c.m.Unlock()
-
-       list, ok := c.intent[id]
-       c.intent[id] = append(list, ch)
-
-       return ok
-}
diff --git a/cachetemplate.go b/cachetemplate.go
deleted file mode 100644 (file)
index c4fd713..0000000
+++ /dev/null
@@ -1,67 +0,0 @@
-// +build never
-
-package anidb
-
-// Copy&paste this for new cache types
-// globally replace: Strut strut SID sid
-
-import (
-       "sync"
-       "time"
-)
-
-type Strut struct {
-       Cached time.Time
-}
-
-func (v *Strut) touch() {
-       v.Cached = time.Now()
-}
-func (v *Strut) isStale(d time.Duration) bool {
-       return time.Now().Sub(v.Cached) > d
-}
-
-type SID int
-
-func (sid SID) Strut() *Strut {
-       return strutCache.Get(sid)
-}
-
-var StrutCacheDuration = DefaultCacheDuration
-
-var strutCache = strutCacheStruct{baseCache: newBaseCache()}
-
-type strutCacheStruct struct{ baseCache }
-
-func (c *strutCacheStruct) Get(id SID) *Strut {
-       return c.baseCache.Get(int(id)).(*Strut)
-}
-
-func (c *strutCacheStruct) Set(id SID, v *Strut) {
-       c.baseCache.Set(int(id), v)
-}
-
-func (c *strutCacheStruct) Intent(id SID, ch chan *Strut) (ok bool) {
-       ch2 := make(chan cacheable, 1)
-       go func() { ch <- (<-ch2).(*Strut) }()
-       return c.baseCache.Intent(int(id), ch2)
-}
-
-func (adb *AniDB) StrutBySID(id SID) <-chan *Strut {
-       ch := make(chan *Strut, 1)
-       if v := id.Strut(); !v.isStale(StrutCacheDuration) {
-               ch <- v
-               close(ch)
-               return ch
-       }
-
-       if strutCache.Intent(id, ch) {
-               return ch
-       }
-
-       go func() {
-               var v *Strut
-               strutCache.Set(id, v)
-       }()
-       return ch
-}
index ff177770a12f50046996c4d7fb415927d080f00f..008dd98265c12353137b873156a72e25420aba4f 100644 (file)
@@ -4,7 +4,6 @@ import (
        "encoding/gob"
        "strconv"
        "strings"
-       "sync"
        "time"
 )
 
@@ -23,44 +22,45 @@ func (e *Episode) IsStale() bool {
        return time.Now().Sub(e.Cached) > EpisodeCacheDuration
 }
 
-var eidAidMap = map[EID]AID{}
-var eidAidLock = sync.RWMutex{}
-
 // Unique Episode IDentifier.
 type EID int
 
 // Retrieves the Episode corresponding to this EID from the cache.
 func (eid EID) Episode() *Episode {
-       e, _ := caches.Get(episodeCache).Get(int(eid)).(*Episode)
-       return e
+       var e Episode
+       if cache.Get(&e, "eid", eid) == nil {
+               return &e
+       }
+       return nil
 }
 
 func cacheEpisode(ep *Episode) {
-       eidAidLock.Lock()
-       defer eidAidLock.Unlock()
-
-       eidAidMap[ep.EID] = ep.AID
-       caches.Get(episodeCache).Set(int(ep.EID), ep)
+       cache.Set(ep.AID, "aid", "by-eid", ep.EID)
+       cache.Set(ep, "eid", ep.EID)
 }
 
 // Retrieves the Episode from the cache if possible.
 //
 // If the result is stale, then queries the UDP API to
-// know which AID owns this EID, then gets the episodes
+// know which AID owns this EID, then gets the episode
 // from the Anime.
 func (adb *AniDB) EpisodeByID(eid EID) <-chan *Episode {
+       keys := []cacheKey{"eid", eid}
        ch := make(chan *Episode, 1)
 
-       if e := eid.Episode(); e != nil && !e.IsStale() {
-               ch <- e
-               close(ch)
+       ic := make(chan Cacheable, 1)
+       go func() { ch <- (<-ic).(*Episode); close(ch) }()
+       if intentMap.Intent(ic, keys...) {
                return ch
        }
 
-       ec := caches.Get(episodeCache)
-       ic := make(chan Cacheable, 1)
-       go func() { ch <- (<-ic).(*Episode); close(ch) }()
-       if ec.Intent(int(eid), ic) {
+       if !cache.CheckValid(keys...) {
+               intentMap.Notify((*Episode)(nil), keys...)
+               return ch
+       }
+
+       if e := eid.Episode(); !e.IsStale() {
+               intentMap.Notify(e, keys...)
                return ch
        }
 
@@ -68,9 +68,8 @@ func (adb *AniDB) EpisodeByID(eid EID) <-chan *Episode {
                // The UDP API data is worse than the HTTP API anime data,
                // try and get from the corresponding Anime
 
-               eidAidLock.RLock()
-               aid, ok := eidAidMap[eid]
-               eidAidLock.RUnlock()
+               aid := AID(0)
+               ok := cache.Get(&aid, "aid", "by-eid", eid) == nil
 
                udpDone := false
 
@@ -92,6 +91,8 @@ func (adb *AniDB) EpisodeByID(eid EID) <-chan *Episode {
                                                ok = true
                                                aid = AID(id)
                                        }
+                               } else if reply.Code() == 340 {
+                                       cache.MarkInvalid(keys...)
                                } else {
                                        break
                                }
@@ -104,14 +105,11 @@ func (adb *AniDB) EpisodeByID(eid EID) <-chan *Episode {
                                break
                        } else {
                                // if this is somehow still a miss, then the EID<->AID map broke
-                               eidAidLock.Lock()
-                               delete(eidAidMap, eid)
-                               eidAidLock.Unlock()
-
+                               cache.Delete("aid", "by-eid", eid)
                                ok = false
                        }
                }
-               // Caching (and channel broadcasting) done by AnimeByID
+               intentMap.Notify(e, keys...)
        }()
        return ch
 }
index e3ae93c03911d5f4d84fdcf5373fc89e1a0c0476..e2b0672b21a5caf3403a93e3e88a5e9bb0c59b18 100644 (file)
@@ -2,19 +2,19 @@ package anidb
 
 import (
        "encoding/gob"
-       "fmt"
        "github.com/Kovensky/go-anidb/misc"
        "github.com/Kovensky/go-anidb/udp"
        "image"
        "log"
        "strconv"
        "strings"
-       "sync"
        "time"
 )
 
 func init() {
        gob.RegisterName("*github.com/Kovensky/go-anidb.File", &File{})
+       gob.RegisterName("*github.com/Kovensky/go-anidb.ed2kCache", &ed2kCache{})
+       gob.RegisterName("github.com/Kovensky/go-anidb.FID", FID(0))
 }
 
 func (f *File) Touch() {
@@ -33,21 +33,29 @@ func (f *File) IsStale() bool {
 
 type FID int
 
+// make FID Cacheable
+func (e FID) Touch()        {}
+func (e FID) IsStale() bool { return false }
+
 func (fid FID) File() *File {
-       f, _ := caches.Get(fileCache).Get(int(fid)).(*File)
-       return f
+       var f File
+       if cache.Get(&f, "fid", fid) == nil {
+               return &f
+       }
+       return nil
 }
 
-func ed2kKey(ed2k string, size int64) string {
-       return fmt.Sprintf("%s-%016x", ed2k, size)
+type ed2kCache struct {
+       FID
+       time.Time
 }
 
-func ed2kCache(f *File) {
-       if f != nil {
-               ed2kFidLock.Lock()
-               defer ed2kFidLock.Unlock()
-               ed2kFidMap[ed2kKey(f.Ed2kHash, f.Filesize)] = f.FID
-       }
+func (c *ed2kCache) Touch() {
+       c.Time = time.Now()
+}
+
+func (c *ed2kCache) IsStale() bool {
+       return time.Now().Sub(c.Time) > FileCacheDuration
 }
 
 // Prefetches the Anime, Episode and Group that this
@@ -71,22 +79,24 @@ func (f *File) Prefetch(adb *AniDB) <-chan *File {
        return ch
 }
 
-var ed2kFidMap = map[string]FID{}
-var ed2kIntent = map[string][]chan *File{}
-var ed2kFidLock = sync.RWMutex{}
-
 func (adb *AniDB) FileByID(fid FID) <-chan *File {
+       keys := []cacheKey{"fid", fid}
+
        ch := make(chan *File, 1)
-       if f := fid.File(); !f.IsStale() {
-               ch <- f
-               close(ch)
-               return ch
-       }
 
-       fc := caches.Get(fileCache)
        ic := make(chan Cacheable, 1)
        go func() { ch <- (<-ic).(*File); close(ch) }()
-       if fc.Intent(int(fid), ic) {
+       if intentMap.Intent(ic, keys...) {
+               return ch
+       }
+
+       if !cache.CheckValid(keys...) {
+               intentMap.Notify((*File)(nil), keys...)
+               return ch
+       }
+
+       if f := fid.File(); !f.IsStale() {
+               intentMap.Notify(f, keys...)
                return ch
        }
 
@@ -101,35 +111,44 @@ func (adb *AniDB) FileByID(fid FID) <-chan *File {
                var f *File
                if reply.Error() == nil {
                        f = parseFileResponse(reply)
+               } else if reply.Code() == 320 {
+                       cache.MarkInvalid(keys...)
+               }
+               if f != nil {
+                       cache.Set(&ed2kCache{FID: f.FID}, "fid", "by-ed2k", f.Ed2kHash, f.Filesize)
+                       cache.Set(f, keys...)
                }
-               ed2kCache(f)
-               fc.Set(int(fid), f)
+               intentMap.Notify(f, keys...)
        }()
        return ch
 }
 
 func (adb *AniDB) FileByEd2kSize(ed2k string, size int64) <-chan *File {
-       key := ed2kKey(ed2k, size)
+       keys := []cacheKey{"fid", "by-ed2k", ed2k, size}
+
        ch := make(chan *File, 1)
 
-       ed2kFidLock.RLock()
-       if fid, ok := ed2kFidMap[key]; ok {
-               ed2kFidLock.RUnlock()
-               if f := fid.File(); f != nil {
-                       ch <- f
-                       close(ch)
-                       return ch
+       ic := make(chan Cacheable, 1)
+       go func() {
+               fid := (<-ic).(FID)
+               if fid > 0 {
+                       ch <- <-adb.FileByID(fid)
                }
-               return adb.FileByID(fid)
+               close(ch)
+       }()
+       if intentMap.Intent(ic, keys...) {
+               return ch
+       }
+
+       if !cache.CheckValid(keys...) {
+               intentMap.Notify(FID(0), keys...)
+               return ch
        }
-       ed2kFidLock.RUnlock()
 
-       ed2kFidLock.Lock()
-       if list, ok := ed2kIntent[key]; ok {
-               ed2kIntent[key] = append(list, ch)
+       var ec ed2kCache
+       if cache.Get(&ec, keys...) == nil {
+               intentMap.Notify(ec.FID, keys...)
                return ch
-       } else {
-               ed2kIntent[key] = append(list, ch)
        }
 
        go func() {
@@ -141,28 +160,22 @@ func (adb *AniDB) FileByEd2kSize(ed2k string, size int64) <-chan *File {
                                "amask": fileAmask,
                        })
 
+               fid := FID(0)
                var f *File
                if reply.Error() == nil {
                        f = parseFileResponse(reply)
 
-                       ed2kCache(f)
-                       caches.Get(fileCache).Set(int(f.FID), f)
+                       fid = f.FID
+
+                       cache.Set(&ed2kCache{FID: fid}, keys...)
+                       cache.Set(f, "fid", fid)
                } else if reply.Code() == 320 { // file not found
-                       ed2kFidLock.Lock()
-                       delete(ed2kFidMap, key)
-                       ed2kFidLock.Unlock()
+                       cache.MarkInvalid(keys...)
                } else if reply.Code() == 322 { // multiple files found
                        panic("Don't know what to do with " + strings.Join(reply.Lines(), "\n"))
                }
 
-               ed2kFidLock.Lock()
-               defer ed2kFidLock.Unlock()
-
-               for _, ch := range ed2kIntent[key] {
-                       ch <- f
-                       close(ch)
-               }
-               delete(ed2kIntent, key)
+               intentMap.Notify(fid, keys...)
        }()
        return ch
 }
diff --git a/flock.go b/flock.go
new file mode 100644 (file)
index 0000000..6df0b2b
--- /dev/null
+++ b/flock.go
@@ -0,0 +1,8 @@
+package anidb
+
+type fileLock interface {
+       Lock() error
+       Unlock() error
+}
+
+// func lockFile(p path) fileLock
diff --git a/flock_other.go b/flock_other.go
new file mode 100644 (file)
index 0000000..2250a19
--- /dev/null
@@ -0,0 +1,31 @@
+// +build !windows
+
+package anidb
+
+import "github.com/tgulacsi/go-locking"
+
+type flockLock struct {
+       locking.FLock
+}
+
+func lockFile(p path) fileLock {
+       flock, err := locking.NewFLock(p)
+       if err != nil {
+               return &flockLock{FLock: flock}
+       }
+       return nil
+}
+
+func (fl *flockLock) Lock() error {
+       if fl != nil {
+               return fl.FLock.Lock()
+       }
+       return nil
+}
+
+func (fl *flockLock) Unlock() error {
+       if fl != nil {
+               return fl.FLock.Unlock()
+       }
+       return nil
+}
diff --git a/flock_windows.go b/flock_windows.go
new file mode 100644 (file)
index 0000000..cce4706
--- /dev/null
@@ -0,0 +1,12 @@
+package anidb
+
+type winFileLock struct{}
+
+func lockFile(p string) fileLock {
+       return &winFileLock{}
+}
+
+// empty implementations -- go-locking doesn't support windows
+// windows also does file locking on its own
+func (_ *winFileLock) Lock() error   { return nil }
+func (_ *winFileLock) Unlock() error { return nil }
index 4c0c1b19442c7e1dfea1f8736eaadcba291e0524..e67cd59943640e4e4ad098871bbc748fc59b03c6 100644 (file)
@@ -28,8 +28,11 @@ type GID int
 
 // Retrieves the Group from the cache.
 func (gid GID) Group() *Group {
-       g, _ := caches.Get(groupCache).Get(int(gid)).(*Group)
-       return g
+       var g Group
+       if cache.Get(&g, "gid", gid) == nil {
+               return &g
+       }
+       return nil
 }
 
 // Returns a Group from the cache if possible.
@@ -37,19 +40,17 @@ func (gid GID) Group() *Group {
 // If the Group is stale, then retrieves the Group
 // through the UDP API.
 func (adb *AniDB) GroupByID(gid GID) <-chan *Group {
+       keys := []cacheKey{"gid", gid}
        ch := make(chan *Group, 1)
-       if g := gid.Group(); !g.IsStale() {
-               ch <- g
-               close(ch)
-               return ch
-       }
-
-       gc := caches.Get(groupCache)
 
        ic := make(chan Cacheable, 1)
        go func() { ch <- (<-ic).(*Group); close(ch) }()
+       if intentMap.Intent(ic, keys...) {
+               return ch
+       }
 
-       if gc.Intent(int(gid), ic) {
+       if g := gid.Group(); !g.IsStale() {
+               intentMap.Notify(g, keys...)
                return ch
        }
 
@@ -130,7 +131,8 @@ func (adb *AniDB) GroupByID(gid GID) <-chan *Group {
                                Cached: time.Now(),
                        }
                }
-               gc.Set(int(gid), g)
+               cache.Set(g, keys...)
+               intentMap.Notify(g, keys...)
        }()
        return ch
 }
diff --git a/intent.go b/intent.go
new file mode 100644 (file)
index 0000000..853d827
--- /dev/null
+++ b/intent.go
@@ -0,0 +1,63 @@
+package anidb
+
+import "sync"
+
+type intentStruct struct {
+       sync.Mutex
+       chs []chan Cacheable
+}
+
+type intentMapStruct struct {
+       sync.Mutex
+       m map[string]*intentStruct
+}
+
+var intentMap = &intentMapStruct{
+       m: map[string]*intentStruct{},
+}
+
+// Register a channel to be notified when the specified keys are notified.
+//
+// Cache checks should be done after registering intent, since it's possible to
+// register Intent while a Notify is running, and the Notify is done after
+// setting the cache.
+func (m *intentMapStruct) Intent(ch chan Cacheable, keys ...cacheKey) bool {
+       key := cachePath(keys...)
+
+       m.Lock()
+       s, ok := m.m[key]
+       if !ok {
+               s = &intentStruct{}
+               m.m[key] = s
+       }
+       m.Unlock()
+
+       s.Lock()
+       s.chs = append(s.chs, ch)
+       s.Unlock()
+
+       return ok
+}
+
+// Notify all channels that are listening for the specified keys.
+//
+// Should be called after setting the cache.
+func (m *intentMapStruct) Notify(v Cacheable, keys ...cacheKey) {
+       key := cachePath(keys...)
+
+       m.Lock()
+       defer m.Unlock()
+       s, ok := m.m[key]
+       if !ok {
+               return
+       }
+
+       s.Lock()
+       defer s.Unlock()
+
+       for _, ch := range s.chs {
+               go func(c chan Cacheable) { c <- v }(ch)
+       }
+
+       delete(m.m, key)
+}
diff --git a/misc.go b/misc.go
index ad8c51740516d52359c3f5e949288528dd7bcdeb..f738d2883541b42221a10c45c06bef28e8cbd90f 100644 (file)
--- a/misc.go
+++ b/misc.go
@@ -15,6 +15,8 @@ var (
        GroupCacheDuration   = 4 * DefaultCacheDuration // They don't change that often.
        FileCacheDuration    = 8 * DefaultCacheDuration // These change even less often.
 
+       InvalidKeyCacheDuration = 1 * time.Hour
+
        // Used when the UDP API Anime query fails, but the HTTP API query succeeds.
        AnimeIncompleteCacheDuration = 24 * time.Hour
 
index 26a2fd48166467c7c2dcb053f3dfa398350c87c9..1100df085bc1eed4c4fa23863821e7529ce8dc7a 100644 (file)
@@ -1,44 +1,30 @@
 package anidb
 
 import (
-       "bytes"
        "github.com/Kovensky/go-anidb/titles"
        "io"
        "net/http"
-       "sync"
        "time"
 )
 
-var titlesFileData []byte
-var titlesFileDataLock sync.Mutex
 var titlesDB = &titles.TitlesDatabase{}
 
-// Loads the anime-titles database from the given io.Reader.
-//
-// Caches the io.Reader's contents on memory, which gets saved
-// by DumpCaches.
-func LoadTitles(src io.Reader) error {
-       buf := bytes.Buffer{}
-       _, err := io.Copy(&buf, src)
-       if err != nil && err != io.EOF {
+// Reloads titles from the cache.
+func RefreshTitles() error {
+       flock := lockFile(cachePath("anime-titles.dat.gz"))
+       flock.Lock()
+       defer flock.Unlock()
+
+       fh, err := cache.Open("anime-titles.dat.gz")
+       if err != nil {
                return err
        }
+       defer fh.Close()
 
-       titlesFileDataLock.Lock()
-       defer titlesFileDataLock.Unlock()
-
-       titlesFileData = buf.Bytes()
-
-       titlesDB.LoadDB(bytes.NewReader(titlesFileData))
-
+       titlesDB.LoadDB(fh)
        return nil
 }
 
-// Saves the currently cached anime-titles database to the given io.Writer.
-func DumpTitles(dst io.Writer) (int64, error) {
-       return io.Copy(dst, bytes.NewReader(titlesFileData))
-}
-
 // Returns true if the titles database is up-to-date (newer than 24 hours).
 func TitlesUpToDate() (ok bool) {
        return time.Now().Sub(titlesDB.UpdateTime) < 24*time.Hour
@@ -53,11 +39,27 @@ func UpdateTitles() error {
                return nil
        }
 
-       resp, err := http.Get(titles.DataDumpURL)
+       flock := lockFile(cachePath("anime-titles.dat.gz"))
+       flock.Lock()
+       defer flock.Unlock()
+
+       c := &http.Client{Transport: &http.Transport{DisableCompression: true}}
+
+       resp, err := c.Get(titles.DataDumpURL)
        if err != nil {
                return err
        }
        defer resp.Body.Close()
 
-       return LoadTitles(resp.Body)
+       fh, err := cache.Create("anime-titles.dat.gz")
+       if err != nil {
+               return err
+       }
+
+       _, err = io.Copy(fh, resp.Body)
+       if err != nil {
+               return err
+       }
+
+       return RefreshTitles()
 }
diff --git a/udp.go b/udp.go
index 433c9da36219e45f802c084f2a424ab30b9f9542..abd8a3aa1856e7c4168f3f88ec7f7df044555805 100644 (file)
--- a/udp.go
+++ b/udp.go
@@ -1,26 +1,40 @@
 package anidb
 
 import (
+       "encoding/gob"
        "github.com/Kovensky/go-anidb/udp"
-       "sync"
        "time"
 )
 
-var banTime time.Time
-var banTimeLock sync.Mutex
+func init() {
+       gob.RegisterName("*github.com/Kovensky/go-anidb.banCache", &banCache{})
+}
 
 const banDuration = 30*time.Minute + 1*time.Second
 
+type banCache struct{ time.Time }
+
+func (c *banCache) Touch() {
+       c.Time = time.Now()
+}
+func (c *banCache) IsStale() bool {
+       return time.Now().Sub(c.Time) > banDuration
+}
+
 // Returns whether the last UDP API access returned a 555 BANNED message.
 func Banned() bool {
-       banTimeLock.Lock()
-       banTimeLock.Unlock()
+       var banTime banCache
+       cache.Get(&banTime, "banned")
 
-       return _banned()
+       stale := banTime.IsStale()
+       if stale {
+               cache.Delete("banned")
+       }
+       return !stale
 }
 
-func _banned() bool {
-       return time.Now().Sub(banTime) > banDuration
+func setBanned() {
+       cache.Set(&banCache{}, "banned")
 }
 
 type paramSet struct {
@@ -85,11 +99,7 @@ func (udp *udpWrap) sendQueue() {
                case 503, 504: // client library rejected
                        panic(reply.Error())
                case 555: // IP (and user, possibly client) temporarily banned
-                       banTimeLock.Lock()
-
-                       banTime = time.Now()
-
-                       banTimeLock.Unlock()
+                       setBanned()
                }
                set.ch <- reply
                close(set.ch)
@@ -99,11 +109,7 @@ func (udp *udpWrap) sendQueue() {
 func (udp *udpWrap) SendRecv(cmd string, params paramMap) <-chan udpapi.APIReply {
        ch := make(chan udpapi.APIReply, 1)
 
-       banTimeLock.Lock()
-       defer banTimeLock.Unlock()
-       if _banned() {
-               banTime = time.Time{}
-       } else {
+       if Banned() {
                ch <- bannedReply
                close(ch)
                return ch