单机缓存并发控制

通过互斥锁Mutex实现 LRU 缓存的并发控制

封装一个只读数据结构

  • 使用[]byte,支持各种数据类型。
  • 封装一个ByteView结构体,并且在返回时,返回一个拷贝,防止缓存值被外部程序修改。

byteview.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package Cache

/*
缓存值的抽象与封装
*/

// ByteView 只读数据结构
type ByteView struct {
b []byte // 存储真实缓存值
}

// Len 返回其所占的内存大小。
func (v ByteView) Len() int {
return len(v.b)
}

// ByteSlice 返回一个拷贝,防止缓存值被外部程序修改
func (v ByteView) ByteSlice() []byte {
return cloneBytes(v.b)
}

// String 以字符串形式返回数据
func (v ByteView) String() string {
return string(v.b)
}

// 拷贝
func cloneBytes(b []byte) []byte {
c := make([]byte, len(b))
copy(c, b)
return c
}

使用锁进行并发控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package Cache

import (
"Cache/Cache/lru"
"sync"
)

/*
主要是进行并发控制
*/

// 支持并发读写的lru缓存
type cache struct {
mu sync.Mutex
lru *lru.Cache
cacheBytes int64 // 缓存最大值
}

// 向缓存中添加数据
func (c *cache) add(key string, value ByteView) {
c.mu.Lock()
defer c.mu.Unlock()
// 使用延迟初始化,将该对象的创建延迟至第一次使该对象
// 主要用于提高性能,并且减少程序内存要求
if c.lru == nil {
c.lru = lru.New(c.cacheBytes, nil)
}
c.lru.Add(key, value)
}

// 从缓存中读取数据
func (c *cache) get(key string) (value ByteView, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
return
}
if v, ok := c.lru.Get(key); ok {
return v.(ByteView), ok // 转化成ByteView类型
}
return
}

主体结构 Group(最核心)

Group是最核心的数据结构,负责用户的交互,并且控制缓存值,存储和获取的流程。

Group流程

Group流程图

回调函数

如果缓存不存在,应从数据源获取数据并添加到缓存中。

当缓存不存在时,调用这个函数,得到源数据。

gocache.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package Cache

import (
"fmt"
"log"
"sync"
)

/*
负责与外部交互,控制缓存存储和获取的主流程
设计了一个回调函数,在缓存不存在时,调用这个函数,得到源数据。
*/

// Getter 通过回调函数加载数据
type Getter interface {
Get(key string) ([]byte, error)
}

// 自定义函数类型
type GetterFunc func(key string) ([]byte, error)

// Get 实现 Getter 接口方法
func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}

Group定义

创建Group实例:

  1. 判断getter是否为空
  2. 新建Group对象
  3. 将group对象添加到group中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Group 是一个缓存的命名空间
type Group struct {
name string
getter Getter // 缓存未命中时获取源数据的回调(callback)
mainCache cache // 并发缓存
}

var (
mu sync.RWMutex // 读写锁
groups = make(map[string]*Group) // 存储所有Group信息的映射表
)

// NewGroup 创建Group实例
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
// 判断getter是否为空
if getter == nil {
panic("nil Getter")
}
mu.Lock()
defer mu.Unlock()

// 新建Group对象
g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
}
// 将group对象添加到group中
groups[name] = g
return g
}

从缓存获取key的value(Get最核心方法)

  1. 获取一个key值,首先查看本地缓存,如果命中直接返回。
  2. 如果没有命中缓存,调用Getter函数来拉去数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// GetGroup 返回之前用NewGroup创建的命名组,如果没有这样的组则为nil。
func GetGroup(name string) *Group {
mu.RLock()
g := groups[name]
mu.RUnlock()
return g
}

// Get 从缓存中获取一个key的值
func (g *Group) Get(key string) (ByteView, error) {
// key为nil,返回
if key == "" {
return ByteView{}, fmt.Errorf("key is required")
}

// 命中会直接返回,从本地缓存中查找
if v, ok := g.mainCache.get(key); ok {
log.Println("[GoCache] hit")
return v, nil
}

// 未命中,查找失败调用Getter函数来拉去数据
return g.load(key)
}

func (g *Group) load(key string) (value ByteView, err error) {
// TODO:优先从远程节点获取
// 远程节点没有,调用Getter函数来获取
return g.getLocally(key)
}

// 获取本地数据源
func (g *Group) getLocally(key string) (ByteView, error) {

// 调用用户回调函数 g.getter.Get() 获取源数据
bytes, err := g.getter.Get(key)
if err != nil {
return ByteView{}, err
}

// 将数据组成装成ByteView格式
value := ByteView{b: cloneBytes(bytes)}

// 将源数据添加到缓存 mainCache 中
g.populateCache(key, value)
return value, nil
}

// 将源数据添加到缓存 mainCache 中
func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}

总结

  1. 了解了Group设计原理。
  2. 实现了用互斥锁实现缓存并发控制。