zero-trust-architecture
About
This Claude Skill implements Zero Trust security for cloud-native applications by enforcing identity verification, microsegmentation, and least privilege access. It provides a "never trust, always verify" framework with continuous monitoring capabilities. Use this skill when building secure microservices, APIs, or multi-cloud deployments requiring strict access controls.
Documentation
Zero Trust Architecture
Overview
Implement a comprehensive Zero Trust security architecture based on the "never trust, always verify" principle, with identity-centric security, microsegmentation, and continuous verification.
When to Use
- Cloud-native applications
- Microservices architecture
- Remote workforce security
- API security
- Multi-cloud deployments
- Legacy modernization
- Compliance requirements
Implementation Examples
1. Zero Trust Gateway
// zero-trust-gateway.js
const jwt = require('jsonwebtoken');
const axios = require('axios');
/**
 * Express-style gateway that re-verifies every request: identity (JWT),
 * device, location and authorization, and attaches a 0-10 risk score.
 *
 * Several helpers near the bottom of the class are placeholders that return
 * fixed values; replace them with real integrations before relying on the
 * corresponding check.
 */
class ZeroTrustGateway {
  /**
   * Reads the identity provider URL from IDENTITY_PROVIDER_URL and starts
   * with empty in-memory device and session maps.
   */
  constructor() {
    this.identityProvider = process.env.IDENTITY_PROVIDER_URL;
    // deviceId -> record carrying at least a `fingerprint` property.
    // NOTE(review): nothing in this class populates the map; callers must
    // register devices themselves or every device check reports "not registered".
    this.deviceRegistry = new Map();
    this.sessionContext = new Map();
  }

  /**
   * Extracts the credential from an Authorization header value.
   *
   * The "Bearer" scheme is matched case-insensitively because HTTP auth
   * scheme names are case-insensitive. A value without the scheme is passed
   * through unchanged so existing clients keep working.
   *
   * @param {unknown} headerValue - Raw Authorization header value.
   * @returns {string} The token, or '' when none is present.
   */
  static extractBearerToken(headerValue) {
    if (typeof headerValue !== 'string') {
      return '';
    }
    return headerValue.replace(/^\s*Bearer\s+/i, '').trim();
  }

  /**
   * Verify identity - Who are you?
   *
   * Verifies the JWT (RS256 only) against JWT_PUBLIC_KEY and checks the
   * token id against the revocation list.
   *
   * @param {string} token - Encoded JWT.
   * @returns {Promise<object>} `{ valid: true, userId, roles, permissions, mfaUsed }`
   *   on success, otherwise `{ valid: false, error }`. Never rejects.
   */
  async verifyIdentity(token) {
    try {
      const decoded = jwt.verify(token, process.env.JWT_PUBLIC_KEY, {
        algorithms: ['RS256'],
      });

      const revoked = await this.checkTokenRevocation(decoded.jti);
      if (revoked) {
        throw new Error('Token has been revoked');
      }

      return {
        valid: true,
        userId: decoded.sub,
        roles: decoded.roles,
        permissions: decoded.permissions,
        // The middleware reads `mfaUsed` for risk scoring, so it has to be
        // produced here. Derived from the standard `amr` claim (RFC 8176).
        // NOTE(review): confirm the identity provider emits `amr`; tokens
        // without it are scored as "no MFA".
        mfaUsed: Array.isArray(decoded.amr) && decoded.amr.includes('mfa'),
      };
    } catch (error) {
      return { valid: false, error: error.message };
    }
  }

  /**
   * Verify device - What device are you using?
   *
   * @param {string} deviceId - Key into the device registry.
   * @param {string} deviceFingerprint - Fingerprint presented by the client.
   * @returns {Promise<object>} `{ trusted, reason }`, plus `riskScore` when the
   *   compliance check ran.
   */
  async verifyDevice(deviceId, deviceFingerprint) {
    const registered = this.deviceRegistry.get(deviceId);
    if (!registered) {
      return {
        trusted: false,
        reason: 'Device not registered',
      };
    }

    if (registered.fingerprint !== deviceFingerprint) {
      return {
        trusted: false,
        reason: 'Device fingerprint mismatch',
      };
    }

    const compliance = await this.checkDeviceCompliance(deviceId);
    return {
      trusted: compliance.compliant,
      reason: compliance.reason,
      riskScore: compliance.riskScore,
    };
  }

