Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] Remove Temporal::Concerns::Payloads #314

Merged
merged 12 commits into from
Sep 4, 2024
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
require 'workflows/call_failing_activity_workflow'

describe CallFailingActivityWorkflow, :integration do

class TestDeserializer
include Temporal::Concerns::Payloads
end

it 'correctly re-raises an activity-thrown exception in the workflow' do
workflow_id = SecureRandom.uuid
expected_message = "a failure message"
Expand Down
18 changes: 10 additions & 8 deletions examples/spec/integration/converter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
require 'grpc/errors'

describe 'Converter', :integration do
let(:codec) do
Temporal::Connection::Converter::Codec::Chain.new(
payload_codecs: [
Temporal::CryptPayloadCodec.new
]
)
end

around(:each) do |example|
task_queue = Temporal.configuration.task_queue

Temporal.configure do |config|
config.task_queue = 'crypt'
config.payload_codec = Temporal::Connection::Converter::Codec::Chain.new(
payload_codecs: [
Temporal::CryptPayloadCodec.new
]
)
config.payload_codec = codec
end

example.run
Expand Down Expand Up @@ -67,8 +71,6 @@
completion_event = events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].first
result = completion_event.workflow_execution_completed_event_attributes.result

payload_codec = Temporal.configuration.payload_codec

expect(payload_codec.decodes(result).payloads.first.data).to eq('"Hello World, Tom"')
expect(codec.decodes(result).payloads.first.data).to eq('"Hello World, Tom"')
end
end
3 changes: 3 additions & 0 deletions lib/temporal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ module Temporal
class << self
def configure(&block)
yield config
# Reset the singleton client after configuration was altered to ensure
# it is initialized with the latest attributes
@default_client = nil
end

def configuration
Expand Down
7 changes: 2 additions & 5 deletions lib/temporal/activity/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@
require 'temporal/error_handler'
require 'temporal/errors'
require 'temporal/activity/context'
require 'temporal/concerns/payloads'
require 'temporal/connection/retryer'
require 'temporal/connection'
require 'temporal/metric_keys'

module Temporal
class Activity
class TaskProcessor
include Concerns::Payloads

def initialize(task, task_queue, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool)
@task = task
@task_queue = task_queue
@namespace = namespace
@metadata = Metadata.generate_activity_metadata(task, namespace)
@metadata = Metadata.generate_activity_metadata(task, namespace, config.converter)
@task_token = task.task_token
@activity_name = task.activity_type.name
@activity_class = activity_lookup.find(activity_name)
Expand All @@ -38,7 +35,7 @@ def process
end

result = middleware_chain.invoke(metadata) do
activity_class.execute_in_context(context, from_payloads(task.input))
activity_class.execute_in_context(context, config.converter.from_payloads(task.input))
end

# Do not complete asynchronous activities, these should be completed manually
Expand Down
20 changes: 8 additions & 12 deletions lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module Temporal
class Client
def initialize(config)
@config = config
@converter = config.converter
end

# Start a workflow with an optional signal
Expand Down Expand Up @@ -251,15 +252,15 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam
case closed_event.type
when 'WORKFLOW_EXECUTION_COMPLETED'
payloads = closed_event.attributes.result
return ResultConverter.from_result_payloads(payloads)
return converter.from_result_payloads(payloads)
when 'WORKFLOW_EXECUTION_TIMED_OUT'
raise Temporal::WorkflowTimedOut
when 'WORKFLOW_EXECUTION_TERMINATED'
raise Temporal::WorkflowTerminated
when 'WORKFLOW_EXECUTION_CANCELED'
raise Temporal::WorkflowCanceled
when 'WORKFLOW_EXECUTION_FAILED'
raise Temporal::Workflow::Errors.generate_error(closed_event.attributes.failure)
raise Temporal::Workflow::Errors.generate_error(closed_event.attributes.failure, converter)
when 'WORKFLOW_EXECUTION_CONTINUED_AS_NEW'
new_run_id = closed_event.attributes.new_execution_run_id
# Throw to let the caller know they're not getting the result
Expand Down Expand Up @@ -355,7 +356,7 @@ def fetch_workflow_execution_info(namespace, workflow_id, run_id)
run_id: run_id
)

Workflow::ExecutionInfo.generate_from(response.workflow_execution_info)
Workflow::ExecutionInfo.generate_from(response.workflow_execution_info, converter)
end

# Manually complete an activity
Expand Down Expand Up @@ -458,19 +459,19 @@ def get_workflow_history_protobuf(namespace: nil, workflow_id:, run_id: nil)
def list_open_workflow_executions(namespace, from, to = Time.now, filter: {}, next_page_token: nil, max_page_size: nil)
validate_filter(filter, :workflow, :workflow_id)

