In the world of fintech applications, understanding and classifying user financial behavior is crucial for making informed lending decisions, providing personalized recommendations, and managing risk. Traditional approaches often rely on static credit scores and limited data points. But what if we could build a dynamic, intelligent system that learns from user interactions and evolves over time?
Enter Graphiti — a revolutionary knowledge graph framework designed for building real-time, dynamically updating knowledge graphs for AI agents. In this comprehensive tutorial, we'll explore how to leverage Graphiti to create a sophisticated financial score classification system using TypeScript, Python, and Google Cloud Pub/Sub.
Graphiti is an advanced knowledge graph framework that addresses the challenges of dynamic and frequently updated datasets. Unlike traditional knowledge graphs that require complete rebuilds for updates, Graphiti is designed to update the graph incrementally and in real time as new data arrives.
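Our financial score classification system will consist of three main components — a TypeScript client, a Pub/Sub topic, and a Python API service backed by a Graphiti knowledge graph — connected as shown in the following diagram: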
graph TD
A[User Interaction] --> B[TypeScript Client]
B --> C[PubSub Topic]
C --> D[Python API Service]
D --> E[Graphiti Knowledge Graph]
E --> F[Financial Score Classification]
F --> G[Real-time Insights]
Let's start by building our Python API service that will manage the Graphiti knowledge graph. This service will handle financial transactions, user behaviors, and provide intelligent scoring capabilities.
First, create a new Python project and install the required dependencies:
pip install fastapi uvicorn python-dotenv graphiti-core google-cloud-pubsub
"""
Financial Score Classification API with Graphiti
A comprehensive FastAPI service for building intelligent financial scoring
using Graphiti knowledge graphs.
"""
import os
import logging
import json
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any, Literal
from contextlib import asynccontextmanager
from enum import Enum
from fastapi import FastAPI, HTTPException, Depends, Security, status, Request, BackgroundTasks
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator
from dotenv import load_dotenv
from google.cloud import pubsub_v1
from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType
# Pull settings from a local .env file into the process environment
load_dotenv()

# Root logging setup shared by the whole service
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger(__name__)

# Service-wide singletons; declared empty here and assigned by the lifespan handler
graphiti_instance: Optional[Graphiti] = None
pubsub_publisher: Optional[pubsub_v1.PublisherClient] = None

# Bearer-token extractor used by the authentication dependency
security = HTTPBearer()

# Settings read from the environment (with local-development defaults where given)
NEO4J_URI = os.getenv('NEO4J_URI', 'bolt://localhost:7687')
NEO4J_USER = os.getenv('NEO4J_USER', 'neo4j')
NEO4J_PASSWORD = os.getenv('NEO4J_PASSWORD')
AUTHORIZATION_TOKEN = os.getenv('AUTHORIZATION')
GOOGLE_CLOUD_PROJECT = os.getenv('GOOGLE_CLOUD_PROJECT')
LLM_MODEL = os.getenv('LLM_MODEL', 'gpt-4o')

# Fail fast at import time when the database password is missing
if not NEO4J_PASSWORD:
    raise ValueError("NEO4J_PASSWORD environment variable is required")

# A missing or whitespace-only token switches authentication off
if not (AUTHORIZATION_TOKEN and AUTHORIZATION_TOKEN.strip()):
    AUTHORIZATION_TOKEN = None
    logger.info("API will run without authentication")
# Financial scoring enums and models
class TransactionType(str, Enum):
    """Kinds of financial transaction accepted by the API.

    Members also subclass ``str``; each member's value is the lowercase
    string used in request payloads.
    """
    DEPOSIT = "deposit"
    WITHDRAWAL = "withdrawal"
    PAYMENT = "payment"
    TRANSFER = "transfer"
    LOAN_PAYMENT = "loan_payment"
    CREDIT_CARD_PAYMENT = "credit_card_payment"
    INVESTMENT = "investment"
class RiskLevel(str, Enum):
    """Risk bands reported alongside a financial score (string-valued)."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    VERY_HIGH = "very_high"
class FinancialBehaviorCategory(str, Enum):
    """Behavior categories a scored user can be placed in (string-valued)."""
    CONSERVATIVE = "conservative"
    MODERATE = "moderate"
    AGGRESSIVE = "aggressive"
    SPECULATIVE = "speculative"
class UserFinancialProfile(BaseModel):
    """Request model describing a user's financial and demographic profile.

    Field constraints (positive income, 300-850 credit score, 0-1 debt ratio,
    age 18-120) are enforced by Pydantic at validation time.
    """

    # Pydantic v2 configuration. The nested ``class Config`` form is deprecated
    # in v2; ``model_config`` carries the same OpenAPI example.
    model_config = {
        "json_schema_extra": {
            "example": {
                "user_id": "user_12345",
                "annual_income": 75000.0,
                "credit_score": 720,
                "employment_status": "full_time",
                "debt_to_income_ratio": 0.25,
                "age": 32,
                "education_level": "bachelor",
            }
        }
    }

    user_id: str = Field(..., description="Unique user identifier")
    annual_income: float = Field(..., gt=0, description="Annual income in USD")
    credit_score: Optional[int] = Field(None, ge=300, le=850, description="Traditional credit score")
    employment_status: str = Field(..., description="Employment status")
    debt_to_income_ratio: Optional[float] = Field(None, ge=0, le=1, description="Debt-to-income ratio")
    age: int = Field(..., ge=18, le=120, description="User age")
    education_level: str = Field(..., description="Education level")
class FinancialTransaction(BaseModel):
    """Request model for a single financial transaction.

    ``timestamp`` defaults to the current UTC time when the client omits it.
    """

    # Pydantic v2 configuration. The nested ``class Config`` form is deprecated
    # in v2; ``model_config`` carries the same OpenAPI example.
    model_config = {
        "json_schema_extra": {
            "example": {
                "user_id": "user_12345",
                "transaction_id": "txn_67890",
                "amount": 1250.75,
                "transaction_type": "payment",
                "category": "rent",
                "merchant": "Property Management Co",
                "description": "Monthly rent payment",
                "location": "New York, NY",
            }
        }
    }

    user_id: str = Field(..., description="User identifier")
    transaction_id: str = Field(..., description="Unique transaction identifier")
    amount: float = Field(..., description="Transaction amount")
    transaction_type: TransactionType = Field(..., description="Type of transaction")
    category: str = Field(..., description="Transaction category")
    merchant: Optional[str] = Field(None, description="Merchant name")
    description: str = Field(..., description="Transaction description")
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    location: Optional[str] = Field(None, description="Transaction location")
class FinancialBehaviorEvent(BaseModel):
    """Request model for an observed financial behavior and its indicators.

    Both indicator lists default to empty; ``timestamp`` defaults to the
    current UTC time.
    """

    # Pydantic v2 configuration. The nested ``class Config`` form is deprecated
    # in v2; ``model_config`` carries the same OpenAPI example.
    model_config = {
        "json_schema_extra": {
            "example": {
                "user_id": "user_12345",
                "event_type": "payment_pattern_analysis",
                "description": "User consistently pays bills 3-5 days early",
                "risk_indicators": [],
                "positive_indicators": ["early_payment", "consistent_behavior", "financial_discipline"],
            }
        }
    }

    user_id: str = Field(..., description="User identifier")
    event_type: str = Field(..., description="Type of financial behavior")
    description: str = Field(..., description="Detailed description of the behavior")
    risk_indicators: List[str] = Field(default_factory=list, description="Risk indicators observed")
    positive_indicators: List[str] = Field(default_factory=list, description="Positive indicators observed")
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class FinancialScoreRequest(BaseModel):
    """Request body for the scoring endpoint."""
    # Which user to score
    user_id: str = Field(..., description="User identifier to score")
    # When false, the response's ``reasoning`` field is left as None
    include_reasoning: bool = Field(True, description="Include reasoning in response")
class FinancialScoreResponse(BaseModel):
    """Scoring result returned to clients.

    ``financial_score`` is validated to 0-1000 and ``confidence`` to 0-1;
    ``last_updated`` defaults to the current UTC time.
    """
    user_id: str
    financial_score: int = Field(..., ge=0, le=1000, description="Financial score (0-1000)")
    risk_level: RiskLevel
    behavior_category: FinancialBehaviorCategory
    confidence: float = Field(..., ge=0, le=1, description="Confidence in the score")
    key_factors: List[str] = Field(..., description="Key factors influencing the score")
    recommendations: List[str] = Field(..., description="Personalized recommendations")
    # Optional free-text explanation; None when not requested
    reasoning: Optional[str] = Field(None, description="Detailed reasoning behind the score")
    last_updated: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
# Authentication dependency
# Non-raising bearer extractor: a request without an Authorization header must
# still reach verify_token, otherwise "no token configured" would not actually
# mean "open API" (the default HTTPBearer rejects header-less requests itself).
_optional_bearer = HTTPBearer(auto_error=False)


async def verify_token(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_optional_bearer),
):
    """Verify the bearer token of the incoming request.

    Returns:
        ``None`` when no AUTHORIZATION token is configured (authentication
        disabled), otherwise the validated token string.

    Raises:
        HTTPException: 401 when a token is configured and the request's
            bearer token is missing or does not match.
    """
    import secrets

    if not AUTHORIZATION_TOKEN:
        return None

    supplied = credentials.credentials if credentials is not None else ""
    # Constant-time comparison on bytes (compare_digest rejects non-ASCII str)
    token_matches = secrets.compare_digest(
        supplied.encode("utf-8"), AUTHORIZATION_TOKEN.encode("utf-8")
    )
    if not token_matches:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authorization token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return supplied
# Startup and shutdown events
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager.

    On startup, creates the Graphiti client, builds its indices and
    constraints, and creates a Pub/Sub publisher when GOOGLE_CLOUD_PROJECT is
    set. On shutdown (or failed startup), closes the Graphiti connection and
    clears the module-level handles.

    Raises:
        Exception: any startup failure is logged and re-raised.
    """
    global graphiti_instance, pubsub_publisher
    logger.info("Starting Financial Scoring API with Graphiti...")
    try:
        # Only initialization is covered by this except, so errors surfacing
        # at ``yield`` are not misreported as initialization failures.
        try:
            graphiti_instance = Graphiti(
                uri=NEO4J_URI,
                user=NEO4J_USER,
                password=NEO4J_PASSWORD
            )
            # Build indices and constraints
            await graphiti_instance.build_indices_and_constraints()
            logger.info("Graphiti initialized successfully")
            # Initialize PubSub client if project is configured
            if GOOGLE_CLOUD_PROJECT:
                pubsub_publisher = pubsub_v1.PublisherClient()
                logger.info("PubSub publisher initialized")
        except Exception as e:
            logger.error(f"Failed to initialize services: {e}")
            raise
        yield
    finally:
        # Cleanup runs for both normal shutdown and failed startup
        if graphiti_instance:
            await graphiti_instance.close()
            logger.info("Graphiti connection closed")
        # Drop stale handles so nothing reports a closed client as available
        graphiti_instance = None
        pubsub_publisher = None
# Create FastAPI app
app = FastAPI(
    title="Financial Score Classification API",
    description="An intelligent financial scoring system powered by Graphiti knowledge graphs",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    lifespan=lifespan
)

# Add CORS middleware
# Origins come from CORS_ALLOWED_ORIGINS (comma-separated); default is "*".
# Credentialed CORS is only enabled for an explicit origin list: combining a
# wildcard origin with allow_credentials=True lets any site make credentialed
# cross-origin requests.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOWED_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]
_cors_is_wildcard = "*" in _cors_origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"] if _cors_is_wildcard else _cors_origins,
    allow_credentials=not _cors_is_wildcard,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Financial scoring logic
class FinancialScoringEngine:
    """Keyword-heuristic financial scoring over facts from the Graphiti graph.

    NOTE(review): the graph search is a free-text query containing the user id;
    nothing visible here restricts results to that user's own facts — confirm
    whether per-user partitioning is needed.
    """

    # Phrase -> score adjustment. Matching is done on lowercased text with
    # underscores treated as spaces (see _analyze_financial_behavior).
    _POSITIVE_PATTERNS = {
        "early payment": 50,
        "consistent savings": 60,
        "low debt ratio": 70,
        "stable income": 40,
        "emergency fund": 80,
        "investment": 30,
        "good credit": 60,
    }
    _NEGATIVE_PATTERNS = {
        "late payment": -60,
        "high debt": -70,
        "overdraft": -50,
        "bankruptcy": -200,
        "missed payment": -80,
        "maxed credit": -90,
        "frequent borrowing": -40,
    }

    @staticmethod
    async def calculate_financial_score(user_id: str, include_reasoning: bool = True) -> FinancialScoreResponse:
        """Calculate a financial score for a user.

        Args:
            user_id: User whose graph facts are searched.
            include_reasoning: When false, ``reasoning`` is returned as None.

        Returns:
            A neutral 500-point response with 0.3 confidence when the search
            yields no facts; otherwise 500 plus the heuristic adjustment,
            clamped to 0-1000.

        Raises:
            HTTPException: 500 when Graphiti is unavailable or scoring fails.
        """
        if not graphiti_instance:
            raise HTTPException(500, "Graphiti instance not available")
        try:
            # Search for user's financial data in the knowledge graph
            user_query = f"financial profile and behavior data for user {user_id}"
            search_results = await graphiti_instance.search(query=user_query)
            financial_data = [result.fact for result in search_results]

            # If no data found, return default score
            if not financial_data:
                return FinancialScoreResponse(
                    user_id=user_id,
                    financial_score=500,  # Neutral score
                    risk_level=RiskLevel.MEDIUM,
                    behavior_category=FinancialBehaviorCategory.MODERATE,
                    confidence=0.3,
                    key_factors=["insufficient_data"],
                    recommendations=["Provide more financial data for better scoring"],
                    reasoning="Limited financial data available for scoring" if include_reasoning else None
                )

            score_components = await FinancialScoringEngine._analyze_financial_behavior(financial_data)

            # Start from the neutral midpoint and clamp into the valid range
            base_score = 500
            final_score = max(0, min(1000, base_score + score_components['adjustment']))

            risk_level = FinancialScoringEngine._determine_risk_level(final_score)
            behavior_category = FinancialScoringEngine._determine_behavior_category(score_components)
            return FinancialScoreResponse(
                user_id=user_id,
                financial_score=int(final_score),
                risk_level=risk_level,
                behavior_category=behavior_category,
                confidence=score_components['confidence'],
                key_factors=score_components['key_factors'],
                recommendations=score_components['recommendations'],
                reasoning=score_components['reasoning'] if include_reasoning else None
            )
        except Exception as e:
            logger.error(f"Error calculating financial score for user {user_id}: {e}")
            raise HTTPException(500, f"Failed to calculate financial score: {str(e)}") from e

    @staticmethod
    async def _analyze_financial_behavior(financial_data: List[str]) -> Dict[str, Any]:
        """Derive score components from fact strings by phrase matching.

        This is a simplified scoring algorithm; in production you'd use more
        sophisticated ML models.

        Args:
            financial_data: Fact strings returned by the graph search.

        Returns:
            Dict with ``adjustment`` (int), ``key_factors`` (matched phrases
            with underscores), ``recommendations``, ``confidence`` (0.5 plus
            0.1 per match, capped at 1.0) and ``reasoning``.
        """
        adjustment = 0
        key_factors = []
        recommendations = []
        confidence = 0.5
        reasoning_parts = []

        # Indicators elsewhere in this file are snake_case tokens such as
        # "early_payment"; treat underscores as spaces so they match the
        # space-separated phrases in the pattern tables.
        data_text = " ".join(financial_data).lower().replace("_", " ")

        # Analyze positive patterns
        for pattern, points in FinancialScoringEngine._POSITIVE_PATTERNS.items():
            if pattern in data_text:
                adjustment += points
                key_factors.append(pattern.replace(" ", "_"))
                reasoning_parts.append(f"Positive: {pattern} (+{points} points)")
                confidence += 0.1

        # Analyze negative patterns
        for pattern, points in FinancialScoringEngine._NEGATIVE_PATTERNS.items():
            if pattern in data_text:
                adjustment += points  # Points are already negative
                key_factors.append(pattern.replace(" ", "_"))
                reasoning_parts.append(f"Negative: {pattern} ({points} points)")
                confidence += 0.1

        # Generate recommendations based on analysis
        if "late payment" in data_text:
            recommendations.append("Set up automatic payments to avoid late fees")
        if "high debt" in data_text:
            recommendations.append("Consider debt consolidation or payment plan")
        if "emergency fund" not in data_text:
            recommendations.append("Build an emergency fund covering 3-6 months of expenses")
        if "investment" not in data_text and adjustment > 0:
            recommendations.append("Consider starting an investment portfolio for long-term growth")

        confidence = min(1.0, confidence)
        reasoning = "; ".join(reasoning_parts) if reasoning_parts else "Analysis based on available financial data"
        return {
            'adjustment': adjustment,
            'key_factors': key_factors,
            'recommendations': recommendations,
            'confidence': confidence,
            'reasoning': reasoning
        }

    @staticmethod
    def _determine_risk_level(score: int) -> RiskLevel:
        """Map a score to a risk band: >=750 low, >=600 medium, >=400 high, else very high."""
        if score >= 750:
            return RiskLevel.LOW
        if score >= 600:
            return RiskLevel.MEDIUM
        if score >= 400:
            return RiskLevel.HIGH
        return RiskLevel.VERY_HIGH

    @staticmethod
    def _determine_behavior_category(score_components: Dict[str, Any]) -> FinancialBehaviorCategory:
        """Classify behavior from the matched key factors.

        Two or more conservative factors win first, then two or more
        aggressive factors; a lone ``investment`` factor yields SPECULATIVE;
        anything else is MODERATE.
        """
        key_factors = score_components['key_factors']
        conservative_indicators = ['emergency_fund', 'consistent_savings', 'low_debt_ratio']
        aggressive_indicators = ['investment', 'frequent_borrowing', 'high_debt']
        conservative_count = sum(1 for indicator in conservative_indicators if indicator in key_factors)
        aggressive_count = sum(1 for indicator in aggressive_indicators if indicator in key_factors)
        if conservative_count >= 2:
            return FinancialBehaviorCategory.CONSERVATIVE
        if aggressive_count >= 2:
            return FinancialBehaviorCategory.AGGRESSIVE
        # 'investment' is itself an aggressive indicator, so its presence
        # already implies aggressive_count >= 1.
        if 'investment' in key_factors:
            return FinancialBehaviorCategory.SPECULATIVE
        return FinancialBehaviorCategory.MODERATE
# PubSub message handling
async def process_financial_event(message_data: Dict[str, Any]):
    """Route a decoded Pub/Sub event to the matching handler.

    Args:
        message_data: Mapping with ``event_type`` ('user_profile',
            'transaction' or 'behavior') and a ``data`` payload for the
            corresponding model.

    Errors are logged rather than raised. Unsupported event types are logged
    as warnings instead of being dropped silently.
    """
    # event_type -> (payload model, handler coroutine)
    routes = {
        'user_profile': (UserFinancialProfile, handle_user_profile_update),
        'transaction': (FinancialTransaction, handle_financial_transaction),
        'behavior': (FinancialBehaviorEvent, handle_financial_behavior),
    }
    try:
        event_type = message_data.get('event_type')
        route = routes.get(event_type)
        if route is None:
            logger.warning(f"Ignoring financial event with unsupported event_type: {event_type!r}")
            return
        model_cls, handler = route
        await handler(model_cls(**message_data['data']))
    except Exception as e:
        logger.error(f"Error processing financial event: {e}")
async def handle_user_profile_update(profile: UserFinancialProfile):
    """Store a user profile update as a text episode in Graphiti.

    Does nothing when the Graphiti client is not initialized.
    """
    if not graphiti_instance:
        return

    # One UTC-aware timestamp for both the episode name and reference_time
    now = datetime.now(timezone.utc)
    episode_name = f"user_profile_{profile.user_id}_{now.isoformat()}"

    # Compare against None explicitly: a debt-to-income ratio of 0.0 is valid
    # data and must not be reported as "Not provided".
    credit_score = profile.credit_score if profile.credit_score is not None else 'Not provided'
    debt_ratio = (
        profile.debt_to_income_ratio
        if profile.debt_to_income_ratio is not None
        else 'Not provided'
    )

    episode_body = "\n".join([
        "",
        f"User {profile.user_id} financial profile update:",
        f"- Annual income: ${profile.annual_income:,.2f}",
        f"- Credit score: {credit_score}",
        f"- Employment status: {profile.employment_status}",
        f"- Debt-to-income ratio: {debt_ratio}",
        f"- Age: {profile.age}",
        f"- Education level: {profile.education_level}",
        "This represents the current financial standing and demographic information for the user.",
        "",
    ])
    await graphiti_instance.add_episode(
        name=episode_name,
        episode_body=episode_body,
        source=EpisodeType.text,
        source_description="User financial profile update",
        reference_time=now
    )
async def handle_financial_transaction(transaction: FinancialTransaction):
    """Store a financial transaction as a text episode in Graphiti.

    The episode is named after the transaction id and uses the transaction's
    own timestamp as reference time. Does nothing when the Graphiti client is
    not initialized.
    """
    if not graphiti_instance:
        return

    # Render the enum's value explicitly: since Python 3.11 formatting a
    # (str, Enum) member in an f-string yields "TransactionType.PAYMENT"
    # rather than "payment".
    type_label = TransactionType(transaction.transaction_type).value

    episode_name = f"transaction_{transaction.transaction_id}"
    episode_body = "\n".join([
        "",
        f"Financial transaction for user {transaction.user_id}:",
        f"- Transaction ID: {transaction.transaction_id}",
        f"- Amount: ${transaction.amount:,.2f}",
        f"- Type: {type_label}",
        f"- Category: {transaction.category}",
        f"- Merchant: {transaction.merchant or 'Not specified'}",
        f"- Description: {transaction.description}",
        f"- Location: {transaction.location or 'Not specified'}",
        f"- Timestamp: {transaction.timestamp.isoformat()}",
        "This transaction represents financial behavior and spending patterns for the user.",
        "",
    ])
    await graphiti_instance.add_episode(
        name=episode_name,
        episode_body=episode_body,
        source=EpisodeType.text,
        source_description="Financial transaction record",
        reference_time=transaction.timestamp
    )
async def handle_financial_behavior(behavior: FinancialBehaviorEvent):
    """Store a financial behavior analysis as a text episode in Graphiti.

    Uses the event's own timestamp as reference time. Does nothing when the
    Graphiti client is not initialized.
    """
    if not graphiti_instance:
        return

    # UTC-aware "now" for the episode name, consistent with the UTC-aware
    # timestamps used everywhere else (previously naive local time).
    episode_name = f"behavior_{behavior.user_id}_{datetime.now(timezone.utc).isoformat()}"

    risk_text = ', '.join(behavior.risk_indicators) if behavior.risk_indicators else 'None identified'
    positive_text = (
        ', '.join(behavior.positive_indicators)
        if behavior.positive_indicators
        else 'None identified'
    )

    episode_body = "\n".join([
        "",
        f"Financial behavior analysis for user {behavior.user_id}:",
        f"- Event type: {behavior.event_type}",
        f"- Description: {behavior.description}",
        f"- Risk indicators: {risk_text}",
        f"- Positive indicators: {positive_text}",
        f"- Analysis timestamp: {behavior.timestamp.isoformat()}",
        "This analysis provides insights into the user's financial behavior patterns and risk profile.",
        "",
    ])
    await graphiti_instance.add_episode(
        name=episode_name,
        episode_body=episode_body,
        source=EpisodeType.text,
        source_description="Financial behavior analysis",
        reference_time=behavior.timestamp
    )
# API Endpoints
@app.get("/health")
async def health_check():
    """Report service liveness and whether a Graphiti client is present."""
    report = {
        "status": "healthy",
        "timestamp": datetime.now(timezone.utc),
    }
    # "Connected" here only means the client object has been created
    report["neo4j_connected"] = graphiti_instance is not None
    report["version"] = "1.0.0"
    return report
@app.post("/financial/profile", status_code=201)
async def create_user_profile(
    profile: UserFinancialProfile,
    background_tasks: BackgroundTasks,
    token: Optional[str] = Depends(verify_token)
):
    """Queue a profile create/update for background ingestion and acknowledge it."""
    # The graph write is deferred to a background task
    background_tasks.add_task(handle_user_profile_update, profile)
    return dict(
        message="User profile update queued for processing",
        user_id=profile.user_id,
        status="success",
    )
@app.post("/financial/transaction", status_code=201)
async def record_transaction(
    transaction: FinancialTransaction,
    background_tasks: BackgroundTasks,
    token: Optional[str] = Depends(verify_token)
):
    """Queue a transaction for background ingestion and acknowledge it."""
    # The graph write is deferred to a background task
    background_tasks.add_task(handle_financial_transaction, transaction)
    return dict(
        message="Transaction recorded successfully",
        transaction_id=transaction.transaction_id,
        status="success",
    )
@app.post("/financial/behavior", status_code=201)
async def record_behavior(
    behavior: FinancialBehaviorEvent,
    background_tasks: BackgroundTasks,
    token: Optional[str] = Depends(verify_token)
):
    """Queue a behavior analysis for background ingestion and acknowledge it."""
    # The graph write is deferred to a background task
    background_tasks.add_task(handle_financial_behavior, behavior)
    return dict(
        message="Financial behavior recorded successfully",
        user_id=behavior.user_id,
        status="success",
    )
@app.post("/financial/score", response_model=FinancialScoreResponse)
async def get_financial_score(
    score_request: FinancialScoreRequest,
    token: Optional[str] = Depends(verify_token)
):
    """Compute and return the financial score for the requested user."""
    target_user = score_request.user_id
    with_reasoning = score_request.include_reasoning
    score = await FinancialScoringEngine.calculate_financial_score(target_user, with_reasoning)
    return score
@app.get("/financial/user/{user_id}/history")
async def get_user_financial_history(
    user_id: str,
    limit: int = 50,
    token: Optional[str] = Depends(verify_token)
):
    """Get financial history for a user from the knowledge graph.

    Args:
        user_id: User whose history is searched.
        limit: Maximum number of facts to return; must be at least 1.

    Raises:
        HTTPException: 400 for a non-positive ``limit``; 500 when Graphiti is
            unavailable.
    """
    # A negative limit would otherwise silently slice from the end of the list
    if limit < 1:
        raise HTTPException(status.HTTP_400_BAD_REQUEST, "limit must be a positive integer")
    if not graphiti_instance:
        raise HTTPException(500, "Graphiti instance not available")

    query = f"financial history transactions and behaviors for user {user_id}"
    # Ask the search for up to ``limit`` results; without this the search's
    # own default result cap applies regardless of the requested limit.
    results = await graphiti_instance.search(query=query, num_results=limit)
    limited_results = results[:limit]
    return {
        "user_id": user_id,
        "history": [
            {
                "uuid": result.uuid,
                "fact": result.fact,
                "valid_at": result.valid_at,
                "invalid_at": result.invalid_at
            }
            for result in limited_results
        ],
        "total_results": len(limited_results)
    }
@app.post("/pubsub/webhook")
async def pubsub_webhook(request: Request, background_tasks: BackgroundTasks):
    """Webhook endpoint for Pub/Sub push messages.

    Expects a JSON envelope whose ``message.data`` is a base64-encoded JSON
    object; the decoded event is queued for background processing.

    Raises:
        HTTPException: 400 when the envelope or payload is malformed.
            (Previously the 400 was swallowed by a broad ``except`` and
            re-raised as 500.)

    NOTE(review): this endpoint has no authentication dependency — confirm
    that is intended for the deployment.
    """
    import base64

    try:
        envelope = json.loads(await request.body())
        encoded = envelope['message']['data']
        financial_event = json.loads(base64.b64decode(encoded).decode('utf-8'))
    except (ValueError, KeyError, TypeError) as e:
        # JSONDecodeError, UnicodeDecodeError and binascii.Error are all
        # ValueError subclasses; KeyError/TypeError cover a wrong envelope shape.
        logger.error(f"Error processing PubSub message: {e}")
        raise HTTPException(400, "Invalid PubSub message format") from e

    if not isinstance(financial_event, dict):
        logger.error("Error processing PubSub message: payload is not a JSON object")
        raise HTTPException(400, "Invalid PubSub message format")

    # Process the financial event after the response is sent
    background_tasks.add_task(process_financial_event, financial_event)
    return {"status": "message processed"}
@app.get("/")
async def root():
    """Return static API metadata with pointers to the docs and health routes."""
    return dict(
        message="Financial Score Classification API",
        version="1.0.0",
        description="Intelligent financial scoring powered by Graphiti",
        docs="/docs",
        health="/health",
    )
if __name__ == "__main__":
    # Script entry point: serve on all interfaces, port from $PORT (default 8000)
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", 8000)))
Now let's build the TypeScript client that will interact with our Python API and handle PubSub messaging.
# The Pub/Sub client is the scoped package @google-cloud/pubsub (as imported in the service code)
npm install axios @google-cloud/pubsub
npm install -D @types/node typescript
// helpers/FinancialGraphiti.helper.ts
import axios, { AxiosResponse } from 'axios';
// Types matching our Python API
/** Client-side shape of the Python API's UserFinancialProfile request model. */
export interface UserFinancialProfile {
  user_id: string;
  /** Annual income in USD; the API requires a value greater than 0. */
  annual_income: number;
  /** Optional; the API accepts 300-850. */
  credit_score?: number;
  employment_status: string;
  /** Optional; the API accepts 0-1. */
  debt_to_income_ratio?: number;
  /** The API accepts 18-120. */
  age: number;
  education_level: string;
}
/** Client-side shape of the Python API's FinancialTransaction request model. */
export interface FinancialTransaction {
  user_id: string;
  transaction_id: string;
  amount: number;
  /** Must be one of the API's TransactionType values. */
  transaction_type: 'deposit' | 'withdrawal' | 'payment' | 'transfer' | 'loan_payment' | 'credit_card_payment' | 'investment';
  category: string;
  merchant?: string;
  description: string;
  /** Optional; the API defaults it to the current UTC time when omitted. */
  timestamp?: string;
  location?: string;
}
/** Client-side shape of the Python API's FinancialBehaviorEvent request model. */
export interface FinancialBehaviorEvent {
  user_id: string;
  event_type: string;
  description: string;
  risk_indicators: string[];
  positive_indicators: string[];
  /** Optional; the API defaults it to the current UTC time when omitted. */
  timestamp?: string;
}
/** Request body for the API's /financial/score endpoint. */
export interface FinancialScoreRequest {
  user_id: string;
  /** When false, the response's `reasoning` is omitted. */
  include_reasoning: boolean;
}
/** Response body of the API's /financial/score endpoint. */
export interface FinancialScoreResponse {
  user_id: string;
  /** 0-1000 per the API model. */
  financial_score: number;
  risk_level: 'low' | 'medium' | 'high' | 'very_high';
  behavior_category: 'conservative' | 'moderate' | 'aggressive' | 'speculative';
  /** 0-1 per the API model. */
  confidence: number;
  key_factors: string[];
  recommendations: string[];
  reasoning?: string;
  last_updated: string;
}
/** Response body of the API's /financial/user/{user_id}/history endpoint. */
export interface UserFinancialHistory {
  user_id: string;
  /** One entry per fact returned by the knowledge-graph search. */
  history: Array<{
    uuid: string;
    fact: string;
    valid_at?: string;
    invalid_at?: string;
  }>;
  /** Number of entries in `history`. */
  total_results: number;
}
// Base URL of the Python API; falls back to a local instance when unset or empty
const FINANCIAL_API_BASE_URL = process.env.FINANCIAL_API_BASE_URL || 'http://localhost:8000';
// Optional bearer token; when undefined, requests are sent without an Authorization header
const FINANCIAL_API_TOKEN = process.env.FINANCIAL_API_TOKEN;
/**
 * Static helper wrapping the Python financial API plus two local analyses
 * (spending-pattern insights and recommendation assembly).
 */
export default class FinancialGraphitiHelper {
  /**
   * Build the headers shared by every API call; the bearer token is attached
   * only when FINANCIAL_API_TOKEN is set.
   */
  private static getHeaders(): Record<string, string> {
    return {
      'Content-Type': 'application/json',
      ...(FINANCIAL_API_TOKEN ? { Authorization: `Bearer ${FINANCIAL_API_TOKEN}` } : {})
    };
  }

  /**
   * Check the health status of the financial API
   */
  static async healthCheck(): Promise<{ status: string; neo4j_connected: boolean }> {
    const response: AxiosResponse = await axios.get(`${FINANCIAL_API_BASE_URL}/health`, {
      headers: this.getHeaders()
    });
    return response.data;
  }

  /**
   * Create or update a user's financial profile
   */
  static async createUserProfile(profile: UserFinancialProfile): Promise<{ message: string; user_id: string; status: string }> {
    const response: AxiosResponse = await axios.post(`${FINANCIAL_API_BASE_URL}/financial/profile`, profile, {
      headers: this.getHeaders()
    });
    return response.data;
  }

  /**
   * Record a financial transaction
   */
  static async recordTransaction(transaction: FinancialTransaction): Promise<{ message: string; transaction_id: string; status: string }> {
    const response: AxiosResponse = await axios.post(`${FINANCIAL_API_BASE_URL}/financial/transaction`, transaction, {
      headers: this.getHeaders()
    });
    return response.data;
  }

  /**
   * Record financial behavior analysis
   */
  static async recordBehavior(behavior: FinancialBehaviorEvent): Promise<{ message: string; user_id: string; status: string }> {
    const response: AxiosResponse = await axios.post(`${FINANCIAL_API_BASE_URL}/financial/behavior`, behavior, {
      headers: this.getHeaders()
    });
    return response.data;
  }

  /**
   * Get comprehensive financial score for a user
   */
  static async getFinancialScore(scoreRequest: FinancialScoreRequest): Promise<FinancialScoreResponse> {
    const response: AxiosResponse<FinancialScoreResponse> = await axios.post(`${FINANCIAL_API_BASE_URL}/financial/score`, scoreRequest, {
      headers: this.getHeaders()
    });
    return response.data;
  }

  /**
   * Get financial history for a user
   *
   * The user id is percent-encoded before being placed in the URL path so
   * ids containing `/`, `?` or `#` cannot alter the request target.
   */
  static async getUserFinancialHistory(userId: string, limit: number = 50): Promise<UserFinancialHistory> {
    const url = `${FINANCIAL_API_BASE_URL}/financial/user/${encodeURIComponent(userId)}/history`;
    const response: AxiosResponse<UserFinancialHistory> = await axios.get(url, {
      headers: this.getHeaders(),
      params: { limit }
    });
    return response.data;
  }

  /**
   * Analyze spending patterns and generate insights
   *
   * Produces up to three insights: early-month payment habit (more than 70%
   * of payments on day 1-5), spending diversity (more than 5 categories) and
   * large transactions (more than 3x the average absolute amount).
   * Returns an empty list for an empty transaction list.
   */
  static async analyzeSpendingPatterns(userId: string, transactions: FinancialTransaction[]): Promise<FinancialBehaviorEvent[]> {
    const insights: FinancialBehaviorEvent[] = [];

    // Group transactions by category
    const categoryTotals = transactions.reduce((acc, transaction) => {
      acc[transaction.category] = (acc[transaction.category] || 0) + Math.abs(transaction.amount);
      return acc;
    }, {} as Record<string, number>);

    // Analyze payment timing
    const payments = transactions.filter(t => t.transaction_type === 'payment');
    const earlyPayments = payments.filter(t => {
      const day = new Date(t.timestamp || Date.now()).getDate();
      return day <= 5; // Payments in first 5 days of month
    });
    // Explicit guard: with no payments the ratio would be 0/0 (NaN)
    if (payments.length > 0 && earlyPayments.length / payments.length > 0.7) {
      insights.push({
        user_id: userId,
        event_type: 'payment_pattern_analysis',
        description: 'User consistently makes payments early in the month',
        risk_indicators: [],
        positive_indicators: ['early_payment', 'consistent_behavior', 'financial_discipline']
      });
    }

    // Analyze spending diversity
    const categories = Object.keys(categoryTotals);
    if (categories.length > 5) {
      insights.push({
        user_id: userId,
        event_type: 'spending_diversity_analysis',
        description: 'User shows diverse spending patterns across multiple categories',
        risk_indicators: [],
        positive_indicators: ['spending_diversity', 'balanced_lifestyle']
      });
    }

    // Check for large transactions (skipped for empty input to avoid a NaN average)
    if (transactions.length > 0) {
      const totalSpending = transactions.reduce((sum, t) => sum + Math.abs(t.amount), 0);
      const averageTransaction = totalSpending / transactions.length;
      const largeTransactions = transactions.filter(t => Math.abs(t.amount) > averageTransaction * 3);
      if (largeTransactions.length > 0) {
        // `category` is free-form, so also honour the typed transaction_type
        const hasInvestment = largeTransactions.some(
          t => t.category === 'investment' || t.transaction_type === 'investment'
        );
        insights.push({
          user_id: userId,
          event_type: 'large_transaction_analysis',
          description: `User has ${largeTransactions.length} transaction(s) significantly above average`,
          risk_indicators: largeTransactions.length > 3 ? ['frequent_large_transactions'] : [],
          positive_indicators: hasInvestment ? ['investment_activity'] : []
        });
      }
    }

    return insights;
  }

  /**
   * Generate financial recommendations based on user profile and history
   *
   * Starts from the API's own recommendations and adds local ones based on
   * the score, risk level and history facts. On any error a single fallback
   * message is returned instead of throwing.
   */
  static async generateRecommendations(userId: string): Promise<string[]> {
    try {
      // The two requests are independent, so issue them concurrently
      const [scoreResponse, history] = await Promise.all([
        this.getFinancialScore({ user_id: userId, include_reasoning: true }),
        this.getUserFinancialHistory(userId, 100)
      ]);
      const recommendations: string[] = [...scoreResponse.recommendations];

      // Analyze transaction patterns from history
      const facts = history.history.map(h => h.fact.toLowerCase());
      const hasInvestmentMention = facts.some(
        fact => fact.includes('investment') || fact.includes('investing')
      );
      const hasEmergencyFundMention = facts.some(
        fact => fact.includes('emergency fund') || fact.includes('savings')
      );

      if (!hasInvestmentMention && scoreResponse.financial_score > 600) {
        recommendations.push('Consider diversifying into low-risk investment options like index funds');
      }
      if (!hasEmergencyFundMention) {
        recommendations.push('Build an emergency fund covering 3-6 months of expenses');
      }
      if (scoreResponse.risk_level === 'high' || scoreResponse.risk_level === 'very_high') {
        recommendations.push('Consider speaking with a financial advisor for personalized guidance');
        recommendations.push('Review and optimize your monthly budget to reduce unnecessary expenses');
      }
      return [...new Set(recommendations)]; // Remove duplicates
    } catch (error) {
      console.error('Error generating recommendations:', error);
      return ['Unable to generate recommendations at this time'];
    }
  }
}
// services/FinancialPubSub.service.ts
import { PubSub } from '@google-cloud/pubsub';
import FinancialGraphitiHelper, {
UserFinancialProfile,
FinancialTransaction,
FinancialBehaviorEvent
} from '../helpers/FinancialGraphiti.helper';
/** Envelope published to Pub/Sub: `event_type` tells consumers which payload shape `data` holds. */
interface PubSubMessage {
  event_type: 'user_profile' | 'transaction' | 'behavior';
  data: UserFinancialProfile | FinancialTransaction | FinancialBehaviorEvent;
  /** Set to an ISO-8601 string by the publish methods. */
  timestamp: string;
}
export default class FinancialPubSubService {
private pubsub: PubSub;
private projectId: string;
constructor(projectId?: string) {
  // Resolution order: explicit argument, then GOOGLE_CLOUD_PROJECT, then ''
  const resolvedProjectId = projectId || process.env.GOOGLE_CLOUD_PROJECT || '';
  this.projectId = resolvedProjectId;
  this.pubsub = new PubSub({ projectId: resolvedProjectId });
}
/**
* Publish user profile update to PubSub
*/
async publishUserProfile(profile: UserFinancialProfile): Promise<void> {
  // Wrap the profile in the shared envelope, stamped with the publish time
  const publishedAt = new Date().toISOString();
  const envelope: PubSubMessage = { event_type: 'user_profile', data: profile, timestamp: publishedAt };
  await this.publishMessage('financial-events', envelope);
}
/**
* Publish financial transaction to PubSub
*/
async publishTransaction(transaction: FinancialTransaction): Promise<void> {
  // Wrap the transaction in the shared envelope, stamped with the publish time
  const publishedAt = new Date().toISOString();
  const envelope: PubSubMessage = { event_type: 'transaction', data: transaction, timestamp: publishedAt };
  await this.publishMessage('financial-events', envelope);
}
/**
* Publish financial behavior event to PubSub
*/
async publishBehavior(behavior: FinancialBehaviorEvent): Promise<void> {
  // Wrap the behavior event in the shared envelope, stamped with the publish time
  const publishedAt = new Date().toISOString();
  const envelope: PubSubMessage = { event_type: 'behavior', data: behavior, timestamp: publishedAt };
  await this.publishMessage('financial-events', envelope);
}
/**
* Process a batch of financial transactions
*/
async processBatchTransactions(userId: string, transactions: FinancialTransaction[]): Promise<void> {
  try {
    // Publish the transactions one at a time, in input order
    for (let i = 0; i < transactions.length; i += 1) {
      await this.publishTransaction(transactions[i]);
    }

    // Derive behavior insights from the batch and publish each of them
    const insights = await FinancialGraphitiHelper.analyzeSpendingPatterns(userId, transactions);
    for (let j = 0; j < insights.length; j += 1) {
      await this.publishBehavior(insights[j]);
    }

    console.log(`Processed ${transactions.length} transactions and ${insights.length} behavior insights for user ${userId}`);
  } catch (failure) {
    // Log, then let the caller see the original error
    console.error('Error processing batch transactions:', failure);
    throw failure;
  }
}
/**
* Subscribe to financial events and process them
*/
async subscribeToFinancialEvents(subscriptionName: string): Promise<void> {
  const subscription = this.pubsub.subscription(subscriptionName);

  // Per-message handler: forward the event to the API, then ack; nack on failure.
  const messageHandler = async (message: any) => {
    try {
      const messageData: PubSubMessage = JSON.parse(message.data.toString());
      // Widen to string so an unexpected runtime value reaches the default branch
      const eventType: string = messageData.event_type;
      console.log(`Received ${eventType} event:`, messageData);

      // Process the message through our direct API calls
      switch (eventType) {
        case 'user_profile':
          await FinancialGraphitiHelper.createUserProfile(messageData.data as UserFinancialProfile);
          break;
        case 'transaction':
          await FinancialGraphitiHelper.recordTransaction(messageData.data as FinancialTransaction);
          break;
        case 'behavior':
          await FinancialGraphitiHelper.recordBehavior(messageData.data as FinancialBehaviorEvent);
          break;
        default:
          // Unknown types are still acked (redelivery would not help), but
          // they are reported instead of being logged as a success.
          console.warn(`Ignoring message with unsupported event_type: ${eventType}`);
          message.ack();
          return;
      }

      message.ack();
      console.log(`Successfully processed ${eventType} event`);
    } catch (error) {
      console.error('Error processing message:', error);
      message.nack();
    }
  };

  subscription.on('message', messageHandler);
  subscription.on('error', error => {
    console.error('Subscription error:', error);
  });
  console.log(`Listening for messages on subscription: ${subscriptionName}`);
}
/**
 * Start monitoring a user: subscribe to `financial-monitoring-<userId>` and
 * periodically recompute the user's financial score.
 *
 * On each tick the score is fetched and logged. A `score_update` behavior
 * event is published only when the confidence exceeds 0.7 AND the score or
 * risk level differs from the last published one, so an unchanged score is
 * not re-published on every tick. A tick is skipped while the previous one
 * is still running.
 *
 * NOTE: the interval handle is not retained, so the timer runs until the
 * process exits.
 *
 * @param userId - User to monitor.
 * @param intervalMs - Delay between score refreshes; defaults to 300000 (5 minutes).
 * @throws Rethrows any error raised while setting up the subscription.
 */
async startFinancialMonitoring(userId: string, intervalMs: number = 300000): Promise<void> {
  console.log(`Starting financial monitoring for user: ${userId}`);

  // Set up a subscription specifically for this user
  const userSubscriptionName = `financial-monitoring-${userId}`;

  try {
    await this.subscribeToFinancialEvents(userSubscriptionName);

    let refreshInFlight = false;
    let lastPublishedScore: number | undefined;
    let lastPublishedRiskLevel: string | undefined;

    // Periodically generate financial scores and insights
    setInterval(async () => {
      // Skip this tick if the previous refresh has not finished yet.
      if (refreshInFlight) {
        return;
      }
      refreshInFlight = true;

      try {
        const score = await FinancialGraphitiHelper.getFinancialScore({
          user_id: userId,
          include_reasoning: true
        });

        console.log(`Financial Score Update for ${userId}:`, {
          score: score.financial_score,
          risk_level: score.risk_level,
          behavior_category: score.behavior_category,
          confidence: score.confidence
        });

        const scoreChanged =
          score.financial_score !== lastPublishedScore ||
          score.risk_level !== lastPublishedRiskLevel;

        // Publish only confident results that differ from the last published one.
        if (score.confidence > 0.7 && scoreChanged) {
          await this.publishBehavior({
            user_id: userId,
            event_type: 'score_update',
            description: `Financial score updated to ${score.financial_score} with ${score.risk_level} risk level`,
            risk_indicators: score.risk_level === 'high' || score.risk_level === 'very_high' ? ['elevated_risk'] : [],
            positive_indicators: score.financial_score > 700 ? ['strong_financial_profile'] : []
          });
          lastPublishedScore = score.financial_score;
          lastPublishedRiskLevel = score.risk_level;
        }
      } catch (error) {
        console.error(`Error updating financial score for user ${userId}:`, error);
      } finally {
        refreshInFlight = false;
      }
    }, intervalMs);
  } catch (error) {
    console.error(`Error setting up financial monitoring for user ${userId}:`, error);
    throw error;
  }
}
/**
 * Serialize an envelope to JSON and publish it to the named topic.
 *
 * @param topicName - Topic to publish to.
 * @param message - Envelope to serialize; its `event_type` appears in the log line.
 * @throws Rethrows any error from serialization or publishing after logging it.
 */
private async publishMessage(topicName: string, message: PubSubMessage): Promise<void> {
  try {
    await this.pubsub
      .topic(topicName)
      .publishMessage({ data: Buffer.from(JSON.stringify(message)) });
    console.log(`Published ${message.event_type} message to ${topicName}`);
  } catch (err) {
    console.error(`Error publishing message to ${topicName}:`, err);
    throw err;
  }
}
/**
 * Publish events to the `financial-events` topic in batches of 10.
 *
 * Events inside a batch are published concurrently. Publishing is best
 * effort: a failed event is logged and does not stop the others, and a
 * summary warning is logged at the end if any event failed. A short pause is
 * inserted between batches, but not after the last one.
 *
 * @param events - Envelopes to publish; an empty array is a no-op.
 */
async batchProcessFinancialEvents(events: PubSubMessage[]): Promise<void> {
  const batchSize = 10;
  const pauseBetweenBatchesMs = 100;
  let failedCount = 0;

  for (let start = 0; start < events.length; start += batchSize) {
    const batch = events.slice(start, start + batchSize);

    await Promise.all(batch.map(async (event) => {
      try {
        await this.publishMessage('financial-events', event);
      } catch (error) {
        failedCount += 1;
        console.error(`Error processing event in batch:`, error);
      }
    }));

    // Small delay between batches to avoid overwhelming the system;
    // pointless after the final batch, so skip it there.
    const hasMoreBatches = start + batchSize < events.length;
    if (hasMoreBatches) {
      await new Promise(resolve => setTimeout(resolve, pauseBetweenBatchesMs));
    }
  }

  if (failedCount > 0) {
    console.warn(`${failedCount} of ${events.length} events failed to publish`);
  }
}
}
Let's create a comprehensive example that demonstrates how all components work together:
// examples/FinancialScoringDemo.ts
import FinancialGraphitiHelper, {
UserFinancialProfile,
FinancialTransaction
} from '../helpers/FinancialGraphiti.helper';
import FinancialPubSubService from '../services/FinancialPubSub.service';
/**
 * Comprehensive demonstration of the financial scoring system.
 *
 * Walks through profile creation, transaction recording, Pub/Sub publishing,
 * scoring, recommendations and history retrieval against the helper API.
 */
export class FinancialScoringDemo {
  private pubsubService: FinancialPubSubService;

  /**
   * @param projectId - Optional project id forwarded to FinancialPubSubService.
   */
  constructor(projectId?: string) {
    this.pubsubService = new FinancialPubSubService(projectId);
  }

  /**
   * Run a complete demonstration of the financial scoring system.
   *
   * Steps run strictly in order with fixed delays between them. Any error
   * from the helper or the Pub/Sub service propagates to the caller.
   *
   * NOTE(review): the five initial transactions are recorded directly in
   * step 2 and then published again in step 3; if a subscriber also records
   * published transactions they may be stored twice. Confirm this is intended.
   */
  async runDemo(): Promise<void> {
    console.log('Starting Financial Scoring System Demo...\n');

    // 1. Create a sample user profile
    const userProfile: UserFinancialProfile = {
      user_id: 'demo_user_12345',
      annual_income: 85000,
      credit_score: 740,
      employment_status: 'full_time',
      debt_to_income_ratio: 0.28,
      age: 29,
      education_level: 'bachelor'
    };

    console.log('Creating user profile...');
    await FinancialGraphitiHelper.createUserProfile(userProfile);
    await this.delay(2000); // Allow time for processing

    // 2. Simulate financial transactions
    const transactions: FinancialTransaction[] = [
      {
        user_id: userProfile.user_id,
        transaction_id: 'txn_001',
        amount: 1200,
        transaction_type: 'payment',
        category: 'rent',
        merchant: 'Downtown Apartments',
        description: 'Monthly rent payment',
        location: 'San Francisco, CA'
      },
      {
        user_id: userProfile.user_id,
        transaction_id: 'txn_002',
        amount: 500,
        transaction_type: 'investment',
        category: 'stocks',
        merchant: 'Investment Platform',
        description: 'Monthly investment contribution',
        location: 'Online'
      },
      {
        user_id: userProfile.user_id,
        transaction_id: 'txn_003',
        amount: 300,
        transaction_type: 'deposit',
        category: 'savings',
        merchant: 'Local Bank',
        description: 'Emergency fund contribution',
        location: 'San Francisco, CA'
      },
      {
        user_id: userProfile.user_id,
        transaction_id: 'txn_004',
        amount: 75,
        transaction_type: 'payment',
        category: 'utilities',
        merchant: 'Electric Company',
        description: 'Monthly electricity bill',
        location: 'San Francisco, CA'
      },
      {
        user_id: userProfile.user_id,
        transaction_id: 'txn_005',
        amount: 450,
        transaction_type: 'payment',
        category: 'groceries',
        merchant: 'Whole Foods',
        description: 'Weekly grocery shopping',
        location: 'San Francisco, CA'
      }
    ];

    console.log('Recording financial transactions...');
    for (const transaction of transactions) {
      await FinancialGraphitiHelper.recordTransaction(transaction);
      await this.delay(500); // Simulate real-time transaction flow
    }

    // 3. Process transactions through PubSub for behavior analysis
    console.log('Processing transactions for behavior analysis...');
    await this.pubsubService.processBatchTransactions(userProfile.user_id, transactions);
    await this.delay(3000); // Allow time for analysis

    // 4. Get initial financial score
    console.log('Calculating initial financial score...');
    const initialScore = await FinancialGraphitiHelper.getFinancialScore({
      user_id: userProfile.user_id,
      include_reasoning: true
    });
    this.displayScoreResults('Initial Score', initialScore);

    // 5. Simulate additional positive financial behavior
    console.log('\nSimulating additional positive financial behaviors...');
    const additionalTransactions: FinancialTransaction[] = [
      {
        user_id: userProfile.user_id,
        transaction_id: 'txn_006',
        amount: 800,
        transaction_type: 'payment',
        category: 'loan_payment',
        merchant: 'Student Loan Servicer',
        description: 'Extra student loan payment',
        location: 'Online'
      },
      {
        user_id: userProfile.user_id,
        transaction_id: 'txn_007',
        amount: 200,
        transaction_type: 'investment',
        category: 'retirement',
        merchant: '401k Provider',
        description: 'Additional 401k contribution',
        location: 'Online'
      }
    ];

    for (const transaction of additionalTransactions) {
      await FinancialGraphitiHelper.recordTransaction(transaction);
      await this.delay(500);
    }

    // Record positive behavior insights
    await FinancialGraphitiHelper.recordBehavior({
      user_id: userProfile.user_id,
      event_type: 'debt_reduction_behavior',
      description: 'User made additional loan payment above minimum requirement',
      risk_indicators: [],
      positive_indicators: ['debt_reduction', 'extra_payment', 'financial_discipline']
    });
    await FinancialGraphitiHelper.recordBehavior({
      user_id: userProfile.user_id,
      event_type: 'retirement_planning',
      description: 'User increased retirement contributions beyond employer match',
      risk_indicators: [],
      positive_indicators: ['retirement_planning', 'long_term_thinking', 'financial_wellness']
    });
    await this.delay(3000); // Allow time for processing

    // 6. Get updated financial score
    console.log('Calculating updated financial score...');
    const updatedScore = await FinancialGraphitiHelper.getFinancialScore({
      user_id: userProfile.user_id,
      include_reasoning: true
    });
    this.displayScoreResults('Updated Score', updatedScore);

    // 7. Generate personalized recommendations
    console.log('\nGenerating personalized recommendations...');
    const recommendations = await FinancialGraphitiHelper.generateRecommendations(userProfile.user_id);
    console.log('Personalized Recommendations:');
    recommendations.forEach((rec, index) => {
      console.log(` ${index + 1}. ${rec}`);
    });

    // 8. Display financial history
    console.log('\nFinancial History Summary:');
    const history = await FinancialGraphitiHelper.getUserFinancialHistory(userProfile.user_id, 10);
    console.log(`Recent Financial Events (${history.total_results} total):`);
    history.history.slice(0, 5).forEach((event, index) => {
      console.log(` ${index + 1}. ${event.fact.substring(0, 100)}...`);
    });

    // 9. Start real-time monitoring
    console.log('\nStarting real-time financial monitoring...');
    console.log(' (This would run continuously in a production environment)');
    // In a real application, this would run as a background service
    // await this.pubsubService.startFinancialMonitoring(userProfile.user_id);

    console.log('\nFinancial Scoring System Demo Complete!');
    console.log('\nThe system is now ready to:');
    console.log(' • Process real-time financial transactions');
    console.log(' • Analyze spending patterns and behaviors');
    console.log(' • Generate dynamic risk assessments');
    console.log(' • Provide personalized financial recommendations');
    console.log(' • Track financial health over time');
  }

  /**
   * Print one score result to the console.
   *
   * @param title - Heading printed above the score fields.
   * @param score - Score object; reads `financial_score`, `risk_level`,
   *   `behavior_category`, `confidence`, and optionally `key_factors` and
   *   `reasoning`.
   */
  private displayScoreResults(title: string, score: any): void {
    console.log(`\n${title}:`);
    console.log(` Score: ${score.financial_score}/1000`);
    console.log(` Risk Level: ${score.risk_level.toUpperCase()}`);
    console.log(` Behavior Category: ${score.behavior_category.toUpperCase()}`);
    console.log(` Confidence: ${(score.confidence * 100).toFixed(1)}%`);
    // key_factors may be absent on an untyped score object; skip it rather than throw.
    if (Array.isArray(score.key_factors) && score.key_factors.length > 0) {
      console.log(` Key Factors: ${score.key_factors.join(', ')}`);
    }
    if (score.reasoning) {
      console.log(` Reasoning: ${score.reasoning}`);
    }
  }

  /**
   * Resolve after the given number of milliseconds.
   *
   * @param ms - Delay in milliseconds.
   */
  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
// Run the demo
/**
 * Entry point: build the demo and run it once.
 *
 * Errors are logged rather than rethrown; the process exit code is set to 1
 * so a failed run is visible to the calling shell or CI job.
 */
async function main(): Promise<void> {
  try {
    const demo = new FinancialScoringDemo();
    await demo.runDemo();
  } catch (error) {
    console.error('Demo failed:', error);
    // Use exitCode instead of process.exit() so pending output can still flush.
    process.exitCode = 1;
  }
}
// Uncomment to run the demo
// main();
Create the necessary environment files:
.env file for the Python API service:
# Neo4j Configuration
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=your_neo4j_password
# API Security
AUTHORIZATION=your_api_token
# LLM Configuration
LLM_MODEL=gpt-4o
SMALL_LLM_MODEL=gpt-4o-mini
OPENAI_API_KEY=your_openai_key
# Google Cloud
GOOGLE_CLOUD_PROJECT=your_project_id
.env file for the TypeScript client:
# Financial API
FINANCIAL_API_BASE_URL=http://localhost:8000
FINANCIAL_API_TOKEN=your_api_token
# Google Cloud
GOOGLE_CLOUD_PROJECT=your_project_id
GOOGLE_APPLICATION_CREDENTIALS=path/to/service-account.json
Create a Dockerfile for the Python API:
# Container image for the Python API, served by uvicorn on port 8000.
FROM python:3.11-slim
WORKDIR /app
# Dependencies are copied and installed before the application source.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the rest of the build context into /app.
COPY . .
EXPOSE 8000
# Serve the `app` object from main.py on all interfaces.
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# Kubernetes manifests for the Python API: a 3-replica Deployment and a
# LoadBalancer Service that forwards port 80 to container port 8000.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: financial-scoring-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: financial-scoring-api
  template:
    metadata:
      labels:
        app: financial-scoring-api
    spec:
      containers:
        - name: api
          image: your-registry/financial-scoring-api:latest
          ports:
            - containerPort: 8000
          # NOTE(review): only NEO4J_URI and NEO4J_PASSWORD are injected here.
          # The .env example for the API also lists NEO4J_USER, AUTHORIZATION,
          # the LLM settings and OPENAI_API_KEY; confirm how those reach the pod.
          env:
            - name: NEO4J_URI
              valueFrom:
                secretKeyRef:
                  name: financial-secrets
                  key: neo4j_uri
            - name: NEO4J_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: financial-secrets
                  key: neo4j_password
---
apiVersion: v1
kind: Service
metadata:
  name: financial-scoring-api-service
spec:
  selector:
    app: financial-scoring-api
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer
You can extend the system with more sophisticated ML models:
# Advanced scoring with ML models
import joblib
from sklearn.ensemble import RandomForestClassifier
import numpy as np
class AdvancedScoringEngine:
    """Score a user's finances with two pre-trained models loaded via joblib.

    ``risk_model`` must expose ``predict_proba`` and ``classes_``;
    ``score_model`` must expose ``predict``.
    """

    # Order in which feature values are placed in the model input row.
    # NOTE(review): presumably this must match the column order used when the
    # models were trained -- confirm against the training code.
    FEATURE_ORDER = (
        'income',
        'debt_ratio',
        'credit_score',
        'transaction_frequency',
        'savings_rate',
    )

    def __init__(
        self,
        risk_model_path: str = 'models/risk_classifier.pkl',
        score_model_path: str = 'models/score_regressor.pkl',
    ):
        """Load the pre-trained risk classifier and score regressor.

        Args:
            risk_model_path: Path to the serialized risk classifier.
            score_model_path: Path to the serialized score regressor.
        """
        # SECURITY: joblib.load unpickles the file, which can execute
        # arbitrary code. Only load model files from a trusted source.
        self.risk_model = joblib.load(risk_model_path)
        self.score_model = joblib.load(score_model_path)

    async def calculate_ml_score(self, features: Dict[str, float]) -> Dict[str, Any]:
        """Calculate score using machine learning models.

        Args:
            features: Feature values keyed by the names in ``FEATURE_ORDER``.
                Missing keys and ``None`` values are treated as 0.

        Returns:
            A dict with ``ml_score`` (predicted score truncated to int),
            ``ml_risk_level`` (the class label with the highest predicted
            probability, as a native Python value) and ``confidence`` (that
            highest probability as a float).
        """
        row = []
        for name in self.FEATURE_ORDER:
            value = features.get(name)
            # None would turn the row into an object-dtype array.
            row.append(0 if value is None else value)
        feature_vector = np.array([row])

        # Predict risk level
        risk_prob = self.risk_model.predict_proba(feature_vector)[0]
        label = self.risk_model.classes_[int(np.argmax(risk_prob))]
        # Convert NumPy scalars to plain Python so the result can be JSON-encoded.
        risk_level = label.item() if isinstance(label, np.generic) else label

        # Predict financial score
        predicted_score = self.score_model.predict(feature_vector)[0]

        return {
            'ml_score': int(predicted_score),
            'ml_risk_level': risk_level,
            'confidence': float(np.max(risk_prob)),
        }
// Fraud detection service
export class FraudDetectionService {
  /**
   * Check one transaction against the user's recent history and record a
   * `fraud_detection` behavior event when anything is flagged.
   *
   * @param userId - User whose history (up to 100 entries) is fetched.
   * @param transaction - Transaction to inspect.
   * @returns `true` when at least one anomaly was found and recorded, otherwise `false`.
   */
  static async detectAnomalies(userId: string, transaction: FinancialTransaction): Promise<boolean> {
    const pastActivity = await FinancialGraphitiHelper.getUserFinancialHistory(userId, 100);
    const flagged = this.analyzeTransactionAnomalies(transaction, pastActivity);

    // Nothing suspicious: leave the graph untouched.
    if (flagged.length === 0) {
      return false;
    }

    await FinancialGraphitiHelper.recordBehavior({
      user_id: userId,
      event_type: 'fraud_detection',
      description: `Potential fraudulent activity detected: ${flagged.join(', ')}`,
      risk_indicators: ['potential_fraud', ...flagged],
      positive_indicators: []
    });
    return true;
  }

  /**
   * Placeholder anomaly analysis: no checks are implemented yet, so this
   * always returns an empty list.
   *
   * @param transaction - Transaction to inspect (currently unused).
   * @param history - User history to compare against (currently unused).
   * @returns Names of detected anomalies.
   */
  private static analyzeTransactionAnomalies(transaction: FinancialTransaction, history: any): string[] {
    // Check for unusual amount
    // Check for unusual location
    // Check for unusual time
    // Check for unusual merchant
    return [];
  }
}
Building an intelligent financial score classification system with Graphiti represents a significant advancement over traditional static scoring methods. By leveraging the power of knowledge graphs, real-time data processing, and intelligent reasoning, we can create systems that:
The combination of Graphiti's knowledge graph capabilities, Python's robust ecosystem, TypeScript's type safety, and PubSub's real-time messaging creates a powerful foundation for next-generation fintech applications.
This tutorial provides a comprehensive starting point, but the possibilities are endless. You can extend this system with machine learning models, real-time fraud detection, predictive analytics, and much more. The key is that Graphiti provides the intelligent, dynamic foundation that makes all these advanced features possible.
Start building your intelligent financial systems today, and unlock the power of knowledge graphs for better financial decision-making!
Want to learn more about Graphiti? Check out the official repository and join the community of developers building the future of knowledge-driven applications.