Go-libp2p二 | echo例子
go-libp2p的echo例子分析
进入目录go-libp2p/examples/echo/
, 编译 go build
先看README.md
, 可以了解到使用echo的方式:
终端B
$ ./echo -l 3001
2021/10/28 11:25:33 I am /ip4/127.0.0.1/tcp/3001/p2p/QmTu8sxDi3fSjz9ZMvkAj3qA9YAoGskroEnyLz2ZvfTBhX
2021/10/28 11:25:33 listening for connections
2021/10/28 11:25:33 Now run "./echo -l 3002 -d /ip4/127.0.0.1/tcp/3001/p2p/QmTu8sxDi3fSjz9ZMvkAj3qA9YAoGskroEnyLz2ZvfTBhX" on a different terminal
2021/10/28 11:25:59 listener received new stream
2021/10/28 11:25:59 read: Hello, world!
在另外一个终端输A入:
$ ./echo -l 3002 -d /ip4/127.0.0.1/tcp/3001/p2p/QmTu8sxDi3fSjz9ZMvkAj3qA9YAoGskroEnyLz2ZvfTBhX
2021/10/28 11:25:59 I am /ip4/127.0.0.1/tcp/3002/p2p/QmPCxgAkhb9i6GeUr6QwTrfESShHqYQ4K9SHy6NWRQbagS
2021/10/28 11:25:59 sender opening stream
2021/10/28 11:25:59 sender saying hello
2021/10/28 11:25:59 read reply: "Hello, world!\n"
我改造了一下echo
我这么改造,是为了让echo例子更像chat例子,更容易做对比和理解libp2p这个库
改造了的函数
//echo.go
//监听方
func startListener(ctx context.Context, ha host.Host, listenPort int, insecure bool) {
// Set a stream handler on host A. /echo/1.0.0 is
// a user-defined protocol name.
ha.SetStreamHandler("/echo/1.0.0", func(s network.Stream) {
log.Println("listener received new stream")
if err := doEcho(s); err != nil {
log.Println(err)
s.Reset()
} else {
s.Close()
}
})
log.Println("listening for connections")
if insecure {
log.Printf("Now run \"./echo -l %d -d /ip4/127.0.0.1/tcp/%v/p2p/%s -insecure\" on a different terminal\n", listenPort+1, listenPort, ha.ID().Pretty())
} else {
log.Printf("Now run \"./echo -l %d -d /ip4/127.0.0.1/tcp/%v/p2p/%s on a different terminal\n", listenPort+1, listenPort, ha.ID().Pretty())
}
}
func runSender(ctx context.Context, ha host.Host, targetPeer string) {
ha.SetStreamHandler("/echo/1.0.0", func(s network.Stream) {
log.Println("sender received new stream")
if err := doEcho(s); err != nil {
log.Println(err)
s.Reset()
} else {
s.Close()
}
})
// The following code extracts target's the peer ID from the
// given multiaddress
ipfsaddr, err := ma.NewMultiaddr(targetPeer)
if err != nil {
log.Println(err)
return
}
info, err := peer.AddrInfoFromP2pAddr(ipfsaddr)
if err != nil {
log.Println(err)
return
}
ha.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
log.Println("sender opening stream")
//节点A 使用节点B的host.ID和B监听的协议"/echo/1.0.0"创建一个新的数据流
s, err := ha.NewStream(context.Background(), info.ID, "/echo/1.0.0")
if err != nil {
log.Println(err)
return
}
log.Println("sender saying hello")
_, err = s.Write([]byte("Hello, world!\n"))
if err != nil {
log.Println(err)
return
}
out, err := ioutil.ReadAll(s)
if err != nil {
log.Println(err)
return
}
log.Printf("read reply: %q\n", out)
}
改造的地方说明:
- 删除了函数
getHostAddress
的调用,他的作用就是不用手动拼接地址/ip4/127.0.0.1/tcp/3001/p2p/Qme97f81i4EFfUPstGmmumU1A5GJ5HMwtP5GXKDJGy9rSL
startListener
函数末尾,手动拼接出地址,并打印出来 方便另一个终端使用runSender
函数删除的比较多,改造成了类似chat
的例子的实现方式
改造后echo
流程:
节点B
- 创建host (监听端口)
- 设置协议ID="/echo/1.0.0",并设置数据流的回调handle, 这个回调handle中有读写协程,等待连接
- 节点B需要知道节点A的地址,并解析A的地址
- 把节点A的地址添加到节点B的host
- 节点B使用节点A的host.ID和节点A监听的协议"/echo/1.0.0"创建一个新的数据流
- 节点B可以通过这个数据流和A进行通信
节点A
- 创建host (监听端口)
- 设置协议ID="/echo/1.0.0",并设置数据流的回调handle, 这个回调handle中有读写协程,等待连接
- 节点A需要知道节点B的地址,并解析B的地址
- 把节点B的地址添加到节点A的host
- 节点A使用节点B的host.ID和节点B监听的协议"/echo/1.0.0"创建一个新的数据流
- 节点B可以通过这个数据流和A进行通信
循环echo
运行程序运行起来,其实可以看到,节点B作为发送方,只发送了一次消息就断开了。如果想把消息一直echo下去,则有两种思路
第一种 | 类似chat实例
在chat示例中拿来三个函数handleStream
readData
writeData
,然后把runSender
下面的这段代码替换成调用handleStream(s)
_, err = s.Write([]byte("Hello, world!\n"))
if err != nil {
log.Println(err)
return
}
out, err := ioutil.ReadAll(s)
if err != nil {
log.Println(err)
return
}
log.Printf("read reply: %q\n", out)
在调用runSender
函数下使用<-ctx.Done()
, 防止主协程退出
runSender(ctx, ha, *targetF)
<-ctx.Done()
编译运行代码,可以看到,在两个终端下,发送方的任何消息接收方都会一模一样的返回回来。
第二种 | 改造doEcho
调用doEcho
的地方也需要修改
// doEcho reads a line of data a stream and writes it back
func doEcho(h host.Host, s network.Stream) error {
buf := bufio.NewReader(s)
str, err := buf.ReadString('\n')
if err != nil {
return err
}
log.Printf("read: %s", str)
s1, err := h.NewStream(context.Background(), s.Conn().RemotePeer(), "/echo/1.0.0")
if err != nil {
return err
}
defer s1.Close()
s1.Write([]byte(str))
return err
}
在调用runSender
函数下使用<-ctx.Done()
, 防止主协程退出
runSender(ctx, ha, *targetF)
<-ctx.Done()
注释掉函数runSender
下的这段代码
//runSender
out, err := ioutil.ReadAll(s)
if err != nil {
log.Println(err)
return
}
log.Printf("read reply: %q\n", out)
编译运行代码,可以看到,在两个终端下,发送方的任何消息接收方都会一模一样的返回回来。
改造doEcho
函数,我是参考例子multipro
,
doEcho
数据回调handler,在函数doEcho
读取数据流里的数据,然后通过数据流里的RemotePeer
和协议ID新建一个数据流- 然后发送数据,这样会触发另一个节点(终端)的协议绑定回调
doEcho
- 这样就不会出现
echo
示例中的,发送端节点B绑定了协议回调,但是并没有触发这个协议回调。
总结
使用libp2p库让两个节点进行p2p通信一共分为如下几个步骤,以节点B为例子
- 创建host (监听端口)
- 设置协议ID="/echo/1.0.0",并设置数据流的回调handle, 这个回调handle中有读写协程,等待连接
- 节点B需要知道节点A的地址,并解析A的地址
- 把节点A的地址添加到节点B的host
- 节点B使用节点A的host.ID和节点A监听的协议"/echo/1.0.0"创建一个新的数据流 , 节点B可以通过这个数据流和A进行通信
作为发送方比接收方多了第4步, 把节点A的地址添加到节点B的host
--完--
- 原文作者: 留白
- 原文链接: https://zfunnily.github.io/2021/10/gop2ptwo/
- 更新时间:2024-04-16 01:01:05
- 本文声明:转载请标记原文作者及链接