5-多节点间的通信(Go实现分布式缓存)
多节点间的通信
远程访问流程
- 当 客户端 发送一个查询请求达到某个缓存节点时, 该节点会先判断
key
是否在本地, 不在的话, 再通过发送网络请求去访问其他node
节点。 - 每个 node 既要处理来自客户端这样的外部请求, 也要处理来自其他远端节点的内部请求。
- 我们需要在 node 内部, 启动两个 http 服务, 一个处理客户端请求(APIServer), 一个处理节点的请求(CacheServer).
定义一个查询节点的方法
1
2
3
4
5type PeerPicker interface {
// PickPeer 于根据传入的 key 选择相应节点(选择相应的节点方法)
PickPeer(key string) (peer PeerGetter, ok bool)
}
通过网络请求帮我们拿到缓存结果
1
2
3
4// 这个接口为我们提供需要的能力.
type PeerGetter interface {
Get(group string, key string) ([]byte, error)
}
实现
PeerGetter#Get
方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// 首先需要定义一个 struct 实现 PeerGetter 接口
type httpGetter struct {
baseURL string
}
// Get 实现接口对应的方法, 这个接口能提供访问网络接口拿到缓存数据。
func (h *httpGetter) Get(group string, key string) ([]byte, error) {
// 访问 http 接口的逻辑
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(group),
url.QueryEscape(key),
)
res, err := http.Get(u)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server returned: %v", res.Status)
}
bytes, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("reading response body: %v", err)
}
return bytes, nil
}当我们 cache 服务启动时, 肯定要往 hash 环 添加节点(真实 + 虚拟)的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43const (
defaultBasePath = "/_gocache/"
defaultReplicas = 50
)
type HttpPool struct {
self string // 记录本地地址和端口
basePath string // 基础路径
mu sync.Mutex // 保护peer和httpGetters
peers *consistenthash.Map // 根据具体的 key 选择节点
httpGetters map[string]*httpGetter // 每一个远程节点对应一个 httpGetter
}
// Set 更新节点列表
func (p *HttpPool) Set(peers ...string) {
// 因为 hash 环的 map 不是线程安全的,所以这里要加锁.
p.mu.Lock()
defer p.mu.Unlock()
p.peers = consistenthash.New(defaultReplicas, nil)
// 调用上一章节的方法, 在 hash 环上添加真实节点和虚拟节点
p.peers.Add(peers...)
// 存储远端节点信息
p.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
}
}
// PickPeer 根据key选择一个节点
func (p *HttpPool) PickPeer(key string) (PeerGetter, bool) {
// 因为 hash 环的 map 不是线程安全的,所以这里要加锁.
p.mu.Lock()
defer p.mu.Unlock()
// p.peers 是个 哈希环, 通过调用它的 Get 方法拿到远端节点.
// 这里的 peer 是个地址.
if peer := p.peers.Get(key); peer != "" && peer != p.self {
p.Log("Pick peer %s", peer)
return p.httpGetters[peer], true
}
return nil, false
}
var _ PeerPicker = (*HttpPool)(nil)
总结
缓存节点启动的流程。
- 创建 Group 对象.(用于存储缓存数据)
- 启动缓存 http 服务.(创建 HttpPool,添加节点信息,注册到 group 中)
- 启动 API 服务.(用于与客户端进行交互)
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 贾小白博客!