Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Force Lease Expiration When Leader Exits #2379

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions kubernetes/base/leaderelection/electionconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
import logging
logging.basicConfig(level=logging.INFO)


class Config:

# Validate config, exit if an error is detected
def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted_leading, onstopped_leading):
def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted_leading, onstopped_leading, context):
self.jitter_factor = 1.2

if lock is None:
Expand Down Expand Up @@ -53,6 +53,7 @@ def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted
self.onstopped_leading = self.on_stoppedleading_callback
else:
self.onstopped_leading = onstopped_leading
self.context = context

# Default callback for when the current candidate if a leader, stops leading
def on_stoppedleading_callback(self):
Expand Down
11 changes: 6 additions & 5 deletions kubernetes/base/leaderelection/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@

import uuid
from kubernetes import client, config
from kubernetes.leaderelection import leaderelection
from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock
from kubernetes.leaderelection import electionconfig

import leaderelection
from resourcelock.configmaplock import ConfigMapLock
import electionconfig

# Authenticate using config file
config.load_kube_config(config_file=r"")
Expand All @@ -33,6 +32,8 @@
# Kubernetes namespace
lock_namespace = "default"

context = leaderelection.Context()


# The function that a user wants to run once a candidate is elected as a leader
def example_func():
Expand All @@ -45,7 +46,7 @@ def example_func():
# Create config
config = electionconfig.Config(ConfigMapLock(lock_name, lock_namespace, candidate_id), lease_duration=17,
renew_deadline=15, retry_period=5, onstarted_leading=example_func,
onstopped_leading=None)
onstopped_leading=None, context=context)

# Enter leader election
leaderelection.LeaderElection(config).run()
Expand Down
70 changes: 68 additions & 2 deletions kubernetes/base/leaderelection/leaderelection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import time
import json
import threading
from .leaderelectionrecord import LeaderElectionRecord
from leaderelectionrecord import LeaderElectionRecord
import logging
import signal
# if condition to be removed when support for python2 will be removed
if sys.version_info > (3, 0):
from http import HTTPStatus
Expand All @@ -36,8 +37,21 @@
lease.
"""

class Context:
def __init__(self):
self.cancelled = False

def cancel(self):
self.cancelled = True

# This currently only handles Ctrl+C on a leader, which is not the only way a leader may exit
def handle_sigint(signal_received, frame):
print("\nSIGINT received! Cancelling election...")
if LeaderElection.global_context:
LeaderElection.global_context.cancel()

class LeaderElection:
global_context = None
def __init__(self, election_config):
if election_config is None:
sys.exit("argument config not passed")
Expand All @@ -51,13 +65,18 @@ def __init__(self, election_config):
# Latest update time of the lock
self.observed_time_milliseconds = 0

LeaderElection.global_context = self.election_config.context

# Attach signal handler to Ctrl+C (SIGINT)
signal.signal(signal.SIGINT, handle_sigint)

# Point of entry to Leader election
def run(self):
# Try to create/ acquire a lock
if self.acquire():
logging.info("{} successfully acquired lease".format(self.election_config.lock.identity))

# Start leading and call OnStartedLeading()
# Start the leader callback in a new daemon thread.
threading.daemon = True
threading.Thread(target=self.election_config.onstarted_leading).start()

Expand All @@ -72,13 +91,15 @@ def acquire(self):
retry_period = self.election_config.retry_period

while True:

succeeded = self.try_acquire_or_renew()

if succeeded:
return True

time.sleep(retry_period)


def renew_loop(self):
# Leader
logging.info("Leader has entered renew loop and will try to update lease continuously")
Expand All @@ -87,10 +108,20 @@ def renew_loop(self):
renew_deadline = self.election_config.renew_deadline * 1000

while True:
# Check for context cancellation
if self.election_config.context.cancelled:
self.force_expire_lease()
return

timeout = int(time.time() * 1000) + renew_deadline
succeeded = False

while int(time.time() * 1000) < timeout:
if self.election_config.context.cancelled:
logging.info(f"Context cancelled during renew loop. Reason: {self.election_config.context.cancel_reason}")
self.force_expire_lease()
return

succeeded = self.try_acquire_or_renew()

if succeeded:
Expand All @@ -104,6 +135,41 @@ def renew_loop(self):
# failed to renew, return
return

def force_expire_lease(self, max_retries=3):
"""
Force the lease to be considered expired by updating the leader election record's renewTime
to a value in the past. Retries the update if a conflict (HTTP 409) is encountered.
"""
expired_time = time.time() - self.election_config.lease_duration - 1 # Expired timestamp
retries = 0
while retries < max_retries:
# Re-read the current state of the lock to get the latest version.
lock_status, current_record = self.election_config.lock.get(
self.election_config.lock.name,
self.election_config.lock.namespace
)
# Create a new record using the current record's acquireTime if available.
new_record = LeaderElectionRecord(
self.election_config.lock.identity,
str(self.election_config.lease_duration),
None,
str(expired_time)
)
update_status = self.election_config.lock.update(
self.election_config.lock.name,
self.election_config.lock.namespace,
new_record
)
if update_status:
logging.info("Lease forcibly expired.")
return True
else:
logging.info(f"Conflict encountered, retrying update... (attempt {retries+1})")
retries += 1
time.sleep(0.5) # wait a bit before retrying, this is very hacky
logging.info("Failed to force lease expiration after retries.")
return False

def try_acquire_or_renew(self):
now_timestamp = time.time()
now = datetime.datetime.fromtimestamp(now_timestamp)
Expand Down
5 changes: 3 additions & 2 deletions kubernetes/base/leaderelection/resourcelock/configmaplock.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
sys.path.append("..")
from kubernetes.client.rest import ApiException
from kubernetes import client, config
from kubernetes.client.api_client import ApiClient
from ..leaderelectionrecord import LeaderElectionRecord
from leaderelectionrecord import LeaderElectionRecord
import json
import logging
logging.basicConfig(level=logging.INFO)
Expand Down