  /**
   * Verify location - Where are you?
   *
   * @param {string} ip - Client IP address.
   * @param {string} [expectedCountry] - When given, any other country fails.
   * @returns {Promise<object>} `{ valid, riskScore }` with `location` on
   *   success or `reason` on failure. Lookup errors fail closed.
   */
  async verifyLocation(ip, expectedCountry) {
    try {
      const geoData = await this.getGeoLocation(ip);

      const lastLocation = this.getLastKnownLocation(ip);
      if (lastLocation) {
        const impossibleTravel = this.detectImpossibleTravel(
          lastLocation,
          geoData,
          Date.now() - lastLocation.timestamp
        );
        if (impossibleTravel) {
          return {
            valid: false,
            reason: 'Impossible travel detected',
            riskScore: 9,
          };
        }
      }

      if (expectedCountry && geoData.country !== expectedCountry) {
        return {
          valid: false,
          reason: 'Unexpected location',
          riskScore: 7,
        };
      }

      return {
        valid: true,
        location: geoData,
        riskScore: 1,
      };
    } catch {
      // Fail closed: an unavailable geolocation lookup is treated as a failed check.
      return {
        valid: false,
        reason: 'Location verification failed',
        riskScore: 5,
      };
    }
  }

  /**
   * Verify authorization - What can you access?
   *
   * Tries direct permissions, then each role, then attribute-based policy.
   *
   * @param {string} userId
   * @param {string} resource
   * @param {string} action
   * @param {object} context - Extra attributes handed to the ABAC evaluation.
   * @returns {Promise<{authorized: boolean, reason: string}>}
   */
  async verifyAuthorization(userId, resource, action, context) {
    const user = await this.getUserPermissions(userId);
    if (!user) {
      // Unknown user: deny instead of crashing on the role lookup below.
      return {
        authorized: false,
        reason: 'Insufficient permissions',
      };
    }

    if (this.hasPermission(user, resource, action)) {
      return { authorized: true, reason: 'Direct permission' };
    }

    // A record without `roles` simply has no role-based grants.
    for (const role of user.roles ?? []) {
      if (this.hasRolePermission(role, resource, action)) {
        return { authorized: true, reason: `Role: ${role}` };
      }
    }

    const abacResult = await this.evaluateABAC(user, resource, action, context);
    if (abacResult.allowed) {
      return { authorized: true, reason: 'ABAC policy' };
    }

    return {
      authorized: false,
      reason: 'Insufficient permissions',
    };
  }

  /**
   * Calculate risk score
   *
   * Adds fixed weights for each risk factor and caps the total at 10.
   * Absent `mfaUsed` / `deviceCompliant` count as risky.
   *
   * @param {object} [factors] - Boolean flags: mfaUsed, newDevice,
   *   unusualLocation, vpnDetected, unusualTime, rapidRequests,
   *   deviceCompliant, jailbroken.
   * @returns {number} Integer in the range 0-10.
   */
  calculateRiskScore(factors = {}) {
    let score = 0;

    // Identity factors
    if (!factors.mfaUsed) score += 3;
    if (factors.newDevice) score += 2;

    // Location factors
    if (factors.unusualLocation) score += 3;
    if (factors.vpnDetected) score += 1;

    // Behavior factors
    if (factors.unusualTime) score += 2;
    if (factors.rapidRequests) score += 2;

    // Device factors
    if (!factors.deviceCompliant) score += 4;
    if (factors.jailbroken) score += 5;

    return Math.min(score, 10);
  }

