
community-feed

majiayu000
Updated yesterday

About

The Community Feed skill implements efficient social feeds with cursor pagination and trending algorithms for infinite-scroll applications. It provides optimized database queries that prevent the N+1 problem through batch loading, and includes an engagement tracking system. Use it when building social platforms that need a scalable feed with a like system and popular-content ranking.

Quick Install

Claude Code

Recommended
Plugin command (recommended)
/plugin add https://github.com/majiayu000/claude-skill-registry
Git clone (alternative)
git clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/community-feed

Copy and paste the command above into Claude Code to install the skill.

Documentation

Community Feed

Social feed with trending algorithms, cursor pagination, and engagement tracking.

When to Use This Skill

  • Building social feeds with infinite scroll
  • Need trending/hot content algorithms
  • Implementing like/engagement systems
  • Want efficient pagination for large datasets

Core Concepts

Cursor pagination beats offset for large datasets. Batch-load relationships to avoid N+1 queries. Store trending scores in an indexed column, refreshed on write or on a schedule, so feed queries can sort without recomputing scores per request.
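
For contrast, a minimal sketch of the two query shapes, assuming the same supabase-py-style client (db) used by the service below; offset, limit, and last_seen_created_at stand in for real values:

# Offset pagination: the database still scans and discards `offset` rows,
# so deep pages get progressively slower.
page = db.table("community_posts").order("created_at", desc=True).range(offset, offset + limit - 1).execute()

# Cursor pagination: seek directly past the last row already seen, which an
# index on created_at satisfies in roughly constant time per page.
page = db.table("community_posts").order("created_at", desc=True).lt("created_at", last_seen_created_at).limit(limit).execute()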

Implementation

Python

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, List, Dict
import base64
import json


@dataclass
class PaginatedPosts:
    posts: List[Dict]
    total_count: int
    has_more: bool
    next_cursor: Optional[str]


class CommunityFeedService:
    """Service for community feed with cursor pagination."""

    def __init__(self, db):
        self.db = db

    async def get_feed(
        self,
        feed_type: str = "trending",
        viewer_id: Optional[str] = None,
        cursor: Optional[str] = None,
        limit: int = 20,
        tags: Optional[List[str]] = None,
    ) -> PaginatedPosts:
        cursor_data = self._parse_cursor(cursor) if cursor else None

        query = self.db.table("community_posts").select(
            "*",
            "assets!inner(url, asset_type)",
            "users!inner(id, display_name, avatar_url)",
        )

        if tags:
            query = query.contains("tags", tags)

        # Feed-specific ordering
        if feed_type == "following" and viewer_id:
            following = await self._get_following_ids(viewer_id)
            if not following:
                return PaginatedPosts(posts=[], total_count=0, has_more=False, next_cursor=None)
            query = query.in_("user_id", following)

        if feed_type == "trending":
            query = query.order("trending_score", desc=True)
            if cursor_data:
                query = query.lt("trending_score", cursor_data["score"])
        else:
            query = query.order("created_at", desc=True)
            if cursor_data:
                query = query.lt("created_at", cursor_data["created_at"])

        # Fetch one extra to check has_more
        query = query.limit(limit + 1)
        result = query.execute()
        posts = result.data or []

        has_more = len(posts) > limit
        if has_more:
            posts = posts[:limit]

        # Batch load viewer's likes
        if viewer_id and posts:
            liked_ids = await self._get_liked_post_ids(viewer_id, [p["id"] for p in posts])
            for post in posts:
                post["is_liked_by_viewer"] = post["id"] in liked_ids

        next_cursor = None
        if has_more and posts:
            next_cursor = self._generate_cursor(posts[-1], feed_type)

        return PaginatedPosts(
            posts=posts,
            total_count=await self._get_total_count(tags),
            has_more=has_more,
            next_cursor=next_cursor,
        )

    async def _get_following_ids(self, user_id: str) -> List[str]:
        result = self.db.table("user_follows").select("following_id").eq("follower_id", user_id).execute()
        return [r["following_id"] for r in (result.data or [])]

    async def _get_liked_post_ids(self, user_id: str, post_ids: List[str]) -> set:
        result = self.db.table("post_likes").select("post_id").eq("user_id", user_id).in_("post_id", post_ids).execute()
        return {r["post_id"] for r in (result.data or [])}

    def _parse_cursor(self, cursor: str) -> dict:
        try:
            return json.loads(base64.b64decode(cursor).decode())
        except ValueError:
            return {}  # Malformed cursor: fall back to the first page

    def _generate_cursor(self, post: dict, feed_type: str) -> str:
        if feed_type == "trending":
            data = {"score": post.get("trending_score", 0)}
        else:
            data = {"created_at": post["created_at"]}
        return base64.b64encode(json.dumps(data).encode()).decode()
    # Engagement operations
    async def like_post(self, post_id: str, user_id: str) -> bool:
        existing = self.db.table("post_likes").select("id").eq("post_id", post_id).eq("user_id", user_id).execute()
        if existing.data:
            return False  # Already liked

        self.db.table("post_likes").insert({
            "post_id": post_id,
            "user_id": user_id,
            "created_at": datetime.now(timezone.utc).isoformat(),
        }).execute()

        # Atomic increment via a database function (see SQL schema below)
        self.db.rpc("increment_like_count", {"post_id": post_id}).execute()
        return True

    async def unlike_post(self, post_id: str, user_id: str) -> bool:
        result = self.db.table("post_likes").delete().eq("post_id", post_id).eq("user_id", user_id).execute()
        if not result.data:
            return False

        self.db.rpc("decrement_like_count", {"post_id": post_id}).execute()
        return True

    async def _get_total_count(self, tags: Optional[List[str]] = None) -> int:
        # Referenced by get_feed; counts matching posts (cache this in production)
        query = self.db.table("community_posts").select("id", count="exact")
        if tags:
            query = query.contains("tags", tags)
        result = query.execute()
        return result.count or 0


# Trending algorithm
def calculate_trending_score(
    like_count: int,
    comment_count: int,
    view_count: int,
    created_at: datetime,
    is_featured: bool = False,
) -> float:
    """
    Trending score = engagement / age^decay
    Higher engagement + newer = higher score
    """
    engagement = like_count * 1.0 + comment_count * 2.0 + view_count * 0.1
    age_hours = max((datetime.now(timezone.utc) - created_at).total_seconds() / 3600, 0.1)
    score = engagement / (age_hours ** 1.5)
    return score * 1.5 if is_featured else score
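
For intuition about the decay, a short usage sketch with illustrative numbers: engagement is 100*1.0 + 20*2.0 + 1000*0.1 = 240, so the same post scores about 84.9 at 2 hours old but only about 2.0 at 24 hours.

from datetime import timedelta

now = datetime.now(timezone.utc)
fresh = calculate_trending_score(100, 20, 1000, now - timedelta(hours=2))   # ~84.9
stale = calculate_trending_score(100, 20, 1000, now - timedelta(hours=24))  # ~2.0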

SQL Schema

CREATE TABLE community_posts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    title VARCHAR(200) NOT NULL,
    like_count INTEGER DEFAULT 0,
    comment_count INTEGER DEFAULT 0,
    view_count INTEGER DEFAULT 0,
    is_featured BOOLEAN DEFAULT FALSE,
    tags TEXT[] DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- Stored trending score, refreshed by the UPDATE below. A GENERATED
    -- column cannot be used here: PostgreSQL only allows IMMUTABLE
    -- expressions in generated columns, and NOW() is not immutable.
    trending_score FLOAT NOT NULL DEFAULT 0
);

-- Refresh trending scores periodically (e.g. via pg_cron or an app-level job)
UPDATE community_posts SET trending_score =
    (like_count + comment_count * 2 + view_count * 0.1) /
    POWER(GREATEST(EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600, 0.1), 1.5);

CREATE INDEX idx_posts_trending ON community_posts(trending_score DESC);
CREATE INDEX idx_posts_created ON community_posts(created_at DESC);

-- Atomic counter functions (called via rpc from the service)
CREATE FUNCTION increment_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN UPDATE community_posts SET like_count = like_count + 1 WHERE id = post_id; END;
$$ LANGUAGE plpgsql;

CREATE FUNCTION decrement_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN UPDATE community_posts SET like_count = GREATEST(like_count - 1, 0) WHERE id = post_id; END;
$$ LANGUAGE plpgsql;

Usage Examples

feed_service = CommunityFeedService(db)

# Get trending feed
result = await feed_service.get_feed(feed_type="trending", viewer_id="user_123", limit=20)
for post in result.posts:
    print(f"{post['title']} - {post['like_count']} likes")

# Load next page
if result.has_more:
    next_page = await feed_service.get_feed(feed_type="trending", cursor=result.next_cursor)

# Like a post
await feed_service.like_post("post_456", "user_123")
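
For infinite scroll, keep requesting pages with the returned cursor until has_more is False; a minimal sketch, where render is a hypothetical UI callback:

cursor = None
while True:
    page = await feed_service.get_feed(feed_type="trending", cursor=cursor, limit=50)
    for post in page.posts:
        render(post)
    if not page.has_more:
        break
    cursor = page.next_cursor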

Best Practices

  1. Use cursor pagination over offset for large datasets
  2. Batch-load relationships to avoid N+1 queries
  3. Store the trending score in an indexed column for efficient sorting
  4. Use atomic database functions for counter updates
  5. Cache total counts, which are expensive to compute (see the sketch after this list)
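
A minimal sketch of point 5, assuming the _get_total_count helper above: a short TTL avoids running an exact COUNT on every feed request.

import time

_count_cache: dict = {}  # cache key -> (expires_at, value)

async def cached_total_count(service, tags=None, ttl_seconds=60.0):
    key = ",".join(sorted(tags or []))
    hit = _count_cache.get(key)
    if hit and hit[0] > time.time():
        return hit[1]
    value = await service._get_total_count(tags)
    _count_cache[key] = (time.time() + ttl_seconds, value)
    return value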

Common Mistakes

  • Using offset pagination (slow for large offsets)
  • N+1 queries for author/like data
  • Computing trending score on every query
  • Non-atomic counter updates causing race conditions (see the sketch after this list)
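
To make the last point concrete, a sketch of the race, with db and post_id assumed from the examples above: two concurrent read-modify-write updates can both read the same count and overwrite each other, losing a like.

# Racy: both requests read like_count = 10 and both write 11.
row = db.table("community_posts").select("like_count").eq("id", post_id).execute().data[0]
db.table("community_posts").update({"like_count": row["like_count"] + 1}).eq("id", post_id).execute()

# Atomic: the increment happens inside a single server-side UPDATE.
db.rpc("increment_like_count", {"post_id": post_id}).execute()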

Related Patterns

  • analytics-pipeline (event tracking)
  • intelligent-cache (caching feeds)

GitHub Repository

majiayu000/claude-skill-registry
Path: skills/community-feed

Related Skills

content-collections

This skill provides a production-tested setup for Content Collections, a TypeScript-first tool that transforms Markdown/MDX files into type-safe data collections with Zod validation. Use it when building blogs, documentation sites, or content-heavy Vite + React applications to ensure type safety and automatic content validation. It covers everything from Vite plugin configuration and MDX compilation to deployment optimization and schema validation.

polymarket

This skill enables developers to build applications with the Polymarket prediction markets platform, including API integration for trading and market data. It also provides real-time data streaming via WebSocket to monitor live trades and market activity. Use it for implementing trading strategies or creating tools that process live market updates.

hybrid-cloud-networking

This skill configures secure hybrid cloud networking between on-premises infrastructure and cloud platforms like AWS, Azure, and GCP. Use it when connecting data centers to the cloud, building hybrid architectures, or implementing secure cross-premises connectivity. It supports key capabilities such as VPNs and dedicated connections like AWS Direct Connect for high-performance, reliable setups.

llamaindex

LlamaIndex is a data framework for building RAG-powered LLM applications, specializing in document ingestion, indexing, and querying. It provides key features like vector indices, query engines, and agents, and supports over 300 data connectors. Use it for document Q&A, chatbots, and knowledge retrieval when building data-centric applications.