您当前的位: 首页 > 业界动态 > > 内容页

GO实现Redis:GO实现TCP服务器(1)

来源:博客园 2023-03-23 19:48:26


(相关资料图)

本文实现一个Echo TCP Server

interface/tcp/Handler.go

type Handler interface {   Handle(ctx context.Context, conn net.Conn)   Close() error}
Handler:业务逻辑的处理接口Handle(ctx context.Context, conn net.Conn) 处理连接

tcp/server.go

type Config struct {    Address string}func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {    closeChan := make(chan struct{})    listen, err := net.Listen("tcp", cfg.Address)    if err != nil {       return err   }    logger.Info("start listen")    ListenAndServe(listen, handler, closeChan)    return nil}func ListenAndServe(listener net.Listener,                    handler tcp.Handler,                    closeChan <-chan struct{}) {    ctx := context.Background()    var waitDone sync.WaitGroup    for true {        conn, err := listener.Accept()        if err != nil {            break        }        logger.Info("accept link")        waitDone.Add(1)        go func() {            defer func() {                waitDone.Done()            }()            handler.Handler(ctx, conn)        }()    }    waitDone.Wait()}
Config:启动tcp服务器的配置Address:监听地址ListenAndServe:ctx是上下文,可以传递一些参数。死循环中接收到新连接时,让一个协程去处理连接如果listener.Accept()出错了就会break跳出来,这时候需要等待已经服务的客户端退出。使用WaitGroup等待客服端退出
func ListenAndServe(listener net.Listener,                    handler tcp.Handler,                    closeChan <-chan struct{}) {    go func() {       <-closeChan       logger.Info("shutting down...")       _ = listener.Close()       _ = handler.Close()   }()    defer func() {       _ = listener.Close()       _ = handler.Close()   }()    ......}

listener和handler在退出的时候需要关掉。如果用户直接kill掉了程序,我们也需要关掉listener和handler,这时候要使用closeChan,一旦接收到关闭信号,就执行关闭逻辑

func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {    closeChan := make(chan struct{})    sigCh := make(chan os.Signal)    signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)    go func() {       sig := <-sigCh       switch sig {          case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:          closeChan <- struct{}{}      }   }()    listen, err := net.Listen("tcp", cfg.Address)    if err != nil {       return err   }    logger.Info("start listen")    ListenAndServe(listen, handler, closeChan)    return nil}

当系统对程序发送信号时,sigCh会接收到信号

tcp/echo.go

type EchoHandler struct {   activeConn sync.Map   closing    atomic.Boolean}

EchoHandler:

activeConn:记录连接closing:是否正在关闭,有并发竞争,使用atomic.Boolean
type EchoClient struct {   Conn    net.Conn   Waiting wait.Wait}func (c *EchoClient) Close() error {c.Waiting.WaitWithTimeout(10 * time.Second)_ = c.Conn.Close()return nil}

EchoClient:一个客户端就是一个连接。Close方法关闭客户端连接,超时时间设置为10s

func MakeHandler() *EchoHandler {return &EchoHandler{}}func (h *EchoHandler) Handle(ctx context.Context, conn net.Conn) {   // 连接正在关闭,不接收新连接   if h.closing.Get() {      _ = conn.Close()   }   client := &EchoClient{      Conn: conn,   }   h.activeConn.Store(client, struct{}{})   reader := bufio.NewReader(conn)   for {      msg, err := reader.ReadString("\n")      if err != nil {         if err == io.EOF {            logger.Info("connection close")            h.activeConn.Delete(client)         } else {            logger.Warn(err)         }         return      }      // 正在处理业务,不要关掉      client.Waiting.Add(1)      // 将数据原封不动写回去,测试      b := []byte(msg)      _, _ = conn.Write(b)      client.Waiting.Done()   }}func (h *EchoHandler) Close() error {   logger.Info("handler shutting down...")   h.closing.Set(true)   h.activeConn.Range(func(key interface{}, val interface{}) bool {      client := key.(*EchoClient)      _ = client.Close()      return true   })   return nil}
MakeEchoHandler:创建EchoHandlerHandle:处理客户端的连接。1.连接正在关闭时,不接收新连接2.存储新连接,value用空结构体3.使用缓存区接收用户发来的数据,使用\n作为结束的标志Close:将所有客户端连接关掉

main.go

const configFile string = "redis.conf"var defaultProperties = &config.ServerProperties{   Bind: "0.0.0.0",   Port: 6379,}func fileExists(filename string) bool {   info, err := os.Stat(filename)   return err == nil && !info.IsDir()}func main() {   logger.Setup(&logger.Settings{      Path:       "logs",      Name:       "godis",      Ext:        "log",      TimeFormat: "2022-02-02",   })   if fileExists(configFile) {      config.SetupConfig(configFile)   } else {      config.Properties = defaultProperties   }   err := tcp.ListenAndServeWithSignal(      &tcp.Config{         Address: fmt.Sprintf("%s:%d",            config.Properties.Bind,            config.Properties.Port),      },      EchoHandler.MakeHandler())   if err != nil {      logger.Error(err)   }}
测试
关键词:

Copyright ©  2015-2022 亚洲数据网版权所有  备案号:豫ICP备20022870号-9   联系邮箱:553 138 779@qq.com