NATS driver separated into its own package

* Client mode
This commit is contained in:
Adam Štrauch 2021-09-02 19:22:39 +02:00
parent 85ac19d7fe
commit c982038c53
Signed by: cx
GPG Key ID: 018304FFA8988F8D
8 changed files with 209 additions and 95 deletions

View File

@ -12,3 +12,4 @@
* [ ] Separate the NATS code so it can support multiple backend/drivers * [ ] Separate the NATS code so it can support multiple backend/drivers
* [ ] Documentation * [ ] Documentation
* [ ] Tests * [ ] Tests
* [ ] Command hooks - script or list of scripts that are triggered when discovery status has changed

16
common/types.go Normal file
View File

@ -0,0 +1,16 @@
package common
import "github.com/rosti-cz/server_lobby/server"
// Listener is a function that returns received discovery
type Listener func(server.Discovery)
// Driver interface describes exported methods that have to be implemented in each driver
type Driver interface {
Init() error
Close() error
RegisterSubscribeFunction(listener Listener)
RegisterUnsubscribeFunction(listener Listener)
SendDiscoveryPacket(discovery server.Discovery) error
SendGoodbyePacket(discovery server.Discovery) error
}

View File

@ -21,6 +21,7 @@ type Config struct {
KeepAlive uint `envconfig:"KEEP_ALIVE" required:"false" default:"5"` // how often to send the keepalive message with all availabel information [secs] KeepAlive uint `envconfig:"KEEP_ALIVE" required:"false" default:"5"` // how often to send the keepalive message with all availabel information [secs]
TTL uint `envconfig:"TTL" required:"false" default:"30"` // After how many secs is discovery record considered as invalid TTL uint `envconfig:"TTL" required:"false" default:"30"` // After how many secs is discovery record considered as invalid
NodeExporterPort uint `envconfig:"NODE_EXPORTER_PORT" required:"false" default:"9100"` // Default port where node_exporter listens on all registered servers NodeExporterPort uint `envconfig:"NODE_EXPORTER_PORT" required:"false" default:"9100"` // Default port where node_exporter listens on all registered servers
Register bool `envconfig:"REGISTER" required:"false" default:"true"` // If true (default) then local instance is registered with other instance (discovery packet is sent regularly)
} }
// GetConfig return configuration created based on environment variables // GetConfig return configuration created based on environment variables

28
daemon/handlers.go Normal file
View File

@ -0,0 +1,28 @@
package main
import (
"net/http"
"github.com/labstack/echo"
"github.com/rosti-cz/server_lobby/server"
)
func listHandler(c echo.Context) error {
label := c.QueryParam("label")
var discoveries []server.Discovery
if len(label) > 0 {
discoveries = discoveryStorage.Filter(label)
} else {
discoveries = discoveryStorage.GetAll()
}
return c.JSONPretty(200, discoveries, " ")
}
func prometheusHandler(c echo.Context) error {
services := preparePrometheusOutput(discoveryStorage.GetAll())
return c.JSONPretty(http.StatusOK, services, " ")
}

View File

