|
|
@@ -0,0 +1,548 @@
|
|
|
+import { Q } from '@nozbe/watermelondb';
|
|
|
+import { database } from 'src/watermelondb';
|
|
|
+import Message from 'src/watermelondb/models/Message';
|
|
|
+import { chatApi } from '@api/chat';
|
|
|
+import NetInfo from '@react-native-community/netinfo';
|
|
|
+import { testConnectionSpeed } from 'src/database/speedService';
|
|
|
+
|
|
|
+export function makeChatKey(params: { chatUid?: number | null; groupChatToken?: string | null }) {
|
|
|
+ if (params.chatUid) return `u:${params.chatUid}`;
|
|
|
+ if (params.groupChatToken) return `g:${params.groupChatToken}`;
|
|
|
+ throw new Error('Invalid chat identity');
|
|
|
+}
|
|
|
+
|
|
|
+function now() {
|
|
|
+ return Date.now();
|
|
|
+}
|
|
|
+
|
|
|
+export type MessageDirtyAction =
|
|
|
+ | {
|
|
|
+ type: 'send';
|
|
|
+ value: {
|
|
|
+ text: string;
|
|
|
+ currentUid: number;
|
|
|
+ attachment?: any;
|
|
|
+ reply_to_id?: number;
|
|
|
+ replyMessage?: any;
|
|
|
+ };
|
|
|
+ ts: number;
|
|
|
+ }
|
|
|
+ | { type: 'edit'; value: { text: string }; ts: number }
|
|
|
+ | { type: 'delete'; ts: number }
|
|
|
+ | { type: 'read'; value: { messagesIds: number[] }; ts: number }
|
|
|
+ | { type: 'reaction'; value: string; ts: number }
|
|
|
+ | { type: 'unreaction'; value: string; ts: number };
|
|
|
+
|
|
|
+export function addMessageDirtyAction(msg: Message, action: Omit<MessageDirtyAction, 'ts'>) {
|
|
|
+ const list: MessageDirtyAction[] = msg.dirtyActions ? JSON.parse(msg.dirtyActions) : [];
|
|
|
+
|
|
|
+ list.push({ ...action, ts: now() });
|
|
|
+
|
|
|
+ msg.isDirty = true;
|
|
|
+ msg.dirtyActions = JSON.stringify(list);
|
|
|
+}
|
|
|
+
|
|
|
+export function compactMessageActions(actions: MessageDirtyAction[]): MessageDirtyAction[] {
|
|
|
+ if (!actions.length) return [];
|
|
|
+
|
|
|
+ const sorted = [...actions].sort((a, b) => a.ts - b.ts);
|
|
|
+ const res: MessageDirtyAction[] = [];
|
|
|
+
|
|
|
+ for (const a of sorted) {
|
|
|
+ const last = res[res.length - 1];
|
|
|
+
|
|
|
+ if (a.type === 'delete') {
|
|
|
+ return [a];
|
|
|
+ }
|
|
|
+
|
|
|
+ if (a.type === 'edit' && last?.type === 'edit') {
|
|
|
+ last.value = a.value;
|
|
|
+ last.ts = a.ts;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (
|
|
|
+ (last?.type === 'reaction' && a.type === 'unreaction') ||
|
|
|
+ (last?.type === 'unreaction' && a.type === 'reaction')
|
|
|
+ ) {
|
|
|
+ res.pop();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (a.type === 'reaction' && last?.type === 'reaction') {
|
|
|
+ last.value = a.value;
|
|
|
+ last.ts = a.ts;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (a.type === 'read' && last?.type === 'read') {
|
|
|
+ last.ts = a.ts;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ res.push(a);
|
|
|
+ }
|
|
|
+
|
|
|
+ return res;
|
|
|
+}
|
|
|
+
|
|
|
+async function performMessageAction(token: string, msg: Message, action: MessageDirtyAction) {
|
|
|
+ const isGroup = Boolean(msg.isGroup);
|
|
|
+ const chatKey = msg.chatKey;
|
|
|
+ if (action.type !== 'send' && msg.messageId == null) {
|
|
|
+ throw new Error('Message has no server id yet');
|
|
|
+ }
|
|
|
+
|
|
|
+ switch (action.type) {
|
|
|
+ case 'send': {
|
|
|
+ if (isGroup) {
|
|
|
+ const res = await chatApi.sendGroupMessage({
|
|
|
+ token,
|
|
|
+ to_group_token: chatKey.slice(2),
|
|
|
+ text: action.value.text,
|
|
|
+ attachment: action.value.attachment,
|
|
|
+ reply_to_id: action.value.reply_to_id
|
|
|
+ });
|
|
|
+
|
|
|
+ return {
|
|
|
+ newId: res.data.message_id,
|
|
|
+ attachment: res.data.attachment,
|
|
|
+ wsEvent: {
|
|
|
+ action: 'new_message',
|
|
|
+ payload: {
|
|
|
+ message: {
|
|
|
+ _id: res.data.message_id,
|
|
|
+ text: action.value.text,
|
|
|
+ replyMessage: action.value.replyMessage,
|
|
|
+ attachment: res.data.attachment ? res.data.attachment : -1
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ } else {
|
|
|
+ const res = await chatApi.sendMessage({
|
|
|
+ token,
|
|
|
+ to_uid: Number(chatKey.slice(2)),
|
|
|
+ text: action.value.text,
|
|
|
+ attachment: action.value.attachment,
|
|
|
+ reply_to_id: action.value.reply_to_id
|
|
|
+ });
|
|
|
+
|
|
|
+ return {
|
|
|
+ newId: res.data.message_id,
|
|
|
+ attachment: res.data.attachment,
|
|
|
+ wsEvent: {
|
|
|
+ action: 'new_message',
|
|
|
+ payload: {
|
|
|
+ message: {
|
|
|
+ _id: res.data.message_id,
|
|
|
+ text: action.value.text,
|
|
|
+ replyMessage: action.value.replyMessage,
|
|
|
+ attachment: res.data.attachment ? res.data.attachment : -1
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ case 'edit': {
|
|
|
+ if (isGroup) {
|
|
|
+ await chatApi.editGroupMessage({
|
|
|
+ token,
|
|
|
+ group_token: chatKey.slice(2),
|
|
|
+ message_id: msg.messageId!,
|
|
|
+ text: action.value.text
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ await chatApi.editMessage({
|
|
|
+ token,
|
|
|
+ to_uid: Number(chatKey.slice(2)),
|
|
|
+ message_id: msg.messageId!,
|
|
|
+ text: action.value.text
|
|
|
+ });
|
|
|
+ }
|
|
|
+ return {};
|
|
|
+ }
|
|
|
+
|
|
|
+ case 'delete': {
|
|
|
+ if (isGroup) {
|
|
|
+ await chatApi.deleteGroupMessage({
|
|
|
+ token,
|
|
|
+ group_token: chatKey.slice(2),
|
|
|
+ message_id: msg.messageId!
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ await chatApi.deleteMessage({
|
|
|
+ token,
|
|
|
+ conversation_with_user: Number(chatKey.slice(2)),
|
|
|
+ message_id: msg.messageId!
|
|
|
+ });
|
|
|
+ }
|
|
|
+ return { deleted: true };
|
|
|
+ }
|
|
|
+
|
|
|
+ case 'reaction': {
|
|
|
+ if (isGroup) {
|
|
|
+ await chatApi.reactToGroupMessage({
|
|
|
+ token,
|
|
|
+ group_token: chatKey.slice(2),
|
|
|
+ message_id: msg.messageId!,
|
|
|
+ reaction: action.value
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ await chatApi.reactToMessage({
|
|
|
+ token,
|
|
|
+ conversation_with_user: Number(chatKey.slice(2)),
|
|
|
+ message_id: msg.messageId!,
|
|
|
+ reaction: action.value
|
|
|
+ });
|
|
|
+ }
|
|
|
+ return {};
|
|
|
+ }
|
|
|
+
|
|
|
+ case 'unreaction': {
|
|
|
+ if (isGroup) {
|
|
|
+ await chatApi.unreactToGroupMessage({
|
|
|
+ token,
|
|
|
+ group_token: chatKey.slice(2),
|
|
|
+ message_id: msg.messageId!
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ await chatApi.unreactToMessage({
|
|
|
+ token,
|
|
|
+ conversation_with_user: Number(chatKey.slice(2)),
|
|
|
+ message_id: msg.messageId!
|
|
|
+ });
|
|
|
+ }
|
|
|
+ return {};
|
|
|
+ }
|
|
|
+
|
|
|
+ case 'read': {
|
|
|
+ if (isGroup) {
|
|
|
+ await chatApi.groupMessagesRead({
|
|
|
+ token,
|
|
|
+ group_token: chatKey.slice(2),
|
|
|
+ messages_id: action.value.messagesIds
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ await chatApi.messagesRead({
|
|
|
+ token,
|
|
|
+ from_user: Number(chatKey.slice(2)),
|
|
|
+ messages_id: action.value.messagesIds
|
|
|
+ });
|
|
|
+ }
|
|
|
+ return {};
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+export async function upsertMessagesIntoDB({
|
|
|
+ chatUid,
|
|
|
+ groupToken,
|
|
|
+ apiMessages,
|
|
|
+ avatar = null,
|
|
|
+ name = ''
|
|
|
+}: {
|
|
|
+ chatUid?: number;
|
|
|
+ groupToken?: string;
|
|
|
+ apiMessages: any[];
|
|
|
+ avatar?: string | null;
|
|
|
+ name?: string | null;
|
|
|
+}) {
|
|
|
+ if (!apiMessages?.length) return;
|
|
|
+ const chatKey = makeChatKey({ chatUid, groupChatToken: groupToken });
|
|
|
+ const isGroup = Boolean(groupToken);
|
|
|
+
|
|
|
+ const col = database.get<Message>('messages');
|
|
|
+
|
|
|
+ await database.write(async () => {
|
|
|
+ const batch: any[] = [];
|
|
|
+
|
|
|
+ for (const msg of apiMessages) {
|
|
|
+ const compositeId = `${chatKey}:${msg.id}`;
|
|
|
+
|
|
|
+ const existing = await col
|
|
|
+ .query(Q.where('chat_key', chatKey), Q.where('message_id', msg.id))
|
|
|
+ .fetch();
|
|
|
+
|
|
|
+ if (existing.length) {
|
|
|
+ const record = existing[0];
|
|
|
+ const hasDirty = Boolean(msg.dirtyActions);
|
|
|
+
|
|
|
+ batch.push(
|
|
|
+ record.prepareUpdate((r) => {
|
|
|
+ r.messageId = msg.id;
|
|
|
+ r.sentAt = msg.sent_datetime;
|
|
|
+ r.receivedAt = msg.received_datetime ?? r.receivedAt;
|
|
|
+ r.readAt = msg.read_datetime ?? r.readAt;
|
|
|
+
|
|
|
+ if (!hasDirty) {
|
|
|
+ r.text = msg.text;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (msg.attachement && msg.attachement !== -1) {
|
|
|
+ const prev = r.attachment ? JSON.parse(r.attachment) : {};
|
|
|
+ r.attachment = JSON.stringify({
|
|
|
+ ...msg.attachement,
|
|
|
+ local_uri: prev?.local_uri ?? null
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ r.attachment = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ r.status = msg.status;
|
|
|
+ r.isSending = false;
|
|
|
+ r.replyToId = msg.reply_to_id;
|
|
|
+ r.replyTo = msg.reply_to ? JSON.stringify(msg.reply_to) : null;
|
|
|
+
|
|
|
+ if (avatar && groupToken) {
|
|
|
+ r.senderAvatar = avatar;
|
|
|
+ } else {
|
|
|
+ r.senderAvatar = msg.sender_avatar ?? null;
|
|
|
+ }
|
|
|
+ if (name && groupToken) {
|
|
|
+ r.senderName = name;
|
|
|
+ } else {
|
|
|
+ r.senderName = msg.sender_name ?? '';
|
|
|
+ }
|
|
|
+
|
|
|
+ r.reactions = msg.reactions ?? '{}';
|
|
|
+ r.edits = msg.edits ?? '{}';
|
|
|
+ (r as any)._raw._status = 'synced';
|
|
|
+ (r as any)._raw._changed = '';
|
|
|
+ })
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ batch.push(
|
|
|
+ col.prepareCreate((r) => {
|
|
|
+ r.chatKey = chatKey;
|
|
|
+ r.isGroup = isGroup;
|
|
|
+
|
|
|
+ r.messageId = msg.id;
|
|
|
+ r.compositeId = compositeId;
|
|
|
+
|
|
|
+ r.sentAt = msg.sent_datetime;
|
|
|
+ r.receivedAt = msg.received_datetime ?? null;
|
|
|
+ r.readAt = msg.read_datetime ?? null;
|
|
|
+
|
|
|
+ r.senderId = msg.sender;
|
|
|
+ r.recipientId = msg.recipient;
|
|
|
+
|
|
|
+ r.text = msg.text;
|
|
|
+ r.status = msg.status;
|
|
|
+ r.isSending = false;
|
|
|
+
|
|
|
+ r.reactions = msg.reactions ?? '{}';
|
|
|
+ r.edits = msg.edits ?? '{}';
|
|
|
+
|
|
|
+ r.attachment =
|
|
|
+ msg.attachement && msg.attachement !== -1 ? JSON.stringify(msg.attachement) : null;
|
|
|
+
|
|
|
+ r.encrypted = msg.encrypted ?? 0;
|
|
|
+ r.replyToId = msg.reply_to_id ?? -1;
|
|
|
+ r.replyTo = msg.reply_to ? JSON.stringify(msg.reply_to) : null;
|
|
|
+
|
|
|
+ if (avatar && groupToken) {
|
|
|
+ r.senderAvatar = avatar;
|
|
|
+ } else {
|
|
|
+ r.senderAvatar = msg.sender_avatar ?? null;
|
|
|
+ }
|
|
|
+ if (name && groupToken) {
|
|
|
+ r.senderName = name;
|
|
|
+ } else {
|
|
|
+ r.senderName = msg.sender_name ?? '';
|
|
|
+ }
|
|
|
+
|
|
|
+ r.isDirty = false;
|
|
|
+ r.dirtyActions = null;
|
|
|
+
|
|
|
+ (r as any)._raw._status = 'synced';
|
|
|
+ (r as any)._raw._changed = '';
|
|
|
+ })
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (batch.length) {
|
|
|
+ await database.batch(batch);
|
|
|
+ }
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+export async function reconcileChatRange(
|
|
|
+ chatKey: string,
|
|
|
+ serverMessages: any[],
|
|
|
+ isLatest: boolean
|
|
|
+) {
|
|
|
+ if (!serverMessages.length) return;
|
|
|
+
|
|
|
+ const col = database.get<Message>('messages');
|
|
|
+
|
|
|
+ if (serverMessages.length === 1 && serverMessages[0].status === 4 && isLatest) {
|
|
|
+ const keepCompositeId = `${chatKey}:${serverMessages[0].id}`;
|
|
|
+
|
|
|
+ const local = await col.query(Q.where('chat_key', chatKey)).fetch();
|
|
|
+
|
|
|
+ await database.write(async () => {
|
|
|
+ for (const msg of local) {
|
|
|
+ if (msg.compositeId !== keepCompositeId) {
|
|
|
+ await msg.destroyPermanently();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ const serverIds = new Set(serverMessages.map((m) => m.id));
|
|
|
+
|
|
|
+ const minId = Math.min(...serverMessages.map((m) => m.id));
|
|
|
+ const maxId = Math.max(...serverMessages.map((m) => m.id));
|
|
|
+
|
|
|
+ const local = await col
|
|
|
+ .query(Q.where('chat_key', chatKey), Q.where('message_id', Q.between(minId, maxId)))
|
|
|
+ .fetch();
|
|
|
+
|
|
|
+ await database.write(async () => {
|
|
|
+ for (const msg of local) {
|
|
|
+ if (msg.messageId && msg.messageId > 0 && !serverIds.has(msg.messageId) && !msg.isDirty) {
|
|
|
+ await msg.destroyPermanently();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+export type OutgoingWsEvent = {
|
|
|
+ action: string;
|
|
|
+ payload: Record<string, any>;
|
|
|
+};
|
|
|
+
|
|
|
+export async function pushMessageChanges(
|
|
|
+ token: string,
|
|
|
+ onWsEvent?: (event: OutgoingWsEvent) => void
|
|
|
+) {
|
|
|
+ const col = database.get<Message>('messages');
|
|
|
+
|
|
|
+ const dirty = await col.query(Q.where('is_dirty', true)).fetch();
|
|
|
+ if (!dirty.length) return;
|
|
|
+
|
|
|
+ for (const msg of dirty) {
|
|
|
+ const raw = (msg as any)._raw;
|
|
|
+ const actions: MessageDirtyAction[] = msg.dirtyActions ? JSON.parse(msg.dirtyActions) : [];
|
|
|
+
|
|
|
+ const compacted = compactMessageActions(actions);
|
|
|
+
|
|
|
+ for (const a of compacted) {
|
|
|
+ const res = await performMessageAction(token, msg, a);
|
|
|
+
|
|
|
+ await database.write(async () => {
|
|
|
+ if (res?.newId) {
|
|
|
+ const duplicates = await col
|
|
|
+ .query(Q.where('chat_key', msg.chatKey), Q.where('message_id', res.newId))
|
|
|
+ .fetch();
|
|
|
+
|
|
|
+ for (const d of duplicates) {
|
|
|
+ if (d.id !== msg.id) {
|
|
|
+ await d.destroyPermanently();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ msg.update((m) => {
|
|
|
+ if (res?.newId) {
|
|
|
+ m.messageId = res.newId;
|
|
|
+ m.compositeId = `${msg.chatKey}:${res.newId}`;
|
|
|
+ m.status = 1;
|
|
|
+ m.isSending = false;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (res?.attachment) {
|
|
|
+ const prev = m.attachment ? JSON.parse(m.attachment) : {};
|
|
|
+ m.attachment = JSON.stringify({
|
|
|
+ ...res.attachment,
|
|
|
+ local_uri: prev?.local_uri ?? null
|
|
|
+ });
|
|
|
+
|
|
|
+ // if (res.attachment.filetype === 'nomadmania/location') {
|
|
|
+ // const locationUri = a?.value?.attachment?.uri;
|
|
|
+ // // await FileSystem.deleteAsync(locationUri);
|
|
|
+ // }
|
|
|
+ }
|
|
|
+ if (res?.wsEvent && onWsEvent) {
|
|
|
+ onWsEvent(res.wsEvent);
|
|
|
+ }
|
|
|
+
|
|
|
+ m.dirtyActions = null;
|
|
|
+ m.isDirty = false;
|
|
|
+
|
|
|
+ (m as any)._raw._status = 'synced';
|
|
|
+ (m as any)._raw._changed = '';
|
|
|
+ });
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+let pushInFlight = false;
|
|
|
+let needsAnotherRun = false;
|
|
|
+
|
|
|
+export async function triggerMessagePush(
|
|
|
+ token: string,
|
|
|
+ onWsEvent?: (event: OutgoingWsEvent) => void
|
|
|
+) {
|
|
|
+ if (pushInFlight) {
|
|
|
+ needsAnotherRun = true;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ pushInFlight = true;
|
|
|
+
|
|
|
+ try {
|
|
|
+ do {
|
|
|
+ needsAnotherRun = false;
|
|
|
+ await pushMessageChanges(token, onWsEvent);
|
|
|
+ } while (needsAnotherRun);
|
|
|
+ } finally {
|
|
|
+ pushInFlight = false;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+export function normalizeServerMessage(s: any, chatKey: string, isGroup: boolean) {
|
|
|
+ return {
|
|
|
+ messageId: s.id,
|
|
|
+ compositeId: `${chatKey}:${s.id}`,
|
|
|
+ chatKey,
|
|
|
+ isGroup,
|
|
|
+ senderId: s.sender,
|
|
|
+ recipientId: s.recipient,
|
|
|
+ text: s.text,
|
|
|
+ sentAt: s.sent_datetime,
|
|
|
+ receivedAt: s.received_datetime,
|
|
|
+ readAt: s.read_datetime,
|
|
|
+ status: s.status,
|
|
|
+ reactions: s.reactions,
|
|
|
+ edits: s.edits,
|
|
|
+ attachment: s.attachement !== -1 ? JSON.stringify(s.attachement) : null,
|
|
|
+ replyToId: s.reply_to_id ?? null,
|
|
|
+ replyTo: s.reply_to_id && s.reply_to_id !== -1 ? JSON.stringify(s.reply_to) : null,
|
|
|
+ encrypted: s.encrypted,
|
|
|
+ senderName: s.sender_name ?? null,
|
|
|
+ senderAvatar: s.sender_avatar ?? null,
|
|
|
+ isSending: false
|
|
|
+ };
|
|
|
+}
|
|
|
+
|
|
|
+export async function syncMessagesIncremental(token: string) {
|
|
|
+ const net = await NetInfo.fetch();
|
|
|
+ if (!net.isConnected) return;
|
|
|
+
|
|
|
+ try {
|
|
|
+ const speed = await testConnectionSpeed();
|
|
|
+ if ((speed?.downloadSpeed && speed.downloadSpeed < 0.2) || (speed?.ping && speed.ping > 1500)) {
|
|
|
+ console.warn('Internet too slow for sync');
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } catch {}
|
|
|
+
|
|
|
+ await pushMessageChanges(token);
|
|
|
+}
|