Adam Štrauch
e58d6462a9
All checks were successful
continuous-integration/drone/push Build is passing
Taking logic from handler into glue module Add tests for apps Updated docker library
835 lines
22 KiB
Go
835 lines
22 KiB
Go
package main
|
|
|
|
/*
|
|
Message handling is pretty straightforward. When request is received it's
|
|
usually asynchronic and the client is not waiting for an reply. In this case
|
|
client writes down that something is happening here and when it's done publish()
|
|
function is used to notify everybody that it's done. Client usually listens to
|
|
this channel and it finish his part as it needs.
|
|
|
|
In case there is a reply we use NATS's responde() method to return the data
|
|
as soon as possible.
|
|
*/
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"regexp"
|
|
"strings"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/pkg/errors"
|
|
"github.com/rosti-cz/node-api/apps"
|
|
"github.com/rosti-cz/node-api/common"
|
|
"github.com/rosti-cz/node-api/glue"
|
|
)
|
|
|
|
// This handler only passes messages to another function for easier testing
|
|
func messageHandler(msg *nats.Msg) {
|
|
go _messageHandler(msg)
|
|
}
|
|
|
|
func _messageHandler(m *nats.Msg) error {
|
|
message := RequestMessage{}
|
|
err := json.Unmarshal(m.Data, &message)
|
|
if err != nil {
|
|
log.Println(errors.Wrap(err, "invalid JSON data in the incoming message"))
|
|
return err
|
|
}
|
|
fmt.Printf("Received a message: %v\n", message)
|
|
|
|
eventHandlerMap := map[string](func(m *nats.Msg, message *RequestMessage) error){
|
|
"list": listEventHandler,
|
|
"get": getEventHandler,
|
|
"create": createEventHandler,
|
|
"register": registerEventHandler,
|
|
"update": updateEventHandler,
|
|
"delete": deleteEventHandler,
|
|
"stop": stopEventHandler,
|
|
"start": startEventHandler,
|
|
"restart": restartEventHandler,
|
|
"update_keys": updateKeysEventHandler,
|
|
"set_password": setPasswordEventHandler,
|
|
"processes": processesEventHandler,
|
|
"enable_tech": enableTechEventHandler,
|
|
"rebuild": rebuildEventHandler,
|
|
"add_label": addLabelEventHandler,
|
|
"remove_label": removeLabelEventHandler,
|
|
"list_orphans": listOrphansEventHandler,
|
|
"node": getNodeEventHandler,
|
|
"create_snapshot": createSnapshotEventHandler,
|
|
"restore_from_snapshot": restoreFromSnapshotEventHandler,
|
|
"list_snapshots": listSnapshotsEventHandler,
|
|
"list_apps_snapshots": listAppsSnapshotsEventHandler,
|
|
"list_snapshots_by_label": listSnapshotsByLabelEventHandler,
|
|
"get_snapshot": getSnapshotEventHandler,
|
|
"get_snapshot_download_link": getSnapshotDownloadLinkEventHandler,
|
|
"delete_snapshot": deleteSnapshotEventHandler,
|
|
"delete_app_snapshots": deleteAppSnapshotsEventHandler,
|
|
}
|
|
|
|
if eventHandler, ok := eventHandlerMap[message.Type]; ok {
|
|
return eventHandler(m, &message)
|
|
} else {
|
|
log.Println("ERROR: event handler not defined for " + message.Type)
|
|
}
|
|
|
|
// Set password for the app user in the container
|
|
|
|
// Application processes
|
|
|
|
// Enable one of the supported technologies or services (python, node, redis, ...)
|
|
// Rebuilds existing app, it keeps the data but creates the container again
|
|
|
|
// Orphans returns directories in /srv that doesn't match any hosted application
|
|
// Return info about the node including performance index
|
|
|
|
return nil
|
|
}
|
|
|
|
// Returns list of apps
|
|
func listEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
log.Println("> List")
|
|
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
}
|
|
|
|
applications, err := processor.List()
|
|
if err != nil {
|
|
return errorReplyFormater(m, "backend error", err)
|
|
}
|
|
|
|
reply := ReplyMessage{
|
|
Payload: applications,
|
|
}
|
|
|
|
data, err := json.Marshal(reply)
|
|
if err != nil {
|
|
return errorReplyFormater(m, "reply formatter error", err)
|
|
}
|
|
|
|
err = m.Respond(data)
|
|
if err != nil {
|
|
log.Println("ERROR: list apps:", err.Error())
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Returns one app
|
|
func getEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
DB: common.GetDBConnection(),
|
|
}
|
|
|
|
app, err := processor.Get()
|
|
if err != nil {
|
|
log.Printf("backend error: %v\n", err)
|
|
return errorReplyFormater(m, "backend error", err)
|
|
}
|
|
|
|
// Assembling reply message
|
|
reply := ReplyMessage{
|
|
AppName: app.Name,
|
|
Payload: app,
|
|
}
|
|
|
|
data, err := json.Marshal(reply)
|
|
if err != nil {
|
|
return errorReplyFormater(m, "reply formatter error", err)
|
|
}
|
|
|
|
err = m.Respond(data)
|
|
if err != nil {
|
|
log.Println("ERROR: get app:", err.Error())
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Create a new app
|
|
func createEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
appTemplate := apps.App{}
|
|
body := []byte(message.Payload)
|
|
err := json.Unmarshal(body, &appTemplate)
|
|
if err != nil {
|
|
log.Println("ERROR create application problem (unmarshal): " + err.Error())
|
|
publish(message.AppName, "payload parsing problem", true)
|
|
return err
|
|
}
|
|
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
err = processor.Create(appTemplate)
|
|
if err != nil && strings.Contains(err.Error(), "validation error") {
|
|
publish(message.AppName, "validation error", true)
|
|
return err
|
|
} else if err != nil {
|
|
log.Println("ERROR create application problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
publish(message.AppName, "created", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Registers new app without creating an container
|
|
func registerEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
|
|
appTemplate := apps.App{}
|
|
body := []byte(message.Payload)
|
|
err := json.Unmarshal(body, &appTemplate)
|
|
if err != nil {
|
|
log.Println("ERROR create application problem (Unmarshal): " + err.Error())
|
|
publish(message.AppName, "payload parsing problem", true)
|
|
return err
|
|
}
|
|
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
err = processor.Create(appTemplate)
|
|
if err != nil && strings.Contains(err.Error(), "validation error") {
|
|
publish(message.AppName, "validation error", true)
|
|
return err
|
|
} else if err != nil {
|
|
log.Println("ERROR create application problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
publish(message.AppName, "registered", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Update existing app
|
|
func updateEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
appTemplate := apps.App{}
|
|
body := []byte(message.Payload)
|
|
err := json.Unmarshal(body, &appTemplate)
|
|
if err != nil {
|
|
log.Println("ERROR update application problem (unmarshal): " + err.Error())
|
|
publish(message.AppName, "payload parsing problem", true)
|
|
return err
|
|
}
|
|
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
err = processor.Update(appTemplate)
|
|
if err != nil && strings.Contains(err.Error(), "validation error") {
|
|
publish(message.AppName, "validation error", true)
|
|
return err
|
|
} else if err != nil {
|
|
log.Println("ERROR create application problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
publish(message.AppName, "updated", false)
|
|
return nil
|
|
}
|
|
|
|
// Delete one app
|
|
func deleteEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
err := processor.Delete()
|
|
if err != nil {
|
|
log.Println("ERROR delete application problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
publish(message.AppName, "deleted", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop existing app
|
|
func stopEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
err := processor.Stop()
|
|
if err != nil {
|
|
log.Println("ERROR stop application problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
publish(message.AppName, "stopped", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Start existing app
|
|
func startEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
err := processor.Start()
|
|
if err != nil {
|
|
log.Println("ERROR start application problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
publish(message.AppName, "started", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Restart existing app
|
|
func restartEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
err := processor.Restart()
|
|
if err != nil {
|
|
log.Println("ERROR restart application problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
publish(message.AppName, "restarted", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Copies body of the request into /srv/.ssh/authorized_keys
|
|
func updateKeysEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
body := message.Payload
|
|
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
err := processor.UpdateKeys(body)
|
|
if err != nil {
|
|
log.Println("ERROR update keys problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
publish(message.AppName, "keys updated", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Set password for the app user in the container
|
|
func setPasswordEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
password := message.Payload
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
err := processor.SetPassword(password)
|
|
|
|
if err != nil {
|
|
log.Println("ERROR password update problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
publish(message.AppName, "password updated", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Application processes
|
|
func processesEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
|
|
processes, err := processor.Processes()
|
|
if err != nil {
|
|
log.Println("ERROR processes list problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
reply := ReplyMessage{
|
|
Payload: processes,
|
|
}
|
|
|
|
data, err := json.Marshal(reply)
|
|
if err != nil {
|
|
log.Println("ERROR processes list problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
err = m.Respond(data)
|
|
if err != nil {
|
|
log.Println("ERROR processes list problem: " + err.Error())
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Enable one of the supported technologies or services (python, node, redis, ...)
|
|
// If payload contains only name of the tech the default version for given image is selected.
|
|
// Otherwise it can be passed in format tech:version.
|
|
func enableTechEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
matched, err := regexp.Match(`[a-z0-9A-Z]*:?[0-9\.\-]*`, []byte(message.Payload))
|
|
if err != nil {
|
|
log.Println("ERROR enable tech problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
if !matched {
|
|
return errors.New("payload malformation, it has to be in format tech:version")
|
|
}
|
|
|
|
service := message.Payload
|
|
version := ""
|
|
|
|
if strings.Contains(service, ":") {
|
|
parts := strings.SplitN(message.Payload, ":", 2)
|
|
if len(parts) != 2 {
|
|
return errors.New("service and version malformat")
|
|
}
|
|
|
|
service = parts[0]
|
|
version = parts[1]
|
|
}
|
|
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
processor.EnableTech(service, version)
|
|
if err != nil {
|
|
log.Println("ERROR enable tech problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
publish(message.AppName, "tech updated", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Rebuilds existing app, it keeps the data but creates the container again
|
|
func rebuildEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
|
|
err := processor.Rebuild()
|
|
if err != nil {
|
|
log.Println("ERROR rebuild app problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
publish(message.AppName, "app rebuild", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Adds new label
|
|
func addLabelEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
label := message.Payload
|
|
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
err := processor.AddLabel(label)
|
|
if err != nil {
|
|
log.Println("ERROR add label problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
publish(message.AppName, "label added", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Removes existing label
|
|
func removeLabelEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
label := message.Payload
|
|
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
err := processor.RemoveLabel(label)
|
|
if err != nil {
|
|
log.Println("ERROR remove label problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
publish(message.AppName, "label removed", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Orphans returns directories in /srv that doesn't match any hosted application
|
|
func listOrphansEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
reply := ReplyMessage{
|
|
Error: true,
|
|
Payload: "not implemented yet",
|
|
}
|
|
|
|
data, err := json.Marshal(reply)
|
|
if err != nil {
|
|
log.Println("ERROR orphans list problem: " + err.Error())
|
|
return err
|
|
}
|
|
|
|
err = m.Respond(data)
|
|
if err != nil {
|
|
log.Println("ERROR orphans list problem: " + err.Error())
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
getNodeEventHandler returns info about the node including performance index
|
|
|
|
|
|
*/
|
|
func getNodeEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
|
|
node, err := processor.GetNode()
|
|
if err != nil {
|
|
log.Println("ERROR performance index problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
reply := ReplyMessage{
|
|
Payload: node,
|
|
}
|
|
|
|
data, err := json.Marshal(reply)
|
|
if err != nil {
|
|
log.Println("ERROR performance index problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
err = m.Respond(data)
|
|
if err != nil {
|
|
log.Println("ERROR performance index problem: " + err.Error())
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
createSnapshotEventHandler create snapshot of given application
|
|
|
|
Uses appName from the message struct
|
|
|
|
Payload: no payload needed
|
|
Response: notification when it's done or error
|
|
*/
|
|
func createSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
|
|
err := processor.CreateSnapshot(strings.Split(message.Payload, ","))
|
|
if err != nil {
|
|
log.Println("ERROR create snapshot error: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
publish(message.AppName, "snapshot created", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
restoreFromSnapshotEventHandler restores app from a snapshot. The app has to exist
|
|
before it can be restored. Any snapshot can be used to restore any application.
|
|
Use labels to store information about what app should be created.
|
|
|
|
Uses appName from the message struct as the destination app where the data should
|
|
be restored
|
|
|
|
Payload: string with the snapshot name
|
|
Response: notification when it's done or error
|
|
*/
|
|
func restoreFromSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
|
|
err := processor.RestoreFromSnapshot(message.Payload)
|
|
if err != nil {
|
|
log.Println("ERROR start application problem: " + err.Error())
|
|
publish(message.AppName, "backend problem", true)
|
|
return err
|
|
}
|
|
|
|
// notify clients that it's all done
|
|
publish(message.AppName, "snapshot restored", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
listSnapshotsEventHandler returns list of snapshots related to a single application
|
|
|
|
Uses appName from the message
|
|
|
|
Payload: no payload needed
|
|
Response: replies with list of snapshots or an error message
|
|
*/
|
|
func listSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
snapshots, err := processor.ListSnapshots()
|
|
if err != nil {
|
|
return errorReplyFormater(m, "backend error", err)
|
|
}
|
|
|
|
reply := ReplyMessage{
|
|
Payload: snapshots,
|
|
}
|
|
|
|
data, err := json.Marshal(reply)
|
|
if err != nil {
|
|
return errorReplyFormater(m, "reply formatter error", err)
|
|
}
|
|
|
|
err = m.Respond(data)
|
|
if err != nil {
|
|
log.Println("ERROR: list of snapshots:", err.Error())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
listAppsSnapshotsEventHandler returns list of snapshots related to list of application names.
|
|
|
|
Payload: list of appNames separated by comma (no spaces)
|
|
Response: replies with list of snapshots or an error message
|
|
*/
|
|
func listAppsSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
snapshots, err := processor.ListAppsSnapshots(strings.Split(message.Payload, ","))
|
|
if err != nil {
|
|
return errorReplyFormater(m, "backend error", err)
|
|
}
|
|
|
|
reply := ReplyMessage{
|
|
Payload: snapshots,
|
|
}
|
|
|
|
data, err := json.Marshal(reply)
|
|
if err != nil {
|
|
return errorReplyFormater(m, "reply formatter error", err)
|
|
}
|
|
|
|
err = m.Respond(data)
|
|
if err != nil {
|
|
log.Println("ERROR: list of snapshots:", err.Error())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
listSnapshotsByLabelEventHandler returns list of snapshots with given label
|
|
|
|
Payload: snapshot label
|
|
Response: replies with list of snapshots or an error message
|
|
*/
|
|
func listSnapshotsByLabelEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
snapshots, err := processor.ListSnapshotsByLabel(message.Payload)
|
|
if err != nil {
|
|
return errorReplyFormater(m, "backend error", err)
|
|
}
|
|
|
|
reply := ReplyMessage{
|
|
Payload: snapshots,
|
|
}
|
|
|
|
data, err := json.Marshal(reply)
|
|
if err != nil {
|
|
return errorReplyFormater(m, "reply formatter error", err)
|
|
}
|
|
|
|
err = m.Respond(data)
|
|
if err != nil {
|
|
log.Println("ERROR: list of snapshots:", err.Error())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
getSnapshotEventHandler returns a single snapshot for given key from the request payload.
|
|
|
|
Payload: snapshot's key
|
|
Response: snapshot metadata
|
|
*/
|
|
func getSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
snapshot, err := processor.GetSnapshot(message.Payload)
|
|
if err != nil {
|
|
return errorReplyFormater(m, "backend error", err)
|
|
}
|
|
|
|
data, err := json.Marshal(snapshot)
|
|
if err != nil {
|
|
return errorReplyFormater(m, "reply formatter error", err)
|
|
}
|
|
|
|
err = m.Respond(data)
|
|
if err != nil {
|
|
log.Println("ERROR: get snapshot:", err.Error())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
getSnapshotDownloadLinkEventHandler return URL that can be used to download snapshot
|
|
|
|
Payload: string with a snapshot name (key)
|
|
Response: string with the URL
|
|
*/
|
|
func getSnapshotDownloadLinkEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
link, err := processor.GetSnapshotDownloadLink(message.Payload)
|
|
if err != nil {
|
|
return errorReplyFormater(m, "backend error", err)
|
|
}
|
|
|
|
reply := ReplyMessage{
|
|
Payload: link,
|
|
}
|
|
|
|
data, err := json.Marshal(reply)
|
|
if err != nil {
|
|
return errorReplyFormater(m, "reply formatter error", err)
|
|
}
|
|
|
|
err = m.Respond(data)
|
|
if err != nil {
|
|
log.Println("ERROR: get snapshot:", err.Error())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
deleteSnapshotEventHandler delete a single snapshot. This is not bound to application name.
|
|
|
|
Payload: string with a snapshot name
|
|
Response: notification when it's done or error
|
|
*/
|
|
func deleteSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
|
|
err := processor.DeleteSnapshot(message.Payload)
|
|
if err != nil {
|
|
return errorReplyFormater(m, "backend error", err)
|
|
}
|
|
|
|
publish(message.AppName, "snapshot deleted", false)
|
|
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
deleteAppSnapshotsEventHandler deletes all snapshots related to a single application
|
|
|
|
Uses appName from the message struct
|
|
|
|
Payload: no payload needed
|
|
Response: notification when it's done or error
|
|
*/
|
|
func deleteAppSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|
processor := glue.Processor{
|
|
AppName: message.AppName,
|
|
DB: common.GetDBConnection(),
|
|
SnapshotProcessor: &snapshotProcessor,
|
|
}
|
|
|
|
err := processor.DeleteAppSnapshots()
|
|
if err != nil {
|
|
return errorReplyFormater(m, "backend error", err)
|
|
}
|
|
|
|
publish(message.AppName, "snapshots deleted", false)
|
|
|
|
return nil
|
|
}
|