0%

bigCache

BigCache 是一个快速,支持并发访问,自淘汰的内存型缓存,可以在存储大量元素时依然保持高性能。BigCache将元素保存在堆上却避免了GC的开销

忽略gc的方法可参考 https://blog.allegro.tech/2016/03/writing-fast-cache-service-in-go.html#omitting-garbage-collector

如果map中key和value都没有指针的话,gc会忽略它
但在go中,仅有int、bool这类基础类型没有指针,考虑使用map[int]int
key:因为本来就要对key做hash,所以可以比较容易地将key转成int
value: value就比较麻烦了,bigcache采用的方案是维护一个大的字节数组,将value序列化成字节后存入其中,再把它在数组中的offset当作map的value

数据结构

BigCache

1
2
3
4
5
6
7
8
9
type BigCache struct {
	shards []*cacheShard // cache shards; a key is routed to one shard by its hash
	lifeWindow uint64 // global entry TTL shared by all entries (units of clock.Epoch — presumably seconds; confirm)
	clock clock // clock abstraction (allows mocking time in tests)
	hash Hasher // hash function (customizable, defaults to fnv64a)
	config Config // cache configuration
	shardMask uint64 // mask applied to the hash to pick the shard index
	close chan struct{} // closed to stop background goroutines (e.g. the periodic clean-up)
}

分片 cacheShard

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type cacheShard struct {
	hashmap map[uint64]uint32 // hashed key -> entry position (index) inside the bytes queue
	entries queue.BytesQueue // byte queue holding the serialized entries (keeps values pointer-free for the GC)
	lock sync.RWMutex // guards hashmap/entries for concurrent access
	entryBuffer []byte // reusable scratch buffer for serializing entries in set
	onRemove onRemoveCallback // callback invoked when an entry is removed/evicted

	isVerbose bool // verbose logging toggle
	statsEnabled bool // per-key stats collection toggle
	logger Logger
	clock clock
	lifeWindow uint64 // entry TTL shared by the whole cache

	hashmapStats map[uint64]uint32 // per-key counters (presumably hit counts when statsEnabled — confirm)
	stats Stats // shard-level hit/miss/collision counters
}

队列 BytesQueue

1
2
3
4
5
6
7
8
9
10
11
12
type BytesQueue struct {
	full bool // whether the queue is full
	array []byte // backing byte array that stores the serialized entries
	capacity int // current capacity of array
	maxCapacity int // maximum capacity (0 means unlimited growth)
	head int // head position (oldest entry)
	tail int // tail position (where the next entry is written)
	count int // number of entries currently stored
	rightMargin int // right margin: end of the used region on the right side
	headerBuffer []byte // scratch buffer for the uvarint length header
	verbose bool // print allocation info when growing the array
}

encoding

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const (
	timestampSizeInBytes = 8                                                       // Number of bytes used for timestamp
	hashSizeInBytes      = 8                                                       // Number of bytes used for hash
	keySizeInBytes       = 2                                                       // Number of bytes used for size of entry key
	headersSizeInBytes   = timestampSizeInBytes + hashSizeInBytes + keySizeInBytes // Number of bytes used for all headers
)

// wrapEntry serializes one cache entry into *buffer as
// <timestamp><hash><keyLength><key><value> and returns the used prefix.
// The scratch buffer is reused across calls and only reallocated when the
// serialized entry does not fit.
func wrapEntry(timestamp uint64, hash uint64, key string, entry []byte, buffer *[]byte) []byte {
	keyLen := len(key)
	total := headersSizeInBytes + keyLen + len(entry)

	// Grow the scratch buffer only when it is too small.
	if len(*buffer) < total {
		*buffer = make([]byte, total)
	}
	blob := *buffer

	// Fixed-width little-endian header fields.
	binary.LittleEndian.PutUint64(blob[0:], timestamp)
	binary.LittleEndian.PutUint64(blob[timestampSizeInBytes:], hash)
	binary.LittleEndian.PutUint16(blob[timestampSizeInBytes+hashSizeInBytes:], uint16(keyLen))

	// Variable-length payload: key bytes followed by value bytes.
	off := headersSizeInBytes
	off += copy(blob[off:], key)
	copy(blob[off:], entry)

	return blob[:total]
}
1
<timeStamp><hashKey><keyLength><key><value>

