node-api/handlers_nats.go

1158 lines
29 KiB
Go
Raw Normal View History

2021-04-25 22:57:05 +00:00
package main
/*
Message handling is pretty straightforward. When request is received it's
usually asynchronic and the client is not waiting for an reply. In this case
client writes down that something is happening here and when it's done publish()
function is used to notify everybody that it's done. Client usually listens to
this channel and it finish his part as it needs.
In case there is a reply we use NATS's responde() method to return the data
as soon as possible.
*/
2021-04-25 22:57:05 +00:00
import (
"encoding/json"
"fmt"
"log"
"strings"
2021-04-25 22:57:05 +00:00
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"github.com/rosti-cz/node-api/apps"
"github.com/rosti-cz/node-api/apps/drivers"
"github.com/rosti-cz/node-api/common"
2021-04-25 22:57:05 +00:00
"github.com/rosti-cz/node-api/docker"
"github.com/rosti-cz/node-api/node"
)
// This handler only passes messages to another function for easier testing
func messageHandler(msg *nats.Msg) {
go _messageHandler(msg)
}
func _messageHandler(m *nats.Msg) error {
message := RequestMessage{}
err := json.Unmarshal(m.Data, &message)
if err != nil {
log.Println(errors.Wrap(err, "invalid JSON data in the incoming message"))
return err
}
fmt.Printf("Received a message: %v\n", message)
eventHandlerMap := map[string](func(m *nats.Msg, message *RequestMessage) error){
"list": listEventHandler,
"get": getEventHandler,
"create": createEventHandler,
"register": registerEventHandler,
"update": updateEventHandler,
"delete": deleteEventHandler,
"stop": stopEventHandler,
"start": startEventHandler,
"restart": restartEventHandler,
"update_keys": updateKeysEventHandler,
"set_password": setPasswordEventHandler,
"processes": processesEventHandler,
"enable_tech": enableTechEventHandler,
"rebuild": rebuildEventHandler,
"add_label": addLabelEventHandler,
"remove_label": removeLabelEventHandler,
"list_orphans": listOrphansEventHandler,
"node": getNoteEventHandler,
"create_snapshot": createSnapshotEventHandler,
"restore_from_snapshot": restoreFromSnapshotEventHandler,
"list_snapshots": listSnapshotsEventHandler,
"list_apps_snapshots": listAppsSnapshotsEventHandler,
"list_snapshots_by_label": listSnapshotsByLabelEventHandler,
"get_snapshot": getSnapshotEventHandler,
"delete_snapshot": deleteSnapshotEventHandler,
"delete_app_snapshots": deleteAppSnapshotsEventHandler,
2021-04-25 22:57:05 +00:00
}
if eventHandler, ok := eventHandlerMap[message.Type]; ok {
return eventHandler(m, &message)
} else {
log.Println("ERROR: event handler not defined for " + message.Type)
}
// Set password for the app user in the container
// Application processes
// Enable one of the supported technologies or services (python, node, redis, ...)
// Rebuilds existing app, it keeps the data but creates the container again
// Orphans returns directories in /srv that doesn't match any hosted application
// Return info about the node including performance index
return nil
}
// Returns list of apps
func listEventHandler(m *nats.Msg, message *RequestMessage) error {
log.Println("> List")
err := gatherStates()
if err != nil {
return errorReplyFormater(m, "backend error", err)
}
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
processor.Init()
applications, err := processor.List()
2021-04-25 22:57:05 +00:00
if err != nil {
return errorReplyFormater(m, "backend error", err)
}
reply := ReplyMessage{
Payload: applications,
}
data, err := json.Marshal(reply)
if err != nil {
return errorReplyFormater(m, "reply formatter error", err)
}
err = m.Respond(data)
if err != nil {
log.Println("ERROR: list apps:", err.Error())
}
return err
}
// Returns one app
func getEventHandler(m *nats.Msg, message *RequestMessage) error {
err := updateState(message.AppName)
if err != nil {
return errorReplyFormater(m, "backend error", err)
}
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
app, err := processor.Get(message.AppName)
2021-04-25 22:57:05 +00:00
if err != nil {
return errorReplyFormater(m, "backend error", err)
}
reply := ReplyMessage{
AppName: app.Name,
Payload: app,
}
data, err := json.Marshal(reply)
if err != nil {
return errorReplyFormater(m, "reply formatter error", err)
}
err = m.Respond(data)
if err != nil {
log.Println("ERROR: get app:", err.Error())
}
return err
}
// Create a new app
func createEventHandler(m *nats.Msg, message *RequestMessage) error {
appTemplate := apps.App{}
body := []byte(message.Payload)
err := json.Unmarshal(body, &appTemplate)
if err != nil {
log.Println("ERROR create application problem: " + err.Error())
2021-04-27 17:17:43 +00:00
publish(message.AppName, "payload parsing problem", true)
2021-04-25 22:57:05 +00:00
return err
}
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
err = processor.New(message.AppName, appTemplate.SSHPort, appTemplate.HTTPPort, appTemplate.Image, appTemplate.CPU, appTemplate.Memory)
2021-04-25 22:57:05 +00:00
if err != nil {
if validationError, ok := err.(apps.ValidationError); ok {
log.Println("ERROR create application problem: " + validationError.Error())
2021-04-27 17:17:43 +00:00
publish(message.AppName, "validation problem", true)
2021-04-25 22:57:05 +00:00
return err
}
log.Println("ERROR create application problem: " + err.Error())
2021-04-27 17:17:43 +00:00
publish(message.AppName, "backend problem", true)
2021-04-25 22:57:05 +00:00
return err
}
container := docker.Container{
App: &appTemplate,
}
err = container.Create()
if err != nil {
log.Println("ERROR create application problem: " + err.Error())
2021-04-27 17:17:43 +00:00
publish(message.AppName, "backend problem", true)
2021-04-25 22:57:05 +00:00
return err
}
// Restore from snapshot if it's noted in the request
if len(appTemplate.Snapshot) > 0 {
2021-10-31 18:00:06 +00:00
log.Printf("App %s is going to be created from %s snapshot\n", message.AppName, appTemplate.Snapshot)
// Setup processors
snapshotProcessor := apps.SnapshotProcessor{
AppsPath: config.AppsPath,
TmpSnapshotPath: config.SnapshotsPath,
Driver: drivers.S3Driver{
S3AccessKey: config.SnapshotsS3AccessKey,
S3SecretKey: config.SnapshotsS3SecretKey,
S3Endpoint: config.SnapshotsS3Endpoint,
S3SSL: config.SnapshotsS3SSL,
Bucket: config.SnapshotsS3Bucket,
},
}
// Restore the data
err = snapshotProcessor.RestoreSnapshot(appTemplate.Snapshot, message.AppName)
if err != nil {
log.Println("ERROR restore snapshot error: " + err.Error())
publish(message.AppName, "backend problem", true)
return err
}
}
2021-04-25 22:57:05 +00:00
err = container.Start()
if err != nil {
log.Println("ERROR create application problem: " + err.Error())
2021-04-27 17:17:43 +00:00
publish(message.AppName, "backend problem", true)
2021-04-25 22:57:05 +00:00
return err
}
2021-04-27 17:17:43 +00:00
publish(message.AppName, "created", false)
2021-04-25 22:57:05 +00:00
return nil
}
2021-04-30 21:01:34 +00:00
// Registers new app without creating an container
func registerEventHandler(m *nats.Msg, message *RequestMessage) error {
appTemplate := apps.App{}
body := []byte(message.Payload)
err := json.Unmarshal(body, &appTemplate)
if err != nil {
log.Println("ERROR create application problem: " + err.Error())
publish(message.AppName, "payload parsing problem", true)
return err
}
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
err = processor.New(message.AppName, appTemplate.SSHPort, appTemplate.HTTPPort, appTemplate.Image, appTemplate.CPU, appTemplate.Memory)
2021-04-30 21:01:34 +00:00
if err != nil {
if validationError, ok := err.(apps.ValidationError); ok {
log.Println("ERROR create application problem: " + validationError.Error())
publish(message.AppName, "validation problem", true)
return err
}
log.Println("ERROR create application problem: " + err.Error())
publish(message.AppName, "backend problem", true)
return err
}
publish(message.AppName, "registered", false)
return nil
}
2021-04-25 22:57:05 +00:00
// Update existing app
func updateEventHandler(m *nats.Msg, message *RequestMessage) error {
appTemplate := apps.App{}
body := []byte(message.Payload)
err := json.Unmarshal(body, &appTemplate)
if err != nil {
log.Println("ERROR update application problem: " + err.Error())
2021-04-27 17:17:43 +00:00
publish(message.AppName, "payload parsing problem", true)
2021-04-25 22:57:05 +00:00
return err
}
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
app, err := processor.Update(message.AppName, appTemplate.SSHPort, appTemplate.HTTPPort, appTemplate.Image, appTemplate.CPU, appTemplate.Memory)
2021-04-25 22:57:05 +00:00
if err != nil {
if validationError, ok := err.(apps.ValidationError); ok {
log.Println("ERROR update application problem: " + validationError.Error())
2021-04-27 17:17:43 +00:00
publish(message.AppName, "backend problem", true)
2021-04-25 22:57:05 +00:00
return err
}
log.Println("ERROR update application problem: " + err.Error())
2021-04-27 17:17:43 +00:00
publish(message.AppName, "backend problem", true)
2021-04-25 22:57:05 +00:00
return err
}
container := docker.Container{
App: app,
}
err = container.Destroy()
if err != nil && err.Error() == "no container found" {
// We don't care if the container didn't exist anyway
err = nil
}
if err != nil {
log.Println("ERROR update application problem: " + err.Error())
2021-04-27 17:17:43 +00:00
publish(app.Name, "backend problem", true)
2021-04-25 22:57:05 +00:00
return err
}
err = container.Create()
if err != nil {
log.Println("ERROR update application problem: " + err.Error())
2021-04-27 17:17:43 +00:00
publish(app.Name, "backend problem", true)
2021-04-25 22:57:05 +00:00
return err
}
err = container.Start()
if err != nil {
log.Println("ERROR update application problem: " + err.Error())
2021-04-27 17:17:43 +00:00
publish(app.Name, "backend problem", true)
2021-04-25 22:57:05 +00:00
return err
}
2021-04-27 17:17:43 +00:00
publish(app.Name, "updated", false)
2021-04-25 22:57:05 +00:00
return nil
}
// Delete one app
func deleteEventHandler(m *nats.Msg, message *RequestMessage) error {
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
app, err := processor.Get(message.AppName)
2021-04-25 22:57:05 +00:00
if err != nil {
log.Println("ERROR: delete app:", err.Error())
}
container := docker.Container{
App: app,
}
status, err := container.Status()
if err != nil {
log.Println("ERROR delete application problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
if status != "no-container" {
// We stop the container first
err = container.Stop()
if err != nil {
log.Println("ERROR delete application problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
// Then delete it
err = container.Delete()
if err != nil {
log.Println("ERROR delete application problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
}
err = processor.Delete(app.Name)
2021-04-25 22:57:05 +00:00
if err != nil {
log.Println("ERROR delete application problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
publish(app.Name, "deleted", false)
return nil
}
// Stop existing app
func stopEventHandler(m *nats.Msg, message *RequestMessage) error {
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
app, err := processor.Get(message.AppName)
2021-04-25 22:57:05 +00:00
if err != nil {
log.Println("ERROR stop application problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
container := docker.Container{
App: app,
}
status, err := container.Status()
if err != nil {
log.Println("ERROR stop application problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
// Stop the container only when it exists
if status != "no-container" {
err = container.Stop()
if err != nil {
log.Println("ERROR stop application problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
}
publish(app.Name, "stopped", false)
return nil
}
// Start existing app
func startEventHandler(m *nats.Msg, message *RequestMessage) error {
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
app, err := processor.Get(message.AppName)
2021-04-25 22:57:05 +00:00
if err != nil {
log.Println("ERROR start application problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
container := docker.Container{
App: app,
}
status, err := container.Status()
if err != nil {
return err
}
if status == "no-container" {
err = container.Create()
if err != nil {
log.Println("ERROR start application problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
}
err = container.Start()
if err != nil {
log.Println("ERROR start application problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
publish(app.Name, "started", false)
return nil
}
// Restart existing app
func restartEventHandler(m *nats.Msg, message *RequestMessage) error {
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
app, err := processor.Get(message.AppName)
2021-04-25 22:57:05 +00:00
if err != nil {
log.Println("ERROR restart application problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
container := docker.Container{
App: app,
}
err = container.Restart()
if err != nil {
log.Println("ERROR restart application problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
publish(app.Name, "restarted", false)
return nil
}
// Copies body of the request into /srv/.ssh/authorized_keys
func updateKeysEventHandler(m *nats.Msg, message *RequestMessage) error {
err := waitForApp(message.AppName)
if err != nil {
log.Println("ERROR enable tech problem: " + err.Error())
publish(message.AppName, "backend problem", true)
return err
}
body := message.Payload
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
app, err := processor.Get(message.AppName)
2021-04-25 22:57:05 +00:00
if err != nil {
log.Println("ERROR keys update problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
container := docker.Container{
App: app,
}
err = container.SetFileContent(sshPubKeysLocation, body+"\n", "0600")
if err != nil {
log.Println("ERROR keys update problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
publish(app.Name, "keys updated", false)
return nil
}
// Set password for the app user in the container
func setPasswordEventHandler(m *nats.Msg, message *RequestMessage) error {
err := waitForApp(message.AppName)
if err != nil {
log.Println("ERROR enable tech problem: " + err.Error())
publish(message.AppName, "backend problem", true)
return err
}
password := message.Payload
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
app, err := processor.Get(message.AppName)
2021-04-25 22:57:05 +00:00
if err != nil {
log.Println("ERROR password update problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
container := docker.Container{
App: app,
}
err = container.SetPassword(password)
if err != nil {
log.Println("ERROR password update problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
publish(app.Name, "password updated", false)
return nil
}
// Application processes
func processesEventHandler(m *nats.Msg, message *RequestMessage) error {
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
app, err := processor.Get(message.AppName)
2021-04-25 22:57:05 +00:00
if err != nil {
log.Println("ERROR processes list problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
container := docker.Container{
App: app,
}
processes, err := container.GetProcessList()
if err != nil {
log.Println("ERROR processes list problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
reply := ReplyMessage{
Payload: processes,
}
data, err := json.Marshal(reply)
if err != nil {
log.Println("ERROR processes list problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
err = m.Respond(data)
if err != nil {
log.Println("ERROR processes list problem: " + err.Error())
return err
}
return nil
}
// Enable one of the supported technologies or services (python, node, redis, ...)
func enableTechEventHandler(m *nats.Msg, message *RequestMessage) error {
service := message.Payload
err := waitForApp(message.AppName)
if err != nil {
log.Println("ERROR enable tech problem: " + err.Error())
publish(message.AppName, "backend problem", true)
return err
}
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
app, err := processor.Get(message.AppName)
2021-04-25 22:57:05 +00:00
if err != nil {
log.Println("ERROR enable tech problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
container := docker.Container{
App: app,
}
err = container.SetTechnology(service)
if err != nil {
log.Println("ERROR enable tech problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
publish(app.Name, "tech updated", false)
return nil
}
// Rebuilds existing app, it keeps the data but creates the container again
func rebuildEventHandler(m *nats.Msg, message *RequestMessage) error {
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
app, err := processor.Get(message.AppName)
2021-04-25 22:57:05 +00:00
if err != nil {
log.Println("ERROR rebuild app problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
container := docker.Container{
App: app,
}
err = container.Destroy()
if err != nil && err.Error() == "no container found" {
// We don't care if the container didn't exist anyway
err = nil
}
if err != nil {
log.Println("ERROR rebuild app problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
err = container.Create()
if err != nil {
log.Println("ERROR rebuild app problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
err = container.Start()
if err != nil {
log.Println("ERROR rebuild app problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
publish(app.Name, "app rebuild", false)
return nil
}
// Adds new label
func addLabelEventHandler(m *nats.Msg, message *RequestMessage) error {
label := message.Payload
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
err := processor.AddLabel(message.AppName, label)
2021-04-25 22:57:05 +00:00
if err != nil {
log.Println("ERROR add label problem: " + err.Error())
publish(message.AppName, "backend problem", true)
return err
}
publish(message.AppName, "label added", false)
return nil
}
// Removes existing label
func removeLabelEventHandler(m *nats.Msg, message *RequestMessage) error {
label := message.Payload
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
err := processor.RemoveLabel(message.AppName, label)
2021-04-25 22:57:05 +00:00
if err != nil {
log.Println("ERROR remove label problem: " + err.Error())
publish(message.AppName, "backend problem", true)
return err
}
publish(message.AppName, "label removed", false)
return nil
}
// Orphans returns directories in /srv that doesn't match any hosted application
func listOrphansEventHandler(m *nats.Msg, message *RequestMessage) error {
reply := ReplyMessage{
Error: true,
Payload: "not implemented yet",
}
data, err := json.Marshal(reply)
if err != nil {
log.Println("ERROR orphans list problem: " + err.Error())
return err
}
err = m.Respond(data)
if err != nil {
log.Println("ERROR orphans list problem: " + err.Error())
return err
}
return nil
}
/*
getNoteEventHandler returns info about the node including performance index
*/
2021-04-25 22:57:05 +00:00
func getNoteEventHandler(m *nats.Msg, message *RequestMessage) error {
node, err := node.GetNodeInfo()
if err != nil {
log.Println("ERROR performance index problem: " + err.Error())
publish(message.AppName, "backend problem", true)
return err
}
reply := ReplyMessage{
Payload: node,
}
data, err := json.Marshal(reply)
if err != nil {
log.Println("ERROR performance index problem: " + err.Error())
publish(message.AppName, "backend problem", true)
return err
}
err = m.Respond(data)
if err != nil {
log.Println("ERROR performance index problem: " + err.Error())
return err
}
return nil
}
/*
createSnapshotEventHandler create snapshot of given application
Uses appName from the message struct
Payload: no payload needed
Response: notification when it's done or error
*/
func createSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
processor := apps.SnapshotProcessor{
AppsPath: config.AppsPath,
TmpSnapshotPath: config.SnapshotsPath,
Driver: drivers.S3Driver{
S3AccessKey: config.SnapshotsS3AccessKey,
S3SecretKey: config.SnapshotsS3SecretKey,
S3Endpoint: config.SnapshotsS3Endpoint,
S3SSL: config.SnapshotsS3SSL,
Bucket: config.SnapshotsS3Bucket,
},
}
_, err := processor.CreateSnapshot(message.AppName, strings.Split(message.Payload, ","))
if err != nil {
log.Println("ERROR create snapshot error: " + err.Error())
publish(message.AppName, "backend problem", true)
return err
}
publish(message.AppName, "snapshot created", false)
return nil
}
/*
restoreFromSnapshotEventHandler restores app from a snapshot. The app has to exist
before it can be restored. Any snapshot can be used to restore any application.
Use labels to store information about what app should be created.
Uses appName from the message struct as the destination app where the data should
be restored
Payload: string with the snapshot name
Response: notification when it's done or error
*/
func restoreFromSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
// Setup processors
snapshotProcessor := apps.SnapshotProcessor{
AppsPath: config.AppsPath,
TmpSnapshotPath: config.SnapshotsPath,
Driver: drivers.S3Driver{
S3AccessKey: config.SnapshotsS3AccessKey,
S3SecretKey: config.SnapshotsS3SecretKey,
S3Endpoint: config.SnapshotsS3Endpoint,
S3SSL: config.SnapshotsS3SSL,
Bucket: config.SnapshotsS3Bucket,
},
}
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
app, err := processor.Get(message.AppName)
if err != nil {
log.Println("ERROR restore snapshot problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
container := docker.Container{
App: app,
}
// Stop the container
status, err := container.Status()
if err != nil {
log.Println("ERROR restore snapshot problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
// Stop the container only when it exists
if status != "no-container" {
err = container.Stop()
if err != nil {
log.Println("ERROR restore snapshot problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
}
// Restore the data
err = snapshotProcessor.RestoreSnapshot(message.Payload, message.AppName)
if err != nil {
log.Println("ERROR restore snapshot error: " + err.Error())
publish(message.AppName, "backend problem", true)
return err
}
// Start the container
status, err = container.Status()
if err != nil {
return err
}
if status == "no-container" {
err = container.Create()
if err != nil {
log.Println("ERROR start application problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
}
err = container.Start()
if err != nil {
log.Println("ERROR start application problem: " + err.Error())
publish(app.Name, "backend problem", true)
return err
}
// notify clients that it's all done
publish(message.AppName, "snapshot restored", false)
return nil
}
/*
listSnapshotsEventHandler returns list of snapshots related to a single application
Uses appName from the message
Payload: no payload needed
Response: replies with list of snapshots or an error message
*/
func listSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
processor := apps.SnapshotProcessor{
AppsPath: config.AppsPath,
TmpSnapshotPath: config.SnapshotsPath,
Driver: drivers.S3Driver{
S3AccessKey: config.SnapshotsS3AccessKey,
S3SecretKey: config.SnapshotsS3SecretKey,
S3Endpoint: config.SnapshotsS3Endpoint,
S3SSL: config.SnapshotsS3SSL,
Bucket: config.SnapshotsS3Bucket,
},
}
snapshots, err := processor.ListAppSnapshots(message.AppName)
if err != nil {
return errorReplyFormater(m, "backend error", err)
}
var output SnapshotsMetadata
for _, snapshot := range snapshots {
output = append(output, SnapshotMetadata{
Key: snapshot.KeyName(),
Metadata: snapshot,
})
}
reply := ReplyMessage{
Payload: output,
}
data, err := json.Marshal(reply)
if err != nil {
return errorReplyFormater(m, "reply formatter error", err)
}
err = m.Respond(data)
if err != nil {
log.Println("ERROR: list of snapshots:", err.Error())
}
return nil
}
/*
listAppsSnapshotsEventHandler returns list of snapshots related to list of application names.
Payload: list of appNames separated by comma (no spaces)
Response: replies with list of snapshots or an error message
*/
func listAppsSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
processor := apps.SnapshotProcessor{
AppsPath: config.AppsPath,
TmpSnapshotPath: config.SnapshotsPath,
Driver: drivers.S3Driver{
S3AccessKey: config.SnapshotsS3AccessKey,
S3SecretKey: config.SnapshotsS3SecretKey,
S3Endpoint: config.SnapshotsS3Endpoint,
S3SSL: config.SnapshotsS3SSL,
Bucket: config.SnapshotsS3Bucket,
},
}
snapshots, err := processor.ListAppsSnapshots(strings.Split(message.Payload, ","))
if err != nil {
return errorReplyFormater(m, "backend error", err)
}
var output SnapshotsMetadata
for _, snapshot := range snapshots {
output = append(output, SnapshotMetadata{
Key: snapshot.KeyName(),
Metadata: snapshot,
})
}
reply := ReplyMessage{
Payload: output,
}
data, err := json.Marshal(reply)
if err != nil {
return errorReplyFormater(m, "reply formatter error", err)
}
err = m.Respond(data)
if err != nil {
log.Println("ERROR: list of snapshots:", err.Error())
}
return nil
}
/*
listSnapshotsByLabelEventHandler returns list of snapshots with given label
Payload: snapshot label
Response: replies with list of snapshots or an error message
*/
func listSnapshotsByLabelEventHandler(m *nats.Msg, message *RequestMessage) error {
processor := apps.SnapshotProcessor{
AppsPath: config.AppsPath,
TmpSnapshotPath: config.SnapshotsPath,
Driver: drivers.S3Driver{
S3AccessKey: config.SnapshotsS3AccessKey,
S3SecretKey: config.SnapshotsS3SecretKey,
S3Endpoint: config.SnapshotsS3Endpoint,
S3SSL: config.SnapshotsS3SSL,
Bucket: config.SnapshotsS3Bucket,
},
}
snapshots, err := processor.ListAppsSnapshotsByLabel(message.Payload)
if err != nil {
return errorReplyFormater(m, "backend error", err)
}
var output SnapshotsMetadata
for _, snapshot := range snapshots {
output = append(output, SnapshotMetadata{
Key: snapshot.KeyName(),
Metadata: snapshot,
})
}
reply := ReplyMessage{
Payload: output,
}
data, err := json.Marshal(reply)
if err != nil {
return errorReplyFormater(m, "reply formatter error", err)
}
err = m.Respond(data)
if err != nil {
log.Println("ERROR: list of snapshots:", err.Error())
}
return nil
}
/*
getSnapshotEventHandler returns a single snapshot for given key from the request payload.
Payload: snapshot's key
Response: snapshot metadata
*/
func getSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
processor := apps.SnapshotProcessor{
AppsPath: config.AppsPath,
TmpSnapshotPath: config.SnapshotsPath,
Driver: drivers.S3Driver{
S3AccessKey: config.SnapshotsS3AccessKey,
S3SecretKey: config.SnapshotsS3SecretKey,
S3Endpoint: config.SnapshotsS3Endpoint,
S3SSL: config.SnapshotsS3SSL,
Bucket: config.SnapshotsS3Bucket,
},
}
snapshot, err := processor.GetSnapshot(message.Payload)
if err != nil {
return errorReplyFormater(m, "backend error", err)
}
output := SnapshotMetadata{
Key: snapshot.KeyName(),
Metadata: snapshot,
}
data, err := json.Marshal(output)
if err != nil {
return errorReplyFormater(m, "reply formatter error", err)
}
err = m.Respond(data)
if err != nil {
log.Println("ERROR: get snapshot:", err.Error())
}
return nil
}
/*
deleteSnapshotEventHandler delete a single snapshot. This is not bound to application name.
Payload: string with a snapshot name
Response: notification when it's done or error
*/
func deleteSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
processor := apps.SnapshotProcessor{
AppsPath: config.AppsPath,
TmpSnapshotPath: config.SnapshotsPath,
Driver: drivers.S3Driver{
S3AccessKey: config.SnapshotsS3AccessKey,
S3SecretKey: config.SnapshotsS3SecretKey,
S3Endpoint: config.SnapshotsS3Endpoint,
S3SSL: config.SnapshotsS3SSL,
Bucket: config.SnapshotsS3Bucket,
},
}
err := processor.DeleteSnapshot(message.Payload)
if err != nil {
return errorReplyFormater(m, "backend error", err)
}
publish(message.AppName, "snapshot deleted", false)
return nil
}
/*
deleteAppSnapshotsEventHandler deletes all snapshots related to a single application
Uses appName from the message struct
Payload: no payload needed
Response: notification when it's done or error
*/
func deleteAppSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
processor := apps.SnapshotProcessor{
AppsPath: config.AppsPath,
TmpSnapshotPath: config.SnapshotsPath,
Driver: drivers.S3Driver{
S3AccessKey: config.SnapshotsS3AccessKey,
S3SecretKey: config.SnapshotsS3SecretKey,
S3Endpoint: config.SnapshotsS3Endpoint,
S3SSL: config.SnapshotsS3SSL,
Bucket: config.SnapshotsS3Bucket,
},
}
err := processor.DeleteAppSnapshots(message.AppName)
if err != nil {
return errorReplyFormater(m, "backend error", err)
}
publish(message.AppName, "snapshots deleted", false)
return nil
}