diff --git a/go.mod b/go.mod index 96bde9e1..05ea017c 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/temporalio/ui-server/v2 v2.34.0 go.temporal.io/api v1.43.0 - go.temporal.io/sdk v1.31.0 + go.temporal.io/sdk v1.32.1 go.temporal.io/server v1.26.2 google.golang.org/grpc v1.67.1 google.golang.org/protobuf v1.35.1 diff --git a/go.sum b/go.sum index 070e4896..d1edbcaa 100644 --- a/go.sum +++ b/go.sum @@ -309,8 +309,6 @@ github.com/temporalio/sqlparser v0.0.0-20231115171017-f4060bcfa6cb/go.mod h1:143 github.com/temporalio/tchannel-go v1.22.1-0.20220818200552-1be8d8cffa5b/go.mod h1:c+V9Z/ZgkzAdyGvHrvC5AsXgN+M9Qwey04cBdKYzV7U= github.com/temporalio/tchannel-go v1.22.1-0.20240528171429-1db37fdea938 h1:sEJGhmDo+0FaPWM6f0v8Tjia0H5pR6/Baj6+kS78B+M= github.com/temporalio/tchannel-go v1.22.1-0.20240528171429-1db37fdea938/go.mod h1:ezRQRwu9KQXy8Wuuv1aaFFxoCNz5CeNbVOOkh3xctbY= -github.com/temporalio/ui-server/v2 v2.32.0 h1:mR6eet9n4eRkGgHcZqaJdXWK5sfQguN4LoWxQXsqpY0= -github.com/temporalio/ui-server/v2 v2.32.0/go.mod h1:b8whRt0/lbgNDzG7alSdiDzXFO8Fk783eRMhIycWtn8= github.com/temporalio/ui-server/v2 v2.34.0 h1:KLTTMh870/h1oxYqOtGMnmQBOP0oIcwFEnP7/i0C0hA= github.com/temporalio/ui-server/v2 v2.34.0/go.mod h1:Um2G8/8bDQczAdY+21ba+y+nLXwUdL7ZLlhAAaIeVqA= github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg= @@ -361,8 +359,8 @@ go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeX go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.temporal.io/api v1.43.0 h1:lBhq+u5qFJqGMXwWsmg/i8qn1UA/3LCwVc88l2xUMHg= go.temporal.io/api v1.43.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis= -go.temporal.io/sdk v1.31.0 h1:CLYiP0R5Sdj0gq8LyYKDDz4ccGOdJPR8wNGJU0JGwj8= -go.temporal.io/sdk v1.31.0/go.mod h1:8U8H7rF9u4Hyb4Ry9yiEls5716DHPNvVITPNkgWUwE8= +go.temporal.io/sdk v1.32.1 h1:slA8prhdFr4lxpsTcRusWVitD/cGjELfKUh0mBj73SU= +go.temporal.io/sdk v1.32.1/go.mod h1:8U8H7rF9u4Hyb4Ry9yiEls5716DHPNvVITPNkgWUwE8= go.temporal.io/server v1.26.2 h1:vDW11lxslYPlGDbQklWi/tqbkVZ2ExtRO1jNjvZmUUI= go.temporal.io/server v1.26.2/go.mod h1:tgY+4z/PuIdqs6ouV1bT90RWSWfEioWkzmrNrLYLUrk= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 3ddc1c1e..ff6c537a 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -131,6 +131,18 @@ func (v *WorkflowReferenceOptions) buildFlags(cctx *CommandContext, f *pflag.Fla f.StringVarP(&v.RunId, "run-id", "r", "", "Run ID.") } +type DeploymentReferenceOptions struct { + SeriesName string + BuildId string +} + +func (v *DeploymentReferenceOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { + f.StringVar(&v.SeriesName, "series-name", "", "Series Name for a Worker Deployment. Required.") + _ = cobra.MarkFlagRequired(f, "series-name") + f.StringVar(&v.BuildId, "build-id", "", "Build ID for a Worker Deployment. Required.") + _ = cobra.MarkFlagRequired(f, "build-id") +} + type SingleWorkflowOrBatchOptions struct { WorkflowId string Query string @@ -296,6 +308,7 @@ func NewTemporalCommand(cctx *CommandContext) *TemporalCommand { s.Command.AddCommand(&NewTemporalScheduleCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalServerCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalTaskQueueCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkerCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowCommand(cctx, &s).Command) s.Command.PersistentFlags().StringVar(&s.Env, "env", "default", "Active environment name (`ENV`).") cctx.BindFlagEnvVar(s.Command.PersistentFlags().Lookup("env"), "TEMPORAL_ENV") @@ -2492,6 +2505,164 @@ func NewTemporalTaskQueueVersioningReplaceRedirectRuleCommand(cctx *CommandConte return &s } +type TemporalWorkerCommand struct { + Parent *TemporalCommand + Command cobra.Command + ClientOptions +} + +func NewTemporalWorkerCommand(cctx *CommandContext, parent *TemporalCommand) *TemporalWorkerCommand { + var s TemporalWorkerCommand + s.Parent = parent + s.Command.Use = "worker" + s.Command.Short = "Read or update Worker state" + if hasHighlighting { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker is experimental. Worker commands are subject to |\n| change. |\n+---------------------------------------------------------------------+\n\nModify or read state associated with a Worker, for example,\nusing Worker Deployments commands:\n\n\x1b[1mtemporal worker deployment\x1b[0m" + } else { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker is experimental. Worker commands are subject to |\n| change. |\n+---------------------------------------------------------------------+\n\nModify or read state associated with a Worker, for example,\nusing Worker Deployments commands:\n\n```\ntemporal worker deployment\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.AddCommand(&NewTemporalWorkerDeploymentCommand(cctx, &s).Command) + s.ClientOptions.buildFlags(cctx, s.Command.PersistentFlags()) + return &s +} + +type TemporalWorkerDeploymentCommand struct { + Parent *TemporalWorkerCommand + Command cobra.Command +} + +func NewTemporalWorkerDeploymentCommand(cctx *CommandContext, parent *TemporalWorkerCommand) *TemporalWorkerDeploymentCommand { + var s TemporalWorkerDeploymentCommand + s.Parent = parent + s.Command.Use = "deployment" + s.Command.Short = "Describe, list, and operate on Worker Deployments" + if hasHighlighting { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nDeployment commands perform operations on Worker Deployments:\n\n\x1b[1mtemporal worker deployment [command] [options]\x1b[0m\n\nFor example:\n\n\x1b[1mtemporal worker deployment list\x1b[0m" + } else { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nDeployment commands perform operations on Worker Deployments:\n\n```\ntemporal worker deployment [command] [options]\n```\n\nFor example:\n\n```\ntemporal worker deployment list\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.AddCommand(&NewTemporalWorkerDeploymentDescribeCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkerDeploymentGetCurrentCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkerDeploymentListCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkerDeploymentSetCurrentCommand(cctx, &s).Command) + return &s +} + +type TemporalWorkerDeploymentDescribeCommand struct { + Parent *TemporalWorkerDeploymentCommand + Command cobra.Command + DeploymentReferenceOptions + ReportReachability bool +} + +func NewTemporalWorkerDeploymentDescribeCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentDescribeCommand { + var s TemporalWorkerDeploymentDescribeCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "describe [flags]" + s.Command.Short = "Show properties of a Worker Deployment" + if hasHighlighting { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nDescribes properties of a Worker Deployment, such as whether it is\ncurrent, the non-empty list of its task queues, custom metadata if\npresent, and reachability status when requested.\n\n\x1b[1mtemporal worker deployment describe [options]\x1b[0m\n\nFor example, to also include reachability information:\n\n\x1b[1mtemporal worker deployment describe \\\n --series-name YourDeploymentSeriesName \\\n --build-id YourDeploymentBuildId \\\n --report-reachability\x1b[0m" + } else { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nDescribes properties of a Worker Deployment, such as whether it is\ncurrent, the non-empty list of its task queues, custom metadata if\npresent, and reachability status when requested.\n\n```\ntemporal worker deployment describe [options]\n```\n\nFor example, to also include reachability information:\n\n```\ntemporal worker deployment describe \\\n --series-name YourDeploymentSeriesName \\\n --build-id YourDeploymentBuildId \\\n --report-reachability\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().BoolVar(&s.ReportReachability, "report-reachability", false, "Include reachability information of a Worker Deployment.") + s.DeploymentReferenceOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalWorkerDeploymentGetCurrentCommand struct { + Parent *TemporalWorkerDeploymentCommand + Command cobra.Command + SeriesName string +} + +func NewTemporalWorkerDeploymentGetCurrentCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentGetCurrentCommand { + var s TemporalWorkerDeploymentGetCurrentCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "get-current [flags]" + s.Command.Short = "Show the current Worker Deployment" + if hasHighlighting { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nGets the current Worker Deployment for a Deployment Series Name.\nWhen a Deployment is current, Workers of that Deployment will receive\ntasks from new Workflows and from existing AutoUpgrade Workflows that\nare running on this Deployment Series.\n\n\x1b[1mtemporal worker deployment get-current [options]\x1b[0m\n\nFor example:\n\n\x1b[1mtemporal worker deployment get-current \\\n --series-name YourDeploymentSeriesName\x1b[0m" + } else { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nGets the current Worker Deployment for a Deployment Series Name.\nWhen a Deployment is current, Workers of that Deployment will receive\ntasks from new Workflows and from existing AutoUpgrade Workflows that\nare running on this Deployment Series.\n\n```\ntemporal worker deployment get-current [options]\n```\n\nFor example:\n\n```\ntemporal worker deployment get-current \\\n --series-name YourDeploymentSeriesName\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.SeriesName, "series-name", "", "Series Name for the current Worker Deployment. Required.") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "series-name") + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalWorkerDeploymentListCommand struct { + Parent *TemporalWorkerDeploymentCommand + Command cobra.Command + SeriesName string +} + +func NewTemporalWorkerDeploymentListCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentListCommand { + var s TemporalWorkerDeploymentListCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "list [flags]" + s.Command.Short = "Enumerate Worker Deployments in the client's namespace" + if hasHighlighting { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nList existing Worker Deployments in the client's namespace, optionally\nfiltering them by Deployment Series Name.\n\n\n\x1b[1mtemporal worker deployment list [options]\x1b[0m\n\nFor example, adding an optional filter:\n\n\x1b[1mtemporal worker deployment list \\\n --series-name YourDeploymentSeriesName\x1b[0m" + } else { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nList existing Worker Deployments in the client's namespace, optionally\nfiltering them by Deployment Series Name.\n\n\n```\ntemporal worker deployment list [options]\n```\n\nFor example, adding an optional filter:\n\n```\ntemporal worker deployment list \\\n --series-name YourDeploymentSeriesName\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.SeriesName, "series-name", "", "Series Name to filter Worker Deployments.") + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalWorkerDeploymentSetCurrentCommand struct { + Parent *TemporalWorkerDeploymentCommand + Command cobra.Command + DeploymentReferenceOptions + Metadata []string +} + +func NewTemporalWorkerDeploymentSetCurrentCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentSetCurrentCommand { + var s TemporalWorkerDeploymentSetCurrentCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "set-current [flags]" + s.Command.Short = "Change the current Worker Deployment" + if hasHighlighting { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nSets the current Deployment for a given Deployment Series.\nWhen a Deployment is current, Workers of that Deployment will receive\ntasks from new Workflows and from existing AutoUpgrade Workflows that\nare running on this Deployment Series.\n\n\x1b[1mtemporal worker deployment set-current [options]\x1b[0m\n\nFor example:\n\n\x1b[1mtemporal worker deployment set-current \\\n --series-name YourDeploymentSeriesName \\\n --build-id YourDeploymentBuildId\x1b[0m" + } else { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worker deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n\nSets the current Deployment for a given Deployment Series.\nWhen a Deployment is current, Workers of that Deployment will receive\ntasks from new Workflows and from existing AutoUpgrade Workflows that\nare running on this Deployment Series.\n\n```\ntemporal worker deployment set-current [options]\n```\n\nFor example:\n\n```\ntemporal worker deployment set-current \\\n --series-name YourDeploymentSeriesName \\\n --build-id YourDeploymentBuildId\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringArrayVar(&s.Metadata, "metadata", nil, "Set deployment metadata using `KEY=\"VALUE\"` pairs. Keys must be identifiers, and values must be JSON values. For example: 'YourKey={\"your\": \"value\"}'. Can be passed multiple times.") + s.DeploymentReferenceOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalWorkflowCommand struct { Parent *TemporalCommand Command cobra.Command @@ -2526,6 +2697,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowTraceCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowUpdateCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkflowUpdateOptionsCommand(cctx, &s).Command) s.ClientOptions.buildFlags(cctx, s.Command.PersistentFlags()) return &s } @@ -3195,3 +3367,38 @@ func NewTemporalWorkflowUpdateStartCommand(cctx *CommandContext, parent *Tempora } return &s } + +type TemporalWorkflowUpdateOptionsCommand struct { + Parent *TemporalWorkflowCommand + Command cobra.Command + SingleWorkflowOrBatchOptions + VersioningOverrideBehavior StringEnum + VersioningOverrideSeriesName string + VersioningOverrideBuildId string +} + +func NewTemporalWorkflowUpdateOptionsCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowUpdateOptionsCommand { + var s TemporalWorkflowUpdateOptionsCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "update-options [flags]" + s.Command.Short = "Change Workflow Execution Options" + if hasHighlighting { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worflow update-options is experimental. Workflow Execution |\n| properties are subject to change. |\n+---------------------------------------------------------------------+\n\nModify properties of Workflow Executions:\n\n\x1b[1mtemporal workflow update-options [options]\x1b[0m\n\nIt can override the Worker Deployment configuration of a\nWorkflow Execution, which controls Worker Versioning.\n\nFor example, to force Workers in the current Deployment execute the\nnext Workflow Task change behavior to \x1b[1mauto_upgrade\x1b[0m:\n\n\x1b[1mtemporal workflow update-options \\\n --workflow-id YourWorkflowId \\\n --versioning-override-behavior auto_upgrade\x1b[0m\n\nor to pin the workflow execution to a Worker Deployment, set behavior\nto \x1b[1mpinned\x1b[0m:\n\n\x1b[1mtemporal workflow update-options \\\n --workflow-id YourWorkflowId \\\n --versioning-override-behavior pinned \\\n --versioning-override-series-name YourDeploymentSeriesName \\\n --versioning-override-build-id YourDeploymentBuildId\x1b[0m\n\nTo remove any previous overrides, set the behavior to\n\x1b[1munspecified\x1b[0m:\n\n\x1b[1mtemporal workflow update-options \\\n --workflow-id YourWorkflowId \\\n --versioning-override-behavior unspecified\x1b[0m\n\nTo see the current override use \x1b[1mtemporal workflow describe\x1b[0m" + } else { + s.Command.Long = "+---------------------------------------------------------------------+\n| CAUTION: Worflow update-options is experimental. Workflow Execution |\n| properties are subject to change. |\n+---------------------------------------------------------------------+\n\nModify properties of Workflow Executions:\n\n```\ntemporal workflow update-options [options]\n```\n\nIt can override the Worker Deployment configuration of a\nWorkflow Execution, which controls Worker Versioning.\n\nFor example, to force Workers in the current Deployment execute the\nnext Workflow Task change behavior to `auto_upgrade`:\n\n```\ntemporal workflow update-options \\\n --workflow-id YourWorkflowId \\\n --versioning-override-behavior auto_upgrade\n```\n\nor to pin the workflow execution to a Worker Deployment, set behavior\nto `pinned`:\n\n```\ntemporal workflow update-options \\\n --workflow-id YourWorkflowId \\\n --versioning-override-behavior pinned \\\n --versioning-override-series-name YourDeploymentSeriesName \\\n --versioning-override-build-id YourDeploymentBuildId\n```\n\nTo remove any previous overrides, set the behavior to\n`unspecified`:\n\n```\ntemporal workflow update-options \\\n --workflow-id YourWorkflowId \\\n --versioning-override-behavior unspecified\n```\n\nTo see the current override use `temporal workflow describe`" + } + s.Command.Args = cobra.NoArgs + s.VersioningOverrideBehavior = NewStringEnum([]string{"unspecified", "pinned", "auto_upgrade"}, "") + s.Command.Flags().Var(&s.VersioningOverrideBehavior, "versioning-override-behavior", "Override the versioning behavior of a Workflow. Accepted values: unspecified, pinned, auto_upgrade. Required.") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "versioning-override-behavior") + s.Command.Flags().StringVar(&s.VersioningOverrideSeriesName, "versioning-override-series-name", "", "Override Series Name for a Worker Deployment (Only for pinned).") + s.Command.Flags().StringVar(&s.VersioningOverrideBuildId, "versioning-override-build-id", "", "Override Build ID for a Worker Deployment (Only for pinned).") + s.SingleWorkflowOrBatchOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} diff --git a/temporalcli/commands.taskqueue_test.go b/temporalcli/commands.taskqueue_test.go index dd863844..346df180 100644 --- a/temporalcli/commands.taskqueue_test.go +++ b/temporalcli/commands.taskqueue_test.go @@ -2,11 +2,12 @@ package temporalcli_test import ( "encoding/json" - "github.com/stretchr/testify/assert" - "go.temporal.io/sdk/workflow" "strings" "time" + "github.com/stretchr/testify/assert" + "go.temporal.io/sdk/workflow" + "github.com/google/uuid" "github.com/temporalio/cli/temporalcli" "go.temporal.io/api/enums/v1" @@ -261,6 +262,10 @@ func (s *SharedServerSuite) TestTaskQueue_Describe_Simple() { ) s.NoError(res.Err) + // TODO(antlai-temporal): Delete when a server caching bug in 1.26.2 is fixed, + // see https://github.com/temporalio/temporal/pull/6978 + time.Sleep(1 * time.Second) + // Text res = s.Execute( "task-queue", "describe", @@ -270,11 +275,14 @@ func (s *SharedServerSuite) TestTaskQueue_Describe_Simple() { "--task-queue", s.Worker().Options.TaskQueue, ) s.NoError(res.Err) - s.ContainsOnSameLine(res.Stdout.String(), "id1", "reachable") // No pollers on id1 s.NotContains(res.Stdout.String(), "now") + // TODO(antlai-temporal): Delete when a server caching bug in 1.26.2 is fixed, + // see https://github.com/temporalio/temporal/pull/6978 + time.Sleep(1 * time.Second) + res = s.Execute( "task-queue", "describe", "--select-unversioned", @@ -286,8 +294,12 @@ func (s *SharedServerSuite) TestTaskQueue_Describe_Simple() { s.NoError(res.Err) s.ContainsOnSameLine(res.Stdout.String(), "UNVERSIONED", "unreachable") - s.ContainsOnSameLine(res.Stdout.String(), "UNVERSIONED", "workflow", s.DevServer.Options.ClientOptions.Identity, "now", "100000") - s.ContainsOnSameLine(res.Stdout.String(), "UNVERSIONED", "activity", s.DevServer.Options.ClientOptions.Identity, "now", "100000") + s.ContainsOnSameLine(res.Stdout.String(), "UNVERSIONED", "workflow", s.DevServer.Options.ClientOptions.Identity, "2 seconds ago", "100000") + s.ContainsOnSameLine(res.Stdout.String(), "UNVERSIONED", "activity", s.DevServer.Options.ClientOptions.Identity, "2 seconds ago", "100000") + + // TODO(antlai-temporal): Delete when a server caching bug in 1.26.2 is fixed, + // see https://github.com/temporalio/temporal/pull/6978 + time.Sleep(1 * time.Second) res = s.Execute( "task-queue", "describe", @@ -303,6 +315,10 @@ func (s *SharedServerSuite) TestTaskQueue_Describe_Simple() { // No pollers on id2 s.NotContains(res.Stdout.String(), "now") + // TODO(antlai-temporal): Delete when a server caching bug in 1.26.2 is fixed, + // see https://github.com/temporalio/temporal/pull/6978 + time.Sleep(1 * time.Second) + res = s.Execute( "task-queue", "describe", "--select-all-active", diff --git a/temporalcli/commands.worker.deployment.go b/temporalcli/commands.worker.deployment.go new file mode 100644 index 00000000..45a44ec0 --- /dev/null +++ b/temporalcli/commands.worker.deployment.go @@ -0,0 +1,377 @@ +package temporalcli + +import ( + "fmt" + "time" + + "github.com/fatih/color" + "github.com/temporalio/cli/temporalcli/internal/printer" + "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/client" +) + +type taskQueuesInfosRowType struct { + Name string `json:"name"` + Type string `json:"type"` + FirstPollerTime time.Time `json:"firstPollerTime"` +} + +type deploymentType struct { + SeriesName string `json:"seriesName"` + BuildID string `json:"buildId"` +} + +type formattedDeploymentInfoType struct { + Deployment deploymentType `json:"deployment"` + CreateTime time.Time `json:"createTime"` + IsCurrent bool `json:"isCurrent"` + TaskQueuesInfos []taskQueuesInfosRowType `json:"taskQueuesInfos,omitempty"` + Metadata map[string]*common.Payload `json:"metadata,omitempty"` +} + +type formattedDeploymentReachabilityInfoType struct { + DeploymentInfo formattedDeploymentInfoType `json:"deploymentInfo"` + Reachability string `json:"reachability"` + LastUpdateTime time.Time `json:"lastUpdateTime"` +} + +type formattedDeploymentListEntryType struct { + SeriesName string + BuildID string + CreateTime time.Time + IsCurrent bool +} + +type formattedDualDeploymentInfoType struct { + Previous formattedDeploymentInfoType `json:"previous"` + Current formattedDeploymentInfoType `json:"current"` +} + +func formatTaskQueuesInfos(tqis []client.DeploymentTaskQueueInfo) ([]taskQueuesInfosRowType, error) { + var tqiRows []taskQueuesInfosRowType + for _, tqi := range tqis { + tqTypeStr, err := taskQueueTypeToStr(tqi.Type) + if err != nil { + return tqiRows, err + } + tqiRows = append(tqiRows, taskQueuesInfosRowType{ + Name: tqi.Name, + Type: tqTypeStr, + FirstPollerTime: tqi.FirstPollerTime, + }) + } + return tqiRows, nil +} + +func deploymentInfoToRows(deploymentInfo client.DeploymentInfo) (formattedDeploymentInfoType, error) { + tqi, err := formatTaskQueuesInfos(deploymentInfo.TaskQueuesInfos) + if err != nil { + return formattedDeploymentInfoType{}, err + } + + return formattedDeploymentInfoType{ + Deployment: deploymentType{ + SeriesName: deploymentInfo.Deployment.SeriesName, + BuildID: deploymentInfo.Deployment.BuildID, + }, + CreateTime: deploymentInfo.CreateTime, + IsCurrent: deploymentInfo.IsCurrent, + TaskQueuesInfos: tqi, + Metadata: deploymentInfo.Metadata, + }, nil +} + +func printDeploymentInfo(cctx *CommandContext, deploymentInfo client.DeploymentInfo, msg string) error { + + fDeploymentInfo, err := deploymentInfoToRows(deploymentInfo) + if err != nil { + return err + } + + if !cctx.JSONOutput { + cctx.Printer.Println(color.MagentaString(msg)) + printMe := struct { + SeriesName string + BuildID string + CreateTime time.Time + IsCurrent bool + Metadata map[string]*common.Payload `cli:",cardOmitEmpty"` + }{ + SeriesName: deploymentInfo.Deployment.SeriesName, + BuildID: deploymentInfo.Deployment.BuildID, + CreateTime: deploymentInfo.CreateTime, + IsCurrent: deploymentInfo.IsCurrent, + Metadata: deploymentInfo.Metadata, + } + err := cctx.Printer.PrintStructured(printMe, printer.StructuredOptions{}) + if err != nil { + return fmt.Errorf("displaying worker deployment info failed: %w", err) + } + + if len(deploymentInfo.TaskQueuesInfos) > 0 { + cctx.Printer.Println() + cctx.Printer.Println(color.MagentaString("Task Queues:")) + err := cctx.Printer.PrintStructured( + deploymentInfo.TaskQueuesInfos, + printer.StructuredOptions{Table: &printer.TableOptions{}}, + ) + if err != nil { + return fmt.Errorf("displaying task queues info failed: %w", err) + } + } + + return nil + } + + // json output + return cctx.Printer.PrintStructured(fDeploymentInfo, printer.StructuredOptions{}) +} + +func deploymentReachabilityTypeToStr(reachabilityType client.DeploymentReachability) (string, error) { + switch reachabilityType { + case client.DeploymentReachabilityUnspecified: + return "unspecified", nil + case client.DeploymentReachabilityReachable: + return "reachable", nil + case client.DeploymentReachabilityClosedWorkflows: + return "closed", nil + case client.DeploymentReachabilityUnreachable: + return "unreachable", nil + default: + return "", fmt.Errorf("unrecognized deployment reachability type: %d", reachabilityType) + } +} + +func printDeploymentReachabilityInfo(cctx *CommandContext, reachability client.DeploymentReachabilityInfo) error { + fDeploymentInfo, err := deploymentInfoToRows(reachability.DeploymentInfo) + if err != nil { + return err + } + + rTypeStr, err := deploymentReachabilityTypeToStr(reachability.Reachability) + if err != nil { + return err + } + + fReachabilityInfo := formattedDeploymentReachabilityInfoType{ + DeploymentInfo: fDeploymentInfo, + LastUpdateTime: reachability.LastUpdateTime, + Reachability: rTypeStr, + } + + if !cctx.JSONOutput { + err := printDeploymentInfo(cctx, reachability.DeploymentInfo, "Worker Deployment:") + if err != nil { + return err + } + + cctx.Printer.Println() + cctx.Printer.Println(color.MagentaString("Reachability:")) + printMe := struct { + LastUpdateTime time.Time + Reachability string + }{ + LastUpdateTime: fReachabilityInfo.LastUpdateTime, + Reachability: fReachabilityInfo.Reachability, + } + return cctx.Printer.PrintStructured(printMe, printer.StructuredOptions{}) + } + + // json output + return cctx.Printer.PrintStructured(fReachabilityInfo, printer.StructuredOptions{}) +} + +func printDeploymentSetCurrentResponse(cctx *CommandContext, response client.DeploymentSetCurrentResponse) error { + + if !cctx.JSONOutput { + err := printDeploymentInfo(cctx, response.Previous, "Previous Worker Deployment:") + if err != nil { + return fmt.Errorf("displaying previous worker deployment info failed: %w", err) + } + + err = printDeploymentInfo(cctx, response.Current, "Current Worker Deployment:") + if err != nil { + return fmt.Errorf("displaying current worker deployment info failed: %w", err) + } + + return nil + } + + previous, err := deploymentInfoToRows(response.Previous) + if err != nil { + return fmt.Errorf("displaying previous worker deployment info failed: %w", err) + } + current, err := deploymentInfoToRows(response.Current) + if err != nil { + return fmt.Errorf("displaying current worker deployment info failed: %w", err) + } + + return cctx.Printer.PrintStructured(formattedDualDeploymentInfoType{ + Previous: previous, + Current: current, + }, printer.StructuredOptions{}) +} + +func (c *TemporalWorkerDeploymentDescribeCommand) run(cctx *CommandContext, args []string) error { + cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + if c.ReportReachability { + // Expensive call, rate-limited by target method + resp, err := cl.DeploymentClient().GetReachability(cctx, client.DeploymentGetReachabilityOptions{ + Deployment: client.Deployment{ + SeriesName: c.SeriesName, + BuildID: c.BuildId, + }, + }) + if err != nil { + return fmt.Errorf("error describing worker deployment with reachability: %w", err) + } + + err = printDeploymentReachabilityInfo(cctx, resp) + if err != nil { + return err + } + } else { + resp, err := cl.DeploymentClient().Describe(cctx, client.DeploymentDescribeOptions{ + Deployment: client.Deployment{ + SeriesName: c.SeriesName, + BuildID: c.BuildId, + }, + }) + if err != nil { + return fmt.Errorf("error describing worker deployment: %w", err) + } + err = printDeploymentInfo(cctx, resp.DeploymentInfo, "Worker Deployment:") + if err != nil { + return err + } + + } + + return nil +} + +func (c *TemporalWorkerDeploymentGetCurrentCommand) run(cctx *CommandContext, args []string) error { + cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + resp, err := cl.DeploymentClient().GetCurrent(cctx, client.DeploymentGetCurrentOptions{ + SeriesName: c.SeriesName, + }) + if err != nil { + return fmt.Errorf("error getting the current deployment: %w", err) + } + + err = printDeploymentInfo(cctx, resp.DeploymentInfo, "Current Worker Deployment:") + if err != nil { + return err + } + + return nil +} + +func (c *TemporalWorkerDeploymentListCommand) run(cctx *CommandContext, args []string) error { + cl, err := c.Parent.Parent.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + res, err := cl.DeploymentClient().List(cctx, client.DeploymentListOptions{ + SeriesName: c.SeriesName, + }) + if err != nil { + return err + } + + // This is a listing command subject to json vs jsonl rules + cctx.Printer.StartList() + defer cctx.Printer.EndList() + + printTableOpts := printer.StructuredOptions{ + Table: &printer.TableOptions{}, + } + + // make artificial "pages" so we get better aligned columns + page := make([]*formattedDeploymentListEntryType, 0, 100) + + for res.HasNext() { + entry, err := res.Next() + if err != nil { + return err + } + listEntry := formattedDeploymentInfoType{ + Deployment: deploymentType{ + SeriesName: entry.Deployment.SeriesName, + BuildID: entry.Deployment.BuildID, + }, + CreateTime: entry.CreateTime, + IsCurrent: entry.IsCurrent, + } + if cctx.JSONOutput { + // For JSON dump one line of JSON per deployment + _ = cctx.Printer.PrintStructured(listEntry, printer.StructuredOptions{}) + } else { + // For non-JSON, we are doing a table for each page + page = append(page, &formattedDeploymentListEntryType{ + SeriesName: listEntry.Deployment.SeriesName, + BuildID: listEntry.Deployment.BuildID, + CreateTime: listEntry.CreateTime, + IsCurrent: listEntry.IsCurrent, + }) + if len(page) == cap(page) { + _ = cctx.Printer.PrintStructured(page, printTableOpts) + page = page[:0] + printTableOpts.Table.NoHeader = true + } + } + } + + if !cctx.JSONOutput { + // Last partial page for non-JSON + _ = cctx.Printer.PrintStructured(page, printTableOpts) + } + + return nil +} + +func (c *TemporalWorkerDeploymentSetCurrentCommand) run(cctx *CommandContext, args []string) error { + cl, err := c.Parent.Parent.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + metadata, err := stringKeysJSONValues(c.Metadata, false) + if err != nil { + return fmt.Errorf("invalid metadata values: %w", err) + } + + resp, err := cl.DeploymentClient().SetCurrent(cctx, client.DeploymentSetCurrentOptions{ + Deployment: client.Deployment{ + SeriesName: c.SeriesName, + BuildID: c.BuildId, + }, + MetadataUpdate: client.DeploymentMetadataUpdate{ + UpsertEntries: metadata, + }, + }) + if err != nil { + return fmt.Errorf("error setting the current worker deployment: %w", err) + } + + err = printDeploymentSetCurrentResponse(cctx, resp) + if err != nil { + return err + } + + cctx.Printer.Println("Successfully setting the current worker deployment") + return nil +} diff --git a/temporalcli/commands.worker.deployment_test.go b/temporalcli/commands.worker.deployment_test.go new file mode 100644 index 00000000..87dba990 --- /dev/null +++ b/temporalcli/commands.worker.deployment_test.go @@ -0,0 +1,191 @@ +package temporalcli_test + +import ( + "encoding/base64" + "encoding/json" + "sort" + "time" + + "github.com/google/uuid" + "go.temporal.io/api/common/v1" +) + +type jsonTaskQueuesInfosRowType struct { + Name string `json:"name"` + Type string `json:"type"` + FirstPollerTime time.Time `json:"firstPollerTime"` +} + +type jsonDeploymentType struct { + SeriesName string `json:"seriesName"` + BuildID string `json:"buildId"` +} + +type jsonDeploymentInfoType struct { + Deployment jsonDeploymentType `json:"deployment"` + CreateTime time.Time `json:"createTime"` + IsCurrent bool `json:"isCurrent"` + TaskQueuesInfos []jsonTaskQueuesInfosRowType `json:"taskQueuesInfos,omitempty"` + Metadata map[string]*common.Payload `json:"metadata,omitempty"` +} + +type jsonDeploymentReachabilityInfoType struct { + DeploymentInfo jsonDeploymentInfoType `json:"deploymentInfo"` + Reachability string `json:"reachability"` + LastUpdateTime time.Time `json:"lastUpdateTime"` +} + +type jsonDeploymentListEntryType struct { + Deployment jsonDeploymentType `json:"deployment"` + CreateTime time.Time `json:"createTime"` + IsCurrent bool `json:"isCurrent"` +} + +func (s *SharedServerSuite) TestDeployment_Set_Current() { + seriesName := uuid.NewString() + buildId := uuid.NewString() + + res := s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId, + "--metadata", "bar=1", + ) + s.NoError(res.Err) + + res = s.Execute( + "worker", "deployment", "get-current", + "--address", s.Address(), + "--series-name", seriesName, + ) + s.NoError(res.Err) + + s.ContainsOnSameLine(res.Stdout.String(), "SeriesName", seriesName) + s.ContainsOnSameLine(res.Stdout.String(), "BuildID", buildId) + s.ContainsOnSameLine(res.Stdout.String(), "IsCurrent", "true") + s.ContainsOnSameLine(res.Stdout.String(), "Metadata", "data:\"1\"") + + // json + res = s.Execute( + "worker", "deployment", "get-current", + "--address", s.Address(), + "--series-name", seriesName, + "--output", "json", + ) + s.NoError(res.Err) + + var jsonOut jsonDeploymentInfoType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Equal(jsonDeploymentType{SeriesName: seriesName, BuildID: buildId}, jsonOut.Deployment) + s.True(jsonOut.IsCurrent) + // "1" is "MQ==" + s.Equal("MQ==", base64.StdEncoding.EncodeToString(jsonOut.Metadata["bar"].GetData())) + // "json/plain" is "anNvbi9wbGFpbg==" + s.Equal("anNvbi9wbGFpbg==", base64.StdEncoding.EncodeToString(jsonOut.Metadata["bar"].GetMetadata()["encoding"])) +} + +func (s *SharedServerSuite) TestDeployment_List() { + seriesName := uuid.NewString() + buildId1 := uuid.NewString() + buildId2 := uuid.NewString() + + res := s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId1, + ) + s.NoError(res.Err) + + res = s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId2, + ) + s.NoError(res.Err) + + res = s.Execute( + "worker", "deployment", "list", + "--address", s.Address(), + "--series-name", seriesName, + ) + s.NoError(res.Err) + + s.ContainsOnSameLine(res.Stdout.String(), seriesName, buildId1, "now", "false") + s.ContainsOnSameLine(res.Stdout.String(), seriesName, buildId2, "now", "true") + + // json + res = s.Execute( + "worker", "deployment", "list", + "--address", s.Address(), + "--series-name", seriesName, + "--output", "json", + ) + s.NoError(res.Err) + + var jsonOut []jsonDeploymentListEntryType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + sort.Slice(jsonOut, func(i, j int) bool { + return jsonOut[i].CreateTime.Before(jsonOut[j].CreateTime) + }) + s.Equal(len(jsonOut), 2) + s.Equal(jsonDeploymentType{SeriesName: seriesName, BuildID: buildId1}, jsonOut[0].Deployment) + s.True(!jsonOut[0].IsCurrent) + s.Equal(jsonDeploymentType{SeriesName: seriesName, BuildID: buildId2}, jsonOut[1].Deployment) + s.True(jsonOut[1].IsCurrent) +} + +func (s *SharedServerSuite) TestDeployment_Describe_Reachability() { + seriesName := uuid.NewString() + buildId1 := uuid.NewString() + buildId2 := uuid.NewString() + + res := s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId1, + ) + s.NoError(res.Err) + + res = s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId2, + ) + s.NoError(res.Err) + + res = s.Execute( + "worker", "deployment", "describe", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId1, + "--report-reachability", + ) + s.NoError(res.Err) + + s.ContainsOnSameLine(res.Stdout.String(), "SeriesName", seriesName) + s.ContainsOnSameLine(res.Stdout.String(), "BuildID", buildId1) + s.ContainsOnSameLine(res.Stdout.String(), "IsCurrent", "false") + s.ContainsOnSameLine(res.Stdout.String(), "Reachability", "unreachable") + + // json + res = s.Execute( + "worker", "deployment", "describe", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId2, + "--report-reachability", + "--output", "json", + ) + s.NoError(res.Err) + + var jsonOut jsonDeploymentReachabilityInfoType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Equal(jsonDeploymentType{SeriesName: seriesName, BuildID: buildId2}, jsonOut.DeploymentInfo.Deployment) + s.True(jsonOut.DeploymentInfo.IsCurrent) + s.Equal(jsonOut.Reachability, "reachable") +} diff --git a/temporalcli/commands.workflow.go b/temporalcli/commands.workflow.go index 02316aad..481f881f 100644 --- a/temporalcli/commands.workflow.go +++ b/temporalcli/commands.workflow.go @@ -8,18 +8,22 @@ import ( "os/user" "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/workflow" "github.com/fatih/color" "github.com/google/uuid" "github.com/temporalio/cli/temporalcli/internal/printer" "go.temporal.io/api/batch/v1" "go.temporal.io/api/common/v1" + deploymentpb "go.temporal.io/api/deployment/v1" "go.temporal.io/api/enums/v1" "go.temporal.io/api/query/v1" "go.temporal.io/api/update/v1" + workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/temporal" + "google.golang.org/protobuf/types/known/fieldmaskpb" ) func (c *TemporalWorkflowCancelCommand) run(cctx *CommandContext, args []string) error { @@ -88,6 +92,117 @@ func (c *TemporalWorkflowDeleteCommand) run(cctx *CommandContext, args []string) return nil } +func (c *TemporalWorkflowUpdateOptionsCommand) run(cctx *CommandContext, args []string) error { + cl, err := c.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + if c.VersioningOverrideBehavior.Value == "unspecified" || c.VersioningOverrideBehavior.Value == "auto_upgrade" { + if c.VersioningOverrideSeriesName != "" { + return fmt.Errorf("cannot set deployment series name with %v behavior", c.VersioningOverrideBehavior) + } + if c.VersioningOverrideBuildId != "" { + return fmt.Errorf("cannot set deployment build ID with %v behavior", c.VersioningOverrideBehavior) + } + } + + if c.VersioningOverrideBehavior.Value == "pinned" { + if c.VersioningOverrideSeriesName == "" { + return fmt.Errorf("missing deployment series name with 'pinned' behavior") + } + if c.VersioningOverrideBuildId == "" { + return fmt.Errorf("missing deployment build ID with 'pinned' behavior") + } + } + + exec, batchReq, err := c.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, singleOrBatchOverrides{}) + + // Run single or batch + if err != nil { + return err + } else if exec != nil { + behavior := workflow.VersioningBehaviorUnspecified + switch c.VersioningOverrideBehavior.Value { + case "unspecified": + case "pinned": + behavior = workflow.VersioningBehaviorPinned + case "auto_upgrade": + behavior = workflow.VersioningBehaviorAutoUpgrade + default: + return fmt.Errorf( + "invalid deployment behavior: %v, valid values are: 'unspecified', 'pinned', and 'auto_upgrade'", + c.VersioningOverrideBehavior, + ) + } + + _, err := cl.UpdateWorkflowExecutionOptions(cctx, client.UpdateWorkflowExecutionOptionsRequest{ + WorkflowId: exec.WorkflowId, + RunId: exec.RunId, + WorkflowExecutionOptionsChanges: client.WorkflowExecutionOptionsChanges{ + VersioningOverride: &client.VersioningOverride{ + Behavior: behavior, + Deployment: client.Deployment{ + SeriesName: c.VersioningOverrideSeriesName, + BuildID: c.VersioningOverrideBuildId, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("failed to update workflow options: %w", err) + } + cctx.Printer.Println("Update workflow options succeeded") + } else { // Run batch + var workflowExecutionOptions *workflowpb.WorkflowExecutionOptions + protoMask, err := fieldmaskpb.New(workflowExecutionOptions, "versioning_override") + if err != nil { + return fmt.Errorf("invalid field mask: %w", err) + } + + behavior := enums.VERSIONING_BEHAVIOR_UNSPECIFIED + switch c.VersioningOverrideBehavior.Value { + case "unspecified": + case "pinned": + behavior = enums.VERSIONING_BEHAVIOR_PINNED + case "auto_upgrade": + behavior = enums.VERSIONING_BEHAVIOR_AUTO_UPGRADE + default: + return fmt.Errorf( + "invalid deployment behavior: %v, valid values are: 'unspecified', 'pinned', and 'auto_upgrade'", + c.VersioningOverrideBehavior, + ) + } + + deployment := &deploymentpb.Deployment{ + SeriesName: c.VersioningOverrideSeriesName, + BuildId: c.VersioningOverrideBuildId, + } + if c.VersioningOverrideSeriesName == "" && c.VersioningOverrideBuildId == "" { + // auto_upgrade needs a `nil` pointer + deployment = nil + } + + batchReq.Operation = &workflowservice.StartBatchOperationRequest_UpdateWorkflowOptionsOperation{ + UpdateWorkflowOptionsOperation: &batch.BatchOperationUpdateWorkflowExecutionOptions{ + Identity: clientIdentity(), + WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: behavior, + Deployment: deployment, + }, + }, + UpdateMask: protoMask, + }, + } + if err := startBatchJob(cctx, cl, batchReq); err != nil { + return err + } + } + return nil +} + func (c *TemporalWorkflowQueryCommand) run(cctx *CommandContext, args []string) error { return queryHelper(cctx, c.Parent, c.PayloadInputOptions, c.Name, c.RejectCondition, c.WorkflowReferenceOptions) diff --git a/temporalcli/commands.workflow_test.go b/temporalcli/commands.workflow_test.go index 8ee07293..af873bb7 100644 --- a/temporalcli/commands.workflow_test.go +++ b/temporalcli/commands.workflow_test.go @@ -11,9 +11,12 @@ import ( "time" "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/temporalio/cli/temporalcli" "go.temporal.io/api/enums/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" "google.golang.org/grpc" ) @@ -418,6 +421,191 @@ func (s *SharedServerSuite) TestWorkflow_Cancel_SingleWorkflowSuccess() { s.Error(workflow.ErrCanceled, run.Get(s.Context, nil)) } +func (s *SharedServerSuite) TestWorkflow_Batch_Update_Options_Versioning_Override() { + buildId1 := uuid.NewString() + buildId2 := uuid.NewString() + seriesName := uuid.NewString() + // Workflow that waits to be canceled. + waitingWorkflow := func(ctx workflow.Context) error { + ctx.Done().Receive(ctx, nil) + return ctx.Err() + } + w := s.DevServer.StartDevWorker(s.Suite.T(), DevWorkerOptions{ + Worker: worker.Options{ + BuildID: buildId1, + UseBuildIDForVersioning: true, + DeploymentOptions: worker.DeploymentOptions{ + DeploymentSeriesName: seriesName, + DefaultVersioningBehavior: workflow.VersioningBehaviorPinned, + }, + }, + Workflows: []any{waitingWorkflow}, + }) + defer w.Stop() + + res := s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId1, + ) + s.NoError(res.Err) + + // Start workflows + numWorkflows := 5 + runs := make([]client.WorkflowRun, numWorkflows) + searchAttr := "keyword-" + uuid.NewString() + for i := range runs { + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: w.Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + waitingWorkflow, + ) + s.NoError(err) + runs[i] = run + } + + s.EventuallyWithT(func(t *assert.CollectT) { + for _, run := range runs { + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + ) + assert.NoError(t, res.Err) + assert.Contains(t, res.Stdout.String(), buildId1) + assert.Contains(t, res.Stdout.String(), "Pinned") + } + }, 30*time.Second, 100*time.Millisecond) + + s.CommandHarness.Stdin.WriteString("y\n") + res = s.Execute( + "workflow", "update-options", + "--address", s.Address(), + "--query", "CustomKeywordField = '"+searchAttr+"'", + "--versioning-override-behavior", "pinned", + "--versioning-override-series-name", seriesName, + "--versioning-override-build-id", buildId2, + ) + s.NoError(res.Err) + + s.EventuallyWithT(func(t *assert.CollectT) { + for _, run := range runs { + // json + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + "--output", "json", + ) + assert.NoError(t, res.Err) + + var jsonResp workflowservice.DescribeWorkflowExecutionResponse + assert.NoError(t, temporalcli.UnmarshalProtoJSONWithOptions(res.Stdout.Bytes(), &jsonResp, true)) + versioningInfo := jsonResp.WorkflowExecutionInfo.VersioningInfo + assert.NotNil(t, versioningInfo.VersioningOverride) + assert.Equal(t, buildId2, versioningInfo.VersioningOverride.Deployment.BuildId) + } + }, 30*time.Second, 100*time.Millisecond) +} + +func (s *SharedServerSuite) TestWorkflow_Update_Options_Versioning_Override() { + buildId1 := uuid.NewString() + buildId2 := uuid.NewString() + seriesName := uuid.NewString() + // Workflow that waits to be canceled. + waitingWorkflow := func(ctx workflow.Context) error { + ctx.Done().Receive(ctx, nil) + return ctx.Err() + } + w := s.DevServer.StartDevWorker(s.Suite.T(), DevWorkerOptions{ + Worker: worker.Options{ + BuildID: buildId1, + UseBuildIDForVersioning: true, + DeploymentOptions: worker.DeploymentOptions{ + DeploymentSeriesName: seriesName, + DefaultVersioningBehavior: workflow.VersioningBehaviorPinned, + }, + }, + Workflows: []any{waitingWorkflow}, + }) + defer w.Stop() + + res := s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId1, + ) + s.NoError(res.Err) + + // Start the workflow and wait until the operation is started. + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{TaskQueue: w.Options.TaskQueue}, + waitingWorkflow, + ) + s.NoError(err) + + s.EventuallyWithT(func(t *assert.CollectT) { + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + ) + assert.NoError(t, res.Err) + assert.Contains(t, res.Stdout.String(), buildId1) + assert.Contains(t, res.Stdout.String(), "Pinned") + }, 30*time.Second, 100*time.Millisecond) + + res = s.Execute( + "workflow", "update-options", + "--address", s.Address(), + "-w", run.GetID(), + "--versioning-override-behavior", "pinned", + "--versioning-override-series-name", seriesName, + "--versioning-override-build-id", buildId2, + ) + s.NoError(res.Err) + + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + ) + s.NoError(res.Err) + + s.ContainsOnSameLine(res.Stdout.String(), "OverrideBehavior", "Pinned") + s.ContainsOnSameLine(res.Stdout.String(), "OverrideDeploymentSeriesName", seriesName) + s.ContainsOnSameLine(res.Stdout.String(), "OverrideDeploymentBuildID", buildId2) + + // remove override + res = s.Execute( + "workflow", "update-options", + "--address", s.Address(), + "-w", run.GetID(), + "--versioning-override-behavior", "unspecified", + ) + s.NoError(res.Err) + + // json + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + "--output", "json", + ) + s.NoError(res.Err) + + var jsonResp workflowservice.DescribeWorkflowExecutionResponse + s.NoError(temporalcli.UnmarshalProtoJSONWithOptions(res.Stdout.Bytes(), &jsonResp, true)) + versioningInfo := jsonResp.WorkflowExecutionInfo.VersioningInfo + s.Nil(versioningInfo.VersioningOverride) +} + func (s *SharedServerSuite) TestWorkflow_Update_Execute() { workflowUpdateTest{ s: s, diff --git a/temporalcli/commands.workflow_view.go b/temporalcli/commands.workflow_view.go index 15677b40..0552dd95 100644 --- a/temporalcli/commands.workflow_view.go +++ b/temporalcli/commands.workflow_view.go @@ -134,6 +134,32 @@ func (c *TemporalWorkflowDescribeCommand) run(cctx *CommandContext, args []strin HistorySize: info.HistorySizeBytes, }, printer.StructuredOptions{}) + if info.VersioningInfo != nil { + cctx.Printer.Println() + cctx.Printer.Println(color.MagentaString("Versioning Info:")) + cctx.Printer.Println() + vInfo := info.VersioningInfo + _ = cctx.Printer.PrintStructured(struct { + Behavior string + DeploymentSeriesName string + DeploymentBuildID string + OverrideBehavior string `cli:",cardOmitEmpty"` + OverrideDeploymentSeriesName string `cli:",cardOmitEmpty"` + OverrideDeploymentBuildID string `cli:",cardOmitEmpty"` + TransitionDeploymentSeriesName string `cli:",cardOmitEmpty"` + TransitionDeploymentBuildID string `cli:",cardOmitEmpty"` + }{ + Behavior: vInfo.Behavior.String(), + DeploymentSeriesName: vInfo.Deployment.GetSeriesName(), + DeploymentBuildID: vInfo.Deployment.GetBuildId(), + OverrideBehavior: vInfo.VersioningOverride.GetBehavior().String(), + OverrideDeploymentSeriesName: vInfo.VersioningOverride.GetDeployment().GetSeriesName(), + OverrideDeploymentBuildID: vInfo.VersioningOverride.GetDeployment().GetBuildId(), + TransitionDeploymentSeriesName: vInfo.DeploymentTransition.GetDeployment().GetSeriesName(), + TransitionDeploymentBuildID: vInfo.DeploymentTransition.GetDeployment().GetBuildId(), + }, printer.StructuredOptions{}) + } + if len(resp.Callbacks) > 0 { cctx.Printer.Println() cctx.Printer.Println(color.MagentaString("Callbacks: %v", len(resp.Callbacks))) diff --git a/temporalcli/commands.workflow_view_test.go b/temporalcli/commands.workflow_view_test.go index f1c6a60c..3f589449 100644 --- a/temporalcli/commands.workflow_view_test.go +++ b/temporalcli/commands.workflow_view_test.go @@ -13,6 +13,7 @@ import ( "github.com/google/uuid" "github.com/nexus-rpc/sdk-go/nexus" + "github.com/stretchr/testify/assert" "github.com/temporalio/cli/temporalcli" "go.temporal.io/api/enums/v1" nexuspb "go.temporal.io/api/nexus/v1" @@ -21,6 +22,7 @@ import ( "go.temporal.io/sdk/client" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/temporalnexus" + "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" ) @@ -546,6 +548,79 @@ func (s *SharedServerSuite) TestWorkflow_Count() { s.Contains(out, `{"groupValues":["Completed"],"count":"3"}`) } +func (s *SharedServerSuite) TestWorkflow_Describe_Deployment() { + buildId := uuid.NewString() + seriesName := uuid.NewString() + // Workflow that waits to be canceled. + waitingWorkflow := func(ctx workflow.Context) error { + ctx.Done().Receive(ctx, nil) + return ctx.Err() + } + w := s.DevServer.StartDevWorker(s.Suite.T(), DevWorkerOptions{ + Worker: worker.Options{ + BuildID: buildId, + UseBuildIDForVersioning: true, + DeploymentOptions: worker.DeploymentOptions{ + DeploymentSeriesName: seriesName, + DefaultVersioningBehavior: workflow.VersioningBehaviorPinned, + }, + }, + Workflows: []any{waitingWorkflow}, + }) + defer w.Stop() + + res := s.Execute( + "worker", "deployment", "set-current", + "--address", s.Address(), + "--series-name", seriesName, + "--build-id", buildId, + ) + s.NoError(res.Err) + + // Start the workflow and wait until the operation is started. + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{TaskQueue: w.Options.TaskQueue}, + waitingWorkflow, + ) + s.NoError(err) + + s.EventuallyWithT(func(t *assert.CollectT) { + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + ) + assert.NoError(t, res.Err) + assert.Contains(t, res.Stdout.String(), buildId) + assert.Contains(t, res.Stdout.String(), "Pinned") + }, 30*time.Second, 100*time.Millisecond) + + out := res.Stdout.String() + s.ContainsOnSameLine(out, "Behavior", "Pinned") + s.ContainsOnSameLine(out, "DeploymentBuildID", buildId) + s.ContainsOnSameLine(out, "DeploymentSeriesName", seriesName) + s.ContainsOnSameLine(out, "OverrideBehavior", "Unspecified") + + // json + res = s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + "--output", "json", + ) + s.NoError(res.Err) + + var jsonResp workflowservice.DescribeWorkflowExecutionResponse + s.NoError(temporalcli.UnmarshalProtoJSONWithOptions(res.Stdout.Bytes(), &jsonResp, true)) + versioningInfo := jsonResp.WorkflowExecutionInfo.VersioningInfo + s.Equal("Pinned", versioningInfo.Behavior.String()) + s.Equal(buildId, versioningInfo.Deployment.BuildId) + s.Equal(seriesName, versioningInfo.Deployment.SeriesName) + s.Nil(versioningInfo.VersioningOverride) + s.Nil(versioningInfo.DeploymentTransition) +} + func (s *SharedServerSuite) TestWorkflow_Describe_NexusOperationAndCallback() { handlerWorkflowID := uuid.NewString() endpointName := validEndpointName(s.T()) diff --git a/temporalcli/commands_test.go b/temporalcli/commands_test.go index 5c120d60..4b3a823a 100644 --- a/temporalcli/commands_test.go +++ b/temporalcli/commands_test.go @@ -369,6 +369,7 @@ func StartDevServer(t *testing.T, options DevServerOptions) *DevServer { d.Options.DynamicConfigValues["frontend.workerVersioningRuleAPIs"] = true d.Options.DynamicConfigValues["frontend.workerVersioningDataAPIs"] = true d.Options.DynamicConfigValues["frontend.workerVersioningWorkflowAPIs"] = true + d.Options.DynamicConfigValues["system.enableDeployments"] = true d.Options.DynamicConfigValues["worker.buildIdScavengerEnabled"] = true d.Options.DynamicConfigValues["frontend.enableUpdateWorkflowExecution"] = true d.Options.DynamicConfigValues["frontend.MaxConcurrentBatchOperationPerNamespace"] = 1000 diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index f6832f10..8046d759 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -605,6 +605,184 @@ commands: description: Reason for terminating the batch job. required: true + - name: temporal worker + summary: Read or update Worker state + description: | + +---------------------------------------------------------------------+ + | CAUTION: Worker is experimental. Worker commands are subject to | + | change. | + +---------------------------------------------------------------------+ + + Modify or read state associated with a Worker, for example, + using Worker Deployments commands: + + ``` + temporal worker deployment + ``` + option-sets: + - client + docs: + description-header: >- + Learn how to read or modify state associated with a Worker, + such as Worker Deployments. + keywords: + - worker + - worker deployment + + - name: temporal worker deployment + summary: Describe, list, and operate on Worker Deployments + description: | + +---------------------------------------------------------------------+ + | CAUTION: Worker Deployment is experimental. Deployment commands are | + | subject to change. | + +---------------------------------------------------------------------+ + + Deployment commands perform operations on Worker Deployments: + + ``` + temporal worker deployment [command] [options] + ``` + + For example: + + ``` + temporal worker deployment list + ``` + docs: + description-header: >- + Temporal Deployment commands enable operations on Worker Deployments, + such as describe, list, set-current, and get-current, simplifying + versioning and management of workers. + keywords: + - worker deployment + - worker deployment describe + - worker deployment list + - worker deployment get-current + - worker deployment set-current + + - name: temporal worker deployment describe + summary: Show properties of a Worker Deployment + description: | + +---------------------------------------------------------------------+ + | CAUTION: Worker Deployment is experimental. Deployment commands are | + | subject to change. | + +---------------------------------------------------------------------+ + + Describes properties of a Worker Deployment, such as whether it is + current, the non-empty list of its task queues, custom metadata if + present, and reachability status when requested. + + ``` + temporal worker deployment describe [options] + ``` + + For example, to also include reachability information: + + ``` + temporal worker deployment describe \ + --series-name YourDeploymentSeriesName \ + --build-id YourDeploymentBuildId \ + --report-reachability + ``` + option-sets: + - deployment-reference + options: + - name: report-reachability + type: bool + description: | + Include reachability information of a Worker Deployment. + + - name: temporal worker deployment get-current + summary: Show the current Worker Deployment + description: | + +---------------------------------------------------------------------+ + | CAUTION: Worker Deployment is experimental. Deployment commands are | + | subject to change. | + +---------------------------------------------------------------------+ + + Gets the current Worker Deployment for a Deployment Series Name. + When a Deployment is current, Workers of that Deployment will receive + tasks from new Workflows and from existing AutoUpgrade Workflows that + are running on this Deployment Series. + + ``` + temporal worker deployment get-current [options] + ``` + + For example: + + ``` + temporal worker deployment get-current \ + --series-name YourDeploymentSeriesName + ``` + options: + - name: series-name + type: string + description: Series Name for the current Worker Deployment. + required: true + + - name: temporal worker deployment list + summary: Enumerate Worker Deployments in the client's namespace + description: | + +---------------------------------------------------------------------+ + | CAUTION: Worker Deployment is experimental. Deployment commands are | + | subject to change. | + +---------------------------------------------------------------------+ + + List existing Worker Deployments in the client's namespace, optionally + filtering them by Deployment Series Name. + + + ``` + temporal worker deployment list [options] + ``` + + For example, adding an optional filter: + + ``` + temporal worker deployment list \ + --series-name YourDeploymentSeriesName + ``` + options: + - name: series-name + type: string + description: Series Name to filter Worker Deployments. + + - name: temporal worker deployment set-current + summary: Change the current Worker Deployment + description: | + +---------------------------------------------------------------------+ + | CAUTION: Worker deployment is experimental. Deployment commands are | + | subject to change. | + +---------------------------------------------------------------------+ + + Sets the current Deployment for a given Deployment Series. + When a Deployment is current, Workers of that Deployment will receive + tasks from new Workflows and from existing AutoUpgrade Workflows that + are running on this Deployment Series. + + ``` + temporal worker deployment set-current [options] + ``` + + For example: + + ``` + temporal worker deployment set-current \ + --series-name YourDeploymentSeriesName \ + --build-id YourDeploymentBuildId + ``` + option-sets: + - deployment-reference + options: + - name: metadata + type: string[] + description: | + Set deployment metadata using `KEY="VALUE"` pairs. + Keys must be identifiers, and values must be JSON values. + For example: 'YourKey={"your": "value"}'. + Can be passed multiple times. + - name: temporal env summary: Manage environments description: | @@ -2495,9 +2673,9 @@ commands: docs: description-header: >- Temporal Workflow commands enable operations on Workflow Executions, - such as cancel, count, delete, describe, execute, list, query, reset, - reset-batch, show, signal, stack, start, terminate, trace, and update, - enhancing efficiency and control. + such as cancel, count, delete, describe, execute, list, update-options, + query, reset, reset-batch, show, signal, stack, start, terminate, + trace, and update, enhancing efficiency and control. keywords: - call stack - cancellation @@ -2520,6 +2698,7 @@ commands: - workflow execute - workflow execution - workflow list + - workflow update-options - workflow query - workflow reset - workflow reset-batch @@ -2707,6 +2886,73 @@ commands: type: int description: Maximum number of Workflow Executions to display. + - name: temporal workflow update-options + summary: Change Workflow Execution Options + description: | + +---------------------------------------------------------------------+ + | CAUTION: Worflow update-options is experimental. Workflow Execution | + | properties are subject to change. | + +---------------------------------------------------------------------+ + + Modify properties of Workflow Executions: + + ``` + temporal workflow update-options [options] + ``` + + It can override the Worker Deployment configuration of a + Workflow Execution, which controls Worker Versioning. + + For example, to force Workers in the current Deployment execute the + next Workflow Task change behavior to `auto_upgrade`: + + ``` + temporal workflow update-options \ + --workflow-id YourWorkflowId \ + --versioning-override-behavior auto_upgrade + ``` + + or to pin the workflow execution to a Worker Deployment, set behavior + to `pinned`: + + ``` + temporal workflow update-options \ + --workflow-id YourWorkflowId \ + --versioning-override-behavior pinned \ + --versioning-override-series-name YourDeploymentSeriesName \ + --versioning-override-build-id YourDeploymentBuildId + ``` + + To remove any previous overrides, set the behavior to + `unspecified`: + + ``` + temporal workflow update-options \ + --workflow-id YourWorkflowId \ + --versioning-override-behavior unspecified + ``` + + To see the current override use `temporal workflow describe` + + option-sets: + - single-workflow-or-batch + options: + - name: versioning-override-behavior + type: string-enum + description: | + Override the versioning behavior of a Workflow. + required: true + enum-values: + - unspecified + - pinned + - auto_upgrade + - name: versioning-override-series-name + type: string + description: Override Series Name for a Worker Deployment (Only for pinned). + - name: versioning-override-build-id + type: string + description: Override Build ID for a Worker Deployment (Only for pinned). + - name: temporal workflow query summary: Retrieve Workflow Execution state description: | @@ -3311,6 +3557,17 @@ option-sets: short: r description: Run ID. + - name: deployment-reference + options: + - name: series-name + type: string + description: Series Name for a Worker Deployment. + required: true + - name: build-id + type: string + description: Build ID for a Worker Deployment. + required: true + - name: single-workflow-or-batch options: - name: workflow-id