  /**
   * Continuous verification middleware
   *
   * Runs identity, device, location and authorization checks in that order.
   * Responds 401/403 on a failed check and 500 if a check throws; otherwise
   * sets `req.zeroTrust`, logs the access and calls `next()`.
   *
   * @returns {Function} Async `(req, res, next)` handler.
   */
  middleware() {
    return async (req, res, next) => {
      const startTime = Date.now();

      try {
        const token = ZeroTrustGateway.extractBearerToken(
          req.headers.authorization
        );
        if (!token) {
          return res.status(401).json({
            error: 'unauthorized',
            message: 'No authentication token provided',
          });
        }

        // Step 1: Verify identity
        const identity = await this.verifyIdentity(token);
        if (!identity.valid) {
          return res.status(401).json({
            error: 'unauthorized',
            message: 'Invalid identity',
          });
        }

        // Step 2: Verify device (only when both device headers are present)
        const deviceId = req.headers['x-device-id'];
        const deviceFingerprint = req.headers['x-device-fingerprint'];
        if (deviceId && deviceFingerprint) {
          const device = await this.verifyDevice(deviceId, deviceFingerprint);
          if (!device.trusted) {
            return res.status(403).json({
              error: 'forbidden',
              message: device.reason,
            });
          }
        }

        // Step 3: Verify location
        const location = await this.verifyLocation(req.ip);
        if (!location.valid) {
          // Require step-up authentication
          return res.status(403).json({
            error: 'forbidden',
            message: 'Additional authentication required',
            requiresStepUp: true,
          });
        }

        // Step 4: Calculate risk score
        // NOTE(review): `deviceCompliant` is fixed to true even when no device
        // headers were sent; such requests are only penalised via `newDevice`.
        const riskScore = this.calculateRiskScore({
          mfaUsed: identity.mfaUsed,
          newDevice: !deviceId,
          unusualLocation: location.riskScore > 5,
          deviceCompliant: true,
        });

        // Step 5: Verify authorization
        const authorization = await this.verifyAuthorization(
          identity.userId,
          req.path,
          req.method,
          {
            ip: req.ip,
            riskScore,
            time: new Date(),
          }
        );
        if (!authorization.authorized) {
          return res.status(403).json({
            error: 'forbidden',
            message: authorization.reason,
          });
        }

        // Expose the verification outcome to downstream handlers.
        req.zeroTrust = {
          userId: identity.userId,
          roles: identity.roles,
          riskScore,
          verificationTime: Date.now() - startTime,
        };

        this.logAccess(req, identity, riskScore);
        next();
      } catch (error) {
        console.error('Zero Trust verification failed:', error);
        return res.status(500).json({
          error: 'internal_error',
          message: 'Security verification failed',
        });
      }
    };
  }

  /** Placeholder revocation lookup: currently reports every token as not revoked. */
  async checkTokenRevocation(jti) {
    return false;
  }

  /** Placeholder compliance check: currently reports every device as compliant. */
  async checkDeviceCompliance(deviceId) {
    return {
      compliant: true,
      reason: 'Device meets requirements',
      riskScore: 1,
    };
  }

  /** Placeholder geolocation lookup: currently returns a fixed US location. */
  async getGeoLocation(ip) {
    return {
      country: 'US',
      city: 'San Francisco',
      lat: 37.7749,
      lon: -122.4194,
    };
  }

  /** Placeholder: no location history is kept, so this always returns null. */
  getLastKnownLocation(ip) {
    return null;
  }

  /** Placeholder: never flags impossible travel. */
  detectImpossibleTravel(lastLocation, currentLocation, timeDiff) {
    return false;
  }

  /** Placeholder permission lookup: every user has the `user` role and no direct permissions. */
  async getUserPermissions(userId) {
    return {
      roles: ['user'],
      permissions: [],
    };
  }

  /** Placeholder: grants no direct permissions. */
  hasPermission(user, resource, action) {
    return false;
  }

  /** Placeholder: grants no role-based permissions. */
  hasRolePermission(role, resource, action) {
    return false;
  }

  /** Placeholder: no attribute-based policy allows access. */
  async evaluateABAC(user, resource, action, context) {
    return { allowed: false };
  }

