go-libp2p的echo例子分析

进入目录go-libp2p/examples/echo/, 编译 go build

先看README.md, 可以了解到使用echo的方式: 终端B

$ ./echo -l 3001
2021/10/28 11:25:33 I am /ip4/127.0.0.1/tcp/3001/p2p/QmTu8sxDi3fSjz9ZMvkAj3qA9YAoGskroEnyLz2ZvfTBhX
2021/10/28 11:25:33 listening for connections
2021/10/28 11:25:33 Now run "./echo -l 3002 -d /ip4/127.0.0.1/tcp/3001/p2p/QmTu8sxDi3fSjz9ZMvkAj3qA9YAoGskroEnyLz2ZvfTBhX" on a different terminal

2021/10/28 11:25:59 listener received new stream
2021/10/28 11:25:59 read: Hello, world!

在另外一个终端输A入:

$ ./echo -l 3002 -d /ip4/127.0.0.1/tcp/3001/p2p/QmTu8sxDi3fSjz9ZMvkAj3qA9YAoGskroEnyLz2ZvfTBhX
2021/10/28 11:25:59 I am /ip4/127.0.0.1/tcp/3002/p2p/QmPCxgAkhb9i6GeUr6QwTrfESShHqYQ4K9SHy6NWRQbagS
2021/10/28 11:25:59 sender opening stream
2021/10/28 11:25:59 sender saying hello
2021/10/28 11:25:59 read reply: "Hello, world!\n"

我改造了一下echo

我这么改造,是为了让echo例子更像chat例子,更容易做对比和理解libp2p这个库

改造了的函数

//echo.go
//监听方 
func startListener(ctx context.Context, ha host.Host, listenPort int, insecure bool) {
	// Set a stream handler on host A. /echo/1.0.0 is
	// a user-defined protocol name.
	ha.SetStreamHandler("/echo/1.0.0", func(s network.Stream) {
		log.Println("listener received new stream")
		if err := doEcho(s); err != nil {
			log.Println(err)
			s.Reset()
		} else {
			s.Close()
		}
	})

	log.Println("listening for connections")

	if insecure {
		log.Printf("Now run \"./echo -l %d -d /ip4/127.0.0.1/tcp/%v/p2p/%s -insecure\" on a different terminal\n", listenPort+1, listenPort, ha.ID().Pretty())
	} else {
		log.Printf("Now run \"./echo -l %d -d /ip4/127.0.0.1/tcp/%v/p2p/%s on a different terminal\n", listenPort+1, listenPort, ha.ID().Pretty())
	}
}

func runSender(ctx context.Context, ha host.Host, targetPeer string) {
	ha.SetStreamHandler("/echo/1.0.0", func(s network.Stream) {
		log.Println("sender received new stream")
		if err := doEcho(s); err != nil {
			log.Println(err)
			s.Reset()
		} else {
			s.Close()
		}
	})

	// The following code extracts target's the peer ID from the
	// given multiaddress
	ipfsaddr, err := ma.NewMultiaddr(targetPeer)
	if err != nil {
		log.Println(err)
		return
	}
	
	info, err := peer.AddrInfoFromP2pAddr(ipfsaddr)
	if err != nil {
		log.Println(err)
		return
	}

	ha.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)

	log.Println("sender opening stream")
	
	//节点A 使用节点B的host.ID和B监听的协议"/echo/1.0.0"创建一个新的数据流
	s, err := ha.NewStream(context.Background(), info.ID, "/echo/1.0.0")
	if err != nil {
		log.Println(err)
		return
	}

	log.Println("sender saying hello")
	_, err = s.Write([]byte("Hello, world!\n"))
	if err != nil {
		log.Println(err)
		return
	}

	out, err := ioutil.ReadAll(s)
	if err != nil {
		log.Println(err)
		return
	}

	log.Printf("read reply: %q\n", out)
}

改造的地方说明:

  1. 删除了函数getHostAddress的调用,他的作用就是不用手动拼接地址/ip4/127.0.0.1/tcp/3001/p2p/Qme97f81i4EFfUPstGmmumU1A5GJ5HMwtP5GXKDJGy9rSL
  2. startListener函数末尾,手动拼接出地址,并打印出来 方便另一个终端使用
  3. runSender函数删除的比较多,改造成了类似chat的例子的实现方式

