我想做什么

chat和echo两个例子说明,两个节点要直接通信,需要二者在同一个局域网内,或者其中一个节点有公网IP。如果两个节点分别位于不同的NAT之后、各自处在自己的局域网内,则需要一个中继服务,这个服务做如下两件事:

  1. 交换两个节点在交换机(NAT设备)上的地址信息,帮助两个节点进行P2P打洞(hole punching,详见《Go-libp2p四 | 穿透NAT》一文)。
  2. 在打洞不成功的情况下能提供中继服务,转发两节点发送的数据。

根据我的需求,我在go-libp2p中发现了relay例子。

go-libp2p的relay例子分析

这个例子使用h2作为中继,使h1和h3通过h2交换信息,即h1可以通过h2与h3通信。

relay例子演示了两个节点如何通过中继服务得知对方的地址,然后进行通信。

  1. 创建节点: 创建h1 h1, err := libp2p.New(libp2p.EnableRelay()) 创建h3 h3, err := libp2p.New(libp2p.ListenAddrs(), libp2p.EnableRelay()) h3监听协议"/cats",h3.SetStreamHandler("/cats", func(s network.Stream) {})

  2. 创建中继: 创建h2中继 h2, err := libp2p.New(libp2p.DisableRelay()) 中继服务定义接收信息协议,其他节点可以通过协议与之通信_, err = relayv1.NewRelay(h2) 中继服务地址: h2info := peer.AddrInfo{ ID: h2.ID(), Addrs: h2.Addrs(), }

  3. 双方节点连上中继服务: h1连上h2, h1.Connect(context.Background(), h2info) h3连上h2, h3.Connect(context.Background(), h2info)

  4. 获取h3的地址 relayaddr, err := ma.NewMultiaddr("/p2p/" + h2.ID().Pretty() + "/p2p-circuit/ipfs/" + h3.ID().Pretty()) h3relayInfo := peer.AddrInfo{ ID: h3.ID(), Addrs: []ma.Multiaddr{relayaddr}, } h1清除掉h3的地址:h1.Network().(*swarm.Swarm).Backoff().Clear(h3.ID())

  5. h1连接h3 h1.Connect(context.Background(), h3relayInfo)

  6. h1到h3的连接已经准备好,h1新建一个"/cats"的数据流,用于收发数据 h1.NewStream(context.Background(), h3.ID(), "/cats") 读取数据:s.Read(make([]byte, 1))

总结以上步骤来说: 第一步,双方节点先连接中继节点 第二步,其中一节点h1通过中继服务h2获取到另一方的节点h3地址(这个例子直接就能通过内存获取到对方的地址) 第三步,h1连接h3,并通过h3监听的协议"/cats"新建数据流,用于收发数据。

在各自的局域网内,通过一个中继服务,就能使节点双方进行p2p通信。此例子是在一个进程中实现的。我按照他的例子,把中继服务和节点分离开。

先实现中继服务

首先看中继服务需要什么:

  1. 监听一条协议RelayProtocolRequest,当有一个节点上来,需要记录,当有两个节点时,返回两个节点的信息给对方节点。
  2. 有一个公网IP
golang
//relay-server
package main

// Protocol IDs used between the nodes and the relay server.
const (
	// RelayProtocolRequest is the protocol whose streams are handled by
	// RelayProtocol.onReq (peer registration).
	RelayProtocolRequest = "/relay/relayreq/1.0.0"
	// RelayProtocolResponse is the protocol on which the server sends the
	// collected peer list back to the registered peers.
	RelayProtocolResponse = "/relay/relayrsp/1.0.0"
)

// RelayProtocol groups the stream handlers (onReq, onRsp) that
// NewRelayProtocol registers on a RelayServer.
type RelayProtocol struct {
   // Server is the relay server whose peer table the handlers read and update.
   Server *RelayServer
}

// NewRelayProtocol builds a RelayProtocol bound to server and registers its
// handlers for RelayProtocolRequest and RelayProtocolResponse on that server.
func NewRelayProtocol(server *RelayServer) *RelayProtocol {
	rp := new(RelayProtocol)
	rp.Server = server

	server.SetStreamHandler(RelayProtocolRequest, rp.onReq)
	server.SetStreamHandler(RelayProtocolResponse, rp.onRsp)
	return rp
}

