多节点间的通信

远程访问流程

远程访问流程

  • 客户端 发送一个查询请求达到某个缓存节点时, 该节点会先判断 key 是否在本地, 不在的话, 再通过发送网络请求去访问其他 node 节点。
  • 每个 node 既要处理来自客户端这样的外部请求, 也要处理来自其他远端节点的内部请求。
  • 我们需要在 node 内部, 启动两个 http 服务, 一个处理客户端请求(APIServer), 一个处理节点的请求(CacheServer).

请求处理

  1. 定义一个查询节点的方法

    1
    2
    3
    4
    5
    type PeerPicker interface {
    // PickPeer 于根据传入的 key 选择相应节点(选择相应的节点方法)
    PickPeer(key string) (peer PeerGetter, ok bool)
    }

  1. 通过网络请求帮我们拿到缓存结果

    1
    2
    3
    4
    // 这个接口为我们提供需要的能力.
    type PeerGetter interface {
    Get(group string, key string) ([]byte, error)
    }
  1. 实现 PeerGetter#Get 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    // 首先需要定义一个 struct 实现 PeerGetter 接口
    type httpGetter struct {
    baseURL string
    }

    // Get 实现接口对应的方法, 这个接口能提供访问网络接口拿到缓存数据。
    func (h *httpGetter) Get(group string, key string) ([]byte, error) {
    // 访问 http 接口的逻辑
    u := fmt.Sprintf(
    "%v%v/%v",
    h.baseURL,
    url.QueryEscape(group),
    url.QueryEscape(key),
    )
    res, err := http.Get(u)
    if err != nil {
    return nil, err
    }
    defer res.Body.Close()

    if res.StatusCode != http.StatusOK {
    return nil, fmt.Errorf("server returned: %v", res.Status)
    }

    bytes, err := ioutil.ReadAll(res.Body)
    if err != nil {
    return nil, fmt.Errorf("reading response body: %v", err)
    }

    return bytes, nil
    }

  2. 当我们 cache 服务启动时, 肯定要往 hash 环 添加节点(真实 + 虚拟)的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    const (
    defaultBasePath = "/_gocache/"
    defaultReplicas = 50
    )

    type HttpPool struct {
    self string // 记录本地地址和端口
    basePath string // 基础路径
    mu sync.Mutex // 保护peer和httpGetters
    peers *consistenthash.Map // 根据具体的 key 选择节点
    httpGetters map[string]*httpGetter // 每一个远程节点对应一个 httpGetter
    }
    // Set 更新节点列表
    func (p *HttpPool) Set(peers ...string) {
    // 因为 hash 环的 map 不是线程安全的,所以这里要加锁.
    p.mu.Lock()
    defer p.mu.Unlock()
    p.peers = consistenthash.New(defaultReplicas, nil)
    // 调用上一章节的方法, 在 hash 环上添加真实节点和虚拟节点
    p.peers.Add(peers...)
    // 存储远端节点信息
    p.httpGetters = make(map[string]*httpGetter, len(peers))
    for _, peer := range peers {
    p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
    }
    }

    // PickPeer 根据key选择一个节点
    func (p *HttpPool) PickPeer(key string) (PeerGetter, bool) {
    // 因为 hash 环的 map 不是线程安全的,所以这里要加锁.
    p.mu.Lock()
    defer p.mu.Unlock()
    // p.peers 是个 哈希环, 通过调用它的 Get 方法拿到远端节点.
    // 这里的 peer 是个地址.
    if peer := p.peers.Get(key); peer != "" && peer != p.self {
    p.Log("Pick peer %s", peer)
    return p.httpGetters[peer], true
    }
    return nil, false
    }

    var _ PeerPicker = (*HttpPool)(nil)

总结

缓存节点启动的流程。

  1. 创建 Group 对象.(用于存储缓存数据)
  2. 启动缓存 http 服务.(创建 HttpPool,添加节点信息,注册到 group 中)
  3. 启动 API 服务.(用于与客户端进行交互)