Skip to content

Commit

Permalink
Support passing activity task rate limit on worker options (#311)
Browse files Browse the repository at this point in the history
* support passing activity task rate limit on worker options

* remove extra space in README
  • Loading branch information
jazev-stripe authored Jul 30, 2024
1 parent c3991a9 commit 0d3a8bb
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 12 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ Temporal::Worker.new(
workflow_thread_pool_size: 10, # how many threads poll for workflows
binary_checksum: nil, # identifies the version of workflow worker code
activity_poll_retry_seconds: 0, # how many seconds to wait after unsuccessful poll for activities
workflow_poll_retry_seconds: 0 # how many seconds to wait after unsuccessful poll for workflows
workflow_poll_retry_seconds: 0, # how many seconds to wait after unsuccessful poll for workflows
activity_max_tasks_per_second: 0 # rate-limit for starting activity tasks (new activities + retries) on the task queue
)
```

Expand Down
10 changes: 8 additions & 2 deletions lib/temporal/activity/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ class Activity
class Poller
DEFAULT_OPTIONS = {
thread_pool_size: 20,
poll_retry_seconds: 0
poll_retry_seconds: 0,
max_tasks_per_second: 0 # unlimited
}.freeze

def initialize(namespace, task_queue, activity_lookup, config, middleware = [], options = {})
Expand Down Expand Up @@ -91,7 +92,8 @@ def poll_loop
end

def poll_for_task
connection.poll_activity_task_queue(namespace: namespace, task_queue: task_queue)
connection.poll_activity_task_queue(namespace: namespace, task_queue: task_queue,
max_tasks_per_second: max_tasks_per_second)
rescue ::GRPC::Cancelled
# We're shutting down and we've already reported that in the logs
nil
Expand All @@ -115,6 +117,10 @@ def poll_retry_seconds
@options[:poll_retry_seconds]
end

def max_tasks_per_second
@options[:max_tasks_per_second]
end

def thread_pool
@thread_pool ||= ThreadPool.new(
options[:thread_pool_size],
Expand Down
8 changes: 7 additions & 1 deletion lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def respond_workflow_task_failed(namespace:, task_token:, cause:, exception:, bi
client.respond_workflow_task_failed(request)
end

def poll_activity_task_queue(namespace:, task_queue:)
def poll_activity_task_queue(namespace:, task_queue:, max_tasks_per_second: 0)
request = Temporalio::Api::WorkflowService::V1::PollActivityTaskQueueRequest.new(
identity: identity,
namespace: namespace,
Expand All @@ -265,6 +265,12 @@ def poll_activity_task_queue(namespace:, task_queue:)
)
)

if max_tasks_per_second > 0
request.task_queue_metadata = Temporalio::Api::TaskQueue::V1::TaskQueueMetadata.new(
max_tasks_per_second: Google::Protobuf::DoubleValue.new(value: max_tasks_per_second)
)
end

poll_mutex.synchronize do
return unless can_poll?

Expand Down
19 changes: 16 additions & 3 deletions lib/temporal/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Temporal
class Worker
# activity_thread_pool_size: number of threads that the poller can use to run activities.
# can be set to 1 if you want no paralellism in your activities, at the cost of throughput.

#
# binary_checksum: The binary checksum identifies the version of workflow worker code. It is set on each completed or failed workflow
# task. It is present in API responses that return workflow execution info, and is shown in temporal-web and tctl.
# It is traditionally a checksum of the application binary. However, Temporal server treats this as an opaque
Expand All @@ -21,13 +21,25 @@ class Worker
# from workers with these bad versions.
#
# See https://docs.temporal.io/docs/tctl/how-to-use-tctl/#recovery-from-bad-deployment----auto-reset-workflow
#
# activity_max_tasks_per_second: Optional: Sets the rate limiting on number of activities that can be executed per second
#
# This limits new activities being started and activity attempts being scheduled. It does NOT
# limit the number of concurrent activities being executed on this task queue.
#
# This is managed by the server and controls activities per second for the entire task queue
# across all the workers. Notice that the number is represented in double, so that you can set
# it to less than 1 if needed. For example, set the number to 0.1 means you want your activity
# to be executed once every 10 seconds. This can be used to protect down stream services from
# flooding. The zero value of this uses the default value. Default is unlimited.
def initialize(
config = Temporal.configuration,
activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size],
workflow_thread_pool_size: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:thread_pool_size],
binary_checksum: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:binary_checksum],
activity_poll_retry_seconds: Temporal::Activity::Poller::DEFAULT_OPTIONS[:poll_retry_seconds],
workflow_poll_retry_seconds: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:poll_retry_seconds]
workflow_poll_retry_seconds: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:poll_retry_seconds],
activity_max_tasks_per_second: Temporal::Activity::Poller::DEFAULT_OPTIONS[:max_tasks_per_second]
)
@config = config
@workflows = Hash.new { |hash, key| hash[key] = ExecutableLookup.new }
Expand All @@ -39,7 +51,8 @@ def initialize(
@shutting_down = false
@activity_poller_options = {
thread_pool_size: activity_thread_pool_size,
poll_retry_seconds: activity_poll_retry_seconds
poll_retry_seconds: activity_poll_retry_seconds,
max_tasks_per_second: activity_max_tasks_per_second
}
@workflow_poller_options = {
thread_pool_size: workflow_thread_pool_size,
Expand Down
30 changes: 30 additions & 0 deletions spec/unit/lib/temporal/activity/poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,36 @@ def call(_); end
end
end

context 'when max_tasks_per_second is set' do
subject do
described_class.new(
namespace,
task_queue,
lookup,
config,
middleware,
{
max_tasks_per_second: 32
}
)
end

it 'sends PollActivityTaskQueue requests with the configured task rate-limit' do
times = poll(nil, times: 2)
expect(times).to be >= 2

expect(connection).to have_received(:poll_activity_task_queue)
.with(
namespace: namespace,
task_queue: task_queue,
max_tasks_per_second: 32
)
.at_least(2)
.times
end
end


context 'when connection is unable to poll and poll_retry_seconds is set' do
subject do
described_class.new(
Expand Down
43 changes: 43 additions & 0 deletions spec/unit/lib/temporal/grpc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,49 @@ class TestDeserializer
end
end

describe '#poll_activity_task_queue' do
let(:task_queue) { 'test-task-queue' }
let(:temporal_response) do
Temporalio::Api::WorkflowService::V1::PollActivityTaskQueueResponse.new
end
let(:poll_request) do
instance_double(
"GRPC::ActiveCall::Operation",
execute: temporal_response
)
end

before do
allow(grpc_stub).to receive(:poll_activity_task_queue).with(anything, return_op: true).and_return(poll_request)
end

it 'makes an API request' do
subject.poll_activity_task_queue(namespace: namespace, task_queue: task_queue)

expect(grpc_stub).to have_received(:poll_activity_task_queue) do |request|
expect(request).to be_an_instance_of(Temporalio::Api::WorkflowService::V1::PollActivityTaskQueueRequest)
expect(request.namespace).to eq(namespace)
expect(request.task_queue.name).to eq(task_queue)
expect(request.identity).to eq(identity)
expect(request.task_queue_metadata).to be_nil
end
end

it 'makes an API request with max_tasks_per_second in the metadata' do
subject.poll_activity_task_queue(namespace: namespace, task_queue: task_queue, max_tasks_per_second: 10)

expect(grpc_stub).to have_received(:poll_activity_task_queue) do |request|
expect(request).to be_an_instance_of(Temporalio::Api::WorkflowService::V1::PollActivityTaskQueueRequest)
expect(request.namespace).to eq(namespace)
expect(request.task_queue.name).to eq(task_queue)
expect(request.identity).to eq(identity)
expect(request.task_queue_metadata).to_not be_nil
expect(request.task_queue_metadata.max_tasks_per_second).to_not be_nil
expect(request.task_queue_metadata.max_tasks_per_second.value).to eq(10)
end
end
end

describe '#add_custom_search_attributes' do
it 'calls GRPC service with supplied arguments' do
allow(grpc_operator_stub).to receive(:add_search_attributes)
Expand Down
35 changes: 30 additions & 5 deletions spec/unit/lib/temporal/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ def start_and_stop(worker)
config,
[],
thread_pool_size: 20,
poll_retry_seconds: 0
poll_retry_seconds: 0,
max_tasks_per_second: 0
)
.and_return(activity_poller_1)

Expand All @@ -306,7 +307,8 @@ def start_and_stop(worker)
config,
[],
thread_pool_size: 20,
poll_retry_seconds: 0
poll_retry_seconds: 0,
max_tasks_per_second: 0
)
.and_return(activity_poller_2)

Expand All @@ -333,7 +335,7 @@ def start_and_stop(worker)
an_instance_of(Temporal::ExecutableLookup),
an_instance_of(Temporal::Configuration),
[],
{thread_pool_size: 10, poll_retry_seconds: 0}
{thread_pool_size: 10, poll_retry_seconds: 0, max_tasks_per_second: 0}
)
.and_return(activity_poller)

Expand Down Expand Up @@ -406,7 +408,7 @@ def start_and_stop(worker)
an_instance_of(Temporal::ExecutableLookup),
an_instance_of(Temporal::Configuration),
[],
{thread_pool_size: 20, poll_retry_seconds: 10}
{thread_pool_size: 20, poll_retry_seconds: 10, max_tasks_per_second: 0}
)
.and_return(activity_poller)

Expand Down Expand Up @@ -441,6 +443,28 @@ def start_and_stop(worker)
expect(workflow_poller).to have_received(:start)
end

it 'can have an activity poller that registers a task rate limit' do
activity_poller = instance_double(Temporal::Activity::Poller, start: nil, stop_polling: nil, cancel_pending_requests: nil, wait: nil)
expect(Temporal::Activity::Poller)
.to receive(:new)
.with(
'default-namespace',
'default-task-queue',
an_instance_of(Temporal::ExecutableLookup),
an_instance_of(Temporal::Configuration),
[],
{thread_pool_size: 20, poll_retry_seconds: 0, max_tasks_per_second: 5}
)
.and_return(activity_poller)

worker = Temporal::Worker.new(activity_max_tasks_per_second: 5)
worker.register_activity(TestWorkerActivity)

start_and_stop(worker)

expect(activity_poller).to have_received(:start)
end

context 'when middleware is configured' do
let(:entry_1) { instance_double(Temporal::Middleware::Entry) }
let(:entry_2) { instance_double(Temporal::Middleware::Entry) }
Expand Down Expand Up @@ -492,7 +516,8 @@ def start_and_stop(worker)
config,
[entry_2],
thread_pool_size: 20,
poll_retry_seconds: 0
poll_retry_seconds: 0,
max_tasks_per_second: 0
)
.and_return(activity_poller_1)

Expand Down

0 comments on commit 0d3a8bb

Please sign in to comment.