diff --git a/examples/bin/update_replay_test_histories b/examples/bin/update_replay_test_histories new file mode 100755 index 00000000..bc0f807a --- /dev/null +++ b/examples/bin/update_replay_test_histories @@ -0,0 +1,51 @@ +#!/usr/bin/env ruby + +# This script regenerates the workflow history files used in the example replay tests +# under examples/spec/replay/histories. It starts the necessary workflow, sends some +# signals, awaits workflow completion, then collects the history into JSON and protobuf +# binary file formats. +# +# To use this, start your Temporal server and bin/worker first. This script can then +# be run without any arguments. It will overwrite existing history files in the tree. +# +# NOTE: By default, collected history files contain the host names of the machines +# where the worker and this script are run because the default identity is pid@hostname. +# If you'd like, you can override this by setting an identity in the configuration in +# init.rb. + +require_relative "../init" +require_relative "../workflows/signal_with_start_workflow" + +workflow_id = SecureRandom.uuid +run_id = Temporal.start_workflow( + SignalWithStartWorkflow, + "hit", + options: { + workflow_id: workflow_id, + timeouts: { + execution: 30 + }, + signal_name: "miss", + signal_input: 1 + } +) +Temporal.logger.info("Started workflow", {workflow_id: workflow_id, run_id: run_id}) +sleep(1) +Temporal.signal_workflow(SignalWithStartWorkflow, "miss", workflow_id, run_id, 2) +sleep(1) +Temporal.signal_workflow(SignalWithStartWorkflow, "hit", workflow_id, run_id, 3) +Temporal.await_workflow_result(SignalWithStartWorkflow, workflow_id: workflow_id, run_id: run_id) + +# Save in JSON, exactly like would be downloaded from Temporal UI +history_json = Temporal.get_workflow_history_json(workflow_id: workflow_id, run_id: run_id) +filename = File.expand_path("../spec/replay/histories/signal_with_start.json", File.dirname(__FILE__)) +File.open(filename, "w") do |f| + f.write(history_json) +end + +# Save in protobuf binary format +history_binary = Temporal.get_workflow_history_protobuf(workflow_id: workflow_id, run_id: run_id) +filename = File.expand_path("../spec/replay/histories/signal_with_start.protobin", File.dirname(__FILE__)) +File.open(filename, "wb") do |f| + f.write(history_binary) +end diff --git a/examples/spec/replay/histories/signal_with_start.binpb b/examples/spec/replay/histories/signal_with_start.binpb new file mode 100644 index 00000000..7d7bf89c Binary files /dev/null and b/examples/spec/replay/histories/signal_with_start.binpb differ diff --git a/examples/spec/replay/histories/signal_with_start.json b/examples/spec/replay/histories/signal_with_start.json new file mode 100644 index 00000000..fe301a04 --- /dev/null +++ b/examples/spec/replay/histories/signal_with_start.json @@ -0,0 +1,361 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-05-28T02:46:26.852786129Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId": "31457280", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "SignalWithStartWorkflow" + }, + "taskQueue": { + "name": "general", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImhpdCI=" + } + ] + }, + "workflowExecutionTimeout": "30s", + "workflowRunTimeout": "30s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "c6e8de96-4e18-409d-8e60-38d58f2f11b9", + "identity": "4514@DESKTOP-JRJDVRG\n", + "firstExecutionRunId": "c6e8de96-4e18-409d-8e60-38d58f2f11b9", + "attempt": 1, + "workflowExecutionExpirationTime": "2024-05-28T02:46:56.853Z", + "firstWorkflowTaskBackoff": "0s", + "memo": { + }, + "searchAttributes": { + }, + "header": { + } + } + }, + { + "eventId": "2", + "eventTime": "2024-05-28T02:46:26.852896774Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "taskId": "31457281", + "workflowExecutionSignaledEventAttributes": { + "signalName": "miss", + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + } + ] + }, + "identity": "4514@DESKTOP-JRJDVRG\n", + "header": { + } + } + }, + { + "eventId": "3", + "eventTime": "2024-05-28T02:46:26.852900524Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "31457282", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "general", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "4", + "eventTime": "2024-05-28T02:46:26.873042948Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "31457287", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "3", + "identity": "4417@DESKTOP-JRJDVRG\n", + "requestId": "0074c78e-013b-4845-86d5-f83f1f6feb61", + "historySizeBytes": "421" + } + }, + { + "eventId": "5", + "eventTime": "2024-05-28T02:46:26.896346434Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "31457291", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "3", + "startedEventId": "4", + "identity": "4417@DESKTOP-JRJDVRG\n", + "binaryChecksum": "07d96d88e3691440609a4f5de039969b14a4e6f8", + "sdkMetadata": { + "langUsedFlags": [ + 2 + ] + } + } + }, + { + "eventId": "6", + "eventTime": "2024-05-28T02:46:27.869664722Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "taskId": "31457294", + "workflowExecutionSignaledEventAttributes": { + "signalName": "miss", + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "Mg==" + } + ] + }, + "identity": "4514@DESKTOP-JRJDVRG\n" + } + }, + { + "eventId": "7", + "eventTime": "2024-05-28T02:46:27.869669568Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "31457295", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "general", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "8", + "eventTime": "2024-05-28T02:46:27.881436143Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "31457298", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "7", + "identity": "4417@DESKTOP-JRJDVRG\n", + "requestId": "b1c0b0cd-cdb1-4bfd-973c-fa43eef6dfb5", + "historySizeBytes": "749" + } + }, + { + "eventId": "9", + "eventTime": "2024-05-28T02:46:27.907949953Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "31457302", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "7", + "startedEventId": "8", + "identity": "4417@DESKTOP-JRJDVRG\n", + "binaryChecksum": "07d96d88e3691440609a4f5de039969b14a4e6f8" + } + }, + { + "eventId": "10", + "eventTime": "2024-05-28T02:46:28.883578435Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "taskId": "31457304", + "workflowExecutionSignaledEventAttributes": { + "signalName": "hit", + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "Mw==" + } + ] + }, + "identity": "4514@DESKTOP-JRJDVRG\n" + } + }, + { + "eventId": "11", + "eventTime": "2024-05-28T02:46:28.883586706Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "31457305", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "general", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "12", + "eventTime": "2024-05-28T02:46:28.899268187Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "31457308", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "11", + "identity": "4417@DESKTOP-JRJDVRG\n", + "requestId": "4840d372-5d7f-46f0-af41-85c9fcac752d", + "historySizeBytes": "1071" + } + }, + { + "eventId": "13", + "eventTime": "2024-05-28T02:46:28.925343005Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "31457312", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "11", + "startedEventId": "12", + "identity": "4417@DESKTOP-JRJDVRG\n", + "binaryChecksum": "07d96d88e3691440609a4f5de039969b14a4e6f8" + } + }, + { + "eventId": "14", + "eventTime": "2024-05-28T02:46:28.925386163Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED", + "taskId": "31457313", + "activityTaskScheduledEventAttributes": { + "activityId": "14", + "activityType": { + "name": "HelloWorldActivity" + }, + "taskQueue": { + "name": "general", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "header": { + "fields": { + "test-header": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InRlc3Qi" + } + } + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImV4cGVjdGVkIHNpZ25hbCI=" + } + ] + }, + "scheduleToCloseTimeout": "30s", + "scheduleToStartTimeout": "30s", + "startToCloseTimeout": "30s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "13", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s" + } + } + }, + { + "eventId": "15", + "eventTime": "2024-05-28T02:46:28.944893259Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_STARTED", + "taskId": "31457317", + "activityTaskStartedEventAttributes": { + "scheduledEventId": "14", + "identity": "4417@DESKTOP-JRJDVRG\n", + "requestId": "73f99ef3-e606-421a-ad79-a4e43e41ceba", + "attempt": 1 + } + }, + { + "eventId": "16", + "eventTime": "2024-05-28T02:46:29.008828231Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_COMPLETED", + "taskId": "31457318", + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkhlbGxvIFdvcmxkLCBleHBlY3RlZCBzaWduYWwi" + } + ] + }, + "scheduledEventId": "14", + "startedEventId": "15", + "identity": "4417@DESKTOP-JRJDVRG\n" + } + }, + { + "eventId": "17", + "eventTime": "2024-05-28T02:46:29.008834769Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "31457319", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "general", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "18", + "eventTime": "2024-05-28T02:46:29.022515754Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "31457322", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "17", + "identity": "4417@DESKTOP-JRJDVRG\n", + "requestId": "a24ea1bd-8584-41ae-8cc3-0880b8a946d1", + "historySizeBytes": "1713" + } + }, + { + "eventId": "19", + "eventTime": "2024-05-28T02:46:29.043259634Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "31457326", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "17", + "startedEventId": "18", + "identity": "4417@DESKTOP-JRJDVRG\n", + "binaryChecksum": "07d96d88e3691440609a4f5de039969b14a4e6f8" + } + }, + { + "eventId": "20", + "eventTime": "2024-05-28T02:46:29.043294503Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId": "31457327", + "workflowExecutionCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "Mw==" + } + ] + }, + "workflowTaskCompletedEventId": "19" + } + } + ] +} \ No newline at end of file diff --git a/examples/spec/replay/signal_with_start_spec.rb b/examples/spec/replay/signal_with_start_spec.rb new file mode 100644 index 00000000..13c1cb0d --- /dev/null +++ b/examples/spec/replay/signal_with_start_spec.rb @@ -0,0 +1,21 @@ +require "workflows/signal_with_start_workflow" +require "temporal/testing/replay_tester" +require "temporal/workflow/history/serialization" + +describe "signal with start" do + let(:replay_tester) { Temporal::Testing::ReplayTester.new } + + it "two misses, one hit, replay, json" do + replay_tester.replay_history( + SignalWithStartWorkflow, + Temporal::Workflow::History::Serialization.from_json_file("spec/replay/histories/signal_with_start.json") + ) + end + + it "two misses, one hit, replay, binary" do + replay_tester.replay_history( + SignalWithStartWorkflow, + Temporal::Workflow::History::Serialization.from_protobuf_file("spec/replay/histories/signal_with_start.binpb") + ) + end +end diff --git a/examples/workflows/signal_with_start_workflow.rb b/examples/workflows/signal_with_start_workflow.rb index ab94a5d1..cfee2bed 100644 --- a/examples/workflows/signal_with_start_workflow.rb +++ b/examples/workflows/signal_with_start_workflow.rb @@ -1,21 +1,25 @@ -require 'activities/hello_world_activity' +require "activities/hello_world_activity" class SignalWithStartWorkflow < Temporal::Workflow def execute(expected_signal) - initial_value = 'no signal received' + initial_value = "no signal received" received = initial_value workflow.on_signal do |signal, input| if signal == expected_signal - HelloWorldActivity.execute!('expected signal') + workflow.logger.info("Accepting expected signal #{signal}: #{input}") + HelloWorldActivity.execute!("expected signal") received = input + else + workflow.logger.info("Ignoring unexpected signal #{signal}: #{input}") end end # Wait for the activity in signal callbacks to complete. The workflow will # not automatically wait for any blocking calls made in callbacks to complete # before returning. + workflow.logger.info("Waiting for expected signal #{expected_signal}") workflow.wait_until { received != initial_value } received end diff --git a/lib/temporal.rb b/lib/temporal.rb index 4ba588a3..078defe0 100644 --- a/lib/temporal.rb +++ b/lib/temporal.rb @@ -43,7 +43,10 @@ module Temporal :update_schedule, :trigger_schedule, :pause_schedule, - :unpause_schedule + :unpause_schedule, + :get_workflow_history, + :get_workflow_history_json, + :get_workflow_history_protobuf class << self def configure(&block) diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index f1e6d3a7..e738e2b6 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -1,3 +1,4 @@ +require 'json' require 'temporal/execution_options' require 'temporal/connection' require 'temporal/activity' @@ -5,6 +6,7 @@ require 'temporal/workflow' require 'temporal/workflow/context_helpers' require 'temporal/workflow/history' +require 'temporal/workflow/history/serialization' require 'temporal/workflow/execution_info' require 'temporal/workflow/executions' require 'temporal/workflow/status' @@ -397,9 +399,9 @@ def fail_activity(async_token, exception) # @param run_id [String] # # @return [Temporal::Workflow::History] workflow's execution history - def get_workflow_history(namespace:, workflow_id:, run_id:) + def get_workflow_history(namespace: nil, workflow_id:, run_id:) history_response = connection.get_workflow_execution_history( - namespace: namespace, + namespace: namespace || config.default_execution_options.namespace, workflow_id: workflow_id, run_id: run_id ) @@ -407,6 +409,42 @@ def get_workflow_history(namespace:, workflow_id:, run_id:) Workflow::History.new(history_response.history.events) end + # Fetch workflow's execution history as JSON. This output can be used for replay testing. + # + # @param namespace [String] + # @param workflow_id [String] + # @param run_id [String] optional + # @param pretty_print [Boolean] optional + # + # @return a JSON string representation of the history + def get_workflow_history_json(namespace: nil, workflow_id:, run_id: nil, pretty_print: true) + history_response = connection.get_workflow_execution_history( + namespace: namespace || config.default_execution_options.namespace, + workflow_id: workflow_id, + run_id: run_id + ) + Temporal::Workflow::History::Serialization.to_json(history_response.history) + end + + # Fetch workflow's execution history as protobuf binary. This output can be used for replay testing. + # + # @param namespace [String] + # @param workflow_id [String] + # @param run_id [String] optional + # + # @return a binary string representation of the history + def get_workflow_history_protobuf(namespace: nil, workflow_id:, run_id: nil) + history_response = connection.get_workflow_execution_history( + namespace: namespace || config.default_execution_options.namespace, + workflow_id: workflow_id, + run_id: run_id + ) + + # Protobuf for Ruby unfortunately does not support textproto. Plain binary provides + # a less debuggable, but compact option. + Temporal::Workflow::History::Serialization.to_protobuf(history_response.history) + end + def list_open_workflow_executions(namespace, from, to = Time.now, filter: {}, next_page_token: nil, max_page_size: nil) validate_filter(filter, :workflow, :workflow_id) diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index 014187c1..136c73ce 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -18,7 +18,7 @@ class Configuration attr_reader :timeouts, :error_handlers, :capabilities attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators, - :payload_codec, :legacy_signals, :no_signals_in_first_task, :connection_options + :payload_codec, :legacy_signals, :no_signals_in_first_task, :connection_options, :log_on_workflow_replay # See https://docs.temporal.io/blog/activity-timeouts/ for general docs. # We want an infinite execution timeout for cron schedules and other perpetual workflows. @@ -85,6 +85,8 @@ def initialize @header_propagators = [] @capabilities = Capabilities.new(self) @connection_options = {} + # Setting this to true can be useful when debugging workflow code or running replay tests + @log_on_workflow_replay = false # Signals previously were incorrectly replayed in order within a workflow task window, rather # than at the beginning. Correcting this changes the determinism of any workflow with signals. diff --git a/lib/temporal/testing/replay_tester.rb b/lib/temporal/testing/replay_tester.rb new file mode 100644 index 00000000..6a98c86e --- /dev/null +++ b/lib/temporal/testing/replay_tester.rb @@ -0,0 +1,73 @@ +require "gen/temporal/api/history/v1/message_pb" +require "json" +require "temporal/errors" +require "temporal/metadata/workflow_task" +require "temporal/middleware/chain" +require "temporal/workflow/executor" +require "temporal/workflow/stack_trace_tracker" + +module Temporal + module Testing + class ReplayError < StandardError + end + + class ReplayTester + def initialize(config: Temporal.configuration) + @config = config + end + + attr_reader :config + + # Runs a replay test by using the specific Temporal::Workflow::History object. Instances of these objects + # can be obtained using various from_ methods in Temporal::Workflow::History::Serialization. + # + # If the replay test succeeds, the method will return silently. If the replay tests fails, an error will be raised. + def replay_history(workflow_class, history) + # This code roughly resembles the workflow TaskProcessor but with history being fed in rather + # than being pulled via a workflow task, no query support, no metrics, and other + # simplifications. Fake metadata needs to be provided. + start_workflow_event = history.find_event_by_id(1) + if start_workflow_event.nil? || start_workflow_event.type != "WORKFLOW_EXECUTION_STARTED" + raise ReplayError, "History does not start with workflow_execution_started event" + end + + metadata = Temporal::Metadata::WorkflowTask.new( + namespace: config.namespace, + id: 1, + task_token: "", + attempt: 1, + workflow_run_id: "run_id", + workflow_id: "workflow_id", + # Protobuf deserialization will ensure this tree is present + workflow_name: start_workflow_event.attributes.workflow_type.name + ) + + executor = Workflow::Executor.new( + workflow_class, + history, + metadata, + config, + true, + Middleware::Chain.new([]) + ) + + begin + executor.run + rescue StandardError + query = Struct.new(:query_type, :query_args).new( + Temporal::Workflow::StackTraceTracker::STACK_TRACE_QUERY_NAME, + nil + ) + query_result = executor.process_queries( + {"stack_trace" => query} + ) + replay_error = ReplayError.new("Workflow code failed to replay successfully against history") + # Override the stack trace to the point in the workflow code where the failure occured, not the + # point in the StateManager where non-determinism is detected + replay_error.set_backtrace("Fiber backtraces: #{query_result["stack_trace"].result}") + raise replay_error + end + end + end + end +end diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index 2d69bd2d..07b917a2 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -47,8 +47,10 @@ def completed? end def logger - @logger ||= ReplayAwareLogger.new(Temporal.logger) - @logger.replay = state_manager.replay? + @logger ||= ReplayAwareLogger.new( + @config.logger, + replaying: -> { state_manager.replay? && !@config.log_on_workflow_replay } + ) @logger end diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index 5c8eaf9e..6233feb0 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -33,7 +33,7 @@ def initialize(workflow_class, history, task_metadata, config, track_stack_trace def run dispatcher.register_handler( - History::EventTarget.workflow, + History::EventTarget.start_workflow, 'started', &method(:execute_workflow) ) diff --git a/lib/temporal/workflow/history.rb b/lib/temporal/workflow/history.rb index e11ea9b2..07bbe96d 100644 --- a/lib/temporal/workflow/history.rb +++ b/lib/temporal/workflow/history.rb @@ -51,6 +51,9 @@ def next_window CANCEL_TIMER_FAILED TIMER_CANCELED WORKFLOW_EXECUTION_CANCEL_REQUESTED + WORKFLOW_EXECUTION_COMPLETED + WORKFLOW_EXECUTION_CONTINUED_AS_NEW + WORKFLOW_EXECUTION_FAILED START_CHILD_WORKFLOW_EXECUTION_INITIATED SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED REQUEST_CANCEL_ACTIVITY_TASK_FAILED diff --git a/lib/temporal/workflow/history/event_target.rb b/lib/temporal/workflow/history/event_target.rb index d054947f..881a7823 100644 --- a/lib/temporal/workflow/history/event_target.rb +++ b/lib/temporal/workflow/history/event_target.rb @@ -14,8 +14,13 @@ class UnexpectedEventType < InternalError; end MARKER_TYPE = :marker EXTERNAL_WORKFLOW_TYPE = :external_workflow CANCEL_EXTERNAL_WORKFLOW_REQUEST_TYPE = :cancel_external_workflow_request - WORKFLOW_TYPE = :workflow CANCEL_WORKFLOW_REQUEST_TYPE = :cancel_workflow_request + WORKFLOW_TYPE = :workflow + COMPLETE_WORKFLOW_TYPE = :complete_workflow + CONTINUE_AS_NEW_WORKFLOW_TYPE = :continue_as_new_workflow + FAIL_WORKFLOW_TYPE = :fail_workflow + SIGNAL_WORKFLOW_TYPE = :signal_workflow + START_WORKFLOW_TYPE = :start_workflow UPSERT_SEARCH_ATTRIBUTES_REQUEST_TYPE = :upsert_search_attributes_request # NOTE: The order is important, first prefix match wins (will be a longer match) @@ -35,13 +40,21 @@ class UnexpectedEventType < InternalError; end 'REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION' => CANCEL_EXTERNAL_WORKFLOW_REQUEST_TYPE, 'UPSERT_WORKFLOW_SEARCH_ATTRIBUTES' => UPSERT_SEARCH_ATTRIBUTES_REQUEST_TYPE, 'WORKFLOW_EXECUTION_CANCEL' => CANCEL_WORKFLOW_REQUEST_TYPE, + 'WORKFLOW_EXECUTION_COMPLETED' => COMPLETE_WORKFLOW_TYPE, + 'WORKFLOW_EXECUTION_CONTINUED_AS_NEW' => CONTINUE_AS_NEW_WORKFLOW_TYPE, + 'WORKFLOW_EXECUTION_FAILED' => FAIL_WORKFLOW_TYPE, + 'WORKFLOW_EXECUTION_SIGNALED' => SIGNAL_WORKFLOW_TYPE, + 'WORKFLOW_EXECUTION_STARTED' => START_WORKFLOW_TYPE, + # This is a fall-through type for various event types that workflow code cannot + # react to, either because they're externally triggered (workflow termination, + # timeout) or use an unsupported feature (workflow cancellation, updates). 'WORKFLOW_EXECUTION' => WORKFLOW_TYPE, }.freeze attr_reader :id, :type - def self.workflow - @workflow ||= new(1, WORKFLOW_TYPE) + def self.start_workflow + @workflow ||= new(1, START_WORKFLOW_TYPE) end def self.from_event(event) diff --git a/lib/temporal/workflow/history/serialization.rb b/lib/temporal/workflow/history/serialization.rb new file mode 100644 index 00000000..2219dddd --- /dev/null +++ b/lib/temporal/workflow/history/serialization.rb @@ -0,0 +1,61 @@ +module Temporal + class Workflow + class History + # Functions for deserializing workflow histories from JSON and protobuf. These are useful + # in writing replay tests + # + # `from_` methods return Temporal::Workflow::History instances.` + # `to_` methods take Temporalio::Api::History::V1::History instances + # + # This asymmetry stems from our own internal history representation being a projection + # of the "full" history. + class Serialization + # Parse History from a JSON string + def self.from_json(json) + raw_history = Temporalio::Api::History::V1::History.decode_json(json, ignore_unknown_fields: true) + Workflow::History.new(raw_history.events) + end + + # Convert a raw history to JSON. This method is typically only used by methods on Workflow::Client + def self.to_json(raw_history, pretty_print: true) + json = raw_history.to_json + if pretty_print + # pretty print JSON to make it more debuggable + ::JSON.pretty_generate(::JSON.load(json)) + else + json + end + end + + def self.from_json_file(path) + self.from_json(File.read(path)) + end + + def self.to_json_file(raw_history, path, pretty_print: true) + json = self.to_json(raw_history, pretty_print: pretty_print) + File.write(path, json) + end + + def self.from_protobuf(protobuf) + raw_history = Temporalio::Api::History::V1::History.decode(protobuf) + Workflow::History.new(raw_history.events) + end + + def self.to_protobuf(raw_history) + raw_history.to_proto + end + + def self.from_protobuf_file(path) + self.from_protobuf(File.open(path, "rb", &:read)) + end + + def self.to_protobuf_file(raw_history, path) + protobuf = self.to_protobuf(raw_history) + File.open(path, "wb") do |f| + f.write(protobuf) + end + end + end + end + end +end diff --git a/lib/temporal/workflow/replay_aware_logger.rb b/lib/temporal/workflow/replay_aware_logger.rb index a56494b4..65dafc59 100644 --- a/lib/temporal/workflow/replay_aware_logger.rb +++ b/lib/temporal/workflow/replay_aware_logger.rb @@ -3,11 +3,9 @@ class Workflow class ReplayAwareLogger SEVERITIES = %i[debug info warn error fatal unknown].freeze - attr_writer :replay - - def initialize(main_logger, replay = true) + def initialize(main_logger, replaying:) @main_logger = main_logger - @replay = replay + @replaying = replaying end SEVERITIES.each do |severity| @@ -29,7 +27,7 @@ def log(severity, message, data = {}) attr_reader :main_logger def replay? - @replay + @replaying.call end end end diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 2cf82159..f8c5cc64 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -255,17 +255,19 @@ def apply_event(event) state_machine.start dispatch( - History::EventTarget.workflow, + History::EventTarget.start_workflow, 'started', from_payloads(event.attributes.input), event ) when 'WORKFLOW_EXECUTION_COMPLETED' - # todo + # should only be triggered in query execution and replay testing + discard_command(history_target) when 'WORKFLOW_EXECUTION_FAILED' - # todo + # should only be triggered in query execution and replay testing + discard_command(history_target) when 'WORKFLOW_EXECUTION_TIMED_OUT' # todo @@ -366,7 +368,8 @@ def apply_event(event) # todo when 'WORKFLOW_EXECUTION_CONTINUED_AS_NEW' - # todo + # should only be triggered in query execution and replay testing + discard_command(history_target) when 'START_CHILD_WORKFLOW_EXECUTION_INITIATED' state_machine.schedule @@ -446,8 +449,12 @@ def event_target_from(command_id, command) History::EventTarget::TIMER_TYPE when Command::CancelTimer History::EventTarget::CANCEL_TIMER_REQUEST_TYPE - when Command::CompleteWorkflow, Command::FailWorkflow - History::EventTarget::WORKFLOW_TYPE + when Command::CompleteWorkflow + History::EventTarget::COMPLETE_WORKFLOW_TYPE + when Command::ContinueAsNew + History::EventTarget::CONTINUE_AS_NEW_WORKFLOW_TYPE + when Command::FailWorkflow + History::EventTarget::FAIL_WORKFLOW_TYPE when Command::StartChildWorkflow History::EventTarget::CHILD_WORKFLOW_TYPE when Command::UpsertSearchAttributes @@ -465,7 +472,7 @@ def dispatch(history_target, name, *attributes) NONDETERMINISM_ERROR_SUGGESTION = 'Likely, either you have made a version-unsafe change to your workflow or have non-deterministic '\ - 'behavior in your workflow. See https://docs.temporal.io/docs/java/versioning/#introduction-to-versioning.'.freeze + 'behavior in your workflow. See https://docs.temporal.io/docs/java/versioning/#introduction-to-versioning.'.freeze def discard_command(history_target) # Pop the first command from the list, it is expected to match @@ -480,7 +487,7 @@ def discard_command(history_target) return unless history_target != replay_target raise NonDeterministicWorkflowError, - "Unexpected command. The replaying code is issuing: #{replay_target}, "\ + "Unexpected command. The replaying code is issuing: #{replay_target}, "\ "but the history of previous executions recorded: #{history_target}. " + NONDETERMINISM_ERROR_SUGGESTION end diff --git a/spec/unit/lib/temporal/testing/replay_histories/do_nothing.json b/spec/unit/lib/temporal/testing/replay_histories/do_nothing.json new file mode 100644 index 00000000..45a0ef20 --- /dev/null +++ b/spec/unit/lib/temporal/testing/replay_histories/do_nothing.json @@ -0,0 +1,103 @@ +{ + "events":[ + { + "eventId":"1", + "eventTime":"2024-05-27T18:53:53.483530640Z", + "eventType":"EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId":"27263213", + "workflowExecutionStartedEventAttributes":{ + "workflowType":{ + "name":"TestReplayWorkflow" + }, + "taskQueue":{ + "name":"general", + "kind":"TASK_QUEUE_KIND_NORMAL" + }, + "input":{ + "payloads":[ + { + "metadata":{ + "encoding":"anNvbi9wbGFpbg==" + }, + "data":"eyI6cmVzdWx0Ijoic3VjY2VzcyJ9Cg==" + } + ] + }, + "workflowExecutionTimeout":"30s", + "workflowRunTimeout":"30s", + "workflowTaskTimeout":"10s", + "originalExecutionRunId":"b3711f7b-2693-4c1b-ab67-24e73f80bdcf", + "identity":"123@test", + "firstExecutionRunId":"b3711f7b-2693-4c1b-ab67-24e73f80bdcf", + "attempt":1, + "workflowExecutionExpirationTime":"2024-05-27T18:54:23.483Z", + "firstWorkflowTaskBackoff":"0s", + "memo":{}, + "searchAttributes":{}, + "header":{} + } + }, + { + "eventId":"2", + "eventTime":"2024-05-27T18:53:53.483621296Z", + "eventType":"EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId":"27263215", + "workflowTaskScheduledEventAttributes":{ + "taskQueue":{ + "name":"general", + "kind":"TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout":"10s", + "attempt":1 + } + }, + { + "eventId":"3", + "eventTime":"2024-05-27T18:53:53.504351823Z", + "eventType":"EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId":"27263220", + "workflowTaskStartedEventAttributes":{ + "scheduledEventId":"2", + "identity":"123@test", + "requestId":"195003c8-4c89-486b-8ae8-85cb209dc8b9", + "historySizeBytes":"395" + } + }, + { + "eventId":"4", + "eventTime":"2024-05-27T18:53:53.620416193Z", + "eventType":"EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId":"27263224", + "workflowTaskCompletedEventAttributes":{ + "scheduledEventId":"2", + "startedEventId":"3", + "identity":"123@test", + "binaryChecksum":"d1feac6b4ac2fb57a304ddf1419efd6e06088e41", + "sdkMetadata":{ + "langUsedFlags":[ + 2 + ] + } + } + }, + { + "eventId":"5", + "eventTime":"2024-05-27T18:53:55.790974964Z", + "eventType":"EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId":"27263260", + "workflowExecutionCompletedEventAttributes":{ + "result":{ + "payloads":[ + { + "metadata":{ + "encoding":"anNvbi9wbGFpbg==" + }, + "data":"ImRvbmUiCg==" + } + ] + }, + "workflowTaskCompletedEventId":"4" + } + } + ] +} \ No newline at end of file diff --git a/spec/unit/lib/temporal/testing/replay_tester_spec.rb b/spec/unit/lib/temporal/testing/replay_tester_spec.rb new file mode 100644 index 00000000..861a5813 --- /dev/null +++ b/spec/unit/lib/temporal/testing/replay_tester_spec.rb @@ -0,0 +1,142 @@ +require "base64" +require "json" +require "temporal/testing/replay_tester" +require "temporal/workflow" +require "temporal/workflow/history" + +describe Temporal::Testing::ReplayTester do + class TestReplayActivity < Temporal::Activity + def execute + raise "should never run" + end + end + + class TestReplayWorkflow < Temporal::Workflow + def execute(run_activity: false, run_sleep: false, result: "success") + TestReplayActivity.execute! if run_activity + + workflow.sleep(1) if run_sleep + + case result + when "success" + "done" + when "continue_as_new" + workflow.continue_as_new + nil + when "await" + # wait forever + workflow.wait_until { false } + when "fail" + raise "failed" + end + end + end + + let(:replay_tester) { Temporal::Testing::ReplayTester.new } + let(:do_nothing_json) do + File.read( + "spec/unit/lib/temporal/testing/replay_histories/do_nothing.json" + ) + end + + let(:do_nothing) do + Temporal::Workflow::History::Serialization.from_json(do_nothing_json) + end + + it "replay do nothing successful" do + replay_tester.replay_history( + TestReplayWorkflow, + do_nothing + ) + end + + def remove_first_history_event(history) + history.events.shift + history + end + + it "replay missing start workflow execution event" do + replay_tester.replay_history( + TestReplayWorkflow, + remove_first_history_event(do_nothing) + ) + raise "Expected error to raise" + rescue Temporal::Testing::ReplayError => e + expect(e.message).to(eq("History does not start with workflow_execution_started event")) + end + + def set_workflow_args_in_history(json_args) + obj = JSON.load(do_nothing_json) + obj["events"][0]["workflowExecutionStartedEventAttributes"]["input"]["payloads"][0]["data"] = Base64.strict_encode64( + json_args + ) + new_json = JSON.generate(obj) + Temporal::Workflow::History::Serialization.from_json(new_json) + end + + it "replay extra activity" do + # The linked history will cause an error because it will cause an activity run even though + # there isn't one in the history. + + replay_tester.replay_history( + TestReplayWorkflow, + set_workflow_args_in_history("{\":run_activity\":true}") + ) + raise "Expected error to raise" + rescue Temporal::Testing::ReplayError => e + expect(e.message).to(eq("Workflow code failed to replay successfully against history")) + # Ensure backtrace was overwritten + expect(e.backtrace.first).to(start_with("Fiber backtraces:")) + expect(e.cause).to(be_a(Temporal::NonDeterministicWorkflowError)) + expect(e.cause.message).to( + eq( + "Unexpected command. The replaying code is issuing: activity (5), but the history of previous executions " \ + "recorded: complete_workflow (5). Likely, either you have made a version-unsafe change to your workflow or " \ + "have non-deterministic behavior in your workflow. See https://docs.temporal.io/docs/java/versioning/#introduction-to-versioning." + ) + ) + end + + it "replay continues as new when history completed" do + # The linked history will cause an error because it will cause the workflow to continue + # as new on replay when in the history, it completed successfully. + + replay_tester.replay_history( + TestReplayWorkflow, + set_workflow_args_in_history("{\":result\":\"continue_as_new\"}") + ) + raise "Expected error to raise" + rescue Temporal::Testing::ReplayError => e + expect(e.message).to(eq("Workflow code failed to replay successfully against history")) + expect(e.cause).to(be_a(Temporal::NonDeterministicWorkflowError)) + expect(e.cause.message).to( + eq( + "Unexpected command. The replaying code is issuing: continue_as_new_workflow (5), but the history of " \ + "previous executions recorded: complete_workflow (5). Likely, either you have made a version-unsafe " \ + "change to your workflow or have non-deterministic behavior in your workflow. " \ + "See https://docs.temporal.io/docs/java/versioning/#introduction-to-versioning." + ) + ) + end + + it "replay keeps going when history succeeded" do + # The linked history will cause an error because it will cause the workflow to keep running + # when in the history, it completed successfully. + + replay_tester.replay_history( + TestReplayWorkflow, + set_workflow_args_in_history("{\":result\":\"await\"}") + ) + raise "Expected error to raise" + rescue Temporal::Testing::ReplayError => e + expect(e.message).to(eq("Workflow code failed to replay successfully against history")) + expect(e.cause).to(be_a(Temporal::NonDeterministicWorkflowError)) + expect(e.cause.message).to( + eq( + "A command in the history of previous executions, complete_workflow (5), was not scheduled upon replay. " \ + "Likely, either you have made a version-unsafe change to your workflow or have non-deterministic behavior " \ + "in your workflow. See https://docs.temporal.io/docs/java/versioning/#introduction-to-versioning." + ) + ) + end +end diff --git a/spec/unit/lib/temporal/workflow/state_manager_spec.rb b/spec/unit/lib/temporal/workflow/state_manager_spec.rb index bad27d22..a1caaa2c 100644 --- a/spec/unit/lib/temporal/workflow/state_manager_spec.rb +++ b/spec/unit/lib/temporal/workflow/state_manager_spec.rb @@ -59,7 +59,7 @@ class MyWorkflow < Temporal::Workflow; end it 'dispatcher invoked for start' do expect(dispatcher).to receive(:dispatch).with( - Temporal::Workflow::History::EventTarget.workflow, 'started', instance_of(Array) + Temporal::Workflow::History::EventTarget.start_workflow, 'started', instance_of(Array) ).once state_manager.apply(history.next_window) end @@ -88,7 +88,7 @@ class MyWorkflow < Temporal::Workflow; end ] ).once.ordered expect(dispatcher).to receive(:dispatch).with( - Temporal::Workflow::History::EventTarget.workflow, 'started', instance_of(Array) + Temporal::Workflow::History::EventTarget.start_workflow, 'started', instance_of(Array) ).once.ordered state_manager.apply(history.next_window) @@ -119,7 +119,7 @@ class MyWorkflow < Temporal::Workflow; end allow(connection).to receive(:get_system_info).and_return(system_info) expect(dispatcher).to receive(:dispatch).with( - Temporal::Workflow::History::EventTarget.workflow, 'started', instance_of(Array) + Temporal::Workflow::History::EventTarget.start_workflow, 'started', instance_of(Array) ).once.ordered expect(dispatcher).to receive(:dispatch).with( Temporal::Workflow::Signal.new(signal_entry.workflow_execution_signaled_event_attributes.signal_name), @@ -140,7 +140,7 @@ class MyWorkflow < Temporal::Workflow; end allow(connection).to receive(:get_system_info).and_return(system_info) expect(dispatcher).to receive(:dispatch).with( - Temporal::Workflow::History::EventTarget.workflow, 'started', instance_of(Array) + Temporal::Workflow::History::EventTarget.start_workflow, 'started', instance_of(Array) ).once.ordered expect(dispatcher).to receive(:dispatch).with( Temporal::Workflow::Signal.new(signal_entry.workflow_execution_signaled_event_attributes.signal_name), @@ -173,7 +173,7 @@ class MyWorkflow < Temporal::Workflow; end ] ).once.ordered expect(dispatcher).to receive(:dispatch).with( - Temporal::Workflow::History::EventTarget.workflow, 'started', instance_of(Array) + Temporal::Workflow::History::EventTarget.start_workflow, 'started', instance_of(Array) ).once.ordered state_manager.apply(history.next_window) @@ -204,7 +204,7 @@ class MyWorkflow < Temporal::Workflow; end it 'marker handled first' do activity_target = nil - dispatcher.register_handler(Temporal::Workflow::History::EventTarget.workflow, 'started') do + dispatcher.register_handler(Temporal::Workflow::History::EventTarget.start_workflow, 'started') do activity_target, = state_manager.schedule( Temporal::Workflow::Command::ScheduleActivity.new( activity_id: activity_entry.event_id, @@ -249,7 +249,7 @@ def test_order(signal_first) activity_target = nil signaled = false - dispatcher.register_handler(Temporal::Workflow::History::EventTarget.workflow, 'started') do + dispatcher.register_handler(Temporal::Workflow::History::EventTarget.start_workflow, 'started') do activity_target, = state_manager.schedule( Temporal::Workflow::Command::ScheduleActivity.new( activity_id: activity_entry.event_id,