  /** Writes one structured access record to stdout. */
  logAccess(req, identity, riskScore) {
    console.log({
      timestamp: new Date().toISOString(),
      userId: identity.userId,
      resource: req.path,
      method: req.method,
      riskScore,
      ip: req.ip,
    });
  }
}
// Express setup
const express = require('express');

const app = express();
const ztGateway = new ZeroTrustGateway();

// Apply Zero Trust middleware: it is registered before any route, so every
// route below runs only after the gateway has called next().
app.use(ztGateway.middleware());

/**
 * Protected endpoint handler. Echoes the risk score that the gateway
 * attached to the request as `req.zeroTrust`.
 */
function sendSensitiveData(req, res) {
  const { riskScore } = req.zeroTrust;
  res.json({
    message: 'Access granted',
    riskScore,
  });
}

app.get('/api/sensitive-data', sendSensitiveData);

module.exports = ZeroTrustGateway;
2. Service Mesh - Microsegmentation
# istio-zero-trust.yaml
# Istio configuration for Zero Trust microsegmentation
#
# Mesh peer authentication for the production namespace. The policy has no
# workload selector, so it is not limited to a single workload.
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: production
spec:
  mtls:
    # Require mutual TLS
    mode: STRICT
---
# Default-deny baseline for the production namespace: the spec is empty
# (no selector, no rules), so nothing is allowed until another policy
# explicitly allows it.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: deny-all
  namespace: production
spec: {}
---
# Lets the frontend service account call the backend's /api/* paths with
# GET or POST. Applies to workloads labelled app=backend.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: allow-frontend-to-backend
  namespace: production
spec:
  selector:
    matchLabels:
      app: backend
  action: ALLOW
  rules:
    - from:
        - source:
            # Caller identity: the frontend service account in production.
            principals:
              - "cluster.local/ns/production/sa/frontend"
      to:
        - operation:
            methods:
              - "GET"
              - "POST"
            paths:
              - "/api/*"
---
# Lets the backend service account reach the database workload.
#
# The rule matches on port rather than HTTP method: database traffic is plain
# TCP, and Istio ignores ALLOW rules that use HTTP-only fields (such as
# `methods`) for TCP traffic, which would leave the connection denied.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: allow-backend-to-database
  namespace: production
spec:
  selector:
    matchLabels:
      app: database
  action: ALLOW
  rules:
    - from:
        - source:
            principals: ["cluster.local/ns/production/sa/backend"]
      to:
        - operation:
            # Same database port the backend NetworkPolicy opens for egress.
            ports: ["5432"]
---
# JWT authentication
# Validates end-user JWTs presented to workloads labelled app=backend.
# NOTE(review): per the Istio reference, RequestAuthentication rejects invalid
# tokens but still accepts requests that carry no token; requiring a token
# needs an AuthorizationPolicy that matches on request principals. Confirm
# whether that is intended here.
apiVersion: security.istio.io/v1beta1
kind: RequestAuthentication
metadata:
  name: jwt-auth
  namespace: production
spec:
  selector:
    matchLabels:
      app: backend
  jwtRules:
    - issuer: "https://auth.example.com"
      # Key set used to verify token signatures.
      jwksUri: "https://auth.example.com/.well-known/jwks.json"
      audiences: ["api.example.com"]
---
# Network policy - additional defense layer
# Restricts backend pods to ingress from frontend pods on 8080 and egress to
# database pods on 5432, plus DNS so service names can still be resolved.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: backend-network-policy
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: backend
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: frontend
      ports:
        - protocol: TCP
          port: 8080
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: database
      ports:
        - protocol: TCP
          port: 5432
    # Once Egress is restricted, DNS is blocked too unless allowed explicitly.
    # NOTE(review): assumes cluster DNS runs in kube-system with the
    # conventional k8s-app=kube-dns label; adjust to match the cluster.
    # NOTE(review): with Istio sidecars injected, the pod may also need egress
    # to the mesh control plane; verify before applying.
    - to:
        - namespaceSelector:
            matchLabels:
              kubernetes.io/metadata.name: kube-system
          podSelector:
            matchLabels:
              k8s-app: kube-dns
      ports:
        - protocol: UDP
          port: 53
        - protocol: TCP
          port: 53
