Compare commits
32 Commits
Author | SHA1 | Date |
---|---|---|
Adam Štrauch | c2ed658aaa | |
Adam Štrauch | 038741b87e | |
Adam Štrauch | 157e795792 | |
Adam Štrauch | 55b3376d4c | |
Adam Štrauch | 0e836b68a8 | |
Adam Štrauch | 322ef2a258 | |
Adam Štrauch | 6a1ecb80a1 | |
Adam Štrauch | 3a7662ef0d | |
Adam Štrauch | 4c0ca7ed6a | |
Adam Štrauch | a4a97ec49d | |
Adam Štrauch | 66683c9496 | |
Adam Štrauch | 24923a305b | |
Adam Štrauch | ab8068e26d | |
Adam Štrauch | 202a60f091 | |
Adam Štrauch | a9d7abbdd5 | |
Adam Štrauch | 709c47af3e | |
Adam Štrauch | e06a5cc94b | |
Adam Štrauch | 6769b903bc | |
Adam Štrauch | 8493c7cb40 | |
Adam Štrauch | a0abda5196 | |
Adam Štrauch | 58495d59c1 | |
Adam Štrauch | 2f4203e7c3 | |
Adam Štrauch | 7010316f8f | |
Adam Štrauch | f7340728be | |
Adam Štrauch | f2c3ef49e9 | |
Adam Štrauch | 128ed765e7 | |
Adam Štrauch | f60f92d667 | |
Adam Štrauch | 65c6aef5dc | |
Adam Štrauch | 753aaf8377 | |
Adam Štrauch | c6cb674053 | |
Adam Štrauch | b120970054 | |
Adam Štrauch | b19002ec84 |
|
@ -0,0 +1,10 @@
|
|||
kind: pipeline
|
||||
type: docker
|
||||
name: testing
|
||||
|
||||
steps:
|
||||
- name: test
|
||||
image: golang
|
||||
commands:
|
||||
- go mod tidy
|
||||
- make test
|
|
@ -1,3 +1,5 @@
|
|||
.history/
|
||||
|
||||
# Files with secrets
|
||||
*secret*
|
||||
tmp/
|
||||
|
|
41
Makefile
41
Makefile
|
@ -1,3 +1,6 @@
|
|||
VERSION=1.5
|
||||
|
||||
|
||||
.PHONY: all
|
||||
all: build
|
||||
|
||||
|
@ -9,9 +12,39 @@ clean:
|
|||
test:
|
||||
go test -v server/*.go
|
||||
|
||||
.PHONY: build
|
||||
build: test
|
||||
init:
|
||||
mkdir -p ./bin
|
||||
export CGO_ENABLED=0 && go build -o ./bin/lobbyd daemon/*.go
|
||||
export CGO_ENABLED=0 && go build -o ./bin/lobbyctl ctl/*.go
|
||||
|
||||
.PHONY: build
|
||||
build: test clean init linux-amd64 linux-arm linux-arm64 darwin-amd64 darwin-arm64
|
||||
|
||||
|
||||
.PHONY: linux-amd64
|
||||
linux-amd64: clean init
|
||||
# linux amd64
|
||||
env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./bin/lobbyd-${VERSION}-linux-amd64 daemon/*.go
|
||||
env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./bin/lobbyctl-${VERSION}-linux-amd64 ctl/*.go
|
||||
|
||||
.PHONY: linux-arm
|
||||
linux-arm: clean init
|
||||
# linux arm
|
||||
env CGO_ENABLED=0 GOOS=linux GOARCH=arm go build -o ./bin/lobbyd-${VERSION}-linux-arm daemon/*.go
|
||||
env CGO_ENABLED=0 GOOS=linux GOARCH=arm go build -o ./bin/lobbyctl-${VERSION}-linux-arm ctl/*.go
|
||||
|
||||
.PHONY: linux-arm64
|
||||
linux-arm64: clean init
|
||||
# linux arm64
|
||||
env CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o ./bin/lobbyd-${VERSION}-linux-arm64 daemon/*.go
|
||||
env CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o ./bin/lobbyctl-${VERSION}-linux-arm64 ctl/*.go
|
||||
|
||||
.PHONY: darwin-amd64
|
||||
darwin-amd64: clean init
|
||||
# darwin amd64
|
||||
env CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o ./bin/lobbyd-${VERSION}-darwin-amd64 daemon/*.go
|
||||
env CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o ./bin/lobbyctl-${VERSION}-darwin-amd64 ctl/*.go
|
||||
|
||||
.PHONY: darwin-arm64
|
||||
darwin-arm64: clean init
|
||||
# darwin arm64
|
||||
env CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 go build -o ./bin/lobbyd-${VERSION}-darwin-arm64 daemon/*.go
|
||||
env CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 go build -o ./bin/lobbyctl-${VERSION}-darwin-arm64 ctl/*.go
|
||||
|
|
129
README.md
129
README.md
|
@ -1,11 +1,15 @@
|
|||
# Lobby - simple server/service discovery service
|
||||
|
||||
TLDR: This a labeling tool for your servers. Like AWS resource tags but available everywhere.
|
||||
|
||||
In one of ours projects we needed service discovery that doesn't need complicated setup just to share
|
||||
a simple information about running services and checking if they are still alive. So we came up with
|
||||
this small service we call Lobby. It's like a lobby in games but in this case there are servers. Each
|
||||
server runs one or more instances of lobby daemon and it regularly sends how it's configured.
|
||||
this small service we called Lobby. It's like a lobby for users in games but in this case there are
|
||||
servers instead. Each server runs one or more instances of lobby daemon and it regularly sends info
|
||||
about its hostname and configured labels.
|
||||
|
||||
We call the information about the server and services running on it *labels*. Every server shares
|
||||
Labels are similar what you could know from AWS. It's basically alternative to resources' tags feature
|
||||
with the only different that you can use this anywhere including AWS. Every server sends something called
|
||||
"discovery packet" which is basically a json that looks like this:
|
||||
|
||||
```json
|
||||
|
@ -15,39 +19,42 @@ We call the information about the server and services running on it *labels*. Ev
|
|||
"service:smtp",
|
||||
"public_ip4:1.2.3.4",
|
||||
"public_ip6:2a03::1"
|
||||
],
|
||||
"last_check": 1630612478
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
The packet contains information what's the server hostname and then list of labels describing
|
||||
what's running on it and what are the IP addresses. What's in the labels is completely up to you
|
||||
but in some use-cases (Node Exporter API endpoint) it expects "NAME:VALUE" format.
|
||||
The packet contains information what's the server's hostname and then list of labels describing
|
||||
what's running on it, what are the IP addresses, services, directories to backup or server's location for example.
|
||||
What's in the labels is completely up to you but in some use-cases (Node Exporter API endpoint) it
|
||||
expects "NAME:VALUE" format.
|
||||
|
||||
The labels can be configured via environment variables but also as files located in
|
||||
*/etc/lobby/labels* (configurable path) so it can dynamically change.
|
||||
*/etc/lobby/labels* (configurable path) so it can dynamically change. Another way is to use
|
||||
*lobbyctl* which can add new labels at runtime.
|
||||
|
||||
When everything is running just call your favorite http client against "http://localhost:1313/"
|
||||
on any of the running instances and lobby returns you list of all available servers and
|
||||
on any of the running instances and lobby returns a list of all available servers and
|
||||
their labels. You can hook it to Prometheus, deployment scripts, CI/CD automations or
|
||||
your internal system that sends emails and it needs to know where is the SMTP server for
|
||||
example.
|
||||
|
||||
Lobby doesn't care if you have a one or thousand instances of it running. Each instance
|
||||
is connected to a common point which is a [NATS server](https://nats.io/) in this case. NATS is super fast and reliable
|
||||
is connected to a common point which is a [NATS server](https://nats.io/) or Redis. NATS is super fast and reliable
|
||||
messaging system which handles the communication part but also the high availability part.
|
||||
NATS is easy to run and it offloads a huge part of the problem from lobby itself.
|
||||
NATS is easy to run and it offloads a huge part of the problem from lobby itself. But Redis
|
||||
is not a bad choice either in some cases.
|
||||
|
||||
The code is open to support multiple backends and it's not that hard to add a new one.
|
||||
Support for NATS is only less than 150 lines.
|
||||
|
||||
## Quickstart guide
|
||||
|
||||
The quickest way how to run lobbyd on your server is this:
|
||||
The quickest way how to run lobby on your server is this:
|
||||
|
||||
```shell
|
||||
wget -O /usr/local/bin/lobbyd https://....
|
||||
wget -O /usr/local/bin/lobbyd https://github.com/by-cx/lobby/releases/download/v1.2/lobbyd-1.2-linux-amd64
|
||||
chmod +x /usr/local/bin/lobbyd
|
||||
wget -O /usr/local/bin/lobbyctl https://....
|
||||
wget -O /usr/local/bin/lobbyctl https://github.com/by-cx/lobby/releases/download/v1.2/lobbyctl-1.2-linux-amd64
|
||||
chmod +x /usr/local/bin/lobbyctl
|
||||
|
||||
# Update NATS_URL and LABELS here
|
||||
|
@ -72,7 +79,7 @@ systemctl enable lobbyd
|
|||
```
|
||||
|
||||
If you run lobbyd in production, consider to create its own system user and group and add both into this
|
||||
service file. It doesn't need to access almost anything in your system.
|
||||
service file. It doesn't need to access everything in your system.
|
||||
|
||||
To test if local instance is running call this:
|
||||
|
||||
|
@ -82,24 +89,42 @@ To test if local instance is running call this:
|
|||
|
||||
There are other config directives you can use to fine-tune lobbyd to exactly what you need.
|
||||
|
||||
| Environment variable | Type | Default | Required | Note |
|
||||
| ----------------------- | ------ | ----------------- | -------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| TOKEN | string | | no | Authentication token for API, if empty auth is disabled |
|
||||
| HOST | string | 127.0.0.1 | no | IP address used for the REST server to listen |
|
||||
| PORT | int | 1313 | no | Port related to the address above |
|
||||
| NATS_URL | string | | yes | NATS URL used to connect to the NATS server |
|
||||
| NATS_DISCOVERY_CHANNEL | string | lobby.discovery | no | Channel where the keep-alive packets are sent |
|
||||
| LABELS | string | | no | List of labels, labels should be separated by comma |
|
||||
| LABELS_PATH | string | /etc/lobby/labels | no | Path where filesystem based labels are located, one label per line, filename is not important for lobby |
|
||||
| RUNTIME_LABELS_FILENAME | string | _runtime | no | Filename for file created in LabelsPath where runtime labels will be added |
|
||||
| HOSTNAME | string | | no | Override local machine's hostname |
|
||||
| CLEAN_EVERY | int | 15 | no | How often to clean the list of discovered servers to get rid of the not alive ones [secs] |
|
||||
| KEEP_ALIVE | int | 5 | no | how often to send the keep-alive discovery message with all available information [secs] |
|
||||
| TTL | int | 30 | no | After how many secs is discovery record considered as invalid |
|
||||
| NODE_EXPORTER_PORT | int | 9100 | no | Default port where node_exporter listens on all registered servers, this is used when the special prometheus labels doesn't contain port |
|
||||
| REGISTER | bool | true | no | If true (default) then local instance is registered with other instance (discovery packet is sent regularly), if false the daemon runs only as a client |
|
||||
| Environment variable | Type | Default | Required | Note |
|
||||
| ------------------------ | ------ | ----------------- | ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| TOKEN | string | | no | Authentication token for API, if empty auth is disabled |
|
||||
| HOST | string | 127.0.0.1 | no | IP address used for the REST server to listen |
|
||||
| PORT | int | 1313 | no | Port related to the address above |
|
||||
| DISABLE_API | bool | false | no | If true API interface won't start |
|
||||
| DRIVER | string | NATS | yes | Selects which driver is used to exchange the discovery packets. |
|
||||
| NATS_URL | string | | yes (NATS driver) | NATS URL used to connect to the NATS server |
|
||||
| NATS_DISCOVERY_CHANNEL | string | lobby.discovery | no | Channel where the keep-alive packets are sent |
|
||||
| REDIS_HOST | string | 127.0.0.1" | no | Redis host |
|
||||
| REDIS_PORT | uint16 | 6379 | no | Redis port |
|
||||
| REDIS_DB | string | 0 | no | Redis DB |
|
||||
| REDIS_CHANNEL | string | lobby:discovery | no | Redis channel |
|
||||
| REDIS_PASSWORD | string | | no | Redis password |
|
||||
| LABELS | string | | no | List of labels, labels should be separated by comma |
|
||||
| LABELS_PATH | string | /etc/lobby/labels | no | Path where filesystem based labels are located, one label per line, filename is not important for lobby |
|
||||
| RUNTIME_LABELS_FILENAME | string | _runtime | no | Filename for file created in LabelsPath where runtime labels will be added |
|
||||
| HOSTNAME | string | | no | Override local machine's hostname |
|
||||
| CLEAN_EVERY | int | 15 | no | How often to clean the list of discovered servers to get rid of the not alive ones [secs] |
|
||||
| KEEP_ALIVE | int | 5 | no | how often to send the keep-alive discovery message with all available information [secs] |
|
||||
| TTL | int | 30 | no | After how many secs is discovery record considered as invalid |
|
||||
| NODE_EXPORTER_PORT | int | 9100 | no | Default port where node_exporter listens on all registered servers, this is used when the special prometheus labels doesn't contain port |
|
||||
| REGISTER | bool | true | no | If true (default) then local instance is registered with other instance (discovery packet is sent regularly), if false the daemon runs only as a client |
|
||||
| CALLBACK | string | | no | Path to a script that runs when the the discovery packet records are changed. Not running for first |
|
||||
| CALLBACK_COOLDOWN | int | 15 | no | Cooldown prevents the call back script to run sooner than configured amount of seconds after last run is finished. |
|
||||
| CALLBACK_FIRST_RUN_DELAY | int | 30 | no | Wait for this amount of seconds before callback is run for first time after fresh start of the daemon |
|
||||
|
||||
|
||||
### Callback script
|
||||
|
||||
When your application cannot support Lobbyd's API it can be configured via callback script that runs everytime something has changed in the network. Callback script is run every 15 seconds (configured by CALLBACK_COOLDOWN) but only when something has changed.
|
||||
|
||||
The script runs under the same user as lobbyd. When lobbyd starts first thirty seconds (CALLBACK_FIRST_RUN_DELAY) is ignored and then the script is run for first time. After these thirty seconds everything runs in loop based on the changes in the network.
|
||||
|
||||
All current discovery packets are passed to the callback script via standard input. It's basically the same input you get if you run `lobbyctl discoveries`.
|
||||
|
||||
### Service discovery for Prometheus
|
||||
|
||||
Lobbyd has an API endpoint that returns list of targets for [Prometheus's HTTP SD config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config). That
|
||||
|
@ -112,6 +137,8 @@ Let's check this:
|
|||
|
||||
If you set port to *-* lobby daemon omits port entirely from the output.
|
||||
|
||||
There can be multiple `host` labels but only one `port` label and all prometheus labels (last line) will be common for all hosts labels. If port is omitted then default 9100 is used or port can also be part of the host label.
|
||||
|
||||
When you open URL http://localhost:1313/v1/prometheus/nodeexporter it returns this:
|
||||
|
||||
```json
|
||||
|
@ -175,24 +202,42 @@ It uses Go client library also located in this repository.
|
|||
So far the REST API is super simple and it has only two endpoints:
|
||||
|
||||
```
|
||||
GET / # Same as /v1/discoveries
|
||||
GET /v1/discovery # Returns current local discovery packet
|
||||
GET /v1/discoveries # Returns list of all discovered servers and their labels.
|
||||
GET /v1/discoveries?labels=LABELS # output will be filtered based on one or multiple labels separated by comma
|
||||
GET /v1/prometheus/:name # Generates output for Prometheus's SD config, name is group of the monitoring services described above.
|
||||
POST /v1/labels # Add runtime labels that will persist over daemon restarts. Labels should be in the body of the request, one line per one label.
|
||||
DELETE /v1/labels # Delete runtime labels. One label per line. Can't affect the labels from environment variables or labels added from the LabelPath.
|
||||
GET / # Same as /v1/discoveries
|
||||
GET /v1/discovery # Returns current local discovery packet
|
||||
GET /v1/discoveries # Returns list of all discovered servers and their labels.
|
||||
GET /v1/discoveries?labels=LABELS&prefixes=PREFIXES # output will be filtered based on one or multiple labels separated by comma or it can search for given prefixes, only one of those will be used
|
||||
GET /v1/prometheus/:name # Generates output for Prometheus's SD config, name is group of the monitoring services described above.
|
||||
POST /v1/labels # Add runtime labels that will persist over daemon restarts. Labels should be in the body of the request, one line per one label.
|
||||
DELETE /v1/labels # Delete runtime labels. One label per line. Can't affect the labels from environment variables or labels added from the LabelPath.
|
||||
```
|
||||
|
||||
If there is an error the error message is returned as plain text.
|
||||
|
||||
## API clients
|
||||
|
||||
* Golang client is part of this repository.
|
||||
* There is also [Python client](https://github.com/by-cx/lobby-python) available.
|
||||
|
||||
## Notes
|
||||
|
||||
I wanted to use SQS or SNS as backend but when I checked the services I found out
|
||||
it wouldn't work. SNS would require open HTTP server whicn is hard to do in our
|
||||
infrastructure and I couldn't find a way how SQS could deliver every message
|
||||
to all instances of lobbyd.
|
||||
|
||||
Instead I decided to implement Redis because it's much easier to use for
|
||||
development and testing. But there is no reason why it couldn't work in production
|
||||
too.
|
||||
|
||||
|
||||
## TODO
|
||||
|
||||
* [X] Tests
|
||||
* [ ] Command hooks - script or list of scripts that are triggered when discovery status has changed
|
||||
* [X] Command hooks - script or list of scripts that are triggered when discovery status has changed
|
||||
* [ ] Support for multiple active backend drivers
|
||||
* [ ] SNS driver
|
||||
* [X] Redis driver
|
||||
* [X] Remove the 5 secs waiting when daemon is stopped
|
||||
* [X] API to allow add labels at runtime
|
||||
|
||||
* [ ] Check what happens when driver is disconnected
|
||||
|
||||
|
||||
|
|
|
@ -6,8 +6,8 @@ import (
|
|||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/by-cx/lobby/server"
|
||||
"github.com/go-resty/resty/v2"
|
||||
"github.com/rosti-cz/server_lobby/server"
|
||||
)
|
||||
|
||||
// Encapsulation of Lobby's client code
|
||||
|
@ -114,14 +114,64 @@ func (l *LobbyClient) GetDiscoveries() ([]server.Discovery, error) {
|
|||
return discoveries, nil
|
||||
}
|
||||
|
||||
// Resolve returns list of hostnames that have given label
|
||||
func (l *LobbyClient) Resolve(label server.Label) ([]string, error) {
|
||||
l.init()
|
||||
|
||||
path := fmt.Sprintf("/v1/resolve?label=%s", label.String())
|
||||
method := "GET"
|
||||
|
||||
var hostnames []string
|
||||
|
||||
status, body, err := l.call(method, path, "")
|
||||
if err != nil {
|
||||
return hostnames, err
|
||||
}
|
||||
if status != 200 {
|
||||
return hostnames, fmt.Errorf("non-200 response: %s", body)
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(body), &hostnames)
|
||||
if err != nil {
|
||||
return hostnames, fmt.Errorf("response parsing error: %v", err)
|
||||
}
|
||||
|
||||
return hostnames, nil
|
||||
}
|
||||
|
||||
// Find discoveries by their labels
|
||||
func (l *LobbyClient) FindByLabels(labels server.Labels) (server.Discoveries, error) {
|
||||
func (l *LobbyClient) FindByLabels(labels server.Labels) ([]server.Discovery, error) {
|
||||
l.init()
|
||||
|
||||
path := fmt.Sprintf("/v1/discoveries?labels=%s", strings.Join(labels.StringSlice(), ","))
|
||||
method := "GET"
|
||||
|
||||
var discoveries server.Discoveries
|
||||
var discoveries []server.Discovery
|
||||
|
||||
status, body, err := l.call(method, path, "")
|
||||
if err != nil {
|
||||
return discoveries, err
|
||||
}
|
||||
if status != 200 {
|
||||
return discoveries, fmt.Errorf("non-200 response: %s", body)
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(body), &discoveries)
|
||||
if err != nil {
|
||||
return discoveries, fmt.Errorf("response parsing error: %v", err)
|
||||
}
|
||||
|
||||
return discoveries, nil
|
||||
}
|
||||
|
||||
// Find discoveries by label prefixes
|
||||
func (l *LobbyClient) FindByPrefixes(prefixes []string) ([]server.Discovery, error) {
|
||||
l.init()
|
||||
|
||||
path := fmt.Sprintf("/v1/discoveries?prefixes=%s", strings.Join(prefixes, ","))
|
||||
method := "GET"
|
||||
|
||||
var discoveries []server.Discovery
|
||||
|
||||
status, body, err := l.call(method, path, "")
|
||||
if err != nil {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package common
|
||||
|
||||
import "github.com/rosti-cz/server_lobby/server"
|
||||
import "github.com/by-cx/lobby/server"
|
||||
|
||||
// Listener is a function that returns received discovery
|
||||
type Listener func(server.Discovery)
|
||||
|
|
|
@ -1,14 +1,18 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/rosti-cz/server_lobby/server"
|
||||
"github.com/by-cx/lobby/server"
|
||||
"github.com/fatih/color"
|
||||
)
|
||||
|
||||
func printDiscovery(discovery server.Discovery) {
|
||||
fmt.Printf("Hostname:\n %s\n", discovery.Hostname)
|
||||
color.Yellow("Hostname:\n %s\n", discovery.Hostname)
|
||||
|
||||
if len(discovery.Labels) > 0 {
|
||||
fmt.Printf("Labels:\n")
|
||||
|
@ -18,6 +22,15 @@ func printDiscovery(discovery server.Discovery) {
|
|||
}
|
||||
}
|
||||
|
||||
func colorLabel(label server.Label) string {
|
||||
parts := strings.Split(label.String(), ":")
|
||||
if len(parts) == 1 {
|
||||
return color.GreenString(parts[0])
|
||||
}
|
||||
|
||||
return color.GreenString(parts[0]) + ":" + color.MagentaString((strings.Join(parts[1:], ":")))
|
||||
}
|
||||
|
||||
func printDiscoveries(discoveries []server.Discovery) {
|
||||
maxHostnameWidth := 0
|
||||
for _, discovery := range discoveries {
|
||||
|
@ -28,16 +41,29 @@ func printDiscoveries(discoveries []server.Discovery) {
|
|||
|
||||
for _, discovery := range discoveries {
|
||||
if len(discovery.Labels) == 0 {
|
||||
fmt.Println(discovery.Hostname)
|
||||
// fmt.Println(discovery.Hostname)
|
||||
color.Yellow(discovery.Hostname)
|
||||
} else {
|
||||
hostname := fmt.Sprintf("%"+strconv.Itoa(maxHostnameWidth)+"s", discovery.Hostname)
|
||||
fmt.Printf("%s %s\n", hostname, discovery.Labels[0].String())
|
||||
|
||||
fmt.Printf("%s %s\n", color.YellowString(hostname), colorLabel(discovery.Labels[0]))
|
||||
|
||||
if len(discovery.Labels) > 1 {
|
||||
for _, label := range discovery.Labels[1:] {
|
||||
fmt.Printf("%"+strconv.Itoa(maxHostnameWidth+4)+"s%s\n", " ", label)
|
||||
fmt.Printf("%"+strconv.Itoa(maxHostnameWidth+4)+"s%s\n", " ", colorLabel(label))
|
||||
}
|
||||
}
|
||||
}
|
||||
fmt.Println()
|
||||
}
|
||||
}
|
||||
|
||||
func printJSON(data interface{}) {
|
||||
body, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
fmt.Println("error occurred while formating the output into JSON:", err.Error())
|
||||
os.Exit(3)
|
||||
}
|
||||
|
||||
fmt.Println(string(body))
|
||||
}
|
||||
|
|
87
ctl/main.go
87
ctl/main.go
|
@ -6,18 +6,21 @@ import (
|
|||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/rosti-cz/server_lobby/client"
|
||||
"github.com/rosti-cz/server_lobby/server"
|
||||
"github.com/by-cx/lobby/client"
|
||||
"github.com/by-cx/lobby/server"
|
||||
)
|
||||
|
||||
func Usage() {
|
||||
flag.Usage()
|
||||
fmt.Println("")
|
||||
fmt.Println("Commands:")
|
||||
fmt.Println(" discovery returns discovery packet of the server where the client is connected to")
|
||||
fmt.Println(" discoveries returns list of all registered discovery packets")
|
||||
fmt.Println(" labels add LABEL [LABEL] ... adds new runtime labels")
|
||||
fmt.Println(" labels del LABEL [LABEL] ... deletes runtime labels")
|
||||
fmt.Println(" resolve label returns list of hostnames with given label")
|
||||
fmt.Println(" discovery returns discovery packet of the server where the client is connected to")
|
||||
fmt.Println(" discoveries returns list of all registered discovery packets")
|
||||
fmt.Println(" discoveries labels [LABEL] ... returns list of all registered discovery packets with given labels (OR)")
|
||||
fmt.Println(" discoveries search [LABEL] ... returns list of all registered discovery packets with given label prefixes (OR)")
|
||||
fmt.Println(" labels add LABEL [LABEL] ... adds new runtime labels")
|
||||
fmt.Println(" labels del LABEL [LABEL] ... deletes runtime labels")
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@ -28,6 +31,7 @@ func main() {
|
|||
host := flag.String("host", "", "Hostname or IP address of lobby daemon")
|
||||
port := flag.Uint("port", 0, "Port of lobby daemon")
|
||||
token := flag.String("token", "", "Token needed to communicate lobby daemon, if empty auth is disabled")
|
||||
jsonOutput := flag.Bool("json", false, "set output to JSON, error will be still in plain text")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
|
@ -65,18 +69,77 @@ func main() {
|
|||
}
|
||||
|
||||
switch flag.Args()[0] {
|
||||
case "discoveries":
|
||||
discoveries, err := client.GetDiscoveries()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
case "resolve":
|
||||
if len(flag.Args()) == 2 {
|
||||
hostnames, err := client.Resolve(server.Label(flag.Arg(1)))
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if *jsonOutput {
|
||||
printJSON(hostnames)
|
||||
} else {
|
||||
for _, hostname := range hostnames {
|
||||
fmt.Println(hostname)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Usage()
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
case "discoveries":
|
||||
var discoveries []server.Discovery
|
||||
var err error
|
||||
|
||||
if len(flag.Args()) > 2 {
|
||||
if flag.Arg(1) == "labels" {
|
||||
labels := []server.Label{}
|
||||
for _, label := range flag.Args()[2:] {
|
||||
labels = append(labels, server.Label(label))
|
||||
}
|
||||
|
||||
discoveries, err = client.FindByLabels(labels)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
} else if flag.Arg(1) == "search" {
|
||||
discoveries, err = client.FindByPrefixes(flag.Args()[2:])
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
} else {
|
||||
fmt.Println("ERROR: unknown usage of discoveries arguments")
|
||||
fmt.Println("")
|
||||
Usage()
|
||||
os.Exit(0)
|
||||
}
|
||||
} else {
|
||||
discoveries, err = client.GetDiscoveries()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
if *jsonOutput {
|
||||
printJSON(discoveries)
|
||||
} else {
|
||||
printDiscoveries(discoveries)
|
||||
}
|
||||
printDiscoveries(discoveries)
|
||||
case "discovery":
|
||||
discovery, err := client.GetDiscovery()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
printDiscovery(discovery)
|
||||
|
||||
if *jsonOutput {
|
||||
printJSON(discovery)
|
||||
} else {
|
||||
printDiscovery(discovery)
|
||||
}
|
||||
case "labels":
|
||||
if len(flag.Args()) < 3 {
|
||||
fmt.Println("ERROR: not enough arguments for labels command")
|
||||
|
|
|
@ -3,8 +3,8 @@ package main
|
|||
import (
|
||||
"log"
|
||||
|
||||
"github.com/by-cx/lobby/server"
|
||||
"github.com/kelseyhightower/envconfig"
|
||||
"github.com/rosti-cz/server_lobby/server"
|
||||
)
|
||||
|
||||
// Config keeps info about configuration of this daemon
|
||||
|
@ -12,8 +12,15 @@ type Config struct {
|
|||
Token string `envconfig:"TOKEN" required:"false"` // Authentication token, if empty auth is disabled
|
||||
Host string `envconfig:"HOST" required:"false" default:"127.0.0.1"` // IP address used for the REST server to listen
|
||||
Port uint16 `envconfig:"PORT" required:"false" default:"1313"` // Port related to the address above
|
||||
NATSURL string `envconfig:"NATS_URL" required:"true"` // NATS URL used to connect to the NATS server
|
||||
DisableAPI bool `envconfig:"DISABLE_API" required:"false" default:"false"` // If true API interface won't start
|
||||
Driver string `envconfig:"DRIVER" required:"false" default:"NATS"` // Select driver to use to communicate with the group of nodes. The possible values are NATS and Redis
|
||||
NATSURL string `envconfig:"NATS_URL" required:"false"` // NATS URL used to connect to the NATS server
|
||||
NATSDiscoveryChannel string `envconfig:"NATS_DISCOVERY_CHANNEL" required:"false" default:"lobby.discovery"` // Channel where the kepp alive packets are sent
|
||||
RedisHost string `envconfig:"REDIS_HOST" required:"false" default:"127.0.0.1"` // Redis host
|
||||
RedisPort uint16 `envconfig:"REDIS_PORT" required:"false" default:"6379"` // Redis port
|
||||
RedisDB uint `envconfig:"REDIS_DB" required:"false" default:"0"` // Redis DB
|
||||
RedisChannel string `envconfig:"REDIS_CHANNEL" required:"false" default:"lobby:discovery"` // Redis channel
|
||||
RedisPassword string `envconfig:"REDIS_PASSWORD" required:"false" default:""` // Redis password
|
||||
Labels server.Labels `envconfig:"LABELS" required:"false" default:""` // List of labels
|
||||
LabelsPath string `envconfig:"LABELS_PATH" required:"false" default:"/etc/lobby/labels"` // Path where filesystem based labels are located
|
||||
RuntimeLabelsFilename string `envconfig:"RUNTIME_LABELS_FILENAME" required:"false" default:"_runtime"` // Filename for file created in LabelsPath where runtime labels will be added
|
||||
|
@ -23,6 +30,9 @@ type Config struct {
|
|||
TTL uint `envconfig:"TTL" required:"false" default:"30"` // After how many secs is discovery record considered as invalid
|
||||
NodeExporterPort uint `envconfig:"NODE_EXPORTER_PORT" required:"false" default:"9100"` // Default port where node_exporter listens on all registered servers
|
||||
Register bool `envconfig:"REGISTER" required:"false" default:"true"` // If true (default) then local instance is registered with other instance (discovery packet is sent regularly)
|
||||
Callback string `envconfig:"CALLBACK" required:"false" default:""` // path to a script that runs when the is a change in the labels database
|
||||
CallbackCooldown uint `envconfig:"CALLBACK_COOLDOWN" required:"false" default:"15"` // cooldown that prevents to run the config change script too many times in row
|
||||
CallbackFirstRunDelay uint `envconfig:"CALLBACK_FIRST_RUN_DELAY" required:"false" default:"30"` // Wait for this amount of seconds before callback is run for first time after fresh start of the daemon
|
||||
}
|
||||
|
||||
// GetConfig return configuration created based on environment variables
|
||||
|
@ -34,5 +44,13 @@ func GetConfig() *Config {
|
|||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
if config.Driver != "Redis" && config.Driver != "NATS" {
|
||||
log.Fatal("ERROR: the only supported drivers are Redis and NATS (default)")
|
||||
}
|
||||
|
||||
if config.Driver == "NATS" && len(config.NATSURL) == 0 {
|
||||
log.Fatal("ERROR: NATS_URL cannot be empty when driver is set to NATS")
|
||||
}
|
||||
|
||||
return &config
|
||||
}
|
||||
|
|
|
@ -6,18 +6,21 @@ import (
|
|||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/by-cx/lobby/server"
|
||||
"github.com/labstack/echo"
|
||||
"github.com/rosti-cz/server_lobby/server"
|
||||
)
|
||||
|
||||
func listHandler(c echo.Context) error {
|
||||
labels := c.QueryParam("labels")
|
||||
prefixes := c.QueryParam("prefixes")
|
||||
|
||||
var discoveries []server.Discovery
|
||||
|
||||
if len(labels) > 0 {
|
||||
labelsFilterSlice := strings.Split(labels, ",")
|
||||
discoveries = discoveryStorage.Filter(labelsFilterSlice)
|
||||
} else if len(prefixes) > 0 {
|
||||
discoveries = discoveryStorage.FilterPrefix(strings.Split(prefixes, ","))
|
||||
} else {
|
||||
discoveries = discoveryStorage.GetAll()
|
||||
}
|
||||
|
@ -25,6 +28,20 @@ func listHandler(c echo.Context) error {
|
|||
return c.JSONPretty(200, discoveries, " ")
|
||||
}
|
||||
|
||||
// resolveHandler returns hostname(s) based on another label
|
||||
func resolveHandler(c echo.Context) error {
|
||||
label := c.QueryParam("label") // This is label we will use to filter discovery packets
|
||||
|
||||
output := []string{}
|
||||
|
||||
discoveries := discoveryStorage.Filter([]string{label})
|
||||
for _, discovery := range discoveries {
|
||||
output = append(output, discovery.Hostname)
|
||||
}
|
||||
|
||||
return c.JSONPretty(http.StatusOK, output, " ")
|
||||
}
|
||||
|
||||
func prometheusHandler(c echo.Context) error {
|
||||
name := c.Param("name")
|
||||
|
||||
|
@ -60,6 +77,9 @@ func addLabelsHandler(c echo.Context) error {
|
|||
return c.String(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
|
||||
// Update the other nodes with this new change
|
||||
sendDiscoveryPacket()
|
||||
|
||||
return c.String(http.StatusOK, "OK")
|
||||
}
|
||||
|
||||
|
@ -81,5 +101,8 @@ func deleteLabelsHandler(c echo.Context) error {
|
|||
return c.String(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
|
||||
// Update the other nodes with this new change
|
||||
sendDiscoveryPacket()
|
||||
|
||||
return c.String(http.StatusOK, "OK")
|
||||
}
|
||||
|
|
157
daemon/main.go
157
daemon/main.go
|
@ -9,20 +9,23 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/by-cx/lobby/common"
|
||||
"github.com/by-cx/lobby/nats_driver"
|
||||
"github.com/by-cx/lobby/redis_driver"
|
||||
"github.com/by-cx/lobby/server"
|
||||
"github.com/labstack/echo"
|
||||
"github.com/labstack/echo/middleware"
|
||||
"github.com/rosti-cz/server_lobby/common"
|
||||
"github.com/rosti-cz/server_lobby/nats_driver"
|
||||
"github.com/rosti-cz/server_lobby/server"
|
||||
)
|
||||
|
||||
var discoveryStorage server.Discoveries = server.Discoveries{}
|
||||
var driver common.Driver
|
||||
var localHost server.LocalHost
|
||||
var lastLocalHostname string
|
||||
|
||||
var config Config
|
||||
|
||||
var shuttingDown bool
|
||||
var sendDiscoveryPacketTrigger chan bool = make(chan bool)
|
||||
|
||||
func init() {
|
||||
// Load config from environment variables
|
||||
|
@ -32,7 +35,7 @@ func init() {
|
|||
discoveryStorage.LogChannel = make(chan string)
|
||||
discoveryStorage.TTL = config.TTL
|
||||
|
||||
// localhost initization
|
||||
// localhost initiation
|
||||
localHost = server.LocalHost{
|
||||
LabelsPath: config.LabelsPath,
|
||||
HostnameOverride: config.HostName,
|
||||
|
@ -41,12 +44,27 @@ func init() {
|
|||
}
|
||||
|
||||
// Setup driver
|
||||
driver = &nats_driver.Driver{
|
||||
NATSUrl: config.NATSURL,
|
||||
NATSDiscoveryChannel: config.NATSDiscoveryChannel,
|
||||
if config.Driver == "NATS" {
|
||||
driver = &nats_driver.Driver{
|
||||
NATSUrl: config.NATSURL,
|
||||
NATSDiscoveryChannel: config.NATSDiscoveryChannel,
|
||||
|
||||
LogChannel: discoveryStorage.LogChannel,
|
||||
LogChannel: discoveryStorage.LogChannel,
|
||||
}
|
||||
} else if config.Driver == "Redis" {
|
||||
driver = &redis_driver.Driver{
|
||||
Host: config.RedisHost,
|
||||
Port: uint(config.RedisPort),
|
||||
Password: config.RedisPassword,
|
||||
Channel: config.RedisChannel,
|
||||
DB: uint(config.RedisDB),
|
||||
|
||||
LogChannel: discoveryStorage.LogChannel,
|
||||
}
|
||||
} else {
|
||||
log.Fatalf("unsupported driver %s", config.Driver)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// cleanDiscoveryPool clears the local server map and keeps only the alive servers
|
||||
|
@ -72,21 +90,44 @@ func sendGoodbyePacket() {
|
|||
}
|
||||
}
|
||||
|
||||
// sendDisoveryPacket sends discovery packet regularly so the network know we exist
|
||||
// sendDiscoveryPacket sends a single discovery packet out
|
||||
func sendDiscoveryPacket() {
|
||||
sendDiscoveryPacketTrigger <- true
|
||||
}
|
||||
|
||||
// sendDisoveryPacket sends discovery packet to the driver which passes it to the
|
||||
// other nodes. By this it propagates any change that happens in the local discovery struct.
|
||||
// Every tune trigger is triggered it sends one message.
|
||||
func sendDiscoveryPacketTask(trigger chan bool) {
|
||||
for {
|
||||
discovery, err := localHost.GetIdentification()
|
||||
if err != nil {
|
||||
log.Printf("sending discovery identification error: %v\n", err)
|
||||
}
|
||||
// We are waiting for the trigger
|
||||
<-trigger
|
||||
|
||||
err = driver.SendDiscoveryPacket(discovery)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
time.Sleep(time.Duration(config.KeepAlive) * time.Second)
|
||||
if !shuttingDown {
|
||||
// Get info about local machine and send to the exchange point
|
||||
discovery, err := localHost.GetIdentification()
|
||||
if err != nil {
|
||||
log.Printf("sending discovery identification error: %v\n", err)
|
||||
}
|
||||
|
||||
if shuttingDown {
|
||||
err = driver.SendDiscoveryPacket(discovery)
|
||||
if err != nil {
|
||||
log.Println(err.Error())
|
||||
}
|
||||
|
||||
// If local hostname changes, we will deregister it
|
||||
if discovery.Hostname != lastLocalHostname && lastLocalHostname != "" {
|
||||
log.Println("Hostname change detected, deregistering the old one")
|
||||
err = driver.SendGoodbyePacket(server.Discovery{
|
||||
Hostname: lastLocalHostname,
|
||||
})
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
}
|
||||
|
||||
lastLocalHostname = discovery.Hostname
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -111,12 +152,38 @@ func main() {
|
|||
// Server discovering stuff
|
||||
// ------------------------
|
||||
|
||||
// Connect to the NATS service
|
||||
// Setup callback function to register and unregister discovery packets from other servers
|
||||
driver.RegisterSubscribeFunction(func(d server.Discovery) {
|
||||
// Check if the local version and the new version are somehowe changed
|
||||
localVersion := discoveryStorage.Get(d.Hostname)
|
||||
exists := discoveryStorage.Exist(d.Hostname)
|
||||
changed := server.Compare(localVersion, d)
|
||||
|
||||
discoveryStorage.Add(d)
|
||||
|
||||
if changed {
|
||||
// Print this only if the server is already registered
|
||||
if exists {
|
||||
log.Printf("%s has been updated", d.Hostname)
|
||||
}
|
||||
|
||||
go func() {
|
||||
err = discoveryChange(d)
|
||||
if err != nil {
|
||||
log.Printf("discovery changed error: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
})
|
||||
driver.RegisterUnsubscribeFunction(func(d server.Discovery) {
|
||||
discoveryStorage.Delete(d.Hostname)
|
||||
go func() {
|
||||
err = discoveryChange(d)
|
||||
if err != nil {
|
||||
log.Printf("discovery changed error: %v", err)
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
err = driver.Init()
|
||||
|
@ -127,13 +194,36 @@ func main() {
|
|||
go printDiscoveryLogs()
|
||||
go cleanDiscoveryPool()
|
||||
|
||||
if len(config.Callback) > 0 {
|
||||
go discoveryChangeLoop()
|
||||
go changeCatcherLoop()
|
||||
}
|
||||
|
||||
// When the daemon boots up we trigger discovery change event so the config can be setup via callback script if there is any
|
||||
err = discoveryChange(server.Discovery{})
|
||||
if err != nil {
|
||||
log.Printf("discovery changed error: %v", err)
|
||||
}
|
||||
// If config.Register is false this instance won't be registered with other nodes
|
||||
if config.Register {
|
||||
go sendDiscoveryPacket()
|
||||
// This is background process that sends the message
|
||||
go sendDiscoveryPacketTask(sendDiscoveryPacketTrigger)
|
||||
|
||||
// This triggers the process
|
||||
go func() {
|
||||
for {
|
||||
sendDiscoveryPacket()
|
||||
|
||||
time.Sleep(time.Duration(config.KeepAlive) * time.Second)
|
||||
|
||||
if shuttingDown {
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
log.Println("standalone mode, I won't register myself")
|
||||
}
|
||||
|
||||
// --------
|
||||
// REST API
|
||||
// --------
|
||||
|
@ -145,19 +235,20 @@ func main() {
|
|||
}
|
||||
e.Use(middleware.Logger())
|
||||
e.Use(middleware.Recover())
|
||||
|
||||
// Routes
|
||||
e.GET("/", listHandler)
|
||||
e.GET("/v1/discovery", getIdentificationHandler)
|
||||
e.GET("/v1/discoveries", listHandler)
|
||||
e.POST("/v1/labels", addLabelsHandler)
|
||||
e.DELETE("/v1/labels", deleteLabelsHandler)
|
||||
e.GET("/v1/prometheus/:name", prometheusHandler)
|
||||
if !config.DisableAPI {
|
||||
e.GET("/", listHandler)
|
||||
e.GET("/v1/resolve", resolveHandler)
|
||||
e.GET("/v1/discovery", getIdentificationHandler)
|
||||
e.GET("/v1/discoveries", listHandler)
|
||||
e.POST("/v1/labels", addLabelsHandler)
|
||||
e.DELETE("/v1/labels", deleteLabelsHandler)
|
||||
e.GET("/v1/prometheus/:name", prometheusHandler)
|
||||
}
|
||||
|
||||
// ------------------------------
|
||||
// Termination signals processing
|
||||
// ------------------------------
|
||||
|
||||
signals := make(chan os.Signal, 1)
|
||||
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
|
@ -167,13 +258,13 @@ func main() {
|
|||
if config.Register {
|
||||
log.Printf("%s signal received, sending goodbye packet\n", sig.String())
|
||||
sendGoodbyePacket()
|
||||
time.Sleep(5 * time.Second) // we wait for a few seconds to let background jobs to finish their job
|
||||
time.Sleep(1 * time.Second) // we wait for a few seconds to let background jobs to finish their job
|
||||
} else {
|
||||
log.Printf("%s signal received", sig.String())
|
||||
}
|
||||
e.Shutdown(context.TODO())
|
||||
}(e, config)
|
||||
|
||||
// Start server
|
||||
e.Logger.Error(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port))))
|
||||
// In most cases this will end expectedly so it doesn't make sense to use the echo's approach to treat this message as an error.
|
||||
e.Logger.Info(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port))))
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/rosti-cz/server_lobby/server"
|
||||
"github.com/by-cx/lobby/server"
|
||||
)
|
||||
|
||||
// [
|
||||
|
@ -34,19 +34,19 @@ func preparePrometheusOutput(name string, discoveries []server.Discovery) Promet
|
|||
|
||||
for _, discovery := range discoveries {
|
||||
port := strconv.Itoa(int(config.NodeExporterPort))
|
||||
host := discovery.Hostname
|
||||
hosts := []string{}
|
||||
var add bool // add to the prometheus output when there is at least one prometheus related label
|
||||
|
||||
labels := map[string]string{}
|
||||
labels := map[string]string{} // These are prometheus labels, not Lobby's labels
|
||||
|
||||
for _, label := range discovery.FindLabels("prometheus:" + name + ":") {
|
||||
for _, label := range discovery.FindLabelsByPrefix("prometheus:" + name + ":") {
|
||||
trimmed := strings.TrimPrefix(label.String(), "prometheus:"+name+":")
|
||||
parts := strings.SplitN(trimmed, ":", 2)
|
||||
if len(parts) == 2 {
|
||||
if parts[0] == "port" {
|
||||
port = parts[1]
|
||||
} else if parts[0] == "host" {
|
||||
host = parts[1]
|
||||
hosts = append(hosts, parts[1])
|
||||
} else {
|
||||
labels[parts[0]] = parts[1]
|
||||
}
|
||||
|
@ -65,20 +65,23 @@ func preparePrometheusOutput(name string, discoveries []server.Discovery) Promet
|
|||
}
|
||||
|
||||
if add {
|
||||
// Omit port part if "-" is set
|
||||
target := host + ":" + port
|
||||
if port == "-" {
|
||||
target = host
|
||||
}
|
||||
targets := []string{}
|
||||
for _, host := range hosts {
|
||||
// Omit port part if "-" is set or port is part of the host
|
||||
target := host + ":" + port
|
||||
if strings.Contains(host, ":") || port == "-" {
|
||||
target = host
|
||||
}
|
||||
targets = append(targets, target)
|
||||
|
||||
}
|
||||
service := PrometheusService{
|
||||
Targets: []string{target},
|
||||
Targets: targets,
|
||||
Labels: labels,
|
||||
}
|
||||
|
||||
services = append(services, service)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return services
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/by-cx/lobby/server"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestPreparePrometheusOutput(t *testing.T) {
|
||||
discoveries := []server.Discovery{
|
||||
{
|
||||
Hostname: "test.server",
|
||||
Labels: server.Labels{
|
||||
// test1
|
||||
"prometheus:test1:label1:l1",
|
||||
"prometheus:test1:host:srv1:1234",
|
||||
"prometheus:test1:host:srv1:1235",
|
||||
"prometheus:test1:host:srv1",
|
||||
"prometheus:test1:label2:l2",
|
||||
// test2
|
||||
"prometheus:test2:host:srv2",
|
||||
"prometheus:test2:host:srv2:1235",
|
||||
"prometheus:test2:host:srv2:1236",
|
||||
"prometheus:test2:host:srv2:1237",
|
||||
"prometheus:test2:label1:l3",
|
||||
},
|
||||
},
|
||||
}
|
||||
services := preparePrometheusOutput("test1", discoveries)
|
||||
assert.Equal(t, 1, len(services))
|
||||
assert.Equal(t, 3, len(services[0].Targets))
|
||||
assert.Contains(t, services[0].Targets, "srv1:9100")
|
||||
assert.Contains(t, services[0].Targets, "srv1:1234")
|
||||
assert.Contains(t, services[0].Targets, "srv1:1235")
|
||||
assert.Contains(t, services[0].Labels["label1"], "l1")
|
||||
assert.Contains(t, services[0].Labels["label2"], "l2")
|
||||
|
||||
services = preparePrometheusOutput("test2", discoveries)
|
||||
assert.Equal(t, 1, len(services))
|
||||
assert.Equal(t, 4, len(services[0].Targets))
|
||||
assert.Contains(t, services[0].Targets, "srv2:9100")
|
||||
assert.Contains(t, services[0].Targets, "srv2:1235")
|
||||
assert.Contains(t, services[0].Targets, "srv2:1236")
|
||||
assert.Contains(t, services[0].Targets, "srv2:1237")
|
||||
assert.Contains(t, services[0].Labels["label1"], "l3")
|
||||
}
|
|
@ -0,0 +1,102 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/by-cx/lobby/server"
|
||||
)
|
||||
|
||||
// These functions are called when something has changed in the storage
|
||||
|
||||
var changeDetectedChannel chan bool = make(chan bool)
|
||||
var changeDetected bool
|
||||
|
||||
// changeCatcherLoop waits for a change signal and switches variable that says if the callback should run or not
|
||||
func changeCatcherLoop() {
|
||||
for {
|
||||
<-changeDetectedChannel
|
||||
changeDetected = true
|
||||
}
|
||||
}
|
||||
|
||||
// discoveryChangeLoop is used to process dynamic configuration changes.
|
||||
// When there is a change detected a shell script or given process is triggered
|
||||
// which does some operations with the new data. Usually it generates the
|
||||
// configuration.
|
||||
// This function has internal loop and won't allow to run the command more
|
||||
// often than it's the configured amount of time. That prevents
|
||||
func discoveryChangeLoop() {
|
||||
// This other loop tics in strict intervals and prevents the callback script to run more often than it's configured
|
||||
var cmd *exec.Cmd
|
||||
|
||||
// Delay first run of the callback script a little so everything can set up
|
||||
log.Printf("Delaying start of discovery change loop (%d seconds)\n", config.CallbackFirstRunDelay)
|
||||
time.Sleep(time.Duration(config.CallbackFirstRunDelay) * time.Second)
|
||||
log.Println("Starting discovery change loop")
|
||||
|
||||
for {
|
||||
if changeDetected {
|
||||
// We switch this at the beginning so we can detect new changes while the callback script is running
|
||||
changeDetected = false
|
||||
|
||||
log.Println("Running callback function")
|
||||
|
||||
// TODO: this is not the best way
|
||||
callbackCommandSlice := strings.Split(config.Callback, " ")
|
||||
|
||||
if len(callbackCommandSlice) == 1 {
|
||||
cmd = exec.Command(callbackCommandSlice[0])
|
||||
} else if len(callbackCommandSlice) > 1 {
|
||||
cmd = exec.Command(callbackCommandSlice[0], callbackCommandSlice[1:]...)
|
||||
} else {
|
||||
log.Println("wrong number of parts of the callback command")
|
||||
time.Sleep(time.Duration(config.CallbackCooldown) * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
stdin, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
log.Println("stdin writing error: ", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
discoveriesJSON, err := json.Marshal(discoveryStorage.GetAll())
|
||||
if err != nil {
|
||||
log.Println("stdin writing error: ", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = stdin.Write([]byte(discoveriesJSON))
|
||||
if err != nil {
|
||||
log.Println("stdin writing error: ", err.Error())
|
||||
continue
|
||||
}
|
||||
err = stdin.Close()
|
||||
if err != nil {
|
||||
log.Println("stdin writing error: ", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
stdout, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
log.Println("Callback error: ", err.Error())
|
||||
}
|
||||
log.Println("Callback output: ", string(stdout))
|
||||
}
|
||||
time.Sleep(time.Duration(config.CallbackCooldown) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// discoveryChange is called when daemon detects that a newly arrived discovery
|
||||
// packet is somehow different than the localone. This can be used to trigger
|
||||
// some action in the local machine.
|
||||
func discoveryChange(discovery server.Discovery) error {
|
||||
if len(config.Callback) > 0 {
|
||||
changeDetectedChannel <- true
|
||||
}
|
||||
return nil
|
||||
}
|
8
go.mod
8
go.mod
|
@ -1,18 +1,22 @@
|
|||
module github.com/rosti-cz/server_lobby
|
||||
module github.com/by-cx/lobby
|
||||
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
|
||||
github.com/fatih/color v1.12.0
|
||||
github.com/go-redis/redis v6.15.9+incompatible
|
||||
github.com/go-resty/resty/v2 v2.6.0
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/google/go-cmp v0.5.5
|
||||
github.com/kelseyhightower/envconfig v1.4.0
|
||||
github.com/labstack/echo v3.3.10+incompatible
|
||||
github.com/labstack/gommon v0.3.0 // indirect
|
||||
github.com/nats-io/nats-server/v2 v2.4.0 // indirect
|
||||
github.com/nats-io/nats.go v1.12.0
|
||||
github.com/onsi/gomega v1.16.0 // indirect
|
||||
github.com/shirou/gopsutil/v3 v3.21.7
|
||||
github.com/stretchr/testify v1.7.0
|
||||
github.com/valyala/fasttemplate v1.2.1 // indirect
|
||||
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
)
|
||||
|
|
73
go.sum
73
go.sum
|
@ -1,13 +1,23 @@
|
|||
github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA=
|
||||
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
|
||||
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
||||
github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc=
|
||||
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||
github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
|
||||
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
|
||||
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
|
||||
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
|
||||
github.com/go-resty/resty/v2 v2.6.0 h1:joIR5PNLM2EFqqESUjCMGXrWmXNHEU9CEiK813oKYS4=
|
||||
github.com/go-resty/resty/v2 v2.6.0/go.mod h1:PwvJS6hvaPkjtjNg9ph+VrSD92bi5Zq73w/BIH7cC3Q=
|
||||
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
|
||||
|
@ -21,7 +31,9 @@ github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
|
|||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
|
||||
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
|
||||
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
|
||||
|
@ -30,11 +42,13 @@ github.com/labstack/echo v3.3.10+incompatible h1:pGRcYk231ExFAyoAjAfD85kQzRJCRI8
|
|||
github.com/labstack/echo v3.3.10+incompatible/go.mod h1:0INS7j/VjnFxD4E2wkz67b8cVwCLbBmJyDaka6Cmk1s=
|
||||
github.com/labstack/gommon v0.3.0 h1:JEeO0bvc78PKdyHxloTKiF8BD5iGrH8T6MSeGvSgob0=
|
||||
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
|
||||
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
|
||||
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
|
||||
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
|
||||
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
|
||||
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
|
||||
github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg=
|
||||
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
|
||||
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
|
||||
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
|
||||
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
|
||||
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
||||
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
|
||||
|
@ -50,12 +64,24 @@ github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
|||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
||||
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
||||
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
|
||||
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
|
||||
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
|
||||
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
|
||||
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
|
||||
github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c=
|
||||
github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/shirou/gopsutil/v3 v3.21.7 h1:PnTqQamUjwEDSgn+nBGu0qSDV/CfvyiR/gwTH3i7HTU=
|
||||
github.com/shirou/gopsutil/v3 v3.21.7/go.mod h1:RGl11Y7XMTQPmHh8F0ayC6haKNBgH4PXMJuTAcMOlz4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/tklauser/go-sysconf v0.3.7 h1:HT7h4+536gjqeq1ZIJPgOl1rg1XFatQGVZWp7Py53eg=
|
||||
|
@ -67,34 +93,64 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC
|
|||
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
|
||||
github.com/valyala/fasttemplate v1.2.1 h1:TVEnxayobAdVkhQfrfes2IzOB6o+z4roRkPF52WA1u4=
|
||||
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI=
|
||||
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
|
||||
golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q=
|
||||
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
|
||||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
|
@ -107,6 +163,13 @@ google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+Rur
|
|||
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
|
|
@ -3,12 +3,16 @@ package nats_driver
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/by-cx/lobby/common"
|
||||
"github.com/by-cx/lobby/server"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/rosti-cz/server_lobby/common"
|
||||
"github.com/rosti-cz/server_lobby/server"
|
||||
)
|
||||
|
||||
// NATS drivers is used to send discovery packet to other nodes into the group via NATS messenging protocol.
|
||||
type Driver struct {
|
||||
NATSUrl string
|
||||
NATSDiscoveryChannel string
|
||||
|
@ -51,32 +55,41 @@ func (d *Driver) Init() error {
|
|||
return fmt.Errorf("please initiate LogChannel variable")
|
||||
}
|
||||
|
||||
nc, err := nats.Connect(d.NATSUrl)
|
||||
if err != nil {
|
||||
return err
|
||||
for {
|
||||
nc, err := nats.Connect(d.NATSUrl)
|
||||
if err != nil {
|
||||
log.Printf("Can't connect to the NATS server, waiting for 5 seconds before I try it again. (%v)\n", err)
|
||||
time.Sleep(time.Second * 5)
|
||||
continue
|
||||
}
|
||||
d.nc = nc
|
||||
break
|
||||
}
|
||||
d.nc = nc
|
||||
|
||||
_, err = nc.Subscribe(d.NATSDiscoveryChannel, d.handler)
|
||||
_, err := d.nc.Subscribe(d.NATSDiscoveryChannel, d.handler)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("subscribe error: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close is called when all is done.
|
||||
func (d *Driver) Close() error {
|
||||
return d.nc.Drain()
|
||||
}
|
||||
|
||||
// RegisterSubscribeFunction sets the function that will process the incoming messages
|
||||
func (d *Driver) RegisterSubscribeFunction(listener common.Listener) {
|
||||
d.subscribeListener = listener
|
||||
}
|
||||
|
||||
// RegisterUnsubscribeFunction sets the function that will process the goodbye incoming messages
|
||||
func (d *Driver) RegisterUnsubscribeFunction(listener common.Listener) {
|
||||
d.unsubscribeListener = listener
|
||||
}
|
||||
|
||||
// SendDiscoveryPacket send discovery packet to the group.
|
||||
func (d *Driver) SendDiscoveryPacket(discovery server.Discovery) error {
|
||||
envelope := discoveryEnvelope{
|
||||
Discovery: discovery,
|
||||
|
@ -88,12 +101,20 @@ func (d *Driver) SendDiscoveryPacket(discovery server.Discovery) error {
|
|||
return fmt.Errorf("sending discovery formating message error: %v", err)
|
||||
}
|
||||
err = d.nc.Publish(d.NATSDiscoveryChannel, data)
|
||||
if err != nil {
|
||||
// In case the connection is down we will try to reconnect
|
||||
if err != nil && strings.Contains(err.Error(), "connection closed") {
|
||||
d.nc.Close()
|
||||
err = d.Init()
|
||||
if err != nil {
|
||||
return fmt.Errorf("sending discovery reconnect error: %v", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("sending discovery error: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendGoodbyePacket deregister node from the group. It tells everybody that it's going to die.
|
||||
func (d *Driver) SendGoodbyePacket(discovery server.Discovery) error {
|
||||
envelope := discoveryEnvelope{
|
||||
Discovery: discovery,
|
||||
|
|
|
@ -3,7 +3,7 @@ package nats_driver
|
|||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/rosti-cz/server_lobby/server"
|
||||
"github.com/by-cx/lobby/server"
|
||||
)
|
||||
|
||||
// discoveryEnvelope adds a message to the standard discovery format. The message
|
||||
|
|
|
@ -0,0 +1,142 @@
|
|||
package redis_driver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/by-cx/lobby/common"
|
||||
"github.com/by-cx/lobby/server"
|
||||
"github.com/go-redis/redis"
|
||||
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Redis drivers is used to send discovery packet to other nodes into the group via Redis's pubsub protocol.
|
||||
type Driver struct {
|
||||
Host string
|
||||
Port uint
|
||||
Password string
|
||||
Channel string
|
||||
DB uint
|
||||
|
||||
LogChannel chan string
|
||||
|
||||
subscribeListener common.Listener
|
||||
unsubscribeListener common.Listener
|
||||
|
||||
redis *redis.Client
|
||||
}
|
||||
|
||||
// handler is called asynchronously so and because it cannot log directly to stderr there
|
||||
// is channel called LogChannel that can be used to log error messages from this
|
||||
func (d *Driver) handler(payload string) {
|
||||
message := discoveryEnvelope{}
|
||||
err := json.Unmarshal([]byte(payload), &message)
|
||||
if err != nil && d.LogChannel != nil {
|
||||
d.LogChannel <- fmt.Errorf("decoding message error: %v", err).Error()
|
||||
}
|
||||
|
||||
err = message.Discovery.Validate()
|
||||
if err != nil && d.LogChannel != nil {
|
||||
d.LogChannel <- fmt.Errorf("validation error: %v", err).Error()
|
||||
}
|
||||
|
||||
if message.Message == "hi" {
|
||||
d.subscribeListener(message.Discovery)
|
||||
} else if message.Message == "goodbye" {
|
||||
d.unsubscribeListener(message.Discovery)
|
||||
} else {
|
||||
if d.LogChannel != nil {
|
||||
d.LogChannel <- "incompatible message"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Init connect to Redis, subscribes to the channel and waits for the messages.
|
||||
// It runs goroutine in background that listens for new messages.
|
||||
func (d *Driver) Init() error {
|
||||
if d.LogChannel == nil {
|
||||
return fmt.Errorf("please initiate LogChannel variable")
|
||||
}
|
||||
|
||||
if len(d.Host) == 0 {
|
||||
return fmt.Errorf("parameter Host cannot be empty")
|
||||
}
|
||||
|
||||
if len(d.Channel) == 0 {
|
||||
return fmt.Errorf("pattern cannot be empty")
|
||||
}
|
||||
|
||||
if d.Port <= 0 || d.Port > 65536 {
|
||||
return fmt.Errorf("port has to be in range of 0-65536")
|
||||
}
|
||||
|
||||
d.redis = redis.NewClient(&redis.Options{
|
||||
Addr: fmt.Sprintf("%s:%d", d.Host, d.Port),
|
||||
Password: d.Password,
|
||||
DB: int(d.DB),
|
||||
})
|
||||
|
||||
pubsub := d.redis.Subscribe(d.Channel)
|
||||
|
||||
go func(pubsub *redis.PubSub, d *Driver) {
|
||||
channel := pubsub.Channel()
|
||||
|
||||
for message := range channel {
|
||||
d.handler(message.Payload)
|
||||
}
|
||||
}(pubsub, d)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close is called when all is done.
|
||||
func (d *Driver) Close() error {
|
||||
return d.redis.Close()
|
||||
}
|
||||
|
||||
// RegisterSubscribeFunction sets the function that will process the incoming messages
|
||||
func (d *Driver) RegisterSubscribeFunction(listener common.Listener) {
|
||||
d.subscribeListener = listener
|
||||
}
|
||||
|
||||
// RegisterUnsubscribeFunction sets the function that will process the goodbye incoming messages
|
||||
func (d *Driver) RegisterUnsubscribeFunction(listener common.Listener) {
|
||||
d.unsubscribeListener = listener
|
||||
}
|
||||
|
||||
// SendDiscoveryPacket send discovery packet to the group.
|
||||
func (d *Driver) SendDiscoveryPacket(discovery server.Discovery) error {
|
||||
envelope := discoveryEnvelope{
|
||||
Discovery: discovery,
|
||||
Message: "hi",
|
||||
}
|
||||
|
||||
data, err := envelope.Bytes()
|
||||
if err != nil {
|
||||
return fmt.Errorf("sending discovery formating message error: %v", err)
|
||||
}
|
||||
|
||||
err = d.redis.Publish(d.Channel, data).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("sending discovery error: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendGoodbyePacket deregister node from the group. It tells everybody that it's going to die.
|
||||
func (d *Driver) SendGoodbyePacket(discovery server.Discovery) error {
|
||||
envelope := discoveryEnvelope{
|
||||
Discovery: discovery,
|
||||
Message: "goodbye",
|
||||
}
|
||||
|
||||
data, err := envelope.Bytes()
|
||||
if err != nil {
|
||||
return fmt.Errorf("sending discovery formating message error: %v", err)
|
||||
}
|
||||
err = d.redis.Publish(d.Channel, data).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("sending discovery error: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package redis_driver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/by-cx/lobby/server"
|
||||
)
|
||||
|
||||
// discoveryEnvelope adds a message to the standard discovery format. The message
|
||||
// can be "hi" or "goodbye" where "hi" is used when the node is sending keep alive
|
||||
// packets and "goodbye" means the node is leaving.
|
||||
type discoveryEnvelope struct {
|
||||
Discovery server.Discovery `json:"discovery"`
|
||||
Message string `json:"message"` // can be hi or goodbye
|
||||
}
|
||||
|
||||
func (e *discoveryEnvelope) Bytes() ([]byte, error) {
|
||||
body, err := json.Marshal(e)
|
||||
return body, err
|
||||
}
|
|
@ -3,6 +3,7 @@ package server
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
@ -44,9 +45,9 @@ func (d *Discovery) Bytes() ([]byte, error) {
|
|||
return data, err
|
||||
}
|
||||
|
||||
// FindLabels returns list of labels with given prefix. For example "service:ns" has prefix "service" or "service:".
|
||||
// FindLabelsByPrefix returns list of labels with given prefix. For example "service:ns" has prefix "service" or "service:".
|
||||
// It doesn't have to be prefix, but for example "service:test" will match "service:test" and also "service:test2".
|
||||
func (d *Discovery) FindLabels(prefix string) Labels {
|
||||
func (d *Discovery) FindLabelsByPrefix(prefix string) Labels {
|
||||
labels := Labels{}
|
||||
for _, label := range d.Labels {
|
||||
if strings.HasPrefix(label.String(), prefix) {
|
||||
|
@ -56,6 +57,18 @@ func (d *Discovery) FindLabels(prefix string) Labels {
|
|||
return labels
|
||||
}
|
||||
|
||||
func (d *Discovery) SortLabels() {
|
||||
labelStrings := d.Labels.StringSlice()
|
||||
sort.Strings(labelStrings)
|
||||
|
||||
labels := Labels{}
|
||||
for _, label := range labelStrings {
|
||||
labels = append(labels, Label(label))
|
||||
}
|
||||
|
||||
d.Labels = labels
|
||||
}
|
||||
|
||||
// -----------------
|
||||
// Discovery storage
|
||||
// -----------------
|
||||
|
@ -148,25 +161,67 @@ func (d *Discoveries) GetAll() []Discovery {
|
|||
return d.activeServers
|
||||
}
|
||||
|
||||
// Filter returns list of discoveries based on given labels
|
||||
func (d *Discoveries) Filter(labelsFilter []string) []Discovery {
|
||||
newSet := []Discovery{}
|
||||
|
||||
var found bool
|
||||
if len(labelsFilter) > 0 {
|
||||
for _, discovery := range d.activeServers {
|
||||
newDiscovery := discovery
|
||||
newDiscovery.Labels = Labels{}
|
||||
|
||||
found = false
|
||||
for _, label := range discovery.Labels {
|
||||
for _, labelFilter := range labelsFilter {
|
||||
if label.String() == labelFilter {
|
||||
newSet = append(newSet, discovery)
|
||||
found = true
|
||||
newDiscovery.Labels = append(newDiscovery.Labels, label)
|
||||
break
|
||||
}
|
||||
}
|
||||
if found {
|
||||
break
|
||||
}
|
||||
|
||||
if found {
|
||||
newSet = append(newSet, newDiscovery)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return newSet
|
||||
}
|
||||
|
||||
// Filter returns list of discoveries based on given label prefixes.
|
||||
func (d *Discoveries) FilterPrefix(prefixes []string) []Discovery {
|
||||
newSet := []Discovery{}
|
||||
|
||||
var found bool
|
||||
if len(prefixes) > 0 {
|
||||
for _, discovery := range d.activeServers {
|
||||
newDiscovery := discovery
|
||||
newDiscovery.Labels = Labels{}
|
||||
|
||||
found = false
|
||||
|
||||
if found {
|
||||
newSet = append(newSet, newDiscovery)
|
||||
}
|
||||
|
||||
for _, label := range discovery.Labels {
|
||||
for _, prefix := range prefixes {
|
||||
if strings.HasPrefix(label.String(), prefix) {
|
||||
found = true
|
||||
newDiscovery.Labels = append(newDiscovery.Labels, label)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if found {
|
||||
newSet = append(newSet, newDiscovery)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ func TestDiscovery(t *testing.T) {
|
|||
assert.False(t, discovery.IsAlive(), "discovery not suppose to be alive")
|
||||
discovery.LastCheck = now
|
||||
|
||||
assert.Equal(t, Labels{Label("service:test")}, discovery.FindLabels("service"))
|
||||
assert.Equal(t, Labels{Label("service:test")}, discovery.FindLabelsByPrefix("service"))
|
||||
assert.Equal(t, nil, discovery.Validate()) // TODO: This needs more love
|
||||
|
||||
content, err := json.Marshal(&discovery)
|
||||
|
|
|
@ -138,6 +138,7 @@ func (l *LocalHost) GetIdentification() (Discovery, error) {
|
|||
}
|
||||
|
||||
discovery.Labels = append(l.InitialLabels, localLabels...)
|
||||
discovery.SortLabels()
|
||||
|
||||
return discovery, nil
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ func TestGetIdentification(t *testing.T) {
|
|||
discovery, err = localHost.GetIdentification()
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.Equal(t, Label("public_ip:1.2.3.4"), discovery.Labels[3])
|
||||
assert.Equal(t, Label("test:1"), discovery.Labels[3])
|
||||
|
||||
os.RemoveAll(tmpPath)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
package server
|
||||
|
||||
import "github.com/google/go-cmp/cmp"
|
||||
|
||||
// Compare compares discovery A and B and returns true if those two are different.
|
||||
func Compare(discoveryA, discoveryB Discovery) bool {
|
||||
discoveryA.LastCheck = 0
|
||||
discoveryB.LastCheck = 0
|
||||
discoveryA.TTL = 0
|
||||
discoveryB.TTL = 0
|
||||
return !cmp.Equal(discoveryA, discoveryB)
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestCompare(t *testing.T) {
|
||||
discoveryA := Discovery{
|
||||
Hostname: "abcd.com",
|
||||
Labels: Labels{
|
||||
Label("label1"),
|
||||
},
|
||||
LastCheck: 52,
|
||||
}
|
||||
discoveryAB := Discovery{
|
||||
Hostname: "abcd.com",
|
||||
Labels: Labels{
|
||||
Label("label1"),
|
||||
},
|
||||
LastCheck: 56,
|
||||
}
|
||||
discoveryB := Discovery{
|
||||
Hostname: "efgh.com",
|
||||
Labels: Labels{
|
||||
Label("label2"),
|
||||
},
|
||||
LastCheck: 56,
|
||||
}
|
||||
discoveryC := Discovery{
|
||||
Hostname: "abcd.com",
|
||||
Labels: Labels{
|
||||
Label("label2"),
|
||||
},
|
||||
LastCheck: 60,
|
||||
}
|
||||
|
||||
assert.True(t, Compare(discoveryA, discoveryB))
|
||||
assert.True(t, Compare(discoveryB, discoveryC))
|
||||
assert.True(t, Compare(discoveryA, discoveryC)) // Test different labels and same hostname
|
||||
assert.False(t, Compare(discoveryA, discoveryA))
|
||||
assert.False(t, Compare(discoveryB, discoveryB))
|
||||
assert.False(t, Compare(discoveryA, discoveryAB)) // Test that last check is zeroed
|
||||
}
|
|
@ -1,5 +1,7 @@
|
|||
package server
|
||||
|
||||
import "strings"
|
||||
|
||||
// Label keeps one piece of information about a single server
|
||||
type Label string
|
||||
|
||||
|
@ -7,6 +9,34 @@ func (l Label) String() string {
|
|||
return string(l)
|
||||
}
|
||||
|
||||
// GetPart returns specific part of the label, if part index is higher than last available index it returns empty string.
|
||||
func (l Label) GetPart(idx int) string {
|
||||
parts := strings.Split(l.String(), ":")
|
||||
|
||||
if idx < 0 {
|
||||
return ""
|
||||
}
|
||||
if len(parts) >= idx {
|
||||
return ""
|
||||
}
|
||||
|
||||
return parts[idx]
|
||||
}
|
||||
|
||||
// GetPart1 is exactly same as GetPart but it splits the label only once, this is good for IPv6 addresses
|
||||
func (l Label) GetPart1(idx int) string {
|
||||
parts := strings.SplitN(l.String(), ":", 2)
|
||||
|
||||
if idx < 0 {
|
||||
return ""
|
||||
}
|
||||
if len(parts) >= idx {
|
||||
return ""
|
||||
}
|
||||
|
||||
return parts[idx]
|
||||
}
|
||||
|
||||
// Labels stores multiple Label records
|
||||
type Labels []Label
|
||||
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
// Templater is used to generate config files from predefined
|
||||
// templates based on content of gathered discovery packets.
|
||||
// It can for example configure Nginx's backend or database
|
||||
// replication.
|
||||
//
|
||||
// It reads templates from /var/lib/lobby/templates (default)
|
||||
// which are YAML files cotaining the template itself and command(s)
|
||||
// that needs to be run when the template changes.
|
||||
|
||||
const defaultTemplatesPath = "/var/lib/lobby/templates"
|
||||
|
||||
var templatesPath *string
|
||||
|
||||
func init() {
|
||||
templatesPath = flag.String("templates-path", defaultTemplatesPath, "path of where templates are stored")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
err := os.MkdirAll(*templatesPath, 0750)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
flag.Usage()
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func main() {
|
||||
fmt.Println(*templatesPath)
|
||||
}
|
Loading…
Reference in New Issue