data-mesh-expert
About
This Claude Skill provides expert guidance on implementing data mesh architecture for scalable, decentralized data systems. It helps developers design domain-oriented data ownership, create data products, and establish federated governance with self-serve platforms. Use this skill when planning or refactoring large-scale data infrastructure to align with organizational domains.
Quick Install
Claude Code
Recommended:
/plugin add https://github.com/majiayu000/claude-skill-registry
Or clone directly into your skills directory:
git clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/data-mesh-expert
Copy and paste the command into Claude Code to install this skill.
Documentation
Data Mesh Expert
You are an expert in data mesh architecture with deep knowledge of domain-oriented data ownership, data as a product, federated computational governance, and self-serve data infrastructure platforms. You design and implement decentralized data architectures that scale with organizational growth.
Core Expertise
Data Mesh Principles
Four Foundational Principles:
- Domain-Oriented Decentralized Data Ownership
- Data as a Product
- Self-Serve Data Infrastructure as a Platform
- Federated Computational Governance
Domain-Oriented Data Ownership
Domain Decomposition:
# Domain structure
organization:
  domains:
    - name: sales
      bounded_context: "Customer transactions and revenue"
      data_products:
        - sales_orders
        - customer_interactions
        - revenue_metrics
      team:
        product_owner: "Sales Analytics Lead"
        data_engineers: 3
        analytics_engineers: 2
    - name: marketing
      bounded_context: "Customer acquisition and campaigns"
      data_products:
        - campaign_performance
        - lead_attribution
        - customer_segments
      team:
        product_owner: "Marketing Analytics Lead"
        data_engineers: 2
        analytics_engineers: 2
    - name: product
      bounded_context: "Product usage and features"
      data_products:
        - feature_usage
        - product_events
        - user_engagement
      team:
        product_owner: "Product Analytics Lead"
        data_engineers: 3
        analytics_engineers: 1
    - name: finance
      bounded_context: "Financial reporting and compliance"
      data_products:
        - general_ledger
        - accounts_receivable
        - financial_metrics
      team:
        product_owner: "Finance Analytics Lead"
        data_engineers: 2
        analytics_engineers: 2
Domain Data Product Architecture:
Sales Domain
├── Operational Data
│   ├── PostgreSQL: orders, customers, transactions
│   └── Salesforce: opportunities, accounts
├── Analytical Data Products
│   ├── sales_orders_analytical (daily aggregate)
│   ├── customer_lifetime_value (computed metric)
│   └── sales_performance_metrics (real-time)
├── Data Product APIs
│   ├── REST API: /api/v1/sales/orders
│   ├── GraphQL: sales_orders query
│   └── Streaming: kafka://sales.orders.events
└── Documentation
    ├── README.md (product overview)
    ├── SCHEMA.md (data contracts)
    ├── SLA.md (quality guarantees)
    └── CHANGELOG.md (version history)
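Consumers in other domains integrate through these output ports rather than the sales team's operational stores. A minimal consumer sketch, assuming the REST endpoint and Kafka topic shown in the tree, the requests and kafka-python client libraries, and a hypothetical host; authentication and pagination are omitted:

import json

import requests
from kafka import KafkaConsumer  # kafka-python

# Batch consumption via the REST output port (path from the tree above;
# host and query parameters are assumptions).
resp = requests.get(
    "https://data.example.com/api/v1/sales/orders",
    params={"order_date": "2024-01-15"},
    timeout=30,
)
resp.raise_for_status()
orders = resp.json()

# Event consumption via the streaming output port (topic name from the tree above).
consumer = KafkaConsumer(
    "sales.orders.events",
    bootstrap_servers=["kafka.example.com:9092"],
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)
for message in consumer:
    # Downstream domains react to order events without coupling to the sales database.
    print(message.value)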
Data as a Product
Data Product Contract:
# data_product.yaml
name: sales_orders_analytical
version: 2.1.0
domain: sales
owner:
  team: sales-analytics
  contact: [email protected]
  slack: "#sales-data"
description: |
  Analytical view of sales orders with customer and product enrichments.
  Updated daily at 2 AM UTC with full refresh.
schema:
  type: parquet
  location: s3://data-products/sales/orders/
  partitioned_by:
    - order_date
  fields:
    - name: order_id
      type: string
      description: Unique order identifier
      constraints:
        - unique
        - not_null
    - name: customer_id
      type: string
      description: Customer identifier
      constraints:
        - not_null
    - name: order_date
      type: date
      description: Date order was placed
      constraints:
        - not_null
    - name: total_amount
      type: decimal(12,2)
      description: Total order amount in USD
      constraints:
        - not_null
        - min: 0
    - name: status
      type: string
      description: Order status
      constraints:
        - in: [pending, completed, cancelled, refunded]
    - name: customer_segment
      type: string
      description: Customer value segment
    - name: product_count
      type: integer
      description: Number of products in order
access:
  discovery: public
  read:
    - role: analyst
    - role: data_scientist
    - domain: marketing
    - domain: finance
  write:
    - domain: sales
sla:
  availability: 99.9%
  freshness:
    max_age_hours: 24
    update_schedule: "0 2 * * *"
  completeness:
    min_threshold: 99.5%
  quality_checks:
    - name: no_negative_amounts
      query: "SELECT COUNT(*) FROM orders WHERE total_amount < 0"
      threshold: 0
    - name: valid_status
      query: "SELECT COUNT(*) FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')"
      threshold: 0
    - name: referential_integrity
      query: "SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL"
      threshold: 0
observability:
  metrics:
    - row_count
    - avg_order_value
    - null_percentage_by_column
    - schema_drift
  alerts:
    - type: freshness
      condition: age_hours > 26
      severity: critical
    - type: volume
      condition: row_count_change > 50%
      severity: warning
    - type: quality
      condition: quality_check_failed
      severity: critical
changelog:
  - version: 2.1.0
    date: 2024-01-15
    changes:
      - Added customer_segment field
      - Improved null handling in total_amount
    breaking: false
  - version: 2.0.0
    date: 2023-12-01
    changes:
      - Changed order_id from integer to string
      - Removed legacy status values
    breaking: true
Data Product Implementation (Python):
# sales_orders_data_product.py
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import pandas as pd
from great_expectations.core import ExpectationSuite


@dataclass
class DataProductMetadata:
    """Metadata for data product"""
    name: str
    version: str
    domain: str
    owner_team: str
    description: str
    sla_freshness_hours: int
    sla_availability_pct: float


@dataclass
class DataProductQualityCheck:
    """Quality check definition"""
    name: str
    query: str
    threshold: int
    severity: str


class SalesOrdersDataProduct:
    """Sales orders analytical data product"""

    def __init__(self, config: Dict):
        self.config = config
        self.metadata = DataProductMetadata(
            name="sales_orders_analytical",
            version="2.1.0",
            domain="sales",
            owner_team="sales-analytics",
            description="Analytical view of sales orders",
            sla_freshness_hours=24,
            sla_availability_pct=99.9
        )
        self.quality_checks = self._load_quality_checks()

    def _load_quality_checks(self) -> List[DataProductQualityCheck]:
        """Load quality checks from config"""
        return [
            DataProductQualityCheck(
                name="no_negative_amounts",
                query="SELECT COUNT(*) FROM orders WHERE total_amount < 0",
                threshold=0,
                severity="critical"
            ),
            DataProductQualityCheck(
                name="valid_status",
                query="SELECT COUNT(*) FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')",
                threshold=0,
                severity="critical"
            ),
            DataProductQualityCheck(
                name="referential_integrity",
                query="SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL",
                threshold=0,
                severity="critical"
            )
        ]

    def extract(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Extract source data from the operational database"""
        orders_df = self._extract_orders()
        customers_df = self._extract_customers()
        products_df = self._extract_products()
        return orders_df, customers_df, products_df
    def transform(self, orders_df: pd.DataFrame,
                  customers_df: pd.DataFrame,
                  products_df: pd.DataFrame) -> pd.DataFrame:
        """Transform and enrich data"""
        # Join with customers
        enriched = orders_df.merge(
            customers_df[['customer_id', 'customer_segment']],
            on='customer_id',
            how='left'
        )
        # Calculate product count per order
        product_counts = products_df.groupby('order_id').size().reset_index(name='product_count')
        enriched = enriched.merge(product_counts, on='order_id', how='left')
        # Apply business logic
        enriched['product_count'] = enriched['product_count'].fillna(0)
        return enriched

    def validate(self, df: pd.DataFrame) -> Dict:
        """Validate data quality"""
        results = {
            'passed': True,
            'checks': []
        }
        # Schema validation
        expected_columns = [
            'order_id', 'customer_id', 'order_date', 'total_amount',
            'status', 'customer_segment', 'product_count'
        ]
        missing_columns = set(expected_columns) - set(df.columns)
        if missing_columns:
            results['passed'] = False
            results['checks'].append({
                'name': 'schema_validation',
                'passed': False,
                'message': f"Missing columns: {missing_columns}"
            })
        # Quality checks
        for check in self.quality_checks:
            result = self._run_quality_check(df, check)
            results['checks'].append(result)
            if not result['passed']:
                results['passed'] = False
        return results

    def _run_quality_check(self, df: pd.DataFrame,
                           check: DataProductQualityCheck) -> Dict:
        """Run individual quality check"""
        # Execute quality check query
        # This is simplified; in practice, use a SQL engine
        if check.name == "no_negative_amounts":
            count = len(df[df['total_amount'] < 0])
        elif check.name == "valid_status":
            valid_statuses = ['pending', 'completed', 'cancelled', 'refunded']
            count = len(df[~df['status'].isin(valid_statuses)])
        else:
            count = 0
        passed = count <= check.threshold
        return {
            'name': check.name,
            'passed': passed,
            'count': count,
            'threshold': check.threshold,
            'severity': check.severity
        }

    def publish(self, df: pd.DataFrame) -> None:
        """Publish data product"""
        # Write to storage
        output_path = "s3://data-products/sales/orders/"
        df.to_parquet(
            output_path,
            partition_cols=['order_date'],
            engine='pyarrow'
        )
        # Register in data catalog
        self._register_in_catalog(output_path)
        # Update metrics
        self._publish_metrics(df)

    def _register_in_catalog(self, path: str) -> None:
        """Register data product in catalog"""
        catalog_entry = {
            'name': self.metadata.name,
            'version': self.metadata.version,
            'domain': self.metadata.domain,
            'location': path,
            'last_updated': datetime.utcnow().isoformat(),
            'owner': self.metadata.owner_team
        }
        # Register with data catalog service
        pass

    def _publish_metrics(self, df: pd.DataFrame) -> None:
        """Publish observability metrics"""
        metrics = {
            'row_count': len(df),
            'avg_order_value': df['total_amount'].mean(),
            'null_percentage': df.isnull().sum().to_dict(),
            'timestamp': datetime.utcnow().isoformat()
        }
        # Send to monitoring system
        pass

    def get_metadata(self) -> Dict:
        """Return data product metadata"""
        return {
            'name': self.metadata.name,
            'version': self.metadata.version,
            'domain': self.metadata.domain,
            'owner': self.metadata.owner_team,
            'description': self.metadata.description,
            'sla': {
                'freshness_hours': self.metadata.sla_freshness_hours,
                'availability_pct': self.metadata.sla_availability_pct
            }
        }
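A pipeline run would drive this class roughly as follows. This is a shape-of-usage sketch: the config value and the failure-handling policy are illustrative assumptions, and the private extract helpers above are left unimplemented in the class itself:

# run_sales_orders_pipeline.py (illustrative driver, not part of the contract)
product = SalesOrdersDataProduct(config={"source_dsn": "postgresql://..."})  # hypothetical config

orders, customers, products = product.extract()
enriched = product.transform(orders, customers, products)

validation = product.validate(enriched)
if validation["passed"]:
    product.publish(enriched)
else:
    # Surface failed checks instead of publishing a product that violates its SLA
    failed = [c for c in validation["checks"] if not c["passed"]]
    raise RuntimeError(f"Quality gate failed: {failed}")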
Self-Serve Data Infrastructure Platform
Platform Components:
# Platform architecture
platform:
  compute:
    - name: spark_cluster
      type: databricks
      purpose: Large-scale transformations
      auto_scaling: true
    - name: dbt_runner
      type: kubernetes
      purpose: SQL transformations
      resources:
        cpu: 4
        memory: 16Gi
  storage:
    - name: data_lake
      type: s3
      purpose: Raw and processed data
      lifecycle_policies:
        - transition_to_glacier: 90_days
        - expire: 365_days
    - name: data_warehouse
      type: snowflake
      purpose: Analytical queries
      auto_suspend: 10_minutes
  orchestration:
    - name: airflow
      type: managed_airflow
      purpose: Workflow orchestration
      version: 2.8.0
  data_catalog:
    - name: datahub
      purpose: Metadata management
      features:
        - lineage_tracking
        - data_discovery
        - schema_registry
  quality:
    - name: great_expectations
      purpose: Data validation
      integration: airflow
  observability:
    - name: datadog
      purpose: Metrics and monitoring
      dashboards:
        - data_product_health
        - pipeline_performance
  access_control:
    - name: okta
      type: identity_provider
      integration: sso
    - name: ranger
      type: authorization
      purpose: Fine-grained access control
Platform APIs:
# platform_api.py
from typing import Dict, List, Optional
from dataclasses import dataclass


@dataclass
class DataProductSpec:
    """Specification for creating a data product"""
    name: str
    domain: str
    source_tables: List[str]
    transformation_sql: str
    schedule: str
    quality_checks: List[Dict]


class DataMeshPlatform:
    """Self-serve data mesh platform API"""

    def __init__(self, catalog):
        # The catalog client (e.g. a DataHub wrapper) is assumed to be injected by the caller.
        self._catalog = catalog
    def create_data_product(self, spec: DataProductSpec) -> str:
        """
        Create new data product with platform automation

        Steps:
        1. Provision compute resources
        2. Create storage location
        3. Deploy transformation pipeline
        4. Configure quality checks
        5. Register in catalog
        6. Set up monitoring
        """
        # Generate unique ID
        product_id = f"{spec.domain}_{spec.name}"
        # Create storage location
        storage_path = self._provision_storage(product_id)
        # Deploy dbt project
        dbt_project = self._create_dbt_project(spec)
        self._deploy_dbt_project(dbt_project)
        # Create Airflow DAG
        dag = self._create_airflow_dag(spec, storage_path)
        self._deploy_dag(dag)
        # Register in catalog
        self._register_in_catalog(product_id, spec, storage_path)
        # Set up monitoring
        self._setup_monitoring(product_id, spec)
        return product_id

    def _provision_storage(self, product_id: str) -> str:
        """Provision storage for data product"""
        path = f"s3://data-products/{product_id}/"
        # Create S3 bucket/prefix
        # Set lifecycle policies
        # Configure access control
        return path

    def _create_dbt_project(self, spec: DataProductSpec) -> Dict:
        """Generate dbt project for data product"""
        return {
            'name': spec.name,
            'models': {
                f"{spec.name}.sql": spec.transformation_sql
            },
            'tests': self._generate_dbt_tests(spec.quality_checks),
            'docs': self._generate_dbt_docs(spec)
        }

    def _create_airflow_dag(self, spec: DataProductSpec, storage_path: str) -> str:
        """Generate Airflow DAG for data product"""
        dag_template = f"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    dag_id='{spec.name}_pipeline',
    schedule='{spec.schedule}',
    start_date=datetime(2024, 1, 1),
    catchup=False
)

dbt_run = BashOperator(
    task_id='dbt_run',
    bash_command='dbt run --models {spec.name}',
    dag=dag
)

dbt_test = BashOperator(
    task_id='dbt_test',
    bash_command='dbt test --models {spec.name}',
    dag=dag
)

publish = BashOperator(
    task_id='publish',
    bash_command='python publish_data_product.py {spec.name} {storage_path}',
    dag=dag
)

dbt_run >> dbt_test >> publish
"""
        return dag_template

    def get_data_product(self, product_id: str) -> Dict:
        """Retrieve data product information"""
        return self._catalog.get(product_id)

    def list_data_products(self, domain: Optional[str] = None) -> List[Dict]:
        """List available data products"""
        products = self._catalog.search(domain=domain)
        return products

    def discover_data_products(self, query: str) -> List[Dict]:
        """Search for data products"""
        return self._catalog.search(query=query)

    def request_access(self, product_id: str, requester: str) -> str:
        """Request access to data product"""
        # Create access request ticket
        # Notify data product owner
        # Track approval workflow
        pass

    def grant_access(self, product_id: str, user: str, access_level: str):
        """Grant access to data product"""
        # Update IAM policies
        # Configure row-level security
        # Log access grant
        pass
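A domain team would use the platform API along these lines. The stand-in catalog, SQL, schedule, and quality-check payload are hypothetical, and several deploy/registration helpers in the class above are stubs, so this shows the shape of usage rather than an end-to-end run:

# A stand-in catalog client; in practice this would wrap DataHub or a similar service.
class InMemoryCatalog:
    def __init__(self):
        self._entries = {}

    def get(self, product_id):
        return self._entries.get(product_id)

    def search(self, **kwargs):
        return list(self._entries.values())


platform = DataMeshPlatform(catalog=InMemoryCatalog())

spec = DataProductSpec(
    name="campaign_performance",
    domain="marketing",
    source_tables=["raw.campaigns", "raw.ad_spend"],
    transformation_sql="SELECT campaign_id, SUM(spend) AS total_spend FROM raw.ad_spend GROUP BY campaign_id",
    schedule="0 3 * * *",
    quality_checks=[{"name": "completeness", "threshold": 0.995}],
)

product_id = platform.create_data_product(spec)
print(f"Provisioned data product: {product_id}")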
Federated Computational Governance
Governance Framework:
# governance_policy.yaml
governance:
  global_policies:
    - name: data_classification
      mandatory: true
      policy: |
        All data products must be classified as:
        - Public: Freely accessible within organization
        - Internal: Restricted to employees
        - Confidential: Restricted to specific roles
        - Restricted: Requires explicit approval
    - name: pii_handling
      mandatory: true
      policy: |
        Data products containing PII must:
        - Mark PII fields in schema
        - Implement column-level encryption
        - Enable audit logging
        - Comply with GDPR/CCPA requirements
    - name: data_retention
      mandatory: true
      policy: |
        Data retention periods:
        - Operational data: 7 years
        - Analytical data: 3 years
        - Logs: 1 year
        - Deleted data: 30 days in trash
  domain_policies:
    sales:
      data_quality:
        - completeness: ">= 99%"
        - accuracy: ">= 99.5%"
        - freshness: "<= 24 hours"
      access_control:
        - default_access: internal
        - pii_fields: [customer_email, customer_phone]
        - approval_required: [customer_ssn]
    finance:
      data_quality:
        - completeness: ">= 99.9%"
        - accuracy: ">= 99.99%"
        - freshness: "<= 1 hour"
      access_control:
        - default_access: confidential
        - sox_compliance: true
        - audit_all_access: true
  automated_policies:
    - name: schema_validation
      enforcement: pre-publish
      check: |
        Schema must include:
        - Primary key
        - Column descriptions
        - Data types
        - Constraints
    - name: quality_gates
      enforcement: pre-publish
      check: |
        All quality checks must pass:
        - No critical failures
        - Warning threshold: <= 5%
    - name: breaking_changes
      enforcement: pre-publish
      check: |
        Breaking changes require:
        - Major version increment
        - 30-day deprecation notice
        - Migration guide
  observability_requirements:
    - metrics:
        - row_count
        - null_rate
        - distinct_count
        - value_distribution
    - alerts:
        - freshness_violation
        - quality_check_failure
        - schema_drift
        - volume_anomaly
Governance Implementation:
# governance_engine.py
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional


class PolicyViolationSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"


@dataclass
class PolicyViolation:
    policy_name: str
    severity: PolicyViolationSeverity
    message: str
    field: Optional[str] = None
class GovernanceEngine:
    """Automated governance enforcement"""

    def __init__(self, policies: Dict):
        self.policies = policies

    def validate_data_product(self, product_spec: Dict) -> List[PolicyViolation]:
        """Validate data product against governance policies"""
        violations = []
        # Check data classification
        violations.extend(self._check_data_classification(product_spec))
        # Check PII handling
        violations.extend(self._check_pii_compliance(product_spec))
        # Check schema requirements
        violations.extend(self._check_schema_requirements(product_spec))
        # Check quality checks
        violations.extend(self._check_quality_requirements(product_spec))
        # Check retention policy
        violations.extend(self._check_retention_policy(product_spec))
        return violations
    def _check_data_classification(self, product_spec: Dict) -> List[PolicyViolation]:
        """Verify data classification is set and valid"""
        violations = []
        valid_classifications = ['public', 'internal', 'confidential', 'restricted']
        if 'classification' not in product_spec:
            violations.append(PolicyViolation(
                policy_name="data_classification",
                severity=PolicyViolationSeverity.ERROR,
                message="Data classification not specified"
            ))
        elif product_spec['classification'] not in valid_classifications:
            # Only flag an invalid value when a classification was actually provided
            violations.append(PolicyViolation(
                policy_name="data_classification",
                severity=PolicyViolationSeverity.ERROR,
                message=f"Invalid classification. Must be one of: {valid_classifications}"
            ))
        return violations
    def _check_pii_compliance(self, product_spec: Dict) -> List[PolicyViolation]:
        """Check PII handling compliance"""
        violations = []
        schema = product_spec.get('schema', {})
        pii_fields = [f for f in schema.get('fields', []) if f.get('is_pii')]
        if pii_fields:
            # Check encryption
            if not product_spec.get('encryption_enabled'):
                violations.append(PolicyViolation(
                    policy_name="pii_handling",
                    severity=PolicyViolationSeverity.CRITICAL,
                    message="PII fields present but encryption not enabled"
                ))
            # Check audit logging
            if not product_spec.get('audit_logging_enabled'):
                violations.append(PolicyViolation(
                    policy_name="pii_handling",
                    severity=PolicyViolationSeverity.CRITICAL,
                    message="PII fields present but audit logging not enabled"
                ))
            # Check field marking
            for field in pii_fields:
                if not field.get('pii_category'):
                    violations.append(PolicyViolation(
                        policy_name="pii_handling",
                        severity=PolicyViolationSeverity.ERROR,
                        message=f"PII field {field['name']} missing pii_category",
                        field=field['name']
                    ))
        return violations

    def _check_schema_requirements(self, product_spec: Dict) -> List[PolicyViolation]:
        """Validate schema completeness"""
        violations = []
        schema = product_spec.get('schema', {})
        if not schema:
            violations.append(PolicyViolation(
                policy_name="schema_validation",
                severity=PolicyViolationSeverity.ERROR,
                message="Schema not defined"
            ))
            return violations
        # Check for primary key
        fields = schema.get('fields', [])
        has_primary_key = any(f.get('is_primary_key') for f in fields)
        if not has_primary_key:
            violations.append(PolicyViolation(
                policy_name="schema_validation",
                severity=PolicyViolationSeverity.WARNING,
                message="No primary key defined"
            ))
        # Check field documentation
        for field in fields:
            if not field.get('description'):
                violations.append(PolicyViolation(
                    policy_name="schema_validation",
                    severity=PolicyViolationSeverity.WARNING,
                    message=f"Field {field['name']} missing description",
                    field=field['name']
                ))
        return violations

    def _check_quality_requirements(self, product_spec: Dict) -> List[PolicyViolation]:
        """Validate quality check configuration"""
        violations = []
        quality_checks = product_spec.get('sla', {}).get('quality_checks', [])
        if not quality_checks:
            violations.append(PolicyViolation(
                policy_name="quality_gates",
                severity=PolicyViolationSeverity.WARNING,
                message="No quality checks defined"
            ))
        # Check for minimum required checks
        check_names = [check['name'] for check in quality_checks]
        required_checks = ['completeness', 'freshness']
        missing_checks = set(required_checks) - set(check_names)
        if missing_checks:
            violations.append(PolicyViolation(
                policy_name="quality_gates",
                severity=PolicyViolationSeverity.WARNING,
                message=f"Missing required quality checks: {missing_checks}"
            ))
        return violations

    def _check_retention_policy(self, product_spec: Dict) -> List[PolicyViolation]:
        """Validate retention policy"""
        violations = []
        if 'retention_days' not in product_spec:
            violations.append(PolicyViolation(
                policy_name="data_retention",
                severity=PolicyViolationSeverity.ERROR,
                message="Retention policy not specified"
            ))
        return violations

    def enforce_policies(self, violations: List[PolicyViolation]) -> bool:
        """Determine if data product can be published based on violations"""
        # Block on ERROR or CRITICAL violations
        blocking_violations = [
            v for v in violations
            if v.severity in [PolicyViolationSeverity.ERROR, PolicyViolationSeverity.CRITICAL]
        ]
        return len(blocking_violations) == 0

    def generate_compliance_report(self, product_id: str) -> Dict:
        """Generate compliance report for data product"""
        return {
            'product_id': product_id,
            'compliance_status': 'compliant',
            'last_checked': datetime.utcnow().isoformat(),
            'policies_evaluated': len(self.policies),
            'violations': []
        }
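Pre-publish enforcement can then be wired up as follows. PyYAML and the shape of product_spec are assumptions for illustration; any loader that yields a dict works:

import yaml  # PyYAML; assumed available

with open("governance_policy.yaml") as fh:
    policies = yaml.safe_load(fh)["governance"]

engine = GovernanceEngine(policies=policies)

# The spec fields below mirror what the checks above look for; the values are illustrative.
product_spec = {
    "classification": "internal",
    "retention_days": 1095,
    "schema": {
        "fields": [
            {"name": "order_id", "is_primary_key": True, "description": "Unique order id"},
        ]
    },
    "sla": {"quality_checks": [{"name": "completeness"}, {"name": "freshness"}]},
}

violations = engine.validate_data_product(product_spec)
if engine.enforce_policies(violations):
    print("Publish allowed")
else:
    for v in violations:
        print(f"[{v.severity.value}] {v.policy_name}: {v.message}")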
Best Practices
1. Domain Design
- Align domains with organizational structure
- Clear bounded contexts for each domain
- Domain teams own their data end-to-end
- Cross-domain collaboration through well-defined interfaces
- Avoid centralized data teams; embed in domains
2. Data Product Design
- Treat data as a product with SLAs
- Document data contracts explicitly
- Version data products semantically (see the sketch after this list)
- Implement comprehensive quality checks
- Provide discoverability and self-service access
- Monitor data product health continuously
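Semantic versioning of data contracts can be enforced mechanically. A minimal sketch, assuming contracts are dictionaries shaped like the data_product.yaml example above; the helper names and the definition of a breaking change (removed fields or changed types) are simplifications for illustration:

def requires_major_bump(old_contract: dict, new_contract: dict) -> bool:
    """Treat removed fields or changed types as breaking changes (illustrative rule)."""
    old_fields = {f["name"]: f["type"] for f in old_contract["schema"]["fields"]}
    new_fields = {f["name"]: f["type"] for f in new_contract["schema"]["fields"]}
    removed = set(old_fields) - set(new_fields)
    retyped = {n for n in old_fields.keys() & new_fields.keys() if old_fields[n] != new_fields[n]}
    return bool(removed or retyped)


def validate_version_bump(old_contract: dict, new_contract: dict) -> None:
    """Reject a breaking change that does not increment the major version."""
    old_major = int(old_contract["version"].split(".")[0])
    new_major = int(new_contract["version"].split(".")[0])
    if requires_major_bump(old_contract, new_contract) and new_major <= old_major:
        raise ValueError("Breaking schema change requires a major version increment")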
3. Platform Design
- Abstract infrastructure complexity
- Provide self-serve capabilities
- Automate repetitive tasks
- Enable domain autonomy
- Standardize common patterns
- Invest in developer experience
4. Governance
- Automate policy enforcement
- Make governance policies executable
- Balance autonomy with control
- Federate decisions to domains
- Global standards, local implementation
- Continuous compliance monitoring
5. Cultural Transformation
- Shift from centralized to federated model
- Build data literacy across the organization
- Incentivize data product quality
- Foster collaboration between domains
- Celebrate data product owners
Anti-Patterns
1. Centralized Data Team
// Bad: Central data team owns all data
Central Team -> All domains (bottleneck)
// Good: Domain teams own their data
Sales Domain -> Sales data products
Marketing Domain -> Marketing data products
Product Domain -> Product data products
2. Monolithic Data Lake
// Bad: Single giant data lake
s3://data-lake/everything/
// Good: Domain-oriented storage
s3://data-products/sales/
s3://data-products/marketing/
s3://data-products/product/
3. No Data Contracts
// Bad: Undocumented schema changes
Breaking change deployed without notice
// Good: Versioned contracts with deprecation
v1: Deprecated (30 days notice)
v2: Current
v3: Beta
4. Manual Governance
// Bad: Manual approval processes
Email -> Ticket -> Manual review -> Access granted (weeks)
// Good: Automated governance
Request -> Policy check -> Auto-approval (minutes)
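To make the automated path concrete, a minimal sketch of policy-checked auto-approval built on the DataMeshPlatform API above; the approval table, role handling, and classification lookup are illustrative assumptions:

# Hypothetical auto-approval flow: requests are granted immediately when the product's
# classification and the requester's role satisfy a simple policy table.
AUTO_APPROVE = {
    ("public", "analyst"), ("public", "data_scientist"),
    ("internal", "analyst"), ("internal", "data_scientist"),
}


def handle_access_request(platform: DataMeshPlatform, product_id: str,
                          requester: str, requester_role: str) -> str:
    product = platform.get_data_product(product_id)
    classification = product.get("classification", "restricted")
    if (classification, requester_role) in AUTO_APPROVE:
        platform.grant_access(product_id, requester, access_level="read")
        return "auto-approved"
    # Anything else falls back to the owner-approval workflow
    return platform.request_access(product_id, requester)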
Resources
GitHub Repository
Related Skills
sparc-methodology
Development: The SPARC methodology provides a systematic development framework with 17 specialized modes for comprehensive software development from specification to completion. It integrates multi-agent orchestration to handle complex development workflows including architecture design, testing, and deployment. Use this skill when you need structured guidance throughout the entire development lifecycle with automated agent coordination.
github-multi-repo
Other: This skill enables AI swarm orchestration for managing multiple GitHub repositories simultaneously. It provides cross-repository coordination, package synchronization, and architecture optimization for distributed development workflows. Use it when you need to automate organization-wide repository management, template propagation, or multi-project synchronization.
