// Legacy - Simple Cassandra Backup Utility.
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path"
	"runtime/pprof"
	"strconv"
	"strings"
	"time"

	"github.com/goamz/goamz/aws"
	"github.com/goamz/goamz/s3"
	"github.com/iamthemovie/legacy/backup"
	"github.com/rlmcpherson/s3gof3r"
)

// LegacyArguments holds the command-line flags parsed by GetLegacyArguments.
type LegacyArguments struct {
	AwsSecret        string
	AwsAccessKey     string
	AwsRegion        string
	S3Bucket         string
	S3BasePath       string
	NewSnapshot      bool
	DataDirectories  string
	LogDirectory     string
	ExcludeKeyspaces string
	Help             bool
}

// Legacy wires together the S3 clients, node metadata and backup settings
// used for a single run of the utility.
type Legacy struct {
	MachineName      string
	DataDirectories  []string
	ExcludeKeyspaces []string
	LogDirectory     string
	LogFile          *os.File
	SeedSnaphshot    string
	S3Bucket         *s3.Bucket
	S3StreamBucket   *s3gof3r.Bucket
	S3BasePath       string
	NewSnapshot      bool
}

// LegacyTableManifest records which snapshot a table's backups are based on
// and when that snapshot was created / last updated.
type LegacyTableManifest struct {
	SnapshotName    string
	DateCreated     string
	DateLastUpdated string
}
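
// A minimal sketch of what SaveManifest stores: json.Marshal uses the field
// names above as keys, so a stored manifest looks roughly like this (the
// values here are illustrative only):
//
//	{"SnapshotName":"1436364239","DateCreated":"2015-07-08T14:43:59Z","DateLastUpdated":""}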

var memprofile = flag.String("memprofile", "", "write memory profile to this file")
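
// Example invocation, assuming the binary is built as "legacy" (the flag
// names match those registered in GetLegacyArguments; the bucket and paths
// are placeholders, and the AWS credentials default to the
// AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables):
//
//	legacy -s3-bucket=my-backup-bucket -s3-base-path=cassandra/node1 \
//		-directories=/var/lib/cassandra/data -aws-region=eu-west-1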

func main() {
	args, err := GetLegacyArguments()
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	if args.Help {
		flag.Usage()
		return
	}

	// When -memprofile is set, continuously rewrite the heap profile so the
	// most recent snapshot is always on disk. This is started after the flags
	// have been parsed so *memprofile holds its final value.
	go func() {
		if *memprofile == "" {
			return
		}
		written := false
		for {
			if written {
				os.Remove(*memprofile)
			}
			f, err := os.Create(*memprofile)
			if err != nil {
				log.Fatal(err)
			}
			pprof.WriteHeapProfile(f)
			time.Sleep(1 * time.Millisecond)
			f.Close()
			written = true
		}
	}()

	legacy, err := args.GetLegacy()
	if err != nil {
		log.Println(err)
		return
	}
	legacy.SetupLogging()
	legacy.Run()

	if *memprofile != "" {
		f, err := os.Create(*memprofile)
		if err != nil {
			log.Fatal(err)
		}
		pprof.WriteHeapProfile(f)
		f.Close()
		return
	}
	legacy.ShutdownLogging()
}

// Run performs a full backup cycle: back up the node tokens, take a fresh
// snapshot, upload each active table, then clear the snapshot again.
func (l *Legacy) Run() {
	// Every time we run, we create a snapshot. This is used to check for
	// active / new tables. It is deleted after we've finished :)
	l.RunTokenBackup()
	if l.NewSnapshot {
		log.Println("[New Snapshot Requested]")
	}
	snapshotName, _ := CreateNewSnapshot(strconv.Itoa(int(time.Now().Unix())))
	l.SeedSnaphshot = snapshotName
	tables := l.GetTableReferences()
	for _, table := range tables {
		l.RunTableBackup(&table)
	}
	log.Println("Table backup complete. Clearing snapshot: " + snapshotName)
	ClearSnapshot(snapshotName)
}

// SetupLogging sends the standard logger to a dated file in LogDirectory,
// falling back to stdout when no directory is configured or the file cannot
// be opened.
func (l *Legacy) SetupLogging() {
	if len(l.LogDirectory) == 0 {
		log.Println("Looks like no log directory is set. Defaulting to stdout.")
		return
	}
	// One log file per day, named legacy-YYYYMMDD.log.
	filename := "legacy-" + time.Now().Format("20060102") + ".log"
	logPath := path.Join(l.LogDirectory, filename)
	fh, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
	if err == nil {
		l.LogFile = fh
		log.SetOutput(fh)
		log.Println("Using file logging...")
		return
	}
	// @todo create the log folder? Recursive, or let the user create their folders?
	log.Println("An error occurred attempting to use the log file: " + err.Error())
	log.Println("Please make sure the log directory exists with the" +
		" correct permissions.")
}

// ShutdownLogging closes the log file handle.
// @todo In future we could do with having this clean up older log files,
// maybe even compress them.
func (l *Legacy) ShutdownLogging() {
	if l.LogFile == nil {
		return
	}
	l.LogFile.Close()
}

// GetManifest fetches the manifest.json for the given table path from S3.
// An error is returned when no manifest exists yet.
func (l *Legacy) GetManifest(tablePath string) (*LegacyTableManifest, error) {
	p := path.Join(l.S3BasePath, l.MachineName, ".legacy", tablePath, "manifest.json")
	log.Println("Getting manifest for: " + p)
	data, _ := l.S3Bucket.Get(p)
	if len(data) == 0 {
		return &LegacyTableManifest{}, errors.New("manifest does not exist")
	}
	manifest := LegacyTableManifest{}
	json.Unmarshal(data, &manifest)
	return &manifest, nil
}

// SaveManifest writes the manifest.json for the given table path to S3.
func (l *Legacy) SaveManifest(tablePath string, manifest LegacyTableManifest) {
	p := path.Join(l.S3BasePath, l.MachineName, ".legacy", tablePath, "manifest.json")
	log.Println("Saving manifest for: " + p)
	output, _ := json.Marshal(manifest)
	l.S3Bucket.Put(p, output, "application/json", s3.BucketOwnerFull, s3.Options{})
}

// RunTableBackup uploads a table's data. On the first run (or when a new
// snapshot has been requested) the full snapshot is uploaded and a manifest
// is written; on later runs only the incremental backups directory is
// uploaded against the snapshot recorded in the manifest.
func (l *Legacy) RunTableBackup(table *CassandraTableMeta) {
	tableManifest, err := l.GetManifest(table.GetManifestPath())
	if err != nil {
		log.Println("Manifest does not exist for Keyspace: " + table.KeyspaceName +
			" Table: " + table.Folder)
	}

	snapshotFileSystemPath :=
		path.Join(table.GetDataPath(), "snapshots", l.SeedSnaphshot)
	backupFileSystemPath :=
		path.Join(table.GetDataPath(), "backups")
	s3UploadPath :=
		path.Join(l.S3BasePath, l.MachineName, table.GetManifestPath(), "snapshots")

	if err != nil || l.NewSnapshot {
		log.Println("Computing initial snapshot upload size...")
		tableManifest = &LegacyTableManifest{
			SnapshotName:    l.SeedSnaphshot,
			DateCreated:     time.Now().Format(time.RFC3339),
			DateLastUpdated: "",
		}
		log.Println("Starting SSTable snapshot upload for table: " + table.Folder)
		log.Println("Path: " + snapshotFileSystemPath)
		backupInstance := backup.Backup{
			FileSystemRoot:    snapshotFileSystemPath,
			S3StreamBucket:    l.S3StreamBucket,
			S3Path:            path.Join(s3UploadPath, l.SeedSnaphshot),
			RemoveAfterUpload: false,
		}
		backupInstance.Run()
		l.SaveManifest(table.GetManifestPath(), *tableManifest)
		return
	}

	// Does the backup directory exist?
	if _, err := os.Stat(backupFileSystemPath); os.IsNotExist(err) {
		log.Println(backupFileSystemPath)
		log.Println("No backups directory present. Have incremental backups been " +
			"enabled? If so, Cassandra may not have flushed the SSTables yet.")
		return
	}

	backupInstance := backup.Backup{
		FileSystemRoot:    backupFileSystemPath,
		S3StreamBucket:    l.S3StreamBucket,
		S3Path:            path.Join(s3UploadPath, tableManifest.SnapshotName),
		RemoveAfterUpload: true,
	}
	backupInstance.Run()
}

// GetLegacy validates the S3 credentials and bucket, then builds a Legacy
// instance from the parsed arguments.
func (la *LegacyArguments) GetLegacy() (*Legacy, error) {
	// Create a "TEST" snapshot in order to work out which tables are active.
	// Get a list of keyspaces and table names (plus directories).
	// Walk through all the directories.
	auth, _ := aws.GetAuth(
		la.AwsAccessKey,
		la.AwsSecret,
		"",
		time.Now().AddDate(0, 0, 1))

	// Check the bucket exists.
	bucket := s3.New(auth, GetAwsRegion(la.AwsRegion)).Bucket(la.S3Bucket)
	_, err := bucket.List("/", "/", "", 1)
	if err != nil {
		return nil, err
	}

	streamAccess := s3gof3r.New("", s3gof3r.Keys{
		AccessKey:     la.AwsAccessKey,
		SecretKey:     la.AwsSecret,
		SecurityToken: "",
	})
	streamBucket := streamAccess.Bucket(la.S3Bucket)

	legacy := &Legacy{
		DataDirectories: make([]string, 0),
		S3Bucket:        bucket,
		S3StreamBucket:  streamBucket,
		// Propagate the base path so the S3 keys built in GetManifest,
		// RunTableBackup and RunTokenBackup include it.
		S3BasePath:   la.S3BasePath,
		LogDirectory: la.LogDirectory,
		NewSnapshot:  la.NewSnapshot,
	}
	legacy.MachineName, _ = os.Hostname()
	legacy.DataDirectories = SplitAndTrim(la.DataDirectories, ",")
	legacy.ExcludeKeyspaces = SplitAndTrim(la.ExcludeKeyspaces, ",")
	return legacy, nil
}

// GetTableReferences walks the configured data directories and returns the
// keyspace/table folders that contain the seed snapshot taken for this run.
func (l *Legacy) GetTableReferences() []CassandraTableMeta {
	retrieveKeyspaces := func(files []os.FileInfo, err error) []string {
		names := []string{}
		for _, element := range files {
			if !element.IsDir() {
				continue
			}
			names = append(names, element.Name())
		}
		return names
	}

	retrieveTableFolders := func(dataDir, keyspaceName string) []CassandraTableMeta {
		tableMetas := []CassandraTableMeta{}
		keyspaceFolder := path.Join(dataDir, keyspaceName)
		tableDirList, _ := ioutil.ReadDir(keyspaceFolder)
		for _, tableDir := range tableDirList {
			tableDirName := tableDir.Name()
			p := path.Join(keyspaceFolder, tableDirName, "snapshots", l.SeedSnaphshot)
			log.Println(p)
			if _, err := os.Stat(p); os.IsNotExist(err) {
				continue
			}
			tableMetas = append(tableMetas, CassandraTableMeta{
				Folder:        tableDirName,
				KeyspaceName:  keyspaceName,
				DataDirectory: dataDir,
			})
		}
		return tableMetas
	}

	activeTableList := []CassandraTableMeta{}
	for _, element := range l.DataDirectories {
		// Walk through this directory and collect its keyspaces.
		keyspacesForDirectory := retrieveKeyspaces(ioutil.ReadDir(element))
		for _, keyspaceName := range keyspacesForDirectory {
			if SliceContainsString(keyspaceName, l.ExcludeKeyspaces) {
				log.Println("Excluding Keyspace: " + keyspaceName)
				continue
			}
			tables := retrieveTableFolders(element, keyspaceName)
			activeTableList = append(activeTableList, tables...)
		}
	}
	return activeTableList
}

// RunTokenBackup writes this node's ring tokens to S3 so the node can be
// rebuilt with the same token assignments.
func (l *Legacy) RunTokenBackup() {
	// Question: should we version tokens? For now, no.
	// Get the interfaces of this machine and retrieve a list of the ring
	// tokens for this node. Once we've got them, we push them into a file
	// and up to S3...
	log.Println("Running token backup for node: " + l.MachineName)
	tokens := GetNodeTokens()
	if tokens == nil {
		log.Println("Token Backup: Unable to backup node tokens.")
		return
	}
	var buffer bytes.Buffer
	p := path.Join(l.S3BasePath, l.MachineName, "NODE-TOKENS")
	buffer.WriteString(l.MachineName)
	buffer.WriteString("\n\n")
	buffer.WriteString(strings.Join(tokens, ","))
	l.S3Bucket.Put(p, buffer.Bytes(), "text/plain", s3.BucketOwnerFull, s3.Options{})
}
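
// For reference, the S3 keys written under <s3-base-path>/<hostname>/ by the
// functions above look roughly like the sketch below. The per-table segment
// comes from CassandraTableMeta.GetManifestPath, which is defined elsewhere
// in the package, so the <keyspace>/<table> shape shown here is an assumption:
//
//	.legacy/<keyspace>/<table>/manifest.json      (SaveManifest)
//	<keyspace>/<table>/snapshots/<snapshot>/...   (RunTableBackup)
//	NODE-TOKENS                                   (RunTokenBackup)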

// GetLegacyArguments defines and parses the command-line flags and performs
// basic validation of the required values.
func GetLegacyArguments() (*LegacyArguments, error) {
	args := &LegacyArguments{}
	flag.StringVar(&args.AwsSecret, "aws-secret", os.Getenv("AWS_SECRET_ACCESS_KEY"), "AWS Secret Access Key - Default: AWS_SECRET_ACCESS_KEY environment variable")
	flag.StringVar(&args.AwsAccessKey, "aws-access-key", os.Getenv("AWS_ACCESS_KEY_ID"), "AWS Access Key ID - Default: AWS_ACCESS_KEY_ID environment variable")
	flag.StringVar(&args.AwsRegion, "aws-region", "eu-west-1", "AWS Region - Default: eu-west-1. See: http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region")
	flag.StringVar(&args.S3Bucket, "s3-bucket", "", "The name of the bucket for the backup destination.")
	flag.StringVar(&args.S3BasePath, "s3-base-path", "", "The path inside the bucket where the backups will be placed.")
	flag.StringVar(&args.DataDirectories, "directories", "/var/lib/cassandra/data", "A set of data directories that contain the keyspaces / tables. For multiple directories, comma separate: /data1,/data2")
	flag.BoolVar(&args.Help, "help", false, "Print this info.")
	flag.BoolVar(&args.NewSnapshot, "new-snapshot", false, "Force a new snapshot.")
	flag.StringVar(&args.LogDirectory, "logs", "/var/log/legacy", "The directory to store the legacy log files.")
	flag.StringVar(&args.ExcludeKeyspaces, "exclude-keyspaces", "", "A comma-separated list of keyspaces you wish to exclude from the backup.")
	flag.Parse()

	if args.Help {
		return args, nil
	}
	if len(args.AwsSecret) == 0 || len(args.AwsAccessKey) == 0 {
		return nil, errors.New("You must set both the AWS Secret and Access Key. -help for usage.")
	}
	if len(args.S3Bucket) == 0 || len(args.S3BasePath) == 0 {
		return nil, errors.New("You must set both the S3 Bucket and the Base path destination. -help for usage.")
	}
	return args, nil
}

// GetAwsRegion maps a region name to the corresponding goamz region value,
// defaulting to eu-west-1 for unknown names.
func GetAwsRegion(region string) aws.Region {
	switch region {
	case "us-gov-west-1":
		return aws.USGovWest
	case "us-east-1":
		return aws.USEast
	case "us-west-1":
		return aws.USWest
	case "us-west-2":
		return aws.USWest2
	case "eu-west-1":
		return aws.EUWest
	case "eu-central-1":
		return aws.EUCentral
	case "ap-southeast-1":
		return aws.APSoutheast
	case "ap-southeast-2":
		return aws.APSoutheast2
	case "ap-northeast-1":
		return aws.APNortheast
	case "cn-north-1":
		return aws.CNNorth
	default:
		return aws.EUWest
	}
}