From b5efd2cef802be2fa97d5bab04839413726ac06e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Santiago=20Dold=C3=A1n?= Date: Thu, 5 Dec 2024 14:43:18 -0300 Subject: [PATCH] Add workflow start delay option (#294) --- lib/temporal/client.rb | 5 ++- lib/temporal/connection/grpc.rb | 8 +++- lib/temporal/execution_options.rb | 4 +- spec/unit/lib/temporal/client_spec.rb | 37 ++++++++++++------- .../lib/temporal/execution_options_spec.rb | 10 +++-- spec/unit/lib/temporal/grpc_spec.rb | 4 ++ 6 files changed, 46 insertions(+), 22 deletions(-) diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index fc390e92..c2e83e1f 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -41,6 +41,7 @@ def initialize(config) # @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS # @option options [Hash] :headers # @option options [Hash] :search_attributes + # @option options [Integer] :start_delay determines the amount of seconds to wait before initiating a Workflow # # @return [String] workflow's run ID def start_workflow(workflow, *input, options: {}, **args) @@ -67,6 +68,7 @@ def start_workflow(workflow, *input, options: {}, **args) headers: config.header_propagator_chain.inject(execution_options.headers), memo: execution_options.memo, search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes), + start_delay: execution_options.start_delay ) else raise ArgumentError, 'If signal_input is provided, you must also provide signal_name' if signal_name.nil? @@ -85,7 +87,8 @@ def start_workflow(workflow, *input, options: {}, **args) memo: execution_options.memo, search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes), signal_name: signal_name, - signal_input: signal_input + signal_input: signal_input, + start_delay: execution_options.start_delay ) end diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 1f817f51..f35d2dc3 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -120,7 +120,8 @@ def start_workflow_execution( headers: nil, cron_schedule: nil, memo: nil, - search_attributes: nil + search_attributes: nil, + start_delay: nil ) request = Temporalio::Api::WorkflowService::V1::StartWorkflowExecutionRequest.new( identity: identity, @@ -137,6 +138,7 @@ def start_workflow_execution( workflow_execution_timeout: execution_timeout, workflow_run_timeout: run_timeout, workflow_task_timeout: task_timeout, + workflow_start_delay: start_delay, request_id: SecureRandom.uuid, header: Temporalio::Api::Common::V1::Header.new( fields: converter.to_payload_map(headers || {}) @@ -379,7 +381,8 @@ def signal_with_start_workflow_execution( headers: nil, cron_schedule: nil, memo: nil, - search_attributes: nil + search_attributes: nil, + start_delay: nil ) proto_header_fields = if headers.nil? converter.to_payload_map({}) @@ -406,6 +409,7 @@ def signal_with_start_workflow_execution( workflow_execution_timeout: execution_timeout, workflow_run_timeout: run_timeout, workflow_task_timeout: task_timeout, + workflow_start_delay: start_delay, request_id: SecureRandom.uuid, header: Temporalio::Api::Common::V1::Header.new( fields: proto_header_fields diff --git a/lib/temporal/execution_options.rb b/lib/temporal/execution_options.rb index 65f0031c..d3319cb8 100644 --- a/lib/temporal/execution_options.rb +++ b/lib/temporal/execution_options.rb @@ -3,7 +3,8 @@ module Temporal class ExecutionOptions - attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes + attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes, + :start_delay def initialize(object, options, defaults = nil) # Options are treated as overrides and take precedence @@ -15,6 +16,7 @@ def initialize(object, options, defaults = nil) @headers = options[:headers] || {} @memo = options[:memo] || {} @search_attributes = options[:search_attributes] || {} + @start_delay = options[:start_delay] || 0 # For Temporal::Workflow and Temporal::Activity use defined values as the next option if has_executable_concern?(object) diff --git a/spec/unit/lib/temporal/client_spec.rb b/spec/unit/lib/temporal/client_spec.rb index a0ba4dc1..31dc4a78 100644 --- a/spec/unit/lib/temporal/client_spec.rb +++ b/spec/unit/lib/temporal/client_spec.rb @@ -52,20 +52,21 @@ def inject!(header) subject.start_workflow(TestStartWorkflow, 42) expect(connection) .to have_received(:start_workflow_execution) - .with( - namespace: 'default-test-namespace', - workflow_id: an_instance_of(String), - workflow_name: 'TestStartWorkflow', - task_queue: 'default-test-task-queue', - input: [42], - task_timeout: config.timeouts[:task], - run_timeout: config.timeouts[:run], - execution_timeout: config.timeouts[:execution], - workflow_id_reuse_policy: nil, - headers: { 'test' => 'asdf' }, - memo: {}, - search_attributes: {}, - ) + .with( + namespace: 'default-test-namespace', + workflow_id: an_instance_of(String), + workflow_name: 'TestStartWorkflow', + task_queue: 'default-test-task-queue', + input: [42], + task_timeout: config.timeouts[:task], + run_timeout: config.timeouts[:run], + execution_timeout: config.timeouts[:execution], + workflow_id_reuse_policy: nil, + headers: { 'test' => 'asdf' }, + memo: {}, + search_attributes: {}, + start_delay: 0 + ) end end @@ -94,6 +95,7 @@ def inject!(header) headers: {}, memo: {}, search_attributes: {}, + start_delay: 0 ) end @@ -109,6 +111,7 @@ def inject!(header) workflow_id_reuse_policy: :reject, memo: { 'MemoKey1' => 'MemoValue1' }, search_attributes: { 'SearchAttribute1' => 256 }, + start_delay: 10 } ) @@ -127,6 +130,7 @@ def inject!(header) headers: { 'Foo' => 'Bar' }, memo: { 'MemoKey1' => 'MemoValue1' }, search_attributes: { 'SearchAttribute1' => 256 }, + start_delay: 10 ) end @@ -154,6 +158,7 @@ def inject!(header) headers: {}, memo: {}, search_attributes: {}, + start_delay: 0 ) end @@ -175,6 +180,7 @@ def inject!(header) headers: {}, memo: {}, search_attributes: {}, + start_delay: 0 ) end @@ -198,6 +204,7 @@ def inject!(header) headers: {}, memo: {}, search_attributes: {}, + start_delay: 0 ) end end @@ -225,6 +232,7 @@ def inject!(header) headers: {}, memo: {}, search_attributes: {}, + start_delay: 0 ) end end @@ -255,6 +263,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument) search_attributes: {}, signal_name: 'the question', signal_input: expected_signal_argument, + start_delay: 0 ) end diff --git a/spec/unit/lib/temporal/execution_options_spec.rb b/spec/unit/lib/temporal/execution_options_spec.rb index 98fbe380..d0c9d017 100644 --- a/spec/unit/lib/temporal/execution_options_spec.rb +++ b/spec/unit/lib/temporal/execution_options_spec.rb @@ -99,10 +99,11 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow task_queue: 'test-task-queue', retry_policy: { interval: 1, backoff: 2, max_attempts: 5 }, timeouts: { start_to_close: 10 }, - headers: { 'TestHeader' => 'Test' } + headers: { 'TestHeader' => 'Test' }, + start_delay: 10 } end - + it 'is initialized with full options' do expect(subject.name).to eq(options[:name]) expect(subject.namespace).to eq(options[:namespace]) @@ -113,12 +114,13 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow expect(subject.retry_policy.max_attempts).to eq(options[:retry_policy][:max_attempts]) expect(subject.timeouts).to eq(options[:timeouts]) expect(subject.headers).to eq(options[:headers]) + expect(subject.start_delay).to eq(options[:start_delay]) end end - + context 'when retry policy options are invalid' do let(:options) { { retry_policy: { max_attempts: 10 } } } - + it 'raises' do expect { subject }.to raise_error( Temporal::RetryPolicy::InvalidRetryPolicy, diff --git a/spec/unit/lib/temporal/grpc_spec.rb b/spec/unit/lib/temporal/grpc_spec.rb index 9d0fe528..5639a0e9 100644 --- a/spec/unit/lib/temporal/grpc_spec.rb +++ b/spec/unit/lib/temporal/grpc_spec.rb @@ -66,6 +66,7 @@ execution_timeout: 1, run_timeout: 2, task_timeout: 3, + start_delay: 10, memo: {}, search_attributes: { 'foo-int-attribute' => 256, @@ -90,6 +91,7 @@ expect(request.workflow_execution_timeout.seconds).to eq(1) expect(request.workflow_run_timeout.seconds).to eq(2) expect(request.workflow_task_timeout.seconds).to eq(3) + expect(request.workflow_start_delay.seconds).to eq(10) expect(request.workflow_id_reuse_policy).to eq(:WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) expect(request.search_attributes.indexed_fields).to eq({ 'foo-int-attribute' => Temporalio::Api::Common::V1::Payload.new(data: '256', metadata: { 'encoding' => 'json/plain' }), @@ -138,6 +140,7 @@ execution_timeout: 1, run_timeout: 2, task_timeout: 3, + start_delay: 10, workflow_id_reuse_policy: :allow, signal_name: 'the question', signal_input: 'what do you get if you multiply six by nine?' @@ -153,6 +156,7 @@ expect(request.workflow_execution_timeout.seconds).to eq(1) expect(request.workflow_run_timeout.seconds).to eq(2) expect(request.workflow_task_timeout.seconds).to eq(3) + expect(request.workflow_start_delay.seconds).to eq(10) expect(request.signal_name).to eq('the question') expect(request.signal_input.payloads[0].data).to eq('"what do you get if you multiply six by nine?"') expect(request.workflow_id_reuse_policy).to eq(:WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE)