/**
 * Real-time chat / social server.
 *
 * Exposes:
 *  - a Socket.IO API (join, sendMessage, updateUsername, profiles, posts,
 *    comments, reposts, reactions), and
 *  - a small REST API (/api/channels, /api/messages/:channelId, /api/summary).
 *
 * All persistence goes through the prepared-statement interface of the `con`
 * connection exported by ./db (prepare -> all/run -> finalize).
 */
const express = require('express');
const http = require('http');
const { Server } = require('socket.io');
const cors = require('cors');
const { db, con } = require('./db');

const app = express();
app.use(cors());
app.use(express.json());

const server = http.createServer(app);
const io = new Server(server, {
  cors: {
    origin: "*", // Allow all for dev, restrict in prod
    methods: ["GET", "POST"]
  }
});

const CHANNELS = [
  { id: 'nebula', name: 'Nebula' },
  { id: 'solstice', name: 'Solstice' },
  { id: 'zenith', name: 'Zenith' },
  { id: 'aether', name: 'Aether' },
  { id: 'vortex', name: 'Vortex' },
  { id: 'borealis', name: 'Borealis' },
  { id: 'chronos', name: 'Chronos' },
  { id: 'elysium', name: 'Elysium' },
  { id: 'ignis', name: 'Ignis' },
  { id: 'nova', name: 'Nova' }
];

// Connected users kept in memory for "Online" status.
// Maps socket id -> wallet address.
const connectedSockets = new Map();

/**
 * Returns a JSON-safe deep copy of `data` in which every BigInt value has
 * been replaced by its decimal string (JSON.stringify throws on BigInt).
 *
 * @param {*} data - Any JSON-compatible value, possibly containing BigInts.
 * @returns {*} The converted copy.
 */
function serializeBigInt(data) {
  return JSON.parse(JSON.stringify(data, (key, value) =>
    typeof value === 'bigint' ? value.toString() : value
  ));
}

/**
 * Returns the first four characters of a wallet address, used as a display
 * fallback when no username is available. Non-string input is stringified
 * first so this never throws.
 *
 * @param {*} walletAddress
 * @returns {string}
 */
function shortAddress(walletAddress) {
  return String(walletAddress).slice(0, 4);
}

/**
 * Looks up the username stored for `walletAddress` and passes it to `done`.
 * If the lookup fails or no row exists, `done` receives `fallback` instead,
 * so callers can always continue (a failed lookup must not crash the process
 * or drop an already-persisted message/comment).
 *
 * @param {string} walletAddress
 * @param {string} fallback - Name to use when no username can be resolved.
 * @param {(username: string) => void} done
 */
function resolveUsername(walletAddress, fallback, done) {
  con.prepare(`SELECT username FROM users WHERE wallet_address = ?`, (err, stmt) => {
    if (err) {
      console.error("Prepare error:", err);
      return done(fallback);
    }
    stmt.all(walletAddress, (err, rows) => {
      stmt.finalize();
      if (err) {
        console.error("Username lookup error:", err);
        return done(fallback);
      }
      done(rows && rows.length > 0 ? rows[0].username : fallback);
    });
  });
}

/**
 * Deducts the 1 PLEXUS message fee from `walletAddress` (only when the
 * balance is at least 1) and then pushes the resulting balance to every
 * connected socket registered for that wallet.
 *
 * NOTE(review): this is fire-and-forget. The message itself is stored even
 * when the balance was too low, because there is no transaction/rollback
 * here; the client is expected to prevent sending in that case.
 *
 * @param {string} walletAddress
 */
function chargeMessageFee(walletAddress) {
  con.prepare(`UPDATE users SET balance = balance - 1 WHERE wallet_address = ? AND balance >= 1`, (err, bStmt) => {
    if (err) return console.error("Balance update error:", err);
    bStmt.run(walletAddress, (err) => {
      bStmt.finalize();
      if (err) return console.error("Balance deduct error:", err);

      // Fetch new balance to sync client
      con.prepare(`SELECT balance FROM users WHERE wallet_address = ?`, (err, sStmt) => {
        if (err) return console.error("Balance sync error:", err);
        sStmt.all(walletAddress, (err, rows) => {
          sStmt.finalize();
          if (err) return console.error("Balance sync error:", err);
          if (rows.length === 0) return;
          // Notify every socket of this wallet (the user may be connected
          // from several tabs/devices), not just the sender's socket.
          for (const [sid, wallet] of connectedSockets.entries()) {
            if (wallet === walletAddress) {
              io.to(sid).emit('balanceUpdated', { balance: rows[0].balance });
            }
          }
        });
      });
    });
  });
}

