From 68d4fc50a67c3ae85ca6f31a2fb9f85668cb00b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20=C5=A0trauch?= Date: Thu, 2 Sep 2021 02:13:12 +0200 Subject: [PATCH] Goodbye messages, proper exit handling --- daemon/config.go | 16 ++++++++-- daemon/main.go | 62 +++++++++++++++++++++++++++++++++++--- daemon/message_handlers.go | 12 +++++--- daemon/types.go | 20 ++++++++++++ server/main.go | 4 +++ 5 files changed, 102 insertions(+), 12 deletions(-) create mode 100644 daemon/types.go diff --git a/daemon/config.go b/daemon/config.go index bc05c57..ff28b11 100644 --- a/daemon/config.go +++ b/daemon/config.go @@ -8,9 +8,19 @@ import ( // Config keeps info about configuration of this daemon type Config struct { - Token string `envconfig:"TOKEN" required:"false"` // not used yet - NATSURL string `envconfig:"NATS_URL" required:"true"` - Labels []string `envconfig:"LABELS" required:"false" default:""` + Token string `envconfig:"TOKEN" required:"false"` // Authentication token, if empty auth is disabled + Host string `envconfig:"HOST" required:"false" default:"127.0.0.1"` // IP address used for the REST server to listen + Port uint16 `envconfig:"PORT" required:"false" default:"1313"` // Port related to the address above + NATSURL string `envconfig:"NATS_URL" required:"true"` // NATS URL used to connect to the NATS server + NATSDiscoveryChannel string `envconfig:"NATS_DISCOVERY_CHANNEL" required:"true" default:"lobby.discovery"` // Channel where the kepp alive packets are sent + Labels []string `envconfig:"LABELS" required:"false" default:""` // List of labels + LabelsPath string `envconfig:"LABELS_PATH" required:"false" default:"/etc/lobby/labels"` // Path where filesystem based labels are located + // TemplatesPath string `envconfig:"TEMPLATES_PATH" required:"false" default:"/etc/lobby/templates"` // Path where templates are stored for custom output + HostName string `envconfig:"HOSTNAME" required:"false"` // Overrise local machine's hostname + CleanEvery uint `envconfig:"CLEAN_EVERY" required:"false" default:"15"` // How often to clean the list of servers to get rid of the not alive ones + KeepAlive uint `envconfig:"KEEP_ALIVE" required:"false" default:"5"` // how often to send the keepalive message with all availabel information [secs] + TTL uint `envconfig:"TTL" required:"false" default:"30"` // After how many secs is discovery record considered as invalid + NodeExporterPort uint `envconfig:"NODE_EXPORTER_PORT" required:"false" default:"9100"` // Default port where node_exporter listens on all registered servers } // GetConfig return configuration created based on environment variables diff --git a/daemon/main.go b/daemon/main.go index 8248665..8cba7c5 100644 --- a/daemon/main.go +++ b/daemon/main.go @@ -1,9 +1,13 @@ package main import ( + "context" "log" "net/http" + "os" + "os/signal" "strconv" + "syscall" "time" "github.com/labstack/echo" @@ -16,6 +20,8 @@ var discoveryStorage server.Discoveries = server.Discoveries{} var config Config +var shuttingDown bool + func init() { discoveryStorage.LogChannel = make(chan string) } @@ -29,15 +35,43 @@ func cleanDiscoveryPool() { } +// sendGoodbyePacket is almost same as sendDiscoveryPacket but it's not running in loop +// and it adds goodbye message so other nodes know this node is gonna die. +func sendGoodbyePacket(nc *nats.Conn) { + discovery, err := getIdentification() + if err != nil { + log.Printf("sending discovery identification error: %v\n", err) + } + + envelope := discoveryEnvelope{ + Discovery: discovery, + Message: "goodbye", + } + + data, err := envelope.Bytes() + if err != nil { + log.Printf("sending discovery formating message error: %v\n", err) + } + err = nc.Publish(config.NATSDiscoveryChannel, data) + if err != nil { + log.Printf("sending discovery error: %v\n", err) + } +} + // sendDisoveryPacket sends discovery packet regularly so the network know we exist -func sendDisoveryPacket(nc *nats.Conn) { +func sendDiscoveryPacket(nc *nats.Conn) { for { discovery, err := getIdentification() if err != nil { log.Printf("sending discovery identification error: %v\n", err) } - data, err := discovery.Bytes() + envelope := discoveryEnvelope{ + Discovery: discovery, + Message: "hi", + } + + data, err := envelope.Bytes() if err != nil { log.Printf("sending discovery formating message error: %v\n", err) } @@ -46,6 +80,10 @@ func sendDisoveryPacket(nc *nats.Conn) { log.Printf("sending discovery error: %v\n", err) } time.Sleep(time.Duration(config.KeepAlive) * time.Second) + + if shuttingDown { + break + } } } @@ -89,7 +127,7 @@ func main() { } go cleanDiscoveryPool() - go sendDisoveryPacket(nc) + go sendDiscoveryPacket(nc) // -------- // REST API @@ -141,6 +179,22 @@ func main() { // return c.String(http.StatusOK, body.String()) // }) + // ------------------------------ + // Termination signals processing + // ------------------------------ + + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) + + go func(nc *nats.Conn, e *echo.Echo) { + sig := <-signals + shuttingDown = true + log.Printf("%s signal received, sending goodbye packet\n", sig.String()) + sendGoodbyePacket(nc) + time.Sleep(5 * time.Second) // we wait for a few seconds to let background jobs to finish their job + e.Shutdown(context.TODO()) + }(nc, e) + // Start server - e.Logger.Fatal(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port)))) + e.Logger.Error(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port)))) } diff --git a/daemon/message_handlers.go b/daemon/message_handlers.go index 776da04..797bee3 100644 --- a/daemon/message_handlers.go +++ b/daemon/message_handlers.go @@ -6,22 +6,24 @@ import ( "log" "github.com/nats-io/nats.go" - "github.com/rosti-cz/server_lobby/server" ) // discoveryHandler accepts discovery message and func discoveryHandler(m *nats.Msg) { - message := server.Discovery{} + message := discoveryEnvelope{} err := json.Unmarshal(m.Data, &message) if err != nil { log.Println(fmt.Errorf("decoding message error: %v", err)) } - err = message.Validate() + err = message.Discovery.Validate() if err != nil { log.Println(fmt.Errorf("validation error: %v", err)) } - discoveryStorage.Add(message) - + if message.Message == "hi" { + discoveryStorage.Add(message.Discovery) + } else if message.Message == "goodbye" { + discoveryStorage.Delete(message.Discovery.Hostname) + } } diff --git a/daemon/types.go b/daemon/types.go new file mode 100644 index 0000000..6e69f84 --- /dev/null +++ b/daemon/types.go @@ -0,0 +1,20 @@ +package main + +import ( + "encoding/json" + + "github.com/rosti-cz/server_lobby/server" +) + +// discoveryEnvelope adds a message to the standard discovery format. The message +// can be "hi" or "goodbye" where "hi" is used when the node is sending keep alive +// packets and "goodbye" means the node is leaving. +type discoveryEnvelope struct { + Discovery server.Discovery `json:"discovery"` + Message string `json:"message"` // can be hi or goodbye +} + +func (e *discoveryEnvelope) Bytes() ([]byte, error) { + body, err := json.Marshal(e) + return body, err +} diff --git a/server/main.go b/server/main.go index 1b82ed4..cf5f639 100644 --- a/server/main.go +++ b/server/main.go @@ -105,6 +105,10 @@ func (d *Discoveries) Refresh(hostname string) { // Delete removes server identified by hostname from the storage func (d *Discoveries) Delete(hostname string) { + if !d.Exist(hostname) { + return + } + if d.LogChannel != nil { d.LogChannel <- fmt.Sprintf("removing %s", hostname) }