Building an Intelligent Financial Score Classification System with Graphiti

In the world of fintech applications, understanding and classifying user financial behavior is crucial for making informed lending decisions, providing personalized recommendations, and managing risk. Traditional approaches often rely on static credit scores and limited data points. But what if we could build a dynamic, intelligent system that learns from user interactions and evolves over time?

Enter Graphiti - a knowledge graph framework designed for building real-time, dynamically updating knowledge graphs for AI agents. In this tutorial, we'll use Graphiti to build a financial score classification system with TypeScript, Python, and Google Cloud PubSub.

What is Graphiti?

Graphiti is a knowledge graph framework built for dynamic, frequently updated datasets. Unlike traditional knowledge graphs that require complete rebuilds for updates, Graphiti incorporates new data incrementally, in real time, as it arrives.

Architecture Overview

Our financial score classification system will consist of three main components:

  1. Python API Service: A FastAPI-based service that manages the Graphiti knowledge graph
  2. TypeScript Client: A frontend service that interacts with users and sends data
  3. PubSub Integration: Google Cloud PubSub for asynchronous message processing

graph TD
    A[User Interaction] --> B[TypeScript Client]
    B --> C[PubSub Topic]
    C --> D[Python API Service]
    D --> E[Graphiti Knowledge Graph]
    E --> F[Financial Score Classification]
    F --> G[Real-time Insights]
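
To make the hop from the PubSub topic to the Python API service concrete, here is a minimal sketch of the message shape. The inner event uses the `event_type` and `data` keys that the service's `process_financial_event` handler reads. The outer envelope mirrors the part of a Pub/Sub push request body that the webhook inspects, with the payload base64-encoded under `message.data`; a real push request carries further fields that are omitted here. The helper names `build_push_envelope` and `decode_push_envelope` are hypothetical and exist only for illustration.

```python
import base64
import json
from typing import Any, Dict


def build_push_envelope(event_type: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap a financial event in a simplified Pub/Sub push envelope."""
    event = {"event_type": event_type, "data": data}
    # Pub/Sub delivers the payload as base64-encoded bytes
    payload = base64.b64encode(json.dumps(event).encode("utf-8")).decode("ascii")
    return {"message": {"data": payload}}


def decode_push_envelope(envelope: Dict[str, Any]) -> Dict[str, Any]:
    """Recover the inner event from a push envelope."""
    raw = base64.b64decode(envelope["message"]["data"]).decode("utf-8")
    return json.loads(raw)


envelope = build_push_envelope(
    "transaction",
    {"user_id": "user_12345", "transaction_id": "txn_67890", "amount": 1250.75},
)
event = decode_push_envelope(envelope)
print(event["event_type"])  # transaction
```

The round trip shows that nothing is lost between publisher and subscriber: whatever the client puts in `data` is what the service validates against its Pydantic models.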

Setting Up the Python API Service

Let's start by building the Python API service that manages the Graphiti knowledge graph. This service ingests financial transactions and user behavior events, and exposes endpoints for scoring.

Installation and Dependencies

First, create a new Python project and install the required dependencies:

pip install fastapi uvicorn python-dotenv graphiti-core google-cloud-pubsub
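
The service in the next section reads its configuration from environment variables, loaded from a `.env` file via python-dotenv. As a rough preflight sketch, the hypothetical helper below reports which of them are unset. The variable names are taken from the service code; `OPENAI_API_KEY` is an assumption, based on Graphiti using an OpenAI-backed LLM client by default, so drop it if you configure a different provider.

```python
from typing import Dict, List, Mapping

# NEO4J_PASSWORD is the only variable the service refuses to start without.
REQUIRED = ["NEO4J_PASSWORD"]
# Assumption: Graphiti's default LLM client needs an OpenAI key.
ASSUMED_REQUIRED = ["OPENAI_API_KEY"]
# These have defaults in the service or only toggle optional features.
OPTIONAL = ["NEO4J_URI", "NEO4J_USER", "AUTHORIZATION",
            "GOOGLE_CLOUD_PROJECT", "LLM_MODEL", "PORT"]


def missing_settings(env: Mapping[str, str]) -> Dict[str, List[str]]:
    """Return the names of unset or blank variables, grouped by importance."""
    def unset(names: List[str]) -> List[str]:
        return [name for name in names if not env.get(name, "").strip()]

    return {
        "required": unset(REQUIRED + ASSUMED_REQUIRED),
        "optional": unset(OPTIONAL),
    }


sample_env = {"NEO4J_PASSWORD": "secret", "NEO4J_URI": "bolt://localhost:7687"}
report = missing_settings(sample_env)
print(report["required"])  # ['OPENAI_API_KEY']
```

In practice you would pass `os.environ` after calling `load_dotenv()` rather than a hand-written dictionary.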

Core API Implementation

"""
Financial Score Classification API with Graphiti

A comprehensive FastAPI service for building intelligent financial scoring
using Graphiti knowledge graphs.
"""

import os
import logging
import json
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any, Literal
from contextlib import asynccontextmanager
from enum import Enum

from fastapi import FastAPI, HTTPException, Depends, Security, status, Request, BackgroundTasks
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from dotenv import load_dotenv
from google.cloud import pubsub_v1

from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger(__name__)

# Global instances
graphiti_instance: Optional[Graphiti] = None
pubsub_publisher: Optional[pubsub_v1.PublisherClient] = None

# Security
security = HTTPBearer()

# Environment variables
NEO4J_URI = os.getenv('NEO4J_URI', 'bolt://localhost:7687')
NEO4J_USER = os.getenv('NEO4J_USER', 'neo4j')
NEO4J_PASSWORD = os.getenv('NEO4J_PASSWORD')
AUTHORIZATION_TOKEN = os.getenv('AUTHORIZATION')
GOOGLE_CLOUD_PROJECT = os.getenv('GOOGLE_CLOUD_PROJECT')
LLM_MODEL = os.getenv('LLM_MODEL', 'gpt-4o')

# Validate required environment variables
if not NEO4J_PASSWORD:
    raise ValueError("NEO4J_PASSWORD environment variable is required")
if not AUTHORIZATION_TOKEN or AUTHORIZATION_TOKEN.strip() == "":
    AUTHORIZATION_TOKEN = None
    logger.info("API will run without authentication")

# Financial scoring enums and models
class TransactionType(str, Enum):
    DEPOSIT = "deposit"
    WITHDRAWAL = "withdrawal"
    PAYMENT = "payment"
    TRANSFER = "transfer"
    LOAN_PAYMENT = "loan_payment"
    CREDIT_CARD_PAYMENT = "credit_card_payment"
    INVESTMENT = "investment"

class RiskLevel(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    VERY_HIGH = "very_high"

class FinancialBehaviorCategory(str, Enum):
    CONSERVATIVE = "conservative"
    MODERATE = "moderate"
    AGGRESSIVE = "aggressive"
    SPECULATIVE = "speculative"

class UserFinancialProfile(BaseModel):
    user_id: str = Field(..., description="Unique user identifier")
    annual_income: float = Field(..., gt=0, description="Annual income in USD")
    credit_score: Optional[int] = Field(None, ge=300, le=850, description="Traditional credit score")
    employment_status: str = Field(..., description="Employment status")
    debt_to_income_ratio: Optional[float] = Field(None, ge=0, le=1, description="Debt-to-income ratio")
    age: int = Field(..., ge=18, le=120, description="User age")
    education_level: str = Field(..., description="Education level")
    
    class Config:
        json_schema_extra = {
            "example": {
                "user_id": "user_12345",
                "annual_income": 75000.0,
                "credit_score": 720,
                "employment_status": "full_time",
                "debt_to_income_ratio": 0.25,
                "age": 32,
                "education_level": "bachelor"
            }
        }

class FinancialTransaction(BaseModel):
    user_id: str = Field(..., description="User identifier")
    transaction_id: str = Field(..., description="Unique transaction identifier")
    amount: float = Field(..., description="Transaction amount")
    transaction_type: TransactionType = Field(..., description="Type of transaction")
    category: str = Field(..., description="Transaction category")
    merchant: Optional[str] = Field(None, description="Merchant name")
    description: str = Field(..., description="Transaction description")
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    location: Optional[str] = Field(None, description="Transaction location")
    
    class Config:
        json_schema_extra = {
            "example": {
                "user_id": "user_12345",
                "transaction_id": "txn_67890",
                "amount": 1250.75,
                "transaction_type": "payment",
                "category": "rent",
                "merchant": "Property Management Co",
                "description": "Monthly rent payment",
                "location": "New York, NY"
            }
        }

class FinancialBehaviorEvent(BaseModel):
    user_id: str = Field(..., description="User identifier")
    event_type: str = Field(..., description="Type of financial behavior")
    description: str = Field(..., description="Detailed description of the behavior")
    risk_indicators: List[str] = Field(default_factory=list, description="Risk indicators observed")
    positive_indicators: List[str] = Field(default_factory=list, description="Positive indicators observed")
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    
    class Config:
        json_schema_extra = {
            "example": {
                "user_id": "user_12345",
                "event_type": "payment_pattern_analysis",
                "description": "User consistently pays bills 3-5 days early",
                "risk_indicators": [],
                "positive_indicators": ["early_payment", "consistent_behavior", "financial_discipline"],
            }
        }

class FinancialScoreRequest(BaseModel):
    user_id: str = Field(..., description="User identifier to score")
    include_reasoning: bool = Field(True, description="Include reasoning in response")
    
class FinancialScoreResponse(BaseModel):
    user_id: str
    financial_score: int = Field(..., ge=0, le=1000, description="Financial score (0-1000)")
    risk_level: RiskLevel
    behavior_category: FinancialBehaviorCategory
    confidence: float = Field(..., ge=0, le=1, description="Confidence in the score")
    key_factors: List[str] = Field(..., description="Key factors influencing the score")
    recommendations: List[str] = Field(..., description="Personalized recommendations")
    reasoning: Optional[str] = Field(None, description="Detailed reasoning behind the score")
    last_updated: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

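# Authentication dependency
# auto_error=False lets requests without an Authorization header reach
# verify_token, so the API can actually run unauthenticated when no token is set.
optional_bearer = HTTPBearer(auto_error=False)

async def verify_token(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(optional_bearer),
):
    """Verify the bearer token; a no-op when no AUTHORIZATION token is configured"""
    if not AUTHORIZATION_TOKEN:
        return None
    if credentials is None or credentials.credentials != AUTHORIZATION_TOKEN:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authorization token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return credentials.credentials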

# Startup and shutdown events
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager"""
    global graphiti_instance, pubsub_publisher
    
    logger.info("Starting Financial Scoring API with Graphiti...")
    
    # Initialize Graphiti
    try:
        graphiti_instance = Graphiti(
            uri=NEO4J_URI,
            user=NEO4J_USER,
            password=NEO4J_PASSWORD
        )
        
        # Build indices and constraints
        await graphiti_instance.build_indices_and_constraints()
        logger.info("Graphiti initialized successfully")
        
        # Initialize PubSub client if project is configured
        if GOOGLE_CLOUD_PROJECT:
            pubsub_publisher = pubsub_v1.PublisherClient()
            logger.info("PubSub publisher initialized")
        
        yield
        
    except Exception as e:
        logger.error(f"Failed to initialize services: {e}")
        raise
    finally:
        # Cleanup
        if graphiti_instance:
            await graphiti_instance.close()
            logger.info("Graphiti connection closed")

# Create FastAPI app
app = FastAPI(
    title="Financial Score Classification API",
    description="An intelligent financial scoring system powered by Graphiti knowledge graphs",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    lifespan=lifespan
)

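# Add CORS middleware
# Note: wildcard origins are convenient for local development; restrict
# allow_origins to known frontends before deploying.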
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Financial scoring logic
class FinancialScoringEngine:
    """Advanced financial scoring engine using Graphiti knowledge graph"""
    
    @staticmethod
    async def calculate_financial_score(user_id: str, include_reasoning: bool = True) -> FinancialScoreResponse:
        """Calculate comprehensive financial score for a user"""
        if not graphiti_instance:
            raise HTTPException(500, "Graphiti instance not available")
        
        try:
            # Search for user's financial data in the knowledge graph
            user_query = f"financial profile and behavior data for user {user_id}"
            search_results = await graphiti_instance.search(query=user_query)
            
            # Extract relevant information from search results
            financial_data = []
            for result in search_results:
                financial_data.append(result.fact)
            
            # If no data found, return default score
            if not financial_data:
                return FinancialScoreResponse(
                    user_id=user_id,
                    financial_score=500,  # Neutral score
                    risk_level=RiskLevel.MEDIUM,
                    behavior_category=FinancialBehaviorCategory.MODERATE,
                    confidence=0.3,
                    key_factors=["insufficient_data"],
                    recommendations=["Provide more financial data for better scoring"],
                    reasoning="Limited financial data available for scoring" if include_reasoning else None
                )
            
            # Analyze the financial data to calculate score
            score_components = await FinancialScoringEngine._analyze_financial_behavior(financial_data)
            
            # Calculate final score (0-1000)
            base_score = 500  # Starting point
            final_score = base_score + score_components['adjustment']
            final_score = max(0, min(1000, final_score))  # Clamp to range
            
            # Determine risk level and behavior category
            risk_level = FinancialScoringEngine._determine_risk_level(final_score)
            behavior_category = FinancialScoringEngine._determine_behavior_category(score_components)
            
            return FinancialScoreResponse(
                user_id=user_id,
                financial_score=int(final_score),
                risk_level=risk_level,
                behavior_category=behavior_category,
                confidence=score_components['confidence'],
                key_factors=score_components['key_factors'],
                recommendations=score_components['recommendations'],
                reasoning=score_components['reasoning'] if include_reasoning else None
            )
            
        except Exception as e:
            logger.error(f"Error calculating financial score for user {user_id}: {e}")
            raise HTTPException(500, f"Failed to calculate financial score: {str(e)}")
    
    @staticmethod
    async def _analyze_financial_behavior(financial_data: List[str]) -> Dict[str, Any]:
        """Analyze financial behavior patterns from knowledge graph data"""
        # This is a simplified scoring algorithm
        # In production, you'd use more sophisticated ML models
        
        adjustment = 0
        key_factors = []
        recommendations = []
        confidence = 0.5
        reasoning_parts = []
        
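        # Normalize underscores so indicators such as "early_payment" match "early payment"
        data_text = " ".join(financial_data).lower().replace("_", " ")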
        
        # Positive indicators
        positive_patterns = {
            "early payment": 50,
            "consistent savings": 60,
            "low debt ratio": 70,
            "stable income": 40,
            "emergency fund": 80,
            "investment": 30,
            "good credit": 60
        }
        
        # Negative indicators
        negative_patterns = {
            "late payment": -60,
            "high debt": -70,
            "overdraft": -50,
            "bankruptcy": -200,
            "missed payment": -80,
            "maxed credit": -90,
            "frequent borrowing": -40
        }
        
        # Analyze positive patterns
        for pattern, points in positive_patterns.items():
            if pattern in data_text:
                adjustment += points
                key_factors.append(pattern.replace(" ", "_"))
                reasoning_parts.append(f"Positive: {pattern} (+{points} points)")
                confidence += 0.1
        
        # Analyze negative patterns
        for pattern, points in negative_patterns.items():
            if pattern in data_text:
                adjustment += points  # Points are already negative
                key_factors.append(pattern.replace(" ", "_"))
                reasoning_parts.append(f"Negative: {pattern} ({points} points)")
                confidence += 0.1
        
        # Generate recommendations based on analysis
        if "late payment" in data_text:
            recommendations.append("Set up automatic payments to avoid late fees")
        if "high debt" in data_text:
            recommendations.append("Consider debt consolidation or payment plan")
        if "emergency fund" not in data_text:
            recommendations.append("Build an emergency fund covering 3-6 months of expenses")
        if "investment" not in data_text and adjustment > 0:
            recommendations.append("Consider starting an investment portfolio for long-term growth")
        
        confidence = min(1.0, confidence)
        reasoning = "; ".join(reasoning_parts) if reasoning_parts else "Analysis based on available financial data"
        
        return {
            'adjustment': adjustment,
            'key_factors': key_factors,
            'recommendations': recommendations,
            'confidence': confidence,
            'reasoning': reasoning
        }
    
    @staticmethod
    def _determine_risk_level(score: int) -> RiskLevel:
        """Determine risk level based on financial score"""
        if score >= 750:
            return RiskLevel.LOW
        elif score >= 600:
            return RiskLevel.MEDIUM
        elif score >= 400:
            return RiskLevel.HIGH
        else:
            return RiskLevel.VERY_HIGH
    
    @staticmethod
    def _determine_behavior_category(score_components: Dict[str, Any]) -> FinancialBehaviorCategory:
        """Determine financial behavior category"""
        key_factors = score_components['key_factors']
        
        conservative_indicators = ['emergency_fund', 'consistent_savings', 'low_debt_ratio']
        aggressive_indicators = ['investment', 'frequent_borrowing', 'high_debt']
        
        conservative_count = sum(1 for indicator in conservative_indicators if indicator in key_factors)
        aggressive_count = sum(1 for indicator in aggressive_indicators if indicator in key_factors)
        
        if conservative_count >= 2:
            return FinancialBehaviorCategory.CONSERVATIVE
        elif aggressive_count >= 2:
            return FinancialBehaviorCategory.AGGRESSIVE
        elif 'investment' in key_factors and aggressive_count >= 1:
            return FinancialBehaviorCategory.SPECULATIVE
        else:
            return FinancialBehaviorCategory.MODERATE

# PubSub message handling
async def process_financial_event(message_data: Dict[str, Any]):
    """Process financial events from PubSub"""
    try:
        event_type = message_data.get('event_type')
        
        if event_type == 'user_profile':
            await handle_user_profile_update(UserFinancialProfile(**message_data['data']))
        elif event_type == 'transaction':
            await handle_financial_transaction(FinancialTransaction(**message_data['data']))
        elif event_type == 'behavior':
            await handle_financial_behavior(FinancialBehaviorEvent(**message_data['data']))
        
    except Exception as e:
        logger.error(f"Error processing financial event: {e}")

async def handle_user_profile_update(profile: UserFinancialProfile):
    """Handle user profile updates"""
    if not graphiti_instance:
        return
    
    episode_name = f"user_profile_{profile.user_id}_{datetime.now(timezone.utc).isoformat()}"
    episode_body = f"""
    User {profile.user_id} financial profile update:
    - Annual income: ${profile.annual_income:,.2f}
    - Credit score: {profile.credit_score or 'Not provided'}
    - Employment status: {profile.employment_status}
    - Debt-to-income ratio: {profile.debt_to_income_ratio if profile.debt_to_income_ratio is not None else 'Not provided'}
    - Age: {profile.age}
    - Education level: {profile.education_level}
    
    This represents the current financial standing and demographic information for the user.
    """
    
    await graphiti_instance.add_episode(
        name=episode_name,
        episode_body=episode_body,
        source=EpisodeType.text,
        source_description="User financial profile update",
        reference_time=datetime.now(timezone.utc)
    )

async def handle_financial_transaction(transaction: FinancialTransaction):
    """Handle financial transaction events"""
    if not graphiti_instance:
        return
    
    episode_name = f"transaction_{transaction.transaction_id}"
    episode_body = f"""
    Financial transaction for user {transaction.user_id}:
    - Transaction ID: {transaction.transaction_id}
    - Amount: ${transaction.amount:,.2f}
    - Type: {transaction.transaction_type.value}
    - Category: {transaction.category}
    - Merchant: {transaction.merchant or 'Not specified'}
    - Description: {transaction.description}
    - Location: {transaction.location or 'Not specified'}
    - Timestamp: {transaction.timestamp.isoformat()}
    
    This transaction represents financial behavior and spending patterns for the user.
    """
    
    await graphiti_instance.add_episode(
        name=episode_name,
        episode_body=episode_body,
        source=EpisodeType.text,
        source_description="Financial transaction record",
        reference_time=transaction.timestamp
    )

async def handle_financial_behavior(behavior: FinancialBehaviorEvent):
    """Handle financial behavior analysis events"""
    if not graphiti_instance:
        return
    
    episode_name = f"behavior_{behavior.user_id}_{datetime.now(timezone.utc).isoformat()}"
    episode_body = f"""
    Financial behavior analysis for user {behavior.user_id}:
    - Event type: {behavior.event_type}
    - Description: {behavior.description}
    - Risk indicators: {', '.join(behavior.risk_indicators) if behavior.risk_indicators else 'None identified'}
    - Positive indicators: {', '.join(behavior.positive_indicators) if behavior.positive_indicators else 'None identified'}
    - Analysis timestamp: {behavior.timestamp.isoformat()}
    
    This analysis provides insights into the user's financial behavior patterns and risk profile.
    """
    
    await graphiti_instance.add_episode(
        name=episode_name,
        episode_body=episode_body,
        source=EpisodeType.text,
        source_description="Financial behavior analysis",
        reference_time=behavior.timestamp
    )

# API Endpoints

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    neo4j_connected = graphiti_instance is not None
    
    return {
        "status": "healthy",
        "timestamp": datetime.now(timezone.utc),
        "neo4j_connected": neo4j_connected,
        "version": "1.0.0"
    }

@app.post("/financial/profile", status_code=201)
async def create_user_profile(
    profile: UserFinancialProfile,
    background_tasks: BackgroundTasks,
    token: Optional[str] = Depends(verify_token)
):
    """Create or update user financial profile"""
    background_tasks.add_task(handle_user_profile_update, profile)
    
    return {
        "message": "User profile update queued for processing",
        "user_id": profile.user_id,
        "status": "success"
    }

@app.post("/financial/transaction", status_code=201)
async def record_transaction(
    transaction: FinancialTransaction,
    background_tasks: BackgroundTasks,
    token: Optional[str] = Depends(verify_token)
):
    """Record a financial transaction"""
    background_tasks.add_task(handle_financial_transaction, transaction)
    
    return {
        "message": "Transaction recorded successfully",
        "transaction_id": transaction.transaction_id,
        "status": "success"
    }

@app.post("/financial/behavior", status_code=201)
async def record_behavior(
    behavior: FinancialBehaviorEvent,
    background_tasks: BackgroundTasks,
    token: Optional[str] = Depends(verify_token)
):
    """Record financial behavior analysis"""
    background_tasks.add_task(handle_financial_behavior, behavior)
    
    return {
        "message": "Financial behavior recorded successfully",
        "user_id": behavior.user_id,
        "status": "success"
    }

@app.post("/financial/score", response_model=FinancialScoreResponse)
async def get_financial_score(
    score_request: FinancialScoreRequest,
    token: Optional[str] = Depends(verify_token)
):
    """Get comprehensive financial score for a user"""
    return await FinancialScoringEngine.calculate_financial_score(
        score_request.user_id,
        score_request.include_reasoning
    )

@app.get("/financial/user/{user_id}/history")
async def get_user_financial_history(
    user_id: str,
    limit: int = 50,
    token: Optional[str] = Depends(verify_token)
):
    """Get financial history for a user from the knowledge graph"""
    if not graphiti_instance:
        raise HTTPException(500, "Graphiti instance not available")
    
    query = f"financial history transactions and behaviors for user {user_id}"
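    # Request up to `limit` results; without num_results, search() returns
    # only its default number of results regardless of `limit`
    results = await graphiti_instance.search(query=query, num_results=limit)
    
    # Limit results
    limited_results = results[:limit]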
    
    return {
        "user_id": user_id,
        "history": [
            {
                "uuid": result.uuid,
                "fact": result.fact,
                "valid_at": result.valid_at,
                "invalid_at": result.invalid_at
            }
            for result in limited_results
        ],
        "total_results": len(limited_results)
    }

@app.post("/pubsub/webhook")
async def pubsub_webhook(request: Request, background_tasks: BackgroundTasks):
    """Webhook endpoint for PubSub messages"""
    try:
        # Decode PubSub message
        body = await request.body()
        message_data = json.loads(body)
        
        # Extract and decode the actual message
        if 'message' in message_data and 'data' in message_data['message']:
            import base64
            decoded_data = base64.b64decode(message_data['message']['data']).decode('utf-8')
            financial_event = json.loads(decoded_data)
            
            # Process the financial event
            background_tasks.add_task(process_financial_event, financial_event)
            
            return {"status": "message processed"}
        else:
            raise HTTPException(400, "Invalid PubSub message format")
            
    except HTTPException:
        # Let deliberate HTTP errors (such as the 400 above) pass through unchanged
        raise
    except ValueError as e:
        # Covers malformed JSON, invalid base64, and non-UTF-8 payloads
        logger.error(f"Malformed PubSub message: {e}")
        raise HTTPException(400, "Malformed PubSub message payload")
    except Exception as e:
        logger.error(f"Error processing PubSub message: {e}")
        raise HTTPException(500, f"Failed to process message: {str(e)}")

@app.get("/")
async def root():
    """Root endpoint with API information"""
    return {
        "message": "Financial Score Classification API",
        "version": "1.0.0",
        "description": "Intelligent financial scoring powered by Graphiti",
        "docs": "/docs",
        "health": "/health"
    }

if __name__ == "__main__":
    import uvicorn
    port = int(os.getenv("PORT", 8000))
    uvicorn.run(app, host="0.0.0.0", port=port)
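
To see how the scoring arithmetic plays out, the sketch below replays a trimmed-down version of the keyword scoring on two hand-written facts, without Graphiti or Neo4j. The facts are invented for illustration, only a subset of the pattern weights is included, and `score_facts` is a hypothetical stand-in for `FinancialScoringEngine`, not part of the service.

```python
from typing import Dict, List, Tuple

# Subset of the weights used by the service's scoring engine
PATTERN_POINTS: Dict[str, int] = {
    "early payment": 50,
    "emergency fund": 80,
    "late payment": -60,
    "overdraft": -50,
}


def risk_level(score: int) -> str:
    """Same thresholds as _determine_risk_level in the service."""
    if score >= 750:
        return "low"
    if score >= 600:
        return "medium"
    if score >= 400:
        return "high"
    return "very_high"


def score_facts(facts: List[str]) -> Tuple[int, str, List[str]]:
    """Sum the points of every pattern found, starting from a base of 500."""
    text = " ".join(facts).lower()
    matched = [pattern for pattern in PATTERN_POINTS if pattern in text]
    adjustment = sum(PATTERN_POINTS[pattern] for pattern in matched)
    score = max(0, min(1000, 500 + adjustment))  # clamp to the 0-1000 range
    return score, risk_level(score), matched


facts = [
    "user_12345 made an early payment on the electricity bill",
    "user_12345 maintains an emergency fund",
]
score, level, matched = score_facts(facts)
print(score, level, matched)  # 630 medium ['early payment', 'emergency fund']
```

Here 500 + 50 + 80 = 630, which falls in the 600-749 band and maps to medium risk. Because matching is plain substring search, a fact such as "no late payment this year" would still subtract 60 points, which is one reason the code above describes itself as a simplified algorithm.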

TypeScript Client Implementation

Now let's build the TypeScript client that will interact with our Python API and handle PubSub messaging.

Installation and Setup

npm install axios @google-cloud/pubsub
npm install -D @types/node typescript

TypeScript Helper Classes

// helpers/FinancialGraphiti.helper.ts
import axios, { AxiosResponse } from 'axios';

// Types matching our Python API
export interface UserFinancialProfile {
    user_id: string;
    annual_income: number;
    credit_score?: number;
    employment_status: string;
    debt_to_income_ratio?: number;
    age: number;
    education_level: string;
}

export interface FinancialTransaction {
    user_id: string;
    transaction_id: string;
    amount: number;
    transaction_type: 'deposit' | 'withdrawal' | 'payment' | 'transfer' | 'loan_payment' | 'credit_card_payment' | 'investment';
    category: string;
    merchant?: string;
    description: string;
    timestamp?: string;
    location?: string;
}

export interface FinancialBehaviorEvent {
    user_id: string;
    event_type: string;
    description: string;
    risk_indicators: string[];
    positive_indicators: string[];
    timestamp?: string;
}

export interface FinancialScoreRequest {
    user_id: string;
    include_reasoning: boolean;
}

export interface FinancialScoreResponse {
    user_id: string;
    financial_score: number;
    risk_level: 'low' | 'medium' | 'high' | 'very_high';
    behavior_category: 'conservative' | 'moderate' | 'aggressive' | 'speculative';
    confidence: number;
    key_factors: string[];
    recommendations: string[];
    reasoning?: string;
    last_updated: string;
}

export interface UserFinancialHistory {
    user_id: string;
    history: Array<{
        uuid: string;
        fact: string;
        valid_at?: string;
        invalid_at?: string;
    }>;
    total_results: number;
}

const FINANCIAL_API_BASE_URL = process.env.FINANCIAL_API_BASE_URL || 'http://localhost:8000';
const FINANCIAL_API_TOKEN = process.env.FINANCIAL_API_TOKEN;

export default class FinancialGraphitiHelper {
    private static getHeaders() {
        return {
            'Content-Type': 'application/json',
            ...(FINANCIAL_API_TOKEN && { Authorization: `Bearer ${FINANCIAL_API_TOKEN}` })
        };
    }

    /**
     * Check the health status of the financial API
     */
    static async healthCheck(): Promise<{ status: string; neo4j_connected: boolean }> {
        const response: AxiosResponse = await axios.get(`${FINANCIAL_API_BASE_URL}/health`, {
            headers: this.getHeaders()
        });

        return response.data;
    }

    /**
     * Create or update a user's financial profile
     */
    static async createUserProfile(profile: UserFinancialProfile): Promise<{ message: string; user_id: string; status: string }> {
        const response: AxiosResponse = await axios.post(`${FINANCIAL_API_BASE_URL}/financial/profile`, profile, {
            headers: this.getHeaders()
        });

        return response.data;
    }

    /**
     * Record a financial transaction
     */
    static async recordTransaction(transaction: FinancialTransaction): Promise<{ message: string; transaction_id: string; status: string }> {
        const response: AxiosResponse = await axios.post(`${FINANCIAL_API_BASE_URL}/financial/transaction`, transaction, {
            headers: this.getHeaders()
        });

        return response.data;
    }

    /**
     * Record financial behavior analysis
     */
    static async recordBehavior(behavior: FinancialBehaviorEvent): Promise<{ message: string; user_id: string; status: string }> {
        const response: AxiosResponse = await axios.post(`${FINANCIAL_API_BASE_URL}/financial/behavior`, behavior, {
            headers: this.getHeaders()
        });

        return response.data;
    }

    /**
     * Get comprehensive financial score for a user
     */
    static async getFinancialScore(scoreRequest: FinancialScoreRequest): Promise<FinancialScoreResponse> {
        const response: AxiosResponse<FinancialScoreResponse> = await axios.post(`${FINANCIAL_API_BASE_URL}/financial/score`, scoreRequest, {
            headers: this.getHeaders()
        });

        return response.data;
    }

    /**
     * Get financial history for a user
     */
    static async getUserFinancialHistory(userId: string, limit: number = 50): Promise<UserFinancialHistory> {
        const response: AxiosResponse<UserFinancialHistory> = await axios.get(`${FINANCIAL_API_BASE_URL}/financial/user/${encodeURIComponent(userId)}/history?limit=${limit}`, {
            headers: this.getHeaders()
        });

        return response.data;
    }

    /**
     * Analyze spending patterns and generate insights
     */
    static async analyzeSpendingPatterns(userId: string, transactions: FinancialTransaction[]): Promise<FinancialBehaviorEvent[]> {
        const insights: FinancialBehaviorEvent[] = [];
        
        // Group transactions by category
        const categoryTotals = transactions.reduce((acc, transaction) => {
            acc[transaction.category] = (acc[transaction.category] || 0) + Math.abs(transaction.amount);
            return acc;
        }, {} as Record<string, number>);

        // Analyze payment timing
        const payments = transactions.filter(t => t.transaction_type === 'payment');
        const earlyPayments = payments.filter(t => {
            const day = new Date(t.timestamp || Date.now()).getDate();
            return day <= 5; // Payments in first 5 days of month
        });

        if (payments.length > 0 && earlyPayments.length / payments.length > 0.7) {
            insights.push({
                user_id: userId,
                event_type: 'payment_pattern_analysis',
                description: 'User consistently makes payments early in the month',
                risk_indicators: [],
                positive_indicators: ['early_payment', 'consistent_behavior', 'financial_discipline']
            });
        }

        // Analyze spending diversity
        const categories = Object.keys(categoryTotals);
        if (categories.length > 5) {
            insights.push({
                user_id: userId,
                event_type: 'spending_diversity_analysis',
                description: 'User shows diverse spending patterns across multiple categories',
                risk_indicators: [],
                positive_indicators: ['spending_diversity', 'balanced_lifestyle']
            });
        }

        // Check for large transactions
        const totalSpending = transactions.reduce((sum, t) => sum + Math.abs(t.amount), 0);
        const averageTransaction = totalSpending / transactions.length;
        const largeTransactions = transactions.filter(t => Math.abs(t.amount) > averageTransaction * 3);

        if (largeTransactions.length > 0) {
            insights.push({
                user_id: userId,
                event_type: 'large_transaction_analysis',
                description: `User has ${largeTransactions.length} transaction(s) significantly above average`,
                risk_indicators: largeTransactions.length > 3 ? ['frequent_large_transactions'] : [],
                positive_indicators: largeTransactions.some(t => t.category === 'investment') ? ['investment_activity'] : []
            });
        }

        return insights;
    }

    /**
     * Generate financial recommendations based on user profile and history
     */
    static async generateRecommendations(userId: string): Promise<string[]> {
        try {
            const scoreResponse = await this.getFinancialScore({ user_id: userId, include_reasoning: true });
            const history = await this.getUserFinancialHistory(userId, 100);
            
            const recommendations: string[] = [...scoreResponse.recommendations];
            
            // Analyze transaction patterns from history
            const hasInvestmentMention = history.history.some(h => 
                h.fact.toLowerCase().includes('investment') || 
                h.fact.toLowerCase().includes('investing')
            );
            
            const hasEmergencyFundMention = history.history.some(h => 
                h.fact.toLowerCase().includes('emergency fund') || 
                h.fact.toLowerCase().includes('savings')
            );

            if (!hasInvestmentMention && scoreResponse.financial_score > 600) {
                recommendations.push('Consider diversifying into low-risk investment options like index funds');
            }

            if (!hasEmergencyFundMention) {
                recommendations.push('Build an emergency fund covering 3-6 months of expenses');
            }

            if (scoreResponse.risk_level === 'high' || scoreResponse.risk_level === 'very_high') {
                recommendations.push('Consider speaking with a financial advisor for personalized guidance');
                recommendations.push('Review and optimize your monthly budget to reduce unnecessary expenses');
            }

            return [...new Set(recommendations)]; // Remove duplicates
            
        } catch (error) {
            console.error('Error generating recommendations:', error);
            return ['Unable to generate recommendations at this time'];
        }
    }
}
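
The large-transaction rule in analyzeSpendingPatterns flags any amount above three times the mean. One consequence is worth knowing: the largest of n non-negative amounts can never exceed n times their mean, so the rule cannot fire when a batch holds three or fewer transactions. The sketch below isolates the rule in a standalone function so this is easy to check. The name findLargeTransactions and the multiplier parameter are hypothetical and not part of the helper above.

```typescript
// Hypothetical standalone version of the "large transaction" rule used above.
// Returns the amounts whose absolute value exceeds `multiplier` times the mean.
function findLargeTransactions(amounts: number[], multiplier: number = 3): number[] {
    if (amounts.length === 0) {
        return []; // No data: avoid dividing by zero
    }
    const total = amounts.reduce((sum, a) => sum + Math.abs(a), 0);
    const threshold = (total / amounts.length) * multiplier;
    return amounts.filter(a => Math.abs(a) > threshold);
}

// Five amounts averaging 505: the threshold is 1515, so nothing is flagged.
const typicalMonth = findLargeTransactions([1200, 500, 300, 75, 450]);

// One outlier among small amounts: mean 480, threshold 1440, so 2000 is flagged.
const withOutlier = findLargeTransactions([100, 100, 100, 100, 2000]);

console.log(typicalMonth, withOutlier);
```

For small batches, a fixed threshold or a comparison against the user's longer-term average may be a better fit than a multiple of the batch mean.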

PubSub Integration Service

// services/FinancialPubSub.service.ts
import { PubSub } from '@google-cloud/pubsub';
import FinancialGraphitiHelper, { 
    UserFinancialProfile, 
    FinancialTransaction, 
    FinancialBehaviorEvent 
} from '../helpers/FinancialGraphiti.helper';

interface PubSubMessage {
    event_type: 'user_profile' | 'transaction' | 'behavior';
    data: UserFinancialProfile | FinancialTransaction | FinancialBehaviorEvent;
    timestamp: string;
}

export default class FinancialPubSubService {
    private pubsub: PubSub;
    private projectId: string;

    constructor(projectId?: string) {
        this.projectId = projectId || process.env.GOOGLE_CLOUD_PROJECT || '';
        this.pubsub = new PubSub({ projectId: this.projectId });
    }

    /**
     * Publish user profile update to PubSub
     */
    async publishUserProfile(profile: UserFinancialProfile): Promise<void> {
        const message: PubSubMessage = {
            event_type: 'user_profile',
            data: profile,
            timestamp: new Date().toISOString()
        };

        await this.publishMessage('financial-events', message);
    }

    /**
     * Publish financial transaction to PubSub
     */
    async publishTransaction(transaction: FinancialTransaction): Promise<void> {
        const message: PubSubMessage = {
            event_type: 'transaction',
            data: transaction,
            timestamp: new Date().toISOString()
        };

        await this.publishMessage('financial-events', message);
    }

    /**
     * Publish financial behavior event to PubSub
     */
    async publishBehavior(behavior: FinancialBehaviorEvent): Promise<void> {
        const message: PubSubMessage = {
            event_type: 'behavior',
            data: behavior,
            timestamp: new Date().toISOString()
        };

        await this.publishMessage('financial-events', message);
    }

    /**
     * Process a batch of financial transactions
     */
    async processBatchTransactions(userId: string, transactions: FinancialTransaction[]): Promise<void> {
        try {
            // Publish all transactions
            for (const transaction of transactions) {
                await this.publishTransaction(transaction);
            }

            // Analyze patterns and publish behaviors
            const behaviors = await FinancialGraphitiHelper.analyzeSpendingPatterns(userId, transactions);
            for (const behavior of behaviors) {
                await this.publishBehavior(behavior);
            }

            console.log(`Processed ${transactions.length} transactions and ${behaviors.length} behavior insights for user ${userId}`);
            
        } catch (error) {
            console.error('Error processing batch transactions:', error);
            throw error;
        }
    }

    /**
     * Subscribe to financial events and process them
     */
    async subscribeToFinancialEvents(subscriptionName: string): Promise<void> {
        const subscription = this.pubsub.subscription(subscriptionName);

        const messageHandler = async (message: any) => {
            try {
                const messageData: PubSubMessage = JSON.parse(message.data.toString());
                
                console.log(`Received ${messageData.event_type} event:`, messageData);

                // Process the message through our direct API calls
                switch (messageData.event_type) {
                    case 'user_profile':
                        await FinancialGraphitiHelper.createUserProfile(messageData.data as UserFinancialProfile);
                        break;
                    case 'transaction':
                        await FinancialGraphitiHelper.recordTransaction(messageData.data as FinancialTransaction);
                        break;
                    case 'behavior':
                        await FinancialGraphitiHelper.recordBehavior(messageData.data as FinancialBehaviorEvent);
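                        break;
                    default:
                        // Unknown types are logged, then acknowledged below so they are not redelivered
                        console.warn('Ignoring message with unknown event type:', messageData);
                }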

                message.ack();
                console.log(`Successfully processed ${messageData.event_type} event`);
                
            } catch (error) {
                console.error('Error processing message:', error);
                message.nack();
            }
        };

        subscription.on('message', messageHandler);
        subscription.on('error', error => {
            console.error('Subscription error:', error);
        });

        console.log(`Listening for messages on subscription: ${subscriptionName}`);
    }

    /**
     * Create a real-time financial monitoring system
     */
    async startFinancialMonitoring(userId: string): Promise<void> {
        console.log(`Starting financial monitoring for user: ${userId}`);

        // Set up a subscription specifically for this user
        const userSubscriptionName = `financial-monitoring-${userId}`;
        
        try {
            await this.subscribeToFinancialEvents(userSubscriptionName);
            
            // Periodically generate financial scores and insights
            setInterval(async () => {
                try {
                    const score = await FinancialGraphitiHelper.getFinancialScore({
                        user_id: userId,
                        include_reasoning: true
                    });

                    console.log(`Financial Score Update for ${userId}:`, {
                        score: score.financial_score,
                        risk_level: score.risk_level,
                        behavior_category: score.behavior_category,
                        confidence: score.confidence
                    });

                    // Publish an update only when the model is confident in the score
                    if (score.confidence > 0.7) {
                        await this.publishBehavior({
                            user_id: userId,
                            event_type: 'score_update',
                            description: `Financial score updated to ${score.financial_score} with ${score.risk_level} risk level`,
                            risk_indicators: score.risk_level === 'high' || score.risk_level === 'very_high' ? ['elevated_risk'] : [],
                            positive_indicators: score.financial_score > 700 ? ['strong_financial_profile'] : []
                        });
                    }
                    
                } catch (error) {
                    console.error(`Error updating financial score for user ${userId}:`, error);
                }
            }, 300000); // Update every 5 minutes

        } catch (error) {
            console.error(`Error setting up financial monitoring for user ${userId}:`, error);
            throw error;
        }
    }

    /**
     * Helper method to publish messages to PubSub
     */
    private async publishMessage(topicName: string, message: PubSubMessage): Promise<void> {
        try {
            const topic = this.pubsub.topic(topicName);
            const messageBuffer = Buffer.from(JSON.stringify(message));
            
            await topic.publishMessage({ data: messageBuffer });
            console.log(`Published ${message.event_type} message to ${topicName}`);
            
        } catch (error) {
            console.error(`Error publishing message to ${topicName}:`, error);
            throw error;
        }
    }

    /**
     * Batch process multiple financial events efficiently
     */
    async batchProcessFinancialEvents(events: PubSubMessage[]): Promise<void> {
        const batchSize = 10;
        
        for (let i = 0; i < events.length; i += batchSize) {
            const batch = events.slice(i, i + batchSize);
            
            await Promise.all(batch.map(async (event) => {
                try {
                    await this.publishMessage('financial-events', event);
                } catch (error) {
                    console.error(`Error processing event in batch:`, error);
                }
            }));

            // Small delay between batches to avoid overwhelming the system
            await new Promise(resolve => setTimeout(resolve, 100));
        }
    }
}
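
The message handler above casts the result of JSON.parse straight to PubSubMessage, which the compiler accepts without checking the payload at runtime. A minimal sketch of a runtime guard follows, assuming the same three event types. The names IncomingEvent, isIncomingEvent, and parseIncomingEvent are hypothetical, and the guard only checks the envelope, not the fields inside data.

```typescript
// Hypothetical runtime check for messages arriving from the subscription.
// The shape mirrors the PubSubMessage interface above; validation of the
// fields inside `data` is left out and would depend on the event type.
type EventType = 'user_profile' | 'transaction' | 'behavior';

interface IncomingEvent {
    event_type: EventType;
    data: Record<string, unknown>;
    timestamp: string;
}

const EVENT_TYPES: readonly string[] = ['user_profile', 'transaction', 'behavior'];

function isIncomingEvent(value: unknown): value is IncomingEvent {
    if (typeof value !== 'object' || value === null) {
        return false;
    }
    const { event_type, data, timestamp } = value as Record<string, unknown>;
    return (
        typeof event_type === 'string' &&
        EVENT_TYPES.indexOf(event_type) !== -1 &&
        typeof data === 'object' &&
        data !== null &&
        !Array.isArray(data) &&
        typeof timestamp === 'string' &&
        !isNaN(Date.parse(timestamp))
    );
}

// Parse a raw payload, returning null instead of throwing on bad input.
function parseIncomingEvent(raw: string): IncomingEvent | null {
    try {
        const parsed: unknown = JSON.parse(raw);
        return isIncomingEvent(parsed) ? parsed : null;
    } catch {
        return null; // Not valid JSON
    }
}
```

In the handler, a null result could be logged and acknowledged rather than nacked, since redelivering a malformed message will not make it parse.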

Complete Integration Example

Let's create a comprehensive example that demonstrates how all components work together:

// examples/FinancialScoringDemo.ts
import FinancialGraphitiHelper, { 
    UserFinancialProfile, 
    FinancialTransaction 
} from '../helpers/FinancialGraphiti.helper';
import FinancialPubSubService from '../services/FinancialPubSub.service';

/**
 * Comprehensive demonstration of the financial scoring system
 */
export class FinancialScoringDemo {
    private pubsubService: FinancialPubSubService;

    constructor(projectId?: string) {
        this.pubsubService = new FinancialPubSubService(projectId);
    }

    /**
     * Run a complete demonstration of the financial scoring system
     */
    async runDemo(): Promise<void> {
        console.log('🏦 Starting Financial Scoring System Demo...\n');

        // 1. Create a sample user profile
        const userProfile: UserFinancialProfile = {
            user_id: 'demo_user_12345',
            annual_income: 85000,
            credit_score: 740,
            employment_status: 'full_time',
            debt_to_income_ratio: 0.28,
            age: 29,
            education_level: 'bachelor'
        };

        console.log('📊 Creating user profile...');
        await FinancialGraphitiHelper.createUserProfile(userProfile);
        await this.delay(2000); // Allow time for processing

        // 2. Simulate financial transactions
        const transactions: FinancialTransaction[] = [
            {
                user_id: userProfile.user_id,
                transaction_id: 'txn_001',
                amount: 1200,
                transaction_type: 'payment',
                category: 'rent',
                merchant: 'Downtown Apartments',
                description: 'Monthly rent payment',
                location: 'San Francisco, CA'
            },
            {
                user_id: userProfile.user_id,
                transaction_id: 'txn_002',
                amount: 500,
                transaction_type: 'investment',
                category: 'stocks',
                merchant: 'Investment Platform',
                description: 'Monthly investment contribution',
                location: 'Online'
            },
            {
                user_id: userProfile.user_id,
                transaction_id: 'txn_003',
                amount: 300,
                transaction_type: 'deposit',
                category: 'savings',
                merchant: 'Local Bank',
                description: 'Emergency fund contribution',
                location: 'San Francisco, CA'
            },
            {
                user_id: userProfile.user_id,
                transaction_id: 'txn_004',
                amount: 75,
                transaction_type: 'payment',
                category: 'utilities',
                merchant: 'Electric Company',
                description: 'Monthly electricity bill',
                location: 'San Francisco, CA'
            },
            {
                user_id: userProfile.user_id,
                transaction_id: 'txn_005',
                amount: 450,
                transaction_type: 'payment',
                category: 'groceries',
                merchant: 'Whole Foods',
                description: 'Weekly grocery shopping',
                location: 'San Francisco, CA'
            }
        ];

        console.log('💳 Recording financial transactions...');
        for (const transaction of transactions) {
            await FinancialGraphitiHelper.recordTransaction(transaction);
            await this.delay(500); // Simulate real-time transaction flow
        }

        // 3. Process transactions through PubSub for behavior analysis
        console.log('🔄 Processing transactions for behavior analysis...');
        await this.pubsubService.processBatchTransactions(userProfile.user_id, transactions);
        await this.delay(3000); // Allow time for analysis

        // 4. Get initial financial score
        console.log('📈 Calculating initial financial score...');
        const initialScore = await FinancialGraphitiHelper.getFinancialScore({
            user_id: userProfile.user_id,
            include_reasoning: true
        });

        this.displayScoreResults('Initial Score', initialScore);

        // 5. Simulate additional positive financial behavior
        console.log('\n💡 Simulating additional positive financial behaviors...');
        
        const additionalTransactions: FinancialTransaction[] = [
            {
                user_id: userProfile.user_id,
                transaction_id: 'txn_006',
                amount: 800,
                transaction_type: 'payment',
                category: 'loan_payment',
                merchant: 'Student Loan Servicer',
                description: 'Extra student loan payment',
                location: 'Online'
            },
            {
                user_id: userProfile.user_id,
                transaction_id: 'txn_007',
                amount: 200,
                transaction_type: 'investment',
                category: 'retirement',
                merchant: '401k Provider',
                description: 'Additional 401k contribution',
                location: 'Online'
            }
        ];

        for (const transaction of additionalTransactions) {
            await FinancialGraphitiHelper.recordTransaction(transaction);
            await this.delay(500);
        }

        // Record positive behavior insights
        await FinancialGraphitiHelper.recordBehavior({
            user_id: userProfile.user_id,
            event_type: 'debt_reduction_behavior',
            description: 'User made additional loan payment above minimum requirement',
            risk_indicators: [],
            positive_indicators: ['debt_reduction', 'extra_payment', 'financial_discipline']
        });

        await FinancialGraphitiHelper.recordBehavior({
            user_id: userProfile.user_id,
            event_type: 'retirement_planning',
            description: 'User increased retirement contributions beyond employer match',
            risk_indicators: [],
            positive_indicators: ['retirement_planning', 'long_term_thinking', 'financial_wellness']
        });

        await this.delay(3000); // Allow time for processing

        // 6. Get updated financial score
        console.log('📊 Calculating updated financial score...');
        const updatedScore = await FinancialGraphitiHelper.getFinancialScore({
            user_id: userProfile.user_id,
            include_reasoning: true
        });

        this.displayScoreResults('Updated Score', updatedScore);

        // 7. Generate personalized recommendations
        console.log('\n🎯 Generating personalized recommendations...');
        const recommendations = await FinancialGraphitiHelper.generateRecommendations(userProfile.user_id);
        
        console.log('📋 Personalized Recommendations:');
        recommendations.forEach((rec, index) => {
            console.log(`   ${index + 1}. ${rec}`);
        });

        // 8. Display financial history
        console.log('\n📚 Financial History Summary:');
        const history = await FinancialGraphitiHelper.getUserFinancialHistory(userProfile.user_id, 10);
        
        console.log(`📖 Recent Financial Events (${history.total_results} total):`);
        history.history.slice(0, 5).forEach((event, index) => {
            console.log(`   ${index + 1}. ${event.fact.substring(0, 100)}...`);
        });

        // 9. Start real-time monitoring
        console.log('\n🔍 Starting real-time financial monitoring...');
        console.log('   (This would run continuously in a production environment)');
        
        // In a real application, this would run as a background service
        // await this.pubsubService.startFinancialMonitoring(userProfile.user_id);

        console.log('\n✅ Financial Scoring System Demo Complete!');
        console.log('\n🚀 The system is now ready to:');
        console.log('   • Process real-time financial transactions');
        console.log('   • Analyze spending patterns and behaviors');
        console.log('   • Generate dynamic risk assessments');
        console.log('   • Provide personalized financial recommendations');
        console.log('   • Track financial health over time');
    }

    /**
     * Display formatted score results
     */
    private displayScoreResults(title: string, score: any): void {
        console.log(`\n📊 ${title}:`);
        console.log(`   Score: ${score.financial_score}/1000`);
        console.log(`   Risk Level: ${score.risk_level.toUpperCase()}`);
        console.log(`   Behavior Category: ${score.behavior_category.toUpperCase()}`);
        console.log(`   Confidence: ${(score.confidence * 100).toFixed(1)}%`);
        
        if (score.key_factors.length > 0) {
            console.log(`   Key Factors: ${score.key_factors.join(', ')}`);
        }
        
        if (score.reasoning) {
            console.log(`   Reasoning: ${score.reasoning}`);
        }
    }

    /**
     * Utility method for delays
     */
    private delay(ms: number): Promise<void> {
        return new Promise(resolve => setTimeout(resolve, ms));
    }
}

// Run the demo
async function main() {
    try {
        const demo = new FinancialScoringDemo();
        await demo.runDemo();
    } catch (error) {
        console.error('Demo failed:', error);
    }
}

// Uncomment to run the demo
// main();

Environment Configuration

Create the necessary environment files:

Python .env file:

# Neo4j Configuration
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=your_neo4j_password

# API Security
AUTHORIZATION=your_api_token

# LLM Configuration
LLM_MODEL=gpt-4o
SMALL_LLM_MODEL=gpt-4o-mini
OPENAI_API_KEY=your_openai_key

# Google Cloud
GOOGLE_CLOUD_PROJECT=your_project_id

TypeScript .env file:

# Financial API
FINANCIAL_API_BASE_URL=http://localhost:8000
FINANCIAL_API_TOKEN=your_api_token

# Google Cloud
GOOGLE_CLOUD_PROJECT=your_project_id
GOOGLE_APPLICATION_CREDENTIALS=path/to/service-account.json
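
A missing variable in the TypeScript .env file otherwise surfaces only when the first request fails. The sketch below validates the variables listed above at startup and reports all missing names at once. The FinancialConfig shape and the loadFinancialConfig name are assumptions for illustration. GOOGLE_APPLICATION_CREDENTIALS is left out because the Google Cloud client libraries read it themselves.

```typescript
// Hypothetical config loader for the TypeScript client.
// Variable names come from the .env file above; everything else is illustrative.
interface FinancialConfig {
    apiBaseUrl: string;
    apiToken: string;
    projectId: string;
}

function loadFinancialConfig(env: Record<string, string | undefined>): FinancialConfig {
    const missing: string[] = [];

    // Read one variable, recording its name if it is absent or blank
    const read = (name: string): string => {
        const value = (env[name] || '').trim();
        if (value === '') {
            missing.push(name);
        }
        return value;
    };

    // Strip trailing slashes so `${apiBaseUrl}/financial/score` stays well-formed
    const apiBaseUrl = read('FINANCIAL_API_BASE_URL').replace(/\/+$/, '');
    const apiToken = read('FINANCIAL_API_TOKEN');
    const projectId = read('GOOGLE_CLOUD_PROJECT');

    if (missing.length > 0) {
        throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
    }
    return { apiBaseUrl, apiToken, projectId };
}
```

In a Node.js service this would typically be called once at startup with process.env, after the .env file has been loaded (for example with the dotenv package).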

Deployment and Production Considerations

Docker Configuration

Create a Dockerfile for the Python API:

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: financial-scoring-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: financial-scoring-api
  template:
    metadata:
      labels:
        app: financial-scoring-api
    spec:
      containers:
      - name: api
        image: your-registry/financial-scoring-api:latest
        ports:
        - containerPort: 8000
        env:
        - name: NEO4J_URI
          valueFrom:
            secretKeyRef:
              name: financial-secrets
              key: neo4j_uri
        - name: NEO4J_PASSWORD
          valueFrom:
            secretKeyRef:
              name: financial-secrets
              key: neo4j_password
---
apiVersion: v1
kind: Service
metadata:
  name: financial-scoring-api-service
spec:
  selector:
    app: financial-scoring-api
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer

Advanced Features and Extensions

Machine Learning Integration

You can extend the system with more sophisticated ML models:

# Advanced scoring with ML models
from typing import Any, Dict

import joblib
import numpy as np
from sklearn.ensemble import RandomForestClassifier

class AdvancedScoringEngine:
    def __init__(self):
        # Load pre-trained models
        self.risk_model = joblib.load('models/risk_classifier.pkl')
        self.score_model = joblib.load('models/score_regressor.pkl')
    
    async def calculate_ml_score(self, features: Dict[str, float]) -> Dict[str, Any]:
        """Calculate score using machine learning models"""
        feature_vector = np.array([[
            features.get('income', 0),
            features.get('debt_ratio', 0),
            features.get('credit_score', 0),
            features.get('transaction_frequency', 0),
            features.get('savings_rate', 0)
        ]])
        
        # Predict risk level
        risk_prob = self.risk_model.predict_proba(feature_vector)[0]
        risk_level = self.risk_model.classes_[np.argmax(risk_prob)]
        
        # Predict financial score
        predicted_score = self.score_model.predict(feature_vector)[0]
        
        return {
            'ml_score': int(predicted_score),
            'ml_risk_level': risk_level,
            'confidence': float(np.max(risk_prob))
        }

Real-time Fraud Detection

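// Fraud detection service
// Import path assumes this file sits alongside the other services
import FinancialGraphitiHelper, { FinancialTransaction } from '../helpers/FinancialGraphiti.helper';
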
export class FraudDetectionService {
    static async detectAnomalies(userId: string, transaction: FinancialTransaction): Promise<boolean> {
        // Get user's transaction history
        const history = await FinancialGraphitiHelper.getUserFinancialHistory(userId, 100);
        
        // Analyze for anomalies
        const anomalies = this.analyzeTransactionAnomalies(transaction, history);
        
        if (anomalies.length > 0) {
            // Record suspicious behavior
            await FinancialGraphitiHelper.recordBehavior({
                user_id: userId,
                event_type: 'fraud_detection',
                description: `Potential fraudulent activity detected: ${anomalies.join(', ')}`,
                risk_indicators: ['potential_fraud', ...anomalies],
                positive_indicators: []
            });
            
            return true;
        }
        
        return false;
    }
    
    private static analyzeTransactionAnomalies(transaction: FinancialTransaction, history: any): string[] {
        const anomalies: string[] = [];
        
        // Check for unusual amount
        // Check for unusual location
        // Check for unusual time
        // Check for unusual merchant
        
        return anomalies;
    }
}
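
The analyzeTransactionAnomalies stub above lists its checks as comments only. As one possible starting point, the "unusual amount" check could compare the new amount against the mean and standard deviation of the user's past amounts. This is a sketch under assumptions: isUnusualAmount and its parameters are hypothetical, and it expects numeric past amounts, whereas the history endpoint in this tutorial returns text facts.

```typescript
// Hypothetical implementation of the "unusual amount" check stubbed out above.
// Flags an amount whose z-score against past amounts exceeds `zThreshold`.
function isUnusualAmount(
    amount: number,
    pastAmounts: number[],
    zThreshold: number = 3,
    minSamples: number = 5
): boolean {
    if (pastAmounts.length < minSamples) {
        return false; // Too little history to judge
    }
    const values = pastAmounts.map(a => Math.abs(a));
    const mean = values.reduce((sum, v) => sum + v, 0) / values.length;
    const variance = values.reduce((sum, v) => sum + (v - mean) * (v - mean), 0) / values.length;
    const stdDev = Math.sqrt(variance);
    const value = Math.abs(amount);

    if (stdDev === 0) {
        return value !== mean; // All past amounts identical: any deviation stands out
    }
    return Math.abs(value - mean) / stdDev > zThreshold;
}

const usualSpending = [50, 60, 55, 45, 65]; // mean 55, standard deviation about 7.07

console.log(isUnusualAmount(500, usualSpending)); // far above the usual range
console.log(isUnusualAmount(58, usualSpending));  // within the usual range
```

A string such as 'unusual_amount' could then be pushed onto the anomalies array when the check returns true. Location, time, and merchant checks would need their own history data.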

Key Benefits of This Architecture

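  1. Real-time Processing: PubSub delivers financial events to the API asynchronously, so they are processed shortly after they occur
  2. Dynamic Learning: Graphiti's knowledge graph is updated incrementally as new data arrives, without a full rebuild
  3. Temporal Tracking: The bi-temporal model records both when an event happened and when it was ingested
  4. Scalable: The client, the API service, and the message queue scale independently
  5. Intelligent Insights: Relationships in the graph give scoring more context than isolated data points
  6. Flexible Scoring: Scoring logic lives in the API service, so it can change without touching the client
  7. Historical Analysis: The graph keeps an audit trail of recorded financial behaviors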
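
To make the temporal tracking point concrete, the sketch below shows a point-in-time query over facts that carry two timestamps: when the fact became true and when the system recorded it. The types and the factsAsOf function are illustrative assumptions, not Graphiti's API, and the sketch does not track when an invalidation itself was recorded.

```typescript
// Illustrative types only: this is not Graphiti's API. Field names are
// assumptions chosen to show the two time axes.
interface TemporalFact {
    fact: string;
    validAt: string;     // When the fact became true in the real world (ISO 8601)
    invalidAt?: string;  // When it stopped being true, if known
    createdAt: string;   // When the system recorded the fact
}

// Facts that were true at `eventTime`, using only records ingested by `knownAt`.
function factsAsOf(facts: TemporalFact[], eventTime: string, knownAt: string): TemporalFact[] {
    const t = Date.parse(eventTime);
    const k = Date.parse(knownAt);
    return facts.filter(f =>
        Date.parse(f.createdAt) <= k &&
        Date.parse(f.validAt) <= t &&
        (f.invalidAt === undefined || t < Date.parse(f.invalidAt))
    );
}

const sampleFacts: TemporalFact[] = [
    { fact: 'User is employed full time', validAt: '2023-01-01T00:00:00Z', createdAt: '2023-01-05T00:00:00Z' },
    // Reported late: true since February, but only recorded in April
    { fact: 'User opened a savings account', validAt: '2024-02-01T00:00:00Z', createdAt: '2024-04-15T00:00:00Z' }
];

// What the system knew on 1 March about 1 March: only the employment fact.
const knownInMarch = factsAsOf(sampleFacts, '2024-03-01T00:00:00Z', '2024-03-01T00:00:00Z');

// What the system knew on 1 May about 1 March: both facts.
const knownInMay = factsAsOf(sampleFacts, '2024-03-01T00:00:00Z', '2024-05-01T00:00:00Z');

console.log(knownInMarch.length, knownInMay.length);
```

Keeping both timestamps lets a score computed in March be explained later using only what was known at that time, even after late-arriving data has changed the picture.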

Conclusion

Building an intelligent financial score classification system with Graphiti is a step beyond traditional static scoring methods. By combining knowledge graphs, real-time data processing, and graph-based reasoning, we can create systems that update their assessment as user behavior changes instead of relying on a fixed snapshot.

The combination of Graphiti's knowledge graph capabilities, Python's robust ecosystem, TypeScript's type safety, and PubSub's real-time messaging creates a powerful foundation for next-generation fintech applications.

This tutorial is a starting point. You can extend the system with machine learning models, real-time fraud detection, and predictive analytics, all built on the dynamic knowledge graph that Graphiti maintains.

Start building your intelligent financial systems today, and unlock the power of knowledge graphs for better financial decision-making! 🚀


Want to learn more about Graphiti? Check out the official repository and join the community of developers building the future of knowledge-driven applications.