From 141cf9bc975f295bc94c35e1a4e0fe648db0159c Mon Sep 17 00:00:00 2001 From: Barak Amar Date: Wed, 9 Nov 2022 18:12:53 +0200 Subject: [PATCH] lakectl import (#4558) --- cmd/lakectl/cmd/import.go | 234 ++++++++++++++++++ docs/reference/commands.md | 23 ++ docs/setup/import.md | 56 ++++- esti/golden/lakectl_help.golden | 1 + esti/golden/lakectl_import.golden | 6 + esti/golden/lakectl_import_and_merge.golden | 7 + .../golden/lakectl_import_with_message.golden | 6 + esti/lakectl_test.go | 23 ++ 8 files changed, 348 insertions(+), 8 deletions(-) create mode 100644 cmd/lakectl/cmd/import.go create mode 100644 esti/golden/lakectl_import.golden create mode 100644 esti/golden/lakectl_import_and_merge.golden create mode 100644 esti/golden/lakectl_import_with_message.golden diff --git a/cmd/lakectl/cmd/import.go b/cmd/lakectl/cmd/import.go new file mode 100644 index 00000000000..7b11a2d1866 --- /dev/null +++ b/cmd/lakectl/cmd/import.go @@ -0,0 +1,234 @@ +package cmd + +import ( + "context" + "fmt" + "net/http" + "os" + "regexp" + "time" + + "github.com/schollz/progressbar/v3" + "github.com/spf13/cobra" + "github.com/treeverse/lakefs/pkg/api" +) + +const importSummaryTemplate = `Import of {{ .Objects | yellow }} object(s) into "{{.Branch}}" completed. 
+MetaRange ID: {{.MetaRangeID|yellow}} +Commit ID: {{.Commit.Id|yellow}} +Message: {{.Commit.Message}} +Timestamp: {{.Commit.CreationDate|date}} +Parents: {{.Commit.Parents|join ", "}} +` + +var importCmd = &cobra.Command{ + Use: "import --from --to [--merge]", + Short: "Import data from external source to an imported branch (with optional merge)", + Run: func(cmd *cobra.Command, args []string) { + flags := cmd.Flags() + merge := MustBool(flags.GetBool("merge")) + noProgress := MustBool(flags.GetBool("no-progress")) + from := MustString(flags.GetString("from")) + to := MustString(flags.GetString("to")) + toURI := MustParsePathURI("to", to) + message := MustString(flags.GetString("message")) + metadata, err := getKV(cmd, "meta") + if err != nil { + DieErr(err) + } + + ctx := cmd.Context() + client := getClient() + verifySourceMatchConfiguredStorage(ctx, client, from) + + // verify target branch exists before we try to create and import into the associated imported branch + if err, ok := branchExists(ctx, client, toURI.Repository, toURI.Ref); err != nil { + DieErr(err) + } else if !ok { + DieFmt("Target branch '%s', does not exists!", toURI.Ref) + } + + // setup progress bar - based on `progressbar.Default` defaults + control visibility + bar := newImportProgressBar(!noProgress) + var ( + sum int + continuationToken *string + after string + ranges = make([]api.RangeMetadata, 0) + ) + for { + rangeResp, err := client.IngestRangeWithResponse(ctx, toURI.Repository, api.IngestRangeJSONRequestBody{ + After: after, + ContinuationToken: continuationToken, + FromSourceURI: from, + Prepend: api.StringValue(toURI.Path), + }) + DieOnErrorOrUnexpectedStatusCode(rangeResp, err, http.StatusCreated) + if rangeResp.JSON201 == nil { + Die("Bad response from server", 1) + } + if rangeResp.JSON201.Range != nil { + rangeInfo := *rangeResp.JSON201.Range + ranges = append(ranges, rangeInfo) + sum += rangeInfo.Count + _ = bar.Add(rangeInfo.Count) + } + + continuationToken = 
rangeResp.JSON201.Pagination.ContinuationToken + after = rangeResp.JSON201.Pagination.LastKey + if !rangeResp.JSON201.Pagination.HasMore { + break + } + } + _ = bar.Clear() + + // create metarange with all the ranges we created + metaRangeResp, err := client.CreateMetaRangeWithResponse(ctx, toURI.Repository, api.CreateMetaRangeJSONRequestBody{ + Ranges: ranges, + }) + DieOnErrorOrUnexpectedStatusCode(metaRangeResp, err, http.StatusCreated) + if metaRangeResp.JSON201 == nil { + Die("Bad response from server", 1) + } + + importedBranchID := formatImportedBranchID(toURI.Ref) + ensureBranchExists(ctx, client, toURI.Repository, importedBranchID, toURI.Ref) + + // commit metarange to the imported branch + commitResp, err := client.CommitWithResponse(ctx, toURI.Repository, importedBranchID, &api.CommitParams{ + SourceMetarange: metaRangeResp.JSON201.Id, + }, api.CommitJSONRequestBody{ + Message: message, + Metadata: &api.CommitCreation_Metadata{ + AdditionalProperties: metadata, + }, + }) + DieOnErrorOrUnexpectedStatusCode(commitResp, err, http.StatusCreated) + if commitResp.JSON201 == nil { + Die("Bad response from server", 1) + } + Write(importSummaryTemplate, struct { + Objects int + MetaRangeID string + Branch string + Commit *api.Commit + }{ + Objects: sum, + MetaRangeID: api.StringValue(metaRangeResp.JSON201.Id), + Branch: importedBranchID, + Commit: commitResp.JSON201, + }) + + // merge to target branch if needed + if merge { + mergeImportedBranch(ctx, client, toURI.Repository, importedBranchID, toURI.Ref) + } + }, +} + +func newImportProgressBar(visible bool) *progressbar.ProgressBar { + const ( + barSpinnerType = 14 + barWidth = 10 + barThrottle = 65 * time.Millisecond + ) + bar := progressbar.NewOptions64( + -1, + progressbar.OptionSetDescription("Importing"), + progressbar.OptionSetWriter(os.Stderr), + progressbar.OptionSetWidth(barWidth), + progressbar.OptionThrottle(barThrottle), + progressbar.OptionShowCount(), + progressbar.OptionShowIts(), + 
progressbar.OptionSetItsString("object"), + progressbar.OptionOnCompletion(func() { + _, _ = fmt.Fprint(os.Stderr, "\n") + }), + progressbar.OptionSpinnerType(barSpinnerType), + progressbar.OptionFullWidth(), + progressbar.OptionSetVisibility(visible), + ) + _ = bar.RenderBlank() + return bar +} + +func verifySourceMatchConfiguredStorage(ctx context.Context, client *api.ClientWithResponses, source string) { + storageConfResp, err := client.GetStorageConfigWithResponse(ctx) + DieOnErrorOrUnexpectedStatusCode(storageConfResp, err, http.StatusOK) + storageConfig := storageConfResp.JSON200 + if storageConfig == nil { + Die("Bad response from server", 1) + } + if storageConfig.BlockstoreNamespaceValidityRegex == "" { + return + } + matched, err := regexp.MatchString(storageConfig.BlockstoreNamespaceValidityRegex, source) + if err != nil { + DieErr(err) + } + if !matched { + DieFmt("import source '%s' doesn't match current configured storage '%s'", source, storageConfig.BlockstoreType) + } +} + +func mergeImportedBranch(ctx context.Context, client *api.ClientWithResponses, repository, fromBranch, toBranch string) { + mergeResp, err := client.MergeIntoBranchWithResponse(ctx, repository, fromBranch, toBranch, api.MergeIntoBranchJSONRequestBody{}) + DieOnErrorOrUnexpectedStatusCode(mergeResp, err, http.StatusOK) + if mergeResp.JSON200 == nil { + Die("Bad response from server", 1) + } + Write(mergeCreateTemplate, struct { + Merge FromTo + Result *api.MergeResult + }{ + Merge: FromTo{ + FromRef: fromBranch, + ToRef: toBranch, + }, + Result: mergeResp.JSON200, + }) +} + +func branchExists(ctx context.Context, client *api.ClientWithResponses, repository string, branch string) (error, bool) { + resp, err := client.GetBranchWithResponse(ctx, repository, branch) + if err != nil { + return err, false + } + if resp.JSON200 != nil { + return nil, true + } + if resp.JSON404 != nil { + return nil, false + } + return RetrieveError(resp, err), false +} + +func ensureBranchExists(ctx 
context.Context, client *api.ClientWithResponses, repository, branch, sourceBranch string) { + if err, ok := branchExists(ctx, client, repository, branch); err != nil { + DieErr(err) + } else if ok { + return + } + createBranchResp, err := client.CreateBranchWithResponse(ctx, repository, api.CreateBranchJSONRequestBody{ + Name: branch, + Source: sourceBranch, + }) + DieOnErrorOrUnexpectedStatusCode(createBranchResp, err, http.StatusCreated) +} + +func formatImportedBranchID(branch string) string { + return "_" + branch + "_imported" +} + +//nolint:gochecknoinits,gomnd +func init() { + importCmd.Flags().String("from", "", "prefix to read from (e.g. \"s3://bucket/sub/path/\"). must not be in a storage namespace") + _ = importCmd.MarkFlagRequired("from") + importCmd.Flags().String("to", "", "lakeFS path to load objects into (e.g. \"lakefs://repo/branch/sub/path/\")") + _ = importCmd.MarkFlagRequired("to") + importCmd.Flags().Bool("merge", false, "merge imported branch into target branch") + importCmd.Flags().Bool("no-progress", false, "switch off the progress output") + importCmd.Flags().StringP("message", "m", "Import objects", "commit message") + importCmd.Flags().StringSlice("meta", []string{}, "key value pair in the form of key=value") + rootCmd.AddCommand(importCmd) +} diff --git a/docs/reference/commands.md b/docs/reference/commands.md index 575663bf8dc..111edaee0d3 100644 --- a/docs/reference/commands.md +++ b/docs/reference/commands.md @@ -2168,6 +2168,29 @@ lakectl help [command] [flags] +### lakectl import + +Import data from external source to an imported branch (with optional merge) + +``` +lakectl import --from --to [--merge] [flags] +``` + +#### Options +{:.no_toc} + +``` + --from string prefix to read from (e.g. "s3://bucket/sub/path/"). 
must not be in a storage namespace + -h, --help help for import + --merge merge imported branch into target branch + -m, --message string commit message (default "Import objects") + --meta strings key value pair in the form of key=value + --no-progress switch off the progress output + --to string lakeFS path to load objects into (e.g. "lakefs://repo/branch/sub/path/") +``` + + + ### lakectl ingest Ingest objects from an external source into a lakeFS branch (without actually copying them) diff --git a/docs/setup/import.md b/docs/setup/import.md index 40afb611809..9f27116cc18 100644 --- a/docs/setup/import.md +++ b/docs/setup/import.md @@ -26,7 +26,7 @@ and in the same region of your destination bucket. lakeFS supports two ways to ingest objects from the object store without copying the data: 1. [Importing using the lakeFS UI](#importing-using-the-lakefs-ui) - A UI dialog to trigger an import to a designated import branch. It creates a commit from all imported objects. -1. [Importing using lakectl cli](#importing-using-lakectl-cli) - You can use a the [`lakectl` CLI command](../reference/commands.md#lakectl) to create uncommitted objects in a branch. It will make sequential calls between the CLI and the server. +1. [Importing using lakectl cli](#importing-using-lakectl-cli) - You can use the [`lakectl` CLI command](../reference/commands.md#lakectl) to create uncommitted objects in a branch. It will make sequential calls between the CLI and the server. #### Using the import wizard @@ -34,7 +34,7 @@ Clicking the Import button from any branch will open the following dialog: ![Import dialog example configured with S3](../assets/img/UI-Import-Dialog.png) -If it's the first import to the selected branch, it will create the import branch named `<branch_name>_imported`. +If it's the first import to the selected branch, it will create the import branch named `_<branch_name>_imported`. lakeFS will import all objects from the Source URI to the import branch under the given prefix.
The UI will update periodically with the amount of objects imported. How long it takes depends on the amount of objects to be imported but will roughly be a few thousand objects per second. @@ -48,18 +48,58 @@ Once the import is completed, you can merge the changes from the import branch t ### Importing using lakectl cli -The `lakectl` command supports ingesting objects from an external source. -This is done by listing the source bucket (and optional prefix), and creating pointers to the returned objects in lakeFS. +The `lakectl` cli supports _import_ and _ingest_ commands to import objects from an external source. -#### Prerequisites +- The _import_ command acts the same as the UI import wizard. It imports (zero copy) and commits the changes on the `_<branch_name>_imported` branch, with an optional flag to also merge the changes into `<branch_name>`. +- The _ingest_ command lists the source bucket (and optional prefix) from the client and creates pointers to the returned objects in lakeFS. The objects will be staged on the branch. -1. The user calling `lakectl ingest` has permissions to list the objects at the source object store. -2. _recommended_: The lakeFS installation has read permissions to the objects being ingested (to support downloading them directly from the lakeFS server) -3. The source path is **not** a storage namespace used by lakeFS. For example, if `lakefs://my-repo` created with storage namespace `s3://my-bucket`, then `s3://my-bucket/*` cannot be an ingestion source. + +#### Using the `lakectl import` command + +##### Usage + +
+ +
+```shell +lakectl import \ + --from s3://bucket/optional/prefix/ \ + --to lakefs://my-repo/my-branch/optional/path/ +``` +
+
+```shell +lakectl import \ + --from https://storageAccountName.blob.core.windows.net/container/optional/prefix/ \ + --to lakefs://my-repo/my-branch/optional/path/ +``` +
+
+```shell +lakectl import \ + --from gs://bucket/optional/prefix/ \ + --to lakefs://my-repo/my-branch/optional/path/ +``` +
+
+ +The imported objects will be committed to the `_my-branch_imported` branch. If the branch does not exist, it will be created. The `--merge` flag will merge the `_my-branch_imported` branch into `my-branch` after a successful import. #### Using the `lakectl ingest` command +##### Prerequisites + +1. The user calling `lakectl ingest` has permissions to list the objects at the source object store. +2. _Recommended_: The lakeFS installation has read permissions to the objects being ingested (to support downloading them directly from the lakeFS server). +3. The source path is **not** a storage namespace used by lakeFS. For example, if `lakefs://my-repo` was created with storage namespace `s3://my-bucket`, then `s3://my-bucket/*` cannot be an ingestion source. + +##### Usage +
  • AWS S3 or S3 API Compatible storage
  • diff --git a/esti/golden/lakectl_help.golden b/esti/golden/lakectl_help.golden index 681dfd46ff9..1ea2a30d2f0 100644 --- a/esti/golden/lakectl_help.golden +++ b/esti/golden/lakectl_help.golden @@ -18,6 +18,7 @@ Available Commands: fs View and manipulate objects gc Manage the garbage collection policy help Help about any command + import Import data from external source to an imported branch (with optional merge) ingest Ingest objects from an external source into a lakeFS branch (without actually copying them) log Show log of commits merge Merge & commit changes from source branch into destination branch diff --git a/esti/golden/lakectl_import.golden b/esti/golden/lakectl_import.golden new file mode 100644 index 00000000000..82e3c30870a --- /dev/null +++ b/esti/golden/lakectl_import.golden @@ -0,0 +1,6 @@ + Import of ${OBJECTS} object(s) into "${IMPORTED_BRANCH}" completed. +MetaRange ID: +Commit ID: +Message: Import objects +Timestamp: