Skip to content

Commit

Permalink
[Refactor] Remove Temporal::Concerns::Payloads (#314)
Browse files Browse the repository at this point in the history
* Implement ConverterWrapper as a replacement for Concern::Payloads

* Wrap concerter & payload codec with ConverterWrapper in Configuration

* Use ConverterWrapper in Connection::GRPC

* Fix failing specs

* Remove Concerns::Payload from worker and client

* Remove Concerns::Payloads dependency from all the serializers

* Remove Concerns::Payloads from Metadata

* fixup! Remove Concerns::Payloads dependency from all the serializers

* Remove Concerns::Payloads from Errors

* Remove Concerns::Payloads from Executions

* Remove Concerns::Payloads from fabricators

* Remove Concerns::Payloads
  • Loading branch information
antstorm authored Sep 4, 2024
1 parent e3c351f commit c73a07e
Show file tree
Hide file tree
Showing 66 changed files with 687 additions and 370 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
require 'workflows/call_failing_activity_workflow'

describe CallFailingActivityWorkflow, :integration do

class TestDeserializer
include Temporal::Concerns::Payloads
end

it 'correctly re-raises an activity-thrown exception in the workflow' do
workflow_id = SecureRandom.uuid
expected_message = "a failure message"
Expand Down
18 changes: 10 additions & 8 deletions examples/spec/integration/converter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
require 'grpc/errors'

describe 'Converter', :integration do
let(:codec) do
Temporal::Connection::Converter::Codec::Chain.new(
payload_codecs: [
Temporal::CryptPayloadCodec.new
]
)
end

around(:each) do |example|
task_queue = Temporal.configuration.task_queue

Temporal.configure do |config|
config.task_queue = 'crypt'
config.payload_codec = Temporal::Connection::Converter::Codec::Chain.new(
payload_codecs: [
Temporal::CryptPayloadCodec.new
]
)
config.payload_codec = codec
end

example.run
Expand Down Expand Up @@ -67,8 +71,6 @@
completion_event = events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].first
result = completion_event.workflow_execution_completed_event_attributes.result

payload_codec = Temporal.configuration.payload_codec

expect(payload_codec.decodes(result).payloads.first.data).to eq('"Hello World, Tom"')
expect(codec.decodes(result).payloads.first.data).to eq('"Hello World, Tom"')
end
end
3 changes: 3 additions & 0 deletions lib/temporal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ module Temporal
class << self
def configure(&block)
yield config
# Reset the singleton client after configuration was altered to ensure
# it is initialized with the latest attributes
@default_client = nil
end

def configuration
Expand Down
7 changes: 2 additions & 5 deletions lib/temporal/activity/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@
require 'temporal/error_handler'
require 'temporal/errors'
require 'temporal/activity/context'
require 'temporal/concerns/payloads'
require 'temporal/connection/retryer'
require 'temporal/connection'
require 'temporal/metric_keys'

module Temporal
class Activity
class TaskProcessor
include Concerns::Payloads

def initialize(task, task_queue, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool)
@task = task
@task_queue = task_queue
@namespace = namespace
@metadata = Metadata.generate_activity_metadata(task, namespace)
@metadata = Metadata.generate_activity_metadata(task, namespace, config.converter)
@task_token = task.task_token
@activity_name = task.activity_type.name
@activity_class = activity_lookup.find(activity_name)
Expand All @@ -38,7 +35,7 @@ def process
end

result = middleware_chain.invoke(metadata) do
activity_class.execute_in_context(context, from_payloads(task.input))
activity_class.execute_in_context(context, config.converter.from_payloads(task.input))
end

# Do not complete asynchronous activities, these should be completed manually
Expand Down
20 changes: 8 additions & 12 deletions lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module Temporal
class Client
def initialize(config)
@config = config
@converter = config.converter
end

# Start a workflow with an optional signal
Expand Down Expand Up @@ -251,15 +252,15 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam
case closed_event.type
when 'WORKFLOW_EXECUTION_COMPLETED'
payloads = closed_event.attributes.result
return ResultConverter.from_result_payloads(payloads)
return converter.from_result_payloads(payloads)
when 'WORKFLOW_EXECUTION_TIMED_OUT'
raise Temporal::WorkflowTimedOut
when 'WORKFLOW_EXECUTION_TERMINATED'
raise Temporal::WorkflowTerminated
when 'WORKFLOW_EXECUTION_CANCELED'
raise Temporal::WorkflowCanceled
when 'WORKFLOW_EXECUTION_FAILED'
raise Temporal::Workflow::Errors.generate_error(closed_event.attributes.failure)
raise Temporal::Workflow::Errors.generate_error(closed_event.attributes.failure, converter)
when 'WORKFLOW_EXECUTION_CONTINUED_AS_NEW'
new_run_id = closed_event.attributes.new_execution_run_id
# Throw to let the caller know they're not getting the result
Expand Down Expand Up @@ -355,7 +356,7 @@ def fetch_workflow_execution_info(namespace, workflow_id, run_id)
run_id: run_id
)

