Compare commits

...

44 Commits

Author SHA1 Message Date
c2ed658aaa
Multiple prometheus services support
All checks were successful
continuous-integration/drone/push Build is passing
* Prometheus export test
* Version bump to 1.5
2022-02-21 10:20:06 +01:00
038741b87e
Version bump to 1.4
All checks were successful
continuous-integration/drone/push Build is passing
2021-12-31 09:08:22 +01:00
157e795792
Invalid NATS connection fix when subscribing to a subject
All checks were successful
continuous-integration/drone/push Build is passing
2021-12-31 08:09:44 +01:00
55b3376d4c
Reconnect instead of exit when NATS is not available
All checks were successful
continuous-integration/drone/push Build is passing
2021-12-25 10:38:13 +01:00
0e836b68a8
NATS driver: try to reconnect if the connection is down
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone Build is passing
2021-09-27 00:10:16 +02:00
322ef2a258
Version bump to 1.3 2021-09-20 16:41:14 +02:00
6a1ecb80a1
Resolver
All checks were successful
continuous-integration/drone/push Build is passing
Resolve API endpoint and cli command return list of hostnames
base on given label.
2021-09-20 16:10:52 +02:00
3a7662ef0d
README update
All checks were successful
continuous-integration/drone/push Build is passing
2021-09-18 19:43:13 +02:00
4c0ca7ed6a
Callback script docs
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2021-09-18 19:41:26 +02:00
a4a97ec49d
Version bump to 1.2
All checks were successful
continuous-integration/drone/push Build is passing
2021-09-18 19:34:18 +02:00
66683c9496
Update drone.yml file format
All checks were successful
continuous-integration/drone/push Build is passing
2021-09-17 19:06:37 +02:00
24923a305b
Renaming drone/woodpecker file for compatibility reasons
All checks were successful
continuous-integration/drone/push Build is passing
2021-09-17 08:19:41 +02:00
ab8068e26d
Testing pipeline
All checks were successful
continuous-integration/drone Build is passing
2021-09-17 01:39:48 +02:00
202a60f091
Initial woodpeck pipeline 2021-09-17 01:36:43 +02:00
a9d7abbdd5
Don't run callback stuff when callback is not configured 2021-09-17 01:36:20 +02:00
709c47af3e
Callback script
Callback is run when there some discovery packet changes.
It can be an update, adding a new one or deleting the old one.
This can be used to perform dynamic configuration of services that
don't support lobby's API.
2021-09-15 16:58:09 +02:00
e06a5cc94b
Implementation of change detection
If there is an update in discovery an function is triggered
that can pick it up.
2021-09-11 11:58:27 +02:00
6769b903bc
Mini refactoring 2021-09-09 18:35:32 +02:00
8493c7cb40
Send discovery packet every time a new label is added 2021-09-09 18:25:22 +02:00
a0abda5196
README update 2021-09-07 14:12:29 +02:00
58495d59c1
Labels sorting by default and enhancements
Faster building,
disable API fix.
2021-09-07 01:18:17 +02:00
2f4203e7c3
README update 2021-09-07 00:50:13 +02:00
7010316f8f
Redis driver, changing name of the package 2021-09-07 00:46:48 +02:00
f7340728be
Dropping 5s waiting to finish everything to 1s 2021-09-06 18:34:38 +02:00
f2c3ef49e9
README update 2021-09-06 12:33:08 +02:00
128ed765e7
Daemon: don't return not matched labels when searching 2021-09-05 22:00:08 +02:00
f60f92d667
Makefile update 2021-09-05 21:44:11 +02:00
65c6aef5dc
CTL: color output 2021-09-05 21:43:19 +02:00
753aaf8377
CTL: JSON output 2021-09-05 21:35:10 +02:00
c6cb674053
Search by labels and prefixes
Implemented in API and ctl.
2021-09-05 17:30:21 +02:00
b120970054
Makefile and README updates
Update description text and fix installation instructions.

Makefile can build binaries for multiple archs now.
2021-09-05 17:10:47 +02:00
b19002ec84
README update 2021-09-05 14:38:32 +02:00
8ef975dfd3
README update, testing in Makefile 2021-09-05 14:36:11 +02:00
ced732b7e6
README update 2021-09-05 12:36:02 +02:00
5b7459afb5
Go client for the API, lobbyctl tool 2021-09-05 01:27:39 +02:00
0276465876
README update 2021-09-04 22:18:50 +02:00
471f8bb170
Refactoring of identification code, runtime labels 2021-09-04 22:17:56 +02:00
ff7a26e0d4
Removing a comment 2021-09-04 14:59:16 +02:00
4c5cb1bd85
Refactoring
Discovery creation move into server package
Tests for server package
2021-09-04 14:16:14 +02:00
07a70b8285
Small label processing enhancement
No dulicated labels coming from envvars. Possibility to omit port
in the prometheus output.
2021-09-04 00:09:03 +02:00
5461af2902
Prometheus output update, documentation in README 2021-09-02 23:05:57 +02:00
c982038c53
NATS driver separated into its own package
* Client mode
2021-09-02 19:22:39 +02:00
85ac19d7fe
README update 2021-09-02 02:13:35 +02:00
68d4fc50a6
Goodbye messages, proper exit handling 2021-09-02 02:13:12 +02:00
33 changed files with 2266 additions and 220 deletions

10
.drone.yml Normal file
View File

@ -0,0 +1,10 @@
kind: pipeline
type: docker
name: testing
steps:
- name: test
image: golang
commands:
- go mod tidy
- make test

4
.gitignore vendored
View File

@ -1,5 +1,9 @@
.history/
# Files with secrets
*secret*
tmp/
# Binaries
lobby_*
bin/

11
LICENCE Normal file
View File

@ -0,0 +1,11 @@
Copyright 2021 Adam Štrauch <cx@initd.cz>
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

50
Makefile Normal file
View File