操作

BigCache 先进行hash计算,找到对应分片

cacheShard再进行set操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// set stores entry under key in this shard, evicting the oldest entries
// when the queue has no room. hashedKey is the precomputed hash of key.
func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error {
	// Current time; used as the entry timestamp and for expiry checks.
	currentTimestamp := uint64(s.clock.Epoch())

	// Take the write lock for the whole operation.
	s.lock.Lock()

	// Overwrite / hash-collision handling: if the hash is already present,
	// soft-delete the previous entry (zero its stored hash inside the queue)
	// and drop the index mapping; the old bytes remain until evicted.
	if previousIndex := s.hashmap[hashedKey]; previousIndex != 0 {
		if previousEntry, err := s.entries.Get(int(previousIndex)); err == nil {
			resetKeyFromEntry(previousEntry)
			// remove the hash -> index mapping
			delete(s.hashmap, hashedKey)
		}
	}

	// Peek at the oldest entry in the queue and evict it if it has expired.
	if oldestEntry, err := s.entries.Peek(); err == nil {
		s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry)
	}

	// Serialize as <timestamp><hash><keyLength><key><value>.
	w := wrapEntry(currentTimestamp, hashedKey, key, entry, &s.entryBuffer)

	for {
		// Try to push to the queue; on success record the entry's index.
		if index, err := s.entries.Push(w); err == nil {
			s.hashmap[hashedKey] = uint32(index)
			s.lock.Unlock()
			return nil
		}
		// No room: drop the oldest entry and retry. If even that fails,
		// the queue is empty and the entry simply cannot fit.
		if s.removeOldestEntry(NoSpace) != nil {
			s.lock.Unlock()
			return fmt.Errorf("entry is bigger than max shard size")
		}
	}
}

BytesQueue 的push操作如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// Push copies entry at the end of queue and moves tail pointer. Allocates more space if needed.
// Returns index for pushed data or error if maximum size queue limit is reached.
func (q *BytesQueue) Push(data []byte) (int, error) {
	// Total bytes required: the data plus its uvarint length header.
	neededSize := getNeededSize(len(data))

	// Can the block go right after the current tail?
	if !q.canInsertAfterTail(neededSize) {
		// No: try wrapping around to the free space before head.
		if q.canInsertBeforeHead(neededSize) {
			q.tail = leftMarginIndex
		} else if q.capacity+neededSize >= q.maxCapacity && q.maxCapacity > 0 { // growing would exceed the configured maximum
			return -1, &queueError{"Full queue. Maximum size limit reached."}
		} else {
			q.allocateAdditionalMemory(neededSize) // otherwise grow the backing array by the specified size
		}
	}

	// The entry lives at the (possibly relocated) tail.
	index := q.tail

	q.push(data, neededSize)

	return index, nil
}

// push writes one block at q.tail: a uvarint length header followed by the
// data, then updates the margins and counters. length is the total block
// size (header + data) precomputed by Push.
// Note: the parameter was renamed from `len`, which shadowed the
// predeclared builtin.
func (q *BytesQueue) push(data []byte, length int) {
	// Encode the full block length as uvarint into the scratch header buffer.
	headerEntrySize := binary.PutUvarint(q.headerBuffer, uint64(length))
	q.copy(q.headerBuffer, headerEntrySize)

	// The data occupies the remainder of the block after the header.
	q.copy(data, length-headerEntrySize)

	// Track the furthest byte used on the right side of the buffer.
	if q.tail > q.head {
		q.rightMargin = q.tail
	}
	// Tail meeting head right after a write means the buffer is full.
	if q.tail == q.head {
		q.full = true
	}

	q.count++
}

// copy copies the first length bytes of data into the backing array at
// q.tail and advances the tail by the number of bytes actually copied.
// Note: the parameter was renamed from `len`, which shadowed the
// predeclared builtin.
func (q *BytesQueue) copy(data []byte, length int) {
	q.tail += copy(q.array[q.tail:], data[:length])
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// get returns the value stored under key. hashedKey is the precomputed
// hash; the stored key is compared against key to detect hash collisions.
func (s *cacheShard) get(key string, hashedKey uint64) ([]byte, error) {
	s.lock.RLock()
	// Look up the serialized entry for this hash in the queue.
	wrappedEntry, err := s.getWrappedEntry(hashedKey)
	if err != nil {
		s.lock.RUnlock()
		return nil, err
	}
	// Compare the stored key with the requested one so that a different
	// key colliding on the same hash is not returned by mistake.
	if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey {
		s.lock.RUnlock()
		s.collision()
		if s.isVerbose {
			s.logger.Printf("Collision detected. Both %q and %q have the same hash %x", key, entryKey, hashedKey)
		}
		return nil, ErrEntryNotFound
	}
	// Extract the value portion of the serialized entry.
	entry := readEntry(wrappedEntry)
	s.lock.RUnlock()
	s.hit(hashedKey)

	return entry, nil
}

// getWrappedEntry looks up hashedKey in the shard index and returns the
// serialized entry bytes from the queue. A miss is counted on any failure.
func (s *cacheShard) getWrappedEntry(hashedKey uint64) ([]byte, error) {
	idx := s.hashmap[hashedKey]
	if idx == 0 {
		// Index 0 is never a valid entry position, so it doubles as "absent".
		s.miss()
		return nil, ErrEntryNotFound
	}

	entry, err := s.entries.Get(int(idx))
	if err != nil {
		s.miss()
		return nil, err
	}

	return entry, nil
}

// peek returns the data from index and the number of bytes to encode the length of the data in uvarint format
func (q *BytesQueue) peek(index int) ([]byte, int, error) {
	// Validate the index (bounds / empty queue) before touching the array.
	err := q.peekCheckErr(index)
	if err != nil {
		return nil, 0, err
	}

	// The block starts with a uvarint holding the TOTAL block size
	// (header + payload), so the payload spans [index+n, index+blockSize).
	blockSize, n := binary.Uvarint(q.array[index:])
	return q.array[index+n : index+int(blockSize)], int(blockSize), nil
}

定时删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// newBigCache (excerpt): when CleanWindow is configured, a background
// goroutine periodically sweeps expired entries out of every shard.
func newBigCache(config Config, clock clock) (*BigCache, error) {
...
	if config.CleanWindow > 0 {
		go func() {
			ticker := time.NewTicker(config.CleanWindow)
			defer ticker.Stop()
			for {
				select {
				case t := <-ticker.C:
					// Sweep all shards using the tick time as "now".
					cache.cleanUp(uint64(t.Unix()))
				case <-cache.close:
					// Cache was closed: stop the clean-up goroutine.
					return
				}
			}
		}()
	}
...
}

// cleanUp evicts expired entries from this shard. Entries are stored in
// insertion (and therefore timestamp) order, so the sweep can stop at the
// first entry that has not expired yet.
func (s *cacheShard) cleanUp(currentTimestamp uint64) {
	s.lock.Lock()
	for {
		// Peek the oldest entry: stop when the queue is empty (Peek errors)
		// or the entry is not yet expired; otherwise evict and continue.
		if oldestEntry, err := s.entries.Peek(); err != nil {
			break
		} else if evicted := s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry); !evicted {
			break
		}
	}
	s.lock.Unlock()
}

bigcache所有entry的过期时间是在初始化时由config里的lifeWindow统一确定的,无法为每个entry单独自定义过期时间。

另外在每次set的时候会判断head是否需要过期删除