GO实现Redis:GO实现TCP服务器(1)

interface/tcp/Handler.go

type Handler interface {    Handle(ctx context.Context, conn net.Conn)    Close() error } 
  • Handler:业务逻辑的处理接口
    • Handle(ctx context.Context, conn net.Conn) 处理连接

tcp/server.go

type Config struct {     Address string }  func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {     closeChan := make(chan struct{})     listen, err := net.Listen("tcp", cfg.Address)     if err != nil {        return err    }     logger.Info("start listen")     ListenAndServe(listen, handler, closeChan)     return nil }  func ListenAndServe(listener net.Listener,                     handler tcp.Handler,                     closeChan <-chan struct{}) {     ctx := context.Background()     var waitDone sync.WaitGroup     for true {         conn, err := listener.Accept()         if err != nil {             break         }         logger.Info("accept link")         waitDone.Add(1)         go func() {             defer func() {                 waitDone.Done()             }()             handler.Handler(ctx, conn)         }()     }     waitDone.Wait() } 
  • Config:启动tcp服务器的配置
    • Address:监听地址
  • ListenAndServe:ctx是上下文,可以传递一些参数。死循环中接收到新连接时,让一个协程去处理连接
  • 如果listener.Accept()出错了就会break跳出来,这时候需要等待已经服务的客户端退出。使用WaitGroup等待客服端退出
func ListenAndServe(listener net.Listener,                     handler tcp.Handler,                     closeChan <-chan struct{}) {      go func() {        <-closeChan        logger.Info("shutting down...")        _ = listener.Close()        _ = handler.Close()    }()      defer func() {        _ = listener.Close()        _ = handler.Close()    }()      ...... } 

listener和handler在退出的时候需要关掉。如果用户直接kill掉了程序,我们也需要关掉listener和handler,这时候要使用closeChan,一旦接收到关闭信号,就执行关闭逻辑

func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {      closeChan := make(chan struct{})     sigCh := make(chan os.Signal)     signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)     go func() {        sig := <-sigCh        switch sig {           case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:           closeChan <- struct{}{}       }    }()     listen, err := net.Listen("tcp", cfg.Address)     if err != nil {        return err    }     logger.Info("start listen")     ListenAndServe(listen, handler, closeChan)     return nil } 

当系统对程序发送信号时,sigCh会接收到信号

tcp/echo.go

type EchoHandler struct {    activeConn sync.Map    closing    atomic.Boolean } 

EchoHandler:

  • activeConn:记录连接
  • closing:是否正在关闭,有并发竞争,使用atomic.Boolean
type EchoClient struct {    Conn    net.Conn    Waiting wait.Wait }  func (c *EchoClient) Close() error { 	c.Waiting.WaitWithTimeout(10 * time.Second) 	_ = c.Conn.Close() 	return nil } 

EchoClient:一个客户端就是一个连接。Close方法关闭客户端连接,超时时间设置为10s

func MakeHandler() *EchoHandler { 	return &EchoHandler{} }  func (h *EchoHandler) Handle(ctx context.Context, conn net.Conn) {    // 连接正在关闭,不接收新连接    if h.closing.Get() {       _ = conn.Close()    }     client := &EchoClient{       Conn: conn,    }    h.activeConn.Store(client, struct{}{})     reader := bufio.NewReader(conn)    for {       msg, err := reader.ReadString('n')       if err != nil {          if err == io.EOF {             logger.Info("connection close")             h.activeConn.Delete(client)          } else {             logger.Warn(err)          }          return       }       // 正在处理业务,不要关掉       client.Waiting.Add(1)       // 将数据原封不动写回去,测试       b := []byte(msg)       _, _ = conn.Write(b)       client.Waiting.Done()    } }  func (h *EchoHandler) Close() error {    logger.Info("handler shutting down...")    h.closing.Set(true)    h.activeConn.Range(func(key interface{}, val interface{}) bool {       client := key.(*EchoClient)       _ = client.Close()       return true    })    return nil } 
  • MakeEchoHandler:创建EchoHandler
  • Handle:处理客户端的连接。
    • 1.连接正在关闭时,不接收新连接
    • 2.存储新连接,value用空结构体
    • 3.使用缓存区接收用户发来的数据,使用n作为结束的标志
  • Close:将所有客户端连接关掉

main.go

const configFile string = "redis.conf"  var defaultProperties = &config.ServerProperties{    Bind: "0.0.0.0",    Port: 6379, }  func fileExists(filename string) bool {    info, err := os.Stat(filename)    return err == nil && !info.IsDir() }  func main() {    logger.Setup(&logger.Settings{       Path:       "logs",       Name:       "godis",       Ext:        "log",       TimeFormat: "2022-02-02",    })     if fileExists(configFile) {       config.SetupConfig(configFile)    } else {       config.Properties = defaultProperties    }     err := tcp.ListenAndServeWithSignal(       &tcp.Config{          Address: fmt.Sprintf("%s:%d",             config.Properties.Bind,             config.Properties.Port),       },       EchoHandler.MakeHandler())    if err != nil {       logger.Error(err)    } } 

发表评论

相关文章