本文目的

上文Go-libp2p三 | relay例子已经分析过relay例子,并解释了两个节点如果通过中继服务进行通信。本文改造relay例子 并让这个服务做如下两件事:

  1. 交换两个节点的在交换机上的信息,帮助两个节点进行P2P打洞。
  2. 在打洞不成功的情况下能提供中继服务,转发两节点发送的数据。

我是看到源码中有hole punching协议,我是看了测试用例后才明白的。 利用libp2p的hole punching协议,NAT穿透打洞。打洞成功后进行p2p传输

relay例子代码

relay-server 代码基本不用改变

节点代码

节点需要的组件:

  1. 创建host, 绑定holepunching服务, hps := AddHolePunchService(h)
  2. 监听RelayProtocolResponse,P2PMsgFirstP2PMsg协议
  3. 连接中继服务 n.ConnectRelay()
  4. 节点发送RelayProtocolRequest协议给中继服务,服务会返回RelayProtocolResponse协议给节点 其他节点的peerID
  5. 通过中继服务连接其他节点n.ConnectNode() , 发送FirstP2PMsg协议(很关键),方便后续打洞成功
  6. 收到其他节点发来的FirstP2PMsg协议,5s钟后(已经开始打洞,防止在打洞过程中进行直连操作)开始直连p.node.CmdP2P(), DirectConnect
  7. 直连成功,往其他节点发送P2PMsg协议
golang
//chat.go
package main

const RelayProtocolRequest = "/relay/relayreq/1.0.0"
const RelayProtocolResponse = "/relay/relayrsp/1.0.0"
const FirstP2PMsg = "/First/P2PMsg/1.0.0"
const P2PMsg = "/Start/P2PMsg/1.0.0"
const RelayMsg = "/relay/relaymsg/1.0.0"


type NodeProtocol struct {
	node *Node
}

func NewNodeProtocol(server *Node)*NodeProtocol {
	protocol := &NodeProtocol{ node: server}
	server.SetStreamHandler(RelayProtocolResponse, protocol.onRsp)
	server.SetStreamHandler(FirstP2PMsg, protocol.onFirstP2PMsg)
	server.SetStreamHandler(P2PMsg, protocol.onP2PMsg)
	return protocol
}

func (p* NodeProtocol) onFirstP2PMsg(s network.Stream)  {
	buf, _ := ioutil.ReadAll(s)
	log.Println("receive onFirstP2PMsg: ", string(buf))
	time.Sleep(5*time.Second)
	p.node.CmdP2P()
}

func (p* NodeProtocol) onP2PMsg(s network.Stream)  {
	rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
	go p.node.writeData(rw)
	go p.node.readData(rw)
}

type Node struct {
	host.Host
	HoleService *holepunch.Service
	*NodeProtocol

	chatP2PAddr *peer.AddrInfo
	Ctx context.Context
	//记录RelayServer
	ServerAddrInfo *peer.AddrInfo
	//记录peers
	Mutex *sync.Mutex
	Peers *list.List
}

func NewNode(host host.Host, hps *holepunch.Service) *Node {
	relayServer := &Node{ Host: host, HoleService: hps, chatP2PAddr: new(peer.AddrInfo), Ctx: context.Background(), Mutex: new(sync.Mutex), Peers: new(list.List)}
	relayServer.NodeProtocol = NewNodeProtocol(relayServer)
	return relayServer
}

func (n* Node)ConnectNode(nodePeerID string, nodeAddr string) {
	log.Println("start ConnectNodeByRelay....", nodePeerID, nodeAddr)
	peerID, err := peer.Decode(nodePeerID)
	if err != nil {
		log.Println(err.Error())
		return
	}
	n.Network().(*swarm.Swarm).Backoff().Clear(peerID)

	var address = nodeAddr +"/p2p/" + n.ServerAddrInfo.ID.Pretty()  + "/p2p-circuit/p2p/" + peerID.Pretty()
	//var address = nodeAddr +"/p2p/" + peerID.Pretty() + "/p2p-circuit"
	//var address = nodeAddr +"/p2p/" + peerID.Pretty()
	//var address = "/p2p/"+ peerID.Pretty()
	chatTargetAddr, err := multiaddr.NewMultiaddr(address)
	if err != nil {
		log.Println("chatTargetAddr is err")
		log.Println(err.Error())
		return
	}

	chatTargetAddrInfo := peer.AddrInfo{
		ID:    peerID,
		Addrs: []multiaddr.Multiaddr{chatTargetAddr},
	}

	if err := n.Connect(n.Ctx, chatTargetAddrInfo); err != nil {
		log.Println("chatTargetAddr connect err")
		log.Println(err.Error())
		return
	}

	n.chatP2PAddr = &chatTargetAddrInfo
	n.sendMessage(chatTargetAddrInfo.ID, FirstP2PMsg, "Hello Im " + chatTargetAddrInfo.ID.Pretty())
	//n.CmdRelay()
}

