Testing Kubernetes CEL admission/validation

Why it’s needed

CEL expressions are normally only executed at runtime, so it’s hard to know what a new expression will actually do without deploying it. Even with a test deployment, it’s hard to know whether all edge cases are covered.

Why it’s hard

  • There is no simple CEL test package
  • The expressions themselves are not code, but YAML strings embedded in Kubernetes resources

How to do it

  • Read CEL expressions from manifests
  • Load resources from one, several, or all clusters (… or write up good example resources)
  • Run resources against expressions locally (code below)
  • Print the outcomes
// selectResourcesMatchingAllConditions filters resources down to those that
// satisfy every given match condition.
//
// When either resources or expressions is empty, the input slice is returned
// as-is and no CEL environment is created. Otherwise the expressions are
// compiled once and then evaluated against each resource. Any error from
// creating the environment, compiling, or evaluating is returned unchanged
// together with a nil slice.
func selectResourcesMatchingAllConditions(resources []unstructured.Unstructured, expressions []adregv1.MatchCondition) ([]unstructured.Unstructured, error) {
	// Nothing to filter, or nothing to filter by: skip the CEL setup entirely.
	nothingToDo := len(resources) == 0 || len(expressions) == 0
	if nothingToDo {
		return resources, nil
	}

	celEnv, envErr := createCELEnvironment()
	if envErr != nil {
		return nil, envErr
	}

	compiled, compileErr := compileToCELPrograms(expressions, celEnv)
	if compileErr != nil {
		return nil, compileErr
	}

	return selectResourcesMatchingAllCELPrograms(resources, compiled)
}

// selectResourcesMatchingAllCELPrograms returns, in input order, the resources
// for which every program evaluates to true.
//
// Each resource is passed to the programs as the variable "object". Programs
// are evaluated in order, and evaluation for a resource stops at the first
// program that yields false. With an empty programs slice every resource is
// selected. The result is nil when no resource is selected.
//
// A nil slice and an error are returned when a resource cannot be converted to
// a protobuf struct, when a program fails to evaluate, or when a program
// yields a value that is not a bool.
func selectResourcesMatchingAllCELPrograms(resources []unstructured.Unstructured, programs []cel.Program) (selectedResources []unstructured.Unstructured, err error) {
	for _, u := range resources {
		// convert resource to program input
		object, err := structpb.NewStruct(u.Object)
		if err != nil {
			return nil, fmt.Errorf("converting to struct: %w", err)
		}
		input := map[string]interface{}{"object": object}

		// select resource if it matches all programs
		selected := true
		for i, program := range programs {
			out, _, err := program.Eval(input)
			if err != nil {
				return nil, fmt.Errorf("evaluation: %w", err)
			}
			// Use a checked assertion: an expression whose result is not a
			// bool (for example one yielding a string) would otherwise panic
			// here instead of being reported to the caller.
			matched, ok := out.Value().(bool)
			if !ok {
				return nil, fmt.Errorf("evaluation: program %d returned %T, want bool", i, out.Value())
			}
			if !matched {
				selected = false
				break
			}
		}
		if selected {
			selectedResources = append(selectedResources, u)
		}
	}
	// Explicit return: err is shadowed inside the loop, so a naked return
	// would be easy to misread.
	return selectedResources, nil
}

// createCELEnvironment builds the CEL environment used to compile expressions.
//
// The environment declares a single variable, "object", typed as a map from
// string to dyn. Failures from cel.NewEnv are wrapped and returned with a nil
// environment.
func createCELEnvironment() (*cel.Env, error) {
	// The only input visible to expressions is the resource under "object".
	objectVar := decls.NewVar("object", decls.NewMapType(decls.String, decls.Dyn))

	celEnv, err := cel.NewEnv(cel.Declarations(objectVar))
	if err == nil {
		return celEnv, nil
	}
	return nil, fmt.Errorf("creating CEL environment: %w", err)
}

// compileToCELPrograms compiles each match condition's expression in env and
// turns it into an evaluable program.
//
// Programs are returned in the same order as expressions. The first compile
// or program-construction failure aborts the whole operation and yields a nil
// slice; compile errors include the condition's name and expression text. An
// empty expressions slice yields a nil slice and no error.
func compileToCELPrograms(expressions []adregv1.MatchCondition, env *cel.Env) (programs []cel.Program, err error) {
	for i := range expressions {
		condition := &expressions[i]

		checked, issues := env.Compile(condition.Expression)
		if issues != nil {
			if compileErr := issues.Err(); compileErr != nil {
				return nil, fmt.Errorf("compile for %s %s: %w", condition.Name, condition.Expression, compileErr)
			}
		}

		program, buildErr := env.Program(checked)
		if buildErr != nil {
			return nil, fmt.Errorf("program construction: %w", buildErr)
		}
		programs = append(programs, program)
	}
	return programs, nil
}

Leave a comment