Compare commits

...

20 Commits
v1.1 ... main

Author SHA1 Message Date
Adam Štrauch c2ed658aaa
Multiple prometheus services support
continuous-integration/drone/push Build is passing Details
* Prometheus export test
* Version bump to 1.5
2022-02-21 10:20:06 +01:00
Adam Štrauch 038741b87e
Version bump to 1.4
continuous-integration/drone/push Build is passing Details
2021-12-31 09:08:22 +01:00
Adam Štrauch 157e795792
Invalid NATS connection fix when subscribing to a subject
continuous-integration/drone/push Build is passing Details
2021-12-31 08:09:44 +01:00
Adam Štrauch 55b3376d4c
Reconnect instead of exit when NATS is not available
continuous-integration/drone/push Build is passing Details
2021-12-25 10:38:13 +01:00
Adam Štrauch 0e836b68a8
NATS driver: try to reconnect if the connection is down
continuous-integration/drone/push Build is passing Details
continuous-integration/drone Build is passing Details
2021-09-27 00:10:16 +02:00
Adam Štrauch 322ef2a258
Version bump to 1.3 2021-09-20 16:41:14 +02:00
Adam Štrauch 6a1ecb80a1
Resolver
continuous-integration/drone/push Build is passing Details
Resolve API endpoint and cli command return list of hostnames
base on given label.
2021-09-20 16:10:52 +02:00
Adam Štrauch 3a7662ef0d
README update
continuous-integration/drone/push Build is passing Details
2021-09-18 19:43:13 +02:00
Adam Štrauch 4c0ca7ed6a
Callback script docs
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/tag Build is passing Details
2021-09-18 19:41:26 +02:00
Adam Štrauch a4a97ec49d
Version bump to 1.2
continuous-integration/drone/push Build is passing Details
2021-09-18 19:34:18 +02:00
Adam Štrauch 66683c9496
Update drone.yml file format
continuous-integration/drone/push Build is passing Details
2021-09-17 19:06:37 +02:00
Adam Štrauch 24923a305b
Renaming drone/woodpecker file for compatibility reasons
continuous-integration/drone/push Build is passing Details
2021-09-17 08:19:41 +02:00
Adam Štrauch ab8068e26d
Testing pipeline
continuous-integration/drone Build is passing Details
2021-09-17 01:39:48 +02:00
Adam Štrauch 202a60f091
Initial woodpeck pipeline 2021-09-17 01:36:43 +02:00
Adam Štrauch a9d7abbdd5
Don't run callback stuff when callback is not configured 2021-09-17 01:36:20 +02:00
Adam Štrauch 709c47af3e
Callback script
Callback is run when there some discovery packet changes.
It can be an update, adding a new one or deleting the old one.
This can be used to perform dynamic configuration of services that
don't support lobby's API.
2021-09-15 16:58:09 +02:00
Adam Štrauch e06a5cc94b
Implementation of change detection
If there is an update in discovery an function is triggered
that can pick it up.
2021-09-11 11:58:27 +02:00
Adam Štrauch 6769b903bc
Mini refactoring 2021-09-09 18:35:32 +02:00
Adam Štrauch 8493c7cb40
Send discovery packet every time a new label is added 2021-09-09 18:25:22 +02:00
Adam Štrauch a0abda5196
README update 2021-09-07 14:12:29 +02:00
21 changed files with 528 additions and 68 deletions

10
.drone.yml Normal file
View File

@ -0,0 +1,10 @@
kind: pipeline
type: docker
name: testing
steps:
- name: test
image: golang
commands:
- go mod tidy
- make test

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
.history/
# Files with secrets
*secret*
tmp/

View File

@ -1,4 +1,4 @@
VERSION=1.1
VERSION=1.5
.PHONY: all

View File

