目 录CONTENT

文章目录
Go

Go网络编程-TCP程序设计

Sakura
2023-08-05 / 0 评论 / 1 点赞 / 66 阅读 / 0 字 / 正在检测是否收录...
温馨提示:
本文最后更新于337天前,若内容或图片失效,请留言反馈。 部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

TCP程序设计

一: 建立连接

客户端和服务端在建立连接时候 :

  • 服务端: 监听 + 接收连接 , listen + Accept

  • 客户端: 主动建立连接 , Dial

Go 语言中使用net实现网络相关操作 , 包括 TCP操作

// 监听某一种网络的某一个地址
func Listen(network, address string) (Listener, error)
// 接受监听到的连接。
func (l *TCPListener) Accept() (Conn, error)

// 连接网络
func Dial(network, address string) (Conn, error)
// 带有超时的连接网络
func DialTimeout(network, address string, timeout time.Duration) (Conn, error)


func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error)
func (l *TCPListener) AcceptTCP() (*TCPConn, error)

1: 服务端程序

// TCP服务端
func Server() {
	// 1.基于某个地址监听
	// Listen中的network支持不同的网络类型
	listen, err := net.Listen("tcp", "127.0.0.1:9000")
	if err != nil {
		logger.Error("listen failed:", zap.Error(err))
	} else {
		fmt.Println("正在监听中,监听地址为:", listen.Addr())
	}
	// 2.接收连接请求,不止一个客户端所以要循环接收请求
	for {
		// 没有接收到请求,会阻塞到这里
		conn, err := listen.Accept()
		if err != nil {
			logger.Error("Accept failed: ", zap.Error(err))
		} else {
			fmt.Println("连接已建立,客户端地址为:", conn.RemoteAddr())
		}
		defer conn.Close()

		// 建立连接后,进行读写操作
		//....
	}
}

2: 客户端程序

// TCP 客户端
func Client() {
	// tcp服务器端地址
	address := "127.0.0.1:9000"
	// 模拟多客户端
	// 并发的客户端请求
	num := 10
	wg := sync.WaitGroup{}
	wg.Add(num)
	for i := 0; i < num; i++ {
		go func(wg *sync.WaitGroup) {
			defer wg.Done()
			//一.建立连接
			conn, err := net.Dial("tcp", address)
			if err != nil {
				log.Fatalln(err)
				return
			}
			// 保证关闭
			defer conn.Close()
			log.Printf("connection is establish ,client addr is %s\n", conn.LocalAddr())
		}(&wg)
	}
	wg.Wait()
}

注意:

  • conn.Close(),关闭连接 , 连接资源使用完毕要记得关闭

  • conn.LocalAddr() , 用于获得客户端本地地址,会与服务端的 RemoteAddr() , 服务端调用的是RemoteAddr(), 客户端调用的是LocalAddr()

3. TCP 网络支持

func Listen(network, address string) (Listener, error)
func Dial(network, address string) (Conn, error)
  • etwork 表示网络类型, 支持的TCP类型字符串:

    • tcp , 使用 IPv4 或 IPv6

    • tcp4 , 仅使用 IPv4

    • tcp6 , 仅使用 IPv6

    • 省略 IP 部分, 绑定可用的全部IP, 包括 IPv4 和 IPv6

客户端在建立连接时使用的网络类型,要与服务器监听的网络类型能够匹配

//服务端
address := ":5678"  //省略IP
address := "[::1]:5678"  // IPV6
address := "127.0.0.1:5678"  // IPV4
listener, err := net.Listen(tcp, address)

//客户端
connection, err := net.Dial(tcp, address)

二: 连接失败的常见情况

当客户端net.Dial()建立连接时, 还有可能会失败, 典型的失败原因:

  • 服务器端未启动, 或网络连接失败

  • 网络原因超时

  • 并发连接的客户端太多, 服务端处理不完

# 无连接目标可用
No connection could be made because the target machine actively refused it.

# 网络不可达
A socket operation was attempted to an unreachable network.

# 超时
dial tcp 127.0.0.1:56789: i/o timeout

1. DialTimeout

可以设置如果没有在指定时间连接到服务器 , 就返回一个 i/o 错误

// 带有超时时间的连接网络
connection, err := net.DialTimeout(tcp, address, 1*time.Millisecond)

2. 客户端未能及时 Accept

客户端发出的连接,若服务器端未能及时Accept, 会被缓存到队列中. 当队列存满时,就不会在接受客户端连接了

