TCP程序设计
一: 建立连接
客户端和服务端在建立连接时候 :
服务端: 监听 + 接收连接 , listen + Accept
客户端: 主动建立连接 , Dial
Go 语言中使用net
实现网络相关操作 , 包括 TCP操作
// 监听某一种网络的某一个地址
func Listen(network, address string) (Listener, error)
// 接受监听到的连接。
func (l *TCPListener) Accept() (Conn, error)
// 连接网络
func Dial(network, address string) (Conn, error)
// 带有超时的连接网络
func DialTimeout(network, address string, timeout time.Duration) (Conn, error)
func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error)
func (l *TCPListener) AcceptTCP() (*TCPConn, error)
1: 服务端程序
// TCP服务端
func Server() {
// 1.基于某个地址监听
// Listen中的network支持不同的网络类型
listen, err := net.Listen("tcp", "127.0.0.1:9000")
if err != nil {
logger.Error("listen failed:", zap.Error(err))
} else {
fmt.Println("正在监听中,监听地址为:", listen.Addr())
}
// 2.接收连接请求,不止一个客户端所以要循环接收请求
for {
// 没有接收到请求,会阻塞到这里
conn, err := listen.Accept()
if err != nil {
logger.Error("Accept failed: ", zap.Error(err))
} else {
fmt.Println("连接已建立,客户端地址为:", conn.RemoteAddr())
}
defer conn.Close()
// 建立连接后,进行读写操作
//....
}
}
2: 客户端程序
// TCP 客户端
func Client() {
// tcp服务器端地址
address := "127.0.0.1:9000"
// 模拟多客户端
// 并发的客户端请求
num := 10
wg := sync.WaitGroup{}
wg.Add(num)
for i := 0; i < num; i++ {
go func(wg *sync.WaitGroup) {
defer wg.Done()
//一.建立连接
conn, err := net.Dial("tcp", address)
if err != nil {
log.Fatalln(err)
return
}
// 保证关闭
defer conn.Close()
log.Printf("connection is establish ,client addr is %s\n", conn.LocalAddr())
}(&wg)
}
wg.Wait()
}
注意:
conn.Close()
,关闭连接 , 连接资源使用完毕要记得关闭
conn.LocalAddr()
, 用于获得客户端本地地址,会与服务端的RemoteAddr()
, 服务端调用的是RemoteAddr()
, 客户端调用的是LocalAddr()
3. TCP 网络支持
func Listen(network, address string) (Listener, error)
func Dial(network, address string) (Conn, error)
etwork 表示网络类型, 支持的
TCP
类型字符串:tcp , 使用 IPv4 或 IPv6
tcp4 , 仅使用 IPv4
tcp6 , 仅使用 IPv6
省略 IP 部分, 绑定可用的全部
IP
, 包括 IPv4 和 IPv6
客户端在建立连接时使用的网络类型,要与服务器监听的网络类型能够匹配
//服务端
address := ":5678" //省略IP
address := "[::1]:5678" // IPV6
address := "127.0.0.1:5678" // IPV4
listener, err := net.Listen(tcp, address)
//客户端
connection, err := net.Dial(tcp, address)
二: 连接失败的常见情况
当客户端net.Dial()
建立连接时, 还有可能会失败, 典型的失败原因:
服务器端未启动, 或网络连接失败
网络原因超时
并发连接的客户端太多, 服务端处理不完
# 无连接目标可用
No connection could be made because the target machine actively refused it.
# 网络不可达
A socket operation was attempted to an unreachable network.
# 超时
dial tcp 127.0.0.1:56789: i/o timeout
1. DialTimeout
可以设置如果没有在指定时间连接到服务器 , 就返回一个 i/o 错误
// 带有超时时间的连接网络
connection, err := net.DialTimeout(tcp, address, 1*time.Millisecond)
2. 客户端未能及时 Accept
客户端发出的连接,若服务器端未能及时Accept, 会被缓存到队列中. 当队列存满时,就不会在接受客户端连接了
//服务端
func(conn net.Conn) {
//处理请求
fmt.Println("接收到了客户端连接,客户端地址为:", conn.RemoteAddr())
//比如每个请求需要1秒的处理时间
time.Sleep(1 * time.Second)
}(connection)
//客户端
for i := 0; i < num; i++ {
go func() {
defer wg.Done()
//一: 建立连接
connection, err := net.DialTimeout(tcp, ServerAddress, 1*time.Millisecond)
if err != nil {
log.Fatalln(err)
}
//defer 关闭连接
defer connection.Close()
//处理连接
fmt.Println("连接被接受,服务器地址为", connection.LocalAddr())
}()
//10毫秒一个请求
time.Sleep(10 * time.Millisecond)
}
会出现服务器来不及处理请求的情况
解决方法 : 并发处理每个请求
// 处理连接,读写
go func(conn net.Conn) {
// 日志连接的远程地址(client addr)
log.Printf("accept from %s\n", conn.RemoteAddr())
time.Sleep(time.Second)
}(conn)
三: 读写操作
1. 基本读写操作
TCP 协议是全双工通信,就是连接两端允许同时进行双向数据传输 ( 读写 )
Go 程序设计时,服务端通常使用独立的 Goroutine 处理每个客户端的连接及使用该连接的读写操作
// 从conn读内容至b, 返回读取长度和错误
Read(b []byte) (n int, err error)
// 向conn写入数据b,返回写入长度和错误
Write(b []byte) (n int, err error)
服务端代码
func ServerWrite() {
// 要监听的地址和端口
address := ":5678"
//一:基于某个地址监听
listener, err := net.Listen(tcp, address)
if err != nil {
log.Fatalln(err)
}
//defer 关闭监听
defer listener.Close()
fmt.Println("服务器正在监听:", listener.Addr())
// 二:接收连接请求
// 连接请求不是一个,所以要循环接收
for true {
//接受请求
connection, err1 := listener.Accept()
if err1 != nil {
log.Fatalln(err1)
}
// 调用处理每个连接的函数
HandleConn(connection)
}
}
// 处理每个连接
func HandleConn(conn net.Conn) {
// 打印客户端地址
fmt.Println("接收到了客户端连接,客户端地址为:", conn.RemoteAddr())
// 1.保证连接关闭
defer conn.Close()
// 2.向客户端发送数据
write, err := conn.Write([]byte("Server data" + "\n"))
if err != nil {
log.Println(err)
}
fmt.Println("Server write len is :", write)
// 3.从客户端接收数据
buf := make([]byte, 1024) // 构建 buffer 缓冲
n, err2 := conn.Read(buf)
if err2 != nil {
fmt.Println("读取失败")
}
fmt.Println("读取客户端的到内容为: ", string(buf[:n])) //截取前n个字节
}
客户端代码
func ClientRead() {
//服务器地址
ServerAddress := "127.0.0.1:5678"
// 一: 建立连接
connection, err := net.DialTimeout(tcp, ServerAddress, 1*time.Millisecond)
if err != nil {
log.Fatalln("客户端连接失败")
}
// 二:保证关闭
defer connection.Close()
fmt.Println("连接完成,服务器地址为:", connection.LocalAddr())
// 三:读取服务端发送的数据
buf := make([]byte, 1024) // 构建 buffer 缓冲
n, err2 := connection.Read(buf)
if err2 != nil {
fmt.Println("读取失败")
}
fmt.Println("读取到内容为: ", string(buf[:n])) //截取前n个字节
// 四:向服务端发送数据
write, err := connection.Write([]byte("Client date" + "\n"))
if err != nil {
log.Println(err)
}
fmt.Println("Client write len is :", write)
}
服务端和客户带端既可以发送数据 , 也可以接收数据
2. Write 和 Read 的注意事项
2.1 Write
写成功 ,
err nil && wn len(data)
表示写入成功 ( 严格的写入成功判断 )
data := []byte("Client date" + "\n")
writeN, err1 := connection.Write(data)
// 严格的写入成功判断
if err1 != nil && writeN == len(data){
log.Println(err1)
}
fmt.Println("客户端发送数据长度 :", writeN)
写阻塞,当无法继续写时,Write 会进入阻塞状态 , 无法继续写 , 通常意味着 TCP 的窗口已满
// 服务端
// 向客户端发送数据
for i := 0; i < 3000000; i++ {
data := []byte("send some data from server" + "\n")
wn, err := conn.Write(data)
if err != nil {
log.Fatalln(err)
}
log.Printf("%d, server write len is %d\n", i, wn)
}
//客户端 连接成功后不做接收,用select阻塞即可
select{}
最多会开启的 TCP 窗口
已关闭的连接不能继续写入
在上面的代码中如果没有
select{}
阻塞 , 那么连接成功后就会关闭连接
// 向已关闭的连接中写入会报错
An existing connection was for cibly closed by the remote host
可以使用如下方法控制Write的超时时长
SetDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
2.2 Read
当 conn 中无数据时,Read 处于阻塞状态
当 conn 中有足够数据时,Read 读满 buf,并返回读取长度,需要循环读取,才可以读取全部内容
for {
buf := make([]byte, 10)
rn, err := conn.Read(buf)
if err != nil {
// 因为读完后再次读会返回err,所以要break
log.Println(err)
break
}
log.Println("received from server data is:", string(buf[:rn]))
}
当 conn 中有部分数据时,Read 读部分数据,并返回读取长度
当 conn 已经关闭时,通常会返回 EOF error
可以使用如下方法控制Read的超时时长
SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
四: 并发读写和并发安全
1. Server 和 Client 并发读写
服务端
func Server() {
// 要监听的地址和端口
address := ":5678"
//一:基于某个地址监听
listener, err := net.Listen(tcp, address)
if err != nil {
log.Fatalln(err)
}
//defer 关闭监听
defer listener.Close()
fmt.Println("服务器正在监听:", listener.Addr())
// 二:接收连接请求
// 连接请求不是一个,所以要循环接收
for true {
//接受请求
connection, err1 := listener.Accept()
if err1 != nil {
log.Fatalln(err1)
}
// 调用处理每个连接的函数
ServerGoroutine(connection)
}
}
// 并发读写
func ServerGoroutine(conn net.Conn) {
// 打印客户端地址
fmt.Println("接收到了客户端连接,客户端地址为:", conn.RemoteAddr())
// 保证连接关闭
defer conn.Close()
wg := sync.WaitGroup{}
// 并发的写
wg.Add(1)
go ServerWrite(conn, &wg)
// 并发的读
wg.Add(1)
go ServerRead(conn, &wg)
wg.Wait()
}
func ServerWrite(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
for {
data := []byte("来自服务器的数据" + "\n")
writeN, err := conn.Write(data)
if err != nil {
log.Fatalln(err)
}
fmt.Println("服务器发送数据的长度是 ", writeN)
time.Sleep(time.Second)
}
}
func ServerRead(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
for {
buf := make([]byte, 1024) // 构建 buffer 缓冲
n, err2 := conn.Read(buf)
if err2 != nil {
fmt.Println("读取失败")
}
fmt.Println("读取客户端发送的内容为: ", string(buf[:n])) //截取前n个字节
}
}
客户端
func Client() {
//服务器地址
ServerAddress := "127.0.0.1:5678"
// 一: 建立连接
connection, err := net.DialTimeout(tcp, ServerAddress, 1*time.Millisecond)
if err != nil {
log.Fatalln("客户端连接失败")
return
}
//保证关闭
defer connection.Close()
fmt.Println("连接被接受,服务器地址为", connection.LocalAddr())
wg := sync.WaitGroup{}
// 并发写入
wg.Add(1)
go ClientWrite(connection, &wg)
// 并发读取
wg.Add(1)
go ClientRead(connection, &wg)
wg.Wait()
}
func ClientWrite(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
for {
data := []byte("来自客户端的数据" + "\n")
writeN, err := conn.Write(data)
if err != nil {
log.Fatalln(err)
}
fmt.Println("客户端发送数据的长度是 ", writeN)
time.Sleep(500 * time.Millisecond)
}
}
func ClientRead(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
for {
buf := make([]byte, 1024) // 构建 buffer 缓冲
n, err2 := conn.Read(buf)
if err2 != nil {
fmt.Println("读取失败")
}
fmt.Println("读取服务器端发送的内容为: ", string(buf[:n])) //截取前n个字节
}
}
2. 多个 Goroutine 同时写
同一个连接的并发读或写操作是 Goroutine 并发安全的。指的是同时存在多个 Goroutine 并发的读写,之间是不会相互影响的,这个在实操中,主要针对 Write 操作。
conn.Write()
是通过锁来实现的。
// 处理每个连接
func HandleConnConcurrency(conn net.Conn) {
// 日志连接的远程地址(client addr)
log.Printf("accept from %s\n", conn.RemoteAddr())
// A.保证连接关闭
defer conn.Close()
wg := sync.WaitGroup{}
// 并发的写
wg.Add(1)
go SerWrite(conn, &wg, "1234")
wg.Add(1)
go SerWrite(conn, &wg, "5678")
wg.Add(1)
go SerWrite(conn, &wg, "910")
// 并发的读
wg.Add(1)
go SerRead(conn, &wg)
wg.Wait()
}
write
在底层是通过锁来实现并发安全的
// Write implements io.Writer.
func (fd *FD) Write(buf []byte) (int, error) {
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.writeUnlock()
if fd.isFile {
fd.l.Lock()
defer fd.l.Unlock()
}
五: 格式化消息
在发送或接收消息时,需要对消息进行格式化处理,才能在应用程序中保证消息具有逻辑含义。之前采用的是字符串传递消息,也是一种格式,但能够包含的数据字段有限。
典型编程时,会将两端处理好的数据,使用特定格式进行发送。典型的有两类:
文本编码,例如 JSON , YAML , CSV 等
二进制编码,例如 GOB ( Go Binary ) , Protocol Buffer 等
服务端
GOB 和 的 JSON 的区别就是将解/编码器包名从 JSON 改为 GOB
func WriteMessage(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
for {
// 向客户端发送数据
// 数据编码后发送
// 自定义的消息类型
type Message struct {
ID uint `json:"id,omitempty"`
Code string `json:"code,omitempty"`
Content string `json:"content,omitempty"`
}
// 要发送的消息
message := Message{
ID: 10,
Code: "Contend-code",
Content: "Golang网络编程",
}
// 1.JSON,文本编码
// 创建编码器,func NewEncoder(w io.Writer) *Encode
// io.Writer任何支持写入的东西都可以,切片,文件....
// net.conn 里的write实现io.writer
//encoder := json.NewEncoder(conn)
//// 利用编码器进行编码
//// 编码成功后会写入到conn , 已经完成了conn.Write()
//if err := encoder.Encode(message); err != nil {
// log.Println(err)
// continue
//}
//fmt.Println("消息已发送")
// 2.GOB
encoder := gob.NewEncoder(conn)
// 利用编码器进行编码
// 编码成功后会写入到conn , 已经完成了conn.Write()
if err := encoder.Encode(message); err != nil {
log.Println(err)
continue
}
fmt.Println("消息已发送")
time.Sleep(1 * time.Second)
}
}
客户端
func ClientReadMessage(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
for {
// 从客户端接收数据
// 接受到数据后解码
// 自定义的消息类型
type Message struct {
ID uint `json:"id,omitempty"`
Code string `json:"code,omitempty"`
Content string `json:"content,omitempty"`
}
// 解码后的消息
message := Message{}
// 1.JSON解码
// 创建解码器
//decoder := json.NewDecoder(conn)
//// 利用解码器进行解码
//// 解码操作,从conn中读取内容,成功会将解码后的结果,赋值到message变量
//if err := decoder.Decode(&message); err != nil {
// log.Println(err)
// continue
//}
//fmt.Println(message)
// 2.GOB解码
decoder := gob.NewDecoder(conn)
// 利用解码器进行解码
// 解码操作,从conn中读取内容,成功会将解码后的结果,赋值到message变量
if err := decoder.Decode(&message); err != nil {
log.Println(err)
continue
}
fmt.Println(message)
time.Sleep(1 * time.Second)
}
}
评论区