GO网络编程-RPC
1. RPC
RPC , Remote Procedure Call,远程过程调用。与 HTTP 一致,也是应用层协议。该协议的目标是实现:调用远程过程(方法、函数)就如调用本地方法一致
ServiceA 需要调用 ServiceB 的 FuncOnB 函数,对于 ServiceA 来说 FuncOnB 就是远程过程
RPC 的目的是让 ServiceA 可以像调用 ServiceA 本地的函数一样调用远程函数 FuncOnB,也就是 ServieA 上代码层面使用:
serviceB.FuncOnB()
即可完成调用RPC 是 C/S 模式,调用方为 Client,远程方为 Server
RPC 把整体的调用过程,数据打包、网络请求等,封装完毕,在 C、S 两端的 Stub 中。Stub(代码存根)
调用流程:
ServiceA 将调回需求告知 Client Sub
Client Sub 将调用目标(Call ID)、参数数据(params)等调用信息进行打包(序列化),并将打包好的调用信息通过网络传输给 Server Sub
Server Sub 将根据调用信息,调用相应过程。期间涉及到数据的拆包(反序列化)等操作。
远程过程 FuncOnB 运行,并得到结果,将结果告知 Server Sub
Server Sub 将结果打包,并传输回给 Client Sub
Client Sub 将结果拆包,把最终函数调用的结果告知 ServiceA
2. 使用 go 原生的 rpc 实现 rpc server 和 client
2.1 rpc server
rpc server
type HelloWorld struct {
}
func (H *HelloWorld) HelloWorld(req string, resp *string) error {
*resp = req + "你好,Sakura"
return nil
}
func RpcServer() {
// 1. 创建服务
err := rpc.RegisterName("Hello", &HelloWorld{})
if err != nil {
log.Println("注册RPC服务失败:", err)
return
}
// 2.设置监听
listener, err := net.Listen("tcp", "127.0.0.1:8004")
if err != nil {
log.Println("监听失败:", err)
}
log.Println("监听端口 127.0.0.1:8004")
// 3.建立连接
conn, err := listener.Accept()
if err != nil {
log.Println("Accept err:", err)
}
fmt.Println("连接已建立")
// 4.绑定服务,将连接和服务进行绑定
rpc.ServeConn(conn)
}
rpc client
func RpcClient() {
// 1.用rpc链接服务器 --Dial(拨号)
conn, err := rpc.Dial("tcp", "127.0.0.1:8004")
if err != nil {
log.Println("Dial err:", err)
}
defer conn.Close()
// 2.调用远程函数
var response string // 接收返回值
// 指定调用的远程函数
err = conn.Call("Hello.HelloWorld", "Sakura", &response)
if err != nil {
log.Fatalln("Call err:", err)
}
fmt.Println(response)
}
2.2 对 rpc 进行封装
2.gRPC
gPRC 官网 https://grpc.io/ 上的 Slogan 是:A high performance, open source universal RPC framework。就是:一个高性能、开源的通用 RPC 框架。
支持多数主流语言:C#、C++、Dart、Go、Java、Kotlin、Node、Objective-C、PHP、Python、Ruby。其中 Go 支持 Windows, Linux, Mac 上的 Go 1.13+ 版本。
gRPC 是一个 Google 开源的高性能远程过程调用 ( RPC ) 框架,可以在任何环境中运行。它可以通过对负载平衡、跟踪、健康检查和身份验证的可插拔支持有效地连接数据中心内和跨数据中心的服务。它也适用于分布式计算的最后一步,将设备、移动应用程序和浏览器与后端服务接。
3. 准备 GRPC 环境
Protocol Buffer 编译器,
protoc
,推荐版本3
根据自己的系统选择合适的版本 , 然后解压并配置环境变量
https://github.com/protocolbuffers/protobuf/releases
# 解压到指定目录即可,要保证 protoc/bin 位于环境变量 path 中,可以随处调用
> protoc.exe --version
libprotoc 3.21.4
# 也可以使用go get安装
go get -u google.golang.org/protobuf
Go Plugin , 用于 Protocol Buffer 编译器ibprotoc 3.21.4
# go语言的protobuf 插件
> go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
# grpc 的 protobuf 插件
> go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# 安装完毕后,要保证 $GOPATH/bin 位于环境变量 path 中
# 测试安装结果
> protoc-gen-go --version
protoc-gen-go.exe v1.32.0 (2023.12.27)
> protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0 (2023.12.27)
4. Protocl Buffer 基本使用
默认情况下,gRPC 使用 Protocol Buffers,这是 Google 用于序列化结构化数据的成熟开源机制(尽管它可以与 JSON 等其他数据格式一起使用)。
Protocol Buffers 的文档:https://developers.google.com/protocol-buffers/docs/overview
使用 protocol buffers 语法定义消息,消息是用于传递的数据
使用 protocol buffers 语法定义服务,服务是 RPC 方法的集合,来使用消息
syntax = "proto3";
// 为了生成go代码,需要增加 go_package
option go_package = "./proto-codes";
// 定义用于在服务器之间传递的消息
// 响应的产品信息结构
message ProductResponse{
int64 id = 1;
string name = 2;
bool is_safe = 3;
}
// 请求产品信息时的参数消息
message ProductRequest {
int64 id = 1;
}
// 定义服务,完成操作
service Product {
// 定义服务的操作
// 远程过程,服务器端的过程
rpc ProductInfo (ProductRequest) returns (ProductResponse);
// 其他的服务操作
}
用 Protocol Buffer 编译工具
protoc
来编译,生成对应语言的代码,例如 Go 的代码
# --go_out *.pb.go 目录
# --go-grpc_out *_grpc.pb.go 目录
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative *.proto
5. 基于 GRPC 的服务间通信案例
首先定义 Proto
syntax = "proto3";
// 定义生成的go代码所在包
option go_package = ".;pb";
// 定义消息体
message Person {
// 值为整数,原则上从1开始,也可以不这样。不能使用19000 - 19999
string name = 1;
int32 age = 2;a
}
// 定义一个gRPC服务
service StreamGreeter {
rpc Hello(Person) returns (Person);
}
// 生成go代码和grpc代码
// protoc --go_out=./Person --go-grpc_out=./Person Person.proto
Server
func GRPCSever() {
// 1.注册服务,启动监听
grpcserver := grpc.NewServer()
pb.RegisterHelloServer(grpcserver, &HelloService{})
// 2.监听
listen, err := net.Listen("tcp", "127.0.0.1:8005")
if err != nil {
log.Println("监听失败,", err)
}
defer listen.Close()
// 3.启动服务
grpcserver.Serve(listen)
}
// 定义结构实现具体的业务
// 需要嵌入这个结构体
type HelloService struct {
pb.UnimplementedHelloServer
}
// 重写未定义的方法
func (HelloService) Hello(context.Context, *pb.Person) (*pb.Person, error) {
person := &pb.Person{
Name: "Sakura",
Age: 22,
}
return person, nil
}
Client
func GRPCClient() {
// 1.连接gRPC服务
//grpcConn, err := grpc.Dial("127.0.0.1:8004")
// 抑制安全策略
grpcConn, err := grpc.Dial("127.0.0.1:8005", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
fmt.Println("grpc Dial error:", err)
return
}
defer grpcConn.Close()
// 2.初始化客户端
grpcClient := pb.NewHelloClient(grpcConn)
// 3.调用远程服务(函数)
reply, err := grpcClient.Hello(context.Background(), &pb.Person{Name: "李四", Age: 18})
if err != nil {
fmt.Println("reply error:", err)
return
}
fmt.Println("reply:", reply)
}
6. 四种服务方法
// 一元RPC(Unary),客户端向服务器发送单个请求并返回单个响应,就像普通的函数调用一样
rpc SayHello(HelloRequest) returns (HelloResponse);
// 服务器流式处理(Serverstreaming)RPC,客户端向服务器发送请求并获取流以读回一系列消息
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
// 客户端流式处理(Clientstreaming)RPC,客户端写入一系列消息并将其发送到服务器,等待服务器读取并应答
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
// 双向流式处理(Bidirectionalstreaming)RPC,其中双方都使用读写流发送一系列消息。两个流独立运行
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
syntax = "proto3"; // protobuf协议版本
package echo; // 当前包
option go_package = ".;proto";// 编译后所在包
// EchoRequest is the request for echo.
message EchoRequest {
string message = 1;
}
// EchoResponse is the response for echo.
message EchoResponse {
string message = 1;
}
// Echo is the echo service.
service Echo {
// UnaryEcho is unary echo.
// 一元 RPC,客户端向服务器发送单个请求并返回单个响应,就像普通的函数调用一样
rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
// ServerStreamingEcho is server side streaming.
// 服务器流式处理 RPC,其中客户端向服务器发送请求并获取流以读回一系列消息。
// 客户端从返回的流中读取,一直到没有更多消息。gRPC 保证在单个 RPC 调用中对消息进行排序
rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
// ClientStreamingEcho is client side streaming.
// 客户端流式处理 RPC,其中客户端写入一系列消息并将其发送到服务器。
// 客户端完成消息写入后,将等待服务器读取它们并返回其响应。
// 同样,gRPC 保证在单个 RPC 调用中对消息进行排序。
rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
// BidirectionalStreamingEcho is bidirectional[ˌbaɪdəˈrekʃənl] streaming.
// 双向流式处理 RPC,其中双方都使用读写流发送一系列消息。
// 这两个流独立运行,因此客户端和服务器可以按照它们喜欢的任何顺序读取和写入:
// 例如,服务器可以在写入其响应之前等待接收所有客户端消息,或者它可以交替读取消息然后写入消息,
// 或者读取和写入的其他一些组合。将保留每个流中消息的顺序。
rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
}
//protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative *.proto
6.1 一元 RPC
包含了对元数据的处理
Server
var port = flag.Int("port", 8006, "Grpc server port")
func GrpcServer() {
flag.Parse()
// 监听端口
listen, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Println("监听失败,", err)
}
server := grpc.NewServer()
proto.RegisterEchoServer(server, &Servers{})
server.Serve(listen)
}
type Servers struct {
proto.UnimplementedEchoServer
}
// UnaryEcho 一元RPC服务的实现
// 元数据,map[string][string]
// token,timestamp,授权
func (s *Servers) UnaryEcho(ctx context.Context, req *proto.EchoRequest) (*proto.EchoResponse, error) {
fmt.Println("----------UnaryEcho-----------")
incomingContext, ok := metadata.FromIncomingContext(ctx)
if !ok {
log.Println("miss metadata from context")
}
fmt.Println("metadata:", incomingContext)
return &proto.EchoResponse{Message: req.Message}, nil
}
Client
var msg = "Client Data"
func GrpcClient() {
conn, err := grpc.Dial("127.0.0.1:8006", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Println(err)
}
defer conn.Close()
client := proto.NewEchoClient(conn)
// 1.一元RPC
unaryEchoWithMetadata(client, msg)
}
// 调用一元RPC方法
func unaryEchoWithMetadata(client proto.EchoClient, msg string) {
fmt.Println("---------UnaryEcho---------")
// Pairs封装一个metadata
md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
//md.Append("authorization", "token....")
ctx := metadata.NewOutgoingContext(context.Background(), md) // 即将输出的请求
resp, err := client.UnaryEcho(ctx, &proto.EchoRequest{Message: msg})
if err != nil {
fmt.Println(err)
} else {
fmt.Println("Grpc Server Response:", resp)
}
}
6.2 服务器流式处理
重点看 Server 和 Client 对于流式消息的处理,服务端向客户端发送流式消息,客户端接收消息
Server : 使用
Send
发送消息Client:使用
recv
接受消息
Server
var port = flag.Int("port", 8006, "Grpc server port")
func GrpcServer() {
flag.Parse()
// 监听端口
listen, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Println("监听失败,", err)
}
server := grpc.NewServer()
proto.RegisterEchoServer(server, &Servers{})
server.Serve(listen)
}
type Servers struct {
proto.UnimplementedEchoServer
}
// 服务端流式处理RPC
func (s *Servers) ServerStreamingEcho(req *proto.EchoRequest, stream proto.Echo_ServerStreamingEchoServer) error {
fmt.Println("----------ServerStreamingEcho-----------")
// 服务端向客户单响应使用send函数
for i := 0; i < 5; i++ { // 以流的形式发送多次
err := stream.Send(&proto.EchoResponse{Message: req.Message})
if err != nil {
return err
}
}
return nil
}
Client
var msg = "Client Data"
func GrpcClient() {
conn, err := grpc.Dial("127.0.0.1:8006", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Println(err)
}
defer conn.Close()
client := proto.NewEchoClient(conn)
// 1.调用一元RPC方法
unaryEchoWithMetadata(client, msg)
// 2.调用服务端流式处理
serverStreaming(client, msg)
}
// 调用服务端流式处理
func serverStreaming(client proto.EchoClient, msg string) {
fmt.Println("---------serverStreaming Client---------")
// Pairs封装一个metadata
md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
//md.Append("authorization", "token....")
ctx := metadata.NewOutgoingContext(context.Background(), md) // 即将输出的请求
stream, err := client.ServerStreamingEcho(ctx, &proto.EchoRequest{Message: msg})
if err != nil {
fmt.Println(err)
}
// 从流中接收数据,每次读一个消息
for {
// err 读取到末尾会返回 EOF
recv, err := stream.Recv()
if err != nil {
log.Println(err)
break
} else if err == io.EOF {
fmt.Println("finish receive")
return
}
fmt.Println("response is :,", recv.Message)
}
}
6.3 客户端流式处理
客户端向服务端发送流式消息,服务端接收消息
Server
func GrpcServer() {
flag.Parse()
// 监听端口
listen, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Println("监听失败,", err)
}
server := grpc.NewServer()
proto.RegisterEchoServer(server, &Servers{})
server.Serve(listen)
}
type Servers struct {
proto.UnimplementedEchoServer
}
// ClientStreamingEcho 客户端流式处理
func (s *Servers) ClientStreamingEcho(stream proto.Echo_ClientStreamingEchoServer) error {
fmt.Println("--------------ClientStreamingEcho------------------------")
// 接收客户端消息
for {
recv, err := stream.Recv()
if err == io.EOF {
fmt.Println("receive finish")
// 发送并关闭
return stream.SendAndClose(&proto.EchoResponse{Message: "Receive Finish"})
} else if err != nil {
log.Println(err)
return err
}
// 打印接收到的消息
fmt.Println("request reveived: ", recv.Message)
}
}
Client
func GrpcClient() {
conn, err := grpc.Dial("127.0.0.1:8006", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Println(err)
}
defer conn.Close()
client := proto.NewEchoClient(conn)
// 1.调用一元RPC方法
unaryEchoWithMetadata(client, msg)
// 2.调用服务端流式处理
serverStreaming(client, msg)
// 3.调用客户端流式处理
clientStreaming(client, msg)
}
// 客户端流式处理
func clientStreaming(client proto.EchoClient, msg string) {
fmt.Println("---------clientStreaming---------")
// Pairs封装一个metadata
md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
//md.Append("authorization", "token....")
ctx := metadata.NewOutgoingContext(context.Background(), md) // 即将输出的请求
stream, err := client.ClientStreamingEcho(ctx)
if err != nil {
fmt.Println(err)
}
// 向服务端循环发送消息
for i := 0; i < 5; i++ {
err = stream.Send(&proto.EchoRequest{Message: msg})
if err != nil {
log.Println("Fialed to send", err)
}
}
// 关闭并接受
recv, err := stream.CloseAndRecv()
if err != nil {
log.Println(err)
}
log.Println("Servere response:", recv)
}
6.4 双向流式处理
双向流式处理,客户端和服务端在接收的同时以发送
重点使用
sendclose()
这个方法
Server
func GrpcServer() {
flag.Parse()
// 监听端口
listen, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Println("监听失败,", err)
}
server := grpc.NewServer()
proto.RegisterEchoServer(server, &Servers{})
server.Serve(listen)
}
type Servers struct {
proto.UnimplementedEchoServer
}
// 双向流式处理
func (s *Servers) BidirectionalStreamingEcho(stream proto.Echo_BidirectionalStreamingEchoServer) error {
fmt.Println("--------------BidirectionalStreamingEcho------------------------")
// 每接收到客户端的消息,就向客户端发送响应
for {
recv, err := stream.Recv()
if err == io.EOF {
fmt.Println("消息发送完毕")
stream.Send(&proto.EchoResponse{Message: "Receive over"})
return nil
} else if err != nil {
log.Println("Receive Failed", err)
return err
}
// 打印接收到消息
log.Println("Client Message:", recv)
// 回复响应
err = stream.Send(&proto.EchoResponse{Message: "Receive success!"})
if err != nil {
log.Println("Send Fail,", err)
}
}
}
Client
func GrpcClient() {
conn, err := grpc.Dial("127.0.0.1:8006", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Println(err)
}
defer conn.Close()
client := proto.NewEchoClient(conn)
// 4.双向流式处理
serverclientStreaming(client, msg)
}
// 双向流式处理
func serverclientStreaming(client proto.EchoClient, msg string) {
fmt.Println("---------serverclientStreaming---------")
// Pairs封装一个metadata
md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
//md.Append("authorization", "token....")
ctx := metadata.NewOutgoingContext(context.Background(), md) // 即将输出的请求
stream, err := client.BidirectionalStreamingEcho(ctx)
if err != nil {
fmt.Println(err)
}
// 开启一个协程向服务端发送消息
go func() {
for i := 0; i < 5; i++ {
err := stream.Send(&proto.EchoRequest{Message: msg})
if err != nil {
log.Println(err)
}
}
stream.CloseSend()
}()
//time.Sleep(time.Second * 10)
//循环接收服务器的消息
for {
recv, err1 := stream.Recv()
if err1 == io.EOF {
log.Println("----- Receive Finish ------ ")
break
} else if err != nil {
log.Println("Receive Fail,", err)
}
log.Println("Server Response:", recv)
}
}
评论区