Skip to main content

Workflow message passing - Ruby SDK

A Workflow can act like a stateful service that receives messages: Queries, Signals, and Updates. These messages interact with the Workflow via handler methods defined in the Workflow code. Clients use messages to read Workflow state or change its behavior.

See Workflow message passing for a general overview.

Write message handlers

info

The code that follows is part of a working solution.

Follow these guidelines when writing your message handlers:

  • Message handlers are defined as methods on the Workflow class, decorated by calling one of three class methods before defining the handler method: workflow_query, workflow_signal, and workflow_update.
  • These also implicitly create class-methods with the same name as the instance methods for use by callers.
  • The parameters and return values of handlers and the main Workflow function must be serializable.
  • Prefer single hash/object input parameter to multiple input parameters. Hash/object parameters allow you to add fields without changing the calling signature.

Query handlers

A Query is a synchronous operation that retrieves state from a Workflow Execution. Define as a method:

class GreetingWorkflow < Temporalio::Workflow::Definition
# ...

workflow_query
def languages(input)
# A query handler returns a value: it can inspect but must not mutate the Workflow state.
if input['include_unsupported']
CallGreetingService.greetings.keys.sort
else
@greetings.keys.sort
end
end

# ...
end

Or as an attribute reader:

class GreetingWorkflow < Temporalio::Workflow::Definition
# This is the equivalent of:
# workflow_query
# def language
# @language
# end
workflow_query_attr_reader :language

# ...
end
  • The workflow_query class method can accept arguments. See the API reference docs: workflow_query.
  • A Query handler must not modify Workflow state.
  • You can't perform async blocking operations such as executing an Activity in a Query handler.

Signal handlers

A Signal is an asynchronous message sent to a running Workflow Execution to change its state and control its flow:

class GreetingWorkflow < Temporalio::Workflow::Definition
# ...

workflow_signal
def approve(input)
# A signal handler mutates the workflow state but cannot return a value.
@approved_for_release = true
@approver_name = input['name']
end

# ...
end
  • The workflow_signal class method can accept arguments. Refer to the API docs: workflow_signal.

  • The handler should not return a value. The response is sent immediately from the server, without waiting for the Workflow to process the Signal.

  • Signal (and Update) handlers can be asynchronous and blocking. This allows you to use Activities, Child Workflows, durable Timers, wait conditions, and more. See Async handlers and Workflow message passing for guidelines on safely using async Signal and Update handlers.

Update handlers and validators

An Update is a trackable synchronous request sent to a running Workflow Execution. It can change the Workflow state, control its flow, and return a result. The sender must wait until the Worker accepts or rejects the Update. The sender may wait further to receive a returned value or an exception if something goes wrong:

class GreetingWorkflow < Temporalio::Workflow::Definition
# ...

workflow_update
def set_language(new_language) # rubocop:disable Naming/AccessorMethodName
# An update handler can mutate the workflow state and return a value.
prev = @language.to_sym
@language = new_language.to_sym
prev
end

workflow_update_validator(:set_language)
def validate_set_language(new_language)
# In an update validator you raise any exception to reject the update.
raise "#{new_language} is not supported" unless @greetings.include?(new_language.to_sym)
end

# ...
end
  • The workflow_update class method can take arguments as described in the API reference docs for workflow_update.

  • About validators:

    • Use validators to reject an Update before it is written to History. Validators are always optional. If you don't need to reject Updates, you can skip them.
    • Define an Update validator with the workflow_update_validator class method invoked before defining the method. The first parameter when declaring the validator is the name of the Update handler method. The validator must accept the same argument types as the handler and should not return a value.
  • Accepting and rejecting Updates with validators:

    • To reject an Update, raise an exception of any type in the validator.
    • Without a validator, Updates are always accepted.
  • Validators and Event History:

    • The WorkflowExecutionUpdateAccepted event is written into the History whether the acceptance was automatic or programmatic.
    • When a Validator raises an error, the Update is rejected, the Update is not run, and WorkflowExecutionUpdateAccepted won't be added to the Event History. The caller receives an "Update failed" error.
  • Use current_update_info to obtain information about the current Update. This includes the Update ID, which can be useful for deduplication when using Continue-As-New: see Ensuring your messages are processed exactly once.

  • Update (and Signal) handlers can be asynchronous and blocking. This allows you to use Activities, Child Workflows, durable Timers, wait conditions, and more. See Async handlers and Workflow message passing for guidelines on safely using async Update and Signal handlers.

