diff --git a/examples/spec/integration/call_failing_activity_workflow_spec.rb b/examples/spec/integration/call_failing_activity_workflow_spec.rb index c39853a6..090dd312 100644 --- a/examples/spec/integration/call_failing_activity_workflow_spec.rb +++ b/examples/spec/integration/call_failing_activity_workflow_spec.rb @@ -1,11 +1,6 @@ require 'workflows/call_failing_activity_workflow' describe CallFailingActivityWorkflow, :integration do - - class TestDeserializer - include Temporal::Concerns::Payloads - end - it 'correctly re-raises an activity-thrown exception in the workflow' do workflow_id = SecureRandom.uuid expected_message = "a failure message" diff --git a/examples/spec/integration/converter_spec.rb b/examples/spec/integration/converter_spec.rb index 0a97f075..6c6672c4 100644 --- a/examples/spec/integration/converter_spec.rb +++ b/examples/spec/integration/converter_spec.rb @@ -3,16 +3,20 @@ require 'grpc/errors' describe 'Converter', :integration do + let(:codec) do + Temporal::Connection::Converter::Codec::Chain.new( + payload_codecs: [ + Temporal::CryptPayloadCodec.new + ] + ) + end + around(:each) do |example| task_queue = Temporal.configuration.task_queue Temporal.configure do |config| config.task_queue = 'crypt' - config.payload_codec = Temporal::Connection::Converter::Codec::Chain.new( - payload_codecs: [ - Temporal::CryptPayloadCodec.new - ] - ) + config.payload_codec = codec end example.run @@ -67,8 +71,6 @@ completion_event = events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].first result = completion_event.workflow_execution_completed_event_attributes.result - payload_codec = Temporal.configuration.payload_codec - - expect(payload_codec.decodes(result).payloads.first.data).to eq('"Hello World, Tom"') + expect(codec.decodes(result).payloads.first.data).to eq('"Hello World, Tom"') end end diff --git a/lib/temporal.rb b/lib/temporal.rb index 078defe0..b9f49d55 100644 --- a/lib/temporal.rb +++ b/lib/temporal.rb @@ -51,6 +51,9 @@ module Temporal class << self def configure(&block) yield config + # Reset the singleton client after configuration was altered to ensure + # it is initialized with the latest attributes + @default_client = nil end def configuration diff --git a/lib/temporal/activity/task_processor.rb b/lib/temporal/activity/task_processor.rb index 35f4bbc6..ef20780b 100644 --- a/lib/temporal/activity/task_processor.rb +++ b/lib/temporal/activity/task_processor.rb @@ -2,7 +2,6 @@ require 'temporal/error_handler' require 'temporal/errors' require 'temporal/activity/context' -require 'temporal/concerns/payloads' require 'temporal/connection/retryer' require 'temporal/connection' require 'temporal/metric_keys' @@ -10,13 +9,11 @@ module Temporal class Activity class TaskProcessor - include Concerns::Payloads - def initialize(task, task_queue, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool) @task = task @task_queue = task_queue @namespace = namespace - @metadata = Metadata.generate_activity_metadata(task, namespace) + @metadata = Metadata.generate_activity_metadata(task, namespace, config.converter) @task_token = task.task_token @activity_name = task.activity_type.name @activity_class = activity_lookup.find(activity_name) @@ -38,7 +35,7 @@ def process end result = middleware_chain.invoke(metadata) do - activity_class.execute_in_context(context, from_payloads(task.input)) + activity_class.execute_in_context(context, config.converter.from_payloads(task.input)) end # Do not complete asynchronous activities, these should be completed manually diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index a49b5ffb..9b537a9d 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -16,6 +16,7 @@ module Temporal class Client def initialize(config) @config = config + @converter = config.converter end # Start a workflow with an optional signal @@ -251,7 +252,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam case closed_event.type when 'WORKFLOW_EXECUTION_COMPLETED' payloads = closed_event.attributes.result - return ResultConverter.from_result_payloads(payloads) + return converter.from_result_payloads(payloads) when 'WORKFLOW_EXECUTION_TIMED_OUT' raise Temporal::WorkflowTimedOut when 'WORKFLOW_EXECUTION_TERMINATED' @@ -259,7 +260,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam when 'WORKFLOW_EXECUTION_CANCELED' raise Temporal::WorkflowCanceled when 'WORKFLOW_EXECUTION_FAILED' - raise Temporal::Workflow::Errors.generate_error(closed_event.attributes.failure) + raise Temporal::Workflow::Errors.generate_error(closed_event.attributes.failure, converter) when 'WORKFLOW_EXECUTION_CONTINUED_AS_NEW' new_run_id = closed_event.attributes.new_execution_run_id # Throw to let the caller know they're not getting the result @@ -355,7 +356,7 @@ def fetch_workflow_execution_info(namespace, workflow_id, run_id) run_id: run_id ) - Workflow::ExecutionInfo.generate_from(response.workflow_execution_info) + Workflow::ExecutionInfo.generate_from(response.workflow_execution_info, converter) end # Manually complete an activity @@ -458,19 +459,19 @@ def get_workflow_history_protobuf(namespace: nil, workflow_id:, run_id: nil) def list_open_workflow_executions(namespace, from, to = Time.now, filter: {}, next_page_token: nil, max_page_size: nil) validate_filter(filter, :workflow, :workflow_id) - Temporal::Workflow::Executions.new(connection: connection, status: :open, request_options: { namespace: namespace, from: from, to: to, next_page_token: next_page_token, max_page_size: max_page_size}.merge(filter)) + Temporal::Workflow::Executions.new(converter, connection: connection, status: :open, request_options: { namespace: namespace, from: from, to: to, next_page_token: next_page_token, max_page_size: max_page_size}.merge(filter)) end def list_closed_workflow_executions(namespace, from, to = Time.now, filter: {}, next_page_token: nil, max_page_size: nil) validate_filter(filter, :status, :workflow, :workflow_id) - Temporal::Workflow::Executions.new(connection: connection, status: :closed, request_options: { namespace: namespace, from: from, to: to, next_page_token: next_page_token, max_page_size: max_page_size}.merge(filter)) + Temporal::Workflow::Executions.new(converter, connection: connection, status: :closed, request_options: { namespace: namespace, from: from, to: to, next_page_token: next_page_token, max_page_size: max_page_size}.merge(filter)) end def query_workflow_executions(namespace, query, filter: {}, next_page_token: nil, max_page_size: nil) validate_filter(filter, :status, :workflow, :workflow_id) - Temporal::Workflow::Executions.new(connection: connection, status: :all, request_options: { namespace: namespace, query: query, next_page_token: next_page_token, max_page_size: max_page_size }.merge(filter)) + Temporal::Workflow::Executions.new(converter, connection: connection, status: :all, request_options: { namespace: namespace, query: query, next_page_token: next_page_token, max_page_size: max_page_size }.merge(filter)) end # Count the number of workflows matching the provided query @@ -598,14 +599,9 @@ def connection @connection ||= Temporal::Connection.generate(config.for_connection) end - class ResultConverter - extend Concerns::Payloads - end - private_constant :ResultConverter - private - attr_reader :config + attr_reader :config, :converter def compute_run_timeout(execution_options) execution_options.timeouts[:run] || execution_options.timeouts[:execution] diff --git a/lib/temporal/concerns/payloads.rb b/lib/temporal/concerns/payloads.rb deleted file mode 100644 index 5c771e21..00000000 --- a/lib/temporal/concerns/payloads.rb +++ /dev/null @@ -1,86 +0,0 @@ -module Temporal - module Concerns - module Payloads - def from_payloads(payloads) - payloads = payload_codec.decodes(payloads) - payload_converter.from_payloads(payloads) - end - - def from_payload(payload) - payload = payload_codec.decode(payload) - payload_converter.from_payload(payload) - end - - def from_payload_map_without_codec(payload_map) - payload_map.map { |key, value| [key, payload_converter.from_payload(value)] }.to_h - end - - def from_result_payloads(payloads) - from_payloads(payloads)&.first - end - - def from_details_payloads(payloads) - from_payloads(payloads)&.first - end - - def from_signal_payloads(payloads) - from_payloads(payloads)&.first - end - - def from_query_payloads(payloads) - from_payloads(payloads)&.first - end - - def from_payload_map(payload_map) - payload_map.map { |key, value| [key, from_payload(value)] }.to_h - end - - def to_payloads(data) - payloads = payload_converter.to_payloads(data) - payload_codec.encodes(payloads) - end - - def to_payload(data) - payload = payload_converter.to_payload(data) - payload_codec.encode(payload) - end - - def to_payload_map_without_codec(data) - # skips the payload_codec step because search attributes don't use this pipeline - data.transform_values do |value| - payload_converter.to_payload(value) - end - end - - def to_result_payloads(data) - to_payloads([data]) - end - - def to_details_payloads(data) - to_payloads([data]) - end - - def to_signal_payloads(data) - to_payloads([data]) - end - - def to_query_payloads(data) - to_payloads([data]) - end - - def to_payload_map(data) - data.transform_values(&method(:to_payload)) - end - - private - - def payload_converter - Temporal.configuration.converter - end - - def payload_codec - Temporal.configuration.payload_codec - end - end - end -end diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index 136c73ce..101ad956 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -1,4 +1,5 @@ require 'temporal/capabilities' +require 'temporal/converter_wrapper' require 'temporal/logger' require 'temporal/metrics_adapters/null' require 'temporal/middleware/header_propagator_chain' @@ -12,13 +13,13 @@ module Temporal class Configuration - Connection = Struct.new(:type, :host, :port, :credentials, :identity, :connection_options, keyword_init: true) + Connection = Struct.new(:type, :host, :port, :credentials, :identity, :converter, :connection_options, keyword_init: true) Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true) - attr_reader :timeouts, :error_handlers, :capabilities - attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity, + attr_reader :timeouts, :error_handlers, :capabilities, :payload_codec + attr_accessor :connection_type, :use_error_serialization_v2, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators, - :payload_codec, :legacy_signals, :no_signals_in_first_task, :connection_options, :log_on_workflow_replay + :legacy_signals, :no_signals_in_first_task, :connection_options, :log_on_workflow_replay # See https://docs.temporal.io/blog/activity-timeouts/ for general docs. # We want an infinite execution timeout for cron schedules and other perpetual workflows. @@ -124,6 +125,7 @@ def for_connection port: port, credentials: credentials, identity: identity || default_identity, + converter: converter, connection_options: connection_options.merge(use_error_serialization_v2: @use_error_serialization_v2) ).freeze end @@ -148,6 +150,20 @@ def header_propagator_chain Middleware::HeaderPropagatorChain.new(header_propagators) end + def converter + @converter_wrapper ||= ConverterWrapper.new(@converter, @payload_codec) + end + + def converter=(new_converter) + @converter = new_converter + @converter_wrapper = nil + end + + def payload_codec=(new_codec) + @payload_codec = new_codec + @converter_wrapper = nil + end + private def default_identity diff --git a/lib/temporal/connection.rb b/lib/temporal/connection.rb index a36fe091..6ee1bcc7 100644 --- a/lib/temporal/connection.rb +++ b/lib/temporal/connection.rb @@ -12,9 +12,10 @@ def self.generate(configuration) port = configuration.port credentials = configuration.credentials identity = configuration.identity + converter = configuration.converter options = configuration.connection_options - connection_class.new(host, port, identity, credentials, options) + connection_class.new(host, port, identity, credentials, converter, options) end end end diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 282a262c..5392f62a 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -15,13 +15,10 @@ require 'temporal/connection/serializer/backfill' require 'temporal/connection/serializer/schedule' require 'temporal/connection/serializer/workflow_id_reuse_policy' -require 'temporal/concerns/payloads' module Temporal module Connection class GRPC - include Concerns::Payloads - HISTORY_EVENT_FILTER = { all: Temporalio::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, close: Temporalio::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT @@ -58,10 +55,11 @@ class GRPC CONNECTION_TIMEOUT_SECONDS = 60 - def initialize(host, port, identity, credentials, options = {}) + def initialize(host, port, identity, credentials, converter, options = {}) @url = "#{host}:#{port}" @identity = identity @credentials = credentials + @converter = converter @poll = true @poll_mutex = Mutex.new @poll_request = nil @@ -131,24 +129,24 @@ def start_workflow_execution( name: workflow_name ), workflow_id: workflow_id, - workflow_id_reuse_policy: Temporal::Connection::Serializer::WorkflowIdReusePolicy.new(workflow_id_reuse_policy).to_proto, + workflow_id_reuse_policy: Temporal::Connection::Serializer::WorkflowIdReusePolicy.new(workflow_id_reuse_policy, converter).to_proto, task_queue: Temporalio::Api::TaskQueue::V1::TaskQueue.new( name: task_queue ), - input: to_payloads(input), + input: converter.to_payloads(input), workflow_execution_timeout: execution_timeout, workflow_run_timeout: run_timeout, workflow_task_timeout: task_timeout, request_id: SecureRandom.uuid, header: Temporalio::Api::Common::V1::Header.new( - fields: to_payload_map(headers || {}) + fields: converter.to_payload_map(headers || {}) ), cron_schedule: cron_schedule, memo: Temporalio::Api::Common::V1::Memo.new( - fields: to_payload_map(memo || {}) + fields: converter.to_payload_map(memo || {}) ), search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new( - indexed_fields: to_payload_map_without_codec(search_attributes || {}) + indexed_fields: converter.to_payload_map_without_codec(search_attributes || {}) ) ) @@ -213,7 +211,7 @@ def poll_workflow_task_queue(namespace:, task_queue:, binary_checksum:) end def respond_query_task_completed(namespace:, task_token:, query_result:) - query_result_proto = Serializer.serialize(query_result) + query_result_proto = Serializer.serialize(query_result, converter) request = Temporalio::Api::WorkflowService::V1::RespondQueryTaskCompletedRequest.new( task_token: task_token, namespace: namespace, @@ -230,8 +228,8 @@ def respond_workflow_task_completed(namespace:, task_token:, commands:, binary_c namespace: namespace, identity: identity, task_token: task_token, - commands: Array(commands).map { |(_, command)| Serializer.serialize(command) }, - query_results: query_results.transform_values { |value| Serializer.serialize(value) }, + commands: Array(commands).map { |(_, command)| Serializer.serialize(command, converter) }, + query_results: query_results.transform_values { |value| Serializer.serialize(value, converter) }, binary_checksum: binary_checksum, sdk_metadata: if new_sdk_flags_used.any? Temporalio::Api::Sdk::V1::WorkflowTaskCompletedMetadata.new( @@ -250,7 +248,7 @@ def respond_workflow_task_failed(namespace:, task_token:, cause:, exception:, bi identity: identity, task_token: task_token, cause: cause, - failure: Serializer::Failure.new(exception).to_proto, + failure: Serializer::Failure.new(exception, converter).to_proto, binary_checksum: binary_checksum ) client.respond_workflow_task_failed(request) @@ -284,7 +282,7 @@ def record_activity_task_heartbeat(namespace:, task_token:, details: nil) request = Temporalio::Api::WorkflowService::V1::RecordActivityTaskHeartbeatRequest.new( namespace: namespace, task_token: task_token, - details: to_details_payloads(details), + details: converter.to_details_payloads(details), identity: identity ) client.record_activity_task_heartbeat(request) @@ -299,7 +297,7 @@ def respond_activity_task_completed(namespace:, task_token:, result:) namespace: namespace, identity: identity, task_token: task_token, - result: to_result_payloads(result) + result: converter.to_result_payloads(result) ) client.respond_activity_task_completed(request) end @@ -311,7 +309,7 @@ def respond_activity_task_completed_by_id(namespace:, activity_id:, workflow_id: workflow_id: workflow_id, run_id: run_id, activity_id: activity_id, - result: to_result_payloads(result) + result: converter.to_result_payloads(result) ) client.respond_activity_task_completed_by_id(request) end @@ -322,7 +320,7 @@ def respond_activity_task_failed(namespace:, task_token:, exception:) namespace: namespace, identity: identity, task_token: task_token, - failure: Serializer::Failure.new(exception, serialize_whole_error: serialize_whole_error).to_proto + failure: Serializer::Failure.new(exception, converter, serialize_whole_error: serialize_whole_error).to_proto ) client.respond_activity_task_failed(request) end @@ -334,7 +332,7 @@ def respond_activity_task_failed_by_id(namespace:, activity_id:, workflow_id:, r workflow_id: workflow_id, run_id: run_id, activity_id: activity_id, - failure: Serializer::Failure.new(exception).to_proto + failure: Serializer::Failure.new(exception, converter).to_proto ) client.respond_activity_task_failed_by_id(request) end @@ -343,7 +341,7 @@ def respond_activity_task_canceled(namespace:, task_token:, details: nil) request = Temporalio::Api::WorkflowService::V1::RespondActivityTaskCanceledRequest.new( namespace: namespace, task_token: task_token, - details: to_details_payloads(details), + details: converter.to_details_payloads(details), identity: identity ) client.respond_activity_task_canceled(request) @@ -365,7 +363,7 @@ def signal_workflow_execution(namespace:, workflow_id:, run_id:, signal:, input: run_id: run_id ), signal_name: signal, - input: to_signal_payloads(input), + input: converter.to_signal_payloads(input), identity: identity ) client.signal_workflow_execution(request) @@ -384,9 +382,9 @@ def signal_with_start_workflow_execution( search_attributes: nil ) proto_header_fields = if headers.nil? - to_payload_map({}) + converter.to_payload_map({}) elsif headers.instance_of?(Hash) - to_payload_map(headers) + converter.to_payload_map(headers) else # Preserve backward compatability for headers specified using proto objects warn '[DEPRECATION] Specify headers using a hash rather than protobuf objects' @@ -400,11 +398,11 @@ def signal_with_start_workflow_execution( name: workflow_name ), workflow_id: workflow_id, - workflow_id_reuse_policy: Temporal::Connection::Serializer::WorkflowIdReusePolicy.new(workflow_id_reuse_policy).to_proto, + workflow_id_reuse_policy: Temporal::Connection::Serializer::WorkflowIdReusePolicy.new(workflow_id_reuse_policy, converter).to_proto, task_queue: Temporalio::Api::TaskQueue::V1::TaskQueue.new( name: task_queue ), - input: to_payloads(input), + input: converter.to_payloads(input), workflow_execution_timeout: execution_timeout, workflow_run_timeout: run_timeout, workflow_task_timeout: task_timeout, @@ -414,12 +412,12 @@ def signal_with_start_workflow_execution( ), cron_schedule: cron_schedule, signal_name: signal_name, - signal_input: to_signal_payloads(signal_input), + signal_input: converter.to_signal_payloads(signal_input), memo: Temporalio::Api::Common::V1::Memo.new( - fields: to_payload_map(memo || {}) + fields: converter.to_payload_map(memo || {}) ), search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new( - indexed_fields: to_payload_map_without_codec(search_attributes || {}) + indexed_fields: converter.to_payload_map_without_codec(search_attributes || {}) ) ) @@ -463,7 +461,7 @@ def terminate_workflow_execution( run_id: run_id ), reason: reason, - details: to_details_payloads(details) + details: converter.to_details_payloads(details) ) client.terminate_workflow_execution(request) @@ -577,7 +575,7 @@ def query_workflow(namespace:, workflow_id:, run_id:, query:, args: nil, query_r ), query: Temporalio::Api::Query::V1::WorkflowQuery.new( query_type: query, - query_args: to_query_payloads(args) + query_args: converter.to_query_payloads(args) ) ) if query_reject_condition @@ -599,7 +597,7 @@ def query_workflow(namespace:, workflow_id:, run_id:, query:, args: nil, query_r elsif !response.query_result raise Temporal::QueryFailed, 'Invalid response from server' else - from_query_payloads(response.query_result) + converter.from_query_payloads(response.query_result) end end @@ -649,8 +647,8 @@ def list_schedules(namespace:, maximum_page_size:, next_page_token:) schedules: resp.schedules.map do |schedule| Temporal::Schedule::ScheduleListEntry.new( schedule_id: schedule.schedule_id, - memo: from_payload_map(schedule.memo&.fields || {}), - search_attributes: from_payload_map_without_codec(schedule.search_attributes&.indexed_fields || {}), + memo: converter.from_payload_map(schedule.memo&.fields || {}), + search_attributes: converter.from_payload_map_without_codec(schedule.search_attributes&.indexed_fields || {}), info: schedule.info ) end, @@ -674,8 +672,8 @@ def describe_schedule(namespace:, schedule_id:) Temporal::Schedule::DescribeScheduleResponse.new( schedule: resp.schedule, info: resp.info, - memo: from_payload_map(resp.memo&.fields || {}), - search_attributes: from_payload_map_without_codec(resp.search_attributes&.indexed_fields || {}), + memo: converter.from_payload_map(resp.memo&.fields || {}), + search_attributes: converter.from_payload_map_without_codec(resp.search_attributes&.indexed_fields || {}), conflict_token: resp.conflict_token ) end @@ -695,27 +693,28 @@ def create_schedule( if trigger_immediately initial_patch.trigger_immediately = Temporalio::Api::Schedule::V1::TriggerImmediatelyRequest.new( overlap_policy: Temporal::Connection::Serializer::ScheduleOverlapPolicy.new( - schedule.policies&.overlap_policy + schedule.policies&.overlap_policy, + converter ).to_proto ) end if backfill - initial_patch.backfill_request += [Temporal::Connection::Serializer::Backfill.new(backfill).to_proto] + initial_patch.backfill_request += [Temporal::Connection::Serializer::Backfill.new(backfill, converter).to_proto] end end request = Temporalio::Api::WorkflowService::V1::CreateScheduleRequest.new( namespace: namespace, schedule_id: schedule_id, - schedule: Temporal::Connection::Serializer::Schedule.new(schedule).to_proto, + schedule: Temporal::Connection::Serializer::Schedule.new(schedule, converter).to_proto, identity: identity, request_id: SecureRandom.uuid, memo: Temporalio::Api::Common::V1::Memo.new( - fields: to_payload_map(memo || {}) + fields: converter.to_payload_map(memo || {}) ), search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new( - indexed_fields: to_payload_map_without_codec(search_attributes || {}) + indexed_fields: converter.to_payload_map_without_codec(search_attributes || {}) ) ) client.create_schedule(request) @@ -739,7 +738,7 @@ def update_schedule(namespace:, schedule_id:, schedule:, conflict_token: nil) request = Temporalio::Api::WorkflowService::V1::UpdateScheduleRequest.new( namespace: namespace, schedule_id: schedule_id, - schedule: Temporal::Connection::Serializer::Schedule.new(schedule).to_proto, + schedule: Temporal::Connection::Serializer::Schedule.new(schedule, converter).to_proto, conflict_token: conflict_token, identity: identity, request_id: SecureRandom.uuid @@ -759,7 +758,8 @@ def trigger_schedule(namespace:, schedule_id:, overlap_policy: nil) patch: Temporalio::Api::Schedule::V1::SchedulePatch.new( trigger_immediately: Temporalio::Api::Schedule::V1::TriggerImmediatelyRequest.new( overlap_policy: Temporal::Connection::Serializer::ScheduleOverlapPolicy.new( - overlap_policy + overlap_policy, + converter ).to_proto ), ), @@ -799,7 +799,7 @@ def pause_schedule(namespace:, schedule_id:, should_pause:, note: nil) private - attr_reader :url, :identity, :credentials, :options, :poll_mutex, :poll_request + attr_reader :url, :identity, :credentials, :converter, :options, :poll_mutex, :poll_request def client return @client if @client diff --git a/lib/temporal/connection/serializer.rb b/lib/temporal/connection/serializer.rb index 46070c66..b31c1005 100644 --- a/lib/temporal/connection/serializer.rb +++ b/lib/temporal/connection/serializer.rb @@ -33,9 +33,9 @@ module Serializer Workflow::QueryResult::Failure => Serializer::QueryFailure, }.freeze - def self.serialize(object) + def self.serialize(object, converter) serializer = SERIALIZERS_MAP[object.class] - serializer.new(object).to_proto + serializer.new(object, converter).to_proto end end end diff --git a/lib/temporal/connection/serializer/backfill.rb b/lib/temporal/connection/serializer/backfill.rb index 04998cfb..7abb40a5 100644 --- a/lib/temporal/connection/serializer/backfill.rb +++ b/lib/temporal/connection/serializer/backfill.rb @@ -11,7 +11,7 @@ def to_proto Temporalio::Api::Schedule::V1::BackfillRequest.new( start_time: serialize_time(object.start_time), end_time: serialize_time(object.end_time), - overlap_policy: Temporal::Connection::Serializer::ScheduleOverlapPolicy.new(object.overlap_policy).to_proto + overlap_policy: Temporal::Connection::Serializer::ScheduleOverlapPolicy.new(object.overlap_policy, converter).to_proto ) end diff --git a/lib/temporal/connection/serializer/base.rb b/lib/temporal/connection/serializer/base.rb index 9fcd49c5..79e8767a 100644 --- a/lib/temporal/connection/serializer/base.rb +++ b/lib/temporal/connection/serializer/base.rb @@ -6,8 +6,9 @@ module Temporal module Connection module Serializer class Base - def initialize(object) + def initialize(object, converter) @object = object + @converter = converter end def to_proto @@ -16,7 +17,7 @@ def to_proto private - attr_reader :object + attr_reader :object, :converter end end end diff --git a/lib/temporal/connection/serializer/complete_workflow.rb b/lib/temporal/connection/serializer/complete_workflow.rb index beb3b0ed..8eaa3ed4 100644 --- a/lib/temporal/connection/serializer/complete_workflow.rb +++ b/lib/temporal/connection/serializer/complete_workflow.rb @@ -1,18 +1,15 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class CompleteWorkflow < Base - include Concerns::Payloads - def to_proto Temporalio::Api::Command::V1::Command.new( command_type: Temporalio::Api::Enums::V1::CommandType::COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, complete_workflow_execution_command_attributes: Temporalio::Api::Command::V1::CompleteWorkflowExecutionCommandAttributes.new( - result: to_result_payloads(object.result) + result: converter.to_result_payloads(object.result) ) ) end diff --git a/lib/temporal/connection/serializer/continue_as_new.rb b/lib/temporal/connection/serializer/continue_as_new.rb index 6573c8ec..989ff2a9 100644 --- a/lib/temporal/connection/serializer/continue_as_new.rb +++ b/lib/temporal/connection/serializer/continue_as_new.rb @@ -1,13 +1,10 @@ require 'temporal/connection/serializer/base' require 'temporal/connection/serializer/retry_policy' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class ContinueAsNew < Base - include Concerns::Payloads - def to_proto Temporalio::Api::Command::V1::Command.new( command_type: Temporalio::Api::Enums::V1::CommandType::COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION, @@ -15,10 +12,10 @@ def to_proto Temporalio::Api::Command::V1::ContinueAsNewWorkflowExecutionCommandAttributes.new( workflow_type: Temporalio::Api::Common::V1::WorkflowType.new(name: object.workflow_type), task_queue: Temporalio::Api::TaskQueue::V1::TaskQueue.new(name: object.task_queue), - input: to_payloads(object.input), + input: converter.to_payloads(object.input), workflow_run_timeout: object.timeouts[:run], workflow_task_timeout: object.timeouts[:task], - retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto, + retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy, converter).to_proto, header: serialize_headers(object.headers), memo: serialize_memo(object.memo), search_attributes: serialize_search_attributes(object.search_attributes), @@ -31,19 +28,19 @@ def to_proto def serialize_headers(headers) return unless headers - Temporalio::Api::Common::V1::Header.new(fields: to_payload_map(headers)) + Temporalio::Api::Common::V1::Header.new(fields: converter.to_payload_map(headers)) end def serialize_memo(memo) return unless memo - Temporalio::Api::Common::V1::Memo.new(fields: to_payload_map(memo)) + Temporalio::Api::Common::V1::Memo.new(fields: converter.to_payload_map(memo)) end def serialize_search_attributes(search_attributes) return unless search_attributes - Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: to_payload_map_without_codec(search_attributes)) + Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: converter.to_payload_map_without_codec(search_attributes)) end end end diff --git a/lib/temporal/connection/serializer/fail_workflow.rb b/lib/temporal/connection/serializer/fail_workflow.rb index a6ef9ea0..2bedb688 100644 --- a/lib/temporal/connection/serializer/fail_workflow.rb +++ b/lib/temporal/connection/serializer/fail_workflow.rb @@ -10,7 +10,7 @@ def to_proto command_type: Temporalio::Api::Enums::V1::CommandType::COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION, fail_workflow_execution_command_attributes: Temporalio::Api::Command::V1::FailWorkflowExecutionCommandAttributes.new( - failure: Failure.new(object.exception).to_proto + failure: Failure.new(object.exception, converter).to_proto ) ) end diff --git a/lib/temporal/connection/serializer/failure.rb b/lib/temporal/connection/serializer/failure.rb index ddfeb2e3..2d17e949 100644 --- a/lib/temporal/connection/serializer/failure.rb +++ b/lib/temporal/connection/serializer/failure.rb @@ -1,21 +1,18 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class Failure < Base - include Concerns::Payloads - - def initialize(error, serialize_whole_error: false, max_bytes: 200_000) + def initialize(error, converter, serialize_whole_error: false, max_bytes: 200_000) @serialize_whole_error = serialize_whole_error @max_bytes = max_bytes - super(error) + super(error, converter) end def to_proto if @serialize_whole_error - details = to_details_payloads(object) + details = converter.to_details_payloads(object) if details.payloads.first.data.size > @max_bytes Temporal.logger.error( "Could not serialize exception because it's too large, so we are using a fallback that may not "\ @@ -25,10 +22,10 @@ def to_proto ) # Fallback to a more conservative serialization if the payload is too big to avoid # sending a huge amount of data to temporal and putting it in the history. - details = to_details_payloads(object.message) + details = converter.to_details_payloads(object.message) end else - details = to_details_payloads(object.message) + details = converter.to_details_payloads(object.message) end Temporalio::Api::Failure::V1::Failure.new( message: object.message, diff --git a/lib/temporal/connection/serializer/query_answer.rb b/lib/temporal/connection/serializer/query_answer.rb index 746c50c0..0c98b010 100644 --- a/lib/temporal/connection/serializer/query_answer.rb +++ b/lib/temporal/connection/serializer/query_answer.rb @@ -1,16 +1,13 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class QueryAnswer < Base - include Concerns::Payloads - def to_proto Temporalio::Api::Query::V1::WorkflowQueryResult.new( result_type: Temporalio::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED, - answer: to_query_payloads(object.result) + answer: converter.to_query_payloads(object.result) ) end end diff --git a/lib/temporal/connection/serializer/record_marker.rb b/lib/temporal/connection/serializer/record_marker.rb index b29040f3..99fddb8c 100644 --- a/lib/temporal/connection/serializer/record_marker.rb +++ b/lib/temporal/connection/serializer/record_marker.rb @@ -1,12 +1,9 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class RecordMarker < Base - include Concerns::Payloads - def to_proto Temporalio::Api::Command::V1::Command.new( command_type: Temporalio::Api::Enums::V1::CommandType::COMMAND_TYPE_RECORD_MARKER, @@ -14,7 +11,7 @@ def to_proto Temporalio::Api::Command::V1::RecordMarkerCommandAttributes.new( marker_name: object.name, details: { - 'data' => to_details_payloads(object.details) + 'data' => converter.to_details_payloads(object.details) } ) ) diff --git a/lib/temporal/connection/serializer/schedule.rb b/lib/temporal/connection/serializer/schedule.rb index 32c206fd..3e2fc264 100644 --- a/lib/temporal/connection/serializer/schedule.rb +++ b/lib/temporal/connection/serializer/schedule.rb @@ -10,10 +10,10 @@ module Serializer class Schedule < Base def to_proto Temporalio::Api::Schedule::V1::Schedule.new( - spec: Temporal::Connection::Serializer::ScheduleSpec.new(object.spec).to_proto, - action: Temporal::Connection::Serializer::ScheduleAction.new(object.action).to_proto, - policies: Temporal::Connection::Serializer::SchedulePolicies.new(object.policies).to_proto, - state: Temporal::Connection::Serializer::ScheduleState.new(object.state).to_proto + spec: Temporal::Connection::Serializer::ScheduleSpec.new(object.spec, converter).to_proto, + action: Temporal::Connection::Serializer::ScheduleAction.new(object.action, converter).to_proto, + policies: Temporal::Connection::Serializer::SchedulePolicies.new(object.policies, converter).to_proto, + state: Temporal::Connection::Serializer::ScheduleState.new(object.state, converter).to_proto ) end end diff --git a/lib/temporal/connection/serializer/schedule_action.rb b/lib/temporal/connection/serializer/schedule_action.rb index ab4ce4c0..b79942be 100644 --- a/lib/temporal/connection/serializer/schedule_action.rb +++ b/lib/temporal/connection/serializer/schedule_action.rb @@ -1,12 +1,9 @@ require "temporal/connection/serializer/base" -require "temporal/concerns/payloads" module Temporal module Connection module Serializer class ScheduleAction < Base - include Concerns::Payloads - def to_proto unless object.is_a?(Temporal::Schedule::StartWorkflowAction) raise ArgumentError, "Unknown action type #{object.class}" @@ -21,18 +18,18 @@ def to_proto task_queue: Temporalio::Api::TaskQueue::V1::TaskQueue.new( name: object.task_queue ), - input: to_payloads(object.input), + input: converter.to_payloads(object.input), workflow_execution_timeout: object.execution_timeout, workflow_run_timeout: object.run_timeout, workflow_task_timeout: object.task_timeout, header: Temporalio::Api::Common::V1::Header.new( - fields: to_payload_map(object.headers || {}) + fields: converter.to_payload_map(object.headers || {}) ), memo: Temporalio::Api::Common::V1::Memo.new( - fields: to_payload_map(object.memo || {}) + fields: converter.to_payload_map(object.memo || {}) ), search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new( - indexed_fields: to_payload_map_without_codec(object.search_attributes || {}) + indexed_fields: converter.to_payload_map_without_codec(object.search_attributes || {}) ) ) ) diff --git a/lib/temporal/connection/serializer/schedule_activity.rb b/lib/temporal/connection/serializer/schedule_activity.rb index 10b26570..b3640639 100644 --- a/lib/temporal/connection/serializer/schedule_activity.rb +++ b/lib/temporal/connection/serializer/schedule_activity.rb @@ -1,13 +1,10 @@ require 'temporal/connection/serializer/base' require 'temporal/connection/serializer/retry_policy' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class ScheduleActivity < Base - include Concerns::Payloads - def to_proto Temporalio::Api::Command::V1::Command.new( command_type: Temporalio::Api::Enums::V1::CommandType::COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, @@ -15,13 +12,13 @@ def to_proto Temporalio::Api::Command::V1::ScheduleActivityTaskCommandAttributes.new( activity_id: object.activity_id.to_s, activity_type: Temporalio::Api::Common::V1::ActivityType.new(name: object.activity_type), - input: to_payloads(object.input), + input: converter.to_payloads(object.input), task_queue: Temporalio::Api::TaskQueue::V1::TaskQueue.new(name: object.task_queue), schedule_to_close_timeout: object.timeouts[:schedule_to_close], schedule_to_start_timeout: object.timeouts[:schedule_to_start], start_to_close_timeout: object.timeouts[:start_to_close], heartbeat_timeout: object.timeouts[:heartbeat], - retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto, + retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy, converter).to_proto, header: serialize_headers(object.headers) ) ) @@ -32,7 +29,7 @@ def to_proto def serialize_headers(headers) return unless headers - Temporalio::Api::Common::V1::Header.new(fields: to_payload_map(headers)) + Temporalio::Api::Common::V1::Header.new(fields: converter.to_payload_map(headers)) end end end diff --git a/lib/temporal/connection/serializer/schedule_policies.rb b/lib/temporal/connection/serializer/schedule_policies.rb index 4a92c226..42558899 100644 --- a/lib/temporal/connection/serializer/schedule_policies.rb +++ b/lib/temporal/connection/serializer/schedule_policies.rb @@ -9,7 +9,7 @@ def to_proto return unless object Temporalio::Api::Schedule::V1::SchedulePolicies.new( - overlap_policy: Temporal::Connection::Serializer::ScheduleOverlapPolicy.new(object.overlap_policy).to_proto, + overlap_policy: Temporal::Connection::Serializer::ScheduleOverlapPolicy.new(object.overlap_policy, converter).to_proto, catchup_window: object.catchup_window, pause_on_failure: object.pause_on_failure ) diff --git a/lib/temporal/connection/serializer/signal_external_workflow.rb b/lib/temporal/connection/serializer/signal_external_workflow.rb index 5cc640fd..ff229ddb 100644 --- a/lib/temporal/connection/serializer/signal_external_workflow.rb +++ b/lib/temporal/connection/serializer/signal_external_workflow.rb @@ -1,12 +1,9 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class SignalExternalWorkflow < Base - include Concerns::Payloads - def to_proto Temporalio::Api::Command::V1::Command.new( command_type: Temporalio::Api::Enums::V1::CommandType::COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION, @@ -15,7 +12,7 @@ def to_proto namespace: object.namespace, execution: serialize_execution(object.execution), signal_name: object.signal_name, - input: to_signal_payloads(object.input), + input: converter.to_signal_payloads(object.input), control: "", # deprecated child_workflow_only: object.child_workflow_only ) diff --git a/lib/temporal/connection/serializer/start_child_workflow.rb b/lib/temporal/connection/serializer/start_child_workflow.rb index 90d08c79..dcb2fbf0 100644 --- a/lib/temporal/connection/serializer/start_child_workflow.rb +++ b/lib/temporal/connection/serializer/start_child_workflow.rb @@ -1,14 +1,11 @@ require 'temporal/connection/serializer/base' require 'temporal/connection/serializer/retry_policy' require 'temporal/connection/serializer/workflow_id_reuse_policy' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class StartChildWorkflow < Base - include Concerns::Payloads - PARENT_CLOSE_POLICY = { terminate: Temporalio::Api::Enums::V1::ParentClosePolicy::PARENT_CLOSE_POLICY_TERMINATE, abandon: Temporalio::Api::Enums::V1::ParentClosePolicy::PARENT_CLOSE_POLICY_ABANDON, @@ -24,16 +21,16 @@ def to_proto workflow_id: object.workflow_id.to_s, workflow_type: Temporalio::Api::Common::V1::WorkflowType.new(name: object.workflow_type), task_queue: Temporalio::Api::TaskQueue::V1::TaskQueue.new(name: object.task_queue), - input: to_payloads(object.input), + input: converter.to_payloads(object.input), workflow_execution_timeout: object.timeouts[:execution], workflow_run_timeout: object.timeouts[:run], workflow_task_timeout: object.timeouts[:task], - retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto, + retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy, converter).to_proto, parent_close_policy: serialize_parent_close_policy(object.parent_close_policy), header: serialize_headers(object.headers), cron_schedule: object.cron_schedule, memo: serialize_memo(object.memo), - workflow_id_reuse_policy: Temporal::Connection::Serializer::WorkflowIdReusePolicy.new(object.workflow_id_reuse_policy).to_proto, + workflow_id_reuse_policy: Temporal::Connection::Serializer::WorkflowIdReusePolicy.new(object.workflow_id_reuse_policy, converter).to_proto, search_attributes: serialize_search_attributes(object.search_attributes), ) ) @@ -44,13 +41,13 @@ def to_proto def serialize_headers(headers) return unless headers - Temporalio::Api::Common::V1::Header.new(fields: to_payload_map(headers)) + Temporalio::Api::Common::V1::Header.new(fields: converter.to_payload_map(headers)) end def serialize_memo(memo) return unless memo - Temporalio::Api::Common::V1::Memo.new(fields: to_payload_map(memo)) + Temporalio::Api::Common::V1::Memo.new(fields: converter.to_payload_map(memo)) end def serialize_parent_close_policy(parent_close_policy) @@ -66,7 +63,7 @@ def serialize_parent_close_policy(parent_close_policy) def serialize_search_attributes(search_attributes) return unless search_attributes - Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: to_payload_map_without_codec(search_attributes)) + Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: converter.to_payload_map_without_codec(search_attributes)) end end end diff --git a/lib/temporal/connection/serializer/upsert_search_attributes.rb b/lib/temporal/connection/serializer/upsert_search_attributes.rb index e8aa652c..b1b0395a 100644 --- a/lib/temporal/connection/serializer/upsert_search_attributes.rb +++ b/lib/temporal/connection/serializer/upsert_search_attributes.rb @@ -1,19 +1,16 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class UpsertSearchAttributes < Base - include Concerns::Payloads - def to_proto Temporalio::Api::Command::V1::Command.new( command_type: Temporalio::Api::Enums::V1::CommandType::COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES, upsert_workflow_search_attributes_command_attributes: Temporalio::Api::Command::V1::UpsertWorkflowSearchAttributesCommandAttributes.new( search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new( - indexed_fields: to_payload_map_without_codec(object.search_attributes || {}) + indexed_fields: converter.to_payload_map_without_codec(object.search_attributes || {}) ), ) ) diff --git a/lib/temporal/converter_wrapper.rb b/lib/temporal/converter_wrapper.rb new file mode 100644 index 00000000..a14e2abf --- /dev/null +++ b/lib/temporal/converter_wrapper.rb @@ -0,0 +1,87 @@ +# This class provides convenience methods for accessing the converter/codec. It is fully backwards +# compatible with Temporal::Connection::Converter::Base interface, however it adds new convenience +# methods specific to different conversion scenarios. + +module Temporal + class ConverterWrapper + def initialize(converter, codec) + @converter = converter + @codec = codec + end + + def from_payloads(payloads) + payloads = codec.decodes(payloads) + converter.from_payloads(payloads) + end + + def from_payload(payload) + payload = codec.decode(payload) + converter.from_payload(payload) + end + + def from_payload_map_without_codec(payload_map) + payload_map.map { |key, value| [key, converter.from_payload(value)] }.to_h + end + + def from_result_payloads(payloads) + from_payloads(payloads)&.first + end + + def from_details_payloads(payloads) + from_payloads(payloads)&.first + end + + def from_signal_payloads(payloads) + from_payloads(payloads)&.first + end + + def from_query_payloads(payloads) + from_payloads(payloads)&.first + end + + def from_payload_map(payload_map) + payload_map.map { |key, value| [key, from_payload(value)] }.to_h + end + + def to_payloads(data) + payloads = converter.to_payloads(data) + codec.encodes(payloads) + end + + def to_payload(data) + payload = converter.to_payload(data) + codec.encode(payload) + end + + def to_payload_map_without_codec(data) + # skips the codec step because search attributes don't use this pipeline + data.transform_values do |value| + converter.to_payload(value) + end + end + + def to_result_payloads(data) + to_payloads([data]) + end + + def to_details_payloads(data) + to_payloads([data]) + end + + def to_signal_payloads(data) + to_payloads([data]) + end + + def to_query_payloads(data) + to_payloads([data]) + end + + def to_payload_map(data) + data.transform_values(&method(:to_payload)) + end + + private + + attr_reader :converter, :codec + end +end diff --git a/lib/temporal/metadata.rb b/lib/temporal/metadata.rb index 7be46b31..5439029f 100644 --- a/lib/temporal/metadata.rb +++ b/lib/temporal/metadata.rb @@ -2,15 +2,12 @@ require 'temporal/metadata/activity' require 'temporal/metadata/workflow' require 'temporal/metadata/workflow_task' -require 'temporal/concerns/payloads' module Temporal module Metadata class << self - include Concerns::Payloads - - def generate_activity_metadata(task, namespace) + def generate_activity_metadata(task, namespace, converter) Metadata::Activity.new( namespace: namespace, id: task.activity_id, @@ -20,8 +17,8 @@ def generate_activity_metadata(task, namespace) workflow_run_id: task.workflow_execution.run_id, workflow_id: task.workflow_execution.workflow_id, workflow_name: task.workflow_type.name, - headers: from_payload_map(task.header&.fields || {}), - heartbeat_details: from_details_payloads(task.heartbeat_details), + headers: converter.from_payload_map(task.header&.fields || {}), + heartbeat_details: converter.from_details_payloads(task.heartbeat_details), scheduled_at: task.scheduled_time.to_time, current_attempt_scheduled_at: task.current_attempt_scheduled_time.to_time, heartbeat_timeout: task.heartbeat_timeout.seconds @@ -44,7 +41,7 @@ def generate_workflow_task_metadata(task, namespace) # @param event [Temporal::Workflow::History::Event] Workflow started history event # @param task_metadata [Temporal::Metadata::WorkflowTask] workflow task metadata - def generate_workflow_metadata(event, task_metadata) + def generate_workflow_metadata(event, task_metadata, converter) Metadata::Workflow.new( name: event.attributes.workflow_type.name, id: task_metadata.workflow_id, @@ -54,9 +51,9 @@ def generate_workflow_metadata(event, task_metadata) attempt: event.attributes.attempt, namespace: task_metadata.namespace, task_queue: event.attributes.task_queue.name, - headers: from_payload_map(event.attributes.header&.fields || {}), + headers: converter.from_payload_map(event.attributes.header&.fields || {}), run_started_at: event.timestamp, - memo: from_payload_map(event.attributes.memo&.fields || {}), + memo: converter.from_payload_map(event.attributes.memo&.fields || {}), ) end end diff --git a/lib/temporal/workflow/errors.rb b/lib/temporal/workflow/errors.rb index 42157376..832c2ac3 100644 --- a/lib/temporal/workflow/errors.rb +++ b/lib/temporal/workflow/errors.rb @@ -3,11 +3,9 @@ module Temporal class Workflow class Errors - extend Concerns::Payloads - # Convert a failure returned from the server to an Error to raise to the client # failure: Temporalio::Api::Failure::V1::Failure - def self.generate_error(failure, default_exception_class = StandardError) + def self.generate_error(failure, converter, default_exception_class = StandardError) case failure.failure_info when :application_failure_info @@ -25,7 +23,7 @@ def self.generate_error(failure, default_exception_class = StandardError) end begin details = failure.application_failure_info.details - exception_or_message = from_details_payloads(details) + exception_or_message = converter.from_details_payloads(details) # v1 serialization only supports StandardErrors with a single "message" argument. # v2 serialization supports complex errors using our converters to serialize them. # enable v2 serialization in activities with Temporal.configuration.use_error_serialization_v2 @@ -59,7 +57,7 @@ def self.generate_error(failure, default_exception_class = StandardError) TimeoutError.new("Timeout type: #{failure.timeout_failure_info.timeout_type.to_s}") when :canceled_failure_info # TODO: Distinguish between different entity cancellations - StandardError.new(from_payloads(failure.canceled_failure_info.details)) + StandardError.new(converter.from_payloads(failure.canceled_failure_info.details)) else StandardError.new(failure.message) end diff --git a/lib/temporal/workflow/execution_info.rb b/lib/temporal/workflow/execution_info.rb index e3f70021..77a27332 100644 --- a/lib/temporal/workflow/execution_info.rb +++ b/lib/temporal/workflow/execution_info.rb @@ -1,12 +1,9 @@ -require 'temporal/concerns/payloads' require 'temporal/workflow/status' module Temporal class Workflow class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, :close_time, :status, :history_length, :memo, :search_attributes, keyword_init: true) - extend Concerns::Payloads - STATUSES = [ Temporal::Workflow::Status::RUNNING, Temporal::Workflow::Status::COMPLETED, @@ -17,8 +14,8 @@ class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, Temporal::Workflow::Status::TIMED_OUT ] - def self.generate_from(response) - search_attributes = response.search_attributes.nil? ? {} : from_payload_map_without_codec(response.search_attributes.indexed_fields) + def self.generate_from(response, converter) + search_attributes = response.search_attributes.nil? ? {} : converter.from_payload_map_without_codec(response.search_attributes.indexed_fields) new( workflow: response.type.name, workflow_id: response.execution.workflow_id, @@ -27,7 +24,7 @@ def self.generate_from(response) close_time: response.close_time&.to_time, status: Temporal::Workflow::Status::API_STATUS_MAP.fetch(response.status), history_length: response.history_length, - memo: from_payload_map(response.memo.fields), + memo: converter.from_payload_map(response.memo.fields), search_attributes: search_attributes ).freeze end diff --git a/lib/temporal/workflow/executions.rb b/lib/temporal/workflow/executions.rb index 83079b1a..15fb9109 100644 --- a/lib/temporal/workflow/executions.rb +++ b/lib/temporal/workflow/executions.rb @@ -9,7 +9,8 @@ class Executions next_page_token: nil }.freeze - def initialize(connection:, status:, request_options:) + def initialize(converter, connection:, status:, request_options:) + @converter = converter @connection = connection @status = status @request_options = DEFAULT_REQUEST_OPTIONS.merge(request_options) @@ -20,7 +21,7 @@ def next_page_token end def next_page - self.class.new(connection: @connection, status: @status, request_options: @request_options.merge(next_page_token: next_page_token)) + self.class.new(@converter, connection: @connection, status: @status, request_options: @request_options.merge(next_page_token: next_page_token)) end def each @@ -42,7 +43,7 @@ def each ) paginated_executions = response.executions.map do |raw_execution| - execution = Temporal::Workflow::ExecutionInfo.generate_from(raw_execution) + execution = Temporal::Workflow::ExecutionInfo.generate_from(raw_execution, @converter) if block_given? yield execution end diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index 6233feb0..f40fef3b 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -71,7 +71,7 @@ def process_query(query) end def execute_workflow(input, workflow_started_event) - metadata = Metadata.generate_workflow_metadata(workflow_started_event, task_metadata) + metadata = Metadata.generate_workflow_metadata(workflow_started_event, task_metadata, config.converter) context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config, query_registry, track_stack_trace) diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index f8c5cc64..c90ed3de 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -4,7 +4,6 @@ require 'temporal/workflow/command_state_machine' require 'temporal/workflow/history/event_target' require 'temporal/workflow/history/size' -require 'temporal/concerns/payloads' require 'temporal/workflow/errors' require 'temporal/workflow/sdk_flags' require 'temporal/workflow/signal' @@ -12,8 +11,6 @@ module Temporal class Workflow class StateManager - include Concerns::Payloads - SIDE_EFFECT_MARKER = 'SIDE_EFFECT'.freeze RELEASE_MARKER = 'RELEASE'.freeze @@ -34,6 +31,7 @@ def initialize(dispatcher, config) @replay = false @search_attributes = {} @config = config + @converter = config.converter # Current flags in use, built up from workflow task completed history entries @sdk_flags = Set.new @@ -167,7 +165,7 @@ def history_size private - attr_reader :commands, :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :config + attr_reader :commands, :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :config, :converter def use_signals_first(raw_events) # The presence of SAVE_FIRST_TASK_SIGNALS implies HANDLE_SIGNALS_FIRST @@ -250,14 +248,14 @@ def apply_event(event) case event.type when 'WORKFLOW_EXECUTION_STARTED' unless event.attributes.search_attributes.nil? - search_attributes.merge!(from_payload_map(event.attributes.search_attributes&.indexed_fields || {})) + search_attributes.merge!(converter.from_payload_map(event.attributes.search_attributes&.indexed_fields || {})) end state_machine.start dispatch( History::EventTarget.start_workflow, 'started', - from_payloads(event.attributes.input), + converter.from_payloads(event.attributes.input), event ) @@ -296,16 +294,16 @@ def apply_event(event) when 'ACTIVITY_TASK_COMPLETED' state_machine.complete - dispatch(history_target, 'completed', from_result_payloads(event.attributes.result)) + dispatch(history_target, 'completed', converter.from_result_payloads(event.attributes.result)) when 'ACTIVITY_TASK_FAILED' state_machine.fail dispatch(history_target, 'failed', - Temporal::Workflow::Errors.generate_error(event.attributes.failure, ActivityException)) + Temporal::Workflow::Errors.generate_error(event.attributes.failure, converter, ActivityException)) when 'ACTIVITY_TASK_TIMED_OUT' state_machine.time_out - dispatch(history_target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) + dispatch(history_target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure, converter)) when 'ACTIVITY_TASK_CANCEL_REQUESTED' state_machine.requested @@ -319,7 +317,7 @@ def apply_event(event) when 'ACTIVITY_TASK_CANCELED' state_machine.cancel dispatch(history_target, 'failed', - Temporal::ActivityCanceled.new(from_details_payloads(event.attributes.details))) + Temporal::ActivityCanceled.new(converter.from_details_payloads(event.attributes.details))) when 'TIMER_STARTED' state_machine.start @@ -356,13 +354,13 @@ def apply_event(event) when 'MARKER_RECORDED' state_machine.complete - handle_marker(event.id, event.attributes.marker_name, from_details_payloads(event.attributes.details['data'])) + handle_marker(event.id, event.attributes.marker_name, converter.from_details_payloads(event.attributes.details['data'])) when 'WORKFLOW_EXECUTION_SIGNALED' # relies on Signal#== for matching in Dispatcher signal_target = Signal.new(event.attributes.signal_name) dispatch(signal_target, 'signaled', event.attributes.signal_name, - from_signal_payloads(event.attributes.input)) + converter.from_signal_payloads(event.attributes.input)) when 'WORKFLOW_EXECUTION_TERMINATED' # todo @@ -388,15 +386,15 @@ def apply_event(event) when 'CHILD_WORKFLOW_EXECUTION_COMPLETED' state_machine.complete - dispatch(history_target, 'completed', from_result_payloads(event.attributes.result)) + dispatch(history_target, 'completed', converter.from_result_payloads(event.attributes.result)) when 'CHILD_WORKFLOW_EXECUTION_FAILED' state_machine.fail - dispatch(history_target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) + dispatch(history_target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure, converter)) when 'CHILD_WORKFLOW_EXECUTION_CANCELED' state_machine.cancel - dispatch(history_target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) + dispatch(history_target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure, converter)) when 'CHILD_WORKFLOW_EXECUTION_TIMED_OUT' state_machine.time_out @@ -427,7 +425,7 @@ def apply_event(event) dispatch(history_target, 'completed') when 'UPSERT_WORKFLOW_SEARCH_ATTRIBUTES' - search_attributes.merge!(from_payload_map(event.attributes.search_attributes&.indexed_fields || {})) + search_attributes.merge!(converter.from_payload_map(event.attributes.search_attributes&.indexed_fields || {})) # no need to track state; this is just a synchronous API call. discard_command(history_target) diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb index f415aed5..b3620ad8 100644 --- a/lib/temporal/workflow/task_processor.rb +++ b/lib/temporal/workflow/task_processor.rb @@ -9,15 +9,13 @@ module Temporal class Workflow class TaskProcessor - Query = Struct.new(:query) do - include Concerns::Payloads - + Query = Struct.new(:query, :converter) do def query_type query.query_type end def query_args - from_query_payloads(query.query_args) + converter.from_query_payloads(query.query_args) end end @@ -125,10 +123,10 @@ def legacy_query_task? def parse_queries # Support for deprecated query style if legacy_query_task? - { LEGACY_QUERY_KEY => Query.new(task.query) } + { LEGACY_QUERY_KEY => Query.new(task.query, config.converter) } else task.queries.each_with_object({}) do |(query_id, query), result| - result[query_id] = Query.new(query) + result[query_id] = Query.new(query, config.converter) end end end diff --git a/spec/config/test_converter.rb b/spec/config/test_converter.rb new file mode 100644 index 00000000..6cb9fce5 --- /dev/null +++ b/spec/config/test_converter.rb @@ -0,0 +1,8 @@ +require 'temporal/converter_wrapper' + +# This is a barebones default converter that can be used in tests +# where default conversion behaviour is expected +TEST_CONVERTER = Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC +).freeze diff --git a/spec/fabricators/grpc/activity_task_fabricator.rb b/spec/fabricators/grpc/activity_task_fabricator.rb index 6d2a531d..82e0886f 100644 --- a/spec/fabricators/grpc/activity_task_fabricator.rb +++ b/spec/fabricators/grpc/activity_task_fabricator.rb @@ -6,7 +6,7 @@ activity_id { SecureRandom.uuid } task_token { |attrs| attrs[:task_token] || SecureRandom.uuid } activity_type { Fabricate(:api_activity_type) } - input { Temporal.configuration.converter.to_payloads(nil) } + input { TEST_CONVERTER.to_payloads(nil) } workflow_type { Fabricate(:api_workflow_type) } workflow_execution { Fabricate(:api_workflow_execution) } current_attempt_scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } } @@ -15,7 +15,7 @@ current_attempt_scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } } header do |attrs| fields = (attrs[:headers] || {}).each_with_object({}) do |(field, value), h| - h[field] = Temporal.configuration.converter.to_payload(value) + h[field] = TEST_CONVERTER.to_payload(value) end Temporalio::Api::Common::V1::Header.new(fields: fields) end diff --git a/spec/fabricators/grpc/application_failure_fabricator.rb b/spec/fabricators/grpc/application_failure_fabricator.rb index 9d1396d8..95089cb7 100644 --- a/spec/fabricators/grpc/application_failure_fabricator.rb +++ b/spec/fabricators/grpc/application_failure_fabricator.rb @@ -1,7 +1,3 @@ -require 'temporal/concerns/payloads' -class TestDeserializer - include Temporal::Concerns::Payloads -end # Simulates Temporal::Connection::Serializer::Failure Fabricator(:api_application_failure, from: Temporalio::Api::Failure::V1::Failure) do transient :error_class, :backtrace @@ -10,7 +6,7 @@ class TestDeserializer application_failure_info do |attrs| Temporalio::Api::Failure::V1::ApplicationFailureInfo.new( type: attrs[:error_class], - details: TestDeserializer.new.to_details_payloads(attrs[:message]), + details: TEST_CONVERTER.to_details_payloads(attrs[:message]), ) end end diff --git a/spec/fabricators/grpc/history_event_fabricator.rb b/spec/fabricators/grpc/history_event_fabricator.rb index 4562d7ef..ad9a55e8 100644 --- a/spec/fabricators/grpc/history_event_fabricator.rb +++ b/spec/fabricators/grpc/history_event_fabricator.rb @@ -1,11 +1,4 @@ require 'securerandom' -require 'temporal/concerns/payloads' - -class TestSerializer - extend Temporal::Concerns::Payloads -end - -include Temporal::Concerns::Payloads Fabricator(:api_history_event, from: Temporalio::Api::History::V1::HistoryEvent) do event_id { 1 } @@ -17,9 +10,9 @@ class TestSerializer event_type { Temporalio::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_EXECUTION_STARTED } event_time { Time.now } workflow_execution_started_event_attributes do |attrs| - header_fields = to_payload_map(attrs[:headers] || {}) + header_fields = TEST_CONVERTER.to_payload_map(attrs[:headers] || {}) header = Temporalio::Api::Common::V1::Header.new(fields: header_fields) - indexed_fields = attrs[:search_attributes] ? to_payload_map(attrs[:search_attributes]) : nil + indexed_fields = attrs[:search_attributes] ? TEST_CONVERTER.to_payload_map(attrs[:search_attributes]) : nil Temporalio::Api::History::V1::WorkflowExecutionStartedEventAttributes.new( workflow_type: Fabricate(:api_workflow_type), @@ -142,7 +135,7 @@ class TestSerializer event_type { Temporalio::Api::Enums::V1::EventType::EVENT_TYPE_ACTIVITY_TASK_CANCELED } activity_task_canceled_event_attributes do |attrs| Temporalio::Api::History::V1::ActivityTaskCanceledEventAttributes.new( - details: TestSerializer.to_details_payloads('ACTIVITY_ID_NOT_STARTED'), + details: TEST_CONVERTER.to_details_payloads('ACTIVITY_ID_NOT_STARTED'), scheduled_event_id: attrs[:event_id] - 2, started_event_id: nil, identity: 'test-worker@test-host' @@ -197,7 +190,7 @@ class TestSerializer transient :search_attributes event_type { Temporalio::Api::Enums::V1::EventType::EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES } upsert_workflow_search_attributes_event_attributes do |attrs| - indexed_fields = attrs[:search_attributes] ? to_payload_map(attrs[:search_attributes]) : nil + indexed_fields = attrs[:search_attributes] ? TEST_CONVERTER.to_payload_map(attrs[:search_attributes]) : nil Temporalio::Api::History::V1::UpsertWorkflowSearchAttributesEventAttributes.new( workflow_task_completed_event_id: attrs[:event_id] - 1, search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new( @@ -213,7 +206,7 @@ class TestSerializer Temporalio::Api::History::V1::MarkerRecordedEventAttributes.new( workflow_task_completed_event_id: attrs[:event_id] - 1, marker_name: 'SIDE_EFFECT', - details: to_payload_map({}) + details: TEST_CONVERTER.to_payload_map({}) ) end end diff --git a/spec/fabricators/grpc/memo_fabricator.rb b/spec/fabricators/grpc/memo_fabricator.rb index 38f764f2..cf499c8a 100644 --- a/spec/fabricators/grpc/memo_fabricator.rb +++ b/spec/fabricators/grpc/memo_fabricator.rb @@ -1,7 +1,7 @@ Fabricator(:memo, from: Temporalio::Api::Common::V1::Memo) do fields do Google::Protobuf::Map.new(:string, :message, Temporalio::Api::Common::V1::Payload).tap do |m| - m['foo'] = Temporal.configuration.converter.to_payload('bar') + m['foo'] = TEST_CONVERTER.to_payload('bar') end end end diff --git a/spec/fabricators/grpc/payload_fabricator.rb b/spec/fabricators/grpc/payload_fabricator.rb index badd8f36..9312da42 100644 --- a/spec/fabricators/grpc/payload_fabricator.rb +++ b/spec/fabricators/grpc/payload_fabricator.rb @@ -1,3 +1,23 @@ Fabricator(:api_payload, from: Temporalio::Api::Common::V1::Payload) do metadata { Google::Protobuf::Map.new(:string, :bytes) } end + +Fabricator(:api_payload_nil, from: :api_payload) do + metadata do + Google::Protobuf::Map.new(:string, :bytes).tap do |m| + m['encoding'] = Temporal::Connection::Converter::Payload::Nil::ENCODING + end + end +end + +Fabricator(:api_payload_bytes, from: :api_payload) do + transient :bytes + + metadata do + Google::Protobuf::Map.new(:string, :bytes).tap do |m| + m['encoding'] = Temporal::Connection::Converter::Payload::Bytes::ENCODING + end + end + + data { |attrs| attrs.fetch(:bytes, 'foobar') } +end diff --git a/spec/fabricators/grpc/payloads_fabricator.rb b/spec/fabricators/grpc/payloads_fabricator.rb new file mode 100644 index 00000000..a8f3aff0 --- /dev/null +++ b/spec/fabricators/grpc/payloads_fabricator.rb @@ -0,0 +1,9 @@ +Fabricator(:api_payloads, from: Temporalio::Api::Common::V1::Payloads) do + transient :payloads_array + + payloads do |attrs| + Google::Protobuf::RepeatedField.new(:message, Temporalio::Api::Common::V1::Payload).tap do |m| + m.concat(Array(attrs.fetch(:payloads_array, Fabricate(:api_payload)))) + end + end +end diff --git a/spec/fabricators/grpc/search_attributes_fabricator.rb b/spec/fabricators/grpc/search_attributes_fabricator.rb index 16a33675..1e98516e 100644 --- a/spec/fabricators/grpc/search_attributes_fabricator.rb +++ b/spec/fabricators/grpc/search_attributes_fabricator.rb @@ -1,7 +1,7 @@ Fabricator(:search_attributes, from: Temporalio::Api::Common::V1::SearchAttributes) do indexed_fields do Google::Protobuf::Map.new(:string, :message, Temporalio::Api::Common::V1::Payload).tap do |m| - m['foo'] = Temporal.configuration.converter.to_payload('bar') + m['foo'] = TEST_CONVERTER.to_payload('bar') end end end diff --git a/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb b/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb index 172bd7a5..0c1449fe 100644 --- a/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb +++ b/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb @@ -12,7 +12,7 @@ task_queue { Fabricate(:api_task_queue) } header do |attrs| fields = (attrs[:headers] || {}).each_with_object({}) do |(field, value), h| - h[field] = Temporal.configuration.converter.to_payload(value) + h[field] = TEST_CONVERTER.to_payload(value) end Temporalio::Api::Common::V1::Header.new(fields: fields) end diff --git a/spec/fabricators/grpc/workflow_query_fabricator.rb b/spec/fabricators/grpc/workflow_query_fabricator.rb index 024cdd59..f8831d49 100644 --- a/spec/fabricators/grpc/workflow_query_fabricator.rb +++ b/spec/fabricators/grpc/workflow_query_fabricator.rb @@ -1,4 +1,4 @@ Fabricator(:api_workflow_query, from: Temporalio::Api::Query::V1::WorkflowQuery) do query_type { 'state' } - query_args { Temporal.configuration.converter.to_payloads(['']) } + query_args { TEST_CONVERTER.to_payloads(['']) } end diff --git a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb index e4ccdb2a..6999ba60 100644 --- a/spec/unit/lib/temporal/activity/task_processor_spec.rb +++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb @@ -17,7 +17,7 @@ input: config.converter.to_payloads(input) ) end - let(:metadata) { Temporal::Metadata.generate_activity_metadata(task, namespace) } + let(:metadata) { Temporal::Metadata.generate_activity_metadata(task, namespace, config.converter) } let(:workflow_name) { task.workflow_type.name } let(:activity_name) { 'TestActivity' } let(:connection) { instance_double('Temporal::Connection::GRPC') } @@ -40,7 +40,7 @@ .and_return(connection) allow(Temporal::Metadata) .to receive(:generate_activity_metadata) - .with(task, namespace) + .with(task, namespace, config.converter) .and_return(metadata) allow(Temporal::Activity::Context).to receive(:new).with(connection, metadata, config, heartbeat_thread_pool).and_return(context) diff --git a/spec/unit/lib/temporal/configuration_spec.rb b/spec/unit/lib/temporal/configuration_spec.rb index c1024e34..8ab2e282 100644 --- a/spec/unit/lib/temporal/configuration_spec.rb +++ b/spec/unit/lib/temporal/configuration_spec.rb @@ -62,4 +62,50 @@ def inject!(_); end expect(subject.for_connection).to have_attributes(identity: new_identity) end end -end \ No newline at end of file + + describe '#converter' do + it 'wraps the provided converter and codec' do + converter_wrapper = subject.converter + + expect(converter_wrapper).to be_a(Temporal::ConverterWrapper) + expect(converter_wrapper.send(:converter)).to eq(described_class::DEFAULT_CONVERTER) + expect(converter_wrapper.send(:codec)).to eq(described_class::DEFAULT_PAYLOAD_CODEC) + end + end + + describe '#converter=' do + let(:converter) { instance_double(Temporal::Connection::Converter::Composite) } + + it 'resets the wrapper when converter has changed' do + old_converter_wrapper = subject.converter + + expect(old_converter_wrapper).to be_a(Temporal::ConverterWrapper) + expect(old_converter_wrapper.send(:converter)).to eq(described_class::DEFAULT_CONVERTER) + + subject.converter = converter + new_converter_wrapper = subject.converter + + expect(new_converter_wrapper).to be_a(Temporal::ConverterWrapper) + expect(new_converter_wrapper.send(:converter)).to eq(converter) + expect(new_converter_wrapper.send(:codec)).to eq(old_converter_wrapper.send(:codec)) + end + end + + describe '#payload_codec=' do + let(:codec) { Temporal::Connection::Converter::Codec::Base.new } + + it 'resets the wrapper when converter has changed' do + old_converter_wrapper = subject.converter + + expect(old_converter_wrapper).to be_a(Temporal::ConverterWrapper) + expect(old_converter_wrapper.send(:codec)).to eq(described_class::DEFAULT_PAYLOAD_CODEC) + + subject.payload_codec = codec + new_converter_wrapper = subject.converter + + expect(new_converter_wrapper).to be_a(Temporal::ConverterWrapper) + expect(new_converter_wrapper.send(:codec)).to eq(codec) + expect(new_converter_wrapper.send(:converter)).to eq(old_converter_wrapper.send(:converter)) + end + end +end diff --git a/spec/unit/lib/temporal/connection/serializer/backfill_spec.rb b/spec/unit/lib/temporal/connection/serializer/backfill_spec.rb index e7d980f3..b4505a57 100644 --- a/spec/unit/lib/temporal/connection/serializer/backfill_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/backfill_spec.rb @@ -3,6 +3,12 @@ require "temporal/connection/serializer/backfill" describe Temporal::Connection::Serializer::Backfill do + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end let(:example_backfill) do Temporal::Schedule::Backfill.new( start_time: Time.new(2000, 1, 1, 0, 0, 0), @@ -15,13 +21,13 @@ it "raises an error if an invalid overlap_policy is specified" do invalid = Temporal::Schedule::Backfill.new(overlap_policy: :foobar) expect do - described_class.new(invalid).to_proto + described_class.new(invalid, converter).to_proto end .to(raise_error(Temporal::Connection::ArgumentError, "Unknown schedule overlap policy specified: foobar")) end it "produces well-formed protobuf" do - result = described_class.new(example_backfill).to_proto + result = described_class.new(example_backfill, converter).to_proto expect(result).to(be_a(Temporalio::Api::Schedule::V1::BackfillRequest)) expect(result.overlap_policy).to(eq(:SCHEDULE_OVERLAP_POLICY_BUFFER_ALL)) diff --git a/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb b/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb index 046b066c..398231da 100644 --- a/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb @@ -2,6 +2,13 @@ require 'temporal/workflow/command' describe Temporal::Connection::Serializer::ContinueAsNew do + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end + describe 'to_proto' do it 'produces a protobuf' do timeouts = { @@ -19,7 +26,7 @@ search_attributes: {'foo-search-attribute': 'qux'}, ) - result = described_class.new(command).to_proto + result = described_class.new(command, converter).to_proto expect(result).to be_an_instance_of(Temporalio::Api::Command::V1::Command) expect(result.command_type).to eql( diff --git a/spec/unit/lib/temporal/connection/serializer/failure_spec.rb b/spec/unit/lib/temporal/connection/serializer/failure_spec.rb index 4242554e..2bde0337 100644 --- a/spec/unit/lib/temporal/connection/serializer/failure_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/failure_spec.rb @@ -1,14 +1,17 @@ require 'temporal/connection/serializer/failure' require 'temporal/workflow/command' -class TestDeserializer - include Temporal::Concerns::Payloads -end - describe Temporal::Connection::Serializer::Failure do + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end + describe 'to_proto' do it 'produces a protobuf' do - result = described_class.new(StandardError.new('test')).to_proto + result = described_class.new(StandardError.new('test'), converter).to_proto expect(result).to be_an_instance_of(Temporalio::Api::Failure::V1::Failure) end @@ -31,10 +34,10 @@ def initialize(foo, bar, bad_class:) it 'Serializes round-trippable full errors when asked to' do # Make sure serializing various bits round-trips e = MyError.new(['seven', 'three'], "Bar", bad_class: NaughtyClass) - failure_proto = described_class.new(e, serialize_whole_error: true).to_proto + failure_proto = described_class.new(e, converter, serialize_whole_error: true).to_proto expect(failure_proto.application_failure_info.type).to eq("MyError") - deserialized_error = TestDeserializer.new.from_details_payloads(failure_proto.application_failure_info.details) + deserialized_error = converter.from_details_payloads(failure_proto.application_failure_info.details) expect(deserialized_error).to be_an_instance_of(MyError) expect(deserialized_error.message).to eq("Hello, Bar!") expect(deserialized_error.foo).to eq(['seven', 'three']) @@ -53,23 +56,23 @@ def initialize(message) it 'deals with too-large serialization using the old path' do e = MyBigError.new('Uh oh!') # Normal serialization path - failure_proto = described_class.new(e, serialize_whole_error: true, max_bytes: 1000).to_proto + failure_proto = described_class.new(e, converter, serialize_whole_error: true, max_bytes: 1000).to_proto expect(failure_proto.application_failure_info.type).to eq('MyBigError') - deserialized_error = TestDeserializer.new.from_details_payloads(failure_proto.application_failure_info.details) + deserialized_error = converter.from_details_payloads(failure_proto.application_failure_info.details) expect(deserialized_error).to be_an_instance_of(MyBigError) expect(deserialized_error.big_payload).to eq('123456789012345678901234567890123456789012345678901234567890') # Exercise legacy serialization mechanism - failure_proto = described_class.new(e, serialize_whole_error: false).to_proto + failure_proto = described_class.new(e, converter, serialize_whole_error: false).to_proto expect(failure_proto.application_failure_info.type).to eq('MyBigError') - old_style_deserialized_error = MyBigError.new(TestDeserializer.new.from_details_payloads(failure_proto.application_failure_info.details)) + old_style_deserialized_error = MyBigError.new(converter.from_details_payloads(failure_proto.application_failure_info.details)) expect(old_style_deserialized_error).to be_an_instance_of(MyBigError) expect(old_style_deserialized_error.message).to eq('Uh oh!') # If the payload size exceeds the max_bytes, we fallback to the old-style serialization. - failure_proto = described_class.new(e, serialize_whole_error: true, max_bytes: 50).to_proto + failure_proto = described_class.new(e, converter, serialize_whole_error: true, max_bytes: 50).to_proto expect(failure_proto.application_failure_info.type).to eq('MyBigError') - avoids_truncation_error = MyBigError.new(TestDeserializer.new.from_details_payloads(failure_proto.application_failure_info.details)) + avoids_truncation_error = MyBigError.new(converter.from_details_payloads(failure_proto.application_failure_info.details)) expect(avoids_truncation_error).to be_an_instance_of(MyBigError) expect(avoids_truncation_error.message).to eq('Uh oh!') @@ -82,7 +85,7 @@ def initialize(message) allow(Temporal.logger).to receive(:error) max_bytes = 50 - described_class.new(e, serialize_whole_error: true, max_bytes: max_bytes).to_proto + described_class.new(e, converter, serialize_whole_error: true, max_bytes: max_bytes).to_proto expect(Temporal.logger) .to have_received(:error) .with( @@ -99,7 +102,7 @@ def initialize; end it 'successfully processes an error with no constructor arguments' do e = MyArglessError.new - failure_proto = described_class.new(e, serialize_whole_error: true).to_proto + failure_proto = described_class.new(e, converter, serialize_whole_error: true).to_proto expect(failure_proto.application_failure_info.type).to eq('MyArglessError') end diff --git a/spec/unit/lib/temporal/connection/serializer/query_answer_spec.rb b/spec/unit/lib/temporal/connection/serializer/query_answer_spec.rb index 62028824..5e912206 100644 --- a/spec/unit/lib/temporal/connection/serializer/query_answer_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/query_answer_spec.rb @@ -1,23 +1,25 @@ require 'temporal/connection/serializer/query_failure' require 'temporal/workflow/query_result' -require 'temporal/concerns/payloads' describe Temporal::Connection::Serializer::QueryAnswer do - class TestDeserializer - extend Temporal::Concerns::Payloads + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) end describe 'to_proto' do let(:query_result) { Temporal::Workflow::QueryResult.answer(42) } it 'produces a protobuf' do - result = described_class.new(query_result).to_proto + result = described_class.new(query_result, converter).to_proto expect(result).to be_a(Temporalio::Api::Query::V1::WorkflowQueryResult) expect(result.result_type).to eq(Temporalio::Api::Enums::V1::QueryResultType.lookup( Temporalio::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED) ) - expect(result.answer).to eq(TestDeserializer.to_query_payloads(42)) + expect(result.answer).to eq(converter.to_query_payloads(42)) end end end diff --git a/spec/unit/lib/temporal/connection/serializer/query_failure_spec.rb b/spec/unit/lib/temporal/connection/serializer/query_failure_spec.rb index 0590c0c4..62926aea 100644 --- a/spec/unit/lib/temporal/connection/serializer/query_failure_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/query_failure_spec.rb @@ -2,12 +2,19 @@ require 'temporal/workflow/query_result' describe Temporal::Connection::Serializer::QueryFailure do + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end + describe 'to_proto' do let(:exception) { StandardError.new('Test query failure') } let(:query_result) { Temporal::Workflow::QueryResult.failure(exception) } it 'produces a protobuf' do - result = described_class.new(query_result).to_proto + result = described_class.new(query_result, converter).to_proto expect(result).to be_a(Temporalio::Api::Query::V1::WorkflowQueryResult) expect(result.result_type).to eq(Temporalio::Api::Enums::V1::QueryResultType.lookup( diff --git a/spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb b/spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb index 211f807f..5e27503f 100644 --- a/spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb @@ -2,6 +2,13 @@ require 'temporal/connection/serializer/retry_policy' describe Temporal::Connection::Serializer::RetryPolicy do + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end + describe 'to_proto' do let(:example_policy) do Temporal::RetryPolicy.new( @@ -14,7 +21,7 @@ end it 'converts to proto' do - proto = described_class.new(example_policy).to_proto + proto = described_class.new(example_policy, converter).to_proto expect(proto.initial_interval.seconds).to eq(1) expect(proto.backoff_coefficient).to eq(1.5) expect(proto.maximum_interval.seconds).to eq(5) diff --git a/spec/unit/lib/temporal/connection/serializer/schedule_action_spec.rb b/spec/unit/lib/temporal/connection/serializer/schedule_action_spec.rb index 275bb8e0..93f9e87c 100644 --- a/spec/unit/lib/temporal/connection/serializer/schedule_action_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/schedule_action_spec.rb @@ -3,6 +3,12 @@ require "temporal/connection/serializer/schedule_action" describe Temporal::Connection::Serializer::ScheduleAction do + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end let(:timeouts) { {run: 100, task: 10} } let(:example_action) do @@ -24,7 +30,7 @@ describe "to_proto" do it "raises an error if an invalid action is specified" do expect do - described_class.new(123).to_proto + described_class.new(123, converter).to_proto end .to(raise_error(Temporal::Connection::ArgumentError)) do |e| expect(e.message).to(eq("Unknown action type Integer")) @@ -32,7 +38,7 @@ end it "produces well-formed protobuf" do - result = described_class.new(example_action).to_proto + result = described_class.new(example_action, converter).to_proto expect(result).to(be_a(Temporalio::Api::Schedule::V1::ScheduleAction)) diff --git a/spec/unit/lib/temporal/connection/serializer/schedule_policies_spec.rb b/spec/unit/lib/temporal/connection/serializer/schedule_policies_spec.rb index cf64ed98..2b51cee3 100644 --- a/spec/unit/lib/temporal/connection/serializer/schedule_policies_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/schedule_policies_spec.rb @@ -2,6 +2,12 @@ require "temporal/connection/serializer/schedule_policies" describe Temporal::Connection::Serializer::SchedulePolicies do + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end let(:example_policies) do Temporal::Schedule::SchedulePolicies.new( overlap_policy: :buffer_one, @@ -12,7 +18,7 @@ describe "to_proto" do it "produces well-formed protobuf" do - result = described_class.new(example_policies).to_proto + result = described_class.new(example_policies, converter).to_proto expect(result).to(be_a(Temporalio::Api::Schedule::V1::SchedulePolicies)) expect(result.overlap_policy).to(eq(:SCHEDULE_OVERLAP_POLICY_BUFFER_ONE)) @@ -23,7 +29,7 @@ it "should raise if an unknown overlap policy is specified" do invalid_policies = Temporal::Schedule::SchedulePolicies.new(overlap_policy: :foobar) expect do - described_class.new(invalid_policies).to_proto + described_class.new(invalid_policies, converter).to_proto end .to(raise_error(Temporal::Connection::ArgumentError, "Unknown schedule overlap policy specified: foobar")) end diff --git a/spec/unit/lib/temporal/connection/serializer/schedule_spec_spec.rb b/spec/unit/lib/temporal/connection/serializer/schedule_spec_spec.rb index c0aa636f..ee0cd0f8 100644 --- a/spec/unit/lib/temporal/connection/serializer/schedule_spec_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/schedule_spec_spec.rb @@ -4,6 +4,12 @@ require "temporal/connection/serializer/schedule_spec" describe Temporal::Connection::Serializer::ScheduleSpec do + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end let(:example_spec) do Temporal::Schedule::ScheduleSpec.new( cron_expressions: ["@hourly"], @@ -33,7 +39,7 @@ describe "to_proto" do it "produces well-formed protobuf" do - result = described_class.new(example_spec).to_proto + result = described_class.new(example_spec, converter).to_proto expect(result).to(be_a(Temporalio::Api::Schedule::V1::ScheduleSpec)) expect(result.cron_string).to(eq(["@hourly"])) diff --git a/spec/unit/lib/temporal/connection/serializer/schedule_state_spec.rb b/spec/unit/lib/temporal/connection/serializer/schedule_state_spec.rb index 16c47732..3fbe8051 100644 --- a/spec/unit/lib/temporal/connection/serializer/schedule_state_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/schedule_state_spec.rb @@ -2,6 +2,12 @@ require "temporal/connection/serializer/schedule_state" describe Temporal::Connection::Serializer::ScheduleState do + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end let(:example_state) do Temporal::Schedule::ScheduleState.new( notes: "some notes", @@ -13,7 +19,7 @@ describe "to_proto" do it "produces well-formed protobuf" do - result = described_class.new(example_state).to_proto + result = described_class.new(example_state, converter).to_proto expect(result).to(be_a(Temporalio::Api::Schedule::V1::ScheduleState)) expect(result.notes).to(eq("some notes")) diff --git a/spec/unit/lib/temporal/connection/serializer/start_child_workflow_spec.rb b/spec/unit/lib/temporal/connection/serializer/start_child_workflow_spec.rb index 2e72951c..ae26f88f 100644 --- a/spec/unit/lib/temporal/connection/serializer/start_child_workflow_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/start_child_workflow_spec.rb @@ -3,6 +3,12 @@ require 'temporal/connection/serializer/start_child_workflow' describe Temporal::Connection::Serializer::StartChildWorkflow do + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end let(:example_command) do Temporal::Workflow::Command::StartChildWorkflow.new( workflow_id: SecureRandom.uuid, @@ -24,7 +30,7 @@ command.parent_close_policy = :invalid expect do - described_class.new(command).to_proto + described_class.new(command, converter).to_proto end.to raise_error(Temporal::Connection::ArgumentError) do |e| expect(e.message).to eq("Unknown parent_close_policy '#{command.parent_close_policy}' specified") end @@ -40,7 +46,7 @@ command = example_command command.parent_close_policy = policy_name - result = described_class.new(command).to_proto + result = described_class.new(command, converter).to_proto attribs = result.start_child_workflow_execution_command_attributes expect(attribs.parent_close_policy).to eq(expected_parent_close_policy) end diff --git a/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb b/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb index bc94128f..5bdace1a 100644 --- a/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb @@ -3,11 +3,14 @@ require 'temporal/connection/serializer/upsert_search_attributes' require 'temporal/workflow/command' -class TestDeserializer - extend Temporal::Concerns::Payloads -end - describe Temporal::Connection::Serializer::UpsertSearchAttributes do + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end + it 'produces a protobuf that round-trips' do expected_attributes = { 'CustomStringField' => 'moo', @@ -22,14 +25,14 @@ class TestDeserializer search_attributes: expected_attributes ) - result = described_class.new(command).to_proto + result = described_class.new(command, converter).to_proto expect(result).to be_an_instance_of(Temporalio::Api::Command::V1::Command) expect(result.command_type).to eql( :COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES ) command_attributes = result.upsert_workflow_search_attributes_command_attributes expect(command_attributes).not_to be_nil - actual_attributes = TestDeserializer.from_payload_map_without_codec(command_attributes&.search_attributes&.indexed_fields) + actual_attributes = converter.from_payload_map_without_codec(command_attributes&.search_attributes&.indexed_fields) expect(actual_attributes).to eql(expected_attributes) end diff --git a/spec/unit/lib/temporal/connection/serializer/workflow_id_reuse_policy_spec.rb b/spec/unit/lib/temporal/connection/serializer/workflow_id_reuse_policy_spec.rb index ce139325..b1ee6cad 100644 --- a/spec/unit/lib/temporal/connection/serializer/workflow_id_reuse_policy_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/workflow_id_reuse_policy_spec.rb @@ -2,6 +2,13 @@ require 'temporal/connection/serializer/retry_policy' describe Temporal::Connection::Serializer::WorkflowIdReusePolicy do + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end + describe 'to_proto' do SYM_TO_PROTO = { allow_failed: Temporalio::Api::Enums::V1::WorkflowIdReusePolicy::WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY, @@ -12,7 +19,7 @@ def self.test_valid_policy(policy_sym) it "serializes #{policy_sym}" do - proto_enum = described_class.new(policy_sym).to_proto + proto_enum = described_class.new(policy_sym, converter).to_proto expected = SYM_TO_PROTO[policy_sym] expect(proto_enum).to eq(expected) end @@ -25,7 +32,7 @@ def self.test_valid_policy(policy_sym) it "rejects invalid policies" do expect do - described_class.new(:not_a_valid_policy).to_proto + described_class.new(:not_a_valid_policy, converter).to_proto end.to raise_error(Temporal::Connection::ArgumentError, 'Unknown workflow_id_reuse_policy specified: not_a_valid_policy') end end diff --git a/spec/unit/lib/temporal/connection_spec.rb b/spec/unit/lib/temporal/connection_spec.rb index f334d6d8..a3e5642f 100644 --- a/spec/unit/lib/temporal/connection_spec.rb +++ b/spec/unit/lib/temporal/connection_spec.rb @@ -25,6 +25,7 @@ expect(subject).to be_kind_of(Temporal::Connection::GRPC) expect(subject.send(:identity)).not_to be_nil expect(subject.send(:credentials)).to eq(:this_channel_is_insecure) + expect(subject.send(:converter)).to eq(config.converter) end end @@ -35,6 +36,7 @@ expect(subject).to be_kind_of(Temporal::Connection::GRPC) expect(subject.send(:identity)).not_to be_nil expect(subject.send(:credentials)).to be_kind_of(GRPC::Core::ChannelCredentials) + expect(subject.send(:converter)).to eq(config.converter) end end @@ -45,6 +47,7 @@ expect(subject).to be_kind_of(Temporal::Connection::GRPC) expect(subject.send(:identity)).not_to be_nil expect(subject.send(:credentials)).to be_kind_of(GRPC::Core::CallCredentials) + expect(subject.send(:converter)).to eq(config.converter) end end @@ -61,6 +64,7 @@ expect(subject).to be_kind_of(Temporal::Connection::GRPC) expect(subject.send(:identity)).not_to be_nil expect(subject.send(:credentials)).to be_kind_of(GRPC::Core::ChannelCredentials) + expect(subject.send(:converter)).to eq(config.converter) end end end diff --git a/spec/unit/lib/temporal/converter_wrapper_spec.rb b/spec/unit/lib/temporal/converter_wrapper_spec.rb new file mode 100644 index 00000000..f5b06af4 --- /dev/null +++ b/spec/unit/lib/temporal/converter_wrapper_spec.rb @@ -0,0 +1,175 @@ +require 'temporal/converter_wrapper' +require 'temporal/connection/converter/payload/bytes' +require 'temporal/connection/converter/payload/nil' +require 'temporal/connection/converter/composite' + +describe Temporal::ConverterWrapper do + class TestCodec < Temporal::Connection::Converter::Codec::Base + def encode(payload) + return payload + end + + def decode(payload) + return payload + end + end + + subject { described_class.new(converter, codec) } + let(:converter) do + Temporal::Connection::Converter::Composite.new(payload_converters: [ + Temporal::Connection::Converter::Payload::Bytes.new, + Temporal::Connection::Converter::Payload::Nil.new + ]) + end + let(:codec) { Temporal::Connection::Converter::Codec::Chain.new(payload_codecs: [TestCodec.new]) } + let(:payloads) { Fabricate(:api_payloads, payloads_array: [payload_bytes, payload_nil]) } + let(:payload_bytes) { Fabricate(:api_payload_bytes, bytes: 'test-payload') } + let(:payload_nil) { Fabricate(:api_payload_nil) } + + before do + allow(codec).to receive(:encode).and_call_original + allow(codec).to receive(:encodes).and_call_original + allow(codec).to receive(:decode).and_call_original + allow(codec).to receive(:decodes).and_call_original + end + + describe '#from_payloads' do + it 'decodes and converts' do + expect(subject.from_payloads(payloads)).to eq(['test-payload', nil]) + expect(codec).to have_received(:decodes) + end + end + + describe '#from_payload' do + it 'decodes and converts' do + expect(subject.from_payload(payload_bytes)).to eq('test-payload') + expect(codec).to have_received(:decode) + end + end + + describe '#from_payload_map_without_codec' do + let(:payload_map) do + Google::Protobuf::Map.new(:string, :message, Temporalio::Api::Common::V1::Payload).tap do |m| + m['first'] = payload_bytes + m['second'] = payload_nil + end + end + + it 'converts' do + expect(subject.from_payload_map_without_codec(payload_map)) + .to eq('first' => 'test-payload', 'second' => nil) + expect(codec).not_to have_received(:decode) + end + end + + describe '#from_result_payloads' do + it 'decodes and converts' do + expect(subject.from_result_payloads(payloads)).to eq('test-payload') + expect(codec).to have_received(:decodes) + end + end + + describe '#from_details_payloads' do + it 'decodes and converts first payload' do + expect(subject.from_details_payloads(payloads)).to eq('test-payload') + expect(codec).to have_received(:decodes) + end + end + + describe '#from_signal_payloads' do + it 'decodes and converts first payload' do + expect(subject.from_signal_payloads(payloads)).to eq('test-payload') + expect(codec).to have_received(:decodes) + end + end + + describe '#from_query_payloads' do + it 'decodes and converts first payload' do + expect(subject.from_query_payloads(payloads)).to eq('test-payload') + expect(codec).to have_received(:decodes) + end + end + + describe '#from_payload_map' do + let(:payload_map) do + Google::Protobuf::Map.new(:string, :message, Temporalio::Api::Common::V1::Payload).tap do |m| + m['first'] = payload_bytes + m['second'] = payload_nil + end + end + + it 'decodes and converts first payload' do + expect(subject.from_payload_map(payload_map)) + .to eq('first' => 'test-payload', 'second' => nil) + expect(codec).to have_received(:decode).twice + end + end + + describe '#to_payloads' do + it 'converts and encodes' do + expect(subject.to_payloads(['test-payload'.b, nil])).to eq(payloads) + expect(codec).to have_received(:encodes) + end + end + + describe '#to_payload' do + it 'converts and encodes' do + expect(subject.to_payload('test-payload'.b)).to eq(payload_bytes) + expect(codec).to have_received(:encode) + end + end + + describe '#to_payload_map_without_codec' do + let(:payload_map) { { first: payload_bytes, second: payload_nil } } + + it 'converts' do + expect(subject.to_payload_map_without_codec(first: 'test-payload'.b, second: nil)).to eq(payload_map) + expect(codec).not_to have_received(:encode) + end + end + + describe '#to_result_payloads' do + let(:payloads) { Fabricate(:api_payloads, payloads_array: [payload_bytes]) } + + it 'converts and encodes' do + expect(subject.to_result_payloads('test-payload'.b)).to eq(payloads) + expect(codec).to have_received(:encodes) + end + end + + describe '#to_details_payloads' do + let(:payloads) { Fabricate(:api_payloads, payloads_array: [payload_bytes]) } + + it 'converts and encodes' do + expect(subject.to_details_payloads('test-payload'.b)).to eq(payloads) + expect(codec).to have_received(:encodes) + end + end + + describe '#to_signal_payloads' do + let(:payloads) { Fabricate(:api_payloads, payloads_array: [payload_bytes]) } + + it 'converts and encodes' do + expect(subject.to_signal_payloads('test-payload'.b)).to eq(payloads) + expect(codec).to have_received(:encodes) + end + end + + describe '#to_query_payloads' do + let(:payloads) { Fabricate(:api_payloads, payloads_array: [payload_bytes]) } + + it 'converts and encodes' do + expect(subject.to_query_payloads('test-payload'.b)).to eq(payloads) + expect(codec).to have_received(:encodes) + end + end + + describe '#to_payload_map' do + let(:payload_map) { { first: payload_bytes, second: payload_nil } } + + it 'converts and encodes' do + expect(subject.to_payload_map(first: 'test-payload'.b, second: nil)).to eq(payload_map) + expect(codec).to have_received(:encode).twice + end + end +end diff --git a/spec/unit/lib/temporal/grpc_spec.rb b/spec/unit/lib/temporal/grpc_spec.rb index cb97469f..9d0fe528 100644 --- a/spec/unit/lib/temporal/grpc_spec.rb +++ b/spec/unit/lib/temporal/grpc_spec.rb @@ -1,8 +1,15 @@ require 'temporal/connection/grpc' +require 'temporal/converter_wrapper' require 'temporal/workflow/query_result' describe Temporal::Connection::GRPC do let(:identity) { 'my-identity' } + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end let(:binary_checksum) { 'v1.0.0' } let(:grpc_stub) { double('grpc stub') } let(:grpc_operator_stub) { double('grpc stub') } @@ -10,12 +17,9 @@ let(:workflow_id) { SecureRandom.uuid } let(:run_id) { SecureRandom.uuid } let(:now) { Time.now} + let(:options) { {} } - subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure) } - - class TestDeserializer - extend Temporal::Concerns::Payloads - end + subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure, converter, options) } before do allow(subject).to receive(:client).and_return(grpc_stub) @@ -535,7 +539,7 @@ class TestDeserializer expect(request.completed_type).to eq(Temporalio::Api::Enums::V1::QueryResultType.lookup( Temporalio::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED) ) - expect(request.query_result).to eq(TestDeserializer.to_query_payloads(42)) + expect(request.query_result).to eq(converter.to_query_payloads(42)) expect(request.error_message).to eq('') end end @@ -606,7 +610,7 @@ class TestDeserializer expect(request.query_results['1'].result_type).to eq(Temporalio::Api::Enums::V1::QueryResultType.lookup( Temporalio::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED) ) - expect(request.query_results['1'].answer).to eq(TestDeserializer.to_query_payloads(42)) + expect(request.query_results['1'].answer).to eq(converter.to_query_payloads(42)) expect(request.query_results['2']).to be_a(Temporalio::Api::Query::V1::WorkflowQueryResult) expect(request.query_results['2'].result_type).to eq(Temporalio::Api::Enums::V1::QueryResultType.lookup( @@ -880,7 +884,7 @@ class TestDeserializer end context "when keepalive_time_ms is passed" do - subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure, keepalive_time_ms: 30_000) } + let(:options) { { keepalive_time_ms: 30_000 } } it "passes the option to the channel args" do expect(Temporalio::Api::WorkflowService::V1::WorkflowService::Stub).to receive(:new).with( @@ -897,7 +901,7 @@ class TestDeserializer end context "when passing retry_connection" do - subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure, retry_connection: true) } + let(:options) { { retry_connection: true } } it "passes the option to the channel args" do expect(Temporalio::Api::WorkflowService::V1::WorkflowService::Stub).to receive(:new).with( @@ -932,8 +936,7 @@ class TestDeserializer end context "when passing a custom retry policy" do - subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure, retry_policy: retry_policy) } - + let(:options) { { retry_policy: retry_policy } } let(:retry_policy) do { retryableStatusCodes: ["UNAVAILABLE", "INTERNAL"], diff --git a/spec/unit/lib/temporal/metadata_spec.rb b/spec/unit/lib/temporal/metadata_spec.rb index cd21fb76..b3f02955 100644 --- a/spec/unit/lib/temporal/metadata_spec.rb +++ b/spec/unit/lib/temporal/metadata_spec.rb @@ -1,8 +1,15 @@ require 'temporal/metadata' describe Temporal::Metadata do + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end + describe '.generate_activity_metadata' do - subject { described_class.generate_activity_metadata(data, namespace) } + subject { described_class.generate_activity_metadata(data, namespace, converter) } let(:data) { Fabricate(:api_activity_task) } let(:namespace) { 'test-namespace' } @@ -46,7 +53,7 @@ end context '.generate_workflow_metadata' do - subject { described_class.generate_workflow_metadata(event, task_metadata) } + subject { described_class.generate_workflow_metadata(event, task_metadata, converter) } let(:event) { Temporal::Workflow::History::Event.new(Fabricate(:api_workflow_execution_started_event)) } let(:task_metadata) { Fabricate(:workflow_task_metadata) } let(:namespace) { nil } diff --git a/spec/unit/lib/temporal/workflow/errors_spec.rb b/spec/unit/lib/temporal/workflow/errors_spec.rb index 53d86b68..82fb6924 100644 --- a/spec/unit/lib/temporal/workflow/errors_spec.rb +++ b/spec/unit/lib/temporal/workflow/errors_spec.rb @@ -27,6 +27,13 @@ def initialize(foo, bar) end describe Temporal::Workflow::Errors do + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end + describe '.generate_error' do it "instantiates properly when the client has the error" do message = "An error message" @@ -38,7 +45,7 @@ def initialize(foo, bar) error_class: SomeError.to_s ) - e = Temporal::Workflow::Errors.generate_error(failure) + e = Temporal::Workflow::Errors.generate_error(failure, converter) expect(e).to be_a(SomeError) expect(e.message).to eq(message) expect(e.backtrace).to eq(stack_trace) @@ -47,9 +54,9 @@ def initialize(foo, bar) it 'correctly deserializes a complex error' do error = MyFancyError.new('foo', 'bar') - failure = Temporal::Connection::Serializer::Failure.new(error, serialize_whole_error: true).to_proto + failure = Temporal::Connection::Serializer::Failure.new(error, converter, serialize_whole_error: true).to_proto - e = Temporal::Workflow::Errors.generate_error(failure) + e = Temporal::Workflow::Errors.generate_error(failure, converter) expect(e).to be_a(MyFancyError) expect(e.foo).to eq('foo') expect(e.bar).to eq('bar') @@ -68,7 +75,7 @@ def initialize(foo, bar) error_class: 'NonexistentError', ) - e = Temporal::Workflow::Errors.generate_error(failure) + e = Temporal::Workflow::Errors.generate_error(failure, converter) expect(e).to be_a(StandardError) expect(e.message).to eq("NonexistentError: An error message") expect(e.backtrace).to eq(stack_trace) @@ -94,7 +101,7 @@ def initialize(foo, bar) error_class: ErrorWithTwoArgs.to_s, ) - e = Temporal::Workflow::Errors.generate_error(failure) + e = Temporal::Workflow::Errors.generate_error(failure, converter) expect(e).to be_a(StandardError) expect(e.message).to eq("ErrorWithTwoArgs: An error message") expect(e.backtrace).to eq(stack_trace) @@ -127,7 +134,7 @@ def initialize(foo, bar) error_class: ErrorThatRaisesInInitialize.to_s, ) - e = Temporal::Workflow::Errors.generate_error(failure) + e = Temporal::Workflow::Errors.generate_error(failure, converter) expect(e).to be_a(StandardError) expect(e.message).to eq("ErrorThatRaisesInInitialize: An error message") expect(e.backtrace).to eq(stack_trace) diff --git a/spec/unit/lib/temporal/workflow/execution_info_spec.rb b/spec/unit/lib/temporal/workflow/execution_info_spec.rb index ad3368f2..6bef7b2d 100644 --- a/spec/unit/lib/temporal/workflow/execution_info_spec.rb +++ b/spec/unit/lib/temporal/workflow/execution_info_spec.rb @@ -1,7 +1,13 @@ require 'temporal/workflow/execution_info' describe Temporal::Workflow::ExecutionInfo do - subject { described_class.generate_from(api_info) } + subject { described_class.generate_from(api_info, converter) } + let(:converter) do + Temporal::ConverterWrapper.new( + Temporal::Configuration::DEFAULT_CONVERTER, + Temporal::Configuration::DEFAULT_PAYLOAD_CODEC + ) + end let(:api_info) { Fabricate(:api_workflow_execution_info, workflow: 'TestWorkflow', workflow_id: '') } describe '.generate_for' do @@ -25,7 +31,7 @@ it 'deserializes if search_attributes is nil' do api_info.search_attributes = nil - result = described_class.generate_from(api_info) + result = described_class.generate_from(api_info, converter) expect(result.search_attributes).to eq({}) end end diff --git a/spec/unit/lib/temporal/workflow/executor_spec.rb b/spec/unit/lib/temporal/workflow/executor_spec.rb index ee567a8b..714dc72b 100644 --- a/spec/unit/lib/temporal/workflow/executor_spec.rb +++ b/spec/unit/lib/temporal/workflow/executor_spec.rb @@ -134,9 +134,9 @@ def execute let(:query_2_error) { StandardError.new('Test query failure') } let(:queries) do { - '1' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'success')), - '2' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'failure')), - '3' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'unknown')) + '1' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'success'), config.converter), + '2' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'failure'), config.converter), + '3' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'unknown'), config.converter) } end diff --git a/spec/unit/lib/temporal/workflow/state_manager_spec.rb b/spec/unit/lib/temporal/workflow/state_manager_spec.rb index a1caaa2c..8aa8f9aa 100644 --- a/spec/unit/lib/temporal/workflow/state_manager_spec.rb +++ b/spec/unit/lib/temporal/workflow/state_manager_spec.rb @@ -229,7 +229,7 @@ class MyWorkflow < Temporal::Workflow; end state_manager.schedule( Temporal::Workflow::Command::RecordMarker.new( name: marker_entry.marker_recorded_event_attributes.marker_name, - details: to_payload_map({}) + details: TEST_CONVERTER.to_payload_map({}) ) )