Workflow::ExecutionInfo.generate_from(response.workflow_execution_info)
Workflow::ExecutionInfo.generate_from(response.workflow_execution_info, converter)
end

# Manually complete an activity
Expand Down Expand Up @@ -458,19 +459,19 @@ def get_workflow_history_protobuf(namespace: nil, workflow_id:, run_id: nil)
def list_open_workflow_executions(namespace, from, to = Time.now, filter: {}, next_page_token: nil, max_page_size: nil)
validate_filter(filter, :workflow, :workflow_id)

Temporal::Workflow::Executions.new(connection: connection, status: :open, request_options: { namespace: namespace, from: from, to: to, next_page_token: next_page_token, max_page_size: max_page_size}.merge(filter))
Temporal::Workflow::Executions.new(converter, connection: connection, status: :open, request_options: { namespace: namespace, from: from, to: to, next_page_token: next_page_token, max_page_size: max_page_size}.merge(filter))
end

def list_closed_workflow_executions(namespace, from, to = Time.now, filter: {}, next_page_token: nil, max_page_size: nil)
validate_filter(filter, :status, :workflow, :workflow_id)

Temporal::Workflow::Executions.new(connection: connection, status: :closed, request_options: { namespace: namespace, from: from, to: to, next_page_token: next_page_token, max_page_size: max_page_size}.merge(filter))
Temporal::Workflow::Executions.new(converter, connection: connection, status: :closed, request_options: { namespace: namespace, from: from, to: to, next_page_token: next_page_token, max_page_size: max_page_size}.merge(filter))
end

def query_workflow_executions(namespace, query, filter: {}, next_page_token: nil, max_page_size: nil)
validate_filter(filter, :status, :workflow, :workflow_id)

Temporal::Workflow::Executions.new(connection: connection, status: :all, request_options: { namespace: namespace, query: query, next_page_token: next_page_token, max_page_size: max_page_size }.merge(filter))
Temporal::Workflow::Executions.new(converter, connection: connection, status: :all, request_options: { namespace: namespace, query: query, next_page_token: next_page_token, max_page_size: max_page_size }.merge(filter))
end

# Count the number of workflows matching the provided query
Expand Down Expand Up @@ -598,14 +599,9 @@ def connection
@connection ||= Temporal::Connection.generate(config.for_connection)
end

class ResultConverter
extend Concerns::Payloads
end
private_constant :ResultConverter

private

attr_reader :config
attr_reader :config, :converter

def compute_run_timeout(execution_options)
execution_options.timeouts[:run] || execution_options.timeouts[:execution]
Expand Down
86 changes: 0 additions & 86 deletions lib/temporal/concerns/payloads.rb

This file was deleted.

24 changes: 20 additions & 4 deletions lib/temporal/configuration.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'temporal/capabilities'
require 'temporal/converter_wrapper'
require 'temporal/logger'
require 'temporal/metrics_adapters/null'
require 'temporal/middleware/header_propagator_chain'
Expand All @@ -12,13 +13,13 @@

module Temporal
class Configuration
Connection = Struct.new(:type, :host, :port, :credentials, :identity, :connection_options, keyword_init: true)
Connection = Struct.new(:type, :host, :port, :credentials, :identity, :converter, :connection_options, keyword_init: true)
Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true)

attr_reader :timeouts, :error_handlers, :capabilities
attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity,
attr_reader :timeouts, :error_handlers, :capabilities, :payload_codec
attr_accessor :connection_type, :use_error_serialization_v2, :host, :port, :credentials, :identity,
:logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators,
:payload_codec, :legacy_signals, :no_signals_in_first_task, :connection_options, :log_on_workflow_replay
:legacy_signals, :no_signals_in_first_task, :connection_options, :log_on_workflow_replay

# See https://docs.temporal.io/blog/activity-timeouts/ for general docs.
# We want an infinite execution timeout for cron schedules and other perpetual workflows.
Expand Down Expand Up @@ -124,6 +125,7 @@ def for_connection
port: port,
credentials: credentials,
identity: identity || default_identity,
converter: converter,
connection_options: connection_options.merge(use_error_serialization_v2: @use_error_serialization_v2)
).freeze
end
Expand All @@ -148,6 +150,20 @@ def header_propagator_chain
Middleware::HeaderPropagatorChain.new(header_propagators)
end

def converter
@converter_wrapper ||= ConverterWrapper.new(@converter, @payload_codec)
end

def converter=(new_converter)
@converter = new_converter
@converter_wrapper = nil
end

def payload_codec=(new_codec)
@payload_codec = new_codec
@converter_wrapper = nil
end

private

def default_identity
Expand Down
3 changes: 2 additions & 1 deletion lib/temporal/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ def self.generate(configuration)
port = configuration.port
credentials = configuration.credentials
identity = configuration.identity
converter = configuration.converter
options = configuration.connection_options

connection_class.new(host, port, identity, credentials, options)
connection_class.new(host, port, identity, credentials, converter, options)
end
end
end
Loading

0 comments on commit c73a07e

Please sign in to comment.