Send messages

To send Queries, Signals, or Updates you call methods on a WorkflowHandle instance. To obtain the Workflow handle, you can:

For example:

client = Temporalio::Client.connect('localhost:7233', 'default')
handle = client.start_workflow(
MessagePassingSimple::GreetingWorkflow,
id: 'message-passing-simple-sample-workflow-id',
task_queue: 'message-passing-simple-sample'
)

To check the argument types required when sending messages -- and the return type for Queries and Updates -- refer to the corresponding handler method in the Workflow Definition.

Using Continue-as-New and Updates
  • Temporal does not support Continue-as-New functionality within Update handlers.
  • Complete all handlers before using Continue-as-New.
  • Use Continue-as-New from your main Workflow Definition method, just as you would complete or fail a Workflow Execution.

Send a Query

Call a Query method with WorkflowHandle#query:

supported_languages = handle.query(MessagePassingSimple::GreetingWorkflow.languages, { include_unsupported: false })
  • Sending a Query doesn’t add events to a Workflow's Event History.

  • You can send Queries to closed Workflow Executions within a Namespace's Workflow retention period. This includes Workflows that have completed, failed, or timed out. Querying terminated Workflows is not safe and, therefore, not supported.

  • A Worker must be online and polling the Task Queue to process a Query.

Send a Signal

You can send a Signal to a Workflow Execution from a Temporal Client or from another Workflow Execution. However, you can only send Signals to Workflow Executions that haven’t closed.

From a Client

Use WorkflowHandle#signal from Client code to send a Signal:

handle.signal(MessagePassingSimple::GreetingWorkflow.approve, { name: 'John Q. Approver' })
  • The call returns when the server accepts the Signal; it does not wait for the Signal to be delivered to the Workflow Execution.

  • The WorkflowExecutionSignaled Event appears in the Workflow's Event History.

From a Workflow

A Workflow can send a Signal to another Workflow, known as an External Signal. In this case you need to obtain a Workflow handle for the external Workflow. Use Temporalio::Workflow.external_workflow_handle, passing a running Workflow Id, to retrieve a Workflow handle:

class WorkflowB < Temporalio::Workflow::Definition
def execute
handle = Temporalio::Workflow.external_workflow_handle('workflow-a-id')
handle.signal(WorkflowA.some_signal, 'some signal arg')
end
end

When an External Signal is sent:

Signal-With-Start

Signal-With-Start allows a Client to send a Signal to a Workflow Execution, starting the Execution if it is not already running. If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.

To use Signal-With-Start, call signal_with_start_workflow with a WithStartWorkflowOperation:

client = Temporalio::Client.connect('localhost:7233', 'default')

# Create start-workflow operation for use with signal-with-start
start_workflow_operation = Temporalio::Client::WithStartWorkflowOperation.new(
MyWorkflow, 'my-workflow-input',
id: 'my-workflow-id', task_queue: 'my-workflow-task-queue'
)
# Perform signal-with-start
handle = client.signal_with_start_workflow(
MyWorkflow.my_signal, 'signal-input', start_workflow_operation:
)

Send an Update

An Update is a synchronous, blocking call that can change Workflow state, control its flow, and return a result.

A Client sending an Update must wait until the Server delivers the Update to a Worker. Workers must be available and responsive. If you need a response as soon as the Server receives the request, use a Signal instead. Also note that you can't send Updates to other Workflow Executions.

  • WorkflowExecutionUpdateAccepted is added to the Event History when the Worker confirms that the Update passed validation.
  • WorkflowExecutionUpdateCompleted is added to the Event History when the Worker confirms that the Update has finished.

