Datadog: Show Metric Usage Warning From HPA Metrics

Problem

When editing metrics in the Datadog UI (i.e. /metrics/summary) a warning is shown when editing an in-use metric (i.e. a dashboard or monitor uses it). But if that metric is used by a Kubernetes HorizontalPodAutoscaler, no such warning will show.

Solution

Generate a dashboard that uses 1 widget for every query an HPA uses.

require 'kennel'

# Maintains a datadog dashboard with one widget per metric that any kubernetes
# HPA queries, so the datadog UI shows an "in use" warning for those metrics.
class HpaDashboard
  # reported by the datadog cluster agent, tagged with each external metric an HPA queries
  SOURCE_METRIC = "datadog.cluster_agent.external_metrics.delay_seconds".freeze
  attr_reader :id

  # @param id [String] id of the dashboard to overwrite
  # @param timeframe [Integer] lookback window in seconds for metric queries
  def initialize(id, timeframe:)
    @id = id
    @api = Kennel::Api.new
    @from = Time.now.to_i - timeframe
  end

  # All metric names datadog has seen since @from, as a Set.
  # see https://docs.datadoghq.com/api/latest/metrics/#get-active-metrics-list
  # this has an undocumented limit of 250000 metrics so we can't just use super old @from
  # also tried /api/v2/metrics which returns similar results but is even slower (filtering it with 'queried' + big window did not help)
  def available_metrics
    @api.send(
      :request, :get, "/api/v1/metrics",
      params: { from: @from }
    ).fetch(:metrics).to_set
  end

  # Unique values of the `metric` tag on SOURCE_METRIC, i.e. every query some HPA uses.
  def queries_used_by_any_hpa
    @api.send(
      :request, :get, "/api/v1/query",
      params: {
        query: "avg:#{SOURCE_METRIC}{*} by {metric}",
        from: @from,
        to: Time.now.to_i
      }
    ).fetch(:series).map do |data|
      # scope looks like "metric:foo.bar" (comma-separated tags) -> extract the metric tag value
      data.fetch(:scope).split(",").to_h { |t| t.split(":", 2) }["metric"]
    end.uniq
  end

  # convert fallout from query normalization to find actual metrics
  # for example default_zero(foo{a:b}) is converted to "default_zero_foo_a:b"
  # this ignores when multiple metrics are in a single query for example a / b * 100
  # since a and b are usually the same
  # NOTE: does not mutate the given queries (the previous shallow `dup` + `sub!`
  # still modified the caller's string objects)
  def extract_metrics(queries)
    queries
      .map do |query|
        query
          .sub(/\.total_\d+$/, ".total") # math leftover *.total_100 -> *.total
          .sub(/^_*(ewma_\d+|default_zero)_*/, "") # remove math
      end
      .uniq
      .sort # for debug printing and to keep the dashboard stable
      .to_set
  end

  # since available_metrics is not reliable (hits limit or just has old data)
  # we verify each potentially unknown metric 1-by-1 by hitting this cheap endpoint
  # https://docs.datadoghq.com/api/latest/metrics/?code-lang=curl#get-metric-metadata
  # Mutates `unknown` in place, keeping only metrics datadog really does not know.
  def slow_filter_unknown!(unknown)
    unknown.select! do |metric|
      print "Verifying potentially unknown metric #{metric} ..."
      not_found = @api.send(:request, :get, "/api/v1/metrics/#{metric}", ignore_404: true)[:error]
      print "#{not_found ? "not found" : "found"}\n"
      not_found # keep the truly not found
    end
  end

  # Overwrites the dashboard with one timeseries widget per used metric.
  def update(used_metrics)
    attributes = {
      title: "HPA metrics used",
      description: <<~DESC,
        1 widget for each metric used in compute maintained kubernetes clusters (anything that reports #{SOURCE_METRIC})
        Automatically filled by a `rake hpa_dashboard` cron from kennel GHA.
        Last updated: #{Time.now} #{$stdout.tty? ? "manually" : RakeHelper.ci_url}
      DESC
      layout_type: "ordered",
      reflow_type: "auto",
      tags: ["team:compute", "team:compute-accelerate"],
      widgets: used_metrics.map do |m|
        {
          definition: {
            title: m,
            type: "timeseries",
            requests: [
              {
                response_format: "timeseries",
                queries: [
                  {
                    name: "query1",
                    data_source: "metrics",
                    query: "avg:#{m}{*}"
                  }
                ],
                display_type: "line"
              }
            ]
          }
        }
      end
    }
    @api.update("dashboard", @id, attributes)
  end
end

desc "Update hpa dashboard to track all currently used external metrics people that change metrics in the UI see that they are used"
task hpa_dashboard: "kennel:environment" do
  # NOTE(review): DASHBOARD_ID is defined elsewhere in the project — confirm it is in scope
  dashboard = HpaDashboard.new(DASHBOARD_ID, timeframe: 24 * 60 * 60)

  # everything datadog has seen recently (may be incomplete, see HpaDashboard#available_metrics)
  available_metrics = dashboard.available_metrics
  puts "Found #{available_metrics.size} available metrics"

  used_queries = dashboard.queries_used_by_any_hpa
  puts "Found #{used_queries.size} used queries"

  used_metrics = dashboard.extract_metrics(used_queries)
  puts "Found #{used_metrics.size} used metrics"

  # validate we found everything
  # only do the slow 1-by-1 verification when the list is small enough to finish quickly
  unknown = used_metrics - available_metrics
  dashboard.slow_filter_unknown! unknown if unknown.size < 100
  if unknown.any?
    $stdout.flush # otherwise mixes with stderr in GHA
    abort <<~MSG
      #{unknown.size} unknown metrics found, these would not be displayable on the dashboard, improve parsing code
      usually that means some part of the metrics got mangled and it cannot be found in datadog
      see https://datadoghq.com/metric/summary to find valid metrics

      #{unknown.join("\n")}
    MSG
  end

  dashboard.update used_metrics
  puts "Updated dashboard https://datadoghq.com/dashboard/#{dashboard.id}"
rescue Exception # rubocop:disable Lint/RescueException
  # intentionally catches everything (including SystemExit from `abort`) so the
  # cron failure is reported to slack, then re-raised so the task still fails
  unless $stdout.tty? # do not spam slack when debugging
    send_to_slack <<~MSG
      HPA dashboard update failed #{RakeHelper.ci_url}, fix it!
    MSG
  end
  raise
end

Kubernetes Changelog from Audit log

Often we want to ask “what exactly changed about this resource?” especially during or after an incident.
The answer usually is “check the audit log”.
But the audit log is very verbose and hard to scan, so here is a ruby rake task to parse the audit log and spit out a nice diff. (Customize to read from the log source of your choice)

require 'uri'
require 'cgi'
require 'time'
require 'json'
require 'hashdiff' # gem install hashdiff
require 'kennel' # gem install kennel

class Logs
  class << self
    # Recursively flattens nested hashes into dot-joined symbol keys,
    # e.g. {a: {b: 1}} -> {:"a.b" => 1}. Non-hash values are kept as-is.
    # does not flatten arrays, but we don't need this here
    def flatten_hash(hash)
      hash.each_with_object({}) do |(k, v), h|
        if v.is_a? Hash
          flatten_hash(v).each do |nested_key, nested_value|
            h[:"#{k}.#{nested_key}"] = nested_value
          end
        else
          h[k] = v
        end
      end
    end

    # Normalizes a parsed kubernetes object for diffing: flattens it in place
    # (via Hash#replace) and strips keys that change on every write and would
    # only produce diff noise.
    # @return [Hash] the cleaned hash (same object that was passed in)
    def clean_for_diff(object, ignore_status:)
      # datadog turns labels like metadata.labels.foo.bar into a nested foo: bar hash
      object.replace flatten_hash object

      # general
      object.delete :"metadata.annotations.deployment.kubernetes.io/revision"
      object.delete :"metadata.annotations.kubectl.kubernetes.io/last-applied-configuration"
      object.delete :"metadata.generation"
      object.delete :"metadata.managedFields"
      object.delete :"metadata.resourceVersion"
      object.delete :"spec.template.metadata.creationTimestamp"

      # status
      if ignore_status
        object.delete_if { |k, _| k.start_with? "status" }
      else
        object.delete :"status.observedGeneration"
      end

      # previously this branch implicitly returned Hash#delete's result (the
      # deleted value or nil), breaking callers that use the return value
      object
    end
  end
end

namespace :logs do
  desc "show change history for a given resource by parsing the audit log CLUSTER= RESOURCE= [NAMESPACE=] NAME= [DAYS=7] [STATUS=ignore|include]"
  # NOTE(review): the `task ... do` line (and matching closing `end`s) appear to
  # be missing from this snippet — the body below reads ENV and should live
  # inside a task block; confirm against the original source
    cluster = ENV.fetch("CLUSTER")
    resource = ENV.fetch("RESOURCE")
    name = ENV.fetch("NAME")
    namespace = ENV["NAMESPACE"]
    # default to ignoring status-only changes since they are mostly noise
    ignore_status = ((ENV["STATUS"] || "ignore") == "ignore")
    days = Integer(ENV["DAYS"] || "7")

    # get current version to be able to diff the latest update
    result = `kubectl --context #{cluster} get #{resource} #{name} #{"-n #{namespace}" if namespace} -o json --ignore-not-found`
    raise unless $?.success?
    if result == ""
      # --ignore-not-found makes kubectl print nothing (and exit 0) when the resource is gone
      warn "Resource not found, assuming it was deleted"
      current = nil
    else
      current = Logs.clean_for_diff(JSON.parse(result, symbolize_names: true), ignore_status:)
    end

    # build log url
    # NOTE(review): placeholder, not valid ruby — fill in for your log system
    url = <whatever your log system is>

    # say what we are looking at
    warn "Inspecting #{days} days of logs #{ignore_status ? "ignoring" : "including"} status changes."
    warn url

    # produce diff from logs
    verb_colors = { "update" => :yellow, "delete" => :red, "patch" => :cyan, "create" => :green }
    printer = Kennel::AttributeDiffer.new
    list_logs(url) do |line| # build this method for whatever your log system is
      # only successful api-server writes changed anything
      status = line.dig(:attributes, :http, :status_code)
      next if status >= 300

      # print what happened
      verb = line.dig(:attributes, :verb)
      time = line.dig(:attributes, :requestReceivedTimestamp).sub(/\..*/, "") # strip sub-second precision
      user = line.dig(:attributes, :user, :username)
      puts(Kennel::Console.color(verb_colors.fetch(verb), "#{time} #{verb} by #{user}"))
      next if verb == "delete"

      # print diff
      previous = Logs.clean_for_diff(line.dig(:attributes, :responseObject), ignore_status:)
      unless current # support looking at deleted resources
        current = previous
        next
      end
      diff = Hashdiff.diff(previous, current, use_lcs: false, strict: false, similarity: 1)
      diff.each { |l| puts printer.format(*l) }
      # assumes list_logs yields newest entries first, walking backwards in time — TODO confirm
      current = previous
    end

And you get a nice diff like this

Verify Pagerduty reaches On-Call by Cron

We had a few incidents where on-call devs missed their calls because of various spam-blocking setups or “do not disturb” settings.
We now run a small service that test-notifies everyone once a month to make sure notifications go through. Notifications go out shortly before their ‘do not disturb’ stops so we do not wake them in the middle of the night, but still have a realistic situation.
Our setup has more logging/stats etc, but it goes something like this:

# configure user schedule
require 'yaml'
# Roster of users to test-notify, one entry per on-call dev.
# `cron` is presumably fugit syntax; "2#1" looks like "first Tuesday of the month" — TODO confirm
users = YAML.load <<~YAML
- name: "John Doe"
  id: ABCD
#  cron: "* * * * * America/Los_Angeles" # every minute ... for local testing
  cron: "55 6 * * 2#1 America/Los_Angeles" # every first Tuesday of the month at 6:55am
# ... more users here
YAML

# code to notify users
require 'json'
require 'faraday'
# Creates a pagerduty test incident assigned to the given user.
# Retries once (after sleeping out the rate-limit window) when pagerduty
# returns 429.
# @param user [Hash] with "name" and "id" keys (see the users YAML above)
# @return [String] the created incident's id
# @raise [RuntimeError] when the request fails or stays rate-limited
def create_test_incident(user)
  connection = Faraday.new
  response = nil
  2.times do
    response = connection.post do |req|
      req.url "https://api.pagerduty.com/incidents"
      req.headers['Content-Type'] = 'application/json'
      req.headers['Accept'] = 'application/vnd.pagerduty+json;version=2'
      req.headers['From'] = 'realusers@email.com' # incident owner
      req.headers['Authorization'] = "Token token=#{ENV.fetch("PAGERDUTY_TOKEN")}"
      req.body = {
        incident: {
          type: "incident",
          title: "Pagerduty Tester: Incident for #{user.fetch("name")}, press resolve",
          service: {
            id: ENV.fetch("SERVICE_ID"),
            type: "service_reference"
          },
          assignments: [{
            assignee: {
              id: user.fetch("id"),
              type: "user_reference"
            }
          }]
        }
      }.to_json
    end
    if response.status == 429 # pagerduty rate-limits to 6 incidents/min/service
      sleep 60
      next # retry after the rate-limit window has passed
    end
    raise "Request failed #{response.status} -- #{response.body}" if response.status >= 300
    break # success — without this the loop ran again and created a duplicate incident
  end
  # both attempts were rate-limited; fail loudly instead of parsing the 429 body
  raise "Request stayed rate-limited #{response.status} -- #{response.body}" if response.status == 429
  JSON.parse(response.body).fetch("incident").fetch("id")
end

# run on a schedule (no threading / forking)
require 'serial_scheduler'
require 'fugit'
scheduler = SerialScheduler.new
users.each do |user|
  # NOTE(review): timeout is 10s but the notifier can sleep 60s on rate-limit — confirm intended
  scheduler.add("Notify #{user.fetch("name")}", cron: user.fetch("cron"), timeout: 10) do
    user_id = user.fetch("id")
    # `create_test_incident` is the top-level method defined above; it was
    # previously called via an undefined `PagerdutyTester` constant (NameError)
    incident_id = create_test_incident(user)
    puts "Created incident for #{user_id} https://#{ENV.fetch('SUBDOMAIN')}.pagerduty.com/incidents/#{incident_id}"
  rescue StandardError => e
    # log and keep the scheduler running for the other users
    puts "Creating incident for #{user_id} failed #{e}"
  end
end
scheduler.run