//服务端
func(conn net.Conn) {
	//处理请求
	fmt.Println("接收到了客户端连接,客户端地址为:", conn.RemoteAddr())
	//比如每个请求需要1秒的处理时间
	time.Sleep(1 * time.Second)
}(connection)


//客户端
for i := 0; i < num; i++ {
	go func() {
		defer wg.Done()
		//一: 建立连接
		connection, err := net.DialTimeout(tcp, ServerAddress, 1*time.Millisecond)
		if err != nil {
			log.Fatalln(err)
		}
		//defer 关闭连接
		defer connection.Close()

		//处理连接
		fmt.Println("连接被接受,服务器地址为", connection.LocalAddr())
	}()

	//10毫秒一个请求
	time.Sleep(10 * time.Millisecond)
}

会出现服务器来不及处理请求的情况

解决方法 : 并发处理每个请求

// 处理连接,读写
go func(conn net.Conn) {
     // 日志连接的远程地址(client addr)
     log.Printf("accept from %s\n", conn.RemoteAddr())
     time.Sleep(time.Second)
 }(conn)

三: 读写操作

1. 基本读写操作

TCP 协议是全双工通信,就是连接两端允许同时进行双向数据传输 ( 读写 )

Go 程序设计时,服务端通常使用独立的 Goroutine 处理每个客户端的连接及使用该连接的读写操作

// 从conn读内容至b, 返回读取长度和错误
Read(b []byte) (n int, err error)
// 向conn写入数据b,返回写入长度和错误
Write(b []byte) (n int, err error)
  • 服务端代码

func ServerWrite() {
	// 要监听的地址和端口
	address := ":5678"

	//一:基于某个地址监听
	listener, err := net.Listen(tcp, address)
	if err != nil {
		log.Fatalln(err)
	}
	//defer 关闭监听
	defer listener.Close()
	fmt.Println("服务器正在监听:", listener.Addr())

	// 二:接收连接请求
	// 连接请求不是一个,所以要循环接收
	for true {
		//接受请求
		connection, err1 := listener.Accept()
		if err1 != nil {
			log.Fatalln(err1)
		}
		// 调用处理每个连接的函数
		HandleConn(connection)
	}
}

// 处理每个连接
func HandleConn(conn net.Conn) {
	// 打印客户端地址
	fmt.Println("接收到了客户端连接,客户端地址为:", conn.RemoteAddr())

	// 1.保证连接关闭
	defer conn.Close()

	// 2.向客户端发送数据
	write, err := conn.Write([]byte("Server data" + "\n"))
	if err != nil {
		log.Println(err)
	}
	fmt.Println("Server write len is :", write)

	// 3.从客户端接收数据
	buf := make([]byte, 1024) // 构建 buffer 缓冲
	n, err2 := conn.Read(buf)
	if err2 != nil {
		fmt.Println("读取失败")
	}
	fmt.Println("读取客户端的到内容为: ", string(buf[:n])) //截取前n个字节
}
  • 客户端代码

func ClientRead() {
	//服务器地址
	ServerAddress := "127.0.0.1:5678"

	// 一: 建立连接
	connection, err := net.DialTimeout(tcp, ServerAddress, 1*time.Millisecond)
	if err != nil {
		log.Fatalln("客户端连接失败")
	}

	// 二:保证关闭
	defer connection.Close()
	fmt.Println("连接完成,服务器地址为:", connection.LocalAddr())

	// 三:读取服务端发送的数据
	buf := make([]byte, 1024) // 构建 buffer 缓冲
	n, err2 := connection.Read(buf)
	if err2 != nil {
		fmt.Println("读取失败")
	}
	fmt.Println("读取到内容为: ", string(buf[:n])) //截取前n个字节

	// 四:向服务端发送数据
	write, err := connection.Write([]byte("Client date" + "\n"))
	if err != nil {
		log.Println(err)
	}
	fmt.Println("Client write len is :", write)
}

服务端和客户带端既可以发送数据 , 也可以接收数据

2. Write 和 Read 的注意事项

2.1 Write

  • 写成功 , err nil && wn len(data) 表示写入成功 ( 严格的写入成功判断 )

data := []byte("Client date" + "\n")
writeN, err1 := connection.Write(data)


// 严格的写入成功判断	
if err1 != nil && writeN == len(data){
	log.Println(err1)
}
fmt.Println("客户端发送数据长度 :", writeN)
  • 写阻塞,当无法继续写时,Write 会进入阻塞状态 , 无法继续写 , 通常意味着 TCP 的窗口已满

