From e54415863fb1aaa9a12d4a351557667f2d668c30 Mon Sep 17 00:00:00 2001 From: "Diogo Franco (Kovensky)" Date: Sun, 14 Jul 2013 20:16:42 -0300 Subject: [PATCH] anidb: New caching mechanism Refactors the various parts of the code that interacted with the cache to use the new cache. Moves the intent system to be decoupled from the cache. Caches requests made with invalid keys, avoids requerying the API for InvalidKeyCacheDuration (1 hour). Untested on !windows, since that uses a separate file locking mechanism. --- animecache.go | 56 +++++---- cache.go | 269 +++++++++++++++++++++++++++++++++++++++++ cache_test.go | 82 +++++++++++++ caches.go | 304 ----------------------------------------------- cachetemplate.go | 67 ----------- episodecache.go | 52 ++++---- filecache.go | 119 ++++++++++--------- flock.go | 8 ++ flock_other.go | 31 +++++ flock_windows.go | 12 ++ groupcache.go | 24 ++-- intent.go | 63 ++++++++++ misc.go | 2 + titlecache.go | 54 +++++---- udp.go | 42 ++++--- 15 files changed, 658 insertions(+), 527 deletions(-) create mode 100644 cache.go create mode 100644 cache_test.go delete mode 100644 caches.go delete mode 100644 cachetemplate.go create mode 100644 flock.go create mode 100644 flock_other.go create mode 100644 flock_windows.go create mode 100644 intent.go diff --git a/animecache.go b/animecache.go index e9910c5..948d257 100644 --- a/animecache.go +++ b/animecache.go @@ -14,6 +14,7 @@ import ( func init() { gob.RegisterName("*github.com/Kovensky/go-anidb.Anime", &Anime{}) + gob.RegisterName("github.com/Kovensky/go-anidb.AID", AID(0)) } func (a *Anime) Touch() { @@ -33,10 +34,17 @@ func (a *Anime) IsStale() bool { // Unique Anime IDentifier. type AID int +// make AID Cacheable +func (e AID) Touch() {} +func (e AID) IsStale() bool { return false } + // Returns a cached Anime. Returns nil if there is no cached Anime with this AID. func (aid AID) Anime() *Anime { - a, _ := caches.Get(animeCache).Get(int(aid)).(*Anime) - return a + var a Anime + if cache.Get(&a, "aid", aid) == nil { + return &a + } + return nil } type httpAnimeResponse struct { @@ -50,19 +58,23 @@ type httpAnimeResponse struct { // // Note: This can take at least 4 seconds during heavy traffic. func (adb *AniDB) AnimeByID(aid AID) <-chan *Anime { + keys := []cacheKey{"aid", aid} ch := make(chan *Anime, 1) - anime := aid.Anime() - if !anime.IsStale() { - ch <- anime - close(ch) + ic := make(chan Cacheable, 1) + go func() { ch <- (<-ic).(*Anime); close(ch) }() + if intentMap.Intent(ic, keys...) { return ch } - ac := caches.Get(animeCache) - ic := make(chan Cacheable, 1) - go func() { ch <- (<-ic).(*Anime); close(ch) }() - if ac.Intent(int(aid), ic) { + if !cache.CheckValid(keys...) { + intentMap.Notify((*Anime)(nil), keys...) + return ch + } + + anime := aid.Anime() + if !anime.IsStale() { + intentMap.Notify(anime, keys...) return ch } @@ -98,6 +110,11 @@ func (adb *AniDB) AnimeByID(aid AID) <-chan *Anime { break Loop } if a := anime.populateFromHTTP(resp.anime); a == nil { + // HTTP ok but parsing not ok + if anime.PrimaryTitle == "" { + cache.MarkInvalid(keys...) + } + ok = false break Loop } else { @@ -105,18 +122,21 @@ func (adb *AniDB) AnimeByID(aid AID) <-chan *Anime { } httpChan = nil case reply := <-udpChan: - anime.Incomplete = !anime.populateFromUDP(reply) + if reply.Code() == 330 { + cache.MarkInvalid(keys...) + } else { + anime.Incomplete = !anime.populateFromUDP(reply) + } udpChan = nil } } if anime.PrimaryTitle != "" { if ok { - ac.Set(int(aid), anime) - } else { - ac.Flush(int(aid), anime) + cache.Set(anime, keys...) } + intentMap.Notify(anime, keys...) } else { - ac.Set(int(aid), (*Anime)(nil)) + intentMap.Notify((*Anime)(nil), keys...) } }() return ch @@ -148,12 +168,6 @@ func (a *Anime) populateFromHTTP(reply httpapi.Anime) *Anime { for _, title := range reply.Titles { switch title.Type { case "main": - if a.PrimaryTitle != "" { - // We assume there's only ever one "main" title - panic( - fmt.Sprintf("PrimaryTitle %q already set, new PrimaryTitle %q received!", - a.PrimaryTitle, title.Title)) - } a.PrimaryTitle = title.Title case "official": if a.OfficialTitles == nil { diff --git a/cache.go b/cache.go new file mode 100644 index 0000000..c22199f --- /dev/null +++ b/cache.go @@ -0,0 +1,269 @@ +package anidb + +import ( + "bytes" + "compress/gzip" + "encoding/gob" + "errors" + "fmt" + "io" + "log" + "os" + "path" + "reflect" + "regexp" + "sync" + "time" +) + +var _ log.Logger + +type Cacheable interface { + // Updates the last modified time + Touch() + // Returns true if the Cacheable is nil, or if the last modified time is too old. + IsStale() bool +} + +func init() { + gob.RegisterName("*github.com/Kovensky/go-anidb.invalidKeyCache", &invalidKeyCache{}) +} + +type invalidKeyCache struct{ time.Time } + +func (c *invalidKeyCache) Touch() { + c.Time = time.Now() +} +func (c *invalidKeyCache) IsStale() bool { + return time.Now().Sub(c.Time) > InvalidKeyCacheDuration +} + +type cacheDir struct { + *sync.RWMutex + + CacheDir string +} + +func init() { + SetCacheDir(path.Join(os.TempDir(), "anidb", "cache")) +} + +var cache cacheDir + +func SetCacheDir(path string) (err error) { + m := cache.RWMutex + if m == nil { + m = &sync.RWMutex{} + cache.RWMutex = m + } + cache.Lock() + + if err = os.MkdirAll(path, 0755|os.ModeDir); err != nil { + cache.Unlock() + return err + } + + cache = cacheDir{ + RWMutex: m, + CacheDir: path, + } + + cache.Unlock() + RefreshTitles() + return nil +} + +func GetCacheDir() (path string) { + cache.RLock() + defer cache.RUnlock() + + return cache.CacheDir +} + +type cacheKey interface{} + +// All "bad characters" that can't go in Windows paths. +// It's a superset of the "bad characters" on other OSes, so this works. +var badPath = regexp.MustCompile(`[\\/:\*\?\"<>\|]`) + +func stringify(stuff ...cacheKey) []string { + ret := make([]string, len(stuff)) + for i := range stuff { + s := fmt.Sprint(stuff[i]) + ret[i] = badPath.ReplaceAllLiteralString(s, "_") + } + return ret +} + +// Each key but the last is treated as a directory. +// The last key is treated as a regular file. +// +// This also means that cache keys that are file-backed +// cannot have subkeys. +func cachePath(keys ...cacheKey) string { + parts := append([]string{GetCacheDir()}, stringify(keys...)...) + p := path.Join(parts...) + return p +} + +// Opens the file that backs the specified keys. +func (c *cacheDir) Open(keys ...cacheKey) (fh *os.File, err error) { + subItem := cachePath(keys...) + return os.Open(subItem) +} + +// Creates a new file to back the specified keys. +func (c *cacheDir) Create(keys ...cacheKey) (fh *os.File, err error) { + subItem := cachePath(keys...) + subDir := path.Dir(subItem) + + if err = os.MkdirAll(subDir, 0755|os.ModeDir); err != nil { + return nil, err + } + return os.Create(subItem) +} + +// Deletes the file that backs the specified keys. +func (c *cacheDir) Delete(keys ...cacheKey) (err error) { + return os.Remove(cachePath(keys...)) +} + +// Deletes the specified key and all subkeys. +func (c *cacheDir) DeleteAll(keys ...cacheKey) (err error) { + return os.RemoveAll(cachePath(keys...)) +} + +func (c *cacheDir) Get(v Cacheable, keys ...cacheKey) (err error) { + defer func() { + log.Println("Got entry", keys, "(error", err, ")") + }() + flock := lockFile(cachePath(keys...)) + flock.Lock() + defer flock.Unlock() + + fh, err := c.Open(keys...) + if err != nil { + return err + } + defer func() { + if e := fh.Close(); err == nil { + err = e + } + }() + + val := reflect.ValueOf(v) + if k := val.Kind(); k == reflect.Ptr || k == reflect.Interface { + val = val.Elem() + } + if !val.CanSet() { + // panic because this is an internal coding mistake + panic("(*cacheDir).Get(): given Cacheable is not setable") + } + gz, err := gzip.NewReader(fh) + if err != nil { + return err + } + defer func() { + if e := gz.Close(); err == nil { + err = e + } + }() + + // defer func() { + // if err == io.EOF { + // err = nil + // } + // }() + + switch f := gz.Header.Comment; f { + case "encoding/gob": + dec := gob.NewDecoder(gz) + err = dec.Decode(v) + default: + return errors.New(fmt.Sprintf("Cached data (format %q) is not in a known format", f)) + } + + return +} + +func (c *cacheDir) Set(v Cacheable, keys ...cacheKey) (n int64, err error) { + if v := reflect.ValueOf(v); !v.IsValid() { + panic("reflect.ValueOf() returned invaled value") + } else if k := v.Kind(); k == reflect.Ptr || k == reflect.Interface { + if v.IsNil() { + return // no point in saving nil + } + } + defer func() { + log.Println("Set entry", keys, "(error", err, ")") + }() + + // First we encode to memory -- we don't want to create/truncate a file and put bad data in it. + buf := bytes.Buffer{} + gz, err := gzip.NewWriterLevel(&buf, gzip.BestCompression) + if err != nil { + return 0, err + } + gz.Header.Comment = "encoding/gob" + + // it doesn't matter if the caller doesn't see this, + // the important part is that the cache does. + v.Touch() + + enc := gob.NewEncoder(gz) + err = enc.Encode(v) + + if e := gz.Close(); err == nil { + err = e + } + + if err != nil { + return 0, err + } + + // We have good data, time to actually put it in the cache + flock := lockFile(cachePath(keys...)) + flock.Lock() + defer flock.Unlock() + + fh, err := c.Create(keys...) + if err != nil { + return 0, err + } + defer func() { + if e := fh.Close(); err == nil { + err = e + } + }() + n, err = io.Copy(fh, &buf) + return +} + +// Checks if the given keys are not marked as invalid. +// +// If the key was marked as invalid but is no longer considered +// so, deletes the invalid marker. +func (c *cacheDir) CheckValid(keys ...cacheKey) bool { + invKeys := append([]cacheKey{"invalid"}, keys...) + inv := invalidKeyCache{} + + if cache.Get(&inv, invKeys...) == nil { + if inv.IsStale() { + cache.Delete(invKeys...) + } else { + return false + } + } + return true +} + +// Deletes the given keys and marks them as invalid. +// +// They are considered invalid for InvalidKeyCacheDuration. +func (c *cacheDir) MarkInvalid(keys ...cacheKey) error { + invKeys := append([]cacheKey{"invalid"}, keys...) + + cache.Delete(keys...) + _, err := cache.Set(&invalidKeyCache{}, invKeys...) + return err +} diff --git a/cache_test.go b/cache_test.go new file mode 100644 index 0000000..41c49b3 --- /dev/null +++ b/cache_test.go @@ -0,0 +1,82 @@ +package anidb + +import ( + "encoding/gob" + "os" + "path" + "reflect" + "testing" +) + +type stringifyVec struct { + result []string + data []interface{} +} + +func TestStringify(T *testing.T) { + T.Parallel() + + vec := []stringifyVec{ + stringifyVec{[]string{"a"}, []interface{}{"a"}}, + } + for i, v := range vec { + str := stringify(v.data...) + if !reflect.DeepEqual(v.result, str) { + T.Errorf("Vector #%d: Expected %v, got %v", i+1, v.result, str) + } + } +} + +type cachePathVec struct { + path string + data []interface{} +} + +var testDir = path.Join(os.TempDir(), "testing", "anidb") + +func init() { SetCacheDir(testDir) } + +func TestCachePath(T *testing.T) { + T.Parallel() + + vec := []cachePathVec{ + cachePathVec{path.Join(testDir, "a"), []interface{}{"a"}}, + cachePathVec{path.Join(testDir, "b", "c", "d"), []interface{}{"b", "c", "d"}}, + } + for i, v := range vec { + str := cachePath(v.data...) + + if v.path != str { + T.Errorf("Vector #%d: Expected %v, got %v", i+1, v.path, str) + } + } +} + +type testString string + +func (_ testString) Touch() {} +func (_ testString) IsStale() bool { return false } + +func init() { + gob.Register(testString("")) +} + +func TestCacheRoundtrip(T *testing.T) { + T.Parallel() + + test := testString("some string") + _, err := cache.Set(test, "test", "string") + if err != nil { + T.Fatalf("Error storing: %v", err) + } + + var t2 testString + err = cache.Get(&t2, "test", "string") + if err != nil { + T.Errorf("Error reading: %v", err) + } + + if test != t2 { + T.Errorf("Expected %q, got %q", test, t2) + } +} diff --git a/caches.go b/caches.go deleted file mode 100644 index ddb38c2..0000000 --- a/caches.go +++ /dev/null @@ -1,304 +0,0 @@ -package anidb - -import ( - "bytes" - "compress/gzip" - "encoding/gob" - "errors" - "io" - "log" - "os" - "reflect" - "sync" -) - -// Loads caches from the given path. -func LoadCachesFromFile(f string) (err error) { - fh, err := os.Open(f) - - if err != nil { - return err - } - defer fh.Close() - return LoadCaches(fh) -} - -const cacheMajorVersion = 0 -const cacheMinorVersion = 0 - -type cacheDataVer struct { - ver int - data interface{} -} - -type lockable interface { - Lock() - Unlock() -} - -type rlockable interface { - lockable - RLock() - RUnlock() -} - -func getLocks() []lockable { - return []lockable{ - // caches is special-cased - &eidAidLock, - &ed2kFidLock, - &banTimeLock, - &titlesFileDataLock, - } -} - -func getCacheData() []cacheDataVer { - return []cacheDataVer{ - cacheDataVer{0, &titlesFileData}, - cacheDataVer{0, &caches.Caches}, - cacheDataVer{0, &eidAidMap}, - cacheDataVer{0, &ed2kFidMap}, - cacheDataVer{0, &banTime}} -} - -// Loads caches from the given io.Reader. -func LoadCaches(r io.Reader) (err error) { - defer func() { log.Println("Loaded with error", err) }() - - caches.LockAll() // no defer UnlockAll -- the mutexes get reset - defer caches.m.Unlock() // but we need to unlock this - for _, lock := range getLocks() { - lock.Lock() - defer lock.Unlock() - } - - // make sure caches' mutexes are reset even on a decoding failure - defer func() { - for _, cache := range caches.Caches { - cache.m = sync.RWMutex{} - } - }() - - gz, err := gzip.NewReader(r) - if err != nil { - return err - } - - dec := gob.NewDecoder(gz) - version := 0 - - if err = dec.Decode(&version); err != nil { - return err - } - - if version != cacheMajorVersion { - return errors.New("Cache major version mismatch") - } - - defer func() { - titlesDB.LoadDB(bytes.NewReader(titlesFileData)) - - for _, cache := range caches.Caches { - cache.intent = make(map[int][]chan Cacheable) - } - }() - - version = 0 - for _, v := range append([]cacheDataVer{ - cacheDataVer{0, &version}}, getCacheData()...) { - if v.ver > version { - break - } - if err = dec.Decode(v.data); err != nil { - return err - } - } - - if version != cacheMinorVersion { - return errors.New("Cache minor version mismatch") - } - return nil -} - -// Saves caches to the given path. -func DumpCachesToFile(f string) (err error) { - fh, err := os.Create(f) - if err != nil { - return err - } - defer fh.Close() - return DumpCaches(fh) -} - -// Saves caches to the given io.Writer. -// -// The cache is a gzipped, versioned gob of the various internal -// caches. -func DumpCaches(w io.Writer) (err error) { - defer func() { log.Println("Dumped with error", err) }() - - caches.RLockAll() - defer caches.RUnlockAll() - for _, lock := range getLocks() { - if l, ok := lock.(rlockable); ok { - l.RLock() - defer l.RUnlock() - } else { - lock.Lock() - defer lock.Unlock() - } - } - - gz, err := gzip.NewWriterLevel(w, gzip.BestCompression) - if err != nil { - return err - } - defer gz.Close() - - enc := gob.NewEncoder(gz) - - for _, v := range append([]cacheDataVer{ - cacheDataVer{0, cacheMajorVersion}, - cacheDataVer{0, cacheMinorVersion}, - }, getCacheData()...) { - if err = enc.Encode(v.data); err != nil { - return err - } - } - - return nil -} - -type Cacheable interface { - // Updates the last modified time - Touch() - // Returns true if the Cacheable is nil, or if the last modified time is too old. - IsStale() bool -} - -var caches = initCacheMap() - -type cacheMap struct { - m sync.RWMutex - Caches map[cacheType]*baseCache -} - -type cacheType int - -const ( - animeCache = cacheType(iota) - episodeCache - groupCache - fileCache -) - -func initCacheMap() *cacheMap { - return &cacheMap{ - Caches: map[cacheType]*baseCache{ - animeCache: newBaseCache(), - episodeCache: newBaseCache(), - groupCache: newBaseCache(), - fileCache: newBaseCache(), - }, - } -} - -func (c *cacheMap) Get(typ cacheType) *baseCache { - c.m.RLock() - defer c.m.RUnlock() - - return c.Caches[typ] -} - -func (c *cacheMap) LockAll() { - c.m.Lock() - - for _, cache := range c.Caches { - cache.m.Lock() - } -} -func (c *cacheMap) UnlockAll() { - c.m.Unlock() - - for _, cache := range c.Caches { - cache.m.Unlock() - } -} - -func (c *cacheMap) RLockAll() { - c.m.RLock() - - for _, cache := range c.Caches { - cache.m.RLock() - } -} -func (c *cacheMap) RUnlockAll() { - c.m.RUnlock() - - for _, cache := range c.Caches { - cache.m.RUnlock() - } -} - -type baseCache struct { - m sync.RWMutex - Cache map[int]Cacheable - intent map[int][]chan Cacheable -} - -func newBaseCache() *baseCache { - return &baseCache{ - Cache: make(map[int]Cacheable), - intent: make(map[int][]chan Cacheable), - } -} - -func (c *baseCache) Get(id int) Cacheable { - c.m.RLock() - defer c.m.RUnlock() - - return c.Cache[id] -} - -// Sends the Cacheable to all channels that registered -// Intent and clears the Intent list. -func (c *baseCache) Flush(id int, v Cacheable) { - c.m.Lock() - defer c.m.Unlock() - - c._flush(id, v) -} - -func (c *baseCache) _flush(id int, v Cacheable) { - for _, ch := range c.intent[id] { - ch <- v - close(ch) - } - delete(c.intent, id) -} - -// Caches if v is not nil and then Flushes the Intents. -func (c *baseCache) Set(id int, v Cacheable) { - c.m.Lock() - defer c.m.Unlock() - - if !reflect.ValueOf(v).IsNil() { - v.Touch() - c.Cache[id] = v - } - - c._flush(id, v) -} - -// Register the Intent to get the cache data for this id when -// it's available. Returns false if the caller was the first -// to register it. -func (c *baseCache) Intent(id int, ch chan Cacheable) (ok bool) { - c.m.Lock() - defer c.m.Unlock() - - list, ok := c.intent[id] - c.intent[id] = append(list, ch) - - return ok -} diff --git a/cachetemplate.go b/cachetemplate.go deleted file mode 100644 index c4fd713..0000000 --- a/cachetemplate.go +++ /dev/null @@ -1,67 +0,0 @@ -// +build never - -package anidb - -// Copy&paste this for new cache types -// globally replace: Strut strut SID sid - -import ( - "sync" - "time" -) - -type Strut struct { - Cached time.Time -} - -func (v *Strut) touch() { - v.Cached = time.Now() -} -func (v *Strut) isStale(d time.Duration) bool { - return time.Now().Sub(v.Cached) > d -} - -type SID int - -func (sid SID) Strut() *Strut { - return strutCache.Get(sid) -} - -var StrutCacheDuration = DefaultCacheDuration - -var strutCache = strutCacheStruct{baseCache: newBaseCache()} - -type strutCacheStruct struct{ baseCache } - -func (c *strutCacheStruct) Get(id SID) *Strut { - return c.baseCache.Get(int(id)).(*Strut) -} - -func (c *strutCacheStruct) Set(id SID, v *Strut) { - c.baseCache.Set(int(id), v) -} - -func (c *strutCacheStruct) Intent(id SID, ch chan *Strut) (ok bool) { - ch2 := make(chan cacheable, 1) - go func() { ch <- (<-ch2).(*Strut) }() - return c.baseCache.Intent(int(id), ch2) -} - -func (adb *AniDB) StrutBySID(id SID) <-chan *Strut { - ch := make(chan *Strut, 1) - if v := id.Strut(); !v.isStale(StrutCacheDuration) { - ch <- v - close(ch) - return ch - } - - if strutCache.Intent(id, ch) { - return ch - } - - go func() { - var v *Strut - strutCache.Set(id, v) - }() - return ch -} diff --git a/episodecache.go b/episodecache.go index ff17777..008dd98 100644 --- a/episodecache.go +++ b/episodecache.go @@ -4,7 +4,6 @@ import ( "encoding/gob" "strconv" "strings" - "sync" "time" ) @@ -23,44 +22,45 @@ func (e *Episode) IsStale() bool { return time.Now().Sub(e.Cached) > EpisodeCacheDuration } -var eidAidMap = map[EID]AID{} -var eidAidLock = sync.RWMutex{} - // Unique Episode IDentifier. type EID int // Retrieves the Episode corresponding to this EID from the cache. func (eid EID) Episode() *Episode { - e, _ := caches.Get(episodeCache).Get(int(eid)).(*Episode) - return e + var e Episode + if cache.Get(&e, "eid", eid) == nil { + return &e + } + return nil } func cacheEpisode(ep *Episode) { - eidAidLock.Lock() - defer eidAidLock.Unlock() - - eidAidMap[ep.EID] = ep.AID - caches.Get(episodeCache).Set(int(ep.EID), ep) + cache.Set(ep.AID, "aid", "by-eid", ep.EID) + cache.Set(ep, "eid", ep.EID) } // Retrieves the Episode from the cache if possible. // // If the result is stale, then queries the UDP API to -// know which AID owns this EID, then gets the episodes +// know which AID owns this EID, then gets the episode // from the Anime. func (adb *AniDB) EpisodeByID(eid EID) <-chan *Episode { + keys := []cacheKey{"eid", eid} ch := make(chan *Episode, 1) - if e := eid.Episode(); e != nil && !e.IsStale() { - ch <- e - close(ch) + ic := make(chan Cacheable, 1) + go func() { ch <- (<-ic).(*Episode); close(ch) }() + if intentMap.Intent(ic, keys...) { return ch } - ec := caches.Get(episodeCache) - ic := make(chan Cacheable, 1) - go func() { ch <- (<-ic).(*Episode); close(ch) }() - if ec.Intent(int(eid), ic) { + if !cache.CheckValid(keys...) { + intentMap.Notify((*Episode)(nil), keys...) + return ch + } + + if e := eid.Episode(); !e.IsStale() { + intentMap.Notify(e, keys...) return ch } @@ -68,9 +68,8 @@ func (adb *AniDB) EpisodeByID(eid EID) <-chan *Episode { // The UDP API data is worse than the HTTP API anime data, // try and get from the corresponding Anime - eidAidLock.RLock() - aid, ok := eidAidMap[eid] - eidAidLock.RUnlock() + aid := AID(0) + ok := cache.Get(&aid, "aid", "by-eid", eid) == nil udpDone := false @@ -92,6 +91,8 @@ func (adb *AniDB) EpisodeByID(eid EID) <-chan *Episode { ok = true aid = AID(id) } + } else if reply.Code() == 340 { + cache.MarkInvalid(keys...) } else { break } @@ -104,14 +105,11 @@ func (adb *AniDB) EpisodeByID(eid EID) <-chan *Episode { break } else { // if this is somehow still a miss, then the EID<->AID map broke - eidAidLock.Lock() - delete(eidAidMap, eid) - eidAidLock.Unlock() - + cache.Delete("aid", "by-eid", eid) ok = false } } - // Caching (and channel broadcasting) done by AnimeByID + intentMap.Notify(e, keys...) }() return ch } diff --git a/filecache.go b/filecache.go index e3ae93c..e2b0672 100644 --- a/filecache.go +++ b/filecache.go @@ -2,19 +2,19 @@ package anidb import ( "encoding/gob" - "fmt" "github.com/Kovensky/go-anidb/misc" "github.com/Kovensky/go-anidb/udp" "image" "log" "strconv" "strings" - "sync" "time" ) func init() { gob.RegisterName("*github.com/Kovensky/go-anidb.File", &File{}) + gob.RegisterName("*github.com/Kovensky/go-anidb.ed2kCache", &ed2kCache{}) + gob.RegisterName("github.com/Kovensky/go-anidb.FID", FID(0)) } func (f *File) Touch() { @@ -33,21 +33,29 @@ func (f *File) IsStale() bool { type FID int +// make FID Cacheable +func (e FID) Touch() {} +func (e FID) IsStale() bool { return false } + func (fid FID) File() *File { - f, _ := caches.Get(fileCache).Get(int(fid)).(*File) - return f + var f File + if cache.Get(&f, "fid", fid) == nil { + return &f + } + return nil } -func ed2kKey(ed2k string, size int64) string { - return fmt.Sprintf("%s-%016x", ed2k, size) +type ed2kCache struct { + FID + time.Time } -func ed2kCache(f *File) { - if f != nil { - ed2kFidLock.Lock() - defer ed2kFidLock.Unlock() - ed2kFidMap[ed2kKey(f.Ed2kHash, f.Filesize)] = f.FID - } +func (c *ed2kCache) Touch() { + c.Time = time.Now() +} + +func (c *ed2kCache) IsStale() bool { + return time.Now().Sub(c.Time) > FileCacheDuration } // Prefetches the Anime, Episode and Group that this @@ -71,22 +79,24 @@ func (f *File) Prefetch(adb *AniDB) <-chan *File { return ch } -var ed2kFidMap = map[string]FID{} -var ed2kIntent = map[string][]chan *File{} -var ed2kFidLock = sync.RWMutex{} - func (adb *AniDB) FileByID(fid FID) <-chan *File { + keys := []cacheKey{"fid", fid} + ch := make(chan *File, 1) - if f := fid.File(); !f.IsStale() { - ch <- f - close(ch) - return ch - } - fc := caches.Get(fileCache) ic := make(chan Cacheable, 1) go func() { ch <- (<-ic).(*File); close(ch) }() - if fc.Intent(int(fid), ic) { + if intentMap.Intent(ic, keys...) { + return ch + } + + if !cache.CheckValid(keys...) { + intentMap.Notify((*File)(nil), keys...) + return ch + } + + if f := fid.File(); !f.IsStale() { + intentMap.Notify(f, keys...) return ch } @@ -101,35 +111,44 @@ func (adb *AniDB) FileByID(fid FID) <-chan *File { var f *File if reply.Error() == nil { f = parseFileResponse(reply) + } else if reply.Code() == 320 { + cache.MarkInvalid(keys...) + } + if f != nil { + cache.Set(&ed2kCache{FID: f.FID}, "fid", "by-ed2k", f.Ed2kHash, f.Filesize) + cache.Set(f, keys...) } - ed2kCache(f) - fc.Set(int(fid), f) + intentMap.Notify(f, keys...) }() return ch } func (adb *AniDB) FileByEd2kSize(ed2k string, size int64) <-chan *File { - key := ed2kKey(ed2k, size) + keys := []cacheKey{"fid", "by-ed2k", ed2k, size} + ch := make(chan *File, 1) - ed2kFidLock.RLock() - if fid, ok := ed2kFidMap[key]; ok { - ed2kFidLock.RUnlock() - if f := fid.File(); f != nil { - ch <- f - close(ch) - return ch + ic := make(chan Cacheable, 1) + go func() { + fid := (<-ic).(FID) + if fid > 0 { + ch <- <-adb.FileByID(fid) } - return adb.FileByID(fid) + close(ch) + }() + if intentMap.Intent(ic, keys...) { + return ch + } + + if !cache.CheckValid(keys...) { + intentMap.Notify(FID(0), keys...) + return ch } - ed2kFidLock.RUnlock() - ed2kFidLock.Lock() - if list, ok := ed2kIntent[key]; ok { - ed2kIntent[key] = append(list, ch) + var ec ed2kCache + if cache.Get(&ec, keys...) == nil { + intentMap.Notify(ec.FID, keys...) return ch - } else { - ed2kIntent[key] = append(list, ch) } go func() { @@ -141,28 +160,22 @@ func (adb *AniDB) FileByEd2kSize(ed2k string, size int64) <-chan *File { "amask": fileAmask, }) + fid := FID(0) var f *File if reply.Error() == nil { f = parseFileResponse(reply) - ed2kCache(f) - caches.Get(fileCache).Set(int(f.FID), f) + fid = f.FID + + cache.Set(&ed2kCache{FID: fid}, keys...) + cache.Set(f, "fid", fid) } else if reply.Code() == 320 { // file not found - ed2kFidLock.Lock() - delete(ed2kFidMap, key) - ed2kFidLock.Unlock() + cache.MarkInvalid(keys...) } else if reply.Code() == 322 { // multiple files found panic("Don't know what to do with " + strings.Join(reply.Lines(), "\n")) } - ed2kFidLock.Lock() - defer ed2kFidLock.Unlock() - - for _, ch := range ed2kIntent[key] { - ch <- f - close(ch) - } - delete(ed2kIntent, key) + intentMap.Notify(fid, keys...) }() return ch } diff --git a/flock.go b/flock.go new file mode 100644 index 0000000..6df0b2b --- /dev/null +++ b/flock.go @@ -0,0 +1,8 @@ +package anidb + +type fileLock interface { + Lock() error + Unlock() error +} + +// func lockFile(p path) fileLock diff --git a/flock_other.go b/flock_other.go new file mode 100644 index 0000000..2250a19 --- /dev/null +++ b/flock_other.go @@ -0,0 +1,31 @@ +// +build !windows + +package anidb + +import "github.com/tgulacsi/go-locking" + +type flockLock struct { + locking.FLock +} + +func lockFile(p path) fileLock { + flock, err := locking.NewFLock(p) + if err != nil { + return &flockLock{FLock: flock} + } + return nil +} + +func (fl *flockLock) Lock() error { + if fl != nil { + return fl.FLock.Lock() + } + return nil +} + +func (fl *flockLock) Unlock() error { + if fl != nil { + return fl.FLock.Unlock() + } + return nil +} diff --git a/flock_windows.go b/flock_windows.go new file mode 100644 index 0000000..cce4706 --- /dev/null +++ b/flock_windows.go @@ -0,0 +1,12 @@ +package anidb + +type winFileLock struct{} + +func lockFile(p string) fileLock { + return &winFileLock{} +} + +// empty implementations -- go-locking doesn't support windows +// windows also does file locking on its own +func (_ *winFileLock) Lock() error { return nil } +func (_ *winFileLock) Unlock() error { return nil } diff --git a/groupcache.go b/groupcache.go index 4c0c1b1..e67cd59 100644 --- a/groupcache.go +++ b/groupcache.go @@ -28,8 +28,11 @@ type GID int // Retrieves the Group from the cache. func (gid GID) Group() *Group { - g, _ := caches.Get(groupCache).Get(int(gid)).(*Group) - return g + var g Group + if cache.Get(&g, "gid", gid) == nil { + return &g + } + return nil } // Returns a Group from the cache if possible. @@ -37,19 +40,17 @@ func (gid GID) Group() *Group { // If the Group is stale, then retrieves the Group // through the UDP API. func (adb *AniDB) GroupByID(gid GID) <-chan *Group { + keys := []cacheKey{"gid", gid} ch := make(chan *Group, 1) - if g := gid.Group(); !g.IsStale() { - ch <- g - close(ch) - return ch - } - - gc := caches.Get(groupCache) ic := make(chan Cacheable, 1) go func() { ch <- (<-ic).(*Group); close(ch) }() + if intentMap.Intent(ic, keys...) { + return ch + } - if gc.Intent(int(gid), ic) { + if g := gid.Group(); !g.IsStale() { + intentMap.Notify(g, keys...) return ch } @@ -130,7 +131,8 @@ func (adb *AniDB) GroupByID(gid GID) <-chan *Group { Cached: time.Now(), } } - gc.Set(int(gid), g) + cache.Set(g, keys...) + intentMap.Notify(g, keys...) }() return ch } diff --git a/intent.go b/intent.go new file mode 100644 index 0000000..853d827 --- /dev/null +++ b/intent.go @@ -0,0 +1,63 @@ +package anidb + +import "sync" + +type intentStruct struct { + sync.Mutex + chs []chan Cacheable +} + +type intentMapStruct struct { + sync.Mutex + m map[string]*intentStruct +} + +var intentMap = &intentMapStruct{ + m: map[string]*intentStruct{}, +} + +// Register a channel to be notified when the specified keys are notified. +// +// Cache checks should be done after registering intent, since it's possible to +// register Intent while a Notify is running, and the Notify is done after +// setting the cache. +func (m *intentMapStruct) Intent(ch chan Cacheable, keys ...cacheKey) bool { + key := cachePath(keys...) + + m.Lock() + s, ok := m.m[key] + if !ok { + s = &intentStruct{} + m.m[key] = s + } + m.Unlock() + + s.Lock() + s.chs = append(s.chs, ch) + s.Unlock() + + return ok +} + +// Notify all channels that are listening for the specified keys. +// +// Should be called after setting the cache. +func (m *intentMapStruct) Notify(v Cacheable, keys ...cacheKey) { + key := cachePath(keys...) + + m.Lock() + defer m.Unlock() + s, ok := m.m[key] + if !ok { + return + } + + s.Lock() + defer s.Unlock() + + for _, ch := range s.chs { + go func(c chan Cacheable) { c <- v }(ch) + } + + delete(m.m, key) +} diff --git a/misc.go b/misc.go index ad8c517..f738d28 100644 --- a/misc.go +++ b/misc.go @@ -15,6 +15,8 @@ var ( GroupCacheDuration = 4 * DefaultCacheDuration // They don't change that often. FileCacheDuration = 8 * DefaultCacheDuration // These change even less often. + InvalidKeyCacheDuration = 1 * time.Hour + // Used when the UDP API Anime query fails, but the HTTP API query succeeds. AnimeIncompleteCacheDuration = 24 * time.Hour diff --git a/titlecache.go b/titlecache.go index 26a2fd4..1100df0 100644 --- a/titlecache.go +++ b/titlecache.go @@ -1,44 +1,30 @@ package anidb import ( - "bytes" "github.com/Kovensky/go-anidb/titles" "io" "net/http" - "sync" "time" ) -var titlesFileData []byte -var titlesFileDataLock sync.Mutex var titlesDB = &titles.TitlesDatabase{} -// Loads the anime-titles database from the given io.Reader. -// -// Caches the io.Reader's contents on memory, which gets saved -// by DumpCaches. -func LoadTitles(src io.Reader) error { - buf := bytes.Buffer{} - _, err := io.Copy(&buf, src) - if err != nil && err != io.EOF { +// Reloads titles from the cache. +func RefreshTitles() error { + flock := lockFile(cachePath("anime-titles.dat.gz")) + flock.Lock() + defer flock.Unlock() + + fh, err := cache.Open("anime-titles.dat.gz") + if err != nil { return err } + defer fh.Close() - titlesFileDataLock.Lock() - defer titlesFileDataLock.Unlock() - - titlesFileData = buf.Bytes() - - titlesDB.LoadDB(bytes.NewReader(titlesFileData)) - + titlesDB.LoadDB(fh) return nil } -// Saves the currently cached anime-titles database to the given io.Writer. -func DumpTitles(dst io.Writer) (int64, error) { - return io.Copy(dst, bytes.NewReader(titlesFileData)) -} - // Returns true if the titles database is up-to-date (newer than 24 hours). func TitlesUpToDate() (ok bool) { return time.Now().Sub(titlesDB.UpdateTime) < 24*time.Hour @@ -53,11 +39,27 @@ func UpdateTitles() error { return nil } - resp, err := http.Get(titles.DataDumpURL) + flock := lockFile(cachePath("anime-titles.dat.gz")) + flock.Lock() + defer flock.Unlock() + + c := &http.Client{Transport: &http.Transport{DisableCompression: true}} + + resp, err := c.Get(titles.DataDumpURL) if err != nil { return err } defer resp.Body.Close() - return LoadTitles(resp.Body) + fh, err := cache.Create("anime-titles.dat.gz") + if err != nil { + return err + } + + _, err = io.Copy(fh, resp.Body) + if err != nil { + return err + } + + return RefreshTitles() } diff --git a/udp.go b/udp.go index 433c9da..abd8a3a 100644 --- a/udp.go +++ b/udp.go @@ -1,26 +1,40 @@ package anidb import ( + "encoding/gob" "github.com/Kovensky/go-anidb/udp" - "sync" "time" ) -var banTime time.Time -var banTimeLock sync.Mutex +func init() { + gob.RegisterName("*github.com/Kovensky/go-anidb.banCache", &banCache{}) +} const banDuration = 30*time.Minute + 1*time.Second +type banCache struct{ time.Time } + +func (c *banCache) Touch() { + c.Time = time.Now() +} +func (c *banCache) IsStale() bool { + return time.Now().Sub(c.Time) > banDuration +} + // Returns whether the last UDP API access returned a 555 BANNED message. func Banned() bool { - banTimeLock.Lock() - banTimeLock.Unlock() + var banTime banCache + cache.Get(&banTime, "banned") - return _banned() + stale := banTime.IsStale() + if stale { + cache.Delete("banned") + } + return !stale } -func _banned() bool { - return time.Now().Sub(banTime) > banDuration +func setBanned() { + cache.Set(&banCache{}, "banned") } type paramSet struct { @@ -85,11 +99,7 @@ func (udp *udpWrap) sendQueue() { case 503, 504: // client library rejected panic(reply.Error()) case 555: // IP (and user, possibly client) temporarily banned - banTimeLock.Lock() - - banTime = time.Now() - - banTimeLock.Unlock() + setBanned() } set.ch <- reply close(set.ch) @@ -99,11 +109,7 @@ func (udp *udpWrap) sendQueue() { func (udp *udpWrap) SendRecv(cmd string, params paramMap) <-chan udpapi.APIReply { ch := make(chan udpapi.APIReply, 1) - banTimeLock.Lock() - defer banTimeLock.Unlock() - if _banned() { - banTime = time.Time{} - } else { + if Banned() { ch <- bannedReply close(ch) return ch -- 2.44.0