// onReq handles an incoming RelayProtocolRequest stream.
//
// It records the remote peer together with the address the connection was
// observed from. Once exactly two peers are registered, it sends every
// registered peer the full list (one "peerID=multiaddr" line per peer) over
// RelayProtocolResponse and removes the peers from the table.
func (p *RelayProtocol) onReq(s network.Stream) {
	// Nothing is read from the request stream here; release it when the
	// handler returns so it is not leaked.
	defer s.Close()

	// NOTE(review): the lock is held while the responses are sent, so other
	// requests wait until both sends have finished.
	p.Server.Mutex.Lock()
	defer p.Server.Mutex.Unlock()

	conn := s.Conn()
	remotePeerID := conn.RemotePeer()
	remoteAddr := conn.RemoteMultiaddr()
	localAddr := conn.LocalMultiaddr()
	p.Server.Peers[remotePeerID] = remoteAddr

	log.Printf("a new Stream relay req, remotePeerID: %s; \nremoteAddr: %s, localAddr: %s\n", remotePeerID.Pretty(), remoteAddr.String(), localAddr.String())

	if len(p.Server.Peers) != 2 {
		return
	}

	rsp := ""
	for id, addr := range p.Server.Peers {
		rsp += id.Pretty() + "=" + addr.String() + "\n"
	}
	for id := range p.Server.Peers {
		// Log the peer actually being notified, not the peer whose request
		// triggered this exchange.
		log.Println("start send peer: ", id.Pretty())
		p.Server.sendMessage(id, RelayProtocolResponse, rsp)
		delete(p.Server.Peers, id)
	}
}

// onRsp handles an incoming RelayProtocolResponse stream on the server. It
// only logs the event and closes the stream so that it is not leaked.
func (p *RelayProtocol) onRsp(s network.Stream) {
	defer s.Close()
	log.Println("a new Stream relay rsp")
}

// RelayServer is the rendezvous/relay process state: an embedded libp2p host,
// the protocol handlers registered on it, and the table of peers that have
// sent a request.
type RelayServer struct {
   host.Host
   *RelayProtocol
   // Ctx is the context passed to NewStream; NewRelayServer sets it to context.Background().
   Ctx context.Context
   // HoleServer is not assigned anywhere in the visible code.
   HoleServer *holepunch.Service

   // Registered peers: peer ID -> remote multiaddr of the connection the
   // request arrived on. Mutex guards Peers.
   Mutex *sync.Mutex
   Peers map[peer.ID]multiaddr.Multiaddr
}

// NewRelayServer wraps host in a RelayServer with a background context, a
// fresh mutex and an empty peer table, then installs the relay protocol
// handlers on it.
func NewRelayServer(host host.Host) *RelayServer {
	srv := &RelayServer{
		Host:  host,
		Ctx:   context.Background(),
		Mutex: new(sync.Mutex),
		Peers: make(map[peer.ID]multiaddr.Multiaddr),
	}
	srv.RelayProtocol = NewRelayProtocol(srv)
	return srv
}

// sendMessage opens a new stream to peer id on protocol p, writes data to it
// and closes the stream.
//
// It returns true only if the stream was opened and the data was written
// without error; failures are logged.
func (n *RelayServer) sendMessage(id peer.ID, p protocol.ID, data string) bool {
	s, err := n.NewStream(n.Ctx, id, p)
	if err != nil {
		log.Println("NewStream err")
		log.Println(err)
		return false
	}
	defer s.Close()

	if _, err := s.Write([]byte(data)); err != nil {
		log.Println("stream Write err")
		log.Println(err)
		// Abort the stream so the remote side does not treat a partial
		// message as complete.
		s.Reset()
		return false
	}
	return true
}

// AddHolePunchService creates an identify service for h and a hole-punching
// service on top of it. If either constructor fails the error is logged and
// nil is returned.
func AddHolePunchService(h host.Host) *holepunch.Service {
	idService, idErr := identify.NewIDService(h)
	if idErr != nil {
		log.Println(idErr.Error())
		return nil
	}

	punchService, punchErr := holepunch.NewService(h, idService)
	if punchErr != nil {
		log.Println(punchErr.Error())
		return nil
	}
	return punchService
}
// makeRelayHost creates a libp2p host listening on TCP port `port` on all
// IPv4 interfaces, starts a relayv1 relay on it and wraps it in a RelayServer.
//
// It returns a nil server and a non-nil error if the listen address cannot be
// built, the host cannot be created, or the relay cannot be instantiated.
func makeRelayHost(port int) (*RelayServer, error) {
	sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port))
	if err != nil {
		return nil, fmt.Errorf("building listen address for port %d: %w", port, err)
	}

	h, err := libp2p.New(
		libp2p.ListenAddrs(sourceMultiAddr),
		libp2p.DisableRelay(),
		libp2p.EnableHolePunching(),
	)
	// Check the host error before using h; previously it was overwritten by
	// the relay constructor's result.
	if err != nil {
		return nil, fmt.Errorf("creating libp2p host: %w", err)
	}

	if _, err = relayv1.NewRelay(h); err != nil {
		log.Printf("Failed to instantiate h2 relay: %v", err)
		// The host is useless without the relay; release its resources.
		h.Close()
		return nil, err
	}
	return NewRelayServer(h), nil
}