@ -0,0 +1,50 @@
VERSION=1.5
.PHONY: all
all: build
.PHONY: clean
clean:
rm -rf bin
.PHONY:test
test:
go test -v server/*.go
init:
mkdir -p ./bin
.PHONY: build
build: test clean init linux-amd64 linux-arm linux-arm64 darwin-amd64 darwin-arm64
.PHONY: linux-amd64
linux-amd64: clean init
# linux amd64
env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./bin/lobbyd-${VERSION}-linux-amd64 daemon/*.go
env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./bin/lobbyctl-${VERSION}-linux-amd64 ctl/*.go
.PHONY: linux-arm
linux-arm: clean init
# linux arm
env CGO_ENABLED=0 GOOS=linux GOARCH=arm go build -o ./bin/lobbyd-${VERSION}-linux-arm daemon/*.go
env CGO_ENABLED=0 GOOS=linux GOARCH=arm go build -o ./bin/lobbyctl-${VERSION}-linux-arm ctl/*.go
.PHONY: linux-arm64
linux-arm64: clean init
# linux arm64
env CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o ./bin/lobbyd-${VERSION}-linux-arm64 daemon/*.go
env CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o ./bin/lobbyctl-${VERSION}-linux-arm64 ctl/*.go
.PHONY: darwin-amd64
darwin-amd64: clean init
# darwin amd64
env CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o ./bin/lobbyd-${VERSION}-darwin-amd64 daemon/*.go
env CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o ./bin/lobbyctl-${VERSION}-darwin-amd64 ctl/*.go
.PHONY: darwin-arm64
darwin-arm64: clean init
# darwin arm64
env CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 go build -o ./bin/lobbyd-${VERSION}-darwin-arm64 daemon/*.go
env CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 go build -o ./bin/lobbyctl-${VERSION}-darwin-arm64 ctl/*.go

253
README.md
View File

@ -1,14 +1,243 @@
# Lobby - simple server/service discovery service
TLDR: This a labeling tool for your servers. Like AWS resource tags but available everywhere.
In one of ours projects we needed service discovery that doesn't need complicated setup just to share
a simple information about running services and checking if they are still alive. So we came up with
this small service we called Lobby. It's like a lobby for users in games but in this case there are
servers instead. Each server runs one or more instances of lobby daemon and it regularly sends info
about its hostname and configured labels.
Labels are similar what you could know from AWS. It's basically alternative to resources' tags feature
with the only different that you can use this anywhere including AWS. Every server sends something called
"discovery packet" which is basically a json that looks like this:
```json
{
"hostname": "smtp.example.com",
"labels": [
"service:smtp",
"public_ip4:1.2.3.4",
"public_ip6:2a03::1"
]
}
```
The packet contains information what's the server's hostname and then list of labels describing
what's running on it, what are the IP addresses, services, directories to backup or server's location for example.
What's in the labels is completely up to you but in some use-cases (Node Exporter API endpoint) it
expects "NAME:VALUE" format.
The labels can be configured via environment variables but also as files located in
*/etc/lobby/labels* (configurable path) so it can dynamically change. Another way is to use
*lobbyctl* which can add new labels at runtime.
When everything is running just call your favorite http client against "http://localhost:1313/"
on any of the running instances and lobby returns a list of all available servers and
their labels. You can hook it to Prometheus, deployment scripts, CI/CD automations or
your internal system that sends emails and it needs to know where is the SMTP server for
example.
Lobby doesn't care if you have a one or thousand instances of it running. Each instance
is connected to a common point which is a [NATS server](https://nats.io/) or Redis. NATS is super fast and reliable
messaging system which handles the communication part but also the high availability part.
NATS is easy to run and it offloads a huge part of the problem from lobby itself. But Redis
is not a bad choice either in some cases.
The code is open to support multiple backends and it's not that hard to add a new one.
Support for NATS is only less than 150 lines.
## Quickstart guide
The quickest way how to run lobby on your server is this:
```shell
wget -O /usr/local/bin/lobbyd https://github.com/by-cx/lobby/releases/download/v1.2/lobbyd-1.2-linux-amd64
chmod +x /usr/local/bin/lobbyd
wget -O /usr/local/bin/lobbyctl https://github.com/by-cx/lobby/releases/download/v1.2/lobbyctl-1.2-linux-amd64
chmod +x /usr/local/bin/lobbyctl
# Update NATS_URL and LABELS here
cat << EOF > /etc/systemd/system/lobbyd.service
[Unit]
Description=Server Lobby service
After=network.target
[Service]
Environment="NATS_URL=tls://nats.example.com:4222"
Environment="LABELS=service:ns,ns:primary,public_ip4:1,2,3,4,public_ip6:2a03::1,location:prague"
ExecStart=/usr/local/bin/lobbyd
PrivateTmp=false
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl start lobbyd
systemctl enable lobbyd
```
If you run lobbyd in production, consider to create its own system user and group and add both into this
service file. It doesn't need to access everything in your system.
To test if local instance is running call this:
lobbyctl discovery
## Daemon
There are other config directives you can use to fine-tune lobbyd to exactly what you need.
| Environment variable | Type | Default | Required | Note |
| ------------------------ | ------ | ----------------- | ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
| TOKEN | string | | no | Authentication token for API, if empty auth is disabled |
| HOST | string | 127.0.0.1 | no | IP address used for the REST server to listen |
| PORT | int | 1313 | no | Port related to the address above |
| DISABLE_API | bool | false | no | If true API interface won't start |
| DRIVER | string | NATS | yes | Selects which driver is used to exchange the discovery packets. |
| NATS_URL | string | | yes (NATS driver) | NATS URL used to connect to the NATS server |
| NATS_DISCOVERY_CHANNEL | string | lobby.discovery | no | Channel where the keep-alive packets are sent |
| REDIS_HOST | string | 127.0.0.1" | no | Redis host |
| REDIS_PORT | uint16 | 6379 | no | Redis port |
| REDIS_DB | string | 0 | no | Redis DB |
| REDIS_CHANNEL | string | lobby:discovery | no | Redis channel |
| REDIS_PASSWORD | string | | no | Redis password |
| LABELS | string | | no | List of labels, labels should be separated by comma |
| LABELS_PATH | string | /etc/lobby/labels | no | Path where filesystem based labels are located, one label per line, filename is not important for lobby |
| RUNTIME_LABELS_FILENAME | string | _runtime | no | Filename for file created in LabelsPath where runtime labels will be added |
| HOSTNAME | string | | no | Override local machine's hostname |
| CLEAN_EVERY | int | 15 | no | How often to clean the list of discovered servers to get rid of the not alive ones [secs] |
| KEEP_ALIVE | int | 5 | no | how often to send the keep-alive discovery message with all available information [secs] |
| TTL | int | 30 | no | After how many secs is discovery record considered as invalid |
| NODE_EXPORTER_PORT | int | 9100 | no | Default port where node_exporter listens on all registered servers, this is used when the special prometheus labels doesn't contain port |
| REGISTER | bool | true | no | If true (default) then local instance is registered with other instance (discovery packet is sent regularly), if false the daemon runs only as a client |
| CALLBACK | string | | no | Path to a script that runs when the the discovery packet records are changed. Not running for first |
| CALLBACK_COOLDOWN | int | 15 | no | Cooldown prevents the call back script to run sooner than configured amount of seconds after last run is finished. |
| CALLBACK_FIRST_RUN_DELAY | int | 30 | no | Wait for this amount of seconds before callback is run for first time after fresh start of the daemon |
### Callback script
When your application cannot support Lobbyd's API it can be configured via callback script that runs everytime something has changed in the network. Callback script is run every 15 seconds (configured by CALLBACK_COOLDOWN) but only when something has changed.
The script runs under the same user as lobbyd. When lobbyd starts first thirty seconds (CALLBACK_FIRST_RUN_DELAY) is ignored and then the script is run for first time. After these thirty seconds everything runs in loop based on the changes in the network.
All current discovery packets are passed to the callback script via standard input. It's basically the same input you get if you run `lobbyctl discoveries`.
### Service discovery for Prometheus
Lobbyd has an API endpoint that returns list of targets for [Prometheus's HTTP SD config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config). That
allows you to use lobbyd to configure Prometheus dynamically based on running servers. There are special kind of labels that are used to set the output for Prometheus properly.
Let's check this:
prometheus:nodeexporter:host:192.168.1.1
prometheus:nodeexporter:port:9100
prometheus:nodeexporter:location:prague
If you set port to *-* lobby daemon omits port entirely from the output.
There can be multiple `host` labels but only one `port` label and all prometheus labels (last line) will be common for all hosts labels. If port is omitted then default 9100 is used or port can also be part of the host label.
When you open URL http://localhost:1313/v1/prometheus/nodeexporter it returns this:
```json
[
{
"Labels": {
"location": "prague"
},
"Targets": [
"192.168.1.1:9100"
]
}
]
```
"nodeexporter" can be anything you want. It determines name of the monitored service, the service that provides the */metrics* endpoint.
There is also a minimal way how to add server to the prometheus output. Simply set label *prometheus:nodeexporter* and it
will use default port from the environment variable above and hostname of the server
```json
[
{
"Labels": {},
"Targets": [
"192.168.1.1:9100"
]
}
]
```
At least one prometheus label has to be set to export the monitoring service in the prometheus output.
## Command line tool
To access your servers from command line or shell scripts you can use *lobbyctl*.
```
Usage of lobbyctl:
-host string
Hostname or IP address of lobby daemon
-port uint
Port of lobby daemon
-proto string
Select HTTP or HTTPS protocol
-token string
Token needed to communicate lobby daemon, if empty auth is disabled
Commands:
discovery returns discovery packet of the server where the client is connected to
discoveries returns list of all registered discovery packets
labels add LABEL [LABEL] ... adds new runtime labels
labels del LABEL [LABEL] ... deletes runtime labels
```
It uses Go client library also located in this repository.
## REST API
So far the REST API is super simple and it has only two endpoints:
```
GET / # Same as /v1/discoveries
GET /v1/discovery # Returns current local discovery packet
GET /v1/discoveries # Returns list of all discovered servers and their labels.
GET /v1/discoveries?labels=LABELS&prefixes=PREFIXES # output will be filtered based on one or multiple labels separated by comma or it can search for given prefixes, only one of those will be used
GET /v1/prometheus/:name # Generates output for Prometheus's SD config, name is group of the monitoring services described above.
POST /v1/labels # Add runtime labels that will persist over daemon restarts. Labels should be in the body of the request, one line per one label.
DELETE /v1/labels # Delete runtime labels. One label per line. Can't affect the labels from environment variables or labels added from the LabelPath.
```
If there is an error the error message is returned as plain text.
## API clients
* Golang client is part of this repository.
* There is also [Python client](https://github.com/by-cx/lobby-python) available.
## Notes
I wanted to use SQS or SNS as backend but when I checked the services I found out
it wouldn't work. SNS would require open HTTP server whicn is hard to do in our
infrastructure and I couldn't find a way how SQS could deliver every message
to all instances of lobbyd.
Instead I decided to implement Redis because it's much easier to use for
development and testing. But there is no reason why it couldn't work in production
too.
## TODO
* [X] filtering based on labels
* [X] Output for prometheus
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config
~~This should be implemented as a template in /etc/lobby/templates~~
* [X] labels in directory
/etc/lobby/labels
One file per one label
* [ ] Deregistration
* [ ] Deregister when the daemon exists
* [ ] Separate the NATS code so it can support multiple backend/drivers
* [ ] Documentation
* [ ] Tests
* [X] Tests
* [X] Command hooks - script or list of scripts that are triggered when discovery status has changed
* [ ] Support for multiple active backend drivers
* [X] Redis driver
* [X] Remove the 5 secs waiting when daemon is stopped
* [X] API to allow add labels at runtime
* [ ] Check what happens when driver is disconnected

226
client/main.go Normal file
View File

@ -0,0 +1,226 @@
package client
import (
"encoding/json"
"errors"
"fmt"
"strings"
"github.com/by-cx/lobby/server"
"github.com/go-resty/resty/v2"
)
// Encapsulation of Lobby's client code
type LobbyClient struct {
Proto string
Host string
Port uint
Token string
}
func (l *LobbyClient) init() {
if len(l.Proto) == 0 {
l.Host = "http"
}
if len(l.Host) == 0 {
l.Host = "localhost"
}
if l.Port == 0 {
l.Port = 1313
}
}
// calls the backend API with given method, path and request body and returns status code, response body and error if there is any.
// Method can be GET, POST or DELETE.
// Path should start with / and it can contain query parameters too.
func (l *LobbyClient) call(method, path, body string) (uint, string, error) {
client := resty.New().R()
if len(l.Token) != 0 {
client = client.SetHeader("Authorization", fmt.Sprintf("Token %s", l.Token))
}
if strings.ToUpper(method) == "GET" {
resp, err := client.Get(fmt.Sprintf("%s://%s:%d%s", l.Proto, l.Host, l.Port, path))
if err != nil {
return 0, "", err
}
return uint(resp.StatusCode()), string(resp.Body()), nil
} else if strings.ToUpper(method) == "POST" {
resp, err := client.SetBody(body).Post(fmt.Sprintf("%s://%s:%d%s", l.Proto, l.Host, l.Port, path))
if err != nil {
return 0, "", err
}
return uint(resp.StatusCode()), string(resp.Body()), nil
} else if strings.ToUpper(method) == "DELETE" {
resp, err := client.SetBody(body).Delete(fmt.Sprintf("%s://%s:%d%s", l.Proto, l.Host, l.Port, path))
if err != nil {
return 0, "", err
}
return uint(resp.StatusCode()), string(resp.Body()), nil
} else {
return 0, "", errors.New("unsupported method")
}
}
// Returns discovery object of local machine
func (l *LobbyClient) GetDiscovery() (server.Discovery, error) {
l.init()
var discovery server.Discovery
path := "/v1/discovery"
method := "GET"
status, body, err := l.call(method, path, "")
if err != nil {
return discovery, err
}
if status != 200 {
return discovery, fmt.Errorf("non-200 response: %s", body)
}
err = json.Unmarshal([]byte(body), &discovery)
if err != nil {
return discovery, fmt.Errorf("response parsing error: %v", err)
}
return discovery, nil
}
// Returns all registered discovery packets
func (l *LobbyClient) GetDiscoveries() ([]server.Discovery, error) {
l.init()
path := "/v1/discoveries"
method := "GET"
var discoveries []server.Discovery
status, body, err := l.call(method, path, "")
if err != nil {
return discoveries, err
}
if status != 200 {
return discoveries, fmt.Errorf("non-200 response: %s", body)
}
err = json.Unmarshal([]byte(body), &discoveries)
if err != nil {
return discoveries, fmt.Errorf("response parsing error: %v", err)
}
return discoveries, nil
}
// Resolve returns list of hostnames that have given label
func (l *LobbyClient) Resolve(label server.Label) ([]string, error) {
l.init()
path := fmt.Sprintf("/v1/resolve?label=%s", label.String())
method := "GET"
var hostnames []string
status, body, err := l.call(method, path, "")
if err != nil {
return hostnames, err
}
if status != 200 {
return hostnames, fmt.Errorf("non-200 response: %s", body)
}
err = json.Unmarshal([]byte(body), &hostnames)
if err != nil {
return hostnames, fmt.Errorf("response parsing error: %v", err)
}
return hostnames, nil
}
// Find discoveries by their labels
func (l *LobbyClient) FindByLabels(labels server.Labels) ([]server.Discovery, error) {
l.init()
path := fmt.Sprintf("/v1/discoveries?labels=%s", strings.Join(labels.StringSlice(), ","))
method := "GET"
var discoveries []server.Discovery
status, body, err := l.call(method, path, "")
if err != nil {
return discoveries, err
}
if status != 200 {
return discoveries, fmt.Errorf("non-200 response: %s", body)
}
err = json.Unmarshal([]byte(body), &discoveries)
if err != nil {
return discoveries, fmt.Errorf("response parsing error: %v", err)
}
return discoveries, nil
}
// Find discoveries by label prefixes
func (l *LobbyClient) FindByPrefixes(prefixes []string) ([]server.Discovery, error) {
l.init()
path := fmt.Sprintf("/v1/discoveries?prefixes=%s", strings.Join(prefixes, ","))
method := "GET"
var discoveries []server.Discovery
status, body, err := l.call(method, path, "")
if err != nil {
return discoveries, err
}
if status != 200 {
return discoveries, fmt.Errorf("non-200 response: %s", body)
}
err = json.Unmarshal([]byte(body), &discoveries)
if err != nil {
return discoveries, fmt.Errorf("response parsing error: %v", err)
}
return discoveries, nil
}
// Adds runtime labels for the local machine
func (l *LobbyClient) AddLabels(labels server.Labels) error {
l.init()
path := "/v1/labels"
method := "POST"
status, body, err := l.call(method, path, strings.Join(labels.StringSlice(), "\n"))
if err != nil {
return err
}
if status != 200 {
return fmt.Errorf("non-200 response: %s", body)
}
return nil
}
// Removes runtime labels of the local machine
func (l *LobbyClient) DeleteLabels(labels server.Labels) error {
l.init()
path := "/v1/labels"
method := "DELETE"
status, body, err := l.call(method, path, strings.Join(labels.StringSlice(), "\n"))
if err != nil {
return err
}
if status != 200 {
return fmt.Errorf("non-200 response: %s", body)
}
return nil
}

16
common/types.go Normal file
View File

@ -0,0 +1,16 @@
package common
import "github.com/by-cx/lobby/server"
// Listener is a function that returns received discovery
type Listener func(server.Discovery)
// Driver interface describes exported methods that have to be implemented in each driver
type Driver interface {
Init() error
Close() error
RegisterSubscribeFunction(listener Listener)
RegisterUnsubscribeFunction(listener Listener)
SendDiscoveryPacket(discovery server.Discovery) error
SendGoodbyePacket(discovery server.Discovery) error
}

29
ctl/config.go Normal file
View File

@ -0,0 +1,29 @@
package main
import (
"fmt"
"os"
"github.com/kelseyhightower/envconfig"
)
// Config keeps info about configuration of this daemon
type Config struct {
Token string `envconfig:"TOKEN" required:"false"` // Authentication token, if empty auth is disabled
Proto string `envconfig:"PROTOCOL" required:"false" default:"http"` // selected http or https protocols, default is http
Host string `envconfig:"HOST" required:"false" default:"127.0.0.1"` // IP address or hostname where lobbyd is listening
Port uint `envconfig:"PORT" required:"false" default:"1313"` // Same thing but the port part
}
// GetConfig return configuration created based on environment variables
func GetConfig() *Config {
var config Config
err := envconfig.Process("", &config)
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
return &config
}

69
ctl/format.go Normal file
View File

@ -0,0 +1,69 @@
package main
import (
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"github.com/by-cx/lobby/server"
"github.com/fatih/color"
)
func printDiscovery(discovery server.Discovery) {
color.Yellow("Hostname:\n %s\n", discovery.Hostname)
if len(discovery.Labels) > 0 {
fmt.Printf("Labels:\n")
for _, label := range discovery.Labels {
fmt.Printf(" %s\n", label)
}
}
}
func colorLabel(label server.Label) string {
parts := strings.Split(label.String(), ":")
if len(parts) == 1 {
return color.GreenString(parts[0])
}
return color.GreenString(parts[0]) + ":" + color.MagentaString((strings.Join(parts[1:], ":")))
}
func printDiscoveries(discoveries []server.Discovery) {
maxHostnameWidth := 0
for _, discovery := range discoveries {
if len(discovery.Hostname) > maxHostnameWidth {
maxHostnameWidth = len(discovery.Hostname)
}
}
for _, discovery := range discoveries {
if len(discovery.Labels) == 0 {
// fmt.Println(discovery.Hostname)
color.Yellow(discovery.Hostname)
} else {
hostname := fmt.Sprintf("%"+strconv.Itoa(maxHostnameWidth)+"s", discovery.Hostname)
fmt.Printf("%s %s\n", color.YellowString(hostname), colorLabel(discovery.Labels[0]))
if len(discovery.Labels) > 1 {
for _, label := range discovery.Labels[1:] {
fmt.Printf("%"+strconv.Itoa(maxHostnameWidth+4)+"s%s\n", " ", colorLabel(label))
}
}
}
fmt.Println()
}
}
func printJSON(data interface{}) {
body, err := json.Marshal(data)
if err != nil {
fmt.Println("error occurred while formating the output into JSON:", err.Error())
os.Exit(3)
}
fmt.Println(string(body))
}

180
ctl/main.go Normal file
View File

@ -0,0 +1,180 @@
package main
import (
"flag"
"fmt"
"os"
"strings"
"github.com/by-cx/lobby/client"
"github.com/by-cx/lobby/server"
)
func Usage() {
flag.Usage()
fmt.Println("")
fmt.Println("Commands:")
fmt.Println(" resolve label returns list of hostnames with given label")
fmt.Println(" discovery returns discovery packet of the server where the client is connected to")
fmt.Println(" discoveries returns list of all registered discovery packets")
fmt.Println(" discoveries labels [LABEL] ... returns list of all registered discovery packets with given labels (OR)")
fmt.Println(" discoveries search [LABEL] ... returns list of all registered discovery packets with given label prefixes (OR)")
fmt.Println(" labels add LABEL [LABEL] ... adds new runtime labels")
fmt.Println(" labels del LABEL [LABEL] ... deletes runtime labels")
}
func main() {
config := GetConfig()
// Setup flags
proto := flag.String("proto", "", "Select HTTP or HTTPS protocol")
host := flag.String("host", "", "Hostname or IP address of lobby daemon")
port := flag.Uint("port", 0, "Port of lobby daemon")
token := flag.String("token", "", "Token needed to communicate lobby daemon, if empty auth is disabled")
jsonOutput := flag.Bool("json", false, "set output to JSON, error will be still in plain text")
flag.Parse()
// Replace empty values from flags by values from environment variables
if *proto == "" {
proto = &config.Proto
}
if *host == "" {
host = &config.Host
}
if *port == 0 {
port = &config.Port
}
if *token == "" {
token = &config.Token
}
// Validation
if *proto != "http" && *proto != "https" {
fmt.Println("Protocol can be only http or https")
}
// Setup lobby client library
client := client.LobbyClient{
Proto: strings.ToLower(*proto),
Host: *host,
Port: *port,
Token: *token,
}
// Process rest of the arguments
if len(flag.Args()) == 0 {
Usage()
os.Exit(0)
}
switch flag.Args()[0] {
case "resolve":
if len(flag.Args()) == 2 {
hostnames, err := client.Resolve(server.Label(flag.Arg(1)))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if *jsonOutput {
printJSON(hostnames)
} else {
for _, hostname := range hostnames {
fmt.Println(hostname)
}
}
} else {
Usage()
os.Exit(0)
}
case "discoveries":
var discoveries []server.Discovery
var err error
if len(flag.Args()) > 2 {
if flag.Arg(1) == "labels" {
labels := []server.Label{}
for _, label := range flag.Args()[2:] {
labels = append(labels, server.Label(label))
}
discoveries, err = client.FindByLabels(labels)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
} else if flag.Arg(1) == "search" {
discoveries, err = client.FindByPrefixes(flag.Args()[2:])
if err != nil {
fmt.Println(err)
os.Exit(1)
}
} else {
fmt.Println("ERROR: unknown usage of discoveries arguments")
fmt.Println("")
Usage()
os.Exit(0)
}
} else {
discoveries, err = client.GetDiscoveries()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}
if *jsonOutput {
printJSON(discoveries)
} else {
printDiscoveries(discoveries)
}
case "discovery":
discovery, err := client.GetDiscovery()
if err != nil {
fmt.Println(err)
}
if *jsonOutput {
printJSON(discovery)
} else {
printDiscovery(discovery)
}
case "labels":
if len(flag.Args()) < 3 {
fmt.Println("ERROR: not enough arguments for labels command")
fmt.Println("")
Usage()
os.Exit(0)
}
labels := server.Labels{}
labelsString := flag.Args()[2:]
for _, labelString := range labelsString {
labels = append(labels, server.Label(labelString))
}
if flag.Args()[1] == "add" {
err := client.AddLabels(labels)
if err != nil {
fmt.Printf("ERROR: %v\n", err)
os.Exit(2)
}
} else if flag.Args()[1] == "del" {
err := client.DeleteLabels(labels)
if err != nil {
fmt.Printf("ERROR: %v\n", err)
os.Exit(2)
}
} else {
fmt.Printf("ERROR: wrong labels subcommand\n\n")
Usage()
os.Exit(2)
}
default:
Usage()
os.Exit(0)
}
}

View File

@ -3,14 +3,36 @@ package main
import (
"log"
"github.com/by-cx/lobby/server"
"github.com/kelseyhightower/envconfig"
)
// Config keeps info about configuration of this daemon
type Config struct {
Token string `envconfig:"TOKEN" required:"false"` // not used yet
NATSURL string `envconfig:"NATS_URL" required:"true"`
Labels []string `envconfig:"LABELS" required:"false" default:""`
Token string `envconfig:"TOKEN" required:"false"` // Authentication token, if empty auth is disabled
Host string `envconfig:"HOST" required:"false" default:"127.0.0.1"` // IP address used for the REST server to listen
Port uint16 `envconfig:"PORT" required:"false" default:"1313"` // Port related to the address above
DisableAPI bool `envconfig:"DISABLE_API" required:"false" default:"false"` // If true API interface won't start
Driver string `envconfig:"DRIVER" required:"false" default:"NATS"` // Select driver to use to communicate with the group of nodes. The possible values are NATS and Redis
NATSURL string `envconfig:"NATS_URL" required:"false"` // NATS URL used to connect to the NATS server
NATSDiscoveryChannel string `envconfig:"NATS_DISCOVERY_CHANNEL" required:"false" default:"lobby.discovery"` // Channel where the kepp alive packets are sent
RedisHost string `envconfig:"REDIS_HOST" required:"false" default:"127.0.0.1"` // Redis host
RedisPort uint16 `envconfig:"REDIS_PORT" required:"false" default:"6379"` // Redis port
RedisDB uint `envconfig:"REDIS_DB" required:"false" default:"0"` // Redis DB
RedisChannel string `envconfig:"REDIS_CHANNEL" required:"false" default:"lobby:discovery"` // Redis channel
RedisPassword string `envconfig:"REDIS_PASSWORD" required:"false" default:""` // Redis password
Labels server.Labels `envconfig:"LABELS" required:"false" default:""` // List of labels
LabelsPath string `envconfig:"LABELS_PATH" required:"false" default:"/etc/lobby/labels"` // Path where filesystem based labels are located
RuntimeLabelsFilename string `envconfig:"RUNTIME_LABELS_FILENAME" required:"false" default:"_runtime"` // Filename for file created in LabelsPath where runtime labels will be added
HostName string `envconfig:"HOSTNAME" required:"false"` // Overrise local machine's hostname
CleanEvery uint `envconfig:"CLEAN_EVERY" required:"false" default:"15"` // How often to clean the list of servers to get rid of the not alive ones
KeepAlive uint `envconfig:"KEEP_ALIVE" required:"false" default:"5"` // how often to send the keepalive message with all availabel information [secs]
TTL uint `envconfig:"TTL" required:"false" default:"30"` // After how many secs is discovery record considered as invalid
NodeExporterPort uint `envconfig:"NODE_EXPORTER_PORT" required:"false" default:"9100"` // Default port where node_exporter listens on all registered servers
Register bool `envconfig:"REGISTER" required:"false" default:"true"` // If true (default) then local instance is registered with other instance (discovery packet is sent regularly)
Callback string `envconfig:"CALLBACK" required:"false" default:""` // path to a script that runs when the is a change in the labels database
CallbackCooldown uint `envconfig:"CALLBACK_COOLDOWN" required:"false" default:"15"` // cooldown that prevents to run the config change script too many times in row
CallbackFirstRunDelay uint `envconfig:"CALLBACK_FIRST_RUN_DELAY" required:"false" default:"30"` // Wait for this amount of seconds before callback is run for first time after fresh start of the daemon
}
// GetConfig return configuration created based on environment variables
@ -22,5 +44,13 @@ func GetConfig() *Config {
log.Fatal(err.Error())
}
if config.Driver != "Redis" && config.Driver != "NATS" {
log.Fatal("ERROR: the only supported drivers are Redis and NATS (default)")
}
if config.Driver == "NATS" && len(config.NATSURL) == 0 {
log.Fatal("ERROR: NATS_URL cannot be empty when driver is set to NATS")
}
return &config
}

108
daemon/handlers.go Normal file
View File

@ -0,0 +1,108 @@
package main
import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"github.com/by-cx/lobby/server"
"github.com/labstack/echo"
)
func listHandler(c echo.Context) error {
labels := c.QueryParam("labels")
prefixes := c.QueryParam("prefixes")
var discoveries []server.Discovery
if len(labels) > 0 {
labelsFilterSlice := strings.Split(labels, ",")
discoveries = discoveryStorage.Filter(labelsFilterSlice)
} else if len(prefixes) > 0 {
discoveries = discoveryStorage.FilterPrefix(strings.Split(prefixes, ","))
} else {
discoveries = discoveryStorage.GetAll()
}
return c.JSONPretty(200, discoveries, " ")
}
// resolveHandler returns hostname(s) based on another label
func resolveHandler(c echo.Context) error {
label := c.QueryParam("label") // This is label we will use to filter discovery packets
output := []string{}
discoveries := discoveryStorage.Filter([]string{label})
for _, discovery := range discoveries {
output = append(output, discovery.Hostname)
}
return c.JSONPretty(http.StatusOK, output, " ")
}
func prometheusHandler(c echo.Context) error {
name := c.Param("name")
services := preparePrometheusOutput(name, discoveryStorage.GetAll())
return c.JSONPretty(http.StatusOK, services, " ")
}
func getIdentificationHandler(c echo.Context) error {
discovery, err := localHost.GetIdentification()
if err != nil {
return c.String(http.StatusInternalServerError, fmt.Sprintf("gathering identification info error: %v\n", err))
}
return c.JSONPretty(http.StatusOK, discovery, " ")
}
func addLabelsHandler(c echo.Context) error {
body, err := ioutil.ReadAll(c.Request().Body)
if err != nil {
return c.String(http.StatusBadRequest, fmt.Sprintf("reading request body error: %v\n", err))
}
labels := server.Labels{}
for _, label := range strings.Split(string(body), "\n") {
labels = append(labels, server.Label(label))
}
err = localHost.AddLabels(labels)
if err != nil {
return c.String(http.StatusBadRequest, err.Error())
}
// Update the other nodes with this new change
sendDiscoveryPacket()
return c.String(http.StatusOK, "OK")
}
func deleteLabelsHandler(c echo.Context) error {
body, err := ioutil.ReadAll(c.Request().Body)
if err != nil {
return c.String(http.StatusBadRequest, fmt.Sprintf("reading request body error: %v\n", err))
}
labels := server.Labels{}
for _, label := range strings.Split(string(body), "\n") {
labels = append(labels, server.Label(label))
}
err = localHost.DeleteLabels(labels)
if err != nil {
return c.String(http.StatusBadRequest, err.Error())
}
// Update the other nodes with this new change
sendDiscoveryPacket()
return c.String(http.StatusOK, "OK")
}

View File

@ -1,78 +0,0 @@
package main
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"strings"
"github.com/rosti-cz/server_lobby/server"
"github.com/shirou/gopsutil/v3/host"
)
func getIdentification() (server.Discovery, error) {
discovery := server.Discovery{}
localLabels, err := loadLocalLabels()
if err != nil {
return discovery, err
}
if len(config.HostName) == 0 {
info, err := host.Info()
if err != nil {
return discovery, err
}
discovery.Hostname = info.Hostname
} else {
discovery.Hostname = config.HostName
}
discovery.Labels = append(config.Labels, localLabels...)
return discovery, nil
}
// loadLocalLabels scans local directory where labels are stored and adds them to the labels configured as environment variables.
// Filename in LabelsPath is not importent and each file can contain multiple labels, one per each line.
func loadLocalLabels() ([]string, error) {
labels := []string{}
if _, err := os.Stat(config.LabelsPath); !os.IsNotExist(err) {
files, err := ioutil.ReadDir(config.LabelsPath)
if err != nil {
return labels, err
}
for _, filename := range files {
fullPath := path.Join(config.LabelsPath, filename.Name())
fp, err := os.OpenFile(fullPath, os.O_RDONLY, os.ModePerm)
if err != nil {
return labels, fmt.Errorf("open file error: %v", err)
}
defer fp.Close()
rd := bufio.NewReader(fp)
for {
line, err := rd.ReadString('\n')
if err != nil {
if err == io.EOF {
break
}
return labels, fmt.Errorf("read file line error: %v", err)
}
line = strings.TrimSpace(line)
if len(line) > 0 {
labels = append(labels, line)
}
}
}
}
return labels, nil
}