func (n *Node)CmdP2P()  {
	log.Println("Cmdp2p start ....")
	peerInfo := n.chatP2PAddr

	n.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.ConnectedAddrTTL)
	err := n.HoleService.DirectConnect(peerInfo.ID)
	if err != nil {
		log.Println("DirectConnect error: ")
		log.Println(err.Error())
		return
	}

	s, err := n.NewStream(n.Ctx, peerInfo.ID, P2PMsg)
	if err != nil {
		log.Println("p2p NewStream err")
		log.Println(err)
		return
	}

	rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
	log.Println("opening p2p chat stream")
	log.Println("[INFO] p2p chat connected!")

	rw.WriteString("Hello this is  msg")
	rw.Flush()
	go n.writeData(rw)
	go n.readData(rw)
}

func makeRandomNode(relay string) *Node {
	peerInfo ,err := getPeerInfoByDest(relay)
	if err != nil {
		log.Println(err.Error())
		return nil
	}

	h, err := libp2p.New(
		libp2p.ListenAddrs(multiaddr.StringCast("/ip4/127.0.0.1/tcp/0/")),
		libp2p.EnableRelay(),
		libp2p.EnableAutoRelay(),
		libp2p.ForceReachabilityPrivate(),
		libp2p.StaticRelays([]peer.AddrInfo{*peerInfo}),
		libp2p.EnableHolePunching(),
	)

	if err != nil {
		log.Println(err.Error())
		return nil
	}
	hps := AddHolePunchService(h)
	if hps == nil { return nil}
	return NewNode(h, hps)
}

func main() {
	relay := flag.String("relay", "", "relay addrs")
	flag.Parse()

	if *relay == "" {
		fmt.Println("Please Use -relay ")
		return
	}
	flag.Parse()

	node := makeRandomNode(*relay)
	if err := node.ConnectRelay(*relay); err != nil {
		log.Println(err.Error())
		return
	}
	log.Println("Connect relay success Next Login...")
	node.Login()

	log.Println("Im ", node.ID())
	select {
	case <-node.Ctx.Done():
		log.Println(node.Ctx.Err().Error())
	}
}

编译

$ go build chat.go
$ go build relay-server.go

启动中继服务

$ ./relay -sp 3001
2021/11/08 15:10:01 Run './chat -relay /ip4/172.16.3.205/tcp/3001/p2p/QmVJoYJC447ZVqQxWfWUecFAXPFd6QbT9mQvHuew3jaCBd' on another console.
2021/11/08 15:10:01 You can replace 192.168.0.100 with public IP as well.
2021/11/08 15:10:01 Waiting for incoming connection

在窗口A启动chat 连上中继

$ ./chat -relay /ip4/172.16.3.205/tcp/3001/p2p/QmVJoYJC447ZVqQxWfWUecFAXPFd6QbT9mQvHuew3jaCBd
2021/11/08 15:10:09 start ConnectNodeByRelay.... QmWtaMSNakQ8x5LUf4TpMb9JQy3PPSTPS4yNjRFyxAmaom /ip4/172.16.3.205/tcp/56309
2021/11/08 15:10:09 receive onFirstP2PMsg:  Hello Im QmT2qBsSWkTHPHkgfd6aUZQiF7j2ArmWF9NwQ8R62Wixrw

在窗口B启动chat 连上中继

$ ./chat -relay /ip4/172.16.3.205/tcp/3001/p2p/QmVJoYJC447ZVqQxWfWUecFAXPFd6QbT9mQvHuew3jaCBd
2021/11/08 15:10:09 start ConnectNodeByRelay.... QmT2qBsSWkTHPHkgfd6aUZQiF7j2ArmWF9NwQ8R62Wixrw /ip4/172.16.3.205/tcp/56306
2021/11/08 15:10:09 receive onFirstP2PMsg:  Hello Im QmWtaMSNakQ8x5LUf4TpMb9JQy3PPSTPS4yNjRFyxAmaom
2021/11/08 15:10:14 Cmdp2p start ....
2021/11/08 15:10:14 opening p2p chat stream
2021/11/08 15:10:14 [INFO] p2p chat connected!
  • A和B连上了中继后,中继服务会返回对方的地址 s.Conn.RemoteMultiaddr()
  • A和B接收到了地址后,发送了第一条消息FirstP2PMsg后,则过了5秒后直连对方节点n.HoleService.DirectConnect(peerInfo.ID),代码可见,(自动p2p打洞).
  • A和B可以进行p2p沟通。可以试着把relay-server关闭,然后继续发送消息。可见消息能发送成功。

这里p2p的打洞流程参考 coordination.go:

func (hs *Service) handleNewStream(s network.Stream) {
// Check directionality of the underlying connection.
// Peer A receives an inbound connection from peer B.
// Peer A opens a new hole punch stream to peer B.
// Peer B receives this stream, calling this function.
// Peer B sees the underlying connection as an outbound connection.
}

用go-libp2p进行p2p传输的流程就理清楚了。我只贴了改动和新增的代码, 详细代码

--完--