Kubernetes: (Graceful) Sidekiq Worker Lifecycle
Here is a scripted solution for gracefully shutting down Sidekiq workers via Pre-Stop hooks, which are part of the Kubernetes pod shutdown lifecycle.
With our recent release of Container Stacks v2 into public beta, we're totally loving Kubernetes. But as with all love affairs, there are some bothersome aspects that we have to accept and work with. One such aspect is the inflexibility of the vanilla shutdown sequence provided by Kubernetes.
We're also prolific users of Sidekiq for the parts of our backend that are Ruby-based (we run a bunch of other technologies, but we think Sidekiq is hands-down the best for running Ruby jobs). As with any background worker, Sidekiq is sensitive to its shutdown sequence, and we need more control over it.
Problem
There is a lot of documentation out there around the current Kubernetes pod shutdown sequence (see the appendices for some starting points). NOTE: I say current because this information is only valid as of writing; it might change (though I think at this point that is fairly unlikely). The current shutdown sequence looks like the following:
- Pod marked as *terminating*
- Optional: Pre-Stop hook called synchronously
- SIGTERM sent to container process if still present
- Kubernetes waits up to the *grace period* for the container to exit
- SIGKILL sent to container process if still present
Kubernetes allows you to specify terminationGracePeriodSeconds (i.e., how long it will wait in total for the pod to shut down, including the Pre-Stop hook and the time after SIGTERM is sent) in your spec. Unfortunately, Kubernetes doesn't allow you to specify the shutdown sequence itself.
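For orientation, here is roughly where that field lives in a pod spec. This is only an illustrative sketch: the container name, image, and the 60-second value are placeholders, and the full Sidekiq-specific example appears later in the article.
spec:
  # total time Kubernetes allows for shutdown before it sends SIGKILL
  terminationGracePeriodSeconds: 60
  containers:
    - name: my-app        # placeholder container name
      image: my-app:1.0   # placeholder image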
At Cloud 66, we were previously lucky enough to be controlling the shutdown process via our own homegrown scheduler; this enabled us to expose the shutdown sequence to our users directly (in the form of USR1;1h;TERM;10s;KILL, for example). But now we need another solution.
Furthermore (and specific to Sidekiq), as we have some very long-running jobs (dependent on external resources), we want a long wait time, but we also want to terminate the workers as soon as they are no longer busy. So our ideal Sidekiq shutdown sequence looks like the following:
- Send USR1 (or TSTP for Sidekiq 5.0.0 and later) to the workers
- Wait until they are no longer processing jobs
- Send TERM
Solution: Use a Pre-Stop Hook
Looking at the shutdown sequence above, you'll see that there is a Pre-Stop hook point called during the sequence. More on this can be found in the Kubernetes Container Lifecycle Hooks documentation. The salient bit of information is that Kubernetes will execute a command of your choosing at that hook point, and it will execute it synchronously, waiting for the command to complete before resuming the shutdown sequence.
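As a rough sketch of what a Pre-Stop hook looks like in a pod spec (the container name, image, and command below are placeholders, not the Sidekiq script; we'll wire that up further down):
spec:
  containers:
    - name: my-app        # placeholder container name
      image: my-app:1.0   # placeholder image
      lifecycle:
        preStop:
          exec:
            # Kubernetes runs this command inside the container and waits for it to finish
            command: ["/bin/sh", "-c", "echo 'pre-stop hook running'"]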
Using this hook point, we can inject the graceful shutdown behavior we want for our Sidekiq workers. And because we need this ourselves (and given that Sidekiq is Ruby-based), I put together the following Ruby script to do just that!
#! /usr/bin/env ruby
# encoding: utf-8

## PURPOSE: this script will quiet any sidekiq workers it finds,
## and then shut them down when they are no longer handling jobs

# utility class for logging and running commands
class Utils
  require 'open3'

  attr_accessor :output

  def initialize(output)
    @output = output
  end

  def run_command(command)
    log("RUNNING: \"#{command}\"")
    stdout, stderr, status = Open3.capture3(command)
    return stdout.strip if status.success?
    # handle errors
    stderr = stderr.strip
    if stderr.empty?
      log("FATAL:\nCommand: \"#{command}\"")
    else
      log("FATAL:\nCommand: \"#{command}\"\nError: #{stderr}")
    end
    exit(-1)
  end

  def log(message)
    line = "[#{Time.now}] #{message}"
    @output == 'stdout' ? puts(line) : File.open(@output, 'a') {|file| file.puts(line)}
  end

  def log_underline
    log('-' * 70)
  end
end
# class to encapsulate the worker manager
class WorkerManager
  attr_accessor :timeout, :utils

  STATUS_WAITING_THREADS = :waiting_threads
  STATUS_CAN_BE_TERMINATED = :can_be_terminated
  STATUS_CAN_BE_QUIETED = :can_be_quieted
  POLL_FREQUENCY = 10

  def initialize(timeout, utils)
    @timeout = timeout
    @utils = utils
  end

  def initiate_shutdown
    @utils.log('*******************************')
    @utils.log('** STARTED SHUTDOWN SEQUENCE **')
    @utils.log('*******************************')
    # figure out the timeout time
    current_time = Time.now
    timeout_time = current_time + @timeout
    # fetch latest worker info
    workers = materialize_workers
    while Time.now <= timeout_time && !workers.empty?
      # do what is needed for each worker
      workers.each {|worker| worker.handle_shutdown(false)}
      # sleep for the poll time
      @utils.log("...sleeping for #{POLL_FREQUENCY} seconds...")
      sleep(POLL_FREQUENCY)
      # fetch latest worker info
      workers = materialize_workers
    end
    if Time.now > timeout_time && !workers.empty?
      @utils.log('[[ TIMED-OUT ]]')
      # fetch latest worker info
      workers = materialize_workers
      # do what is needed for each worker
      workers.each {|worker| worker.handle_shutdown(true)}
      # give process time to respond to the signals
      @utils.log("...sleeping for #{POLL_FREQUENCY} seconds...")
      sleep(POLL_FREQUENCY)
    end
  end

  private

  def materialize_workers
    workers = []
    stdout = @utils.run_command('ps aux | grep [s]idekiq | grep busy\] || true')
    stdout.lines.each do |line|
      line = line.strip
      if line =~ Worker::WORKER_REGEX
        pid = $~[:pid].to_i
        version = $~[:version]
        active_threads = $~[:worker_count].to_i
        total_threads = $~[:total_threads].to_i
        is_quiet = line =~ /stopping$/
        worker = Worker.new(pid, version, active_threads, total_threads, is_quiet, @utils)
        workers << worker
      end
    end
    @utils.log_underline
    if workers.empty?
      @utils.log('CURRENT STATE: No workers found!')
    else
      @utils.log('CURRENT STATE:')
      workers.each {|worker| @utils.log(worker.status_text)}
    end
    @utils.log_underline
    return workers
  end
end
# class to encapsulate workers
class Worker
  attr_accessor :utils, :pid, :status, :active_threads, :total_threads, :version

  STATUS_WAITING_THREADS = :waiting_threads
  STATUS_CAN_BE_TERMINATED = :can_be_terminated
  STATUS_CAN_BE_QUIETED = :can_be_quieted

  WORKER_REGEX = /^.*?\s+(?<pid>\d+).*sidekiq\s+(?<version>[\d\.]+).*?\[(?<worker_count>\d+)\sof\s(?<total_threads>\d+) busy\]/

  def initialize(pid, version, active_threads, total_threads, is_quiet, utils)
    @utils = utils
    @pid = pid
    @version = version
    @active_threads = active_threads
    @total_threads = total_threads
    @status = parse_status(active_threads, is_quiet)
  end

  def status_text
    output = @status == STATUS_CAN_BE_QUIETED ? '[ACTIVE]' : '[QUIET]'
    output = "#{output} [PID:#{@pid}] [VERSION:#{@version}] [#{@active_threads} of #{@total_threads}]"
    return "#{output} - waiting for threads to complete" if status == STATUS_WAITING_THREADS
    return "#{output} - can be terminated" if status == STATUS_CAN_BE_TERMINATED
    return "#{output} - can be quieted" if status == STATUS_CAN_BE_QUIETED
  end

  def handle_shutdown(aggressive)
    if aggressive
      # kill worker
      @utils.run_command("kill -9 #{@pid}")
    else
      if @status == STATUS_CAN_BE_QUIETED
        major_version = @version.gsub(/\..*/,'').to_i
        if major_version < 5
          # quiet worker
          @utils.run_command("kill -USR1 #{@pid}")
        else
          # quiet worker
          @utils.run_command("kill -TSTP #{@pid}")
        end
      elsif @status == STATUS_CAN_BE_TERMINATED
        # stop worker
        @utils.run_command("kill -TERM #{@pid}")
      end
    end
  end

  private

  def parse_status(active_threads, is_quiet)
    return STATUS_CAN_BE_QUIETED unless is_quiet
    return STATUS_WAITING_THREADS if active_threads > 0
    STATUS_CAN_BE_TERMINATED
  end
end
# parse arguments
require 'optparse'
require 'ostruct'

options = OpenStruct.new
options.timeout = 120
options.output = 'stdout'

OptionParser.new do |opts|
  opts.banner = 'Usage: sidekiq_safe_shutdown.rb [options]'
  opts.on('-o [ARG]', '--output [ARG]', 'File-path or stdout (default: stdout)') {|v| options.output = v}
  opts.on('-t [ARG]', '--timeout [ARG]', 'Timeout in seconds (default: 120)') {|v| options.timeout = v}
  opts.on('-h', '--help', 'Display this help') do
    puts opts
    exit
  end
end.parse!

# handle timeout
utils = Utils.new(options.output)
options.timeout = options.timeout.to_i
if options.timeout < 10
  utils.log("FATAL:\nTimeout #{options.timeout} too short!")
  exit(-1)
end

# initiate shutdown
WorkerManager.new(options.timeout, utils).initiate_shutdown
As the hook command executes in the context of your image, you'll need to include this script inside your image (simply put it in your source code if you're using Cloud 66 SkyCap). Note that the script accepts the following arguments:
Usage: sidekiq_safe_shutdown.rb [options]
-o, --output [ARG] File-path or stdout (default: stdout)
-t, --timeout [ARG] Timeout in seconds (default: 120)
-h, --help Display this help
For the examples below, we're putting this script in our image at the path:
/tmp/sidekiq_safe_shutdown.rb
And don't forget to make it executable with:
chmod +x /tmp/sidekiq_safe_shutdown.rb
Invoking via Kubernetes Manually
If you're running Kubernetes directly, you'll need to modify your pod spec yourself to set terminationGracePeriodSeconds and to invoke the Pre-Stop hook (note that the lifecycle block lives under the relevant container):
spec:
  # with default timeout
  terminationGracePeriodSeconds: 15
  # or with specific timeout
  terminationGracePeriodSeconds: 3605
  containers:
    - name: sidekiq   # your Sidekiq worker container
      lifecycle:
        preStop:
          exec:
            # with default timeout
            command: ["/tmp/sidekiq_safe_shutdown.rb"]
            # or with specific timeout
            command: ["/tmp/sidekiq_safe_shutdown.rb", "-t", "3600"]
Invoking via Cloud 66
If you're running via our awesome Container Stacks v2, then simply add this script to your service.yml with the following lines:
# with default timeout
pre_stop_command: /tmp/sidekiq_safe_shutdown.rb
stop_grace: 15s
# or with specific timeout
pre_stop_command: /tmp/sidekiq_safe_shutdown.rb -t 3600
stop_grace: 3605s
And that should be all you need — now when your Sidekiq workers shut down, they will do so gracefully!
Appendices (Further Reading)