Writing Rego Policies
Deep dive into OPA's Rego language for writing powerful security policies.
Rego Basics
Rego is a declarative query language designed for policy as code. It's used by Open Policy Agent (OPA) to define and enforce policies.
Rule Structure
Rego
package firewall
# A rule that returns a value
decision = {"result": "allow"} {
# conditions that must be true
input.action.operation == "GET"
safe_url(input.action.params.url)
}
# Helper function: true only when the URL targets the host api.company.com.
# A bare prefix check on "https://api.company.com" would also accept
# look-alike hosts such as "https://api.company.com.evil.example".
safe_url(url) {
    # The host root with no path.
    url == "https://api.company.com"
}
safe_url(url) {
    # Any path under the host; the trailing slash terminates the host name.
    startswith(url, "https://api.company.com/")
}
Core Concepts
Input Document
Policies receive the action data as input:
Rego
# Access action properties
input.action.tool # "http_proxy"
input.action.operation # "POST"
input.action.params.url # "https://api.example.com"
# Access agent context
input.agent.id # "my-agent"
input.agent.org_id # "org_123"
# Access request context
input.context.timestamp # "2024-12-25T12:00:00Z"
input.context.ip_address # "192.168.1.1"
Default Values
Rego
# Always define a default for your main decision
default decision = {"result": "deny", "reason": "No matching policy"}
# The default applies when no other rule matches
decision = {"result": "allow"} {
input.action.operation == "GET"
}
Logical Operators
Rego
# AND: Multiple conditions on separate lines
allow {
input.action.tool == "http_proxy"
input.action.operation == "GET"
}
# OR: Multiple rule definitions
allow {
input.action.operation == "GET"
}
allow {
input.action.operation == "HEAD"
}
# NOT: Use not keyword
deny {
not safe_domain(input.action.params.url)
}
Common Patterns
URL Allowlisting
Rego
package firewall
allowed_domains := [
"api.company.com",
"internal.company.com",
"api.stripe.com"
]
decision = {"result": "allow"} {
input.action.tool == "http_proxy"
domain_allowed(input.action.params.url)
}
# True when the URL's host is exactly one of allowed_domains.
# A substring test would also accept URLs that merely mention an allowed
# domain, e.g. "https://evil.example/?next=api.company.com".
domain_allowed(url) {
    # "https://host/path" splits on "/" into ["https:", "", "host", "path"],
    # so index 2 is the host. URLs without a scheme, or whose host carries a
    # port or userinfo, do not match.
    parts := split(url, "/")
    # Iterating with [_] needs no future.keywords import.
    parts[2] == allowed_domains[_]
}
Role-Based Access
Rego
package firewall
# Define role permissions
role_permissions := {
"admin": ["read", "write", "delete"],
"developer": ["read", "write"],
"viewer": ["read"]
}
# Allow when the caller's role grants the requested operation.
# A role that is missing from role_permissions grants nothing, so the rule
# simply does not match.
decision = {"result": "allow"} {
    role := input.context.user_role
    operation := input.action.operation
    # Iterate the role's permission list with [_]; the `in` operator is only
    # available after `import future.keywords.in` on OPA versions before 1.0.
    role_permissions[role][_] == operation
}
Risk-Based Approval
Rego
package firewall
# Low risk: auto-approve
decision = {"result": "allow"} {
risk_level == "low"
}
# Medium/High: require approval
approval_risk_levels := {"medium", "high"}
decision = {"result": "require_approval", "reason": reason} {
    # A set lookup is defined only for members; unlike `in`, it needs no
    # future.keywords import on OPA versions before 1.0.
    approval_risk_levels[risk_level]
    reason := sprintf("Action has %s risk level", [risk_level])
}
# Critical: always deny
decision = {"result": "deny", "reason": "Critical risk actions blocked"} {
risk_level == "critical"
}
# Calculate risk level
# The cases are chained with `else` so exactly one value is produced and the
# first matching case wins. Written as independent rules, a DELETE or PUT to
# an admin URL matches both "critical" and "high", and OPA raises a conflict
# error when a complete rule evaluates to two different values.
destructive_operations := {"DELETE", "PUT"}
read_only_operations := {"GET", "HEAD"}
risk_level = "critical" {
    # contains is a built-in function call, not an infix operator.
    contains(input.action.params.url, "admin")
    destructive_operations[input.action.operation]
} else = "high" {
    destructive_operations[input.action.operation]
} else = "medium" {
    input.action.operation == "POST"
} else = "low" {
    read_only_operations[input.action.operation]
}
Built-in Functions
| Function | Description |
|---|---|
| `contains(str, substr)` | Check if string contains substring |
| `startswith(str, prefix)` | Check string starts with prefix |
| `regex.match(pattern, str)` | Regex pattern matching |
| `time.now_ns()` | Current time in nanoseconds |
| `json.marshal(value)` | Convert to JSON string |
Testing Policies
Rego
package firewall_test
import data.firewall
# Checks that a GET through http_proxy to a URL under https://api.company.com
# yields exactly the allow decision.
test_allow_get_requests {
# `with input as` substitutes this document for input while the comparison
# is evaluated.
firewall.decision == {"result": "allow"} with input as {
"action": {
"tool": "http_proxy",
"operation": "GET",
"params": {"url": "https://api.company.com/data"}
}
}
}
# Checks that a GET to a host the policy does not trust is denied.
# NOTE(review): this only passes if the policy under test produces a deny
# decision for unmatched input (for example via `default decision`). If no
# decision rule matches and there is no default, firewall.decision is
# undefined and this test fails. Confirm against the policy being tested.
test_deny_unknown_domain {
# Only the result field is compared, so any deny reason is accepted.
firewall.decision.result == "deny" with input as {
"action": {
"tool": "http_proxy",
"operation": "GET",
"params": {"url": "https://malicious.com/data"}
}
}
}