To send an Update to a Workflow Execution, you can:

  • Call the Update method with execute_update from the Workflow handle and wait for the Update to complete. This code fetches an Update result:

    prev_language = handle.execute_update(MessagePassingSimple::GreetingWorkflow.set_language, :chinese)
    1. Use start_update to receive a handle as soon as the Update is accepted. It returns a WorkflowUpdateHandle
    • Use this WorkflowUpdateHandle later to fetch your results.
    • Asynchronous Update handlers normally perform long-running async Activities.
    • start_update only waits until the Worker has accepted or rejected the Update, not until all asynchronous operations are complete.

    For example:

    # Start an update and then wait for it to complete
    update_handle = handle.start_update(
    MessagePassingSimple::GreetingWorkflow.apply_language_with_lookup,
    :arabic,
    wait_for_stage: Temporalio::Client::WorkflowUpdateWaitStage::ACCEPTED
    )
    prev_language = update_handle.result

    For more details, see the "Async handlers" section.

Update-With-Start

Stability

In Public Preview in Temporal Cloud.

Minimum Temporal Server version Temporal Server version 1.26

Update-with-Start lets you send an Update that checks whether an already-running Workflow with that ID exists:

  • If the Workflow exists, the Update is processed.
  • If the Workflow does not exist, a new Workflow Execution is started with the given ID, and the Update is processed before the main Workflow method starts to execute.

Use execute_update_with_start_workflow to start the Update and wait for the result in one go.

Alternatively, use start_update_with_start_workflow to start the Update and receive a WorkflowUpdateHandle, and then use update_handle.result to retrieve the result from the Update.

These calls return once the requested Update wait stage has been reached, or when the request times out.

  • You will need to provide a WithStartWorkflowOperation to define the Workflow that will be started if necessary, and its arguments.
  • You must specify an id_conflict_policy when creating the WithStartWorkflowOperation. Note that a WithStartWorkflowOperation can only be used once.

Here's an example:

client = Temporalio::Client.connect('localhost:7233', 'default')

# Create start-workflow operation for use with update-with-start
start_workflow_operation = Temporalio::Client::WithStartWorkflowOperation.new(
MyWorkflow, 'my-workflow-input',
id: 'my-workflow-id', task_queue: 'my-workflow-task-queue',
id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::USE_EXISTING
)
# Perform update-with-start and get update result
update_result = client.execute_with_start_workflow(
MyWorkflow.my_update, 'update-input', start_workflow_operation:
)
# The workflow handle is on the start operation, here's an example of waiting on
# workflow result
workflow_result = start_workflow_operation.workflow_handle.result

Message handler patterns

This section covers common write operations, such as Signal and Update handlers. It doesn't apply to pure read operations, like Queries or Update Validators.

Add async handlers

Signal and Update handlers can be asynchronous as well as blocking. Using asynchronous calls allows you to wait for Activities, Child Workflows, Durable Timers, wait conditions, etc. This expands the possibilities for what can be done by a handler but it also means that handler executions and your main Workflow method are all running concurrently, with switching occurring between them at await calls.

It's essential to understand the things that could go wrong in order to use asynchronous handlers safely. See Workflow message passing for guidance on safe usage of async Signal and Update handlers, and the Controlling handler concurrency and Waiting for message handlers to finish sections below.

The following code is an Activity that simulates a network call to a remote service:

class CallGreetingService < Temporalio::Activity::Definition
def execute(to_language)
# Simulate a network call
sleep(0.2)
# This intentionally returns nil on not found
CallGreetingService.greetings[to_language.to_sym]
end

def self.greetings
@greetings ||= {
arabic: 'مرحبا بالعالم',
chinese: '你好,世界',
english: 'Hello, world',
french: 'Bonjour, monde',
hindi: 'नमस्ते दुनिया',
portuguese: 'Olá mundo',
spanish: 'Hola mundo'
}
end
end

The following code is a Workflow Update for asynchronous use of the preceding Activity:

