Snapshot metadata storage reworked
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
Because there is a limit of filesize 255 bytes in Linux which we hit during testing.
This commit is contained in:
parent
95ae31fdfb
commit
d4dddc0df3
@ -1,7 +1,6 @@
|
|||||||
package apps
|
package apps
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/base64"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -13,14 +12,18 @@ import (
|
|||||||
|
|
||||||
"github.com/mholt/archiver/v3"
|
"github.com/mholt/archiver/v3"
|
||||||
"github.com/rosti-cz/node-api/apps/drivers"
|
"github.com/rosti-cz/node-api/apps/drivers"
|
||||||
|
uuid "github.com/satori/go.uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
const bucketName = "snapshots"
|
const bucketName = "snapshots"
|
||||||
const dateFormat = "20060102_150405"
|
const dateFormat = "20060102_150405"
|
||||||
const keySplitCharacter = ":"
|
const keySplitCharacter = ":"
|
||||||
|
const metadataPrefix = "_metadata"
|
||||||
|
const metadataKeyTemplate = metadataPrefix + "/%s"
|
||||||
|
|
||||||
// Snapshot contains metadata about a single snapshot
|
// Snapshot contains metadata about a single snapshot
|
||||||
type Snapshot struct {
|
type Snapshot struct {
|
||||||
|
UUID string `json:"uuid"`
|
||||||
AppName string `json:"app_name"`
|
AppName string `json:"app_name"`
|
||||||
TimeStamp int64 `json:"ts"`
|
TimeStamp int64 `json:"ts"`
|
||||||
Labels []string `json:"labels"`
|
Labels []string `json:"labels"`
|
||||||
@ -34,31 +37,15 @@ func (s *Snapshot) ToString() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// KeyName returns keyname used to store the snapshot in the storage
|
// KeyName returns keyname used to store the snapshot in the storage
|
||||||
func (s *Snapshot) KeyName() string {
|
func (s *Snapshot) KeyName(indexLabel string) string {
|
||||||
metadata := base64.StdEncoding.EncodeToString([]byte(s.ToString()))
|
labelValue := "-"
|
||||||
// TODO: this can't be bigger than 1kB
|
for _, label := range s.Labels {
|
||||||
return fmt.Sprintf("%s%s%d%s%s", s.AppName, keySplitCharacter, s.TimeStamp, keySplitCharacter, metadata)
|
if strings.HasPrefix(label, indexLabel+":") {
|
||||||
|
labelValue = label[len(indexLabel)+1:]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// DecodeKeyName returns snapshot structure containing all metadata about a single snapshot
|
return fmt.Sprintf("%s%s%s%s%s%s%d", s.AppName, keySplitCharacter, s.UUID, keySplitCharacter, labelValue, keySplitCharacter, s.TimeStamp)
|
||||||
func DecodeKeyName(key string) (Snapshot, error) {
|
|
||||||
parts := strings.Split(key, keySplitCharacter)
|
|
||||||
if len(parts) != 3 {
|
|
||||||
return Snapshot{}, errors.New("key name in incorrect format")
|
|
||||||
}
|
|
||||||
|
|
||||||
_metadata, err := base64.StdEncoding.DecodeString(parts[2])
|
|
||||||
if len(parts) != 3 {
|
|
||||||
return Snapshot{}, fmt.Errorf("base64 decoding error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshot := Snapshot{}
|
|
||||||
err = json.Unmarshal(_metadata, &snapshot)
|
|
||||||
if err != nil {
|
|
||||||
return snapshot, fmt.Errorf("metadata unmarshal error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return snapshot, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Snapshots []Snapshot
|
type Snapshots []Snapshot
|
||||||
@ -66,20 +53,75 @@ type Snapshots []Snapshot
|
|||||||
// SnapshotProcessor encapsulates everything realted to snapshots. Snapshot is an archive of app's
|
// SnapshotProcessor encapsulates everything realted to snapshots. Snapshot is an archive of app's
|
||||||
// directory content. It's stored in S3.
|
// directory content. It's stored in S3.
|
||||||
// The biggest problem in the implementation is speed of looking for snapshots by labels.
|
// The biggest problem in the implementation is speed of looking for snapshots by labels.
|
||||||
|
// So we can setup one label that can be used as index and it will speed up filtering a little bit in
|
||||||
|
// certain situations - all snapshot of one owner for example.
|
||||||
// This is distributed interface for the snapshot storage and any node can handle the request message
|
// This is distributed interface for the snapshot storage and any node can handle the request message
|
||||||
// so we don't have any locking mechanism here and we cannot created index of snapshots without
|
// so we don't have any locking mechanism here and we cannot created a single index of snapshots without
|
||||||
// significant time spend on it. Let's deal with it later. I think we are fine for first 10k snapshots.
|
// significant time spend on it. Let's deal with it later. I think we are fine for first 10k snapshots.
|
||||||
type SnapshotProcessor struct {
|
type SnapshotProcessor struct {
|
||||||
AppsPath string // Where apps are stored
|
AppsPath string // Where apps are stored
|
||||||
TmpSnapshotPath string // where temporary location for snapshots is
|
TmpSnapshotPath string // where temporary location for snapshots is
|
||||||
|
IndexLabel string // Label that will be used as index to make listing faster
|
||||||
|
|
||||||
Driver drivers.DriverInterface
|
Driver drivers.DriverInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// saveMetadata saves metadata of single snapshot into the metadata storage
|
||||||
|
func (s *SnapshotProcessor) saveMetadata(snapshot Snapshot) error {
|
||||||
|
body, err := json.Marshal(snapshot)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshal snapshot into JSON error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.Driver.Write(fmt.Sprintf(metadataKeyTemplate, snapshot.UUID), body)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("copying metadata into S3 error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadMetadata returns metadata for given snapshot's UUID
|
||||||
|
func (s *SnapshotProcessor) loadMetadata(snapshotUUID string) (Snapshot, error) {
|
||||||
|
snapshot := Snapshot{}
|
||||||
|
|
||||||
|
body, err := s.Driver.Read(fmt.Sprintf(metadataKeyTemplate, snapshotUUID))
|
||||||
|
if err != nil {
|
||||||
|
return snapshot, fmt.Errorf("reading metadata from S3 error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = json.Unmarshal(body, &snapshot)
|
||||||
|
if err != nil {
|
||||||
|
return snapshot, fmt.Errorf("decoding metadata from JSON error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return snapshot, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteMetadata deletes metadata object
|
||||||
|
func (s *SnapshotProcessor) deleteMetadata(snapshotUUID string) error {
|
||||||
|
err := s.Driver.Delete(fmt.Sprintf(metadataKeyTemplate, snapshotUUID))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("delete metadata from S3 error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// metadataForSnapshotKey returns metadata for snapshot key
|
||||||
|
func (s *SnapshotProcessor) metadataForSnapshotKey(snapshotKey string) (Snapshot, error) {
|
||||||
|
parts := strings.Split(snapshotKey, keySplitCharacter)
|
||||||
|
if len(parts) != 4 {
|
||||||
|
return Snapshot{}, errors.New("wrong snapshot key format")
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshot, err := s.loadMetadata(parts[1])
|
||||||
|
return snapshot, err
|
||||||
|
}
|
||||||
|
|
||||||
// CreateSnapshot creates an archive of existing application and stores it in S3 storage
|
// CreateSnapshot creates an archive of existing application and stores it in S3 storage
|
||||||
// Returns key under which is the snapshot stored and/or error if there is any.
|
// Returns key under which is the snapshot stored and/or error if there is any.
|
||||||
// Metadata about the snapshot are encoded in the third part of the keyname.
|
// Metadata about the snapshot are stored in extra object under metadata/ prefix.
|
||||||
// The keyname cannot be bigger than 1 kB.
|
|
||||||
func (s *SnapshotProcessor) CreateSnapshot(appName string, labels []string) (string, error) {
|
func (s *SnapshotProcessor) CreateSnapshot(appName string, labels []string) (string, error) {
|
||||||
// Create an archive
|
// Create an archive
|
||||||
archive := archiver.Zip{
|
archive := archiver.Zip{
|
||||||
@ -92,26 +134,32 @@ func (s *SnapshotProcessor) CreateSnapshot(appName string, labels []string) (str
|
|||||||
}
|
}
|
||||||
|
|
||||||
snapshot := Snapshot{
|
snapshot := Snapshot{
|
||||||
|
UUID: uuid.NewV4().String(),
|
||||||
AppName: appName,
|
AppName: appName,
|
||||||
TimeStamp: time.Now().Unix(),
|
TimeStamp: time.Now().Unix(),
|
||||||
Labels: labels,
|
Labels: labels,
|
||||||
}
|
}
|
||||||
|
|
||||||
tmpSnapshotArchivePath := path.Join(s.TmpSnapshotPath, snapshot.KeyName()+".zip")
|
err := s.saveMetadata(snapshot)
|
||||||
|
|
||||||
err := os.Chdir(path.Join(s.AppsPath, appName))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return snapshot.KeyName(), fmt.Errorf("change working directory error: %v", err)
|
return snapshot.KeyName(s.IndexLabel), fmt.Errorf("saving metadata error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tmpSnapshotArchivePath := path.Join(s.TmpSnapshotPath, snapshot.KeyName(s.IndexLabel)+".zip")
|
||||||
|
|
||||||
|
err = os.Chdir(path.Join(s.AppsPath, appName))
|
||||||
|
if err != nil {
|
||||||
|
return snapshot.KeyName(s.IndexLabel), fmt.Errorf("change working directory error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = archive.Archive([]string{"./"}, tmpSnapshotArchivePath)
|
err = archive.Archive([]string{"./"}, tmpSnapshotArchivePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return snapshot.KeyName(), fmt.Errorf("compression error: %v", err)
|
return snapshot.KeyName(s.IndexLabel), fmt.Errorf("compression error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err := os.Stat(tmpSnapshotArchivePath)
|
info, err := os.Stat(tmpSnapshotArchivePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return snapshot.KeyName(), fmt.Errorf("temporary file stat error: %v", err)
|
return snapshot.KeyName(s.IndexLabel), fmt.Errorf("temporary file stat error: %v", err)
|
||||||
}
|
}
|
||||||
snapshot.Labels = append(snapshot.Labels, fmt.Sprintf("size:%d", info.Size()))
|
snapshot.Labels = append(snapshot.Labels, fmt.Sprintf("size:%d", info.Size()))
|
||||||
|
|
||||||
@ -124,17 +172,17 @@ func (s *SnapshotProcessor) CreateSnapshot(appName string, labels []string) (str
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// Pipe it into the storage
|
// Pipe it into the storage
|
||||||
err = s.Driver.Create(snapshot.KeyName(), tmpSnapshotArchivePath)
|
err = s.Driver.Create(snapshot.KeyName(s.IndexLabel), tmpSnapshotArchivePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return snapshot.KeyName(), fmt.Errorf("copying snapshot into S3 error: %v", err)
|
return snapshot.KeyName(s.IndexLabel), fmt.Errorf("copying snapshot into S3 error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return snapshot.KeyName(), nil
|
return snapshot.KeyName(s.IndexLabel), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RestoreSnapshot restores snapshot into an existing application
|
// RestoreSnapshot restores snapshot into an existing application
|
||||||
// If you need a new app from existing snapshot just create it.
|
// If you need a new app from existing snapshot just create it.
|
||||||
// This restored only content of the disk, doesn't create the container.
|
// This restores only content on the disk, doesn't create the container.
|
||||||
func (s *SnapshotProcessor) RestoreSnapshot(key string, newAppName string) error {
|
func (s *SnapshotProcessor) RestoreSnapshot(key string, newAppName string) error {
|
||||||
tmpSnapshotArchivePath := path.Join(s.TmpSnapshotPath, key+".zip")
|
tmpSnapshotArchivePath := path.Join(s.TmpSnapshotPath, key+".zip")
|
||||||
|
|
||||||
@ -185,8 +233,12 @@ func (s *SnapshotProcessor) ListAppSnapshots(appName string) ([]Snapshot, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
|
if key == metadataPrefix+"/" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if strings.HasPrefix(key, appName+keySplitCharacter) {
|
if strings.HasPrefix(key, appName+keySplitCharacter) {
|
||||||
snapshot, err := DecodeKeyName(key)
|
snapshot, err := s.metadataForSnapshotKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return snapshots, err
|
return snapshots, err
|
||||||
}
|
}
|
||||||
@ -208,9 +260,13 @@ func (s *SnapshotProcessor) ListAppsSnapshots(appNames []string) ([]Snapshot, er
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
|
if key == metadataPrefix+"/" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
for _, appName := range appNames {
|
for _, appName := range appNames {
|
||||||
if strings.HasPrefix(key, appName+keySplitCharacter) {
|
if strings.HasPrefix(key, appName+keySplitCharacter) {
|
||||||
snapshot, err := DecodeKeyName(key)
|
snapshot, err := s.metadataForSnapshotKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return snapshots, err
|
return snapshots, err
|
||||||
}
|
}
|
||||||
@ -225,7 +281,7 @@ func (s *SnapshotProcessor) ListAppsSnapshots(appNames []string) ([]Snapshot, er
|
|||||||
|
|
||||||
// ListAppsSnapshotsByLabel returns list of snapshots with given label
|
// ListAppsSnapshotsByLabel returns list of snapshots with given label
|
||||||
// TODO: this will be ok for now but probably little slow when users start using it more
|
// TODO: this will be ok for now but probably little slow when users start using it more
|
||||||
func (s *SnapshotProcessor) ListAppsSnapshotsByLabel(desiredLabel string) ([]Snapshot, error) {
|
func (s *SnapshotProcessor) ListAppsSnapshotsByLabel(labelValue string) ([]Snapshot, error) {
|
||||||
snapshots := []Snapshot{}
|
snapshots := []Snapshot{}
|
||||||
|
|
||||||
keys, err := s.Driver.List("")
|
keys, err := s.Driver.List("")
|
||||||
@ -234,13 +290,17 @@ func (s *SnapshotProcessor) ListAppsSnapshotsByLabel(desiredLabel string) ([]Sna
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
snapshot, err := DecodeKeyName(key)
|
if key == metadataPrefix+"/" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshot, err := s.metadataForSnapshotKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return snapshots, err
|
return snapshots, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, label := range snapshot.Labels {
|
for _, label := range snapshot.Labels {
|
||||||
if label == desiredLabel {
|
if label == fmt.Sprintf("%s:%s", s.IndexLabel, labelValue) {
|
||||||
snapshots = append(snapshots, snapshot)
|
snapshots = append(snapshots, snapshot)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -250,12 +310,10 @@ func (s *SnapshotProcessor) ListAppsSnapshotsByLabel(desiredLabel string) ([]Sna
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetSnapshot returns a single snapshot's metadata for given key.
|
// GetSnapshot returns a single snapshot's metadata for given key.
|
||||||
// In fact this is just key translation method. It parses the key and returns
|
|
||||||
// what's inside. Doesn't check if the snapshot actually exist.
|
|
||||||
func (s *SnapshotProcessor) GetSnapshot(key string) (Snapshot, error) {
|
func (s *SnapshotProcessor) GetSnapshot(key string) (Snapshot, error) {
|
||||||
snapshot := Snapshot{}
|
snapshot := Snapshot{}
|
||||||
|
|
||||||
snapshot, err := DecodeKeyName(key)
|
snapshot, err := s.metadataForSnapshotKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return snapshot, err
|
return snapshot, err
|
||||||
}
|
}
|
||||||
@ -281,7 +339,7 @@ func (s *SnapshotProcessor) DeleteAppSnapshots(appName string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, snapshot := range snapshots {
|
for _, snapshot := range snapshots {
|
||||||
err = s.DeleteSnapshot(snapshot.KeyName())
|
err = s.DeleteSnapshot(snapshot.KeyName(s.IndexLabel))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("removing snapshots error: %v", err)
|
return fmt.Errorf("removing snapshots error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -47,6 +47,7 @@ func TestMain(m *testing.M) {
|
|||||||
snapshotProcessor = &SnapshotProcessor{
|
snapshotProcessor = &SnapshotProcessor{
|
||||||
AppsPath: path.Join(initialPwd, "tmp/apps"),
|
AppsPath: path.Join(initialPwd, "tmp/apps"),
|
||||||
TmpSnapshotPath: path.Join(initialPwd, "tmp/snapshots"),
|
TmpSnapshotPath: path.Join(initialPwd, "tmp/snapshots"),
|
||||||
|
IndexLabel: "testlabel",
|
||||||
|
|
||||||
Driver: drivers.S3Driver{
|
Driver: drivers.S3Driver{
|
||||||
S3AccessKey: "test",
|
S3AccessKey: "test",
|
||||||
@ -76,14 +77,18 @@ func TestMain(m *testing.M) {
|
|||||||
|
|
||||||
func TestSnapshot(t *testing.T) {
|
func TestSnapshot(t *testing.T) {
|
||||||
snapshot := Snapshot{
|
snapshot := Snapshot{
|
||||||
|
UUID: "ABCDEF",
|
||||||
AppName: "app_0102",
|
AppName: "app_0102",
|
||||||
TimeStamp: 1634510035,
|
TimeStamp: 1634510035,
|
||||||
Labels: []string{"userid:1"},
|
Labels: []string{"userid:1"},
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.Equal(t, "app_0102:1634510035:eyJhcHBfbmFtZSI6ImFwcF8wMTAyIiwidHMiOjE2MzQ1MTAwMzUsImxhYmVscyI6WyJ1c2VyaWQ6MSJdfQ==", snapshot.KeyName())
|
assert.Equal(t, "app_0102:ABCDEF:1:1634510035", snapshot.KeyName("userid"))
|
||||||
|
|
||||||
snapshot2, err := DecodeKeyName("app_0102:1634510035:eyJhcHBfbmFtZSI6ImFwcF8wMTAyIiwidHMiOjE2MzQ1MTAwMzUsImxhYmVscyI6WyJ1c2VyaWQ6MSJdfQ==")
|
err := snapshotProcessor.saveMetadata(snapshot)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
snapshot2, err := snapshotProcessor.metadataForSnapshotKey("app_0102:ABCDEF:1:1634510035")
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.Equal(t, "app_0102", snapshot2.AppName)
|
assert.Equal(t, "app_0102", snapshot2.AppName)
|
||||||
assert.Equal(t, int64(1634510035), snapshot2.TimeStamp)
|
assert.Equal(t, int64(1634510035), snapshot2.TimeStamp)
|
||||||
@ -155,7 +160,7 @@ func TestCreateRestoreListSnapshot(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestGetSnapshot(t *testing.T) {
|
func TestGetSnapshot(t *testing.T) {
|
||||||
snapshot, err := snapshotProcessor.GetSnapshot("app_0102:1634510035:eyJhcHBfbmFtZSI6ImFwcF8wMTAyIiwidHMiOjE2MzQ1MTAwMzUsImxhYmVscyI6WyJ1c2VyaWQ6MSJdfQ==")
|
snapshot, err := snapshotProcessor.GetSnapshot("app_0102:ABCDEF:1:1634510035")
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.Equal(t, "app_0102", snapshot.AppName)
|
assert.Equal(t, "app_0102", snapshot.AppName)
|
||||||
}
|
}
|
||||||
@ -177,11 +182,11 @@ func TestListAppsSnapshotsByLabel(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create an snapshot
|
// Create an snapshot
|
||||||
snapshotName, err := snapshotProcessor.CreateSnapshot(appName, []string{"app:test2", "almost_no_content"})
|
snapshotName, err := snapshotProcessor.CreateSnapshot(appName, []string{"app:test2", "almost_no_content", "testlabel:abcde"})
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.Equal(t, strings.HasPrefix(snapshotName, appName+":"), true)
|
assert.Equal(t, strings.HasPrefix(snapshotName, appName+":"), true)
|
||||||
|
|
||||||
snapshots, err := snapshotProcessor.ListAppsSnapshotsByLabel("app:test2")
|
snapshots, err := snapshotProcessor.ListAppsSnapshotsByLabel("abcde")
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.True(t, len(snapshots) > 0)
|
assert.True(t, len(snapshots) > 0)
|
||||||
|
|
||||||
|
@ -21,6 +21,7 @@ type Config struct {
|
|||||||
SnapshotsS3Endpoint string `envconfig:"SNAPSHOTS_S3_ENDPOINT" required:"false"`
|
SnapshotsS3Endpoint string `envconfig:"SNAPSHOTS_S3_ENDPOINT" required:"false"`
|
||||||
SnapshotsS3SSL bool `envconfig:"SNAPSHOTS_S3_SSL" required:"false" default:"true"`
|
SnapshotsS3SSL bool `envconfig:"SNAPSHOTS_S3_SSL" required:"false" default:"true"`
|
||||||
SnapshotsS3Bucket string `envconfig:"SNAPSHOTS_S3_BUCKET" required:"false" default:"snapshots"`
|
SnapshotsS3Bucket string `envconfig:"SNAPSHOTS_S3_BUCKET" required:"false" default:"snapshots"`
|
||||||
|
SnapshotsIndexLabel string `envconfig:"SNAPSHOTS_INDEX_LABEL" required:"false" default:"owner"` // Label that will be part of the object name and it will be used as index to quick listing
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetConfig return configuration created based on environment variables
|
// GetConfig return configuration created based on environment variables
|
||||||
|
1
go.mod
1
go.mod
@ -22,6 +22,7 @@ require (
|
|||||||
github.com/nats-io/nats.go v1.12.3
|
github.com/nats-io/nats.go v1.12.3
|
||||||
github.com/opencontainers/go-digest v1.0.0 // indirect
|
github.com/opencontainers/go-digest v1.0.0 // indirect
|
||||||
github.com/pkg/errors v0.9.1
|
github.com/pkg/errors v0.9.1
|
||||||
|
github.com/satori/go.uuid v1.2.0
|
||||||
github.com/shirou/gopsutil v2.20.6+incompatible
|
github.com/shirou/gopsutil v2.20.6+incompatible
|
||||||
github.com/stretchr/testify v1.4.0
|
github.com/stretchr/testify v1.4.0
|
||||||
google.golang.org/protobuf v1.27.1 // indirect
|
google.golang.org/protobuf v1.27.1 // indirect
|
||||||
|
2
go.sum
2
go.sum
@ -166,6 +166,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
|
|||||||
github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc=
|
github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc=
|
||||||
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
||||||
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
|
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
|
||||||
|
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
|
||||||
|
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
|
||||||
github.com/shirou/gopsutil v2.20.6+incompatible h1:P37G9YH8M4vqkKcwBosp+URN5O8Tay67D2MbR361ioY=
|
github.com/shirou/gopsutil v2.20.6+incompatible h1:P37G9YH8M4vqkKcwBosp+URN5O8Tay67D2MbR361ioY=
|
||||||
github.com/shirou/gopsutil v2.20.6+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
|
github.com/shirou/gopsutil v2.20.6+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
|
||||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||||
|
156
handlers_nats.go
156
handlers_nats.go
@ -88,6 +88,21 @@ func _messageHandler(m *nats.Msg) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Prepare snapshot processor
|
||||||
|
var snapshotProcessor apps.SnapshotProcessor = apps.SnapshotProcessor{
|
||||||
|
AppsPath: config.AppsPath,
|
||||||
|
TmpSnapshotPath: config.SnapshotsPath,
|
||||||
|
IndexLabel: config.SnapshotsIndexLabel,
|
||||||
|
|
||||||
|
Driver: drivers.S3Driver{
|
||||||
|
S3AccessKey: config.SnapshotsS3AccessKey,
|
||||||
|
S3SecretKey: config.SnapshotsS3SecretKey,
|
||||||
|
S3Endpoint: config.SnapshotsS3Endpoint,
|
||||||
|
S3SSL: config.SnapshotsS3SSL,
|
||||||
|
Bucket: config.SnapshotsS3Bucket,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// Returns list of apps
|
// Returns list of apps
|
||||||
func listEventHandler(m *nats.Msg, message *RequestMessage) error {
|
func listEventHandler(m *nats.Msg, message *RequestMessage) error {
|
||||||
log.Println("> List")
|
log.Println("> List")
|
||||||
@ -200,20 +215,6 @@ func createEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|||||||
if len(appTemplate.Snapshot) > 0 {
|
if len(appTemplate.Snapshot) > 0 {
|
||||||
log.Printf("App %s is going to be created from %s snapshot\n", message.AppName, appTemplate.Snapshot)
|
log.Printf("App %s is going to be created from %s snapshot\n", message.AppName, appTemplate.Snapshot)
|
||||||
|
|
||||||
// Setup processors
|
|
||||||
snapshotProcessor := apps.SnapshotProcessor{
|
|
||||||
AppsPath: config.AppsPath,
|
|
||||||
TmpSnapshotPath: config.SnapshotsPath,
|
|
||||||
|
|
||||||
Driver: drivers.S3Driver{
|
|
||||||
S3AccessKey: config.SnapshotsS3AccessKey,
|
|
||||||
S3SecretKey: config.SnapshotsS3SecretKey,
|
|
||||||
S3Endpoint: config.SnapshotsS3Endpoint,
|
|
||||||
S3SSL: config.SnapshotsS3SSL,
|
|
||||||
Bucket: config.SnapshotsS3Bucket,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Restore the data
|
// Restore the data
|
||||||
err = snapshotProcessor.RestoreSnapshot(appTemplate.Snapshot, message.AppName)
|
err = snapshotProcessor.RestoreSnapshot(appTemplate.Snapshot, message.AppName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -785,20 +786,7 @@ Payload: no payload needed
|
|||||||
Response: notification when it's done or error
|
Response: notification when it's done or error
|
||||||
*/
|
*/
|
||||||
func createSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
|
func createSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
|
||||||
processor := apps.SnapshotProcessor{
|
_, err := snapshotProcessor.CreateSnapshot(message.AppName, strings.Split(message.Payload, ","))
|
||||||
AppsPath: config.AppsPath,
|
|
||||||
TmpSnapshotPath: config.SnapshotsPath,
|
|
||||||
|
|
||||||
Driver: drivers.S3Driver{
|
|
||||||
S3AccessKey: config.SnapshotsS3AccessKey,
|
|
||||||
S3SecretKey: config.SnapshotsS3SecretKey,
|
|
||||||
S3Endpoint: config.SnapshotsS3Endpoint,
|
|
||||||
S3SSL: config.SnapshotsS3SSL,
|
|
||||||
Bucket: config.SnapshotsS3Bucket,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := processor.CreateSnapshot(message.AppName, strings.Split(message.Payload, ","))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("ERROR create snapshot error: " + err.Error())
|
log.Println("ERROR create snapshot error: " + err.Error())
|
||||||
publish(message.AppName, "backend problem", true)
|
publish(message.AppName, "backend problem", true)
|
||||||
@ -822,20 +810,6 @@ Payload: string with the snapshot name
|
|||||||
Response: notification when it's done or error
|
Response: notification when it's done or error
|
||||||
*/
|
*/
|
||||||
func restoreFromSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
|
func restoreFromSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
|
||||||
// Setup processors
|
|
||||||
snapshotProcessor := apps.SnapshotProcessor{
|
|
||||||
AppsPath: config.AppsPath,
|
|
||||||
TmpSnapshotPath: config.SnapshotsPath,
|
|
||||||
|
|
||||||
Driver: drivers.S3Driver{
|
|
||||||
S3AccessKey: config.SnapshotsS3AccessKey,
|
|
||||||
S3SecretKey: config.SnapshotsS3SecretKey,
|
|
||||||
S3Endpoint: config.SnapshotsS3Endpoint,
|
|
||||||
S3SSL: config.SnapshotsS3SSL,
|
|
||||||
Bucket: config.SnapshotsS3Bucket,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
processor := apps.AppsProcessor{
|
processor := apps.AppsProcessor{
|
||||||
DB: common.GetDBConnection(),
|
DB: common.GetDBConnection(),
|
||||||
}
|
}
|
||||||
@ -912,20 +886,7 @@ Payload: no payload needed
|
|||||||
Response: replies with list of snapshots or an error message
|
Response: replies with list of snapshots or an error message
|
||||||
*/
|
*/
|
||||||
func listSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
|
func listSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
|
||||||
processor := apps.SnapshotProcessor{
|
snapshots, err := snapshotProcessor.ListAppSnapshots(message.AppName)
|
||||||
AppsPath: config.AppsPath,
|
|
||||||
TmpSnapshotPath: config.SnapshotsPath,
|
|
||||||
|
|
||||||
Driver: drivers.S3Driver{
|
|
||||||
S3AccessKey: config.SnapshotsS3AccessKey,
|
|
||||||
S3SecretKey: config.SnapshotsS3SecretKey,
|
|
||||||
S3Endpoint: config.SnapshotsS3Endpoint,
|
|
||||||
S3SSL: config.SnapshotsS3SSL,
|
|
||||||
Bucket: config.SnapshotsS3Bucket,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshots, err := processor.ListAppSnapshots(message.AppName)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errorReplyFormater(m, "backend error", err)
|
return errorReplyFormater(m, "backend error", err)
|
||||||
}
|
}
|
||||||
@ -933,7 +894,7 @@ func listSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|||||||
var output SnapshotsMetadata
|
var output SnapshotsMetadata
|
||||||
for _, snapshot := range snapshots {
|
for _, snapshot := range snapshots {
|
||||||
output = append(output, SnapshotMetadata{
|
output = append(output, SnapshotMetadata{
|
||||||
Key: snapshot.KeyName(),
|
Key: snapshot.KeyName(snapshotProcessor.IndexLabel),
|
||||||
Metadata: snapshot,
|
Metadata: snapshot,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -961,20 +922,7 @@ Payload: list of appNames separated by comma (no spaces)
|
|||||||
Response: replies with list of snapshots or an error message
|
Response: replies with list of snapshots or an error message
|
||||||
*/
|
*/
|
||||||
func listAppsSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
|
func listAppsSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
|
||||||
processor := apps.SnapshotProcessor{
|
snapshots, err := snapshotProcessor.ListAppsSnapshots(strings.Split(message.Payload, ","))
|
||||||
AppsPath: config.AppsPath,
|
|
||||||
TmpSnapshotPath: config.SnapshotsPath,
|
|
||||||
|
|
||||||
Driver: drivers.S3Driver{
|
|
||||||
S3AccessKey: config.SnapshotsS3AccessKey,
|
|
||||||
S3SecretKey: config.SnapshotsS3SecretKey,
|
|
||||||
S3Endpoint: config.SnapshotsS3Endpoint,
|
|
||||||
S3SSL: config.SnapshotsS3SSL,
|
|
||||||
Bucket: config.SnapshotsS3Bucket,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshots, err := processor.ListAppsSnapshots(strings.Split(message.Payload, ","))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errorReplyFormater(m, "backend error", err)
|
return errorReplyFormater(m, "backend error", err)
|
||||||
}
|
}
|
||||||
@ -982,7 +930,7 @@ func listAppsSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
|
|||||||
var output SnapshotsMetadata
|
var output SnapshotsMetadata
|
||||||
for _, snapshot := range snapshots {
|
for _, snapshot := range snapshots {
|
||||||
output = append(output, SnapshotMetadata{
|
output = append(output, SnapshotMetadata{
|
||||||
Key: snapshot.KeyName(),
|
Key: snapshot.KeyName(snapshotProcessor.IndexLabel),
|
||||||
Metadata: snapshot,
|
Metadata: snapshot,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -1010,20 +958,7 @@ Payload: snapshot label
|
|||||||
Response: replies with list of snapshots or an error message
|
Response: replies with list of snapshots or an error message
|
||||||
*/
|
*/
|
||||||
func listSnapshotsByLabelEventHandler(m *nats.Msg, message *RequestMessage) error {
|
func listSnapshotsByLabelEventHandler(m *nats.Msg, message *RequestMessage) error {
|
||||||
processor := apps.SnapshotProcessor{
|
snapshots, err := snapshotProcessor.ListAppsSnapshotsByLabel(message.Payload)
|
||||||
AppsPath: config.AppsPath,
|
|
||||||
TmpSnapshotPath: config.SnapshotsPath,
|
|
||||||
|
|
||||||
Driver: drivers.S3Driver{
|
|
||||||
S3AccessKey: config.SnapshotsS3AccessKey,
|
|
||||||
S3SecretKey: config.SnapshotsS3SecretKey,
|
|
||||||
S3Endpoint: config.SnapshotsS3Endpoint,
|
|
||||||
S3SSL: config.SnapshotsS3SSL,
|
|
||||||
Bucket: config.SnapshotsS3Bucket,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshots, err := processor.ListAppsSnapshotsByLabel(message.Payload)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errorReplyFormater(m, "backend error", err)
|
return errorReplyFormater(m, "backend error", err)
|
||||||
}
|
}
|
||||||
@ -1031,7 +966,7 @@ func listSnapshotsByLabelEventHandler(m *nats.Msg, message *RequestMessage) erro
|
|||||||
var output SnapshotsMetadata
|
var output SnapshotsMetadata
|
||||||
for _, snapshot := range snapshots {
|
for _, snapshot := range snapshots {
|
||||||
output = append(output, SnapshotMetadata{
|
output = append(output, SnapshotMetadata{
|
||||||
Key: snapshot.KeyName(),
|
Key: snapshot.KeyName(snapshotProcessor.IndexLabel),
|
||||||
Metadata: snapshot,
|
Metadata: snapshot,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -1059,25 +994,12 @@ Payload: snapshot's key
|
|||||||
Response: snapshot metadata
|
Response: snapshot metadata
|
||||||
*/
|
*/
|
||||||
func getSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
|
func getSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
|
||||||
processor := apps.SnapshotProcessor{
|
snapshot, err := snapshotProcessor.GetSnapshot(message.Payload)
|
||||||
AppsPath: config.AppsPath,
|
|
||||||
TmpSnapshotPath: config.SnapshotsPath,
|
|
||||||
|
|
||||||
Driver: drivers.S3Driver{
|
|
||||||
S3AccessKey: config.SnapshotsS3AccessKey,
|
|
||||||
S3SecretKey: config.SnapshotsS3SecretKey,
|
|
||||||
S3Endpoint: config.SnapshotsS3Endpoint,
|
|
||||||
S3SSL: config.SnapshotsS3SSL,
|
|
||||||
Bucket: config.SnapshotsS3Bucket,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshot, err := processor.GetSnapshot(message.Payload)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errorReplyFormater(m, "backend error", err)
|
return errorReplyFormater(m, "backend error", err)
|
||||||
}
|
}
|
||||||
output := SnapshotMetadata{
|
output := SnapshotMetadata{
|
||||||
Key: snapshot.KeyName(),
|
Key: snapshot.KeyName(snapshotProcessor.IndexLabel),
|
||||||
Metadata: snapshot,
|
Metadata: snapshot,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1101,20 +1023,7 @@ Payload: string with a snapshot name
|
|||||||
Response: notification when it's done or error
|
Response: notification when it's done or error
|
||||||
*/
|
*/
|
||||||
func deleteSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
|
func deleteSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
|
||||||
processor := apps.SnapshotProcessor{
|
err := snapshotProcessor.DeleteSnapshot(message.Payload)
|
||||||
AppsPath: config.AppsPath,
|
|
||||||
TmpSnapshotPath: config.SnapshotsPath,
|
|
||||||
|
|
||||||
Driver: drivers.S3Driver{
|
|
||||||
S3AccessKey: config.SnapshotsS3AccessKey,
|
|
||||||
S3SecretKey: config.SnapshotsS3SecretKey,
|
|
||||||
S3Endpoint: config.SnapshotsS3Endpoint,
|
|
||||||
S3SSL: config.SnapshotsS3SSL,
|
|
||||||
Bucket: config.SnapshotsS3Bucket,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
err := processor.DeleteSnapshot(message.Payload)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errorReplyFormater(m, "backend error", err)
|
return errorReplyFormater(m, "backend error", err)
|
||||||
}
|
}
|
||||||
@ -1133,20 +1042,7 @@ Payload: no payload needed
|
|||||||
Response: notification when it's done or error
|
Response: notification when it's done or error
|
||||||
*/
|
*/
|
||||||
func deleteAppSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
|
func deleteAppSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
|
||||||
processor := apps.SnapshotProcessor{
|
err := snapshotProcessor.DeleteAppSnapshots(message.AppName)
|
||||||
AppsPath: config.AppsPath,
|
|
||||||
TmpSnapshotPath: config.SnapshotsPath,
|
|
||||||
|
|
||||||
Driver: drivers.S3Driver{
|
|
||||||
S3AccessKey: config.SnapshotsS3AccessKey,
|
|
||||||
S3SecretKey: config.SnapshotsS3SecretKey,
|
|
||||||
S3Endpoint: config.SnapshotsS3Endpoint,
|
|
||||||
S3SSL: config.SnapshotsS3SSL,
|
|
||||||
Bucket: config.SnapshotsS3Bucket,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
err := processor.DeleteAppSnapshots(message.AppName)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errorReplyFormater(m, "backend error", err)
|
return errorReplyFormater(m, "backend error", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user