// main starts the relay server on the port given by -sp, prints the address
// that chat nodes should pass to -relay, and then blocks.
func main() {
	sourcePort := flag.Int("sp", 0, "Source port number")
	flag.Parse()

	if *sourcePort == 0 {
		fmt.Println("Please Use -sp port")
		return
	}

	serverHost, err := makeRelayHost(*sourcePort)
	if err != nil {
		log.Printf("Failed to create relay: %v", err)
		return
	}

	hostAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serverHost.ID().Pretty()))
	if err != nil {
		log.Printf("Failed to build relay peer address: %v", err)
		return
	}
	addrs := serverHost.Addrs()
	if len(addrs) == 0 {
		// Avoid indexing an empty slice when the host reports no addresses.
		log.Println("relay host has no listen addresses")
		return
	}
	fullAddr := addrs[0].Encapsulate(hostAddr)
	log.Printf("Run './chat -relay %s' on another console.\n", fullAddr)
	log.Println("You can replace 192.168.0.100 with public IP as well.")
	log.Println("Waiting for incoming connection")
	log.Println()

	// Ctx is context.Background() (see NewRelayServer) and is never
	// cancelled, so this keeps the process alive.
	<-serverHost.Ctx.Done()
}

节点代码

节点需要的组件:

  1. 创建host
  2. 监听RelayMsg协议 和RelayProtocolResponse协议
  3. 连接中继服务 n.ConnectRelay()
  4. 节点发送RelayProtocolRequest协议给中继服务,服务会返回RelayProtocolResponse协议给节点 其他节点的peerID
  5. 通过中继服务连接其他节点n.ConnectNode() ,
  6. 并且往其他节点发送RelayMsg协议, n.CmdRelay
golang
//chat.go
package main

// Protocol IDs used by the chat node.
const (
	// RelayProtocolRequest is the protocol on which Login sends its message
	// to the relay server.
	RelayProtocolRequest = "/relay/relayreq/1.0.0"
	// RelayProtocolResponse is the protocol whose streams are handled by
	// NodeProtocol.onRsp.
	RelayProtocolResponse = "/relay/relayrsp/1.0.0"
	// RelayMsg is the protocol of the chat stream between two nodes.
	RelayMsg = "/relay/relaymsg/1.0.0"
)

// NodeProtocol groups the stream handlers of a chat Node.
type NodeProtocol struct {
   // node is the Node the handlers act on.
   node *Node
}

// NewNodeProtocol builds a NodeProtocol bound to server and registers its
// handlers for RelayProtocolResponse and RelayMsg on that node.
func NewNodeProtocol(server *Node) *NodeProtocol {
	np := new(NodeProtocol)
	np.node = server

	server.SetStreamHandler(RelayProtocolResponse, np.onRsp)
	server.SetStreamHandler(RelayMsg, np.onRelayMsg)
	return np
}

// onRsp handles the relay server's RelayProtocolResponse stream.
//
// The payload is expected to hold one "peerID=multiaddr" entry per line. The
// first well-formed entry whose peer ID is not this node's own is taken as
// the remote peer and handed to ConnectNode.
func (p *NodeProtocol) onRsp(s network.Stream) {
	buf, err := ioutil.ReadAll(s)
	if err != nil {
		s.Reset()
		log.Println(err.Error())
		return
	}
	// The whole payload has been read; release the stream before the
	// potentially long-running connect below.
	s.Close()

	self := p.node.ID().Pretty()
	var addrs []string
	for _, line := range strings.Split(string(buf), "\n") {
		// SplitN keeps any "=" inside the address part intact; lines without
		// exactly two fields (such as the trailing empty line) are skipped.
		fields := strings.SplitN(line, "=", 2)
		if len(fields) != 2 || fields[0] == self {
			continue
		}
		addrs = fields
		break
	}
	log.Printf("a new Stream Relay rsp,addrs.len %d, peerInfo : %s\n", len(addrs), buf)
	if len(addrs) <= 1 {
		return
	}
	p.node.ConnectNode(addrs[0], addrs[1])
}

