Go-libp2p三 | relay例子
我想做什么
chat和echo两个例子,说明了两个节点之间的通信需要两节点在同一个局域网内,或者有一个节点有公网。
如果我两个节点分别在两个NAT下面, 都在各自的局域网内,则需要有一个中继服务
,这个服务做如下两件事:
- 交换两个节点的在交换机上的信息,帮助两个节点进行P2P打洞。(hole punching, 在 Go-libp2p四 | 穿透NAT 文章中可见)
- 在打洞不成功的情况下能提供中继服务,转发两节点发送的数据。
根据我的需求,我在go-libp2p中发现了relay例子。
go-libp2p的relay例子分析
这个例子是使用h2作为中继,使h1和h3通过h2进行信息交换, 并可以通过h2使用 h1可以和h3通信。
relay例子中,可以做到如果两个节点怎么通过中继服务知道对方的地址,然后进行通信。
创建节点: 创建h1
h1, err := libp2p.New(libp2p.EnableRelay())
创建h3h3, err := libp2p.New(libp2p.ListenAddrs(), libp2p.EnableRelay())
h3监听协议"/cats",h3.SetStreamHandler("/cats", func(s network.Stream) {})
创建中继: 创建h2中继
h2, err := libp2p.New(libp2p.DisableRelay())
中继服务定义接收信息协议,其他节点可以通过协议与之通信_, err = relayv1.NewRelay(h2)
中继服务地址:h2info := peer.AddrInfo{ ID: h2.ID(), Addrs: h2.Addrs(), }
双方节点连上中继服务: h1连上h2,
h1.Connect(context.Background(), h2info)
h3连上h2,h3.Connect(context.Background(), h2info)
获取h3的地址
relayaddr, err := ma.NewMultiaddr("/p2p/" + h2.ID().Pretty() + "/p2p-circuit/ipfs/" + h3.ID().Pretty())
h3relayInfo := peer.AddrInfo{ ID: h3.ID(), Addrs: []ma.Multiaddr{relayaddr}, }
h1清除掉h3的地址:h1.Network().(*swarm.Swarm).Backoff().Clear(h3.ID())
h1连接h3
h1.Connect(context.Background(), h3relayInfo)
h1向h3已经准备好连接 h1新建一个"/cats"的数据流,用于收发数据
h1.NewStream(context.Background(), h3.ID(), "/cats")
读取数据:s.Read(make([]byte, 1))
总结以上步骤来说: 第一步,双方节点先连接中继节点 第二步,其中一节点h1通过中继服务h2获取到另一方的节点h3地址(这个例子直接就能通过内存获取到对方的地址) 第三步,h1连接h3,并通过h3监听的协议"/cats"新建数据流,用于收发数据。
在各自的局域网内,通过一个中继服务,就能使节点双方进行p2p通信。此例子是在一个进程中实现的。我按照他的例子,把中继服务和节点分离开。
先实现中继服务
首先看中继服务需要什么:
- 监听一条协议
RelayProtocolRequest
,当有一个节点上来,需要记录,当有两个节点时,返回两个节点的信息给对方节点。 - 有一个公网IP
golang
//relay-server
package main
const RelayProtocolRequest = "/relay/relayreq/1.0.0"
const RelayProtocolResponse = "/relay/relayrsp/1.0.0"
type RelayProtocol struct {
Server *RelayServer
}
func NewRelayProtocol(server *RelayServer)*RelayProtocol {
protocol := &RelayProtocol{ Server: server}
server.SetStreamHandler(RelayProtocolRequest, protocol.onReq)
server.SetStreamHandler(RelayProtocolResponse, protocol.onRsp)
return protocol
}
//返回peers节点ID
func (p *RelayProtocol)onReq(s network.Stream) {
p.Server.Mutex.Lock()
defer p.Server.Mutex.Unlock()
remotePeerID := s.Conn().RemotePeer()
p.Server.Peers[remotePeerID]=s.Conn().RemoteMultiaddr()
localAddrs := s.Conn().LocalMultiaddr()
remoteAddrs := s.Conn().RemoteMultiaddr()
log.Printf("a new Stream relay req, remotePeerID: %s; \nremoteAddr: %s, localAddr: %s\n", remotePeerID.Pretty(), remoteAddrs.String(), localAddrs.String())
if len(p.Server.Peers) != 2 {
return
}
var rsp = ""
for k, v := range p.Server.Peers {
rsp = rsp + k.Pretty() + "=" + v.String() + "\n"
}
for k, _ := range p.Server.Peers {
log.Println("start send peer: " , s.Conn().RemotePeer().Pretty())
p.Server.sendMessage(k, RelayProtocolResponse, rsp)
delete(p.Server.Peers, k)
}
}
func (p *RelayProtocol)onRsp(s network.Stream) {
log.Println("a new Stream relay rsp")
}
type RelayServer struct {
host.Host
*RelayProtocol
Ctx context.Context
HoleServer *holepunch.Service
//记录peers
Mutex *sync.Mutex
Peers map[peer.ID]multiaddr.Multiaddr
}
func NewRelayServer(host host.Host) *RelayServer {
relayServer := &RelayServer{ Host:host, Ctx: context.Background(), Mutex: new(sync.Mutex), Peers: make(map[peer.ID]multiaddr.Multiaddr)}
relayServer.RelayProtocol = NewRelayProtocol(relayServer)
return relayServer
}
func (n *RelayServer) sendMessage(id peer.ID, p protocol.ID, data string) bool {
s, err := n.NewStream(n.Ctx, id, p)
if err != nil {
log.Println("NewStream err")
log.Println(err)
return false
}
defer s.Close()
s.Write([]byte(data))
return true
}
func AddHolePunchService( h host.Host) *holepunch.Service {
ids, err := identify.NewIDService(h)
if err != nil {
log.Println(err.Error())
return nil
}
hps, err := holepunch.NewService(h, ids)
if err != nil {
log.Println(err.Error())
return nil
}
return hps
}
func makeRelayHost(port int) (*RelayServer,error) {
sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port))
h, err:= libp2p.New(
libp2p.ListenAddrs(sourceMultiAddr),
libp2p.DisableRelay(),
libp2p.EnableHolePunching(),
)
_, err = relayv1.NewRelay(h)
if err != nil {
log.Printf("Failed to instantiate h2 relay: %v", err)
return nil, err
}
return NewRelayServer(h), err
}
func main() {
sourcePort := flag.Int("sp", 0, "Source port number")
flag.Parse()
if *sourcePort == 0 {
fmt.Println("Please Use -sp port")
return
}
flag.Parse()
serverHost, err := makeRelayHost(*sourcePort)
if err != nil {
log.Printf("Failed to create relay: %v", err)
return
}
hostAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serverHost.ID().Pretty()))
addr := serverHost.Addrs()[0]
fullAddr := addr.Encapsulate(hostAddr)
log.Printf("Run './chat -relay %s' on another console.\n", fullAddr)
log.Println("You can replace 192.168.0.100 with public IP as well.")
log.Println("Waiting for incoming connection")
log.Println()
<- serverHost.Ctx.Done()
}
节点代码
节点需要的组件:
- 创建host
- 监听
RelayMsg
协议 和RelayProtocolResponse
协议 - 连接中继服务 n.ConnectRelay()
- 节点发送
RelayProtocolRequest
协议给中继服务,服务会返回RelayProtocolResponse
协议给节点 其他节点的peerID - 通过中继服务连接其他节点
n.ConnectNode()
, - 并且往其他节点发送
RelayMsg
协议,n.CmdRelay
golang
//chat.go
package main
const RelayProtocolRequest = "/relay/relayreq/1.0.0"
const RelayProtocolResponse = "/relay/relayrsp/1.0.0"
const RelayMsg = "/relay/relaymsg/1.0.0"
type NodeProtocol struct {
node *Node
}
func NewNodeProtocol(server *Node)*NodeProtocol {
protocol := &NodeProtocol{ node: server}
server.SetStreamHandler(RelayProtocolResponse, protocol.onRsp)
server.SetStreamHandler(RelayMsg, protocol.onRelayMsg)
return protocol
}
//获得对方peerID
func (p *NodeProtocol)onRsp(s network.Stream) {
buf, err := ioutil.ReadAll(s)
if err != nil {
log.Println(err.Error())
return
}
peers := strings.Split(string(buf), "\n")
var addrs []string
for _,v := range peers {
tmp := strings.Split(string(v), "=")
if tmp[0] != p.node.ID().Pretty() {
addrs = tmp
break
}
}
log.Printf("a new Stream Relay rsp,addrs.len %d, peerInfo : %s\n", len(addrs) , buf)
if len(addrs) <= 1 { return}
p.node.ConnectNode(addrs[0], addrs[1])
}
func (p* NodeProtocol) onFirstP2PMsg(s network.Stream) {
buf, _ := ioutil.ReadAll(s)
log.Println("receive onFirstP2PMsg: ", string(buf))
time.Sleep(5*time.Second)
p.node.CmdP2P()
}
func (p* NodeProtocol) onP2PMsg(s network.Stream) {
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
go p.node.writeData(rw)
go p.node.readData(rw)
}
func (p* NodeProtocol) onRelayMsg(s network.Stream) {
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
go p.node.writeData(rw)
go p.node.readData(rw)
}
type Node struct {
host.Host
*NodeProtocol
chatP2PAddr *peer.AddrInfo
Ctx context.Context
//记录RelayServer
ServerAddrInfo *peer.AddrInfo
//记录peers
Mutex *sync.Mutex
Peers *list.List
}
func NewNode(host host.Host) *Node {
relayServer := &Node{ Host: host, chatP2PAddr: new(peer.AddrInfo), Ctx: context.Background(), Mutex: new(sync.Mutex), Peers: new(list.List)}
relayServer.NodeProtocol = NewNodeProtocol(relayServer)
return relayServer
}
func (n *Node)readData(rw *bufio.ReadWriter) {
for {
str, _ := rw.ReadString('\n')
if str == "" {
return
}
if str != "\n" {
// Green console colour: \x1b[32m
// Reset console colour: \x1b[0m
fmt.Printf("\x1b[32m%s\x1b[0m> ", str)
}
}
}
func (n *Node)writeData(rw *bufio.ReadWriter) {
stdReader := bufio.NewReader(os.Stdin)
for {
fmt.Print("> ")
sendData, err := stdReader.ReadString('\n')
if err != nil {
log.Println(err)
return
}
rw.WriteString(fmt.Sprintf("%s\n", sendData))
rw.Flush()
}
}
func getPeerInfoByDest(relay string) (*peer.AddrInfo, error) {
relayAddr, err := multiaddr.NewMultiaddr(relay)
if err != nil {
return nil , err
}
pid, err := relayAddr.ValueForProtocol(multiaddr.P_P2P)
if err != nil {
return nil , err
}
relayPeerID, err := peer.Decode(pid)
if err != nil {
return nil , err
}
relayPeerAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", pid))
relayAddress := relayAddr.Decapsulate(relayPeerAddr)
peerInfo := &peer.AddrInfo{
ID: relayPeerID,
Addrs: []multiaddr.Multiaddr{relayAddress},
}
return peerInfo, err
}
func (n *Node)ConnectRelay(relay string) error {
peerInfo ,err := getPeerInfoByDest(relay)
if err != nil {
return err
}
n.ServerAddrInfo = peerInfo
return n.Connect(n.Ctx, *peerInfo)
}
func (n* Node)ConnectNode(nodePeerID string, nodeAddr string) {
log.Println("start ConnectNodeByRelay....", nodePeerID, nodeAddr)
peerID, err := peer.Decode(nodePeerID)
if err != nil {
log.Println(err.Error())
return
}
n.Network().(*swarm.Swarm).Backoff().Clear(peerID)
var address = nodeAddr +"/p2p/" + n.ServerAddrInfo.ID.Pretty() + "/p2p-circuit/p2p/" + peerID.Pretty()
//var address = nodeAddr +"/p2p/" + peerID.Pretty() + "/p2p-circuit"
//var address = nodeAddr +"/p2p/" + peerID.Pretty()
//var address = "/p2p/"+ peerID.Pretty()
chatTargetAddr, err := multiaddr.NewMultiaddr(address)
if err != nil {
log.Println("chatTargetAddr is err")
log.Println(err.Error())
return
}
chatTargetAddrInfo := peer.AddrInfo{
ID: peerID,
Addrs: []multiaddr.Multiaddr{chatTargetAddr},
}
if err := n.Connect(n.Ctx, chatTargetAddrInfo); err != nil {
log.Println("chatTargetAddr connect err")
log.Println(err.Error())
return
}
n.chatP2PAddr = &chatTargetAddrInfo
//n.sendMessage(chatTargetAddrInfo.ID, FirstP2PMsg, "Hello Im " + chatTargetAddrInfo.ID.Pretty())
n.CmdRelay()
}
func (n *Node)CmdRelay() {
log.Println("CmdRelay start....")
peerInfo := n.chatP2PAddr
s, err := n.NewStream(n.Ctx, peerInfo.ID, RelayMsg)
if err != nil {
log.Println(err.Error())
return
}
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
log.Println("opening chat stream")
log.Println("[INFO] chat connected!")
n.sendMessage(peerInfo.ID, RelayMsg, "Hello this is relay msg")
go n.writeData(rw)
go n.readData(rw)
}
func (n *Node) Login() {
if n.sendMessage(n.ServerAddrInfo.ID, RelayProtocolRequest, "login") {
log.Printf("%s login to relay\n", n.ID())
} else {
log.Printf("%s connect to relay error\n", n.ID())
}
}
func (n *Node) sendMessage(id peer.ID, p protocol.ID, data string) bool {
s, err := n.NewStream(n.Ctx, id, p)
if err != nil {
log.Println("NewStream err")
log.Println(err)
return false
}
defer s.Close()
s.Write([]byte(data))
return true
}
func AddHolePunchService( h host.Host) *holepunch.Service {
ids, err := identify.NewIDService(h)
if err != nil {
log.Println(err.Error())
return nil
}
hps, err := holepunch.NewService(h, ids)
if err != nil {
log.Println(err.Error())
return nil
}
return hps
}
func makeRandomNode(relay string) *Node {
peerInfo ,err := getPeerInfoByDest(relay)
if err != nil {
log.Println(err.Error())
return nil
}
//priv, _ := generateIdentity(0)
h, err := libp2p.New(
libp2p.ListenAddrs(multiaddr.StringCast("/ip4/127.0.0.1/tcp/0/")),
libp2p.EnableRelay(),
)
if err != nil {
log.Println(err.Error())
return nil
}
return NewNode(h)
}
func main() {
relay := flag.String("relay", "", "relay addrs")
flag.Parse()
if *relay == "" {
fmt.Println("Please Use -relay ")
return
}
flag.Parse()
node := makeRandomNode(*relay)
if err := node.ConnectRelay(*relay); err != nil {
log.Println(err.Error())
return
}
log.Println("Connect relay success Next Login...")
node.Login()
log.Println("Im ", node.ID())
select {
case <-node.Ctx.Done():
log.Println(node.Ctx.Err().Error())
}
}
编译
$ go build -o relay-server relay-server.go
$ go build -o chat chat.go
使用,先启动中继服务
$ ./relay-server -sp 3001
启动A窗口, 连上relay服务
$ ./chat -relay
再启动B窗,连上relay服务
$ ./chat -relay
这样A和B两个节点就可以通过relay服务进行数据传输了。 下篇文章,改造relay服务,relay服务交换节点信息后,使节点间可以进行p2p传输,传输数据不通过relay服务。 详细代码
未完待续…
--完--
- 原文作者: 留白
- 原文链接: https://zfunnily.github.io/2021/10/gop2pthree/
- 更新时间:2024-04-16 01:01:05
- 本文声明:转载请标记原文作者及链接