Random notes about (Lyft) Envoy
- Envoy is threaded + evented
- Threading model
- There are N number of threads (usually equal to number of CPUs, but is configurable).
- Each thread has its own event loop (using libevent)
- How threads communicate with each other: TLS etc
- How the work is divided among threads? A connection stays with the thread?
- Notes
- Uses epoll with “level triggered” mode among multiple threads for
listen socket! (probably inherited from libevent?)
- All threads are woken up on an incoming connection, except for 1, they will just go back to sleep without doing anything meaningful (“thundering herd” problem)
- A connection stays with the thread for long time
- Uses epoll with “level triggered” mode among multiple threads for
listen socket! (probably inherited from libevent?)
- main
- Envoy::main_common()
- HotRestartImpl::HotRestartImpl()
- Event::Libevent::Global::initialize()
- Network::Utility::getLocalAddress(options.localAddressIpVersion())
- ares_library_init()
- Logger::Registry::initialize()
- Stats::ThreadLocalStoreImpl stats_store
- Server::InstanceImpl server
- Create dispatcher_, dns_resolver_ etc
- restarter_.initialize()
- Create a dispatcher event for domain socket event
- OnSocketEvent()
- TODO
- drain_manager_ = component_factory.createDrainManager()
- initialize()
- MessageUtil::loadFromFile(configPath)
- restarter_.shutdownParentAdmin(info)
- admin_.reset(new AdminImpl())
- New TCP listen socket for admin interface
- Register various admin paths (“/certs”, “/clusters” etc)
- Create admin access log file
- stats_store_.createScope(“listener.admin”)
- handler_->addListener(admin_, admin_->mutable_socket(),…)
- evconnlistener_new(…)
- listener_manager_.reset(new ListenerManagerImpl())
- Create workers equal to server.options().concurrency()
- Stored in workers_
- NOTE: Threads are not created yet, just workers objects
- thread_local_.registerThread()
- stats_store_.initializeThreading()
- component_factory.createRuntime()
- ssl_context_manager_.reset(new Ssl::ContextManagerImpl())
- cluster_manager_factory_.reset(new Upstream::ProdClusterManagerFactory(…))
- main_config = new Configuration::MainImpl()
- main_config->initialize()
- cluster_manager_factory.clusterManagerFromProto()
- new ClusterManagerImpl()
- For each cluster
- loadcluster()
- factory_.clusterFromProto()
- ClusterImplBase::create()
- switch (cluster.type())
- case envoy::api::v2::Cluster::STATIC
- case envoy::api::v2::Cluster::STRICT_DNS
- case envoy::api::v2::Cluster::LOGICAL_DNS
- new_cluster = new LogicalDnsCluster()
- LogicalDnsCluster::startResolve()
- Resolving is started here, and when it completes sometime later in the furture, we will get notified from the event loop (event_base_loop).
- dns_resolver_->resolve(dns_address, dns_lookup_family, [this]() { … See the base_event_loop below …})
- case envoy::api::v2::Cluster::ORIGINAL_DST
- case envoy::api::v2::Cluster::EDS
- loadcluster()
- For each cluster
- new ClusterManagerImpl()
- For each listener
- server.listenerManager().addOrUpdateListener()
- Addfactory_.createListenSocket()
- server.listenerManager().addOrUpdateListener()
- Guarddog creation (thread)
- cluster_manager_factory.clusterManagerFromProto()
- server.run()
- RunHelper(…, [this]() void { startWorkers(); })
- watchdog->startWatchdog()
- dispatcher_->run(Event::Dispatcher::RunType::Block)
- runPostCallbacks()
- event_base_loop() (from libevent)
- This runs forever, reacting to events
- On DNS resolved
- dns_resolver_->resolve(dns_address, dns_lookup_family, [this]() {… below flow …})
- if new_address != current_resolved_address_
- thread tls_->runOnAllThreads( [this, new_address]() { tls_->getTyped<PerThreadCurrentHostData>().current_resolved_address_ = new_address} )
- if initialize_callback_ (== lambda from ClusterManagerInitHelper::addCluster)
- initialize_callback_()
- removeCluster()
- maybeFinishInitialize()
- if initialize_callback_ (== lambda from RunHelper::RunHelper)
- initialize_callback_()
- init_manager.initialize([this, workers_start_cb]() { workers_start_cb() })
- workers_start_cb (== InstanceImpl::run())
- startWorkers()
- listen_manager_->startWorkers()
- for worker in workers_
- for each listener in active_listeners_
- addListenerToWorker(worker, listener)
- worker->start()
- thread_.reset(new Thread::Thread([this]() { threadRoutine(guard_dog) }))
- threadRoutine (in new thread)
- watchdog->startWatchdog(*dispatcher_)
- dispatcher_->run(Event::Dispatcher::RunType::Block)
- runPostCallbacks()
- event_base_loop()
- initialize_callback_()
- if new_address != current_resolved_address_
- dns_resolver_->resolve(dns_address, dns_lookup_family, [this]() {… below flow …})
- Envoy::main_common()