// 服务端
// 向客户端发送数据
for i := 0; i < 3000000; i++ {
	data := []byte("send some data from server" + "\n")
	wn, err := conn.Write(data)
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("%d, server write len is %d\n", i, wn)
}

//客户端 连接成功后不做接收,用select阻塞即可
select{}

最多会开启的 TCP 窗口

  • 已关闭的连接不能继续写入

在上面的代码中如果没有select{} 阻塞 , 那么连接成功后就会关闭连接

// 向已关闭的连接中写入会报错 
An existing connection was for cibly closed by the remote host
  • 可以使用如下方法控制Write的超时时长

    • SetDeadline(t time.Time) error

    • SetWriteDeadline(t time.Time) error

2.2 Read

  • 当 conn 中无数据时,Read 处于阻塞状态

  • 当 conn 中有足够数据时,Read 读满 buf,并返回读取长度,需要循环读取,才可以读取全部内容

for {
    buf := make([]byte, 10)
    rn, err := conn.Read(buf)
    if err != nil {
		// 因为读完后再次读会返回err,所以要break
        log.Println(err)
        break
    }
    log.Println("received from server data is:", string(buf[:rn]))
}
  • 当 conn 中有部分数据时,Read 读部分数据,并返回读取长度

  • 当 conn 已经关闭时,通常会返回 EOF error

  • 可以使用如下方法控制Read的超时时长

    • SetDeadline(t time.Time) error

    • SetReadDeadline(t time.Time) error

四: 并发读写和并发安全

1. Server 和 Client 并发读写

  • 服务端

func Server() {
	// 要监听的地址和端口
	address := ":5678"

	//一:基于某个地址监听
	listener, err := net.Listen(tcp, address)
	if err != nil {
		log.Fatalln(err)
	}
	//defer 关闭监听
	defer listener.Close()
	fmt.Println("服务器正在监听:", listener.Addr())

	// 二:接收连接请求
	// 连接请求不是一个,所以要循环接收
	for true {
		//接受请求
		connection, err1 := listener.Accept()
		if err1 != nil {
			log.Fatalln(err1)
		}
		// 调用处理每个连接的函数
		ServerGoroutine(connection)
	}
}

// 并发读写
func ServerGoroutine(conn net.Conn) {
	// 打印客户端地址
	fmt.Println("接收到了客户端连接,客户端地址为:", conn.RemoteAddr())

	// 保证连接关闭
	defer conn.Close()
	wg := sync.WaitGroup{}

	// 并发的写
	wg.Add(1)
	go ServerWrite(conn, &wg)

	// 并发的读
	wg.Add(1)
	go ServerRead(conn, &wg)

	wg.Wait()
}

func ServerWrite(conn net.Conn, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		data := []byte("来自服务器的数据" + "\n")
		writeN, err := conn.Write(data)
		if err != nil {
			log.Fatalln(err)
		}
		fmt.Println("服务器发送数据的长度是 ", writeN)
		time.Sleep(time.Second)
	}
}

func ServerRead(conn net.Conn, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		buf := make([]byte, 1024) // 构建 buffer 缓冲
		n, err2 := conn.Read(buf)
		if err2 != nil {
			fmt.Println("读取失败")
		}
		fmt.Println("读取客户端发送的内容为: ", string(buf[:n])) //截取前n个字节
	}
}
  • 客户端

func Client() {
	//服务器地址
	ServerAddress := "127.0.0.1:5678"

	// 一: 建立连接
	connection, err := net.DialTimeout(tcp, ServerAddress, 1*time.Millisecond)
	if err != nil {
		log.Fatalln("客户端连接失败")
		return
	}
	//保证关闭
	defer connection.Close()
	fmt.Println("连接被接受,服务器地址为", connection.LocalAddr())
	wg := sync.WaitGroup{}

	// 并发写入
	wg.Add(1)
	go ClientWrite(connection, &wg)

	// 并发读取
	wg.Add(1)
	go ClientRead(connection, &wg)

	wg.Wait()
}

func ClientWrite(conn net.Conn, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		data := []byte("来自客户端的数据" + "\n")
		writeN, err := conn.Write(data)
		if err != nil {
			log.Fatalln(err)
		}
		fmt.Println("客户端发送数据的长度是 ", writeN)
		time.Sleep(500 * time.Millisecond)
	}
}