/**
 * Loads all reactions of a message and broadcasts them to every client as
 * an 'updateReactions' event.
 *
 * @param {*} messageId - Id of the message whose reactions changed.
 */
function broadcastReactions(messageId) {
  console.log("Broadcasting reactions for message:", messageId);
  con.prepare(`SELECT emoji, wallet_address FROM reactions WHERE message_id = ?`, (err, stmt) => {
    if (err) return console.error("Prepare error:", err);
    stmt.all(messageId, (err, rows) => {
      stmt.finalize();
      if (err) {
        console.error("Error fetching reactions for broadcast:", err);
        return;
      }
      console.log(`Found ${rows.length} reactions, emitting updateReactions`);
      io.emit('updateReactions', { messageId, reactions: rows });
    });
  });
}

/**
 * Broadcasts the full user list (all users in the DB, so offline ones are
 * included) as a 'userList' event. Each entry gets an `online` flag derived
 * from the wallets currently present in `connectedSockets`.
 */
function broadcastUserList() {
  con.prepare(`SELECT wallet_address, username, last_seen FROM users`, (err, stmt) => {
    if (err) return console.error("Prepare error:", err);
    stmt.all((err, rows) => {
      stmt.finalize();
      if (err) return console.error("User list error:", err);
      const onlineWallets = new Set(connectedSockets.values());
      const users = rows.map(u => ({
        ...u,
        online: onlineWallets.has(u.wallet_address)
      }));
      io.emit('userList', users);
    });
  });
}

io.on('connection', (socket) => {
  console.log('A user connected:', socket.id);

  // join: registers the socket as online and creates or refreshes the user.
  // Emits 'usernameUpdated' and 'balanceUpdated' to the caller, then
  // broadcasts the user list.
  socket.on('join', ({ walletAddress, username } = {}) => {
    // The wallet address is the user key and is sliced below; reject
    // anything that is not a non-empty string instead of throwing.
    if (typeof walletAddress !== 'string' || walletAddress === '') {
      return socket.emit('error', { message: 'Invalid wallet address' });
    }

    console.log(`User joining: ${username} (${walletAddress})`);
    connectedSockets.set(socket.id, walletAddress);

    const now = new Date().toISOString();

    // First, check if this wallet already has a username.
    // `balance` must be selected too: without it the stored balance was
    // never read and every join reset it to 100.
    con.prepare(`SELECT username, balance FROM users WHERE wallet_address = ?`, (err, stmt) => {
      if (err) return console.error("Prepare error:", err);
      stmt.all(walletAddress, (err, rows) => {
        stmt.finalize();
        if (err) return console.error("Execute error:", err);

        if (rows.length > 0) {
          // User exists, update last seen and ensure balance
          const existingUser = rows[0];
          const existingUsername = existingUser.username;
          // Number() normalizes a possible BigInt column value.
          const existingBalance = Number(existingUser.balance ?? 100);
          // A missing balance, or one below 30, is topped up to 100.
          const finalBalance = (existingBalance < 30) ? 100 : existingBalance;

          con.prepare(`UPDATE users SET last_seen = ?, balance = ? WHERE wallet_address = ?`, (err, uStmt) => {
            if (err) return console.error("Prepare error:", err);
            uStmt.run(now, finalBalance, walletAddress, (err) => {
              uStmt.finalize();
              if (err) console.error("Update error:", err);
              socket.emit('usernameUpdated', { username: existingUsername });
              socket.emit('balanceUpdated', { balance: finalBalance });
              broadcastUserList();
            });
          });
        } else {
          // New user, check if username is taken.
          // Fall back to the short address when no usable name was sent.
          const requestedName = (typeof username === 'string' && username.trim() !== '')
            ? username
            : shortAddress(walletAddress);

          con.prepare(`SELECT wallet_address FROM users WHERE username = ?`, (err, nStmt) => {
            if (err) return console.error("Prepare error:", err);
            nStmt.all(requestedName, (err, uRows) => {
              nStmt.finalize();
              if (err) return console.error("Check error:", err);

              // Disambiguate a taken name with a wallet-address suffix.
              const finalUsername = uRows.length > 0
                ? `${requestedName}_${walletAddress.slice(0, 4)}`
                : requestedName;

              con.prepare(`INSERT INTO users (wallet_address, username, last_seen, balance) VALUES (?, ?, ?, ?)`, (err, iStmt) => {
                if (err) return console.error("Prepare error:", err);
                iStmt.run(walletAddress, finalUsername, now, 100, (err) => {
                  iStmt.finalize();
                  if (err) console.error("Insert error:", err);
                  socket.emit('usernameUpdated', { username: finalUsername });
                  socket.emit('balanceUpdated', { balance: 100 });
                  broadcastUserList();
                });
              });
            });
          });
        }
      });
    });
  });

  // updateUsername: renames the user for 30 balance units. Emits
  // 'usernameUpdated' and 'balanceUpdated' to the caller, broadcasts the
  // user list and posts a system message; failures are reported via 'error'.
  socket.on('updateUsername', ({ walletAddress, newUsername, txId } = {}) => {
    if (typeof newUsername !== 'string' || newUsername.trim() === '') {
      return socket.emit('error', { message: 'Invalid username' });
    }

    console.log(`User ${walletAddress} requesting username change to ${newUsername} (TX: ${txId})`);

    // First check if user exists and has enough balance
    con.prepare(`SELECT balance FROM users WHERE wallet_address = ?`, (err, bStmt) => {
      if (err) return socket.emit('error', { message: 'Database error' });
      bStmt.all(walletAddress, (err, rows) => {
        bStmt.finalize();
        if (err) return socket.emit('error', { message: 'Database error' });
        if (rows.length === 0) return socket.emit('error', { message: 'User not found' });
        if (rows[0].balance < 30) return socket.emit('error', { message: 'Insufficient $PLEXUS balance' });

        // Check if username is taken
        con.prepare(`SELECT wallet_address FROM users WHERE username = ?`, (err, stmt) => {
          if (err) return socket.emit('error', { message: 'Database error' });
          stmt.all(newUsername, (err, uRows) => {
            stmt.finalize();
            if (err) return socket.emit('error', { message: 'Database error' });
            if (uRows.length > 0) {
              return socket.emit('error', { message: 'Username already taken' });
            }

            // Update username and deduct balance in a single statement
            con.prepare(`UPDATE users SET username = ?, balance = balance - 30 WHERE wallet_address = ?`, (err, uStmt) => {
              if (err) return socket.emit('error', { message: 'Failed to update username' });
              uStmt.run(newUsername, walletAddress, (err) => {
                uStmt.finalize();
                if (err) return socket.emit('error', { message: 'Failed to update username' });

                console.log(`Username updated for ${walletAddress} to ${newUsername}`);
                socket.emit('usernameUpdated', { username: newUsername });

                // Fetch new balance to sync
                con.prepare(`SELECT balance FROM users WHERE wallet_address = ?`, (err, sStmt) => {
                  if (err) return console.error("Balance sync error:", err);
                  sStmt.all(walletAddress, (err, rRows) => {
                    sStmt.finalize();
                    if (err) return console.error("Balance sync error:", err);
                    if (rRows.length > 0) {
                      socket.emit('balanceUpdated', { balance: rRows[0].balance });
                    }
                  });
                });

                broadcastUserList();

                // Also broadcast a system message about the change
                const systemMsg = {
                  id: Date.now(),
                  channelId: 'nebula',
                  walletAddress: 'system',
                  username: 'System',
                  content: `${shortAddress(walletAddress)}... changed their name to ${newUsername}`,
                  timestamp: new Date().toISOString(),
                  status: 'validated'
                };
                io.emit('newMessage', systemMsg);
              });
            });
          });
        });
      });
    });
  });

  // sendMessage: stores a chat message, charges the 1 PLEXUS fee and
  // broadcasts the message to all clients as 'newMessage'.
  socket.on('sendMessage', ({ channelId, walletAddress, content, txId } = {}) => {
    // Ignore missing, non-string or whitespace-only content.
    if (typeof content !== 'string' || content.trim() === '') return;

    console.log(`Message from ${walletAddress} in ${channelId} (TX: ${txId}): ${content}`);

    const timestamp = new Date().toISOString();

    con.prepare(`INSERT INTO messages (id, channel_id, wallet_address, content, timestamp, tx_id) VALUES (nextval('seq_msg_id'), ?, ?, ?, ?, ?) RETURNING id`, (err, stmt) => {
      if (err) return console.error("Prepare error:", err);

      // Deduct 1 PLEXUS (runs independently of the insert below).
      chargeMessageFee(walletAddress);

      stmt.all(channelId, walletAddress, content, timestamp, txId, (err, rows) => {
        stmt.finalize();
        if (err) {
          console.error("Error saving message:", err);
          return;
        }

        const msgId = rows[0].id;

        // The message is already persisted, so it is broadcast even when
        // the username lookup fails (using the short address instead).
        resolveUsername(walletAddress, shortAddress(walletAddress) + '...', (username) => {
          const message = {
            id: msgId,
            channelId,
            walletAddress,
            username,
            content,
            timestamp,
            txId
          };
          io.emit('newMessage', message);
        });
      });
    });
  });

  // getProfile: emits 'profileData' containing the user row, their latest
  // 50 posts (with comment/repost counts and `reposted_by` wallets) and the
  // latest 20 posts they reposted.
  socket.on('getProfile', (targetAddress) => {
    console.log(`[Profile] Fetching for: ${targetAddress}`);
    con.prepare(`SELECT wallet_address, username, bio, banner_color, balance, last_seen FROM users WHERE wallet_address = ?`, (err, stmt) => {
      if (err) {
        console.error('[Profile] DB Prepare error:', err);
        return socket.emit('error', { message: 'Database error' });
      }
      stmt.all(targetAddress, (err, rows) => {
        stmt.finalize();
        if (err) {
          console.error('[Profile] DB Exec error:', err);
          return socket.emit('error', { message: 'Database error' });
        }
        if (rows.length === 0) {
          console.warn(`[Profile] User not found: ${targetAddress}`);
          return socket.emit('error', { message: 'User not found' });
        }

        const user = rows[0];

        // Fetch posts with comment counts and repost counts
        con.prepare(`
          SELECT p.*,
            (SELECT CAST(COUNT(*) AS INTEGER) FROM comments WHERE post_id = p.id) as comment_count,
            (SELECT CAST(COUNT(*) AS INTEGER) FROM reposts WHERE post_id = p.id) as repost_count
          FROM posts p
          WHERE p.wallet_address = ?
          ORDER BY p.timestamp DESC
          LIMIT 50
        `, (err, pStmt) => {
          if (err) {
            console.error('[Profile] Posts prepare error:', err);
            return socket.emit('profileData', serializeBigInt({ ...user, posts: [], reposts: [] }));
          }
          pStmt.all(targetAddress, (err, posts) => {
            pStmt.finalize();
            if (err) console.error('[Profile] Posts exec error:', err);
            posts = posts || [];

            // Fetch who reposted each post
            if (posts.length > 0) {
              // The ids come from the posts query above (not from client
              // input), so interpolating them into the IN list is safe.
              const postIds = posts.map(p => p.id);
              con.prepare(`SELECT post_id, wallet_address FROM reposts WHERE post_id IN (${postIds.join(',')})`, (err, rStmt) => {
                if (err) {
                  console.error('[Profile] Reposts prepare error:', err);
                  posts = posts.map(p => ({ ...p, reposted_by: [] }));
                  emitProfileWithReposts();
                  return;
                }
                rStmt.all((err, repostRows) => {
                  rStmt.finalize();
                  if (!err && repostRows) {
                    posts = posts.map(p => ({
                      ...p,
                      reposted_by: repostRows.filter(r => r.post_id === p.id).map(r => r.wallet_address)
                    }));
                  } else {
                    if (err) console.error('[Profile] Reposts exec error:', err);
                    posts = posts.map(p => ({ ...p, reposted_by: [] }));
                  }
                  emitProfileWithReposts();
                });
              });
            } else {
              emitProfileWithReposts();
            }

            // Fetch posts this user reposted (from other users) and emit
            // the final profile payload.
            function emitProfileWithReposts() {
              con.prepare(`
                SELECT p.*, u.username as original_username, r.timestamp as repost_timestamp
                FROM reposts r
                JOIN posts p ON r.post_id = p.id
                JOIN users u ON p.wallet_address = u.wallet_address
                WHERE r.wallet_address = ?
                ORDER BY r.timestamp DESC
                LIMIT 20
              `, (err, rpStmt) => {
                if (err) {
                  console.error('[Profile] User reposts error:', err);
                  return socket.emit('profileData', serializeBigInt({ ...user, posts, reposts: [] }));
                }
                rpStmt.all(targetAddress, (err, userReposts) => {
                  rpStmt.finalize();
                  if (err) console.error('[Profile] User reposts error:', err);
                  socket.emit('profileData', serializeBigInt({
                    ...user,
                    posts,
                    reposts: userReposts || []
                  }));
                });
              });
            }
          });
        });
      });
    });
  });

  // repost: toggles a repost of `postId` by `walletAddress` and broadcasts
  // 'repostToggled' with action 'added' or 'removed'.
  socket.on('repost', ({ postId, walletAddress } = {}) => {
    console.log(`User ${walletAddress} toggling repost for post ${postId}`);

    // Check if user already reposted this post
    con.prepare(`SELECT id FROM reposts WHERE post_id = ? AND wallet_address = ?`, (err, checkStmt) => {
      if (err) return console.error("Prepare error:", err);
      checkStmt.all(postId, walletAddress, (err, rows) => {
        checkStmt.finalize();
        if (err) return console.error("Check error:", err);

        if (rows.length > 0) {
          // Already reposted, so toggle OFF (delete)
          const repostId = rows[0].id;
          con.prepare(`DELETE FROM reposts WHERE id = ?`, (err, delStmt) => {
            if (err) return console.error("Delete prepare error:", err);
            delStmt.run(repostId, (err) => {
              delStmt.finalize();
              if (err) return console.error("Delete error:", err);
              io.emit('repostToggled', { postId, walletAddress, action: 'removed' });
            });
          });
        } else {
          // Not reposted yet, so toggle ON (insert)
          const timestamp = new Date().toISOString();
          con.prepare(`INSERT INTO reposts (id, post_id, wallet_address, timestamp) VALUES (nextval('seq_repost_id'), ?, ?, ?) RETURNING id`, (err, stmt) => {
            if (err) return console.error("Prepare error:", err);
            stmt.all(postId, walletAddress, timestamp, (err, insertRows) => {
              stmt.finalize();
              if (err) return console.error("Insert error:", err);
              io.emit('repostToggled', { postId, walletAddress, repostId: insertRows[0].id, action: 'added' });
            });
          });
        }
      });
    });
  });

  // updateProfile: stores bio and banner colour, confirms to the caller
  // with 'profileUpdated' and broadcasts the user list.
  socket.on('updateProfile', ({ walletAddress, bio, bannerColor } = {}) => {
    console.log(`Updating profile for ${walletAddress}`);
    con.prepare(`UPDATE users SET bio = ?, banner_color = ? WHERE wallet_address = ?`, (err, stmt) => {
      if (err) return socket.emit('error', { message: 'Database error' });
      stmt.run(bio, bannerColor, walletAddress, (err) => {
        stmt.finalize();
        if (err) return socket.emit('error', { message: 'Failed to update profile' });
        socket.emit('profileUpdated', { bio, bannerColor });
        broadcastUserList();
      });
    });
  });

  // createPost: stores a post and broadcasts it as 'postCreated'.
  socket.on('createPost', ({ walletAddress, content } = {}) => {
    console.log(`Creating post for ${walletAddress}`);
    const timestamp = new Date().toISOString();
    con.prepare(`INSERT INTO posts (id, wallet_address, content, timestamp) VALUES (nextval('seq_post_id'), ?, ?, ?) RETURNING id`, (err, stmt) => {
      if (err) return console.error("Prepare error:", err);
      stmt.all(walletAddress, content, timestamp, (err, rows) => {
        stmt.finalize();
        if (err) return console.error("Insert error:", err);

        const post = {
          id: rows[0].id,
          wallet_address: walletAddress,
          content,
          timestamp,
          comments: []
        };
        // Broadcast to all (or just profile viewers? for now all)
        io.emit('postCreated', post);
      });
    });
  });

  // createComment: stores a comment and broadcasts it (with the author's
  // username) as 'commentCreated'.
  socket.on('createComment', ({ postId, walletAddress, content } = {}) => {
    console.log(`Creating comment on post ${postId} by ${walletAddress}`);
    const timestamp = new Date().toISOString();
    con.prepare(`INSERT INTO comments (id, post_id, wallet_address, content, timestamp) VALUES (nextval('seq_comment_id'), ?, ?, ?, ?) RETURNING id`, (err, stmt) => {
      if (err) return console.error("Prepare error:", err);
      stmt.all(postId, walletAddress, content, timestamp, (err, rows) => {
        stmt.finalize();
        if (err) return console.error("Insert error:", err);

        // The comment is already persisted, so it is broadcast even when
        // the username lookup fails (using the short address instead).
        resolveUsername(walletAddress, shortAddress(walletAddress), (username) => {
          const comment = {
            id: rows[0].id,
            post_id: postId,
            wallet_address: walletAddress,
            username,
            content,
            timestamp
          };
          io.emit('commentCreated', comment);
        });
      });
    });
  });

  // getComments: emits 'commentsLoaded' with all comments of a post,
  // oldest first.
  socket.on('getComments', (postId) => {
    con.prepare(`
      SELECT c.*, u.username
      FROM comments c
      LEFT JOIN users u ON c.wallet_address = u.wallet_address
      WHERE c.post_id = ?
      ORDER BY c.timestamp ASC
    `, (err, stmt) => {
      if (err) return console.error("Prepare error:", err);
      stmt.all(postId, (err, rows) => {
        stmt.finalize();
        if (err) return console.error("Comments fetch error:", err);
        socket.emit('commentsLoaded', { postId, comments: rows });
      });
    });
  });

  // toggleReaction: adds the (message, wallet, emoji) reaction if absent,
  // removes it if present, then broadcasts the message's reactions.
  socket.on('toggleReaction', ({ messageId, walletAddress, emoji } = {}) => {
    console.log(`Toggling reaction: ${emoji} on message ${messageId} by ${walletAddress}`);

    con.prepare(`SELECT * FROM reactions WHERE message_id = ? AND wallet_address = ? AND emoji = ?`, (err, stmt) => {
      if (err) return console.error("Prepare error:", err);
      stmt.all(messageId, walletAddress, emoji, (err, rows) => {
        stmt.finalize();
        if (err) return console.error("Error checking reaction:", err);

        if (rows.length > 0) {
          con.prepare(`DELETE FROM reactions WHERE message_id = ? AND wallet_address = ? AND emoji = ?`, (err, dStmt) => {
            if (err) return console.error("Prepare error:", err);
            dStmt.run(messageId, walletAddress, emoji, (err) => {
              dStmt.finalize();
              if (err) return console.error("Error removing reaction:", err);
              broadcastReactions(messageId);
            });
          });
        } else {
          con.prepare(`INSERT INTO reactions (message_id, wallet_address, emoji) VALUES (?, ?, ?)`, (err, iStmt) => {
            if (err) return console.error("Prepare error:", err);
            iStmt.run(messageId, walletAddress, emoji, (err) => {
              iStmt.finalize();
              if (err) return console.error("Error adding reaction:", err);
              broadcastReactions(messageId);
            });
          });
        }
      });
    });
  });

  // disconnect: drops the socket from the online map and, if it had
  // joined, refreshes everyone's user list.
  socket.on('disconnect', () => {
    const walletAddress = connectedSockets.get(socket.id);
    connectedSockets.delete(socket.id);
    if (walletAddress) {
      // Update last seen?
      broadcastUserList();
    }
    console.log('User disconnected:', socket.id);
  });
});

