
data-mesh-expert

majiayu000
Updated Today
Category: Other. Tags: data-mesh, architecture, domain-driven, data-products, governance, platform

About

This Claude Skill provides expert guidance on implementing data mesh architecture for scalable, decentralized data systems. It helps developers design domain-oriented data ownership, create data products, and establish federated governance with self-serve platforms. Use this skill when planning or refactoring large-scale data infrastructure to align with organizational domains.

Quick Install

Claude Code

Plugin Command (Recommended)
/plugin add https://github.com/majiayu000/claude-skill-registry
Git Clone (Alternative)
git clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/data-mesh-expert

Copy and paste the plugin command into Claude Code to install this skill

Documentation

Data Mesh Expert

You are an expert in data mesh architecture with deep knowledge of domain-oriented data ownership, data as a product, federated computational governance, and self-serve data infrastructure platforms. You design and implement decentralized data architectures that scale with organizational growth.

Core Expertise

Data Mesh Principles

Four Foundational Principles:

  1. Domain-Oriented Decentralized Data Ownership
  2. Data as a Product
  3. Self-Serve Data Infrastructure as a Platform
  4. Federated Computational Governance

Domain-Oriented Data Ownership

Domain Decomposition:

# Domain structure
organization:
  domains:
    - name: sales
      bounded_context: "Customer transactions and revenue"
      data_products:
        - sales_orders
        - customer_interactions
        - revenue_metrics
      team:
        product_owner: "Sales Analytics Lead"
        data_engineers: 3
        analytics_engineers: 2

    - name: marketing
      bounded_context: "Customer acquisition and campaigns"
      data_products:
        - campaign_performance
        - lead_attribution
        - customer_segments
      team:
        product_owner: "Marketing Analytics Lead"
        data_engineers: 2
        analytics_engineers: 2

    - name: product
      bounded_context: "Product usage and features"
      data_products:
        - feature_usage
        - product_events
        - user_engagement
      team:
        product_owner: "Product Analytics Lead"
        data_engineers: 3
        analytics_engineers: 1

    - name: finance
      bounded_context: "Financial reporting and compliance"
      data_products:
        - general_ledger
        - accounts_receivable
        - financial_metrics
      team:
        product_owner: "Finance Analytics Lead"
        data_engineers: 2
        analytics_engineers: 2

Domain Data Product Architecture:

Sales Domain
├── Operational Data
│   ├── PostgreSQL: orders, customers, transactions
│   └── Salesforce: opportunities, accounts
├── Analytical Data Products
│   ├── sales_orders_analytical (daily aggregate)
│   ├── customer_lifetime_value (computed metric)
│   └── sales_performance_metrics (real-time)
├── Data Product APIs
│   ├── REST API: /api/v1/sales/orders
│   ├── GraphQL: sales_orders query
│   └── Streaming: kafka://sales.orders.events
└── Documentation
    ├── README.md (product overview)
    ├── SCHEMA.md (data contracts)
    ├── SLA.md (quality guarantees)
    └── CHANGELOG.md (version history)
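
Consumers should reach a data product through its published interfaces rather than its underlying storage. The sketch below shows a minimal Python consumer for the REST endpoint and Kafka topic listed above; it assumes the kafka-python package, a bearer-token auth scheme, and a JSON response shape, none of which are defined by the product itself:

# consume_sales_orders.py (illustrative sketch; endpoint auth and response shape are assumptions)
import json
import requests
from kafka import KafkaConsumer  # kafka-python

def fetch_recent_orders(base_url: str, token: str, limit: int = 100) -> list:
    """Batch access through the data product's REST API."""
    response = requests.get(
        f"{base_url}/api/v1/sales/orders",
        params={"limit": limit},
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["orders"]  # assumed response envelope

def stream_order_events(bootstrap_servers: str):
    """Streaming access through the product's Kafka topic."""
    consumer = KafkaConsumer(
        "sales.orders.events",
        bootstrap_servers=bootstrap_servers,
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        auto_offset_reset="latest",
    )
    for message in consumer:
        yield message.value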

Data as a Product

Data Product Contract:

# data_product.yaml
name: sales_orders_analytical
version: 2.1.0
domain: sales
owner:
  team: sales-analytics
  contact: [email protected]
  slack: #sales-data

description: |
  Analytical view of sales orders with customer and product enrichments.
  Updated daily at 2 AM UTC with full refresh.

schema:
  type: parquet
  location: s3://data-products/sales/orders/
  partitioned_by:
    - order_date
  fields:
    - name: order_id
      type: string
      description: Unique order identifier
      constraints:
        - unique
        - not_null
    - name: customer_id
      type: string
      description: Customer identifier
      constraints:
        - not_null
    - name: order_date
      type: date
      description: Date order was placed
      constraints:
        - not_null
    - name: total_amount
      type: decimal(12,2)
      description: Total order amount in USD
      constraints:
        - not_null
        - min: 0
    - name: status
      type: string
      description: Order status
      constraints:
        - in: [pending, completed, cancelled, refunded]
    - name: customer_segment
      type: string
      description: Customer value segment
    - name: product_count
      type: integer
      description: Number of products in order

access:
  discovery: public
  read:
    - role: analyst
    - role: data_scientist
    - domain: marketing
    - domain: finance
  write:
    - domain: sales

sla:
  availability: 99.9%
  freshness:
    max_age_hours: 24
    update_schedule: "0 2 * * *"
  completeness:
    min_threshold: 99.5%
  quality_checks:
    - name: no_negative_amounts
      query: "SELECT COUNT(*) FROM orders WHERE total_amount < 0"
      threshold: 0
    - name: valid_status
      query: "SELECT COUNT(*) FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')"
      threshold: 0
    - name: referential_integrity
      query: "SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL"
      threshold: 0

observability:
  metrics:
    - row_count
    - avg_order_value
    - null_percentage_by_column
    - schema_drift
  alerts:
    - type: freshness
      condition: age_hours > 26
      severity: critical
    - type: volume
      condition: row_count_change > 50%
      severity: warning
    - type: quality
      condition: quality_check_failed
      severity: critical

changelog:
  - version: 2.1.0
    date: 2024-01-15
    changes:
      - Added customer_segment field
      - Improved null handling in total_amount
    breaking: false
  - version: 2.0.0
    date: 2023-12-01
    changes:
      - Changed order_id from integer to string
      - Removed legacy status values
    breaking: true
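
A contract like this is most useful when it is machine-readable, so producers can validate it before publishing. Below is a minimal loader sketch using PyYAML; the list of required sections and the per-field checks are assumptions rather than a standard data mesh schema:

# load_contract.py (illustrative sketch; required sections are assumptions)
import yaml  # PyYAML

REQUIRED_SECTIONS = {"name", "version", "domain", "owner", "schema", "sla"}

def load_contract(path: str) -> dict:
    """Load a data product contract and check the sections used throughout this document."""
    with open(path) as f:
        contract = yaml.safe_load(f)

    missing = REQUIRED_SECTIONS - contract.keys()
    if missing:
        raise ValueError(f"Contract {path} is missing sections: {sorted(missing)}")

    # Every schema field should carry a name, type, and description
    for field in contract["schema"].get("fields", []):
        for key in ("name", "type", "description"):
            if key not in field:
                raise ValueError(f"Field {field.get('name', '<unnamed>')} is missing '{key}'")

    return contract

contract = load_contract("data_product.yaml")
print(contract["name"], contract["version"])  # sales_orders_analytical 2.1.0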

Data Product Implementation (Python):

# sales_orders_data_product.py
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import pandas as pd
from great_expectations.core import ExpectationSuite

@dataclass
class DataProductMetadata:
    """Metadata for data product"""
    name: str
    version: str
    domain: str
    owner_team: str
    description: str
    sla_freshness_hours: int
    sla_availability_pct: float

@dataclass
class DataProductQualityCheck:
    """Quality check definition"""
    name: str
    query: str
    threshold: int
    severity: str

class SalesOrdersDataProduct:
    """Sales orders analytical data product"""

    def __init__(self, config: Dict):
        self.config = config
        self.metadata = DataProductMetadata(
            name="sales_orders_analytical",
            version="2.1.0",
            domain="sales",
            owner_team="sales-analytics",
            description="Analytical view of sales orders",
            sla_freshness_hours=24,
            sla_availability_pct=99.9
        )
        self.quality_checks = self._load_quality_checks()

    def _load_quality_checks(self) -> List[DataProductQualityCheck]:
        """Load quality checks from config"""
        return [
            DataProductQualityCheck(
                name="no_negative_amounts",
                query="SELECT COUNT(*) FROM orders WHERE total_amount < 0",
                threshold=0,
                severity="critical"
            ),
            DataProductQualityCheck(
                name="valid_status",
                query="SELECT COUNT(*) FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')",
                threshold=0,
                severity="critical"
            ),
            DataProductQualityCheck(
                name="referential_integrity",
                query="SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL",
                threshold=0,
                severity="critical"
            )
        ]

    def extract(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Extract source data"""
        # Extract from operational database
        orders_df = self._extract_orders()
        customers_df = self._extract_customers()
        products_df = self._extract_products()

        return orders_df, customers_df, products_df

    def transform(self, orders_df: pd.DataFrame,
                  customers_df: pd.DataFrame,
                  products_df: pd.DataFrame) -> pd.DataFrame:
        """Transform and enrich data"""

        # Join with customers
        enriched = orders_df.merge(
            customers_df[['customer_id', 'customer_segment']],
            on='customer_id',
            how='left'
        )

        # Calculate product count per order
        product_counts = products_df.groupby('order_id').size().reset_index(name='product_count')
        enriched = enriched.merge(product_counts, on='order_id', how='left')

        # Apply business logic
        enriched['product_count'] = enriched['product_count'].fillna(0)

        return enriched

    def validate(self, df: pd.DataFrame) -> Dict:
        """Validate data quality"""
        results = {
            'passed': True,
            'checks': []
        }

        # Schema validation
        expected_columns = [
            'order_id', 'customer_id', 'order_date', 'total_amount',
            'status', 'customer_segment', 'product_count'
        ]
        missing_columns = set(expected_columns) - set(df.columns)
        if missing_columns:
            results['passed'] = False
            results['checks'].append({
                'name': 'schema_validation',
                'passed': False,
                'message': f"Missing columns: {missing_columns}"
            })

        # Quality checks
        for check in self.quality_checks:
            result = self._run_quality_check(df, check)
            results['checks'].append(result)
            if not result['passed']:
                results['passed'] = False

        return results

    def _run_quality_check(self, df: pd.DataFrame,
                           check: DataProductQualityCheck) -> Dict:
        """Run individual quality check"""
        # Execute quality check query
        # This is simplified; in practice, run the check through a SQL engine
        if check.name == "no_negative_amounts":
            count = len(df[df['total_amount'] < 0])
        elif check.name == "valid_status":
            valid_statuses = ['pending', 'completed', 'cancelled', 'refunded']
            count = len(df[~df['status'].isin(valid_statuses)])
        else:
            count = 0

        passed = count <= check.threshold

        return {
            'name': check.name,
            'passed': passed,
            'count': count,
            'threshold': check.threshold,
            'severity': check.severity
        }

    def publish(self, df: pd.DataFrame) -> None:
        """Publish data product"""
        # Write to storage
        output_path = "s3://data-products/sales/orders/"
        df.to_parquet(
            output_path,
            partition_cols=['order_date'],
            engine='pyarrow'
        )

        # Register in data catalog
        self._register_in_catalog(output_path)

        # Update metrics
        self._publish_metrics(df)

    def _register_in_catalog(self, path: str) -> None:
        """Register data product in catalog"""
        catalog_entry = {
            'name': self.metadata.name,
            'version': self.metadata.version,
            'domain': self.metadata.domain,
            'location': path,
            'last_updated': datetime.utcnow().isoformat(),
            'owner': self.metadata.owner_team
        }
        # Register with data catalog service
        pass

    def _publish_metrics(self, df: pd.DataFrame) -> None:
        """Publish observability metrics"""
        metrics = {
            'row_count': len(df),
            'avg_order_value': df['total_amount'].mean(),
            'null_percentage': df.isnull().sum().to_dict(),
            'timestamp': datetime.utcnow().isoformat()
        }
        # Send to monitoring system
        pass

    def get_metadata(self) -> Dict:
        """Return data product metadata"""
        return {
            'name': self.metadata.name,
            'version': self.metadata.version,
            'domain': self.metadata.domain,
            'owner': self.metadata.owner_team,
            'description': self.metadata.description,
            'sla': {
                'freshness_hours': self.metadata.sla_freshness_hours,
                'availability_pct': self.metadata.sla_availability_pct
            }
        }
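
A minimal end-to-end run of this product, assuming the extract helpers are wired to real sources and an orchestrator supplies the config, might look like this:

# run_sales_orders_product.py (illustrative usage of the class above; config contents are assumed)
product = SalesOrdersDataProduct(config={})

# Extract from operational sources, then enrich
orders_df, customers_df, products_df = product.extract()
enriched_df = product.transform(orders_df, customers_df, products_df)

# Validate before publishing; block the publish on any failed check
validation = product.validate(enriched_df)
if validation["passed"]:
    product.publish(enriched_df)
else:
    failed = [c["name"] for c in validation["checks"] if not c["passed"]]
    raise RuntimeError(f"Quality gates failed: {failed}")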

Self-Serve Data Infrastructure Platform

Platform Components:

# Platform architecture
platform:
  compute:
    - name: spark_cluster
      type: databricks
      purpose: Large-scale transformations
      auto_scaling: true

    - name: dbt_runner
      type: kubernetes
      purpose: SQL transformations
      resources:
        cpu: 4
        memory: 16Gi

  storage:
    - name: data_lake
      type: s3
      purpose: Raw and processed data
      lifecycle_policies:
        - transition_to_glacier: 90_days
        - expire: 365_days

    - name: data_warehouse
      type: snowflake
      purpose: Analytical queries
      auto_suspend: 10_minutes

  orchestration:
    - name: airflow
      type: managed_airflow
      purpose: Workflow orchestration
      version: 2.8.0

  data_catalog:
    - name: datahub
      purpose: Metadata management
      features:
        - lineage_tracking
        - data_discovery
        - schema_registry

  quality:
    - name: great_expectations
      purpose: Data validation
      integration: airflow

  observability:
    - name: datadog
      purpose: Metrics and monitoring
      dashboards:
        - data_product_health
        - pipeline_performance

  access_control:
    - name: okta
      type: identity_provider
      integration: sso

    - name: ranger
      type: authorization
      purpose: Fine-grained access control

Platform APIs:

# platform_api.py
from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class DataProductSpec:
    """Specification for creating data product"""
    name: str
    domain: str
    source_tables: List[str]
    transformation_sql: str
    schedule: str
    quality_checks: List[Dict]

class DataMeshPlatform:
    """Self-serve data mesh platform API"""

    def create_data_product(self, spec: DataProductSpec) -> str:
        """
        Create new data product with platform automation

        Steps:
        1. Provision compute resources
        2. Create storage location
        3. Deploy transformation pipeline
        4. Configure quality checks
        5. Register in catalog
        6. Set up monitoring
        """
        # Generate unique ID
        product_id = f"{spec.domain}_{spec.name}"

        # Create storage location
        storage_path = self._provision_storage(product_id)

        # Deploy dbt project
        dbt_project = self._create_dbt_project(spec)
        self._deploy_dbt_project(dbt_project)

        # Create Airflow DAG
        dag = self._create_airflow_dag(spec, storage_path)
        self._deploy_dag(dag)

        # Register in catalog
        self._register_in_catalog(product_id, spec, storage_path)

        # Set up monitoring
        self._setup_monitoring(product_id, spec)

        return product_id

    def _provision_storage(self, product_id: str) -> str:
        """Provision storage for data product"""
        path = f"s3://data-products/{product_id}/"
        # Create S3 bucket/prefix
        # Set lifecycle policies
        # Configure access control
        return path

    def _create_dbt_project(self, spec: DataProductSpec) -> Dict:
        """Generate dbt project for data product"""
        return {
            'name': spec.name,
            'models': {
                f"{spec.name}.sql": spec.transformation_sql
            },
            'tests': self._generate_dbt_tests(spec.quality_checks),
            'docs': self._generate_dbt_docs(spec)
        }

    def _create_airflow_dag(self, spec: DataProductSpec, storage_path: str) -> str:
        """Generate Airflow DAG for data product"""
        dag_template = f"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    dag_id='{spec.name}_pipeline',
    schedule='{spec.schedule}',
    start_date=datetime(2024, 1, 1),
    catchup=False
)

dbt_run = BashOperator(
    task_id='dbt_run',
    bash_command='dbt run --models {spec.name}',
    dag=dag
)

dbt_test = BashOperator(
    task_id='dbt_test',
    bash_command='dbt test --models {spec.name}',
    dag=dag
)

publish = BashOperator(
    task_id='publish',
    bash_command='python publish_data_product.py {spec.name} {storage_path}',
    dag=dag
)

dbt_run >> dbt_test >> publish
        """
        return dag_template

    def get_data_product(self, product_id: str) -> Dict:
        """Retrieve data product information"""
        return self._catalog.get(product_id)

    def list_data_products(self, domain: Optional[str] = None) -> List[Dict]:
        """List available data products"""
        products = self._catalog.search(domain=domain)
        return products

    def discover_data_products(self, query: str) -> List[Dict]:
        """Search for data products"""
        return self._catalog.search(query=query)

    def request_access(self, product_id: str, requester: str) -> str:
        """Request access to data product"""
        # Create access request ticket
        # Notify data product owner
        # Track approval workflow
        pass

    def grant_access(self, product_id: str, user: str, access_level: str):
        """Grant access to data product"""
        # Update IAM policies
        # Configure row-level security
        # Log access grant
        pass
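
Using the platform API from a domain team's side might look like the following; the catalog client, source tables, SQL body, and quality-check entries are placeholders assumed for illustration:

# create_campaign_product.py (illustrative usage; catalog client and SQL are placeholders)
platform = DataMeshPlatform()  # a real deployment would pass a catalog client

spec = DataProductSpec(
    name="campaign_performance",
    domain="marketing",
    source_tables=["raw.ad_spend", "raw.campaign_events"],
    transformation_sql=(
        "SELECT campaign_id, SUM(spend) AS total_spend "
        "FROM raw.ad_spend GROUP BY campaign_id"
    ),
    schedule="0 3 * * *",
    quality_checks=[{"name": "completeness", "threshold": 0.995}],
)

product_id = platform.create_data_product(spec)
print(f"Provisioned data product: {product_id}")  # marketing_campaign_performance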

Federated Computational Governance

Governance Framework:

# governance_policy.yaml
governance:
  global_policies:
    - name: data_classification
      mandatory: true
      policy: |
        All data products must be classified as:
        - Public: Freely accessible within organization
        - Internal: Restricted to employees
        - Confidential: Restricted to specific roles
        - Restricted: Requires explicit approval

    - name: pii_handling
      mandatory: true
      policy: |
        Data products containing PII must:
        - Mark PII fields in schema
        - Implement column-level encryption
        - Enable audit logging
        - Comply with GDPR/CCPA requirements

    - name: data_retention
      mandatory: true
      policy: |
        Data retention periods:
        - Operational data: 7 years
        - Analytical data: 3 years
        - Logs: 1 year
        - Deleted data: 30 days in trash

  domain_policies:
    sales:
      data_quality:
        - completeness: ">= 99%"
        - accuracy: ">= 99.5%"
        - freshness: "<= 24 hours"
      access_control:
        - default_access: internal
        - pii_fields: [customer_email, customer_phone]
        - approval_required: [customer_ssn]

    finance:
      data_quality:
        - completeness: ">= 99.9%"
        - accuracy: ">= 99.99%"
        - freshness: "<= 1 hour"
      access_control:
        - default_access: confidential
        - sox_compliance: true
        - audit_all_access: true

  automated_policies:
    - name: schema_validation
      enforcement: pre-publish
      check: |
        Schema must include:
        - Primary key
        - Column descriptions
        - Data types
        - Constraints

    - name: quality_gates
      enforcement: pre-publish
      check: |
        All quality checks must pass:
        - No critical failures
        - Warning threshold: <= 5%

    - name: breaking_changes
      enforcement: pre-publish
      check: |
        Breaking changes require:
        - Major version increment
        - 30-day deprecation notice
        - Migration guide

  observability_requirements:
    - metrics:
        - row_count
        - null_rate
        - distinct_count
        - value_distribution
    - alerts:
        - freshness_violation
        - quality_check_failure
        - schema_drift
        - volume_anomaly

Governance Implementation:

# governance_engine.py
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional

class PolicyViolationSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

@dataclass
class PolicyViolation:
    policy_name: str
    severity: PolicyViolationSeverity
    message: str
    field: Optional[str] = None

class GovernanceEngine:
    """Automated governance enforcement"""

    def __init__(self, policies: Dict):
        self.policies = policies

    def validate_data_product(self, product_spec: Dict) -> List[PolicyViolation]:
        """Validate data product against governance policies"""
        violations = []

        # Check data classification
        violations.extend(self._check_data_classification(product_spec))

        # Check PII handling
        violations.extend(self._check_pii_compliance(product_spec))

        # Check schema requirements
        violations.extend(self._check_schema_requirements(product_spec))

        # Check quality checks
        violations.extend(self._check_quality_requirements(product_spec))

        # Check retention policy
        violations.extend(self._check_retention_policy(product_spec))

        return violations

    def _check_data_classification(self, product_spec: Dict) -> List[PolicyViolation]:
        """Verify data classification is set and valid"""
        violations = []

        if 'classification' not in product_spec:
            violations.append(PolicyViolation(
                policy_name="data_classification",
                severity=PolicyViolationSeverity.ERROR,
                message="Data classification not specified"
            ))
            return violations  # skip the value check when the key is absent

        valid_classifications = ['public', 'internal', 'confidential', 'restricted']
        if product_spec['classification'] not in valid_classifications:
            violations.append(PolicyViolation(
                policy_name="data_classification",
                severity=PolicyViolationSeverity.ERROR,
                message=f"Invalid classification. Must be one of: {valid_classifications}"
            ))

        return violations

    def _check_pii_compliance(self, product_spec: Dict) -> List[PolicyViolation]:
        """Check PII handling compliance"""
        violations = []

        schema = product_spec.get('schema', {})
        pii_fields = [f for f in schema.get('fields', []) if f.get('is_pii')]

        if pii_fields:
            # Check encryption
            if not product_spec.get('encryption_enabled'):
                violations.append(PolicyViolation(
                    policy_name="pii_handling",
                    severity=PolicyViolationSeverity.CRITICAL,
                    message="PII fields present but encryption not enabled"
                ))

            # Check audit logging
            if not product_spec.get('audit_logging_enabled'):
                violations.append(PolicyViolation(
                    policy_name="pii_handling",
                    severity=PolicyViolationSeverity.CRITICAL,
                    message="PII fields present but audit logging not enabled"
                ))

            # Check field marking
            for field in pii_fields:
                if not field.get('pii_category'):
                    violations.append(PolicyViolation(
                        policy_name="pii_handling",
                        severity=PolicyViolationSeverity.ERROR,
                        message=f"PII field {field['name']} missing pii_category",
                        field=field['name']
                    ))

        return violations

    def _check_schema_requirements(self, product_spec: Dict) -> List[PolicyViolation]:
        """Validate schema completeness"""
        violations = []

        schema = product_spec.get('schema', {})
        if not schema:
            violations.append(PolicyViolation(
                policy_name="schema_validation",
                severity=PolicyViolationSeverity.ERROR,
                message="Schema not defined"
            ))
            return violations

        # Check for primary key
        fields = schema.get('fields', [])
        has_primary_key = any(f.get('is_primary_key') for f in fields)
        if not has_primary_key:
            violations.append(PolicyViolation(
                policy_name="schema_validation",
                severity=PolicyViolationSeverity.WARNING,
                message="No primary key defined"
            ))

        # Check field documentation
        for field in fields:
            if not field.get('description'):
                violations.append(PolicyViolation(
                    policy_name="schema_validation",
                    severity=PolicyViolationSeverity.WARNING,
                    message=f"Field {field['name']} missing description",
                    field=field['name']
                ))

        return violations

    def _check_quality_requirements(self, product_spec: Dict) -> List[PolicyViolation]:
        """Validate quality check configuration"""
        violations = []

        quality_checks = product_spec.get('sla', {}).get('quality_checks', [])
        if not quality_checks:
            violations.append(PolicyViolation(
                policy_name="quality_gates",
                severity=PolicyViolationSeverity.WARNING,
                message="No quality checks defined"
            ))
            return violations  # nothing more to inspect

        # Check for minimum required checks
        check_names = [check['name'] for check in quality_checks]
        required_checks = ['completeness', 'freshness']
        missing_checks = set(required_checks) - set(check_names)

        if missing_checks:
            violations.append(PolicyViolation(
                policy_name="quality_gates",
                severity=PolicyViolationSeverity.WARNING,
                message=f"Missing required quality checks: {missing_checks}"
            ))

        return violations

    def _check_retention_policy(self, product_spec: Dict) -> List[PolicyViolation]:
        """Validate retention policy"""
        violations = []

        if 'retention_days' not in product_spec:
            violations.append(PolicyViolation(
                policy_name="data_retention",
                severity=PolicyViolationSeverity.ERROR,
                message="Retention policy not specified"
            ))

        return violations

    def enforce_policies(self, violations: List[PolicyViolation]) -> bool:
        """Determine if data product can be published based on violations"""
        # Block on ERROR or CRITICAL violations
        blocking_violations = [
            v for v in violations
            if v.severity in [PolicyViolationSeverity.ERROR, PolicyViolationSeverity.CRITICAL]
        ]

        return len(blocking_violations) == 0

    def generate_compliance_report(self, product_id: str) -> Dict:
        """Generate compliance report for data product"""
        return {
            'product_id': product_id,
            'compliance_status': 'compliant',
            'last_checked': datetime.utcnow().isoformat(),
            'policies_evaluated': len(self.policies),
            'violations': []
        }
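
Wiring the engine to the YAML policies above might look like the sketch below; the file name, the spec contents, and the publish guard are assumptions about how a domain's CI pipeline would invoke it:

# enforce_governance.py (illustrative usage; file name and spec contents are assumptions)
import yaml  # PyYAML

with open("governance_policy.yaml") as f:
    policies = yaml.safe_load(f)["governance"]

engine = GovernanceEngine(policies)

product_spec = {
    "name": "sales_orders_analytical",
    "classification": "internal",
    "retention_days": 1095,
    "schema": {"fields": [{"name": "order_id", "type": "string",
                           "description": "Unique order identifier",
                           "is_primary_key": True}]},
    "sla": {"quality_checks": [{"name": "completeness"}, {"name": "freshness"}]},
}

violations = engine.validate_data_product(product_spec)
if engine.enforce_policies(violations):
    print("Publish allowed")
else:
    for v in violations:
        print(f"[{v.severity.value}] {v.policy_name}: {v.message}")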

Best Practices

1. Domain Design

  • Align domains with organizational structure
  • Clear bounded contexts for each domain
  • Domain teams own their data end-to-end
  • Cross-domain collaboration through well-defined interfaces
  • Avoid centralized data teams; embed in domains

2. Data Product Design

  • Treat data as a product with SLAs
  • Document data contracts explicitly
  • Version data products semantically (a minimal breaking-change check is sketched after this list)
  • Implement comprehensive quality checks
  • Provide discoverability and self-service access
  • Monitor data product health continuously
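
A minimal sketch of that versioning discipline, assuming contract fields shaped like the sales_orders_analytical example: compare two schema versions and list the changes that force a major version bump:

# breaking_change_check.py (illustrative sketch; field shapes follow the example contract above)
def find_breaking_changes(old_fields: list, new_fields: list) -> list:
    """Return schema changes that require a major version bump."""
    old = {f["name"]: f for f in old_fields}
    new = {f["name"]: f for f in new_fields}

    breaking = []
    for name, field in old.items():
        if name not in new:
            breaking.append(f"field removed: {name}")
        elif new[name]["type"] != field["type"]:
            breaking.append(f"type changed: {name} {field['type']} -> {new[name]['type']}")
    return breaking

# Mirrors the 2.0.0 changelog entry above (order_id integer -> string)
old_schema = [{"name": "order_id", "type": "integer"}, {"name": "status", "type": "string"}]
new_schema = [{"name": "order_id", "type": "string"}, {"name": "status", "type": "string"}]
print(find_breaking_changes(old_schema, new_schema))  # ['type changed: order_id integer -> string']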

3. Platform Design

  • Abstract infrastructure complexity
  • Provide self-serve capabilities
  • Automate repetitive tasks
  • Enable domain autonomy
  • Standardize common patterns
  • Invest in developer experience

4. Governance

  • Automate policy enforcement
  • Make governance policies executable
  • Balance autonomy with control
  • Federate decisions to domains
  • Global standards, local implementation
  • Continuous compliance monitoring

5. Cultural Transformation

  • Shift from centralized to federated model
  • Build data literacy across the organization
  • Incentivize data product quality
  • Foster collaboration between domains
  • Celebrate data product owners

Anti-Patterns

1. Centralized Data Team

// Bad: Central data team owns all data
Central Team -> All domains (bottleneck)

// Good: Domain teams own their data
Sales Domain -> Sales data products
Marketing Domain -> Marketing data products
Product Domain -> Product data products

2. Monolithic Data Lake

// Bad: Single giant data lake
s3://data-lake/everything/

// Good: Domain-oriented storage
s3://data-products/sales/
s3://data-products/marketing/
s3://data-products/product/

3. No Data Contracts

// Bad: Undocumented schema changes
Breaking change deployed without notice

// Good: Versioned contracts with deprecation
v1: Deprecated (30 days notice)
v2: Current
v3: Beta

4. Manual Governance

// Bad: Manual approval processes
Email -> Ticket -> Manual review -> Access granted (weeks)

// Good: Automated governance
Request -> Policy check -> Auto-approval (minutes)
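
As a sketch of what automated governance can mean in practice: auto-approve requests whose role or domain already appears in the product's declared read access, and route restricted data to the owning team. The request and contract shapes mirror the earlier contract but are otherwise assumptions:

# auto_approve_access.py (illustrative sketch; request and contract shapes are assumptions)
def decide_access_request(request: dict, contract: dict) -> str:
    """Return 'auto_approved' or 'owner_review' for a read-access request."""
    classification = contract.get("classification", "internal")
    read_rules = contract.get("access", {}).get("read", [])

    # Restricted data always goes to a human owner
    if classification == "restricted":
        return "owner_review"

    # Auto-approve when the requester's role or domain is already whitelisted in the contract
    for rule in read_rules:
        if "role" in rule and rule["role"] == request.get("role"):
            return "auto_approved"
        if "domain" in rule and rule["domain"] == request.get("domain"):
            return "auto_approved"

    # Anything else needs an explicit decision from the data product owner
    return "owner_review"

# Example: a marketing analyst requesting sales_orders_analytical
print(decide_access_request(
    {"role": "analyst", "domain": "marketing"},
    {"classification": "internal", "access": {"read": [{"role": "analyst"}, {"domain": "marketing"}]}},
))  # auto_approved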

Resources

GitHub Repository

majiayu000/claude-skill-registry
Path: skills/data-mesh-expert

Related Skills

sparc-methodology

Development

The SPARC methodology provides a systematic development framework with 17 specialized modes for comprehensive software development from specification to completion. It integrates multi-agent orchestration to handle complex development workflows including architecture design, testing, and deployment. Use this skill when you need structured guidance throughout the entire development lifecycle with automated agent coordination.

View skill

github-multi-repo

Other

This skill enables AI swarm orchestration for managing multiple GitHub repositories simultaneously. It provides cross-repository coordination, package synchronization, and architecture optimization for distributed development workflows. Use it when you need to automate organization-wide repository management, template propagation, or multi-project synchronization.

View skill