@ -52,9 +52,9 @@ Support for NATS is only less than 150 lines.
The quickest way how to run lobby on your server is this:
```shell
wget -O /usr/local/bin/lobbyd https://github.com/by-cx/lobby/releases/download/v1.0/lobbyd-1.0-linux-amd64
wget -O /usr/local/bin/lobbyd https://github.com/by-cx/lobby/releases/download/v1.2/lobbyd-1.2-linux-amd64
chmod +x /usr/local/bin/lobbyd
wget -O /usr/local/bin/lobbyctl https://github.com/by-cx/lobby/releases/download/v1.0/lobbyctl-1.0-linux-amd64
wget -O /usr/local/bin/lobbyctl https://github.com/by-cx/lobby/releases/download/v1.2/lobbyctl-1.2-linux-amd64
chmod +x /usr/local/bin/lobbyctl
# Update NATS_URL and LABELS here
@ -89,31 +89,42 @@ To test if local instance is running call this:
There are other config directives you can use to fine-tune lobbyd to exactly what you need.
| Environment variable | Type | Default | Required | Note |
| ----------------------- | ------ | ----------------- | ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
| TOKEN | string | | no | Authentication token for API, if empty auth is disabled |
| HOST | string | 127.0.0.1 | no | IP address used for the REST server to listen |
| PORT | int | 1313 | no | Port related to the address above |
| DISABLE_API | bool | false | no | If true API interface won't start |
| DRIVER | string | NATS | yes | Selects which driver is used to exchange the discovery packets. |
| NATS_URL | string | | yes (NATS driver) | NATS URL used to connect to the NATS server |
| NATS_DISCOVERY_CHANNEL | string | lobby.discovery | no | Channel where the keep-alive packets are sent |
| REDIS_HOST | string | 127.0.0.1" | no | Redis host |
| REDIS_PORT | uint16 | 6379 | no | Redis port |
| REDIS_DB | string | 0 | no | Redis DB |
| REDIS_CHANNEL | string | lobby:discovery | no | Redis channel |
| REDIS_PASSWORD | string | | no | Redis password |
| LABELS | string | | no | List of labels, labels should be separated by comma |
| LABELS_PATH | string | /etc/lobby/labels | no | Path where filesystem based labels are located, one label per line, filename is not important for lobby |
| RUNTIME_LABELS_FILENAME | string | _runtime | no | Filename for file created in LabelsPath where runtime labels will be added |
| HOSTNAME | string | | no | Override local machine's hostname |
| CLEAN_EVERY | int | 15 | no | How often to clean the list of discovered servers to get rid of the not alive ones [secs] |
| KEEP_ALIVE | int | 5 | no | how often to send the keep-alive discovery message with all available information [secs] |
| TTL | int | 30 | no | After how many secs is discovery record considered as invalid |
| NODE_EXPORTER_PORT | int | 9100 | no | Default port where node_exporter listens on all registered servers, this is used when the special prometheus labels doesn't contain port |
| REGISTER | bool | true | no | If true (default) then local instance is registered with other instance (discovery packet is sent regularly), if false the daemon runs only as a client |
| Environment variable | Type | Default | Required | Note |
| ------------------------ | ------ | ----------------- | ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
| TOKEN | string | | no | Authentication token for API, if empty auth is disabled |
| HOST | string | 127.0.0.1 | no | IP address used for the REST server to listen |
| PORT | int | 1313 | no | Port related to the address above |
| DISABLE_API | bool | false | no | If true API interface won't start |
| DRIVER | string | NATS | yes | Selects which driver is used to exchange the discovery packets. |
| NATS_URL | string | | yes (NATS driver) | NATS URL used to connect to the NATS server |
| NATS_DISCOVERY_CHANNEL | string | lobby.discovery | no | Channel where the keep-alive packets are sent |
| REDIS_HOST | string | 127.0.0.1" | no | Redis host |
| REDIS_PORT | uint16 | 6379 | no | Redis port |
| REDIS_DB | string | 0 | no | Redis DB |
| REDIS_CHANNEL | string | lobby:discovery | no | Redis channel |
| REDIS_PASSWORD | string | | no | Redis password |
| LABELS | string | | no | List of labels, labels should be separated by comma |
| LABELS_PATH | string | /etc/lobby/labels | no | Path where filesystem based labels are located, one label per line, filename is not important for lobby |
| RUNTIME_LABELS_FILENAME | string | _runtime | no | Filename for file created in LabelsPath where runtime labels will be added |
| HOSTNAME | string | | no | Override local machine's hostname |
| CLEAN_EVERY | int | 15 | no | How often to clean the list of discovered servers to get rid of the not alive ones [secs] |
| KEEP_ALIVE | int | 5 | no | how often to send the keep-alive discovery message with all available information [secs] |
| TTL | int | 30 | no | After how many secs is discovery record considered as invalid |
| NODE_EXPORTER_PORT | int | 9100 | no | Default port where node_exporter listens on all registered servers, this is used when the special prometheus labels doesn't contain port |
| REGISTER | bool | true | no | If true (default) then local instance is registered with other instance (discovery packet is sent regularly), if false the daemon runs only as a client |
| CALLBACK | string | | no | Path to a script that runs when the the discovery packet records are changed. Not running for first |
| CALLBACK_COOLDOWN | int | 15 | no | Cooldown prevents the call back script to run sooner than configured amount of seconds after last run is finished. |
| CALLBACK_FIRST_RUN_DELAY | int | 30 | no | Wait for this amount of seconds before callback is run for first time after fresh start of the daemon |
### Callback script
When your application cannot support Lobbyd's API it can be configured via callback script that runs everytime something has changed in the network. Callback script is run every 15 seconds (configured by CALLBACK_COOLDOWN) but only when something has changed.
The script runs under the same user as lobbyd. When lobbyd starts first thirty seconds (CALLBACK_FIRST_RUN_DELAY) is ignored and then the script is run for first time. After these thirty seconds everything runs in loop based on the changes in the network.
All current discovery packets are passed to the callback script via standard input. It's basically the same input you get if you run `lobbyctl discoveries`.
### Service discovery for Prometheus
Lobbyd has an API endpoint that returns list of targets for [Prometheus's HTTP SD config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config). That
@ -126,6 +137,8 @@ Let's check this:
If you set port to *-* lobby daemon omits port entirely from the output.
There can be multiple `host` labels but only one `port` label and all prometheus labels (last line) will be common for all hosts labels. If port is omitted then default 9100 is used or port can also be part of the host label.
When you open URL http://localhost:1313/v1/prometheus/nodeexporter it returns this:
```json
@ -220,11 +233,11 @@ too.
## TODO
* [X] Tests
* [ ] Command hooks - script or list of scripts that are triggered when discovery status has changed
* [X] Command hooks - script or list of scripts that are triggered when discovery status has changed
* [ ] Support for multiple active backend drivers
* [ ] Redis driver
* [X] Redis driver
* [X] Remove the 5 secs waiting when daemon is stopped
* [X] API to allow add labels at runtime
* [ ] Check what happens when driver is disconnected

