diff --git a/README.md b/README.md index e4ecb29..d053392 100644 --- a/README.md +++ b/README.md @@ -12,3 +12,4 @@ * [ ] Separate the NATS code so it can support multiple backend/drivers * [ ] Documentation * [ ] Tests +* [ ] Command hooks - script or list of scripts that are triggered when discovery status has changed diff --git a/common/types.go b/common/types.go new file mode 100644 index 0000000..58a90d7 --- /dev/null +++ b/common/types.go @@ -0,0 +1,16 @@ +package common + +import "github.com/rosti-cz/server_lobby/server" + +// Listener is a function that returns received discovery +type Listener func(server.Discovery) + +// Driver interface describes exported methods that have to be implemented in each driver +type Driver interface { + Init() error + Close() error + RegisterSubscribeFunction(listener Listener) + RegisterUnsubscribeFunction(listener Listener) + SendDiscoveryPacket(discovery server.Discovery) error + SendGoodbyePacket(discovery server.Discovery) error +} diff --git a/daemon/config.go b/daemon/config.go index ff28b11..3fea3ee 100644 --- a/daemon/config.go +++ b/daemon/config.go @@ -21,6 +21,7 @@ type Config struct { KeepAlive uint `envconfig:"KEEP_ALIVE" required:"false" default:"5"` // how often to send the keepalive message with all availabel information [secs] TTL uint `envconfig:"TTL" required:"false" default:"30"` // After how many secs is discovery record considered as invalid NodeExporterPort uint `envconfig:"NODE_EXPORTER_PORT" required:"false" default:"9100"` // Default port where node_exporter listens on all registered servers + Register bool `envconfig:"REGISTER" required:"false" default:"true"` // If true (default) then local instance is registered with other instance (discovery packet is sent regularly) } // GetConfig return configuration created based on environment variables diff --git a/daemon/handlers.go b/daemon/handlers.go new file mode 100644 index 0000000..72b172b --- /dev/null +++ b/daemon/handlers.go @@ -0,0 +1,28 @@ +package main + +import ( + "net/http" + + "github.com/labstack/echo" + "github.com/rosti-cz/server_lobby/server" +) + +func listHandler(c echo.Context) error { + label := c.QueryParam("label") + + var discoveries []server.Discovery + + if len(label) > 0 { + discoveries = discoveryStorage.Filter(label) + } else { + discoveries = discoveryStorage.GetAll() + } + + return c.JSONPretty(200, discoveries, " ") +} + +func prometheusHandler(c echo.Context) error { + services := preparePrometheusOutput(discoveryStorage.GetAll()) + + return c.JSONPretty(http.StatusOK, services, " ") +} diff --git a/daemon/main.go b/daemon/main.go index 8cba7c5..2d24aa4 100644 --- a/daemon/main.go +++ b/daemon/main.go @@ -3,7 +3,6 @@ package main import ( "context" "log" - "net/http" "os" "os/signal" "strconv" @@ -12,18 +11,33 @@ import ( "github.com/labstack/echo" "github.com/labstack/echo/middleware" - "github.com/nats-io/nats.go" + "github.com/rosti-cz/server_lobby/common" + "github.com/rosti-cz/server_lobby/nats_driver" "github.com/rosti-cz/server_lobby/server" ) var discoveryStorage server.Discoveries = server.Discoveries{} +var driver common.Driver var config Config var shuttingDown bool func init() { + // Load config from environment variables + config = *GetConfig() + + // Setup discovery storage discoveryStorage.LogChannel = make(chan string) + discoveryStorage.TTL = config.TTL + + // Setup driver + driver = &nats_driver.Driver{ + NATSUrl: config.NATSURL, + NATSDiscoveryChannel: config.NATSDiscoveryChannel, + + LogChannel: discoveryStorage.LogChannel, + } } // cleanDiscoveryPool clears the local server map and keeps only the alive servers @@ -37,47 +51,29 @@ func cleanDiscoveryPool() { // sendGoodbyePacket is almost same as sendDiscoveryPacket but it's not running in loop // and it adds goodbye message so other nodes know this node is gonna die. -func sendGoodbyePacket(nc *nats.Conn) { +func sendGoodbyePacket() { discovery, err := getIdentification() if err != nil { log.Printf("sending discovery identification error: %v\n", err) } - envelope := discoveryEnvelope{ - Discovery: discovery, - Message: "goodbye", - } - - data, err := envelope.Bytes() + err = driver.SendGoodbyePacket(discovery) if err != nil { - log.Printf("sending discovery formating message error: %v\n", err) - } - err = nc.Publish(config.NATSDiscoveryChannel, data) - if err != nil { - log.Printf("sending discovery error: %v\n", err) + log.Println(err) } } // sendDisoveryPacket sends discovery packet regularly so the network know we exist -func sendDiscoveryPacket(nc *nats.Conn) { +func sendDiscoveryPacket() { for { discovery, err := getIdentification() if err != nil { log.Printf("sending discovery identification error: %v\n", err) } - envelope := discoveryEnvelope{ - Discovery: discovery, - Message: "hi", - } - - data, err := envelope.Bytes() + err = driver.SendDiscoveryPacket(discovery) if err != nil { - log.Printf("sending discovery formating message error: %v\n", err) - } - err = nc.Publish(config.NATSDiscoveryChannel, data) - if err != nil { - log.Printf("sending discovery error: %v\n", err) + log.Println(err) } time.Sleep(time.Duration(config.KeepAlive) * time.Second) @@ -100,34 +96,35 @@ func main() { // Closing the logging channel defer close(discoveryStorage.LogChannel) - - discoveryStorage.TTL = config.TTL - - // Load config from environment variables - config = *GetConfig() + defer driver.Close() // ------------------------ // Server discovering stuff // ------------------------ // Connect to the NATS service - nc, err := nats.Connect(config.NATSURL) + driver.RegisterSubscribeFunction(func(d server.Discovery) { + discoveryStorage.Add(d) + }) + driver.RegisterUnsubscribeFunction(func(d server.Discovery) { + discoveryStorage.Delete(d.Hostname) + }) + + err = driver.Init() if err != nil { log.Fatalln(err) } - defer nc.Drain() go printDiscoveryLogs() - // Subscribe - log.Println("> discovery channel") - _, err = nc.Subscribe(config.NATSDiscoveryChannel, discoveryHandler) - if err != nil { - log.Fatalln(err) - } - go cleanDiscoveryPool() - go sendDiscoveryPacket(nc) + + // If config.Register is false this instance won't be registered with other nodes + if config.Register { + go sendDiscoveryPacket() + } else { + log.Println("standalone mode, I won't register myself") + } // -------- // REST API @@ -142,25 +139,9 @@ func main() { e.Use(middleware.Recover()) // Routes - e.GET("/", func(c echo.Context) error { - label := c.QueryParam("label") - - var discoveries []server.Discovery - - if len(label) > 0 { - discoveries = discoveryStorage.Filter(label) - } else { - discoveries = discoveryStorage.GetAll() - } - - return c.JSONPretty(200, discoveries, " ") - }) - - e.GET("/prometheus", func(c echo.Context) error { - services := preparePrometheusOutput(discoveryStorage.GetAll()) - - return c.JSONPretty(http.StatusOK, services, " ") - }) + e.GET("/", listHandler) + e.GET("/v1/", listHandler) + e.GET("/v1/prometheus", prometheusHandler) // e.GET("/template/:template", func(c echo.Context) error { // templateName := c.Param("template") @@ -186,14 +167,18 @@ func main() { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) - go func(nc *nats.Conn, e *echo.Echo) { + go func(e *echo.Echo, config Config) { sig := <-signals shuttingDown = true - log.Printf("%s signal received, sending goodbye packet\n", sig.String()) - sendGoodbyePacket(nc) - time.Sleep(5 * time.Second) // we wait for a few seconds to let background jobs to finish their job + if config.Register { + log.Printf("%s signal received, sending goodbye packet\n", sig.String()) + sendGoodbyePacket() + time.Sleep(5 * time.Second) // we wait for a few seconds to let background jobs to finish their job + } else { + log.Printf("%s signal received", sig.String()) + } e.Shutdown(context.TODO()) - }(nc, e) + }(e, config) // Start server e.Logger.Error(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port)))) diff --git a/daemon/message_handlers.go b/daemon/message_handlers.go deleted file mode 100644 index 797bee3..0000000 --- a/daemon/message_handlers.go +++ /dev/null @@ -1,29 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "log" - - "github.com/nats-io/nats.go" -) - -// discoveryHandler accepts discovery message and -func discoveryHandler(m *nats.Msg) { - message := discoveryEnvelope{} - err := json.Unmarshal(m.Data, &message) - if err != nil { - log.Println(fmt.Errorf("decoding message error: %v", err)) - } - - err = message.Discovery.Validate() - if err != nil { - log.Println(fmt.Errorf("validation error: %v", err)) - } - - if message.Message == "hi" { - discoveryStorage.Add(message.Discovery) - } else if message.Message == "goodbye" { - discoveryStorage.Delete(message.Discovery.Hostname) - } -} diff --git a/nats_driver/main.go b/nats_driver/main.go new file mode 100644 index 0000000..cd78853 --- /dev/null +++ b/nats_driver/main.go @@ -0,0 +1,112 @@ +package nats_driver + +import ( + "encoding/json" + "fmt" + + "github.com/nats-io/nats.go" + "github.com/rosti-cz/server_lobby/common" + "github.com/rosti-cz/server_lobby/server" +) + +type Driver struct { + NATSUrl string + NATSDiscoveryChannel string + + LogChannel chan string + + nc *nats.Conn + subscribeListener common.Listener + unsubscribeListener common.Listener +} + +// handler is called asynchronously so and because it cannot log directly to stderr there +// is channel called LogChannel that can be used to log error messages from this +func (d *Driver) handler(m *nats.Msg) { + message := discoveryEnvelope{} + err := json.Unmarshal(m.Data, &message) + if err != nil && d.LogChannel != nil { + d.LogChannel <- fmt.Errorf("decoding message error: %v", err).Error() + } + + err = message.Discovery.Validate() + if err != nil && d.LogChannel != nil { + d.LogChannel <- fmt.Errorf("validation error: %v", err).Error() + } + + if message.Message == "hi" { + d.subscribeListener(message.Discovery) + } else if message.Message == "goodbye" { + d.unsubscribeListener(message.Discovery) + } else { + if d.LogChannel != nil { + d.LogChannel <- "incompatible message" + } + } + +} + +func (d *Driver) Init() error { + if d.LogChannel == nil { + return fmt.Errorf("please initiate LogChannel variable") + } + + nc, err := nats.Connect(d.NATSUrl) + if err != nil { + return err + } + d.nc = nc + + _, err = nc.Subscribe(d.NATSDiscoveryChannel, d.handler) + if err != nil { + return err + } + + return nil +} + +func (d *Driver) Close() error { + return d.nc.Drain() +} + +func (d *Driver) RegisterSubscribeFunction(listener common.Listener) { + d.subscribeListener = listener +} + +func (d *Driver) RegisterUnsubscribeFunction(listener common.Listener) { + d.unsubscribeListener = listener +} + +func (d *Driver) SendDiscoveryPacket(discovery server.Discovery) error { + envelope := discoveryEnvelope{ + Discovery: discovery, + Message: "hi", + } + + data, err := envelope.Bytes() + if err != nil { + return fmt.Errorf("sending discovery formating message error: %v", err) + } + err = d.nc.Publish(d.NATSDiscoveryChannel, data) + if err != nil { + return fmt.Errorf("sending discovery error: %v", err) + } + return nil +} + +func (d *Driver) SendGoodbyePacket(discovery server.Discovery) error { + envelope := discoveryEnvelope{ + Discovery: discovery, + Message: "goodbye", + } + + data, err := envelope.Bytes() + if err != nil { + return fmt.Errorf("sending discovery formating message error: %v", err) + } + err = d.nc.Publish(d.NATSDiscoveryChannel, data) + if err != nil { + return fmt.Errorf("sending discovery error: %v", err) + } + return nil +} diff --git a/daemon/types.go b/nats_driver/types.go similarity index 96% rename from daemon/types.go rename to nats_driver/types.go index 6e69f84..5d1e14c 100644 --- a/daemon/types.go +++ b/nats_driver/types.go @@ -1,4 +1,4 @@ -package main +package nats_driver import ( "encoding/json"