// Lists the available channels.
app.get('/api/channels', (req, res) => {
  res.json(CHANNELS);
});

// Returns the most recent 100 messages of a channel in chronological order,
// each with its reactions attached.
app.get('/api/messages/:channelId', (req, res) => {
  const { channelId } = req.params;

  // Select newest-first so LIMIT keeps the latest 100 messages (ordering
  // ascending here would return the oldest 100 and hide everything newer);
  // the rows are reversed below for chronological display.
  con.prepare(`
    SELECT m.*, u.username
    FROM messages m
    LEFT JOIN users u ON m.wallet_address = u.wallet_address
    WHERE m.channel_id = ?
    ORDER BY m.timestamp DESC
    LIMIT 100
  `, (err, stmt) => {
    if (err) return res.status(500).json({ error: err.message });
    stmt.all(channelId, (err, rows) => {
      stmt.finalize();
      if (err) return res.status(500).json({ error: err.message });

      const messages = rows.reverse();

      // Fetch reactions for all these messages
      con.prepare(`SELECT * FROM reactions WHERE message_id IN (SELECT id FROM messages WHERE channel_id = ?)`, (err, rStmt) => {
        if (err) return res.json(messages.map(m => ({ ...m, reactions: [] })));
        rStmt.all(channelId, (err, reactions) => {
          rStmt.finalize();
          if (err) return res.json(messages.map(m => ({ ...m, reactions: [] })));

          const messagesWithReactions = messages.map(m => ({
            ...m,
            reactions: reactions.filter(r => r.message_id === m.id)
          }));
          res.json(messagesWithReactions);
        });
      });
    });
  });
});

