BigCache 是一个快速、支持并发访问、可自动淘汰过期数据的内存型缓存,在存储大量元素时依然能保持高性能。BigCache 将元素保存在堆上,却避免了 GC 带来的开销。
忽略gc的方法可参考 https://blog.allegro.tech/2016/03/writing-fast-cache-service-in-go.html#omitting-garbage-collector
如果 map 的 key 和 value 中都不含指针,GC 就会忽略这个 map。但在 Go 中,仅有 int、bool 这类基础类型不含指针,因此考虑使用 map[int]int。key:因为本来就要做 hash,所以可以比较容易地将 key 转成 int;value:就比较麻烦了,bigcache 采用的做法是维护一个大的字节数组,将 value 序列化成字节后存入其中,再把它在数组中的 offset 当作 map 的 value。
数据结构 BigCache 1 2 3 4 5 6 7 8 9 type BigCache struct { shards []*cacheShard lifeWindow uint64 clock clock hash Hasher config Config shardMask uint64 close chan struct {} }
分片 cacheShard 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 type cacheShard struct { hashmap map [uint64 ]uint32 entries queue.BytesQueue lock sync.RWMutex entryBuffer []byte onRemove onRemoveCallback isVerbose bool statsEnabled bool logger Logger clock clock lifeWindow uint64 hashmapStats map [uint64 ]uint32 stats Stats }
队列 BytesQueue 1 2 3 4 5 6 7 8 9 10 11 12 type BytesQueue struct { full bool array []byte capacity int maxCapacity int head int tail int count int rightMargin int headerBuffer []byte verbose bool }
encoding 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 const ( timestampSizeInBytes = 8 hashSizeInBytes = 8 keySizeInBytes = 2 headersSizeInBytes = timestampSizeInBytes + hashSizeInBytes + keySizeInBytes ) func wrapEntry (timestamp uint64 , hash uint64 , key string , entry []byte , buffer *[]byte ) []byte { keyLength := len (key) blobLength := len (entry) + headersSizeInBytes + keyLength if blobLength > len (*buffer) { *buffer = make ([]byte , blobLength) } blob := *buffer binary.LittleEndian.PutUint64(blob, timestamp) binary.LittleEndian.PutUint64(blob[timestampSizeInBytes:], hash) binary.LittleEndian.PutUint16(blob[timestampSizeInBytes+hashSizeInBytes:], uint16 (keyLength)) copy (blob[headersSizeInBytes:], key) copy (blob[headersSizeInBytes+keyLength:], entry) return blob[:blobLength] }
每个 entry 在字节数组中的布局为:<timeStamp><hashKey><keyLength><key><value>
操作 增 BigCache 先进行hash计算,找到对应分片
cacheShard再进行set操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 func (s *cacheShard) set (key string , hashedKey uint64 , entry []byte ) error { currentTimestamp := uint64 (s.clock.Epoch()) s.lock.Lock() if previousIndex := s.hashmap[hashedKey]; previousIndex != 0 { if previousEntry, err := s.entries.Get(int (previousIndex)); err == nil { resetKeyFromEntry(previousEntry) delete (s.hashmap, hashedKey) } } if oldestEntry, err := s.entries.Peek(); err == nil { s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry) } w := wrapEntry(currentTimestamp, hashedKey, key, entry, &s.entryBuffer) for { if index, err := s.entries.Push(w); err == nil { s.hashmap[hashedKey] = uint32 (index) s.lock.Unlock() return nil } if s.removeOldestEntry(NoSpace) != nil { s.lock.Unlock() return fmt.Errorf("entry is bigger than max shard size" ) } } }
BytesQueue 的push操作如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 func (q *BytesQueue) Push (data []byte ) (int , error) { neededSize := getNeededSize(len (data)) if !q.canInsertAfterTail(neededSize) { if q.canInsertBeforeHead(neededSize) { q.tail = leftMarginIndex } else if q.capacity+neededSize >= q.maxCapacity && q.maxCapacity > 0 { return -1 , &queueError{"Full queue. Maximum size limit reached." } } else { q.allocateAdditionalMemory(neededSize) } } index := q.tail q.push(data, neededSize) return index, nil } func (q *BytesQueue) push (data []byte , len int ) { headerEntrySize := binary.PutUvarint(q.headerBuffer, uint64 (len )) q.copy (q.headerBuffer, headerEntrySize) q.copy (data, len -headerEntrySize) if q.tail > q.head { q.rightMargin = q.tail } if q.tail == q.head { q.full = true } q.count++ } func (q *BytesQueue) copy (data []byte , len int ) { q.tail += copy (q.array[q.tail:], data[:len ]) }
查 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 func (s *cacheShard) get (key string , hashedKey uint64 ) ([]byte , error) { s.lock.RLock() wrappedEntry, err := s.getWrappedEntry(hashedKey) if err != nil { s.lock.RUnlock() return nil , err } if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey { s.lock.RUnlock() s.collision() if s.isVerbose { s.logger.Printf("Collision detected. Both %q and %q have the same hash %x" , key, entryKey, hashedKey) } return nil , ErrEntryNotFound } entry := readEntry(wrappedEntry) s.lock.RUnlock() s.hit(hashedKey) return entry, nil } func (s *cacheShard) getWrappedEntry (hashedKey uint64 ) ([]byte , error) { itemIndex := s.hashmap[hashedKey] if itemIndex == 0 { s.miss() return nil , ErrEntryNotFound } wrappedEntry, err := s.entries.Get(int (itemIndex)) if err != nil { s.miss() return nil , err } return wrappedEntry, err } func (q *BytesQueue) peek (index int ) ([]byte , int , error) { err := q.peekCheckErr(index) if err != nil { return nil , 0 , err } blockSize, n := binary.Uvarint(q.array[index:]) return q.array[index+n : index+int (blockSize)], int (blockSize), nil }
定时删除 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func newBigCache (config Config, clock clock) (*BigCache, error) { ... if config.CleanWindow > 0 { go func () { ticker := time.NewTicker(config.CleanWindow) defer ticker.Stop() for { select { case t := <-ticker.C: cache.cleanUp(uint64 (t.Unix())) case <-cache.close : return } } }() } ... } func (s *cacheShard) cleanUp (currentTimestamp uint64 ) { s.lock.Lock() for { if oldestEntry, err := s.entries.Peek(); err != nil { break } else if evicted := s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry); !evicted { break } } s.lock.Unlock() }
bigcache 所有 entry 的过期时间在初始化时就由 config 里的 lifeWindow 统一确定,无法为每个 entry 单独自定义过期时间。
另外,每次 set 时还会顺带检查队列头部(最老的 entry)是否已过期,若过期则将其删除。