// onFirstP2PMsg reads the whole stream, logs its content, waits five seconds
// and then calls CmdP2P on the node.
//
// NOTE(review): this handler is not registered in NewNodeProtocol and CmdP2P
// is not defined in the visible source — confirm both exist elsewhere.
func (p *NodeProtocol) onFirstP2PMsg(s network.Stream) {
	buf, err := ioutil.ReadAll(s)
	if err != nil {
		// A failed read means the message is unusable; do not continue.
		log.Println(err.Error())
		return
	}
	log.Println("receive onFirstP2PMsg: ", string(buf))
	time.Sleep(5 * time.Second)
	p.node.CmdP2P()
}

// onP2PMsg wraps the stream in a buffered reader/writer and starts one
// goroutine writing to it and one reading from it.
func (p *NodeProtocol) onP2PMsg(s network.Stream) {
	reader := bufio.NewReader(s)
	writer := bufio.NewWriter(s)
	stream := bufio.NewReadWriter(reader, writer)

	go p.node.writeData(stream)
	go p.node.readData(stream)
}

// onRelayMsg handles an incoming RelayMsg stream: it wraps the stream in a
// buffered reader/writer and starts one goroutine writing to it and one
// reading from it.
func (p *NodeProtocol) onRelayMsg(s network.Stream) {
	reader := bufio.NewReader(s)
	writer := bufio.NewWriter(s)
	stream := bufio.NewReadWriter(reader, writer)

	go p.node.writeData(stream)
	go p.node.readData(stream)
}

// Node is a chat participant: an embedded libp2p host plus the protocol
// handlers registered on it and the addresses it has learned.
type Node struct {
   host.Host
   *NodeProtocol

   // chatP2PAddr is the chat partner's address info; ConnectNode stores it
   // and CmdRelay reads it.
   chatP2PAddr *peer.AddrInfo
   // Ctx is the context passed to Connect and NewStream; NewNode sets it to context.Background().
   Ctx context.Context
   // Relay server address info, set by ConnectRelay.
   ServerAddrInfo *peer.AddrInfo
   // Peer bookkeeping; both fields are initialised in NewNode but not
   // otherwise used in the visible code.
   Mutex *sync.Mutex
   Peers *list.List
}

// NewNode wraps host in a Node with an empty chat-partner record, a
// background context, a fresh mutex and an empty peer list, then installs the
// node protocol handlers on it.
func NewNode(host host.Host) *Node {
	node := &Node{
		Host:        host,
		chatP2PAddr: new(peer.AddrInfo),
		Ctx:         context.Background(),
		Mutex:       new(sync.Mutex),
		Peers:       new(list.List),
	}
	node.NodeProtocol = NewNodeProtocol(node)
	return node
}

// readData prints every non-blank line received on rw in green, followed by
// a fresh "> " prompt. It returns as soon as a read yields no data at all.
func (n *Node) readData(rw *bufio.ReadWriter) {
	for {
		// The error is deliberately not inspected: an empty result is what
		// ends the loop.
		line, _ := rw.ReadString('\n')

		switch line {
		case "":
			return
		case "\n":
			// Blank line: nothing to show.
		default:
			// Green console colour: 	\x1b[32m
			// Reset console colour: 	\x1b[0m
			fmt.Printf("\x1b[32m%s\x1b[0m> ", line)
		}
	}
}

// writeData prompts on stdout, reads lines from stdin and writes each one to
// rw, flushing after every line. It returns when stdin can no longer be read
// or when writing to rw fails.
func (n *Node) writeData(rw *bufio.ReadWriter) {
	stdReader := bufio.NewReader(os.Stdin)
	for {
		fmt.Print("> ")
		sendData, err := stdReader.ReadString('\n')
		if err != nil {
			log.Println(err)
			return
		}

		// Stop on a write or flush failure instead of silently consuming
		// further input for a stream that can no longer carry it.
		if _, err := rw.WriteString(fmt.Sprintf("%s\n", sendData)); err != nil {
			log.Println(err)
			return
		}
		if err := rw.Flush(); err != nil {
			log.Println(err)
			return
		}
	}
}