View File

@ -1,23 +1,70 @@
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/by-cx/lobby/common"
"github.com/by-cx/lobby/nats_driver"
"github.com/by-cx/lobby/redis_driver"
"github.com/by-cx/lobby/server"
"github.com/labstack/echo"
"github.com/labstack/echo/middleware"
"github.com/nats-io/nats.go"
"github.com/rosti-cz/server_lobby/server"
)
var discoveryStorage server.Discoveries = server.Discoveries{}
var driver common.Driver
var localHost server.LocalHost
var lastLocalHostname string
var config Config
var shuttingDown bool
var sendDiscoveryPacketTrigger chan bool = make(chan bool)
func init() {
// Load config from environment variables
config = *GetConfig()
// Setup discovery storage
discoveryStorage.LogChannel = make(chan string)
discoveryStorage.TTL = config.TTL
// localhost initiation
localHost = server.LocalHost{
LabelsPath: config.LabelsPath,
HostnameOverride: config.HostName,
InitialLabels: config.Labels,
RuntimeLabelsFilename: config.RuntimeLabelsFilename,
}
// Setup driver
if config.Driver == "NATS" {
driver = &nats_driver.Driver{
NATSUrl: config.NATSURL,
NATSDiscoveryChannel: config.NATSDiscoveryChannel,
LogChannel: discoveryStorage.LogChannel,
}
} else if config.Driver == "Redis" {
driver = &redis_driver.Driver{
Host: config.RedisHost,
Port: uint(config.RedisPort),
Password: config.RedisPassword,
Channel: config.RedisChannel,
DB: uint(config.RedisDB),
LogChannel: discoveryStorage.LogChannel,
}
} else {
log.Fatalf("unsupported driver %s", config.Driver)
}
}
// cleanDiscoveryPool clears the local server map and keeps only the alive servers
@ -29,23 +76,60 @@ func cleanDiscoveryPool() {
}
// sendDisoveryPacket sends discovery packet regularly so the network know we exist
func sendDisoveryPacket(nc *nats.Conn) {
for {
discovery, err := getIdentification()
// sendGoodbyePacket is almost same as sendDiscoveryPacket but it's not running in loop
// and it adds goodbye message so other nodes know this node is gonna die.
func sendGoodbyePacket() {
discovery, err := localHost.GetIdentification()
if err != nil {
log.Printf("sending discovery identification error: %v\n", err)
}
data, err := discovery.Bytes()
err = driver.SendGoodbyePacket(discovery)
if err != nil {
log.Printf("sending discovery formating message error: %v\n", err)
log.Println(err)
}
err = nc.Publish(config.NATSDiscoveryChannel, data)
}
// sendDiscoveryPacket sends a single discovery packet out
func sendDiscoveryPacket() {
sendDiscoveryPacketTrigger <- true
}
// sendDisoveryPacket sends discovery packet to the driver which passes it to the
// other nodes. By this it propagates any change that happens in the local discovery struct.
// Every tune trigger is triggered it sends one message.
func sendDiscoveryPacketTask(trigger chan bool) {
for {
// We are waiting for the trigger
<-trigger
if !shuttingDown {
// Get info about local machine and send to the exchange point
discovery, err := localHost.GetIdentification()
if err != nil {
log.Printf("sending discovery error: %v\n", err)
log.Printf("sending discovery identification error: %v\n", err)
}
err = driver.SendDiscoveryPacket(discovery)
if err != nil {
log.Println(err.Error())
}
// If local hostname changes, we will deregister it
if discovery.Hostname != lastLocalHostname && lastLocalHostname != "" {
log.Println("Hostname change detected, deregistering the old one")
err = driver.SendGoodbyePacket(server.Discovery{
Hostname: lastLocalHostname,
})
if err != nil {
log.Println(err)
}
}
lastLocalHostname = discovery.Hostname
} else {
break
}
time.Sleep(time.Duration(config.KeepAlive) * time.Second)
}
}
@ -62,35 +146,84 @@ func main() {
// Closing the logging channel
defer close(discoveryStorage.LogChannel)
discoveryStorage.TTL = config.TTL
// Load config from environment variables
config = *GetConfig()
defer driver.Close()
// ------------------------
// Server discovering stuff
// ------------------------
// Connect to the NATS service
nc, err := nats.Connect(config.NATSURL)
// Setup callback function to register and unregister discovery packets from other servers
driver.RegisterSubscribeFunction(func(d server.Discovery) {
// Check if the local version and the new version are somehowe changed
localVersion := discoveryStorage.Get(d.Hostname)
exists := discoveryStorage.Exist(d.Hostname)
changed := server.Compare(localVersion, d)
discoveryStorage.Add(d)
if changed {
// Print this only if the server is already registered
if exists {
log.Printf("%s has been updated", d.Hostname)
}
go func() {
err = discoveryChange(d)
if err != nil {
log.Printf("discovery changed error: %v", err)
}
}()
}
})
driver.RegisterUnsubscribeFunction(func(d server.Discovery) {
discoveryStorage.Delete(d.Hostname)
go func() {
err = discoveryChange(d)
if err != nil {
log.Printf("discovery changed error: %v", err)
}
}()
})
err = driver.Init()
if err != nil {
log.Fatalln(err)
}
defer nc.Drain()
go printDiscoveryLogs()
go cleanDiscoveryPool()
// Subscribe
log.Println("> discovery channel")
_, err = nc.Subscribe(config.NATSDiscoveryChannel, discoveryHandler)
if err != nil {
log.Fatalln(err)
if len(config.Callback) > 0 {
go discoveryChangeLoop()
go changeCatcherLoop()
}
go cleanDiscoveryPool()
go sendDisoveryPacket(nc)
// When the daemon boots up we trigger discovery change event so the config can be setup via callback script if there is any
err = discoveryChange(server.Discovery{})
if err != nil {
log.Printf("discovery changed error: %v", err)
}
// If config.Register is false this instance won't be registered with other nodes
if config.Register {
// This is background process that sends the message
go sendDiscoveryPacketTask(sendDiscoveryPacketTrigger)
// This triggers the process
go func() {
for {
sendDiscoveryPacket()
time.Sleep(time.Duration(config.KeepAlive) * time.Second)
if shuttingDown {
break
}
}
}()
} else {
log.Println("standalone mode, I won't register myself")
}
// --------
// REST API
// --------
@ -102,45 +235,36 @@ func main() {
}
e.Use(middleware.Logger())
e.Use(middleware.Recover())
// Routes
e.GET("/", func(c echo.Context) error {
label := c.QueryParam("label")
var discoveries []server.Discovery
if len(label) > 0 {
discoveries = discoveryStorage.Filter(label)
} else {
discoveries = discoveryStorage.GetAll()
if !config.DisableAPI {
e.GET("/", listHandler)
e.GET("/v1/resolve", resolveHandler)
e.GET("/v1/discovery", getIdentificationHandler)
e.GET("/v1/discoveries", listHandler)
e.POST("/v1/labels", addLabelsHandler)
e.DELETE("/v1/labels", deleteLabelsHandler)
e.GET("/v1/prometheus/:name", prometheusHandler)
}
return c.JSONPretty(200, discoveries, " ")
})
e.GET("/prometheus", func(c echo.Context) error {
services := preparePrometheusOutput(discoveryStorage.GetAll())
return c.JSONPretty(http.StatusOK, services, " ")
})
// e.GET("/template/:template", func(c echo.Context) error {
// templateName := c.Param("template")
// discoveries := discoveryStorage.GetAll()
// var body bytes.Buffer
// tmpl, err := template.New("main").ParseFiles(path.Join(config.TemplatesPath, templateName))
// if err != nil {
// return c.String(http.StatusInternalServerError, err.Error())
// }
// err = tmpl.Execute(&body, &discoveries)
// if err != nil {
// return c.String(http.StatusInternalServerError, err.Error())
// }
// return c.String(http.StatusOK, body.String())
// })
// ------------------------------
// Termination signals processing
// ------------------------------
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
go func(e *echo.Echo, config Config) {
sig := <-signals
shuttingDown = true
if config.Register {
log.Printf("%s signal received, sending goodbye packet\n", sig.String())
sendGoodbyePacket()
time.Sleep(1 * time.Second) // we wait for a few seconds to let background jobs to finish their job
} else {
log.Printf("%s signal received", sig.String())
}
e.Shutdown(context.TODO())
}(e, config)
// Start server
e.Logger.Fatal(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port))))
// In most cases this will end expectedly so it doesn't make sense to use the echo's approach to treat this message as an error.
e.Logger.Info(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port))))
}

