Goodbye messages, proper exit handling

This commit is contained in:
Adam Štrauch 2021-09-02 02:13:12 +02:00
parent 53da96ec10
commit 68d4fc50a6
Signed by: cx
GPG Key ID: 018304FFA8988F8D
5 changed files with 102 additions and 12 deletions

View File

@ -8,9 +8,19 @@ import (
// Config keeps info about configuration of this daemon
type Config struct {
Token string `envconfig:"TOKEN" required:"false"` // not used yet
NATSURL string `envconfig:"NATS_URL" required:"true"`
Labels []string `envconfig:"LABELS" required:"false" default:""`
Token string `envconfig:"TOKEN" required:"false"` // Authentication token, if empty auth is disabled
Host string `envconfig:"HOST" required:"false" default:"127.0.0.1"` // IP address used for the REST server to listen
Port uint16 `envconfig:"PORT" required:"false" default:"1313"` // Port related to the address above
NATSURL string `envconfig:"NATS_URL" required:"true"` // NATS URL used to connect to the NATS server
NATSDiscoveryChannel string `envconfig:"NATS_DISCOVERY_CHANNEL" required:"true" default:"lobby.discovery"` // Channel where the kepp alive packets are sent
Labels []string `envconfig:"LABELS" required:"false" default:""` // List of labels
LabelsPath string `envconfig:"LABELS_PATH" required:"false" default:"/etc/lobby/labels"` // Path where filesystem based labels are located
// TemplatesPath string `envconfig:"TEMPLATES_PATH" required:"false" default:"/etc/lobby/templates"` // Path where templates are stored for custom output
HostName string `envconfig:"HOSTNAME" required:"false"` // Overrise local machine's hostname
CleanEvery uint `envconfig:"CLEAN_EVERY" required:"false" default:"15"` // How often to clean the list of servers to get rid of the not alive ones
KeepAlive uint `envconfig:"KEEP_ALIVE" required:"false" default:"5"` // how often to send the keepalive message with all availabel information [secs]
TTL uint `envconfig:"TTL" required:"false" default:"30"` // After how many secs is discovery record considered as invalid
NodeExporterPort uint `envconfig:"NODE_EXPORTER_PORT" required:"false" default:"9100"` // Default port where node_exporter listens on all registered servers
}
// GetConfig return configuration created based on environment variables

View File

@ -1,9 +1,13 @@
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/labstack/echo"
@ -16,6 +20,8 @@ var discoveryStorage server.Discoveries = server.Discoveries{}
var config Config
var shuttingDown bool
func init() {
discoveryStorage.LogChannel = make(chan string)
}
@ -29,15 +35,43 @@ func cleanDiscoveryPool() {
}
// sendGoodbyePacket is almost same as sendDiscoveryPacket but it's not running in loop
// and it adds goodbye message so other nodes know this node is gonna die.
func sendGoodbyePacket(nc *nats.Conn) {
discovery, err := getIdentification()
if err != nil {
log.Printf("sending discovery identification error: %v\n", err)
}
envelope := discoveryEnvelope{
Discovery: discovery,
Message: "goodbye",
}
data, err := envelope.Bytes()
if err != nil {
log.Printf("sending discovery formating message error: %v\n", err)
}
err = nc.Publish(config.NATSDiscoveryChannel, data)
if err != nil {
log.Printf("sending discovery error: %v\n", err)
}
}
// sendDisoveryPacket sends discovery packet regularly so the network know we exist
func sendDisoveryPacket(nc *nats.Conn) {
func sendDiscoveryPacket(nc *nats.Conn) {
for {
discovery, err := getIdentification()
if err != nil {
log.Printf("sending discovery identification error: %v\n", err)
}
data, err := discovery.Bytes()
envelope := discoveryEnvelope{
Discovery: discovery,
Message: "hi",
}
data, err := envelope.Bytes()
if err != nil {
log.Printf("sending discovery formating message error: %v\n", err)
}
@ -46,6 +80,10 @@ func sendDisoveryPacket(nc *nats.Conn) {
log.Printf("sending discovery error: %v\n", err)
}
time.Sleep(time.Duration(config.KeepAlive) * time.Second)
if shuttingDown {
break
}
}
}
@ -89,7 +127,7 @@ func main() {
}
go cleanDiscoveryPool()
go sendDisoveryPacket(nc)
go sendDiscoveryPacket(nc)
// --------
// REST API
@ -141,6 +179,22 @@ func main() {
// return c.String(http.StatusOK, body.String())
// })
// ------------------------------
// Termination signals processing
// ------------------------------
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
go func(nc *nats.Conn, e *echo.Echo) {
sig := <-signals
shuttingDown = true
log.Printf("%s signal received, sending goodbye packet\n", sig.String())
sendGoodbyePacket(nc)
time.Sleep(5 * time.Second) // we wait for a few seconds to let background jobs to finish their job
e.Shutdown(context.TODO())
}(nc, e)
// Start server
e.Logger.Fatal(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port))))
e.Logger.Error(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port))))
}

View File

@ -6,22 +6,24 @@ import (
"log"
"github.com/nats-io/nats.go"
"github.com/rosti-cz/server_lobby/server"
)
// discoveryHandler accepts discovery message and
func discoveryHandler(m *nats.Msg) {
message := server.Discovery{}
message := discoveryEnvelope{}
err := json.Unmarshal(m.Data, &message)
if err != nil {
log.Println(fmt.Errorf("decoding message error: %v", err))
}
err = message.Validate()
err = message.Discovery.Validate()
if err != nil {
log.Println(fmt.Errorf("validation error: %v", err))
}
discoveryStorage.Add(message)
if message.Message == "hi" {
discoveryStorage.Add(message.Discovery)
} else if message.Message == "goodbye" {
discoveryStorage.Delete(message.Discovery.Hostname)
}
}

20
daemon/types.go Normal file
View File

@ -0,0 +1,20 @@
package main
import (
"encoding/json"
"github.com/rosti-cz/server_lobby/server"
)
// discoveryEnvelope adds a message to the standard discovery format. The message
// can be "hi" or "goodbye" where "hi" is used when the node is sending keep alive
// packets and "goodbye" means the node is leaving.
type discoveryEnvelope struct {
Discovery server.Discovery `json:"discovery"`
Message string `json:"message"` // can be hi or goodbye
}
func (e *discoveryEnvelope) Bytes() ([]byte, error) {
body, err := json.Marshal(e)
return body, err
}

View File

@ -105,6 +105,10 @@ func (d *Discoveries) Refresh(hostname string) {
// Delete removes server identified by hostname from the storage
func (d *Discoveries) Delete(hostname string) {
if !d.Exist(hostname) {
return
}
if d.LogChannel != nil {
d.LogChannel <- fmt.Sprintf("removing %s", hostname)
}