改造后echo流程:

节点B

  1. 创建host (监听端口)
  2. 设置协议ID="/echo/1.0.0",并设置数据流的回调handle, 这个回调handle中有读写协程,等待连接
  3. 节点B需要知道节点A的地址,并解析A的地址
  4. 把节点A的地址添加到节点B的host
  5. 节点B使用节点A的host.ID和节点A监听的协议"/echo/1.0.0"创建一个新的数据流
  6. 节点B可以通过这个数据流和A进行通信

节点A

  1. 创建host (监听端口)
  2. 设置协议ID="/echo/1.0.0",并设置数据流的回调handle, 这个回调handle中有读写协程,等待连接
  3. 节点A需要知道节点B的地址,并解析B的地址
  4. 把节点B的地址添加到节点A的host
  5. 节点A使用节点B的host.ID和节点B监听的协议"/echo/1.0.0"创建一个新的数据流
  6. 节点B可以通过这个数据流和A进行通信

循环echo

运行程序运行起来,其实可以看到,节点B作为发送方,只发送了一次消息就断开了。如果想把消息一直echo下去,则有两种思路

第一种 | 类似chat实例

在chat示例中拿来三个函数handleStream readData writeData,然后把runSender下面的这段代码替换成调用handleStream(s)

_, err = s.Write([]byte("Hello, world!\n"))
	if err != nil {
		log.Println(err)
		return
	}

	out, err := ioutil.ReadAll(s)
	if err != nil {
		log.Println(err)
		return
	}

	log.Printf("read reply: %q\n", out)

在调用runSender函数下使用<-ctx.Done(), 防止主协程退出

runSender(ctx, ha, *targetF)
<-ctx.Done()

编译运行代码,可以看到,在两个终端下,发送方的任何消息接收方都会一模一样的返回回来。

第二种 | 改造doEcho

调用doEcho的地方也需要修改

// doEcho reads a line of data a stream and writes it back
func doEcho(h host.Host, s network.Stream) error {
	buf := bufio.NewReader(s)
	str, err := buf.ReadString('\n')
	if err != nil {
		return err
	}

	log.Printf("read: %s", str)
	s1, err := h.NewStream(context.Background(), s.Conn().RemotePeer(), "/echo/1.0.0")
	if err != nil {
		return err
	}
	defer s1.Close()

	s1.Write([]byte(str))
	return err
}

在调用runSender函数下使用<-ctx.Done(), 防止主协程退出

runSender(ctx, ha, *targetF)
<-ctx.Done()

注释掉函数runSender下的这段代码

//runSender
out, err := ioutil.ReadAll(s)
	if err != nil {
		log.Println(err)
		return
	}

log.Printf("read reply: %q\n", out)

编译运行代码,可以看到,在两个终端下,发送方的任何消息接收方都会一模一样的返回回来。

改造doEcho函数,我是参考例子multipro,

  1. doEcho数据回调handler,在函数doEcho读取数据流里的数据,然后通过数据流里的RemotePeer和协议ID新建一个数据流
  2. 然后发送数据,这样会触发另一个节点(终端)的协议绑定回调doEcho
  3. 这样就不会出现echo示例中的,发送端节点B绑定了协议回调,但是并没有触发这个协议回调。

总结

使用libp2p库让两个节点进行p2p通信一共分为如下几个步骤,以节点B为例子

  1. 创建host (监听端口)
  2. 设置协议ID="/echo/1.0.0",并设置数据流的回调handle, 这个回调handle中有读写协程,等待连接
  3. 节点B需要知道节点A的地址,并解析A的地址
  4. 把节点A的地址添加到节点B的host
  5. 节点B使用节点A的host.ID和节点A监听的协议"/echo/1.0.0"创建一个新的数据流 , 节点B可以通过这个数据流和A进行通信

作为发送方比接收方多了第4步, 把节点A的地址添加到节点B的host

--完--