package main import ( "context" "log" "os" "os/signal" "strconv" "syscall" "time" "github.com/by-cx/lobby/common" "github.com/by-cx/lobby/nats_driver" "github.com/by-cx/lobby/redis_driver" "github.com/by-cx/lobby/server" "github.com/labstack/echo" "github.com/labstack/echo/middleware" ) var discoveryStorage server.Discoveries = server.Discoveries{} var driver common.Driver var localHost server.LocalHost var lastLocalHostname string var config Config var shuttingDown bool var sendDiscoveryPacketTrigger chan bool = make(chan bool) func init() { // Load config from environment variables config = *GetConfig() // Setup discovery storage discoveryStorage.LogChannel = make(chan string) discoveryStorage.TTL = config.TTL // localhost initization localHost = server.LocalHost{ LabelsPath: config.LabelsPath, HostnameOverride: config.HostName, InitialLabels: config.Labels, RuntimeLabelsFilename: config.RuntimeLabelsFilename, } // Setup driver if config.Driver == "NATS" { driver = &nats_driver.Driver{ NATSUrl: config.NATSURL, NATSDiscoveryChannel: config.NATSDiscoveryChannel, LogChannel: discoveryStorage.LogChannel, } } else if config.Driver == "Redis" { driver = &redis_driver.Driver{ Host: config.RedisHost, Port: uint(config.RedisPort), Password: config.RedisPassword, Channel: config.RedisChannel, DB: uint(config.RedisDB), LogChannel: discoveryStorage.LogChannel, } } else { log.Fatalf("unsupported driver %s", config.Driver) } } // cleanDiscoveryPool clears the local server map and keeps only the alive servers func cleanDiscoveryPool() { for { discoveryStorage.Clean() time.Sleep(time.Duration(config.CleanEvery) * time.Second) } } // sendGoodbyePacket is almost same as sendDiscoveryPacket but it's not running in loop // and it adds goodbye message so other nodes know this node is gonna die. func sendGoodbyePacket() { discovery, err := localHost.GetIdentification() if err != nil { log.Printf("sending discovery identification error: %v\n", err) } err = driver.SendGoodbyePacket(discovery) if err != nil { log.Println(err) } } // sendDiscoveryPacket sends a single discovery packet out func sendDiscoveryPacket() { sendDiscoveryPacketTrigger <- true } // sendDisoveryPacket sends discovery packet to the driver which passes it to the // other nodes. By this it propagates any change that happens in the local discovery struct. // Every tune trigger is triggered it sends one message. func sendDiscoveryPacketTask(trigger chan bool) { for { // We are waiting for the trigger <-trigger if !shuttingDown { // Get info about local machine and send to the exchange point discovery, err := localHost.GetIdentification() if err != nil { log.Printf("sending discovery identification error: %v\n", err) } err = driver.SendDiscoveryPacket(discovery) if err != nil { log.Println(err.Error()) } // If local hostname changes, we will deregister it if discovery.Hostname != lastLocalHostname && lastLocalHostname != "" { log.Println("Hostname change detected, deregistering the old one") err = driver.SendGoodbyePacket(server.Discovery{ Hostname: lastLocalHostname, }) if err != nil { log.Println(err) } } lastLocalHostname = discovery.Hostname } else { break } } } // Print logs acquired from disovery storage func printDiscoveryLogs() { for { logMessage := <-discoveryStorage.LogChannel log.Println(logMessage) } } func main() { var err error // Closing the logging channel defer close(discoveryStorage.LogChannel) defer driver.Close() // ------------------------ // Server discovering stuff // ------------------------ // Setup callback function to register and unregister discovery packets from other servers driver.RegisterSubscribeFunction(func(d server.Discovery) { // Check if the local version and the new version are somehowe changed localVersion := discoveryStorage.Get(d.Hostname) exists := discoveryStorage.Exist(d.Hostname) changed := server.Compare(localVersion, d) discoveryStorage.Add(d) if changed { // Print this only if the server is already registered if exists { log.Printf("%s has been updated", d.Hostname) } go func() { err = discoveryChange(d) if err != nil { log.Printf("discovery changed error: %v", err) } }() } }) driver.RegisterUnsubscribeFunction(func(d server.Discovery) { discoveryStorage.Delete(d.Hostname) go func() { err = discoveryChange(d) if err != nil { log.Printf("discovery changed error: %v", err) } }() }) err = driver.Init() if err != nil { log.Fatalln(err) } go printDiscoveryLogs() go cleanDiscoveryPool() go discoveryChangeLoop() go changeCatcherLoop() // When the daemon boots up we trigger discovery change event so the config can be setup via callback script if there is any err = discoveryChange(server.Discovery{}) if err != nil { log.Printf("discovery changed error: %v", err) } // If config.Register is false this instance won't be registered with other nodes if config.Register { // This is background process that sends the message go sendDiscoveryPacketTask(sendDiscoveryPacketTrigger) // This triggers the process go func() { for { sendDiscoveryPacket() time.Sleep(time.Duration(config.KeepAlive) * time.Second) if shuttingDown { break } } }() } else { log.Println("standalone mode, I won't register myself") } // -------- // REST API // -------- e := echo.New() // Middleware if len(config.Token) > 0 { e.Use(TokenMiddleware) } e.Use(middleware.Logger()) e.Use(middleware.Recover()) // Routes if !config.DisableAPI { e.GET("/", listHandler) e.GET("/v1/discovery", getIdentificationHandler) e.GET("/v1/discoveries", listHandler) e.POST("/v1/labels", addLabelsHandler) e.DELETE("/v1/labels", deleteLabelsHandler) e.GET("/v1/prometheus/:name", prometheusHandler) } // ------------------------------ // Termination signals processing // ------------------------------ signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) go func(e *echo.Echo, config Config) { sig := <-signals shuttingDown = true if config.Register { log.Printf("%s signal received, sending goodbye packet\n", sig.String()) sendGoodbyePacket() time.Sleep(1 * time.Second) // we wait for a few seconds to let background jobs to finish their job } else { log.Printf("%s signal received", sig.String()) } e.Shutdown(context.TODO()) }(e, config) // Start server // In most cases this will end expectedly so it doesn't make sense to use the echo's approach to treat this message as an error. e.Logger.Info(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port)))) }