// getPeerInfoByDest parses relay, a multiaddr string containing a /p2p/<id>
// component, into a peer.AddrInfo whose ID is that peer ID and whose single
// address is relay with the /p2p/<id> part removed.
//
// It returns an error if relay is not a valid multiaddr, has no /p2p
// component, or the peer ID cannot be decoded.
func getPeerInfoByDest(relay string) (*peer.AddrInfo, error) {
	relayAddr, err := multiaddr.NewMultiaddr(relay)
	if err != nil {
		return nil, err
	}
	pid, err := relayAddr.ValueForProtocol(multiaddr.P_P2P)
	if err != nil {
		return nil, err
	}
	relayPeerID, err := peer.Decode(pid)
	if err != nil {
		return nil, err
	}

	relayPeerAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", pid))
	if err != nil {
		return nil, err
	}
	return &peer.AddrInfo{
		ID:    relayPeerID,
		Addrs: []multiaddr.Multiaddr{relayAddr.Decapsulate(relayPeerAddr)},
	}, nil
}

// ConnectRelay parses the relay address, remembers it as the node's relay
// server and connects to it. It returns the parse error or the connect error.
func (n *Node) ConnectRelay(relay string) error {
	info, parseErr := getPeerInfoByDest(relay)
	if parseErr != nil {
		return parseErr
	}

	n.ServerAddrInfo = info
	return n.Connect(n.Ctx, *info)
}

// ConnectNode connects to the peer identified by nodePeerID through the relay
// server and, on success, records it as the chat partner and starts the chat
// via CmdRelay.
//
// nodeAddr is the peer's address as reported by the relay server; it is used
// as the prefix of the circuit address that is dialled. All failures are
// logged and end the call.
func (n *Node) ConnectNode(nodePeerID string, nodeAddr string) {
	log.Println("start ConnectNodeByRelay....", nodePeerID, nodeAddr)
	// The response handler can fire before ConnectRelay has stored the relay
	// info; without it no circuit address can be built.
	if n.ServerAddrInfo == nil {
		log.Println("relay server is not set, call ConnectRelay first")
		return
	}

	peerID, err := peer.Decode(nodePeerID)
	if err != nil {
		log.Println(err.Error())
		return
	}
	// Clear any dial backoff recorded for the peer. The checked assertion
	// avoids a panic if the host's network is not a *swarm.Swarm.
	if sw, ok := n.Network().(*swarm.Swarm); ok {
		sw.Backoff().Clear(peerID)
	}

	// NOTE(review): the prefix is the peer's address, not the relay's; confirm
	// this is the intended circuit address.
	var address = nodeAddr + "/p2p/" + n.ServerAddrInfo.ID.Pretty() + "/p2p-circuit/p2p/" + peerID.Pretty()
	//var address = nodeAddr +"/p2p/" + peerID.Pretty() + "/p2p-circuit"
	//var address = nodeAddr +"/p2p/" + peerID.Pretty()
	//var address = "/p2p/"+ peerID.Pretty()
	chatTargetAddr, err := multiaddr.NewMultiaddr(address)
	if err != nil {
		log.Println("chatTargetAddr is err")
		log.Println(err.Error())
		return
	}

	chatTargetAddrInfo := peer.AddrInfo{
		ID:    peerID,
		Addrs: []multiaddr.Multiaddr{chatTargetAddr},
	}

	if err := n.Connect(n.Ctx, chatTargetAddrInfo); err != nil {
		log.Println("chatTargetAddr connect err")
		log.Println(err.Error())
		return
	}

	n.chatP2PAddr = &chatTargetAddrInfo
	//n.sendMessage(chatTargetAddrInfo.ID, FirstP2PMsg, "Hello Im " + chatTargetAddrInfo.ID.Pretty())
	n.CmdRelay()
}

// CmdRelay opens a RelayMsg stream to the recorded chat partner, sends a
// greeting on it and starts the stdin-writer and stream-reader goroutines.
func (n *Node) CmdRelay() {
	log.Println("CmdRelay start....")
	peerInfo := n.chatP2PAddr
	s, err := n.NewStream(n.Ctx, peerInfo.ID, RelayMsg)
	if err != nil {
		log.Println(err.Error())
		return
	}

	rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
	log.Println("opening chat stream")
	log.Println("[INFO] chat connected!")

	// Send the greeting on the chat stream itself. Sending it through
	// sendMessage would open a second RelayMsg stream, making the remote
	// handler run twice and start two writers competing for its stdin.
	if _, err := rw.WriteString("Hello this is relay msg\n"); err != nil {
		log.Println(err.Error())
		s.Reset()
		return
	}
	if err := rw.Flush(); err != nil {
		log.Println(err.Error())
		s.Reset()
		return
	}

	go n.writeData(rw)
	go n.readData(rw)
}

