Go实现一个并发聊天服务器

整体思路

Go并发聊天服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package main

import (
"fmt"
"io"
"net"
"strings"
"time"
)

// Client 连接的客户端
type Client struct {
// 用来存储要发送消息
C chan string
Name string
Addr string
}

// 用来存储在线的用户
var onlineMap map[string]*Client

var messages chan string

func init() {
// 初始化map
onlineMap = make(map[string]*Client)
// 初始化消息
messages = make(chan string, 128)
}

func makeMsg(cli *Client, msg string) string {
return fmt.Sprintf("用户[%s]:%s\n", cli.Name, msg)
}

// Manager 转发所有消息到用户
func Manager() {
for {

// 如果没有消息 这里会阻塞
msg := <-messages
fmt.Println(msg)
// 遍历在线用户 转发消息
for _, cli := range onlineMap {
cli.C <- msg
}
}
}

// 把消息发送给对应的用户
func writeMsgToClient(cli *Client, conn net.Conn) {
for {
msg := <-cli.C
_, err := conn.Write([]byte(msg))
if err != nil {
fmt.Println("[writeMsgToClient] err:", err)
}
}
}

func readClientMsg(cli *Client, conn net.Conn, isLogout, hasData chan bool) {
buf := make([]byte, 1024)
for {
n, err := conn.Read(buf)
if err != nil {
if err == io.EOF {
isLogout <- true
return
}
fmt.Println("server read client messages err:", err)
continue
}

// 如果读到数据长度为0 则用户退出
if n == 0 {
isLogout <- true
return
}

// 如果用户发送了消息 则hasData为true
hasData <- true

msg := string(buf[:n])
// 如果发送的是who 返回当前在线用户列表
if msg == "who" {
var onlinelist string
conn.Write([]byte("online user list:\n"))
for _, cli := range onlineMap {
onlinelist += fmt.Sprintf("name:%s\n", cli.Name)
}
conn.Write([]byte(onlinelist))
// 如果用户发送的消息是rename 则修改当前用户的名字
} else if len(msg) >= 6 && msg[:6] == "rename" {
cli.Name = strings.Split(msg, "|")[1]
conn.Write([]byte("rename success:\n"))
} else {
messages <- makeMsg(cli, string(buf[:n]))
}

}
}

func handleConn(conn net.Conn) {
defer conn.Close()
// 获取用户ip
cliAddr := conn.RemoteAddr().String()

// 把用户信息保存到结构体
client := Client{
C: make(chan string),
Name: cliAddr,
Addr: cliAddr,
}
// 把用户保存到map
onlineMap[cliAddr] = &client
// 把用户上线的消息放到全局chan
messages <- makeMsg(&client, "上线了")
// 用户是否退出了
isLogout := make(chan bool)
// 用户是否发消息了
hasData := make(chan bool)

// 每创建新增一个用户 就对应创建一个协程用来发送消息
go writeMsgToClient(&client, conn)
// 每创建新增一个用户 就对应创建一个协程用来接收用户发的消息
go readClientMsg(&client, conn, isLogout, hasData)

for {
select {
case <-isLogout:
// 如果isLogout中有值 把用户从onlineMap中删除 并广播用户下线
delete(onlineMap, client.Addr)
messages <- makeMsg(&client, "logout")
return
case <-hasData:
case <-time.After(time.Second * 15): // 15秒没有发送消息则超时
delete(onlineMap, client.Addr)
messages <- makeMsg(&client, "timeout")
return
}
}
}

func main() {
// 创建监听
listener, err := net.Listen("tcp", ":8888")
if err != nil {
fmt.Println("net.listen err", err)
return
}

// 专门负责转发消息 有消息时 遍历所有在线用户 把消息转发到所有用户
go Manager()

for {
// 阻塞等待用户连接
conn, err := listener.Accept()
if err != nil {
fmt.Println("listener.Accept() err", err)
// 连接出错则跳过这个连接
continue
}
go handleConn(conn)
}

}