class GreetingWorkflow < Temporalio::Workflow::Definition
# ...

workflow_update
def apply_language_with_lookup(new_language)
# Call an activity if it's not there.
unless @greetings.include?(new_language.to_sym)
# We use a mutex so that, if this handler is executed multiple times, each execution
# can schedule the activity only when the previously scheduled activity has
# completed. This ensures that multiple calls to apply_language_with_lookup are
# processed in order.
@apply_language_mutex ||= Mutex.new
@apply_language_mutex.synchronize do
greeting = Temporalio::Workflow.execute_activity(
CallGreetingService, new_language, start_to_close_timeout: 10
)
# The requested language might not be supported by the remote service. If so, we
# raise ApplicationError, which will fail the update. The
# WorkflowExecutionUpdateAccepted event will still be added to history. (Update
# validators can be used to reject updates before any event is written to history,
# but they cannot be async, and so we cannot use an update validator for this
# purpose.)
raise Temporalio::Error::ApplicationError, "Greeting service does not support #{new_language}" unless greeting

@greetings[new_language.to_sym] = greeting
end
end
set_language(new_language)
end
end

After updating the code for asynchronous calls, your Update handler can schedule an Activity and await the result. Although an async Signal handler can initiate similar network tasks, using an Update handler allows the Client to receive a result or error once the Activity completes. This lets your Client track the progress of asynchronous work performed by the Update's Activities, Child Workflows, etc.

Use wait conditions

Sometimes, async Signal or Update handlers need to meet certain conditions before they should continue. Using a wait condition with wait_condition sets a function that prevents the code from proceeding until the condition is truthy. This is an important feature that helps you control your handler logic.

Here are two important use cases for wait_condition:

  • Waiting in a handler until it is appropriate to continue.
  • Waiting in the main Workflow until all active handlers have finished.

The condition state you're waiting for can be updated by and reflect any part of the Workflow code. This includes the main Workflow method, other handlers, or child coroutines spawned by the main Workflow method, and so forth.

In handlers

Sometimes, async Signal or Update handlers need to meet certain conditions before they should continue. Using a wait condition with wait_condition sets a function that prevents the code from proceeding until the condition is truthy. This is an important feature that helps you control your handler logic.

Consider a ready_for_update_to_execute method that runs before your Update handler executes. The wait_condition call waits until your condition is met:

workflow_update
def my_update(my_update_input)
Temporalio::Workflow.wait_condition { ready_for_update_to_execute(my_update_input) }
# ...
end

Remember: Handlers can execute before the main Workflow method starts.

Before finishing the Workflow

Workflow wait conditions can ensure your handler completes before a Workflow finishes. When your Workflow uses async Signal or Update handlers, your main Workflow method can return or continue-as-new while a handler is still waiting on an async task, such as an Activity result. The Workflow completing may interrupt the handler before it finishes crucial work and cause Client errors when trying retrieve Update results. Use Temporalio::Workflow.all_handlers_finished? to address this problem and allow your Workflow to end smoothly:

class MyWorkflow < Temporalio::Workflow::Definition
def execute
# ...

Temporalio::Workflow.wait_condition { Temporalio::Workflow.all_handlers_finished? }
'workflow-result'
end
end

By default, your Worker will log a warning when you allow a Workflow Execution to finish with unfinished handler executions. You can silence these warnings on a per-handler basis by passing the unfinished_policy argument to the workflow_signal / workflow_update class methods:

workflow_update unfinished_policy: Temporalio::Workflow::HandlerUnfinishedPolicy::ABANDON
def my_update
# ...

See Finishing handlers before the Workflow completes for more information.

Use workflow_init to access input early

The workflow_init class method above initialize gives it access to Workflow input. When you use the workflow_init on your constructor, you give the constructor the same Workflow parameters as your execute method. The SDK will then ensure that your constructor receives the Workflow input arguments that the Client sent. The Workflow input arguments are also passed to your execute method -- that always happens, whether or not you use the workflow_init class method above initialize.

