代码下载

git clone https://github.com/libp2p/go-libp2p.git

chat实例分析

进入目录go-libp2p/examples/chat/, 编译 go build 先看README.md, 可以了解到使用chat的方式

  1. 在节点 B
$ ./chat -sp 3001
2021/10/28 11:01:25 Run './chat -d /ip4/127.0.0.1/tcp/3001/p2p/QmXgEXt4CiqmVmDhPPQnyVkoj3Bsx3Nggq57PPsk5tLNTs' on another console.
2021/10/28 11:01:25 You can replace 127.0.0.1 with public IP as well.
2021/10/28 11:01:25 Waiting for incoming connection
2021/10/28 11:01:25 
2021/10/28 11:03:03 Got a new stream!
> hello
  1. 在节点 A, 如果B有公网IP,则把127.0.0.1替换成B的公网IP
$ ./chat -d /ip4/127.0.0.1/tcp/3001/p2p/QmXgEXt4CiqmVmDhPPQnyVkoj3Bsx3Nggq57PPsk5tLNTs
2021/10/28 11:03:03 This node's multiaddresses:
2021/10/28 11:03:03  - /ip4/172.16.3.205/tcp/53084
2021/10/28 11:03:03  - /ip4/127.0.0.1/tcp/53084
2021/10/28 11:03:03 
2021/10/28 11:03:03 Established connection to destination
> hello

作为被连接方B

先看代码:

golang
//创建host
h, err := makeHost(*sourcePort, r)
...
//监听 并设置回调Handler streamHandler
func startPeer(ctx context.Context, h host.Host, streamHandler network.StreamHandler) {
	// Set a function as stream handler.
	// This function is called when a peer connects, and starts a stream with this protocol.
	// Only applies on the receiving side.
	//给host 设置协议'/chat/1.0.0', 并设置回调~~~~
	h.SetStreamHandler("/chat/1.0.0", streamHandler)

	// Let's get the actual TCP port from our listen multiaddr, in case we're using 0 (default; random available port).
	var port string
	for _, la := range h.Network().ListenAddresses() {
		if p, err := la.ValueForProtocol(multiaddr.P_TCP); err == nil {
			port = p
			break
		}
	}

	if port == "" {
		log.Println("was not able to find actual local port")
		return
	}

	log.Printf("Run './chat -d /ip4/127.0.0.1/tcp/%v/p2p/%s' on another console.\n", port, h.ID().Pretty())
	log.Println("You can replace 127.0.0.1 with public IP as well.")
	log.Println("Waiting for incoming connection")
	log.Println()
}
//回调的handler
func handleStream(s network.Stream) {
	log.Println("Got a new stream!")

	// Create a buffer stream for non blocking read and write.
	rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))

	go readData(rw)
	go writeData(rw)

	// stream 's' will stay open until you close it (or the other side closes it).
}

从代码上看,节点B所做的事情:

  1. 创建host
  2. 设置协议id,并设置回调, 有种写接口接口的即视感。
  3. 等待回调

作为连接方A

代码如下:

golang
//创建host
h, err := makeHost(*sourcePort, r)

//建立对等连接发送消息
func startPeerAndConnect(ctx context.Context, h host.Host, destination string) (*bufio.ReadWriter, error) {
	log.Println("This node's multiaddresses:")
	for _, la := range h.Addrs() {
		log.Printf(" - %v\n", la)
	}
	log.Println()

	//通过被连接方B的地址destination创建 maddr
	maddr, err := multiaddr.NewMultiaddr(destination)
	if err != nil {
		log.Println(err)
		return nil, err
	}

	// Extract the peer ID from the multiaddr.
	info, err := peer.AddrInfoFromP2pAddr(maddr)
	if err != nil {
		log.Println(err)
		return nil, err
	}

	//把被连接方B的地址加入到连接方A的host中
	h.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)

	//用被连接方B的ID新建一个数据流, 并与被连接方A监听的协议设置成一致的 "/chat/1.0.0"
	s, err := h.NewStream(context.Background(), info.ID, "/chat/1.0.0")
	if err != nil {
		log.Println(err)
		return nil, err
	}
	log.Println("Established connection to destination")

	// Create a buffered stream so that read and writes are non blocking.
	rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))

	return rw, nil
}

// 创建读写协程 
go writeData(rw)
go readData(rw)

被连接方别名为B, 连接方别名为A 从代码上看,节点A所做的事情:

  1. A创建host
  2. 解析B的地址,获得B的host.ID
  3. 把B的地址添加到A的host中
  4. 用A的host创建一个新的数据流,其中参数为B的host.ID,和B监听的协议 “/chat/1.0.0”, 根据生成的数据流,创建读写协程, 可以在协程里进行逻辑处理

总结

作为节点A和节点B, 使用libp2p库进行p2p通讯的步骤如下:

  1. 双方需要创建各自的host
  2. 被连接方的节点B需要设置协议ID,并设置数据流的回调handle, 这个回调handle中有读写协程
  3. 连接方的节点A需要知道B的地址,并解析B的地址
  4. 把节点B的地址添加到节点A的host
  5. 节点A使用节点B的host.ID和B监听的协议"/chat/1.0.0"创建一个新的数据流, 创建读写协程
  6. 这样节点A和B就能通讯了

作为发送方比接收方多了第4步, 把节点B的地址添加到节点A的host

--完--