3. Python Zero Trust Policy Engine
# zero_trust_policy.py
from dataclasses import dataclass
from typing import List, Dict, Any
from datetime import datetime
import jwt
@dataclass
class ZeroTrustContext:
    """Per-request facts that Zero Trust policies are evaluated against."""

    # Identifier of the requesting user.
    user_id: str
    # Identifier of the client device.
    device_id: str
    # Location details; the Flask integration in this file fills in
    # 'ip' and 'country'.
    location: Dict[str, Any]
    # Risk score; the built-in policies compare it against 5 and 8.
    risk_score: int
    # When the context was built.
    timestamp: datetime
class ZeroTrustPolicy:
    """Ordered list of Zero Trust policies plus a permission fallback.

    Each policy is a dict with 'name', 'condition' (callable taking the
    context), 'action' and 'reason'. The first policy whose condition holds
    decides the outcome.
    """

    def __init__(self):
        self.policies = self.load_policies()

    def load_policies(self) -> List[Dict]:
        """Load Zero Trust policies"""
        return [
            {
                'name': 'high_risk_block',
                'condition': lambda ctx: ctx.risk_score >= 8,
                'action': 'deny',
                'reason': 'High risk score'
            },
            {
                'name': 'require_mfa',
                'condition': lambda ctx: ctx.risk_score >= 5,
                'action': 'step_up_auth',
                'reason': 'Elevated risk requires MFA'
            },
            {
                'name': 'untrusted_device',
                'condition': lambda ctx: not self.is_device_trusted(ctx.device_id),
                'action': 'deny',
                'reason': 'Untrusted device'
            },
            {
                'name': 'unusual_location',
                'condition': lambda ctx: self.is_unusual_location(ctx),
                'action': 'step_up_auth',
                'reason': 'Unusual location detected'
            }
        ]

    def evaluate(self, context: "ZeroTrustContext", resource: str, action: str) -> Dict:
        """Evaluate Zero Trust policies.

        Args:
            context: Request context the policy conditions are applied to.
            resource: Resource being accessed.
            action: Action requested on the resource.

        Returns:
            Dict with 'allowed', 'action' and 'reason'; a matched policy also
            adds 'policy' (its name).
        """
        for policy in self.policies:
            if policy['condition'](context):
                return {
                    # Only an explicit allow grants access. A 'step_up_auth'
                    # outcome must not let the request through: callers gate
                    # on 'allowed' and surface 'action' to the client.
                    'allowed': policy['action'] == 'allow',
                    'action': policy['action'],
                    'reason': policy['reason'],
                    'policy': policy['name']
                }

        # No policy objected: fall back to resource-specific permissions.
        if self.has_permission(context.user_id, resource, action):
            return {
                'allowed': True,
                'action': 'allow',
                'reason': 'User has required permissions'
            }

        return {
            'allowed': False,
            'action': 'deny',
            'reason': 'No matching policy allows access'
        }

    def is_device_trusted(self, device_id: str) -> bool:
        """Placeholder device trust check: currently trusts every device."""
        return True

    def is_unusual_location(self, context: "ZeroTrustContext") -> bool:
        """Placeholder location check: currently never flags a location."""
        return False

    def has_permission(self, user_id: str, resource: str, action: str) -> bool:
        """Placeholder permission check: currently grants nothing."""
        return False