View File

@ -114,6 +114,31 @@ func (l *LobbyClient) GetDiscoveries() ([]server.Discovery, error) {
return discoveries, nil
}
// Resolve returns list of hostnames that have given label
func (l *LobbyClient) Resolve(label server.Label) ([]string, error) {
l.init()
path := fmt.Sprintf("/v1/resolve?label=%s", label.String())
method := "GET"
var hostnames []string
status, body, err := l.call(method, path, "")
if err != nil {
return hostnames, err
}
if status != 200 {
return hostnames, fmt.Errorf("non-200 response: %s", body)
}
err = json.Unmarshal([]byte(body), &hostnames)
if err != nil {
return hostnames, fmt.Errorf("response parsing error: %v", err)
}
return hostnames, nil
}
// Find discoveries by their labels
func (l *LobbyClient) FindByLabels(labels server.Labels) ([]server.Discovery, error) {
l.init()

View File

@ -14,6 +14,7 @@ func Usage() {
flag.Usage()
fmt.Println("")
fmt.Println("Commands:")
fmt.Println(" resolve label returns list of hostnames with given label")
fmt.Println(" discovery returns discovery packet of the server where the client is connected to")
fmt.Println(" discoveries returns list of all registered discovery packets")
fmt.Println(" discoveries labels [LABEL] ... returns list of all registered discovery packets with given labels (OR)")
@ -68,6 +69,25 @@ func main() {
}
switch flag.Args()[0] {
case "resolve":
if len(flag.Args()) == 2 {
hostnames, err := client.Resolve(server.Label(flag.Arg(1)))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if *jsonOutput {
printJSON(hostnames)
} else {
for _, hostname := range hostnames {
fmt.Println(hostname)
}
}
} else {
Usage()
os.Exit(0)
}
case "discoveries":
var discoveries []server.Discovery
var err error

View File

@ -30,6 +30,9 @@ type Config struct {
TTL uint `envconfig:"TTL" required:"false" default:"30"` // After how many secs is discovery record considered as invalid
NodeExporterPort uint `envconfig:"NODE_EXPORTER_PORT" required:"false" default:"9100"` // Default port where node_exporter listens on all registered servers
Register bool `envconfig:"REGISTER" required:"false" default:"true"` // If true (default) then local instance is registered with other instance (discovery packet is sent regularly)
Callback string `envconfig:"CALLBACK" required:"false" default:""` // path to a script that runs when the is a change in the labels database
CallbackCooldown uint `envconfig:"CALLBACK_COOLDOWN" required:"false" default:"15"` // cooldown that prevents to run the config change script too many times in row
CallbackFirstRunDelay uint `envconfig:"CALLBACK_FIRST_RUN_DELAY" required:"false" default:"30"` // Wait for this amount of seconds before callback is run for first time after fresh start of the daemon
}
// GetConfig return configuration created based on environment variables

View File

@ -28,6 +28,20 @@ func listHandler(c echo.Context) error {
return c.JSONPretty(200, discoveries, " ")
}
// resolveHandler returns hostname(s) based on another label
func resolveHandler(c echo.Context) error {
label := c.QueryParam("label") // This is label we will use to filter discovery packets
output := []string{}
discoveries := discoveryStorage.Filter([]string{label})
for _, discovery := range discoveries {
output = append(output, discovery.Hostname)
}
return c.JSONPretty(http.StatusOK, output, " ")
}
func prometheusHandler(c echo.Context) error {
name := c.Param("name")
@ -63,6 +77,9 @@ func addLabelsHandler(c echo.Context) error {
return c.String(http.StatusBadRequest, err.Error())
}
// Update the other nodes with this new change
sendDiscoveryPacket()
return c.String(http.StatusOK, "OK")
}
@ -84,5 +101,8 @@ func deleteLabelsHandler(c echo.Context) error {
return c.String(http.StatusBadRequest, err.Error())
}
// Update the other nodes with this new change
sendDiscoveryPacket()
return c.String(http.StatusOK, "OK")
}

View File

@ -20,10 +20,12 @@ import (
var discoveryStorage server.Discoveries = server.Discoveries{}
var driver common.Driver
var localHost server.LocalHost
var lastLocalHostname string
var config Config
var shuttingDown bool
var sendDiscoveryPacketTrigger chan bool = make(chan bool)
func init() {
// Load config from environment variables
@ -33,7 +35,7 @@ func init() {
discoveryStorage.LogChannel = make(chan string)
discoveryStorage.TTL = config.TTL
// localhost initization
// localhost initiation
localHost = server.LocalHost{
LabelsPath: config.LabelsPath,
HostnameOverride: config.HostName,
@ -88,21 +90,44 @@ func sendGoodbyePacket() {
}
}
// sendDisoveryPacket sends discovery packet regularly so the network know we exist
// sendDiscoveryPacket sends a single discovery packet out
func sendDiscoveryPacket() {
sendDiscoveryPacketTrigger <- true
}
// sendDisoveryPacket sends discovery packet to the driver which passes it to the
// other nodes. By this it propagates any change that happens in the local discovery struct.
// Every tune trigger is triggered it sends one message.
func sendDiscoveryPacketTask(trigger chan bool) {
for {
discovery, err := localHost.GetIdentification()
if err != nil {
log.Printf("sending discovery identification error: %v\n", err)
}
// We are waiting for the trigger
<-trigger
err = driver.SendDiscoveryPacket(discovery)
if err != nil {
log.Println(err)
}
time.Sleep(time.Duration(config.KeepAlive) * time.Second)
if !shuttingDown {
// Get info about local machine and send to the exchange point
discovery, err := localHost.GetIdentification()
if err != nil {
log.Printf("sending discovery identification error: %v\n", err)
}
if shuttingDown {
err = driver.SendDiscoveryPacket(discovery)
if err != nil {
log.Println(err.Error())
}
// If local hostname changes, we will deregister it
if discovery.Hostname != lastLocalHostname && lastLocalHostname != "" {
log.Println("Hostname change detected, deregistering the old one")
err = driver.SendGoodbyePacket(server.Discovery{
Hostname: lastLocalHostname,
})
if err != nil {
log.Println(err)
}
}
lastLocalHostname = discovery.Hostname
} else {
break
}
}
@ -127,12 +152,38 @@ func main() {
// Server discovering stuff
// ------------------------
// Connect to the NATS service
// Setup callback function to register and unregister discovery packets from other servers
driver.RegisterSubscribeFunction(func(d server.Discovery) {
// Check if the local version and the new version are somehowe changed
localVersion := discoveryStorage.Get(d.Hostname)
exists := discoveryStorage.Exist(d.Hostname)
changed := server.Compare(localVersion, d)
discoveryStorage.Add(d)
if changed {
// Print this only if the server is already registered
if exists {
log.Printf("%s has been updated", d.Hostname)
}
go func() {
err = discoveryChange(d)
if err != nil {
log.Printf("discovery changed error: %v", err)
}
}()
}
})
driver.RegisterUnsubscribeFunction(func(d server.Discovery) {
discoveryStorage.Delete(d.Hostname)
go func() {
err = discoveryChange(d)
if err != nil {
log.Printf("discovery changed error: %v", err)
}
}()
})
err = driver.Init()
@ -143,13 +194,36 @@ func main() {
go printDiscoveryLogs()
go cleanDiscoveryPool()
if len(config.Callback) > 0 {
go discoveryChangeLoop()
go changeCatcherLoop()
}
// When the daemon boots up we trigger discovery change event so the config can be setup via callback script if there is any
err = discoveryChange(server.Discovery{})
if err != nil {
log.Printf("discovery changed error: %v", err)
}
// If config.Register is false this instance won't be registered with other nodes
if config.Register {
go sendDiscoveryPacket()
// This is background process that sends the message
go sendDiscoveryPacketTask(sendDiscoveryPacketTrigger)
// This triggers the process
go func() {
for {
sendDiscoveryPacket()
time.Sleep(time.Duration(config.KeepAlive) * time.Second)
if shuttingDown {
break
}
}
}()
} else {
log.Println("standalone mode, I won't register myself")
}
// --------
// REST API
// --------
@ -161,10 +235,10 @@ func main() {
}
e.Use(middleware.Logger())
e.Use(middleware.Recover())
// Routes
if !config.DisableAPI {
e.GET("/", listHandler)
e.GET("/v1/resolve", resolveHandler)
e.GET("/v1/discovery", getIdentificationHandler)
e.GET("/v1/discoveries", listHandler)
e.POST("/v1/labels", addLabelsHandler)
@ -175,7 +249,6 @@ func main() {
// ------------------------------
// Termination signals processing
// ------------------------------
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
@ -191,7 +264,6 @@ func main() {
}
e.Shutdown(context.TODO())
}(e, config)
// Start server
// In most cases this will end expectedly so it doesn't make sense to use the echo's approach to treat this message as an error.
e.Logger.Info(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port))))

View File

@ -34,19 +34,19 @@ func preparePrometheusOutput(name string, discoveries []server.Discovery) Promet
for _, discovery := range discoveries {
port := strconv.Itoa(int(config.NodeExporterPort))
host := discovery.Hostname
hosts := []string{}
var add bool // add to the prometheus output when there is at least one prometheus related label
labels := map[string]string{}
labels := map[string]string{} // These are prometheus labels, not Lobby's labels
for _, label := range discovery.FindLabels("prometheus:" + name + ":") {
for _, label := range discovery.FindLabelsByPrefix("prometheus:" + name + ":") {
trimmed := strings.TrimPrefix(label.String(), "prometheus:"+name+":")
parts := strings.SplitN(trimmed, ":", 2)
if len(parts) == 2 {
if parts[0] == "port" {
port = parts[1]
} else if parts[0] == "host" {
host = parts[1]
hosts = append(hosts, parts[1])
} else {
labels[parts[0]] = parts[1]
}
@ -65,20 +65,23 @@ func preparePrometheusOutput(name string, discoveries []server.Discovery) Promet
}
if add {
// Omit port part if "-" is set
target := host + ":" + port
if port == "-" {
target = host
}
targets := []string{}
for _, host := range hosts {
// Omit port part if "-" is set or port is part of the host
target := host + ":" + port
if strings.Contains(host, ":") || port == "-" {
target = host
}
targets = append(targets, target)
}
service := PrometheusService{
Targets: []string{target},
Targets: targets,
Labels: labels,
}
services = append(services, service)
}
}
return services

47
daemon/prometheus_test.go Normal file
View File

@ -0,0 +1,47 @@
package main
import (
"testing"
"github.com/by-cx/lobby/server"
"github.com/stretchr/testify/assert"
)
func TestPreparePrometheusOutput(t *testing.T) {
discoveries := []server.Discovery{
{
Hostname: "test.server",
Labels: server.Labels{
// test1
"prometheus:test1:label1:l1",
"prometheus:test1:host:srv1:1234",
"prometheus:test1:host:srv1:1235",
"prometheus:test1:host:srv1",
"prometheus:test1:label2:l2",
// test2
"prometheus:test2:host:srv2",
"prometheus:test2:host:srv2:1235",
"prometheus:test2:host:srv2:1236",
"prometheus:test2:host:srv2:1237",
"prometheus:test2:label1:l3",
},
},
}
services := preparePrometheusOutput("test1", discoveries)
assert.Equal(t, 1, len(services))
assert.Equal(t, 3, len(services[0].Targets))
assert.Contains(t, services[0].Targets, "srv1:9100")
assert.Contains(t, services[0].Targets, "srv1:1234")
assert.Contains(t, services[0].Targets, "srv1:1235")
assert.Contains(t, services[0].Labels["label1"], "l1")
assert.Contains(t, services[0].Labels["label2"], "l2")
services = preparePrometheusOutput("test2", discoveries)
assert.Equal(t, 1, len(services))
assert.Equal(t, 4, len(services[0].Targets))
assert.Contains(t, services[0].Targets, "srv2:9100")
assert.Contains(t, services[0].Targets, "srv2:1235")
assert.Contains(t, services[0].Targets, "srv2:1236")
assert.Contains(t, services[0].Targets, "srv2:1237")
assert.Contains(t, services[0].Labels["label1"], "l3")
}

102
daemon/update.go Normal file
View File

@ -0,0 +1,102 @@
package main
import (
"encoding/json"
"log"
"os/exec"
"strings"
"time"
"github.com/by-cx/lobby/server"
)
// These functions are called when something has changed in the storage
var changeDetectedChannel chan bool = make(chan bool)
var changeDetected bool
// changeCatcherLoop waits for a change signal and switches variable that says if the callback should run or not
func changeCatcherLoop() {
for {
<-changeDetectedChannel
changeDetected = true
}
}
// discoveryChangeLoop is used to process dynamic configuration changes.
// When there is a change detected a shell script or given process is triggered
// which does some operations with the new data. Usually it generates the
// configuration.
// This function has internal loop and won't allow to run the command more
// often than it's the configured amount of time. That prevents
func discoveryChangeLoop() {
// This other loop tics in strict intervals and prevents the callback script to run more often than it's configured
var cmd *exec.Cmd
// Delay first run of the callback script a little so everything can set up
log.Printf("Delaying start of discovery change loop (%d seconds)\n", config.CallbackFirstRunDelay)
time.Sleep(time.Duration(config.CallbackFirstRunDelay) * time.Second)
log.Println("Starting discovery change loop")
for {
if changeDetected {
// We switch this at the beginning so we can detect new changes while the callback script is running
changeDetected = false
log.Println("Running callback function")
// TODO: this is not the best way
callbackCommandSlice := strings.Split(config.Callback, " ")
if len(callbackCommandSlice) == 1 {
cmd = exec.Command(callbackCommandSlice[0])
} else if len(callbackCommandSlice) > 1 {
cmd = exec.Command(callbackCommandSlice[0], callbackCommandSlice[1:]...)
} else {
log.Println("wrong number of parts of the callback command")
time.Sleep(time.Duration(config.CallbackCooldown) * time.Second)
continue
}
stdin, err := cmd.StdinPipe()
if err != nil {
log.Println("stdin writing error: ", err.Error())
continue
}
discoveriesJSON, err := json.Marshal(discoveryStorage.GetAll())
if err != nil {
log.Println("stdin writing error: ", err.Error())
continue
}
_, err = stdin.Write([]byte(discoveriesJSON))
if err != nil {
log.Println("stdin writing error: ", err.Error())
continue
}
err = stdin.Close()
if err != nil {
log.Println("stdin writing error: ", err.Error())
continue
}
stdout, err := cmd.CombinedOutput()
if err != nil {
log.Println("Callback error: ", err.Error())
}
log.Println("Callback output: ", string(stdout))
}
time.Sleep(time.Duration(config.CallbackCooldown) * time.Second)
}
}
// discoveryChange is called when daemon detects that a newly arrived discovery
// packet is somehow different than the localone. This can be used to trigger
// some action in the local machine.
func discoveryChange(discovery server.Discovery) error {
if len(config.Callback) > 0 {
changeDetectedChannel <- true
}
return nil
}

1
go.mod
View File

@ -7,6 +7,7 @@ require (
github.com/fatih/color v1.12.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-resty/resty/v2 v2.6.0
github.com/google/go-cmp v0.5.5
github.com/kelseyhightower/envconfig v1.4.0
github.com/labstack/echo v3.3.10+incompatible
github.com/labstack/gommon v0.3.0 // indirect

2
go.sum
View File

@ -31,6 +31,7 @@ github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
@ -148,6 +149,7 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=

View File

@ -3,6 +3,9 @@ package nats_driver
import (
"encoding/json"
"fmt"
"log"
"strings"
"time"
"github.com/by-cx/lobby/common"
"github.com/by-cx/lobby/server"
@ -52,15 +55,20 @@ func (d *Driver) Init() error {
return fmt.Errorf("please initiate LogChannel variable")
}
nc, err := nats.Connect(d.NATSUrl)
if err != nil {
return err
for {
nc, err := nats.Connect(d.NATSUrl)
if err != nil {
log.Printf("Can't connect to the NATS server, waiting for 5 seconds before I try it again. (%v)\n", err)
time.Sleep(time.Second * 5)
continue
}
d.nc = nc
break
}
d.nc = nc
_, err = nc.Subscribe(d.NATSDiscoveryChannel, d.handler)
_, err := d.nc.Subscribe(d.NATSDiscoveryChannel, d.handler)
if err != nil {
return err
return fmt.Errorf("subscribe error: %v", err)
}
return nil
@ -93,7 +101,14 @@ func (d *Driver) SendDiscoveryPacket(discovery server.Discovery) error {
return fmt.Errorf("sending discovery formating message error: %v", err)
}
err = d.nc.Publish(d.NATSDiscoveryChannel, data)
if err != nil {
// In case the connection is down we will try to reconnect
if err != nil && strings.Contains(err.Error(), "connection closed") {
d.nc.Close()
err = d.Init()
if err != nil {
return fmt.Errorf("sending discovery reconnect error: %v", err)
}
} else if err != nil {
return fmt.Errorf("sending discovery error: %v", err)
}
return nil

View File

@ -45,9 +45,9 @@ func (d *Discovery) Bytes() ([]byte, error) {
return data, err
}
// FindLabels returns list of labels with given prefix. For example "service:ns" has prefix "service" or "service:".
// FindLabelsByPrefix returns list of labels with given prefix. For example "service:ns" has prefix "service" or "service:".
// It doesn't have to be prefix, but for example "service:test" will match "service:test" and also "service:test2".
func (d *Discovery) FindLabels(prefix string) Labels {
func (d *Discovery) FindLabelsByPrefix(prefix string) Labels {
labels := Labels{}
for _, label := range d.Labels {
if strings.HasPrefix(label.String(), prefix) {

View File

@ -27,7 +27,7 @@ func TestDiscovery(t *testing.T) {
assert.False(t, discovery.IsAlive(), "discovery not suppose to be alive")
discovery.LastCheck = now
assert.Equal(t, Labels{Label("service:test")}, discovery.FindLabels("service"))
assert.Equal(t, Labels{Label("service:test")}, discovery.FindLabelsByPrefix("service"))
assert.Equal(t, nil, discovery.Validate()) // TODO: This needs more love
content, err := json.Marshal(&discovery)

12
server/tools.go Normal file
View File

@ -0,0 +1,12 @@
package server
import "github.com/google/go-cmp/cmp"
// Compare compares discovery A and B and returns true if those two are different.
func Compare(discoveryA, discoveryB Discovery) bool {
discoveryA.LastCheck = 0
discoveryB.LastCheck = 0
discoveryA.TTL = 0
discoveryB.TTL = 0
return !cmp.Equal(discoveryA, discoveryB)
}

45
server/tools_test.go Normal file
View File

@ -0,0 +1,45 @@
package server
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestCompare(t *testing.T) {
discoveryA := Discovery{
Hostname: "abcd.com",
Labels: Labels{
Label("label1"),
},
LastCheck: 52,
}
discoveryAB := Discovery{
Hostname: "abcd.com",
Labels: Labels{
Label("label1"),
},
LastCheck: 56,
}
discoveryB := Discovery{
Hostname: "efgh.com",
Labels: Labels{
Label("label2"),
},
LastCheck: 56,
}
discoveryC := Discovery{
Hostname: "abcd.com",
Labels: Labels{
Label("label2"),
},
LastCheck: 60,
}
assert.True(t, Compare(discoveryA, discoveryB))
assert.True(t, Compare(discoveryB, discoveryC))
assert.True(t, Compare(discoveryA, discoveryC)) // Test different labels and same hostname
assert.False(t, Compare(discoveryA, discoveryA))
assert.False(t, Compare(discoveryB, discoveryB))
assert.False(t, Compare(discoveryA, discoveryAB)) // Test that last check is zeroed
}

View File

@ -1,5 +1,7 @@
package server
import "strings"
// Label keeps one piece of information about a single server
type Label string
@ -7,6 +9,34 @@ func (l Label) String() string {
return string(l)
}
// GetPart returns specific part of the label, if part index is higher than last available index it returns empty string.
func (l Label) GetPart(idx int) string {
parts := strings.Split(l.String(), ":")
if idx < 0 {
return ""
}
if len(parts) >= idx {
return ""
}
return parts[idx]
}
// GetPart1 is exactly same as GetPart but it splits the label only once, this is good for IPv6 addresses
func (l Label) GetPart1(idx int) string {
parts := strings.SplitN(l.String(), ":", 2)
if idx < 0 {
return ""
}
if len(parts) >= idx {
return ""
}
return parts[idx]
}
// Labels stores multiple Label records
type Labels []Label

38
templater/main.go Normal file
View File

@ -0,0 +1,38 @@
package main
import (
"flag"
"fmt"
"os"
)
// Templater is used to generate config files from predefined
// templates based on content of gathered discovery packets.
// It can for example configure Nginx's backend or database
// replication.
//
// It reads templates from /var/lib/lobby/templates (default)
// which are YAML files cotaining the template itself and command(s)
// that needs to be run when the template changes.
const defaultTemplatesPath = "/var/lib/lobby/templates"
var templatesPath *string
func init() {
templatesPath = flag.String("templates-path", defaultTemplatesPath, "path of where templates are stored")
flag.Parse()
err := os.MkdirAll(*templatesPath, 0750)
if err != nil {
fmt.Println(err)
flag.Usage()
os.Exit(1)
}
}
func main() {
fmt.Println(*templatesPath)
}