@ -3,7 +3,6 @@ package main
import ( import (
"context" "context"
"log" "log"
"net/http"
"os" "os"
"os/signal" "os/signal"
"strconv" "strconv"
@ -12,18 +11,33 @@ import (
"github.com/labstack/echo" "github.com/labstack/echo"
"github.com/labstack/echo/middleware" "github.com/labstack/echo/middleware"
"github.com/nats-io/nats.go" "github.com/rosti-cz/server_lobby/common"
"github.com/rosti-cz/server_lobby/nats_driver"
"github.com/rosti-cz/server_lobby/server" "github.com/rosti-cz/server_lobby/server"
) )
var discoveryStorage server.Discoveries = server.Discoveries{} var discoveryStorage server.Discoveries = server.Discoveries{}
var driver common.Driver
var config Config var config Config
var shuttingDown bool var shuttingDown bool
func init() { func init() {
// Load config from environment variables
config = *GetConfig()
// Setup discovery storage
discoveryStorage.LogChannel = make(chan string) discoveryStorage.LogChannel = make(chan string)
discoveryStorage.TTL = config.TTL
// Setup driver
driver = &nats_driver.Driver{
NATSUrl: config.NATSURL,
NATSDiscoveryChannel: config.NATSDiscoveryChannel,
LogChannel: discoveryStorage.LogChannel,
}
} }
// cleanDiscoveryPool clears the local server map and keeps only the alive servers // cleanDiscoveryPool clears the local server map and keeps only the alive servers
@ -37,47 +51,29 @@ func cleanDiscoveryPool() {
// sendGoodbyePacket is almost same as sendDiscoveryPacket but it's not running in loop // sendGoodbyePacket is almost same as sendDiscoveryPacket but it's not running in loop
// and it adds goodbye message so other nodes know this node is gonna die. // and it adds goodbye message so other nodes know this node is gonna die.
func sendGoodbyePacket(nc *nats.Conn) { func sendGoodbyePacket() {
discovery, err := getIdentification() discovery, err := getIdentification()
if err != nil { if err != nil {
log.Printf("sending discovery identification error: %v\n", err) log.Printf("sending discovery identification error: %v\n", err)
} }
envelope := discoveryEnvelope{ err = driver.SendGoodbyePacket(discovery)
Discovery: discovery,
Message: "goodbye",
}
data, err := envelope.Bytes()
if err != nil { if err != nil {
log.Printf("sending discovery formating message error: %v\n", err) log.Println(err)
}
err = nc.Publish(config.NATSDiscoveryChannel, data)
if err != nil {
log.Printf("sending discovery error: %v\n", err)
} }
} }
// sendDisoveryPacket sends discovery packet regularly so the network know we exist // sendDisoveryPacket sends discovery packet regularly so the network know we exist
func sendDiscoveryPacket(nc *nats.Conn) { func sendDiscoveryPacket() {
for { for {
discovery, err := getIdentification() discovery, err := getIdentification()
if err != nil { if err != nil {
log.Printf("sending discovery identification error: %v\n", err) log.Printf("sending discovery identification error: %v\n", err)
} }
envelope := discoveryEnvelope{ err = driver.SendDiscoveryPacket(discovery)
Discovery: discovery,
Message: "hi",
}
data, err := envelope.Bytes()
if err != nil { if err != nil {
log.Printf("sending discovery formating message error: %v\n", err) log.Println(err)
}
err = nc.Publish(config.NATSDiscoveryChannel, data)
if err != nil {
log.Printf("sending discovery error: %v\n", err)
} }
time.Sleep(time.Duration(config.KeepAlive) * time.Second) time.Sleep(time.Duration(config.KeepAlive) * time.Second)
@ -100,34 +96,35 @@ func main() {
// Closing the logging channel // Closing the logging channel
defer close(discoveryStorage.LogChannel) defer close(discoveryStorage.LogChannel)
defer driver.Close()
discoveryStorage.TTL = config.TTL
// Load config from environment variables
config = *GetConfig()
// ------------------------ // ------------------------
// Server discovering stuff // Server discovering stuff
// ------------------------ // ------------------------
// Connect to the NATS service // Connect to the NATS service
nc, err := nats.Connect(config.NATSURL) driver.RegisterSubscribeFunction(func(d server.Discovery) {
discoveryStorage.Add(d)
})
driver.RegisterUnsubscribeFunction(func(d server.Discovery) {
discoveryStorage.Delete(d.Hostname)
})
err = driver.Init()
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
defer nc.Drain()
go printDiscoveryLogs() go printDiscoveryLogs()
// Subscribe
log.Println("> discovery channel")
_, err = nc.Subscribe(config.NATSDiscoveryChannel, discoveryHandler)
if err != nil {
log.Fatalln(err)
}
go cleanDiscoveryPool() go cleanDiscoveryPool()
go sendDiscoveryPacket(nc)
// If config.Register is false this instance won't be registered with other nodes
if config.Register {
go sendDiscoveryPacket()
} else {
log.Println("standalone mode, I won't register myself")
}
// -------- // --------
// REST API // REST API
@ -142,25 +139,9 @@ func main() {
e.Use(middleware.Recover()) e.Use(middleware.Recover())
// Routes // Routes
e.GET("/", func(c echo.Context) error { e.GET("/", listHandler)
label := c.QueryParam("label") e.GET("/v1/", listHandler)
e.GET("/v1/prometheus", prometheusHandler)
var discoveries []server.Discovery
if len(label) > 0 {
discoveries = discoveryStorage.Filter(label)
} else {
discoveries = discoveryStorage.GetAll()
}
return c.JSONPretty(200, discoveries, " ")
})
e.GET("/prometheus", func(c echo.Context) error {
services := preparePrometheusOutput(discoveryStorage.GetAll())
return c.JSONPretty(http.StatusOK, services, " ")
})
// e.GET("/template/:template", func(c echo.Context) error { // e.GET("/template/:template", func(c echo.Context) error {
// templateName := c.Param("template") // templateName := c.Param("template")
@ -186,14 +167,18 @@ func main() {
signals := make(chan os.Signal, 1) signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
go func(nc *nats.Conn, e *echo.Echo) { go func(e *echo.Echo, config Config) {
sig := <-signals sig := <-signals
shuttingDown = true shuttingDown = true
log.Printf("%s signal received, sending goodbye packet\n", sig.String()) if config.Register {
sendGoodbyePacket(nc) log.Printf("%s signal received, sending goodbye packet\n", sig.String())
time.Sleep(5 * time.Second) // we wait for a few seconds to let background jobs to finish their job sendGoodbyePacket()
time.Sleep(5 * time.Second) // we wait for a few seconds to let background jobs to finish their job
} else {
log.Printf("%s signal received", sig.String())
}
e.Shutdown(context.TODO()) e.Shutdown(context.TODO())
}(nc, e) }(e, config)
// Start server // Start server
e.Logger.Error(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port)))) e.Logger.Error(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port))))

View File

@ -1,29 +0,0 @@
package main
import (
"encoding/json"
"fmt"
"log"
"github.com/nats-io/nats.go"
)
// discoveryHandler accepts discovery message and
func discoveryHandler(m *nats.Msg) {
message := discoveryEnvelope{}
err := json.Unmarshal(m.Data, &message)
if err != nil {
log.Println(fmt.Errorf("decoding message error: %v", err))
}
err = message.Discovery.Validate()
if err != nil {
log.Println(fmt.Errorf("validation error: %v", err))
}
if message.Message == "hi" {
discoveryStorage.Add(message.Discovery)
} else if message.Message == "goodbye" {
discoveryStorage.Delete(message.Discovery.Hostname)
}
}

112
nats_driver/main.go Normal file
View File

@ -0,0 +1,112 @@
package nats_driver
import (
"encoding/json"
"fmt"
"github.com/nats-io/nats.go"
"github.com/rosti-cz/server_lobby/common"
"github.com/rosti-cz/server_lobby/server"
)
type Driver struct {
NATSUrl string
NATSDiscoveryChannel string
LogChannel chan string
nc *nats.Conn
subscribeListener common.Listener
unsubscribeListener common.Listener
}
// handler is called asynchronously so and because it cannot log directly to stderr there
// is channel called LogChannel that can be used to log error messages from this
func (d *Driver) handler(m *nats.Msg) {
message := discoveryEnvelope{}
err := json.Unmarshal(m.Data, &message)
if err != nil && d.LogChannel != nil {
d.LogChannel <- fmt.Errorf("decoding message error: %v", err).Error()
}
err = message.Discovery.Validate()
if err != nil && d.LogChannel != nil {
d.LogChannel <- fmt.Errorf("validation error: %v", err).Error()
}
if message.Message == "hi" {
d.subscribeListener(message.Discovery)
} else if message.Message == "goodbye" {
d.unsubscribeListener(message.Discovery)
} else {
if d.LogChannel != nil {
d.LogChannel <- "incompatible message"
}
}
}
func (d *Driver) Init() error {
if d.LogChannel == nil {
return fmt.Errorf("please initiate LogChannel variable")
}
nc, err := nats.Connect(d.NATSUrl)
if err != nil {
return err
}
d.nc = nc
_, err = nc.Subscribe(d.NATSDiscoveryChannel, d.handler)
if err != nil {
return err
}
return nil
}
func (d *Driver) Close() error {
return d.nc.Drain()
}
func (d *Driver) RegisterSubscribeFunction(listener common.Listener) {
d.subscribeListener = listener
}
func (d *Driver) RegisterUnsubscribeFunction(listener common.Listener) {
d.unsubscribeListener = listener
}
func (d *Driver) SendDiscoveryPacket(discovery server.Discovery) error {
envelope := discoveryEnvelope{
Discovery: discovery,
Message: "hi",
}
data, err := envelope.Bytes()
if err != nil {
return fmt.Errorf("sending discovery formating message error: %v", err)
}
err = d.nc.Publish(d.NATSDiscoveryChannel, data)
if err != nil {
return fmt.Errorf("sending discovery error: %v", err)
}
return nil
}
func (d *Driver) SendGoodbyePacket(discovery server.Discovery) error {
envelope := discoveryEnvelope{
Discovery: discovery,
Message: "goodbye",
}
data, err := envelope.Bytes()
if err != nil {
return fmt.Errorf("sending discovery formating message error: %v", err)
}
err = d.nc.Publish(d.NATSDiscoveryChannel, data)
if err != nil {
return fmt.Errorf("sending discovery error: %v", err)
}
return nil
}

View File

@ -1,4 +1,4 @@
package main package nats_driver
import ( import (
"encoding/json" "encoding/json"