# Flask integration
from flask import Flask, request, jsonify, g
from functools import wraps
# Flask application that hosts the protected routes.
app = Flask(__name__)
# Module-level policy engine used by the zero_trust_required decorator.
zt_policy = ZeroTrustPolicy()
def zero_trust_required(f):
    """Decorator that evaluates Zero Trust policies before running a view.

    Expects an earlier authentication step to have stored the caller's id on
    ``flask.g.user_id``. Responds 401 when no user id is present and 403 when
    the policy engine does not allow the request; otherwise calls the view.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        from datetime import timezone

        # Without an authenticated user there is nothing to evaluate; answer
        # 401 instead of failing with an AttributeError (HTTP 500).
        user_id = getattr(g, 'user_id', None)
        if user_id is None:
            return jsonify({
                'error': 'unauthorized',
                'reason': 'Authentication required'
            }), 401

        # Build Zero Trust context
        context = ZeroTrustContext(
            user_id=user_id,
            device_id=request.headers.get('X-Device-ID', 'unknown'),
            location={
                'ip': request.remote_addr,
                'country': 'US'  # From GeoIP
            },
            risk_score=calculate_risk_score(request),
            # Naive UTC timestamp, obtained without the deprecated
            # datetime.utcnow().
            timestamp=datetime.now(timezone.utc).replace(tzinfo=None)
        )

        # Evaluate policies
        result = zt_policy.evaluate(
            context,
            request.path,
            request.method
        )

        if not result['allowed']:
            return jsonify({
                'error': 'forbidden',
                'reason': result['reason'],
                'action_required': result['action']
            }), 403

        return f(*args, **kwargs)

    return decorated_function
def calculate_risk_score(request) -> int:
    """Calculate risk score based on request context.

    Currently the only factor is a missing or empty X-Device-ID header,
    which contributes 2 points. The result is capped at 10.
    """
    device_header = request.headers.get('X-Device-ID')
    # Check for suspicious patterns
    risk_points = 0 if device_header else 2
    # Add more risk factors
    return min(risk_points, 10)
@app.route('/api/sensitive', methods=['GET'])
@zero_trust_required
def get_sensitive_data():
    """Return the protected payload; runs only if the Zero Trust decorator lets the request through."""
    payload = {'data': 'sensitive information'}
    return jsonify(payload)
Zero Trust Principles
- Verify Explicitly: Always authenticate and authorize
- Least Privilege: Minimal access required
- Assume Breach: Limit blast radius
- Microsegmentation: Network segmentation
- Continuous Monitoring: Real-time security
- Device Trust: Verify device compliance
- Data Protection: Encrypt everything
Implementation Layers
- Identity Layer: Strong authentication (MFA, SSO)
- Device Layer: Device compliance, certificates
- Network Layer: Microsegmentation, mTLS
- Application Layer: API gateway, authorization
- Data Layer: Encryption, DLP
Zero Trust Maturity Model
- Traditional: Perimeter-based security
- Advanced: Some segmentation
- Optimal: Full Zero Trust
- Progressive: Continuous improvement
Best Practices
✅ DO
- Verify every request
- Implement MFA everywhere
- Use microsegmentation
- Monitor continuously
- Encrypt all communications
- Implement least privilege
- Log all access
- Regular audits
❌ DON'T
- Trust network location
- Use implicit trust
- Skip device verification
- Allow lateral movement
- Use static credentials
Resources
Quick Install
/plugin add https://github.com/aj-geddes/useful-ai-prompts/tree/main/zero-trust-architecture
Copy and paste this command in Claude Code to install this skill.
GitHub Repository
Related Skills
langchain
Meta — LangChain is a framework for building LLM applications using agents, chains, and RAG pipelines. It supports multiple LLM providers, offers 500+ integrations, and includes features like tool calling and memory management. Use it for rapid prototyping and deploying production systems like chatbots, autonomous agents, and question-answering services.
Algorithmic Art Generation
Meta — This skill helps developers create algorithmic art using p5.js, focusing on generative art, computational aesthetics, and interactive visualizations. It automatically activates for topics like "generative art" or "p5.js visualization" and guides you through creating unique algorithms with features like seeded randomness, flow fields, and particle systems. Use it when you need to build reproducible, code-driven artistic patterns.
webapp-testing
Testing — This Claude Skill provides a Playwright-based toolkit for testing local web applications through Python scripts. It enables frontend verification, UI debugging, screenshot capture, and log viewing while managing server lifecycles. Use it for browser automation tasks, but run scripts directly rather than reading their source code to avoid context pollution.
requesting-code-review
Design — This skill dispatches a code-reviewer subagent to analyze code changes against requirements before proceeding. It should be used after completing tasks, implementing major features, or before merging to main. The review helps catch issues early by comparing the current implementation with the original plan.