Temporal::Workflow::Executions.new(connection: connection, status: :open, request_options: { namespace: namespace, from: from, to: to, next_page_token: next_page_token, max_page_size: max_page_size}.merge(filter))
Temporal::Workflow::Executions.new(converter, connection: connection, status: :open, request_options: { namespace: namespace, from: from, to: to, next_page_token: next_page_token, max_page_size: max_page_size}.merge(filter))
end

def list_closed_workflow_executions(namespace, from, to = Time.now, filter: {}, next_page_token: nil, max_page_size: nil)
validate_filter(filter, :status, :workflow, :workflow_id)

Temporal::Workflow::Executions.new(connection: connection, status: :closed, request_options: { namespace: namespace, from: from, to: to, next_page_token: next_page_token, max_page_size: max_page_size}.merge(filter))
Temporal::Workflow::Executions.new(converter, connection: connection, status: :closed, request_options: { namespace: namespace, from: from, to: to, next_page_token: next_page_token, max_page_size: max_page_size}.merge(filter))
end

def query_workflow_executions(namespace, query, filter: {}, next_page_token: nil, max_page_size: nil)
validate_filter(filter, :status, :workflow, :workflow_id)

Temporal::Workflow::Executions.new(connection: connection, status: :all, request_options: { namespace: namespace, query: query, next_page_token: next_page_token, max_page_size: max_page_size }.merge(filter))
Temporal::Workflow::Executions.new(converter, connection: connection, status: :all, request_options: { namespace: namespace, query: query, next_page_token: next_page_token, max_page_size: max_page_size }.merge(filter))
end

# Count the number of workflows matching the provided query
Expand Down Expand Up @@ -598,14 +599,9 @@ def connection
@connection ||= Temporal::Connection.generate(config.for_connection)
end

class ResultConverter
extend Concerns::Payloads
end
private_constant :ResultConverter

private

attr_reader :config
attr_reader :config, :converter

def compute_run_timeout(execution_options)
execution_options.timeouts[:run] || execution_options.timeouts[:execution]
Expand Down
86 changes: 0 additions & 86 deletions lib/temporal/concerns/payloads.rb

This file was deleted.

24 changes: 20 additions & 4 deletions lib/temporal/configuration.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'temporal/capabilities'
require 'temporal/converter_wrapper'
require 'temporal/logger'
require 'temporal/metrics_adapters/null'
require 'temporal/middleware/header_propagator_chain'
Expand All @@ -12,13 +13,13 @@

module Temporal
class Configuration
Connection = Struct.new(:type, :host, :port, :credentials, :identity, :connection_options, keyword_init: true)
Connection = Struct.new(:type, :host, :port, :credentials, :identity, :converter, :connection_options, keyword_init: true)
Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true)

attr_reader :timeouts, :error_handlers, :capabilities
attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity,
attr_reader :timeouts, :error_handlers, :capabilities, :payload_codec
attr_accessor :connection_type, :use_error_serialization_v2, :host, :port, :credentials, :identity,
:logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators,
:payload_codec, :legacy_signals, :no_signals_in_first_task, :connection_options, :log_on_workflow_replay
:legacy_signals, :no_signals_in_first_task, :connection_options, :log_on_workflow_replay

# See https://docs.temporal.io/blog/activity-timeouts/ for general docs.
# We want an infinite execution timeout for cron schedules and other perpetual workflows.
Expand Down Expand Up @@ -124,6 +125,7 @@ def for_connection
port: port,
credentials: credentials,
identity: identity || default_identity,
converter: converter,
connection_options: connection_options.merge(use_error_serialization_v2: @use_error_serialization_v2)
).freeze
end
Expand All @@ -148,6 +150,20 @@ def header_propagator_chain
Middleware::HeaderPropagatorChain.new(header_propagators)
end

def converter
@converter_wrapper ||= ConverterWrapper.new(@converter, @payload_codec)
end

def converter=(new_converter)
@converter = new_converter
@converter_wrapper = nil
end

def payload_codec=(new_codec)
@payload_codec = new_codec
@converter_wrapper = nil
end

private

def default_identity
Expand Down
3 changes: 2 additions & 1 deletion lib/temporal/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ def self.generate(configuration)
port = configuration.port
credentials = configuration.credentials
identity = configuration.identity
converter = configuration.converter
options = configuration.connection_options

connection_class.new(host, port, identity, credentials, options)
connection_class.new(host, port, identity, credentials, converter, options)
end
end
end
Loading
Loading