// File: lobby/redis_driver/main.go (Go, 143 lines, 3.6 KiB)

package redis_driver
import (
"encoding/json"
"github.com/by-cx/lobby/common"
"github.com/by-cx/lobby/server"
"github.com/go-redis/redis"
"fmt"
)
// Driver sends discovery packets to, and receives them from, the other nodes
// in the group via Redis's pubsub protocol.
type Driver struct {
	// Host and Port locate the Redis server; Init rejects an empty Host and
	// an out-of-range Port.
	Host string
	Port uint
	// Password is handed to the Redis client unchanged.
	Password string
	// Channel is the pubsub channel used for both subscribing and publishing.
	Channel string
	// DB is the Redis database number passed to the client.
	DB uint
	// LogChannel receives error messages produced while incoming messages
	// are handled. Init refuses to run when it is nil.
	LogChannel chan string

	// Callbacks invoked for incoming "hi" and "goodbye" messages respectively.
	subscribeListener, unsubscribeListener common.Listener

	// redis is the client created by Init and used for publishing.
	redis *redis.Client
}
// handler decodes a single pubsub payload and dispatches the contained
// discovery to the matching listener: "hi" messages go to the subscribe
// listener, "goodbye" messages to the unsubscribe listener.
//
// It is called from the goroutine started by Init, so it has no caller to
// return an error to. Problems are reported as strings on LogChannel instead
// (when it is set). A payload that cannot be decoded or that fails validation
// is dropped and never reaches a listener.
func (d *Driver) handler(payload string) {
	message := discoveryEnvelope{}

	if err := json.Unmarshal([]byte(payload), &message); err != nil {
		if d.LogChannel != nil {
			d.LogChannel <- fmt.Sprintf("decoding message error: %v", err)
		}
		// Nothing useful was decoded; do not fall through to dispatch.
		return
	}

	if err := message.Discovery.Validate(); err != nil {
		if d.LogChannel != nil {
			d.LogChannel <- fmt.Sprintf("validation error: %v", err)
		}
		// Listeners must only ever see discoveries that passed validation.
		return
	}

	switch message.Message {
	case "hi":
		// A listener may not have been registered yet; calling a nil func
		// would panic inside the background goroutine.
		if d.subscribeListener != nil {
			d.subscribeListener(message.Discovery)
		}
	case "goodbye":
		if d.unsubscribeListener != nil {
			d.unsubscribeListener(message.Discovery)
		}
	default:
		if d.LogChannel != nil {
			d.LogChannel <- "incompatible message"
		}
	}
}
// Init validates the configuration, creates the Redis client and subscribes
// to d.Channel. It then starts a background goroutine that passes the payload
// of every received message to handler; the goroutine runs until the message
// channel returned by the subscription is closed.
//
// It returns an error when LogChannel is nil, Host or Channel is empty, or
// Port is outside the valid TCP port range 1-65535.
func (d *Driver) Init() error {
	if d.LogChannel == nil {
		return fmt.Errorf("please initiate LogChannel variable")
	}

	if len(d.Host) == 0 {
		return fmt.Errorf("parameter Host cannot be empty")
	}

	if len(d.Channel) == 0 {
		return fmt.Errorf("parameter Channel cannot be empty")
	}

	// Port is unsigned, so only zero and values above the 16-bit maximum
	// (65535) are invalid.
	if d.Port == 0 || d.Port > 65535 {
		return fmt.Errorf("port has to be in range of 1-65535")
	}

	d.redis = redis.NewClient(&redis.Options{
		Addr:     fmt.Sprintf("%s:%d", d.Host, d.Port),
		Password: d.Password,
		DB:       int(d.DB),
	})

	pubsub := d.redis.Subscribe(d.Channel)

	go func(pubsub *redis.PubSub, d *Driver) {
		channel := pubsub.Channel()
		for message := range channel {
			d.handler(message.Payload)
		}
	}(pubsub, d)

	return nil
}
// Close closes the Redis client created by Init and returns the client's
// close error, if any. Calling Close on a Driver whose client was never
// created (Init not called, or Init failed during validation) is a no-op
// that returns nil instead of dereferencing a nil client.
func (d *Driver) Close() error {
	if d.redis == nil {
		return nil
	}
	return d.redis.Close()
}
// RegisterSubscribeFunction stores listener as the callback that handler
// invokes, with the received discovery, for every incoming "hi" message.
// A later call replaces the previously stored listener.
func (d *Driver) RegisterSubscribeFunction(listener common.Listener) {
	d.subscribeListener = listener
}
// RegisterUnsubscribeFunction stores listener as the callback that handler
// invokes, with the received discovery, for every incoming "goodbye" message.
// A later call replaces the previously stored listener.
func (d *Driver) RegisterUnsubscribeFunction(listener common.Listener) {
	d.unsubscribeListener = listener
}
// SendDiscoveryPacket wraps discovery in a "hi" envelope, serializes it and
// publishes the result on d.Channel so that the other nodes in the group can
// pick it up. It returns an error when the envelope cannot be serialized or
// when publishing to Redis fails.
func (d *Driver) SendDiscoveryPacket(discovery server.Discovery) error {
	var hello discoveryEnvelope
	hello.Message = "hi"
	hello.Discovery = discovery

	payload, encodeErr := hello.Bytes()
	if encodeErr != nil {
		return fmt.Errorf("sending discovery formating message error: %v", encodeErr)
	}

	if publishErr := d.redis.Publish(d.Channel, payload).Err(); publishErr != nil {
		return fmt.Errorf("sending discovery error: %v", publishErr)
	}

	return nil
}
// SendGoodbyePacket deregisters the node from the group: it wraps discovery
// in a "goodbye" envelope, serializes it and publishes it on d.Channel,
// telling the other nodes that this one is about to go away. It returns an
// error when the envelope cannot be serialized or when publishing to Redis
// fails.
func (d *Driver) SendGoodbyePacket(discovery server.Discovery) error {
	envelope := discoveryEnvelope{
		Discovery: discovery,
		Message:   "goodbye",
	}

	data, err := envelope.Bytes()
	if err != nil {
		// The messages name the goodbye packet so that failures here can be
		// told apart from SendDiscoveryPacket failures.
		return fmt.Errorf("sending goodbye formatting message error: %v", err)
	}

	err = d.redis.Publish(d.Channel, data).Err()
	if err != nil {
		return fmt.Errorf("sending goodbye error: %v", err)
	}

	return nil
}