func ClientRead(conn net.Conn, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		buf := make([]byte, 1024) // 构建 buffer 缓冲
		n, err2 := conn.Read(buf)
		if err2 != nil {
			fmt.Println("读取失败")
		}
		fmt.Println("读取服务器端发送的内容为: ", string(buf[:n])) //截取前n个字节
	}
}

2. 多个 Goroutine 同时写

同一个连接的并发读或写操作是 Goroutine 并发安全的。指的是同时存在多个 Goroutine 并发的读写,之间是不会相互影响的,这个在实操中,主要针对 Write 操作。conn.Write()是通过锁来实现的。

// 处理每个连接
func HandleConnConcurrency(conn net.Conn) {
    // 日志连接的远程地址(client addr)
    log.Printf("accept from %s\n", conn.RemoteAddr())
    // A.保证连接关闭
    defer conn.Close()

    wg := sync.WaitGroup{}
    // 并发的写
    wg.Add(1)
    go SerWrite(conn, &wg, "1234")
    wg.Add(1)
    go SerWrite(conn, &wg, "5678")
    wg.Add(1)
    go SerWrite(conn, &wg, "910")

    // 并发的读
    wg.Add(1)
    go SerRead(conn, &wg)

    wg.Wait()
}

write 在底层是通过锁来实现并发安全的

// Write implements io.Writer.
func (fd *FD) Write(buf []byte) (int, error) {
    if err := fd.writeLock(); err != nil {
        return 0, err
    }
    defer fd.writeUnlock()
    if fd.isFile {
        fd.l.Lock()
        defer fd.l.Unlock()
    }

五: 格式化消息

在发送或接收消息时,需要对消息进行格式化处理,才能在应用程序中保证消息具有逻辑含义。之前采用的是字符串传递消息,也是一种格式,但能够包含的数据字段有限。

典型编程时,会将两端处理好的数据,使用特定格式进行发送。典型的有两类:

  • 文本编码,例如 JSON , YAML , CSV 等

  • 二进制编码,例如 GOB ( Go Binary ) , Protocol Buffer 等

  1. 服务端

GOB 和 的 JSON 的区别就是将解/编码器包名从 JSON 改为 GOB

func WriteMessage(conn net.Conn, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		// 向客户端发送数据
		// 数据编码后发送

		// 自定义的消息类型
		type Message struct {
			ID      uint   `json:"id,omitempty"`
			Code    string `json:"code,omitempty"`
			Content string `json:"content,omitempty"`
		}
		// 要发送的消息
		message := Message{
			ID:      10,
			Code:    "Contend-code",
			Content: "Golang网络编程",
		}

		// 1.JSON,文本编码
		// 创建编码器,func NewEncoder(w io.Writer) *Encode
		// io.Writer任何支持写入的东西都可以,切片,文件....
		// net.conn 里的write实现io.writer
		//encoder := json.NewEncoder(conn)
		//// 利用编码器进行编码
		//// 编码成功后会写入到conn , 已经完成了conn.Write()
		//if err := encoder.Encode(message); err != nil {
		//	log.Println(err)
		//	continue
		//}
		//fmt.Println("消息已发送")

		// 2.GOB
		encoder := gob.NewEncoder(conn)
		// 利用编码器进行编码
		// 编码成功后会写入到conn , 已经完成了conn.Write()
		if err := encoder.Encode(message); err != nil {
			log.Println(err)
			continue
		}
		fmt.Println("消息已发送")

		time.Sleep(1 * time.Second)
	}
}
  1. 客户端

func ClientReadMessage(conn net.Conn, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		// 从客户端接收数据
		// 接受到数据后解码

		// 自定义的消息类型
		type Message struct {
			ID      uint   `json:"id,omitempty"`
			Code    string `json:"code,omitempty"`
			Content string `json:"content,omitempty"`
		}
		// 解码后的消息
		message := Message{}

		// 1.JSON解码
		// 创建解码器
		//decoder := json.NewDecoder(conn)
		//// 利用解码器进行解码
		//// 解码操作,从conn中读取内容,成功会将解码后的结果,赋值到message变量
		//if err := decoder.Decode(&message); err != nil {
		//	log.Println(err)
		//	continue
		//}
		//fmt.Println(message)

		// 2.GOB解码
		decoder := gob.NewDecoder(conn)
		// 利用解码器进行解码
		// 解码操作,从conn中读取内容,成功会将解码后的结果,赋值到message变量
		if err := decoder.Decode(&message); err != nil {
			log.Println(err)
			continue
		}
		fmt.Println(message)

		time.Sleep(1 * time.Second)
	}
}

1

评论区