Here's an example. The constructor and execute must have the same parameters with the same types:

class WorkflowInitWorkflow < Temporalio::Workflow::Definition
workflow_init
def initialize(input)
@name_with_title = "Sir #{input['name']}"
end

def execute(input)
Temporalio::Workflow.wait_condition { @title_has_been_checked }
"Hello, #{@name_with_title}"
end

workflow_update
def check_title_validity
# The handler is now guaranteed to see some workflow input since it was
# processed by the constructor
valid = Temporalio::Workflow.execute_activity(
CheckTitleValidityActivity,
@name_with_title,
start_to_close_timeout: 100
)
@title_has_been_checked = true
valid
end
end

Use locks to prevent concurrent handler execution

Concurrent processes can interact in unpredictable ways. Incorrectly written concurrent message-passing code may not work correctly when multiple handler instances run simultaneously. Here's an example of a pathological case:

class MyWorkflow < Temporalio::Workflow::Definition
# ...

workflow_signal
def bad_handler
data = Temporalio::Workflow.execute_activity(
FetchDataActivity,
start_to_close_timeout: 100
)
@x = data['x']
# 🐛🐛 Bug!! If multiple instances of this handler are executing concurrently, then
# there may be times when the Workflow has @x from one Activity execution and @y
# from another.
Temporalio::Workflow.sleep(1)
@y = data['y']
end
end

Coordinating access with Mutex, a mutual exclusion lock, corrects this code. Locking makes sure that only one handler instance can execute a specific section of code at any given time:

class MyWorkflow < Temporalio::Workflow::Definition
# ...

workflow_signal
def safe_handler
@mutex ||= Mutex.new
@mutex.synchronize do
data = Temporalio::Workflow.execute_activity(
FetchDataActivity,
start_to_close_timeout: 100
)
@x = data['x']
# 🐛🐛 Bug!! If multiple instances of this handler are executing concurrently, then
# there may be times when the Workflow has @x from one Activity execution and @y
# from another.
Temporalio::Workflow.sleep(1)
@y = data['y']
end
end
end

For additional concurrency options, wait_condition can be used to do more advanced things such as using an integer attribute + wait_condition as a semaphore.

Troubleshooting

When sending a Signal, Update, or Query to a Workflow, your Client might encounter the following errors:

See Exceptions in message handlers for a non–Ruby-specific discussion of this topic.

Signal issues

When using Signal, the only exception that will result from your requests during its execution is RPCError. All handlers may experience additional exceptions during the initial (pre-Worker) part of a handler request lifecycle.

For Queries and Updates, the Client waits for a response from the Worker. If an issue occurs during the handler Execution by the Worker, the Client may receive an exception.

Update issues

When working with Updates, you may encounter these errors:

  • No Workflow Workers are polling the Task Queue: Your request will be retried by the SDK Client indefinitely. Use a Cancellation in your RPC options to cancel the Update. This raises a WorkflowUpdateRPCTimeoutOrCanceledError exception .

  • Update failed: You'll receive a WorkflowUpdateFailedError exception. There are two ways this can happen:

    • The Update was rejected by an Update validator defined in the Workflow alongside the Update handler.

    • The Update failed after having been accepted.

    Update failures are like Workflow failures. Issues that cause a Workflow failure in the main method also cause Update failures in the Update handler. These might include:

    • A failed Child Workflow
    • A failed Activity (if the Activity retries have been set to a finite number)
    • The Workflow author raising ApplicationError
    • Any error listed in workflow_failure_exception_types on the Worker or workflow_failure_exception_type on the Workflow (empty by default)
  • The handler caused the Workflow Task to fail: A Workflow Task Failure causes the server to retry Workflow Tasks indefinitely. What happens to your Update request depends on its stage:

    • If the request hasn't been accepted by the server, you receive a FAILED_PRECONDITION Temporalio::Error::RPCError exception.
    • If the request has been accepted, it is durable. Once the Workflow is healthy again after a code deploy, use an WorkflowUpdateHandle to fetch the Update result.
  • The Workflow finished while the Update handler execution was in progress: You'll receive a Temporalio::Error::RPCError "workflow execution already completed".

    This will happen if the Workflow finished while the Update handler execution was in progress, for example because