// Login sends the "login" message to the relay server on
// RelayProtocolRequest and logs whether the send succeeded.
func (n *Node) Login() {
	if !n.sendMessage(n.ServerAddrInfo.ID, RelayProtocolRequest, "login") {
		log.Printf("%s connect to relay error\n", n.ID())
		return
	}
	log.Printf("%s login to relay\n", n.ID())
}

// sendMessage opens a new stream to peer id on protocol p, writes data to it
// and closes the stream.
//
// It returns true only if the stream was opened and the data was written
// without error; failures are logged.
func (n *Node) sendMessage(id peer.ID, p protocol.ID, data string) bool {
	s, err := n.NewStream(n.Ctx, id, p)
	if err != nil {
		log.Println("NewStream err")
		log.Println(err)
		return false
	}
	defer s.Close()

	if _, err := s.Write([]byte(data)); err != nil {
		log.Println("stream Write err")
		log.Println(err)
		// Abort the stream so the remote side does not treat a partial
		// message as complete.
		s.Reset()
		return false
	}
	return true
}

// AddHolePunchService creates an identify service for h and a hole-punching
// service on top of it. If either constructor fails the error is logged and
// nil is returned.
func AddHolePunchService(h host.Host) *holepunch.Service {
	idService, idErr := identify.NewIDService(h)
	if idErr != nil {
		log.Println(idErr.Error())
		return nil
	}

	punchService, punchErr := holepunch.NewService(h, idService)
	if punchErr != nil {
		log.Println(punchErr.Error())
		return nil
	}
	return punchService
}

// makeRandomNode validates the relay address and creates a chat Node on a
// new libp2p host that listens on a random TCP port on 127.0.0.1 with relay
// support enabled.
//
// It logs the error and returns nil if the relay address is invalid or the
// host cannot be created.
func makeRandomNode(relay string) *Node {
	// Only the validity of the address matters here; the parsed value was
	// never used, and an unused variable does not compile in Go.
	if _, err := getPeerInfoByDest(relay); err != nil {
		log.Println(err.Error())
		return nil
	}

	//priv, _ := generateIdentity(0)
	h, err := libp2p.New(
		libp2p.ListenAddrs(multiaddr.StringCast("/ip4/127.0.0.1/tcp/0/")),
		libp2p.EnableRelay(),
	)
	if err != nil {
		log.Println(err.Error())
		return nil
	}
	return NewNode(h)
}

// main creates a chat node, connects it to the relay given by -relay, logs
// in to the relay and then blocks while the stream handlers do the work.
func main() {
	relay := flag.String("relay", "", "relay addrs")
	flag.Parse()

	if *relay == "" {
		fmt.Println("Please Use -relay ")
		return
	}

	node := makeRandomNode(*relay)
	if node == nil {
		// makeRandomNode has already logged the reason.
		return
	}
	if err := node.ConnectRelay(*relay); err != nil {
		log.Println(err.Error())
		return
	}
	log.Println("Connect relay success Next Login...")
	node.Login()

	log.Println("Im ", node.ID())

	// Ctx is context.Background() (see NewNode) and is never cancelled, so
	// this keeps the process alive.
	<-node.Ctx.Done()
	log.Println(node.Ctx.Err().Error())
}

编译

$ go build -o relay-server relay-server.go
$ go build -o chat chat.go

使用,先启动中继服务

$ ./relay-server -sp 3001

启动A窗口, 连上relay服务

$ ./chat -relay /ip4/RELAY_IP/tcp/3001/p2p/RELAY_PEER_ID   # 使用中继服务启动时日志中打印的完整地址

再启动B窗,连上relay服务

$ ./chat -relay /ip4/RELAY_IP/tcp/3001/p2p/RELAY_PEER_ID   # 与A窗口使用同一个中继地址

这样A和B两个节点就可以通过relay服务进行数据传输了。 下篇文章,改造relay服务,relay服务交换节点信息后,使节点间可以进行p2p传输,传输数据不通过relay服务。 详细代码

未完待续…

--完--