Skip to content

Commit

Permalink
Save signals on first workflow task (#268)
Browse files Browse the repository at this point in the history
* Save signals on first workflow task

* Save signals on first workflow task

* Config option for preserving no signals in the first task

* Update version and CHANGELOG

* Remove redundant sdk flags
  • Loading branch information
jeffschoner authored Oct 20, 2023
1 parent 263e975 commit 08fe1e9
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 25 deletions.
16 changes: 15 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Changelog

## 0.1.1
Allows signals to be processed within the first workflow task.

**IMPORTANT:** This change is backward compatible, but workflows started
on this version cannot run on earlier versions. If you roll back, you will
see workflow task failures mentioning an unknown SDK flag. This will prevent
those workflows from making progress until your code is rolled forward
again. If you'd like to roll this out more gradually, you can,
1. Set the `no_signals_in_first_task` configuration option to `true`
2. Deploy your worker
3. Wait until you are certain you won't need to roll back
4. Remove the configuration option, which will default it to `false`
5. Deploy your worker

## 0.1.0

This introduces signal first ordering. See https://github.com/coinbase/temporal-ruby/issues/258 for
Expand All @@ -21,4 +35,4 @@ process, you must follow these rollout steps to avoid non-determinism errors:

These steps ensure any workflow that executes in signals first mode will continue to be executed
in this order on replay. If you don't follow these steps, you may see failed workflow tasks, which
in some cases could result in unrecoverable history corruption.
in some cases could result in unrecoverable history corruption.
4 changes: 3 additions & 1 deletion examples/workflows/signal_with_start_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ def execute(expected_signal)
end
end

# Do something to get descheduled so the signal handler has a chance to run
# Wait for the activity in signal callbacks to complete. The workflow will
# not automatically wait for any blocking calls made in callbacks to complete
# before returning.
workflow.wait_until { received != initial_value }
received
end
Expand Down
6 changes: 5 additions & 1 deletion lib/temporal/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Configuration
attr_reader :timeouts, :error_handlers, :capabilities
attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity,
:logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators,
:payload_codec, :legacy_signals
:payload_codec, :legacy_signals, :no_signals_in_first_task

# See https://docs.temporal.io/blog/activity-timeouts/ for general docs.
# We want an infinite execution timeout for cron schedules and other perpetual workflows.
Expand Down Expand Up @@ -92,6 +92,10 @@ def initialize
# in Temporal server 1.20, it is ignored when connected to older versions and effectively
# treated as true.
@legacy_signals = false

# This is a legacy behavior that is incorrect, but which existing workflow code may rely on. Only
# set to true until you can fix your workflow code.
@no_signals_in_first_task = false
end

def on_error(&block)
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Temporal
VERSION = '0.1.0'.freeze
VERSION = '0.1.1'.freeze
end
16 changes: 16 additions & 0 deletions lib/temporal/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -349,16 +349,32 @@ def now
#
# @param signal_name [String, Symbol, nil] an optional signal name; converted to a String
def on_signal(signal_name = nil, &block)
first_task_signals = if state_manager.sdk_flags.include?(SDKFlags::SAVE_FIRST_TASK_SIGNALS)
state_manager.first_task_signals
else
[]
end

if signal_name
target = Signal.new(signal_name)
dispatcher.register_handler(target, 'signaled') do |_, input|
# do not pass signal name when triggering a named handler
call_in_fiber(block, input)
end

first_task_signals.each do |name, input|
if name == signal_name
call_in_fiber(block, input)
end
end
else
dispatcher.register_handler(Dispatcher::WILDCARD, 'signaled') do |signal, input|
call_in_fiber(block, signal, input)
end

first_task_signals.each do |name, input|
call_in_fiber(block, name, input)
end
end

return
Expand Down
4 changes: 3 additions & 1 deletion lib/temporal/workflow/sdk_flags.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ module Temporal
class Workflow
module SDKFlags
HANDLE_SIGNALS_FIRST = 1
# The presence of SAVE_FIRST_TASK_SIGNALS implies HANDLE_SIGNALS_FIRST
SAVE_FIRST_TASK_SIGNALS = 2

# Make sure to include all known flags here
ALL = Set.new([HANDLE_SIGNALS_FIRST])
ALL = Set.new([HANDLE_SIGNALS_FIRST, SAVE_FIRST_TASK_SIGNALS])
end
end
end
60 changes: 45 additions & 15 deletions lib/temporal/workflow/state_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class StateManager
class UnsupportedEvent < Temporal::InternalError; end
class UnsupportedMarkerType < Temporal::InternalError; end

attr_reader :commands, :local_time, :search_attributes, :new_sdk_flags_used
attr_reader :commands, :local_time, :search_attributes, :new_sdk_flags_used, :sdk_flags, :first_task_signals

def initialize(dispatcher, config)
@dispatcher = dispatcher
Expand All @@ -40,6 +40,17 @@ def initialize(dispatcher, config)

# New flags used when not replaying
@new_sdk_flags_used = Set.new

# Because signals must be processed first and a signal handler cannot be registered
# until workflow code runs, this dispatcher handler will save these signals for
# when a callback is first registered.
@first_task_signals = []
@first_task_signal_handler = dispatcher.register_handler(
Dispatcher::WILDCARD,
'signaled'
) do |name, input|
@first_task_signals << [name, input]
end
end

def replay?
Expand Down Expand Up @@ -97,14 +108,19 @@ def apply(history_window)
order_events(history_window.events).each do |event|
apply_event(event)
end

return unless @first_task_signal_handler

@first_task_signal_handler.unregister
@first_task_signals = []
@first_task_signal_handler = nil
end

def self.event_order(event, signals_first)
def self.event_order(event, signals_first, execution_started_before_signals)
if event.type == 'MARKER_RECORDED'
# markers always come first
0
elsif event.type == 'WORKFLOW_EXECUTION_STARTED'
# This always comes next if present
elsif !execution_started_before_signals && workflow_execution_started_event?(event)
1
elsif signals_first && signal_event?(event)
# signals come next if we are in signals first mode
Expand All @@ -119,6 +135,10 @@ def self.signal_event?(event)
event.type == 'WORKFLOW_EXECUTION_SIGNALED'
end

def self.workflow_execution_started_event?(event)
event.type == 'WORKFLOW_EXECUTION_STARTED'
end

def history_size
History::Size.new(
events: @last_event_id,
Expand All @@ -129,23 +149,32 @@ def history_size

private

attr_reader :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :sdk_flags
attr_reader :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :config

def use_signals_first(raw_events)
if sdk_flags.include?(SDKFlags::HANDLE_SIGNALS_FIRST)
# The presence of SAVE_FIRST_TASK_SIGNALS implies HANDLE_SIGNALS_FIRST
if sdk_flags.include?(SDKFlags::HANDLE_SIGNALS_FIRST) || sdk_flags.include?(SDKFlags::SAVE_FIRST_TASK_SIGNALS)
# If signals were handled first when this task or a previous one in this run were first
# played, we must continue to do so in order to ensure determinism regardless of what
# the configuration value is set to. Even the capabilities can be ignored because the
# server must have returned SDK metadata for this to be true.
true
elsif raw_events.any? { |event| StateManager.signal_event?(event) } &&
# If this is being played for the first time, use the configuration flag to choose
(!replay? && !@config.legacy_signals) &&
!replay? && !config.legacy_signals &&
# In order to preserve determinism, the server must support SDK metadata to order signals
# first. This is checked last because it will result in a Temporal server call the first
# time it's called in the worker process.
@config.capabilities.sdk_metadata
report_flag_used(SDKFlags::HANDLE_SIGNALS_FIRST)
config.capabilities.sdk_metadata

if raw_events.any? do |event|
StateManager.workflow_execution_started_event?(event)
end && !config.no_signals_in_first_task
report_flag_used(SDKFlags::SAVE_FIRST_TASK_SIGNALS)
else
report_flag_used(SDKFlags::HANDLE_SIGNALS_FIRST)
end

true
else
false
Expand All @@ -154,10 +183,11 @@ def use_signals_first(raw_events)

def order_events(raw_events)
signals_first = use_signals_first(raw_events)
execution_started_before_signals = sdk_flags.include?(SDKFlags::SAVE_FIRST_TASK_SIGNALS)

raw_events.sort_by.with_index do |event, index|
# sort_by is not stable, so include index to preserve order
[StateManager.event_order(event, signals_first), index]
[StateManager.event_order(event, signals_first, execution_started_before_signals), index]
end
end

Expand Down Expand Up @@ -429,11 +459,11 @@ def discard_command(history_target)
end

replay_target = event_target_from(replay_command_id, replay_command)
if history_target != replay_target
raise NonDeterministicWorkflowError,
"Unexpected command. The replaying code is issuing: #{replay_target}, "\
"but the history of previous executions recorded: #{history_target}. " + NONDETERMINISM_ERROR_SUGGESTION
end
return unless history_target != replay_target

raise NonDeterministicWorkflowError,
"Unexpected command. The replaying code is issuing: #{replay_target}, "\
"but the history of previous executions recorded: #{history_target}. " + NONDETERMINISM_ERROR_SUGGESTION
end

def handle_marker(id, type, details)
Expand Down
5 changes: 4 additions & 1 deletion spec/unit/lib/temporal/workflow/executor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ def execute
decisions = subject.run

expect(decisions.commands.length).to eq(1)
expect(decisions.new_sdk_flags_used).to eq(Set.new([Temporal::Workflow::SDKFlags::HANDLE_SIGNALS_FIRST]))
expect(decisions.new_sdk_flags_used).to eq(
Set.new([
Temporal::Workflow::SDKFlags::SAVE_FIRST_TASK_SIGNALS
]))
end
end

Expand Down
Loading

0 comments on commit 08fe1e9

Please sign in to comment.