const PORT = process.env.PORT || 3000;

// AI Summary Endpoint: summarizes the latest 50 messages of a channel via
// the OpenRouter chat-completions API. Responds with { summary } on success
// or { error } with status 400/500 on failure.
app.post('/api/summary', async (req, res) => {
  const { channelId } = req.body ?? {};
  console.log(`[AI] Summary request for channel: ${channelId}`);

  // channelId is bound into the query and interpolated into the prompt,
  // so require a non-empty string up front.
  if (typeof channelId !== 'string' || channelId === '') {
    return res.status(400).json({ error: 'channelId is required' });
  }

  if (!process.env.OPENROUTER_API_KEY) {
    console.error('[AI] Missing OPENROUTER_API_KEY environment variable');
    return res.status(500).json({ error: 'AI service not configured (API key missing)' });
  }

  try {
    con.prepare(`
      SELECT m.content, u.username, m.timestamp
      FROM messages m
      JOIN users u ON m.wallet_address = u.wallet_address
      WHERE m.channel_id = ?
      ORDER BY m.timestamp DESC
      LIMIT 50
    `, (err, stmt) => {
      if (err) {
        console.error('[AI] DB Prepare error:', err);
        return res.status(500).json({ error: 'Database error' });
      }

      stmt.all(channelId, async (err, rows) => {
        stmt.finalize();
        if (err) {
          console.error('[AI] DB Execution error:', err);
          return res.status(500).json({ error: 'Database error' });
        }

        if (!rows || rows.length === 0) {
          console.log(`[AI] No messages found for channel ${channelId}`);
          return res.json({ summary: "This channel is a quiet void... for now. Send some messages to generate a summary!" });
        }

        // Rows arrive newest-first; reverse into chronological order.
        const conversation = rows.reverse().map(r => `${r.username}: ${r.content}`).join('\n');
        console.log(`[AI] Summarizing ${rows.length} messages...`);

        try {
          const response = await fetch("https://openrouter.ai/api/v1/chat/completions", {
            method: "POST",
            headers: {
              "Authorization": `Bearer ${process.env.OPENROUTER_API_KEY}`,
              "Content-Type": "application/json",
              "HTTP-Referer": "https://plexus.social",
              "X-Title": "Plexus Social"
            },
            body: JSON.stringify({
              "model": "xiaomi/mimo-v2-flash:free",
              "messages": [
                {
                  "role": "system",
                  "content": `You are Plexus AI, a high-signal crypto analyst. Summarize the conversation for #${channelId} with extreme precision. Structure your output in Markdown: # 📊 EXECUTIVE SUMMARY # 💎 KEY TOPICS & ALPHA # 🎭 SENTIMENT ANALYSIS # 📜 NOTABLE QUOTES Use emojis and bold text for impact. 
Keep it high-signal.`
                },
                {
                  "role": "user",
                  "content": `Analyze and summarize this conversation:\n\n${conversation}`
                }
              ]
            })
          });

          const data = await response.json();
          if (data.choices && data.choices[0]) {
            console.log('[AI] Summary generated successfully');
            res.json({ summary: data.choices[0].message.content });
          } else {
            console.error('[AI] OpenRouter error response:', JSON.stringify(data));
            res.status(500).json({ error: 'AI Error: ' + (data.error?.message || 'Unknown provider error') });
          }
        } catch (apiErr) {
          // Covers network failures and non-JSON / malformed responses.
          console.error('[AI] Fetch exception:', apiErr);
          res.status(500).json({ error: 'Failed to reach the AI collective.' });
        }
      });
    });
  } catch (e) {
    // Only synchronous failures of con.prepare land here; callback errors
    // are handled inside the callbacks above.
    console.error('[AI] Critical error:', e);
    res.status(500).json({ error: 'Internal Server Error' });
  }
});

server.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});