lobby/daemon/main.go

271 lines
6.8 KiB
Go
Raw Normal View History

2021-08-31 14:26:09 +00:00
package main
import (
2021-09-02 00:13:12 +00:00
"context"
2021-08-31 14:26:09 +00:00
"log"
2021-09-02 00:13:12 +00:00
"os"
"os/signal"
"strconv"
2021-09-02 00:13:12 +00:00
"syscall"
2021-08-31 14:26:09 +00:00
"time"
"github.com/by-cx/lobby/common"
"github.com/by-cx/lobby/nats_driver"
"github.com/by-cx/lobby/redis_driver"
"github.com/by-cx/lobby/server"
2021-08-31 14:26:09 +00:00
"github.com/labstack/echo"
"github.com/labstack/echo/middleware"
)
var discoveryStorage server.Discoveries = server.Discoveries{}
var driver common.Driver
var localHost server.LocalHost
var lastLocalHostname string
2021-08-31 14:26:09 +00:00
var config Config
2021-09-02 00:13:12 +00:00
var shuttingDown bool
var sendDiscoveryPacketTrigger chan bool = make(chan bool)
2021-09-02 00:13:12 +00:00
2021-08-31 14:26:09 +00:00
func init() {
// Load config from environment variables
config = *GetConfig()
// Setup discovery storage
2021-08-31 14:26:09 +00:00
discoveryStorage.LogChannel = make(chan string)
discoveryStorage.TTL = config.TTL
// localhost initization
localHost = server.LocalHost{
LabelsPath: config.LabelsPath,
HostnameOverride: config.HostName,
InitialLabels: config.Labels,
RuntimeLabelsFilename: config.RuntimeLabelsFilename,
}
// Setup driver
if config.Driver == "NATS" {
driver = &nats_driver.Driver{
NATSUrl: config.NATSURL,
NATSDiscoveryChannel: config.NATSDiscoveryChannel,
LogChannel: discoveryStorage.LogChannel,
}
} else if config.Driver == "Redis" {
driver = &redis_driver.Driver{
Host: config.RedisHost,
Port: uint(config.RedisPort),
Password: config.RedisPassword,
Channel: config.RedisChannel,
DB: uint(config.RedisDB),
LogChannel: discoveryStorage.LogChannel,
}
} else {
log.Fatalf("unsupported driver %s", config.Driver)
}
2021-08-31 14:26:09 +00:00
}
// cleanDiscoveryPool clears the local server map and keeps only the alive servers
func cleanDiscoveryPool() {
for {
discoveryStorage.Clean()
time.Sleep(time.Duration(config.CleanEvery) * time.Second)
2021-08-31 14:26:09 +00:00
}
}
2021-09-02 00:13:12 +00:00
// sendGoodbyePacket is almost same as sendDiscoveryPacket but it's not running in loop
// and it adds goodbye message so other nodes know this node is gonna die.
func sendGoodbyePacket() {
discovery, err := localHost.GetIdentification()
2021-09-02 00:13:12 +00:00
if err != nil {
log.Printf("sending discovery identification error: %v\n", err)
}
err = driver.SendGoodbyePacket(discovery)
2021-09-02 00:13:12 +00:00
if err != nil {
log.Println(err)
2021-09-02 00:13:12 +00:00
}
}
// sendDiscoveryPacket asks for exactly one discovery packet to be sent by
// pushing a value into sendDiscoveryPacketTrigger. That channel is created
// unbuffered, so this call blocks until a receiver (sendDiscoveryPacketTask)
// picks the value up.
func sendDiscoveryPacket() {
	sendDiscoveryPacketTrigger <- true
}
2021-08-31 14:26:09 +00:00
// sendDisoveryPacket sends discovery packet to the driver which passes it to the
// other nodes. By this it propagates any change that happens in the local discovery struct.
// Every tune trigger is triggered it sends one message.
func sendDiscoveryPacketTask(trigger chan bool) {
for {
// We are waiting for the trigger
<-trigger
if !shuttingDown {
// Get info about local machine and send to the exchange point
discovery, err := localHost.GetIdentification()
if err != nil {
2021-09-09 16:35:32 +00:00
log.Printf("sending discovery identification error: %v\n", err)
}
err = driver.SendDiscoveryPacket(discovery)
if err != nil {
log.Println(err.Error())
}
// If local hostname changes, we will deregister it
if discovery.Hostname != lastLocalHostname && lastLocalHostname != "" {
log.Println("Hostname change detected, deregistering the old one")
err = driver.SendGoodbyePacket(server.Discovery{
Hostname: lastLocalHostname,
})
if err != nil {
log.Println(err)
}
}
lastLocalHostname = discovery.Hostname
2021-09-09 16:35:32 +00:00
} else {
2021-09-02 00:13:12 +00:00
break
}
2021-08-31 14:26:09 +00:00
}
}
// printDiscoveryLogs prints every message received on
// discoveryStorage.LogChannel using the standard logger. It returns once the
// channel has been closed (main closes it on exit) instead of spinning on the
// zero values a closed channel would otherwise keep delivering.
func printDiscoveryLogs() {
	for logMessage := range discoveryStorage.LogChannel {
		log.Println(logMessage)
	}
}
func main() {
var err error
// Closing the logging channel
defer close(discoveryStorage.LogChannel)
defer driver.Close()
2021-08-31 14:26:09 +00:00
// ------------------------
// Server discovering stuff
// ------------------------
// Setup callback function to register and unregister discovery packets from other servers
driver.RegisterSubscribeFunction(func(d server.Discovery) {
// Check if the local version and the new version are somehowe changed
localVersion := discoveryStorage.Get(d.Hostname)
exists := discoveryStorage.Exist(d.Hostname)
changed := server.Compare(localVersion, d)
discoveryStorage.Add(d)
if changed {
// Print this only if the server is already registered
if exists {
log.Printf("%s has been updated", d.Hostname)
}
go func() {
err = discoveryChange(d)
if err != nil {
log.Printf("discovery changed error: %v", err)
}
}()
}
})
driver.RegisterUnsubscribeFunction(func(d server.Discovery) {
discoveryStorage.Delete(d.Hostname)
go func() {
err = discoveryChange(d)
if err != nil {
log.Printf("discovery changed error: %v", err)
}
}()
})
err = driver.Init()
2021-08-31 14:26:09 +00:00
if err != nil {
log.Fatalln(err)
}
go printDiscoveryLogs()
go cleanDiscoveryPool()
if len(config.Callback) > 0 {
go discoveryChangeLoop()
go changeCatcherLoop()
}
// When the daemon boots up we trigger discovery change event so the config can be setup via callback script if there is any
err = discoveryChange(server.Discovery{})
if err != nil {
log.Printf("discovery changed error: %v", err)
}
// If config.Register is false this instance won't be registered with other nodes
if config.Register {
// This is background process that sends the message
go sendDiscoveryPacketTask(sendDiscoveryPacketTrigger)
// This triggers the process
go func() {
for {
sendDiscoveryPacket()
time.Sleep(time.Duration(config.KeepAlive) * time.Second)
if shuttingDown {
break
}
}
}()
} else {
log.Println("standalone mode, I won't register myself")
}
2021-08-31 14:26:09 +00:00
// --------
// REST API
// --------
e := echo.New()
// Middleware
if len(config.Token) > 0 {
e.Use(TokenMiddleware)
}
2021-08-31 14:26:09 +00:00
e.Use(middleware.Logger())
e.Use(middleware.Recover())
// Routes
if !config.DisableAPI {
e.GET("/", listHandler)
e.GET("/v1/resolve", resolveHandler)
e.GET("/v1/discovery", getIdentificationHandler)
e.GET("/v1/discoveries", listHandler)
e.POST("/v1/labels", addLabelsHandler)
e.DELETE("/v1/labels", deleteLabelsHandler)
e.GET("/v1/prometheus/:name", prometheusHandler)
}
2021-09-02 00:13:12 +00:00
// ------------------------------
// Termination signals processing
// ------------------------------
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
go func(e *echo.Echo, config Config) {
2021-09-02 00:13:12 +00:00
sig := <-signals
shuttingDown = true
if config.Register {
log.Printf("%s signal received, sending goodbye packet\n", sig.String())
sendGoodbyePacket()
time.Sleep(1 * time.Second) // we wait for a few seconds to let background jobs to finish their job
} else {
log.Printf("%s signal received", sig.String())
}
2021-09-02 00:13:12 +00:00
e.Shutdown(context.TODO())
}(e, config)
2021-08-31 14:26:09 +00:00
// Start server
// In most cases this will end expectedly so it doesn't make sense to use the echo's approach to treat this message as an error.
e.Logger.Info(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port))))
2021-08-31 14:26:09 +00:00
}