Query issues

When working with Queries, you may encounter these errors:

  • There is no Workflow Worker polling the Task Queue: You'll receive a Temporalio::Error::RPCError exception whose code is a FAILED_PRECONDITION constant defined in Code.

  • Query failed: You'll receive a WorkflowQueryFailedError exception if something goes wrong during a Query. Any exception in a Query handler will trigger this error. This differs from Signal and Update requests, where exceptions can lead to Workflow Task Failure instead.

  • The handler caused the Workflow Task to fail. This would happen, for example, if the Query handler blocks the thread for too long without yielding.

Dynamic handlers

Temporal supports Dynamic Queries, Signals, Updates, Workflows, and Activities. These are unnamed handlers that are invoked if no other statically defined handler with the given name exists.

Dynamic Handlers provide flexibility to handle cases where the names of Queries, Signals, Updates, Workflows, or Activities, aren't known at run time.

caution

Dynamic Handlers should be used judiciously as a fallback mechanism rather than the primary approach. Overusing them can lead to maintainability and debugging issues down the line.

Instead, Signals, Queries, Workflows, or Activities should be defined statically whenever possible, with clear names that indicate their purpose. Use static definitions as the primary way of structuring your Workflows.

Reserve Dynamic Handlers for cases where the handler names are not known at compile time and need to be looked up dynamically at runtime. They are meant to handle edge cases and act as a catch-all, not as the main way of invoking logic.

Dynamic Query

A Dynamic Query in Temporal is a Query method that is invoked dynamically at runtime if no other Query with the same name is registered. A Query can be made dynamic by setting dynamic to true on the workflow_query class method. Only one Dynamic Query can be present on a Workflow.

The Query Handler parameters must accept a string name as the first parameter. Often users set raw_args to true and set the second parameter as *args which will be an array of Temporalio::Converters::RawValue. The Temporalio::Workflow.payload_converter property is used to convert the raw value instances to proper types.

workflow_query dynamic: true, raw_args: true
def dynamic_query(query_name, *args)
first_param = Temporalio::Workflow.payload_converter.from_payload(
args.first || raise 'Missing first parameter'
)
"Got parameter #{first_param} for query #{query_name}"
end

Dynamic Signal

A Dynamic Signal in Temporal is a Signal that is invoked dynamically at runtime if no other Signal with the same input is registered. A Signal can be made dynamic by setting dynamic to true on the workflow_signal class method. Only one Dynamic Signal can be present on a Workflow.

The Signal Handler parameters must accept a string name as the first parameter. Often users set raw_args to true and set the second parameter as *args which will be an array of Temporalio::Converters::RawValue. The Temporalio::Workflow.payload_converter property is used to convert the raw value instances to proper types.

workflow_signal dynamic: true, raw_args: true
def dynamic_signal(signal_name, *args)
first_param = Temporalio::Workflow.payload_converter.from_payload(
args.first || raise 'Missing first parameter'
)
@pending_things << "Got parameter #{first_param} for signal #{signal_name}"
end

Dynamic Update

A Dynamic Update in Temporal is an Update that is invoked dynamically at runtime if no other Update with the same input is registered. An Update can be made dynamic by setting dynamic to true on the workflow_update class method. Only one Dynamic Update can be present on a Workflow.

The Query Handler parameters must accept a string name as the first parameter. Often users set raw_args to true and set the second parameter as *args which will be an array of Temporalio::Converters::RawValue. The Temporalio::Workflow.payload_converter property is used to convert the raw value instances to proper types.

workflow_update dynamic: true, raw_args: true
def dynamic_update(update_name, *args)
first_param = Temporalio::Workflow.payload_converter.from_payload(
args.first || raise 'Missing first parameter'
)
@pending_things << "Got parameter #{first_param} for update #{update_name}"
end