diff --git a/README.md b/README.md index 13e6ef56..722a5dbe 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,7 @@ Temporal.configure do |config| config.port = 7233 config.namespace = 'ruby-samples' config.task_queue = 'hello-world' + config.credentials = :this_channel_is_insecure end begin @@ -114,6 +115,24 @@ curl -O https://raw.githubusercontent.com/temporalio/docker-compose/main/docker- docker-compose up ``` +### Connecting via SSL + +In many production deployments you will end up connecting to your Temporal Services via SSL. In which +case you must read the public cert of the CA that issued your Temporal server's SSL cert and create +an instance of [gRPC Channel Credentials](https://grpc.io/docs/guides/auth/#with-server-authentication-ssltls-1). + +Configure your Temporal connection: + +```ruby +Temporal.configure do |config| + config.host = 'temporal-prod.mycompany.com' + config.port = 7233 + config.namespace = 'ruby-samples' + config.task_queue = 'hello-world' + config.credentials = GRPC::Core::ChannelCredentials.new(root_cert, client_key, client_chain) +end +``` + ## Workflows A workflow is defined using pure Ruby code, however it should contain only a high-level diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index 5e622743..2feda9fb 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -204,7 +204,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam timeout: timeout || max_timeout, ) rescue GRPC::DeadlineExceeded => e - message = if timeout + message = if timeout "Timed out after your specified limit of timeout: #{timeout} seconds" else "Timed out after #{max_timeout} seconds, which is the maximum supported amount." diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index 8d36615c..dc856732 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -7,12 +7,12 @@ module Temporal class Configuration - Connection = Struct.new(:type, :host, :port, keyword_init: true) + Connection = Struct.new(:type, :host, :port, :credentials, keyword_init: true) Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, keyword_init: true) attr_reader :timeouts, :error_handlers attr_writer :converter - attr_accessor :connection_type, :host, :port, :logger, :metrics_adapter, :namespace, :task_queue, :headers + attr_accessor :connection_type, :host, :port, :credentials, :logger, :metrics_adapter, :namespace, :task_queue, :headers # See https://docs.temporal.io/blog/activity-timeouts/ for general docs. # We want an infinite execution timeout for cron schedules and other perpetual workflows. @@ -45,6 +45,7 @@ class Configuration def initialize @connection_type = :grpc + @credentials = :this_channel_is_insecure @logger = Temporal::Logger.new(STDOUT, progname: 'temporal_client') @metrics_adapter = MetricsAdapters::Null.new @timeouts = DEFAULT_TIMEOUTS @@ -79,7 +80,8 @@ def for_connection Connection.new( type: connection_type, host: host, - port: port + port: port, + credentials: credentials, ).freeze end diff --git a/lib/temporal/connection.rb b/lib/temporal/connection.rb index b499ca73..791534bf 100644 --- a/lib/temporal/connection.rb +++ b/lib/temporal/connection.rb @@ -10,12 +10,13 @@ def self.generate(configuration) connection_class = CLIENT_TYPES_MAP[configuration.type] host = configuration.host port = configuration.port + credentials = configuration.credentials hostname = `hostname` thread_id = Thread.current.object_id identity = "#{thread_id}@#{hostname}" - connection_class.new(host, port, identity) + connection_class.new(host, port, identity, credentials) end end end diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index af5ef156..bcec9f6f 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -30,9 +30,10 @@ class GRPC max_page_size: 100 }.freeze - def initialize(host, port, identity, options = {}) + def initialize(host, port, identity, credentials, options = {}) @url = "#{host}:#{port}" @identity = identity + @credentials = credentials @poll = true @poll_mutex = Mutex.new @poll_request = nil @@ -141,7 +142,7 @@ def get_workflow_execution_history( event_type: :all, timeout: nil ) - if wait_for_new_event + if wait_for_new_event if timeout.nil? # This is an internal error. Wrappers should enforce this. raise "You must specify a timeout when wait_for_new_event = true." @@ -495,12 +496,12 @@ def cancel_polling_request private - attr_reader :url, :identity, :options, :poll_mutex, :poll_request + attr_reader :url, :identity, :credentials, :options, :poll_mutex, :poll_request def client @client ||= Temporal::Api::WorkflowService::V1::WorkflowService::Stub.new( url, - :this_channel_is_insecure, + credentials, timeout: 60 ) end diff --git a/spec/unit/lib/temporal/connection_spec.rb b/spec/unit/lib/temporal/connection_spec.rb new file mode 100644 index 00000000..376a3f81 --- /dev/null +++ b/spec/unit/lib/temporal/connection_spec.rb @@ -0,0 +1,42 @@ +describe Temporal::Connection do + subject { described_class.generate(config.for_connection) } + + let(:connection_type) { :unknown } + let(:credentials) { nil } + let(:config) do + config = Temporal::Configuration.new + config.connection_type = connection_type + config.credentials = credentials if credentials + config + end + + context 'grpc' do + let(:connection_type) { :grpc } + + it 'generates a grpc connection' do + expect(subject).to be_kind_of(Temporal::Connection::GRPC) + expect(subject.send(:identity)).not_to be_nil + expect(subject.send(:credentials)).to eq(:this_channel_is_insecure) + end + + context 'insecure' do + let(:credentials) { :this_channel_is_insecure } + + it 'generates a grpc connection' do + expect(subject).to be_kind_of(Temporal::Connection::GRPC) + expect(subject.send(:identity)).not_to be_nil + expect(subject.send(:credentials)).to eq(:this_channel_is_insecure) + end + end + + context 'ssl' do + let(:credentials) { GRPC::Core::ChannelCredentials.new } + + it 'generates a grpc connection' do + expect(subject).to be_kind_of(Temporal::Connection::GRPC) + expect(subject.send(:identity)).not_to be_nil + expect(subject.send(:credentials)).to be_kind_of(GRPC::Core::ChannelCredentials) + end + end + end +end diff --git a/spec/unit/lib/temporal/grpc_client_spec.rb b/spec/unit/lib/temporal/grpc_client_spec.rb index 70a7885d..b1d8ad59 100644 --- a/spec/unit/lib/temporal/grpc_client_spec.rb +++ b/spec/unit/lib/temporal/grpc_client_spec.rb @@ -1,14 +1,14 @@ describe Temporal::Connection::GRPC do - subject { Temporal::Connection::GRPC.new(nil, nil, nil) } + subject { Temporal::Connection::GRPC.new(nil, nil, nil, :this_channel_is_insecure) } let(:grpc_stub) { double('grpc stub') } let(:namespace) { 'test-namespace' } let(:workflow_id) { SecureRandom.uuid } let(:run_id) { SecureRandom.uuid } - let(:now) { Time.now} + let(:now) { Time.now.utc } before do allow(subject).to receive(:client).and_return(grpc_stub) - + allow(Time).to receive(:now).and_return(now) end @@ -35,7 +35,7 @@ end end end - + describe '#signal_with_start_workflow' do let(:temporal_response) do Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionResponse.new(run_id: 'xxx') @@ -148,7 +148,7 @@ end end - it 'demands a timeout to be specified' do + it 'demands a timeout to be specified' do expect do subject.get_workflow_execution_history( namespace: namespace, @@ -161,7 +161,7 @@ end end - it 'disallows a timeout larger than the server timeout' do + it 'disallows a timeout larger than the server timeout' do expect do subject.get_workflow_execution_history( namespace: namespace,