import { Q } from '@nozbe/watermelondb'; import { database } from 'src/watermelondb'; import Message from 'src/watermelondb/models/Message'; import { chatApi } from '@api/chat'; import NetInfo from '@react-native-community/netinfo'; import { testConnectionSpeed } from 'src/database/speedService'; export function makeChatKey(params: { chatUid?: number | null; groupChatToken?: string | null }) { if (params.chatUid) return `u:${params.chatUid}`; if (params.groupChatToken) return `g:${params.groupChatToken}`; throw new Error('Invalid chat identity'); } function now() { return Date.now(); } export type MessageDirtyAction = | { type: 'send'; value: { text: string; currentUid: number; attachment?: any; reply_to_id?: number; replyMessage?: any; }; ts: number; } | { type: 'edit'; value: { text: string }; ts: number } | { type: 'delete'; value?: any; ts: number } | { type: 'read'; value: { messagesIds: number[] }; ts: number } | { type: 'reaction'; value: string; ts: number } | { type: 'unreaction'; value: string; ts: number }; export function addMessageDirtyAction(msg: Message, action: Omit) { const list: MessageDirtyAction[] = msg.dirtyActions ? JSON.parse(msg.dirtyActions) : []; list.push({ ...(action as MessageDirtyAction), ts: now() }); msg.isDirty = true; msg.dirtyActions = JSON.stringify(list); (msg as any)._raw._status = 'synced'; (msg as any)._raw._changed = ''; } export function compactMessageActions(actions: MessageDirtyAction[]): MessageDirtyAction[] { if (!actions.length) return []; const sorted = [...actions].sort((a, b) => a.ts - b.ts); const res: MessageDirtyAction[] = []; for (const a of sorted) { const last = res[res.length - 1]; if (a.type === 'delete') { return [a]; } if (a.type === 'edit' && last?.type === 'edit') { last.value = a.value; last.ts = a.ts; continue; } if ( (last?.type === 'reaction' && a.type === 'unreaction') || (last?.type === 'unreaction' && a.type === 'reaction') ) { res.pop(); continue; } if (a.type === 'reaction' && last?.type === 'reaction') { last.value = a.value; last.ts = a.ts; continue; } if (a.type === 'read' && last?.type === 'read') { last.ts = a.ts; continue; } res.push(a); } return res; } async function performMessageAction(token: string, msg: Message, action: MessageDirtyAction) { const isGroup = Boolean(msg.isGroup); const chatKey = msg.chatKey; if (action.type !== 'send' && msg.messageId == null) { throw new Error('Message has no server id yet'); } switch (action.type) { case 'send': { if (isGroup) { const res = await chatApi.sendGroupMessage({ token, to_group_token: chatKey.slice(2), text: action.value.text, attachment: action.value.attachment, reply_to_id: action.value.reply_to_id }); return { newId: res.data.message_id, attachment: res.data.attachment, wsEvent: { action: 'new_message', payload: { message: { _id: res.data.message_id, text: action.value.text, replyMessage: action.value.replyMessage, attachment: res.data.attachment ? res.data.attachment : -1 } } } }; } else { const res = await chatApi.sendMessage({ token, to_uid: Number(chatKey.slice(2)), text: action.value.text, attachment: action.value.attachment, reply_to_id: action.value.reply_to_id }); return { newId: res.data.message_id, attachment: res.data.attachment, wsEvent: { action: 'new_message', payload: { message: { _id: res.data.message_id, text: action.value.text, replyMessage: action.value.replyMessage, attachment: res.data.attachment ? res.data.attachment : -1 } } } }; } } case 'edit': { if (isGroup) { await chatApi.editGroupMessage({ token, group_token: chatKey.slice(2), message_id: msg.messageId!, text: action.value.text }); } else { await chatApi.editMessage({ token, to_uid: Number(chatKey.slice(2)), message_id: msg.messageId!, text: action.value.text }); } return {}; } case 'delete': { if (isGroup) { await chatApi.deleteGroupMessage({ token, group_token: chatKey.slice(2), message_id: msg.messageId! }); } else { await chatApi.deleteMessage({ token, conversation_with_user: Number(chatKey.slice(2)), message_id: msg.messageId! }); } return { deleted: true }; } case 'reaction': { if (isGroup) { await chatApi.reactToGroupMessage({ token, group_token: chatKey.slice(2), message_id: msg.messageId!, reaction: action.value }); } else { await chatApi.reactToMessage({ token, conversation_with_user: Number(chatKey.slice(2)), message_id: msg.messageId!, reaction: action.value }); } return {}; } case 'unreaction': { if (isGroup) { await chatApi.unreactToGroupMessage({ token, group_token: chatKey.slice(2), message_id: msg.messageId! }); } else { await chatApi.unreactToMessage({ token, conversation_with_user: Number(chatKey.slice(2)), message_id: msg.messageId! }); } return {}; } case 'read': { if (isGroup) { await chatApi.groupMessagesRead({ token, group_token: chatKey.slice(2), messages_id: action.value.messagesIds }); } else { await chatApi.messagesRead({ token, from_user: Number(chatKey.slice(2)), messages_id: action.value.messagesIds }); } return {}; } } } export async function upsertMessagesIntoDB({ chatUid, groupToken, apiMessages, avatar = null, name = '' }: { chatUid?: number; groupToken?: string; apiMessages: any[]; avatar?: string | null; name?: string | null; }) { if (!apiMessages?.length) return; const chatKey = makeChatKey({ chatUid, groupChatToken: groupToken }); const isGroup = Boolean(groupToken); const col = database.get('messages'); await database.write(async () => { const batch: any[] = []; for (const msg of apiMessages) { const compositeId = `${chatKey}:${msg.id}`; const existing = await col .query(Q.where('chat_key', chatKey), Q.where('message_id', msg.id)) .fetch(); if (existing.length) { const record = existing[0]; const hasDirty = Boolean(msg.dirtyActions); try { batch.push( record.prepareUpdate((r) => { r.messageId = msg.id; r.sentAt = msg.sent_datetime; r.receivedAt = msg.received_datetime ?? r.receivedAt; r.readAt = msg.read_datetime ?? r.readAt; if (!hasDirty) { r.text = msg.text; } if (msg.attachement && msg.attachement !== -1) { const prev = r.attachment ? JSON.parse(r.attachment) : {}; r.attachment = JSON.stringify({ ...msg.attachement, local_uri: prev?.local_uri ?? null }); } else { r.attachment = null; } r.status = msg.status; r.isSending = false; r.replyToId = msg.reply_to_id; r.replyTo = msg.reply_to ? JSON.stringify(msg.reply_to) : null; if (avatar && groupToken) { r.senderAvatar = avatar; } else { r.senderAvatar = msg.sender_avatar ?? null; } if (name && groupToken) { r.senderName = name; } else { r.senderName = msg.sender_name ?? ''; } r.reactions = msg.reactions ?? '{}'; r.edits = msg.edits ?? '{}'; (r as any)._raw._status = 'synced'; (r as any)._raw._changed = ''; }) ); } catch (err) {} } else { try { batch.push( col.prepareCreate((r) => { r.chatKey = chatKey; r.isGroup = isGroup; r.messageId = msg.id; r.compositeId = compositeId; r.sentAt = msg.sent_datetime; r.receivedAt = msg.received_datetime ?? null; r.readAt = msg.read_datetime ?? null; r.senderId = msg.sender; r.recipientId = msg.recipient; r.text = msg.text; r.status = msg.status; r.isSending = false; r.reactions = msg.reactions ?? '{}'; r.edits = msg.edits ?? '{}'; r.attachment = msg.attachement && msg.attachement !== -1 ? JSON.stringify(msg.attachement) : null; r.encrypted = msg.encrypted ?? 0; r.replyToId = msg.reply_to_id ?? -1; r.replyTo = msg.reply_to ? JSON.stringify(msg.reply_to) : null; if (avatar && groupToken) { r.senderAvatar = avatar; } else { r.senderAvatar = msg.sender_avatar ?? null; } if (name && groupToken) { r.senderName = name; } else { r.senderName = msg.sender_name ?? ''; } r.isDirty = false; r.dirtyActions = null; (r as any)._raw._status = 'synced'; (r as any)._raw._changed = ''; }) ); } catch (err) {} } } if (batch.length) { await database.batch(batch); } }); } export async function reconcileChatRange( chatKey: string, serverMessages: any[], isLatest: boolean ) { if (!serverMessages.length) return; const col = database.get('messages'); if (serverMessages.length === 1 && serverMessages[0].status === 4 && isLatest) { const keepCompositeId = `${chatKey}:${serverMessages[0].id}`; const local = await col.query(Q.where('chat_key', chatKey)).fetch(); await database.write(async () => { for (const msg of local) { if (msg.compositeId !== keepCompositeId) { await msg.destroyPermanently(); } } }); return; } const serverIds = new Set(serverMessages.map((m) => m.id)); const minId = Math.min(...serverMessages.map((m) => m.id)); const maxId = Math.max(...serverMessages.map((m) => m.id)); const local = await col .query(Q.where('chat_key', chatKey), Q.where('message_id', Q.between(minId, maxId))) .fetch(); await database.write(async () => { for (const msg of local) { if (msg.messageId && msg.messageId > 0 && !serverIds.has(msg.messageId) && !msg.isDirty) { await msg.destroyPermanently(); } } }); } export type OutgoingWsEvent = { action: string; payload: Record; }; export async function pushMessageChanges( token: string, onWsEvent?: (event: OutgoingWsEvent) => void ) { const col = database.get('messages'); const dirty = await col.query(Q.where('is_dirty', true)).fetch(); if (!dirty.length) return; for (const msg of dirty) { const actions: MessageDirtyAction[] = msg.dirtyActions ? JSON.parse(msg.dirtyActions) : []; const compacted = compactMessageActions(actions); for (const a of compacted) { const res = await performMessageAction(token, msg, a); await database.write(async () => { if (res?.newId) { const duplicates = await col .query(Q.where('chat_key', msg.chatKey), Q.where('message_id', res.newId)) .fetch(); for (const d of duplicates) { if (d.id !== msg.id) { await d.destroyPermanently(); } } } msg.update((m) => { if (res?.newId) { m.messageId = res.newId; m.compositeId = `${msg.chatKey}:${res.newId}`; m.status = 1; m.isSending = false; } if (res?.attachment) { const prev = m.attachment ? JSON.parse(m.attachment) : {}; m.attachment = JSON.stringify({ ...res.attachment, local_uri: prev?.local_uri ?? null }); } m.isDirty = false; m.dirtyActions = null; (m as any)._raw._status = 'synced'; (m as any)._raw._changed = ''; if (res?.wsEvent && onWsEvent) { onWsEvent(res.wsEvent); } }); }); } } } let pushInFlight = false; let needsAnotherRun = false; export async function triggerMessagePush( token: string, onWsEvent?: (event: OutgoingWsEvent) => void ) { if (pushInFlight) { needsAnotherRun = true; return; } pushInFlight = true; try { do { needsAnotherRun = false; await pushMessageChanges(token, onWsEvent); } while (needsAnotherRun); } finally { pushInFlight = false; } } export function normalizeServerMessage(s: any, chatKey: string, isGroup: boolean) { return { messageId: s.id, compositeId: `${chatKey}:${s.id}`, chatKey, isGroup, senderId: s.sender, recipientId: s.recipient, text: s.text, sentAt: s.sent_datetime, receivedAt: s.received_datetime, readAt: s.read_datetime, status: s.status, reactions: s.reactions, edits: s.edits, attachment: s.attachement !== -1 ? JSON.stringify(s.attachement) : null, replyToId: s.reply_to_id ?? null, replyTo: s.reply_to_id && s.reply_to_id !== -1 ? JSON.stringify(s.reply_to) : null, encrypted: s.encrypted, senderName: s.sender_name ?? null, senderAvatar: s.sender_avatar ?? null, isSending: false }; } export async function syncMessagesIncremental(token: string) { const net = await NetInfo.fetch(); if (!net.isConnected) return; try { const speed = await testConnectionSpeed(); if ((speed?.downloadSpeed && speed.downloadSpeed < 0.2) || (speed?.ping && speed.ping > 1500)) { console.warn('Internet too slow for sync'); return; } } catch {} await pushMessageChanges(token); }