View File

@ -1,27 +0,0 @@
package main
import (
"encoding/json"
"fmt"
"log"
"github.com/nats-io/nats.go"
"github.com/rosti-cz/server_lobby/server"
)
// discoveryHandler accepts discovery message and
func discoveryHandler(m *nats.Msg) {
message := server.Discovery{}
err := json.Unmarshal(m.Data, &message)
if err != nil {
log.Println(fmt.Errorf("decoding message error: %v", err))
}
err = message.Validate()
if err != nil {
log.Println(fmt.Errorf("validation error: %v", err))
}
discoveryStorage.Add(message)
}

View File

@ -4,7 +4,7 @@ import (
"strconv"
"strings"
"github.com/rosti-cz/server_lobby/server"
"github.com/by-cx/lobby/server"
)
// [
@ -29,36 +29,59 @@ type PrometheusService struct {
// preparePrometheusOutput returns PrometheusServices which is struct compatible to what Prometheus expects
// labels starting "ne:" will be used as NodeExporter labels. Label "ne:port:9123" will be used as port
// used in the targets field. Same for "ne:host:1.2.3.4".
func preparePrometheusOutput(discoveries []server.Discovery) PrometheusServices {
func preparePrometheusOutput(name string, discoveries []server.Discovery) PrometheusServices {
services := PrometheusServices{}
for _, discovery := range discoveries {
port := strconv.Itoa(int(config.NodeExporterPort))
host := discovery.Hostname
hosts := []string{}
var add bool // add to the prometheus output when there is at least one prometheus related label
labels := map[string]string{}
labels := map[string]string{} // These are prometheus labels, not Lobby's labels
for _, label := range discovery.FindLabels("ne") {
trimmed := strings.TrimPrefix(label, "ne:")
for _, label := range discovery.FindLabelsByPrefix("prometheus:" + name + ":") {
trimmed := strings.TrimPrefix(label.String(), "prometheus:"+name+":")
parts := strings.SplitN(trimmed, ":", 2)
if len(parts) == 2 {
if parts[0] == "port" {
port = parts[1]
} else if parts[0] == "host" {
host = parts[1]
hosts = append(hosts, parts[1])
} else {
labels[parts[0]] = parts[1]
}
add = true
}
}
// This has to be checked here again because FindLabels adds : at the end of the label name.
if !add {
for _, label := range discovery.Labels {
if label.String() == "prometheus:"+name {
add = true
break
}
}
}
if add {
targets := []string{}
for _, host := range hosts {
// Omit port part if "-" is set or port is part of the host
target := host + ":" + port
if strings.Contains(host, ":") || port == "-" {
target = host
}
targets = append(targets, target)
}
service := PrometheusService{
Targets: []string{host + ":" + port},
Targets: targets,
Labels: labels,
}
services = append(services, service)
}
}
return services

47
daemon/prometheus_test.go Normal file
View File

@ -0,0 +1,47 @@
package main
import (
"testing"
"github.com/by-cx/lobby/server"
"github.com/stretchr/testify/assert"
)
func TestPreparePrometheusOutput(t *testing.T) {
discoveries := []server.Discovery{
{
Hostname: "test.server",
Labels: server.Labels{
// test1
"prometheus:test1:label1:l1",
"prometheus:test1:host:srv1:1234",
"prometheus:test1:host:srv1:1235",
"prometheus:test1:host:srv1",
"prometheus:test1:label2:l2",
// test2
"prometheus:test2:host:srv2",
"prometheus:test2:host:srv2:1235",
"prometheus:test2:host:srv2:1236",
"prometheus:test2:host:srv2:1237",
"prometheus:test2:label1:l3",
},
},
}
services := preparePrometheusOutput("test1", discoveries)
assert.Equal(t, 1, len(services))
assert.Equal(t, 3, len(services[0].Targets))
assert.Contains(t, services[0].Targets, "srv1:9100")
assert.Contains(t, services[0].Targets, "srv1:1234")
assert.Contains(t, services[0].Targets, "srv1:1235")
assert.Contains(t, services[0].Labels["label1"], "l1")
assert.Contains(t, services[0].Labels["label2"], "l2")
services = preparePrometheusOutput("test2", discoveries)
assert.Equal(t, 1, len(services))
assert.Equal(t, 4, len(services[0].Targets))
assert.Contains(t, services[0].Targets, "srv2:9100")
assert.Contains(t, services[0].Targets, "srv2:1235")
assert.Contains(t, services[0].Targets, "srv2:1236")
assert.Contains(t, services[0].Targets, "srv2:1237")
assert.Contains(t, services[0].Labels["label1"], "l3")
}

102
daemon/update.go Normal file
View File

@ -0,0 +1,102 @@
package main
import (
"encoding/json"
"log"
"os/exec"
"strings"
"time"
"github.com/by-cx/lobby/server"
)
// These functions are called when something has changed in the storage
var changeDetectedChannel chan bool = make(chan bool)
var changeDetected bool
// changeCatcherLoop waits for a change signal and switches variable that says if the callback should run or not
func changeCatcherLoop() {
for {
<-changeDetectedChannel
changeDetected = true
}
}
// discoveryChangeLoop is used to process dynamic configuration changes.
// When there is a change detected a shell script or given process is triggered
// which does some operations with the new data. Usually it generates the
// configuration.
// This function has internal loop and won't allow to run the command more
// often than it's the configured amount of time. That prevents
func discoveryChangeLoop() {
// This other loop tics in strict intervals and prevents the callback script to run more often than it's configured
var cmd *exec.Cmd
// Delay first run of the callback script a little so everything can set up
log.Printf("Delaying start of discovery change loop (%d seconds)\n", config.CallbackFirstRunDelay)
time.Sleep(time.Duration(config.CallbackFirstRunDelay) * time.Second)
log.Println("Starting discovery change loop")
for {
if changeDetected {
// We switch this at the beginning so we can detect new changes while the callback script is running
changeDetected = false
log.Println("Running callback function")
// TODO: this is not the best way
callbackCommandSlice := strings.Split(config.Callback, " ")
if len(callbackCommandSlice) == 1 {
cmd = exec.Command(callbackCommandSlice[0])
} else if len(callbackCommandSlice) > 1 {
cmd = exec.Command(callbackCommandSlice[0], callbackCommandSlice[1:]...)
} else {
log.Println("wrong number of parts of the callback command")
time.Sleep(time.Duration(config.CallbackCooldown) * time.Second)
continue
}
stdin, err := cmd.StdinPipe()
if err != nil {
log.Println("stdin writing error: ", err.Error())
continue
}
discoveriesJSON, err := json.Marshal(discoveryStorage.GetAll())
if err != nil {
log.Println("stdin writing error: ", err.Error())
continue
}
_, err = stdin.Write([]byte(discoveriesJSON))
if err != nil {
log.Println("stdin writing error: ", err.Error())
continue
}
err = stdin.Close()
if err != nil {
log.Println("stdin writing error: ", err.Error())
continue
}
stdout, err := cmd.CombinedOutput()
if err != nil {
log.Println("Callback error: ", err.Error())
}
log.Println("Callback output: ", string(stdout))
}
time.Sleep(time.Duration(config.CallbackCooldown) * time.Second)
}
}
// discoveryChange is called when daemon detects that a newly arrived discovery
// packet is somehow different than the localone. This can be used to trigger
// some action in the local machine.
func discoveryChange(discovery server.Discovery) error {
if len(config.Callback) > 0 {
changeDetectedChannel <- true
}
return nil
}

11
go.mod
View File

@ -1,17 +1,22 @@
module github.com/rosti-cz/server_lobby
module github.com/by-cx/lobby
go 1.16
require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/fatih/color v1.12.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-resty/resty/v2 v2.6.0
github.com/google/go-cmp v0.5.5
github.com/kelseyhightower/envconfig v1.4.0
github.com/labstack/echo v3.3.10+incompatible
github.com/labstack/gommon v0.3.0 // indirect
github.com/nats-io/nats-server/v2 v2.4.0 // indirect
github.com/nats-io/nats.go v1.12.0
github.com/onsi/gomega v1.16.0 // indirect
github.com/shirou/gopsutil/v3 v3.21.7
github.com/stretchr/testify v1.7.0
github.com/valyala/fasttemplate v1.2.1 // indirect
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
google.golang.org/protobuf v1.27.1 // indirect
)

76
go.sum
View File

@ -1,11 +1,23 @@
github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA=
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc=
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-resty/resty/v2 v2.6.0 h1:joIR5PNLM2EFqqESUjCMGXrWmXNHEU9CEiK813oKYS4=
github.com/go-resty/resty/v2 v2.6.0/go.mod h1:PwvJS6hvaPkjtjNg9ph+VrSD92bi5Zq73w/BIH7cC3Q=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
@ -19,7 +31,9 @@ github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
@ -28,11 +42,13 @@ github.com/labstack/echo v3.3.10+incompatible h1:pGRcYk231ExFAyoAjAfD85kQzRJCRI8
github.com/labstack/echo v3.3.10+incompatible/go.mod h1:0INS7j/VjnFxD4E2wkz67b8cVwCLbBmJyDaka6Cmk1s=
github.com/labstack/gommon v0.3.0 h1:JEeO0bvc78PKdyHxloTKiF8BD5iGrH8T6MSeGvSgob0=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
@ -48,12 +64,24 @@ github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c=
github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/shirou/gopsutil/v3 v3.21.7 h1:PnTqQamUjwEDSgn+nBGu0qSDV/CfvyiR/gwTH3i7HTU=
github.com/shirou/gopsutil/v3 v3.21.7/go.mod h1:RGl11Y7XMTQPmHh8F0ayC6haKNBgH4PXMJuTAcMOlz4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tklauser/go-sysconf v0.3.7 h1:HT7h4+536gjqeq1ZIJPgOl1rg1XFatQGVZWp7Py53eg=
@ -65,34 +93,64 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.2.1 h1:TVEnxayobAdVkhQfrfes2IzOB6o+z4roRkPF52WA1u4=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@ -103,7 +161,15 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

133
nats_driver/main.go Normal file
View File

@ -0,0 +1,133 @@
package nats_driver
import (
"encoding/json"
"fmt"
"log"
"strings"
"time"
"github.com/by-cx/lobby/common"
"github.com/by-cx/lobby/server"
"github.com/nats-io/nats.go"
)
// NATS drivers is used to send discovery packet to other nodes into the group via NATS messenging protocol.
type Driver struct {
NATSUrl string
NATSDiscoveryChannel string
LogChannel chan string
nc *nats.Conn
subscribeListener common.Listener
unsubscribeListener common.Listener
}
// handler is called asynchronously so and because it cannot log directly to stderr there
// is channel called LogChannel that can be used to log error messages from this
func (d *Driver) handler(m *nats.Msg) {
message := discoveryEnvelope{}
err := json.Unmarshal(m.Data, &message)
if err != nil && d.LogChannel != nil {
d.LogChannel <- fmt.Errorf("decoding message error: %v", err).Error()
}
err = message.Discovery.Validate()
if err != nil && d.LogChannel != nil {
d.LogChannel <- fmt.Errorf("validation error: %v", err).Error()
}
if message.Message == "hi" {
d.subscribeListener(message.Discovery)
} else if message.Message == "goodbye" {
d.unsubscribeListener(message.Discovery)
} else {
if d.LogChannel != nil {
d.LogChannel <- "incompatible message"
}
}
}
func (d *Driver) Init() error {
if d.LogChannel == nil {
return fmt.Errorf("please initiate LogChannel variable")
}
for {
nc, err := nats.Connect(d.NATSUrl)
if err != nil {
log.Printf("Can't connect to the NATS server, waiting for 5 seconds before I try it again. (%v)\n", err)
time.Sleep(time.Second * 5)
continue
}
d.nc = nc
break
}
_, err := d.nc.Subscribe(d.NATSDiscoveryChannel, d.handler)
if err != nil {
return fmt.Errorf("subscribe error: %v", err)
}
return nil
}
// Close is called when all is done.
func (d *Driver) Close() error {
return d.nc.Drain()
}
// RegisterSubscribeFunction sets the function that will process the incoming messages
func (d *Driver) RegisterSubscribeFunction(listener common.Listener) {
d.subscribeListener = listener
}
// RegisterUnsubscribeFunction sets the function that will process the goodbye incoming messages
func (d *Driver) RegisterUnsubscribeFunction(listener common.Listener) {
d.unsubscribeListener = listener
}
// SendDiscoveryPacket send discovery packet to the group.
func (d *Driver) SendDiscoveryPacket(discovery server.Discovery) error {
envelope := discoveryEnvelope{
Discovery: discovery,
Message: "hi",
}
data, err := envelope.Bytes()
if err != nil {
return fmt.Errorf("sending discovery formating message error: %v", err)
}
err = d.nc.Publish(d.NATSDiscoveryChannel, data)
// In case the connection is down we will try to reconnect
if err != nil && strings.Contains(err.Error(), "connection closed") {
d.nc.Close()
err = d.Init()
if err != nil {
return fmt.Errorf("sending discovery reconnect error: %v", err)
}
} else if err != nil {
return fmt.Errorf("sending discovery error: %v", err)
}
return nil
}
// SendGoodbyePacket deregister node from the group. It tells everybody that it's going to die.
func (d *Driver) SendGoodbyePacket(discovery server.Discovery) error {
envelope := discoveryEnvelope{
Discovery: discovery,
Message: "goodbye",
}
data, err := envelope.Bytes()
if err != nil {
return fmt.Errorf("sending discovery formating message error: %v", err)
}
err = d.nc.Publish(d.NATSDiscoveryChannel, data)
if err != nil {
return fmt.Errorf("sending discovery error: %v", err)
}
return nil
}

20
nats_driver/types.go Normal file
View File

@ -0,0 +1,20 @@
package nats_driver
import (
"encoding/json"
"github.com/by-cx/lobby/server"
)
// discoveryEnvelope adds a message to the standard discovery format. The message
// can be "hi" or "goodbye" where "hi" is used when the node is sending keep alive
// packets and "goodbye" means the node is leaving.
type discoveryEnvelope struct {
Discovery server.Discovery `json:"discovery"`
Message string `json:"message"` // can be hi or goodbye
}
func (e *discoveryEnvelope) Bytes() ([]byte, error) {
body, err := json.Marshal(e)
return body, err
}

142
redis_driver/main.go Normal file
View File

@ -0,0 +1,142 @@
package redis_driver
import (
"encoding/json"
"github.com/by-cx/lobby/common"
"github.com/by-cx/lobby/server"
"github.com/go-redis/redis"
"fmt"
)
// Redis drivers is used to send discovery packet to other nodes into the group via Redis's pubsub protocol.
type Driver struct {
Host string
Port uint
Password string
Channel string
DB uint
LogChannel chan string
subscribeListener common.Listener
unsubscribeListener common.Listener
redis *redis.Client
}
// handler is called asynchronously so and because it cannot log directly to stderr there
// is channel called LogChannel that can be used to log error messages from this
func (d *Driver) handler(payload string) {
message := discoveryEnvelope{}
err := json.Unmarshal([]byte(payload), &message)
if err != nil && d.LogChannel != nil {
d.LogChannel <- fmt.Errorf("decoding message error: %v", err).Error()
}
err = message.Discovery.Validate()
if err != nil && d.LogChannel != nil {
d.LogChannel <- fmt.Errorf("validation error: %v", err).Error()
}
if message.Message == "hi" {
d.subscribeListener(message.Discovery)
} else if message.Message == "goodbye" {
d.unsubscribeListener(message.Discovery)
} else {
if d.LogChannel != nil {
d.LogChannel <- "incompatible message"
}
}
}
// Init connect to Redis, subscribes to the channel and waits for the messages.
// It runs goroutine in background that listens for new messages.
func (d *Driver) Init() error {
if d.LogChannel == nil {
return fmt.Errorf("please initiate LogChannel variable")
}
if len(d.Host) == 0 {
return fmt.Errorf("parameter Host cannot be empty")
}
if len(d.Channel) == 0 {
return fmt.Errorf("pattern cannot be empty")
}
if d.Port <= 0 || d.Port > 65536 {
return fmt.Errorf("port has to be in range of 0-65536")
}
d.redis = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", d.Host, d.Port),
Password: d.Password,
DB: int(d.DB),
})
pubsub := d.redis.Subscribe(d.Channel)
go func(pubsub *redis.PubSub, d *Driver) {
channel := pubsub.Channel()
for message := range channel {
d.handler(message.Payload)
}
}(pubsub, d)
return nil
}
// Close is called when all is done.
func (d *Driver) Close() error {
return d.redis.Close()
}
// RegisterSubscribeFunction sets the function that will process the incoming messages
func (d *Driver) RegisterSubscribeFunction(listener common.Listener) {
d.subscribeListener = listener
}
// RegisterUnsubscribeFunction sets the function that will process the goodbye incoming messages
func (d *Driver) RegisterUnsubscribeFunction(listener common.Listener) {
d.unsubscribeListener = listener
}
// SendDiscoveryPacket send discovery packet to the group.
func (d *Driver) SendDiscoveryPacket(discovery server.Discovery) error {
envelope := discoveryEnvelope{
Discovery: discovery,
Message: "hi",
}
data, err := envelope.Bytes()
if err != nil {
return fmt.Errorf("sending discovery formating message error: %v", err)
}
err = d.redis.Publish(d.Channel, data).Err()
if err != nil {
return fmt.Errorf("sending discovery error: %v", err)
}
return nil
}
// SendGoodbyePacket deregister node from the group. It tells everybody that it's going to die.
func (d *Driver) SendGoodbyePacket(discovery server.Discovery) error {
envelope := discoveryEnvelope{
Discovery: discovery,
Message: "goodbye",
}
data, err := envelope.Bytes()
if err != nil {
return fmt.Errorf("sending discovery formating message error: %v", err)
}
err = d.redis.Publish(d.Channel, data).Err()
if err != nil {
return fmt.Errorf("sending discovery error: %v", err)
}
return nil
}

20
redis_driver/types.go Normal file
View File

@ -0,0 +1,20 @@
package redis_driver
import (
"encoding/json"
"github.com/by-cx/lobby/server"
)
// discoveryEnvelope adds a message to the standard discovery format. The message
// can be "hi" or "goodbye" where "hi" is used when the node is sending keep alive
// packets and "goodbye" means the node is leaving.
type discoveryEnvelope struct {
Discovery server.Discovery `json:"discovery"`
Message string `json:"message"` // can be hi or goodbye
}
func (e *discoveryEnvelope) Bytes() ([]byte, error) {
body, err := json.Marshal(e)
return body, err
}

View File

@ -3,6 +3,7 @@ package server
import (
"encoding/json"
"fmt"
"sort"
"strings"
"time"
)
@ -16,7 +17,7 @@ const TimeToLife = 60 // when server won't occur in the discovery channel longer
// Discovery contains information about a single server and is used for server discovery
type Discovery struct {
Hostname string `json:"hostname"`
Labels []string `json:"labels"`
Labels Labels `json:"labels"`
// For internal use to check if the server is still alive.
// Contains timestamp of the last check.
@ -44,17 +45,30 @@ func (d *Discovery) Bytes() ([]byte, error) {
return data, err
}
// FindLabels returns list of labels with given prefix. For example "service:ns" has prefix "service"
func (d *Discovery) FindLabels(prefix string) []string {
labels := []string{}
// FindLabelsByPrefix returns list of labels with given prefix. For example "service:ns" has prefix "service" or "service:".
// It doesn't have to be prefix, but for example "service:test" will match "service:test" and also "service:test2".
func (d *Discovery) FindLabelsByPrefix(prefix string) Labels {
labels := Labels{}
for _, label := range d.Labels {
if strings.HasPrefix(label, prefix+":") {
if strings.HasPrefix(label.String(), prefix) {
labels = append(labels, label)
}
}
return labels
}
func (d *Discovery) SortLabels() {
labelStrings := d.Labels.StringSlice()
sort.Strings(labelStrings)
labels := Labels{}
for _, label := range labelStrings {
labels = append(labels, Label(label))
}
d.Labels = labels
}
// -----------------
// Discovery storage
// -----------------
@ -105,6 +119,10 @@ func (d *Discoveries) Refresh(hostname string) {
// Delete removes server identified by hostname from the storage
func (d *Discoveries) Delete(hostname string) {
if !d.Exist(hostname) {
return
}
if d.LogChannel != nil {
d.LogChannel <- fmt.Sprintf("removing %s", hostname)
}
@ -143,19 +161,69 @@ func (d *Discoveries) GetAll() []Discovery {
return d.activeServers
}
func (d *Discoveries) Filter(labelFilter string) []Discovery {
// Filter returns list of discoveries based on given labels
func (d *Discoveries) Filter(labelsFilter []string) []Discovery {
newSet := []Discovery{}
if len(labelFilter) > 0 {
var found bool
if len(labelsFilter) > 0 {
for _, discovery := range d.activeServers {
newDiscovery := discovery
newDiscovery.Labels = Labels{}
found = false
for _, label := range discovery.Labels {
if label == labelFilter {
newSet = append(newSet, discovery)
for _, labelFilter := range labelsFilter {
if label.String() == labelFilter {
found = true
newDiscovery.Labels = append(newDiscovery.Labels, label)
break
}
}
}
if found {
newSet = append(newSet, newDiscovery)
}
}
}
return newSet
}
// Filter returns list of discoveries based on given label prefixes.
func (d *Discoveries) FilterPrefix(prefixes []string) []Discovery {
newSet := []Discovery{}
var found bool
if len(prefixes) > 0 {
for _, discovery := range d.activeServers {
newDiscovery := discovery
newDiscovery.Labels = Labels{}
found = false
if found {
newSet = append(newSet, newDiscovery)
}
for _, label := range discovery.Labels {
for _, prefix := range prefixes {
if strings.HasPrefix(label.String(), prefix) {
found = true
newDiscovery.Labels = append(newDiscovery.Labels, label)
break
}
}
}
if found {
newSet = append(newSet, newDiscovery)
}
}
}
return newSet

38
server/discovery_test.go Normal file
View File

@ -0,0 +1,38 @@
package server
import (
"encoding/json"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestDiscovery(t *testing.T) {
now := time.Now().Unix()
now90 := now - 90
discovery := Discovery{
Hostname: "test.rosti.cz",
Labels: Labels{
Label("service:test"),
Label("test:123"),
Label("public_ip:1.2.3.4"),
},
LastCheck: now,
}
assert.True(t, discovery.IsAlive(), "discovery suppose to be alive")
discovery.LastCheck = now90
assert.False(t, discovery.IsAlive(), "discovery not suppose to be alive")
discovery.LastCheck = now
assert.Equal(t, Labels{Label("service:test")}, discovery.FindLabelsByPrefix("service"))
assert.Equal(t, nil, discovery.Validate()) // TODO: This needs more love
content, err := json.Marshal(&discovery)
assert.Nil(t, err)
content2, err := discovery.Bytes()
assert.Nil(t, err)
assert.Equal(t, content, content2)
}

185
server/identification.go Normal file
View File

@ -0,0 +1,185 @@
package server
import (
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"github.com/shirou/gopsutil/v3/host"
)
type LocalHost struct {
LabelsPath string // Where labels are stored
RuntimeLabelsFilename string // Filename under which are runtime labels saved in LabelsPath
InitialLabels Labels // this usually coming from the config
HostnameOverride string // if not empty string hostname in the discovery packet will be replaced by this
}
// saveRuntimeLabels stores labels in the runtime filesname
func (l *LocalHost) saveRuntimeLabels(labels Labels) error {
stringLabels := []string{}
for _, label := range labels {
stringLabels = append(stringLabels, label.String())
}
content := strings.Join(stringLabels, "\n")
err := os.WriteFile(path.Join(l.LabelsPath, l.RuntimeLabelsFilename), []byte(content), 0755)
return err
}
// getRuntimeLabels returns labels from the runtime filename
func (l *LocalHost) getRuntimeLabels() (Labels, error) {
labels := Labels{}
content, err := os.ReadFile(path.Join(l.LabelsPath, l.RuntimeLabelsFilename))
if err != nil {
if strings.Contains(err.Error(), "no such file or directory") {
return labels, nil
}
return labels, err
}
for _, label := range strings.Split(string(content), "\n") {
labels = append(labels, Label(strings.TrimSpace(label)))
}
return labels, nil
}
// AddLabel adds runtime label into the LabelsPath directory
func (l *LocalHost) AddLabels(labels Labels) error {
runtimeLabels, err := l.getRuntimeLabels()
if err != nil {
return fmt.Errorf("error while loading stored labels: %v", err)
}
var found bool
for _, label := range labels {
found = false
for _, runtimeLabel := range runtimeLabels {
if label == runtimeLabel {
found = true
break
}
}
if !found {
runtimeLabels = append(runtimeLabels, label)
}
}
err = l.saveRuntimeLabels(runtimeLabels)
if err != nil {
return fmt.Errorf("error while saving new set of labels: %v", err)
}
return nil
}
// DeleteLabels removed labels from LabelsPath directory. Only labels added this way can be deleted.
func (l *LocalHost) DeleteLabels(labels Labels) error {
runtimeLabels, err := l.getRuntimeLabels()
if err != nil {
return fmt.Errorf("error while loading stored labels: %v", err)
}
newSet := Labels{}
var found bool
for _, runtimeLabel := range runtimeLabels {
found = false
for _, label := range labels {
if label == runtimeLabel {
found = true
break
}
}
if !found {
newSet = append(newSet, runtimeLabel)
}
}
err = l.saveRuntimeLabels(newSet)
if err != nil {
return fmt.Errorf("error while saving new set of labels: %v", err)
}
return nil
}
// GetIdentification assembles the discovery packet that contains hotname and set of labels describing a single server, in this case the local server.
// Parameter initialLabels usually coming from configuration of the app.
// If hostname is empty it will be discovered automatically.
func (l *LocalHost) GetIdentification() (Discovery, error) {
discovery := Discovery{}
localLabels, err := l.loadLocalLabels()
if err != nil {
return discovery, err
}
if len(l.HostnameOverride) == 0 {
info, err := host.Info()
if err != nil {
return discovery, err
}
discovery.Hostname = info.Hostname
} else {
discovery.Hostname = l.HostnameOverride
}
discovery.Labels = append(l.InitialLabels, localLabels...)
discovery.SortLabels()
return discovery, nil
}
// loadLocalLabels scans local directory where labels are stored and adds them to the labels configured as environment variables.
// Filename in LabelsPath is not importent and each file can contain multiple labels, one per each line.
func (l *LocalHost) loadLocalLabels() (Labels, error) {
labels := Labels{}
var found bool
if _, err := os.Stat(l.LabelsPath); !os.IsNotExist(err) {
files, err := ioutil.ReadDir(l.LabelsPath)
if err != nil {
return labels, err
}
for _, filename := range files {
fullPath := path.Join(l.LabelsPath, filename.Name())
content, err := os.ReadFile(fullPath)
if err != nil {
return labels, fmt.Errorf("read file error: %v", err)
}
for _, line := range strings.Split(string(content), "\n") {
line = strings.TrimSpace(line)
if len(line) > 0 {
found = false
for _, skipLabel := range l.InitialLabels {
if skipLabel == Label(line) {
found = true
break
}
}
if !found {
labels = append(labels, Label(line))
}
}
}
}
}
return labels, nil
}

View File

@ -0,0 +1,60 @@
package server
import (
"os"
"testing"
"github.com/stretchr/testify/assert"
)
const tmpPath = "./tmp"
const testLabelPath = tmpPath + "/labels"
func TestGetIdentification(t *testing.T) {
localHost := LocalHost{
LabelsPath: testLabelPath,
HostnameOverride: "test.example.com",
InitialLabels: Labels{Label("service:test"), Label("test:1")},
}
discovery, err := localHost.GetIdentification()
assert.Nil(t, err)
assert.Equal(t, "test.example.com", discovery.Hostname)
assert.Equal(t, "service:test", discovery.Labels[0].String())
err = os.MkdirAll(testLabelPath, os.ModePerm)
assert.Nil(t, err)
err = os.WriteFile(testLabelPath+"/test", []byte("service:test2\npublic_ip:1.2.3.4"), 0644)
assert.Nil(t, err)
discovery, err = localHost.GetIdentification()
assert.Nil(t, err)
assert.Equal(t, Label("test:1"), discovery.Labels[3])
os.RemoveAll(tmpPath)
}
func TestLoadLocalLabels(t *testing.T) {
localHost := LocalHost{
LabelsPath: testLabelPath,
HostnameOverride: "test.example.com",
InitialLabels: Labels{Label("service:test"), Label("test:1")},
}
err := os.MkdirAll(testLabelPath, os.ModePerm)
assert.Nil(t, err)
err = os.WriteFile(testLabelPath+"/test", []byte("service:test\npublic_ip:1.2.3.4"), 0644)
assert.Nil(t, err)
labels, err := localHost.loadLocalLabels()
assert.Nil(t, err)
assert.Equal(t, 1, len(labels))
assert.Equal(t, "public_ip:1.2.3.4", labels[0].String())
os.RemoveAll(tmpPath)
}

9
server/runtime.go Normal file
View File

@ -0,0 +1,9 @@
package server
func AddRuntimeLabel(label Label) error {
return nil
}
func RemoveRuntimeLabel(label Label) error {
return nil
}

12
server/tools.go Normal file
View File

@ -0,0 +1,12 @@
package server
import "github.com/google/go-cmp/cmp"
// Compare compares discovery A and B and returns true if those two are different.
func Compare(discoveryA, discoveryB Discovery) bool {
discoveryA.LastCheck = 0
discoveryB.LastCheck = 0
discoveryA.TTL = 0
discoveryB.TTL = 0
return !cmp.Equal(discoveryA, discoveryB)
}

45
server/tools_test.go Normal file
View File

@ -0,0 +1,45 @@
package server
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestCompare(t *testing.T) {
discoveryA := Discovery{
Hostname: "abcd.com",
Labels: Labels{
Label("label1"),
},
LastCheck: 52,
}
discoveryAB := Discovery{
Hostname: "abcd.com",
Labels: Labels{
Label("label1"),
},
LastCheck: 56,
}
discoveryB := Discovery{
Hostname: "efgh.com",
Labels: Labels{
Label("label2"),
},
LastCheck: 56,
}
discoveryC := Discovery{
Hostname: "abcd.com",
Labels: Labels{
Label("label2"),
},
LastCheck: 60,
}
assert.True(t, Compare(discoveryA, discoveryB))
assert.True(t, Compare(discoveryB, discoveryC))
assert.True(t, Compare(discoveryA, discoveryC)) // Test different labels and same hostname
assert.False(t, Compare(discoveryA, discoveryA))
assert.False(t, Compare(discoveryB, discoveryB))
assert.False(t, Compare(discoveryA, discoveryAB)) // Test that last check is zeroed
}

52
server/types.go Normal file
View File

@ -0,0 +1,52 @@
package server
import "strings"
// Label keeps one piece of information about a single server
type Label string
func (l Label) String() string {
return string(l)
}
// GetPart returns specific part of the label, if part index is higher than last available index it returns empty string.
func (l Label) GetPart(idx int) string {
parts := strings.Split(l.String(), ":")
if idx < 0 {
return ""
}
if len(parts) >= idx {
return ""
}
return parts[idx]
}
// GetPart1 is exactly same as GetPart but it splits the label only once, this is good for IPv6 addresses
func (l Label) GetPart1(idx int) string {
parts := strings.SplitN(l.String(), ":", 2)
if idx < 0 {
return ""
}
if len(parts) >= idx {
return ""
}
return parts[idx]
}
// Labels stores multiple Label records
type Labels []Label
// StringSlice return slice of Label as strings
func (l *Labels) StringSlice() []string {
labelsString := []string{}
for _, label := range *l {
labelsString = append(labelsString, label.String())
}
return labelsString
}

38
templater/main.go Normal file
View File

@ -0,0 +1,38 @@
package main
import (
"flag"
"fmt"
"os"
)
// Templater is used to generate config files from predefined
// templates based on content of gathered discovery packets.
// It can for example configure Nginx's backend or database
// replication.
//
// It reads templates from /var/lib/lobby/templates (default)
// which are YAML files cotaining the template itself and command(s)
// that needs to be run when the template changes.
const defaultTemplatesPath = "/var/lib/lobby/templates"
var templatesPath *string
func init() {
templatesPath = flag.String("templates-path", defaultTemplatesPath, "path of where templates are stored")
flag.Parse()
err := os.MkdirAll(*templatesPath, 0750)
if err != nil {
fmt.Println(err)
flag.Usage()
os.Exit(1)
}
}
func main() {
fmt.Println(*templatesPath)
}