||
- import { Q } from '@nozbe/watermelondb';
- import { database } from 'src/watermelondb';
- import Message from 'src/watermelondb/models/Message';
- import { chatApi } from '@api/chat';
- import NetInfo from '@react-native-community/netinfo';
- import { testConnectionSpeed } from 'src/database/speedService';
/**
 * Builds the string key that identifies a chat.
 *
 * A truthy `chatUid` wins and yields `u:<uid>`; otherwise a truthy
 * `groupChatToken` yields `g:<token>`.
 *
 * @param params - Identity of a direct chat (`chatUid`) or a group chat (`groupChatToken`).
 * @returns The prefixed chat key.
 * @throws Error with message `Invalid chat identity` when both values are falsy
 *   (missing, `null`, `0` or an empty string).
 */
export function makeChatKey(params: { chatUid?: number | null; groupChatToken?: string | null }) {
  const { chatUid, groupChatToken } = params;

  if (!chatUid && !groupChatToken) {
    throw new Error('Invalid chat identity');
  }

  return chatUid ? `u:${chatUid}` : `g:${groupChatToken}`;
}
/** Returns the current time as milliseconds since the Unix epoch. */
function now(): number {
  const epochMs = Date.now();
  return epochMs;
}
/** Data captured for a locally composed message that still has to be sent. */
type SendActionValue = {
  text: string;
  // Not read by any code in this module; presumably the local author's uid — confirm with callers.
  currentUid: number;
  attachment?: any;
  reply_to_id?: number;
  replyMessage?: any;
};

/**
 * One pending local change to a message, stored as JSON in the record's
 * `dirtyActions` field until it is pushed to the server.
 *
 * `ts` is the epoch-millisecond timestamp at which the action was queued;
 * `compactMessageActions` orders actions by it.
 */
export type MessageDirtyAction =
  // Send a new message.
  | { type: 'send'; value: SendActionValue; ts: number }
  // Replace the text of an already sent message.
  | { type: 'edit'; value: { text: string }; ts: number }
  // Delete the message.
  | { type: 'delete'; value?: any; ts: number }
  // Mark the listed message ids as read.
  | { type: 'read'; value: { messagesIds: number[] }; ts: number }
  // Add a reaction; `value` is the reaction sent to the API.
  | { type: 'reaction'; value: string; ts: number }
  // Remove the reaction; `value` is not forwarded to the API by this module.
  | { type: 'unreaction'; value: string; ts: number };
/**
 * Appends a pending action to a message's `dirtyActions` queue and flags the
 * message as dirty.
 *
 * The action is stamped with the current time (`ts`) before being stored.
 * The record's raw `_status` / `_changed` bookkeeping is then reset to
 * `'synced'` / `''`.
 * NOTE(review): presumably this keeps WatermelonDB's own change tracking from
 * treating the record as modified — confirm.
 *
 * @param msg - Message record to mutate in place.
 * @param action - Action to queue; its `ts` is assigned here.
 */
export function addMessageDirtyAction(msg: Message, action: Omit<MessageDirtyAction, 'ts'>) {
  let queue: MessageDirtyAction[] = [];
  if (msg.dirtyActions) {
    queue = JSON.parse(msg.dirtyActions);
  }

  const stamped = { ...(action as MessageDirtyAction), ts: now() };
  queue.push(stamped);

  msg.isDirty = true;
  msg.dirtyActions = JSON.stringify(queue);

  const raw = (msg as any)._raw;
  raw._status = 'synced';
  raw._changed = '';
}
/**
 * Collapses a message's pending actions into the minimal sequence that has to
 * be pushed to the server.
 *
 * Actions are processed in ascending `ts` order:
 * - the first `delete` makes everything else irrelevant and is returned alone;
 * - consecutive `edit`s collapse into the latest one;
 * - a `reaction` directly followed by an `unreaction` (or the reverse) cancel
 *   each other out;
 * - consecutive `reaction`s collapse into the latest one;
 * - consecutive `read`s collapse into one action that carries the union of
 *   their `messagesIds` and the latest `ts`.
 *
 * The input array and its action objects are never modified; the result
 * contains fresh objects.
 *
 * NOTE(review): cancelling an `unreaction` followed by a `reaction` drops the
 * new reaction too. That is only correct if nothing was set on the server
 * beforehand — confirm against the reaction API semantics.
 *
 * @param actions - Pending actions in any order.
 * @returns The compacted actions in ascending `ts` order.
 */
export function compactMessageActions(actions: MessageDirtyAction[]): MessageDirtyAction[] {
  if (!actions.length) return [];

  const sorted = [...actions].sort((a, b) => a.ts - b.ts);
  const res: MessageDirtyAction[] = [];

  for (const a of sorted) {
    const lastIndex = res.length - 1;
    const last = res[lastIndex];

    if (a.type === 'delete') {
      return [{ ...a }];
    }

    if (a.type === 'edit' && last?.type === 'edit') {
      // The later edit fully supersedes the earlier one.
      res[lastIndex] = { ...a };
      continue;
    }

    if (
      (last?.type === 'reaction' && a.type === 'unreaction') ||
      (last?.type === 'unreaction' && a.type === 'reaction')
    ) {
      res.pop();
      continue;
    }

    if (a.type === 'reaction' && last?.type === 'reaction') {
      res[lastIndex] = { ...a };
      continue;
    }

    if (a.type === 'read' && last?.type === 'read') {
      // Keep every id from both actions; dropping the later ids would lose read receipts.
      const mergedIds = Array.from(
        new Set<number>([...(last.value?.messagesIds ?? []), ...(a.value?.messagesIds ?? [])])
      );
      res[lastIndex] = { type: 'read', value: { messagesIds: mergedIds }, ts: a.ts };
      continue;
    }

    // Copy so that later merges never write into the caller's objects.
    res.push({ ...a });
  }

  return res;
}
/**
 * Executes one pending action for a message against the chat API.
 *
 * The target is derived from the message's `chatKey`: the part after the
 * two-character prefix is used as the group token for group messages, or
 * converted to a number and used as the peer uid otherwise.
 *
 * @param token - Auth token forwarded to every API call.
 * @param msg - Local message record the action belongs to.
 * @param action - Action to execute.
 * @returns For `send`: `{ newId, attachment, wsEvent }` built from the API
 *   response. For `delete`: `{ deleted: true }`. For every other handled
 *   type: `{}`. An unrecognised type resolves to `undefined`.
 * @throws Error `Message has no server id yet` when the action is not a
 *   `send` and the message has no `messageId`. API errors propagate unchanged.
 */
async function performMessageAction(token: string, msg: Message, action: MessageDirtyAction) {
  const { chatKey } = msg;
  const isGroup = Boolean(msg.isGroup);

  const requiresServerId = action.type !== 'send';
  if (requiresServerId && msg.messageId == null) {
    throw new Error('Message has no server id yet');
  }

  // Evaluated lazily so the key is only parsed by the branch that needs it.
  const groupToken = () => chatKey.slice(2);
  const peerUid = () => Number(chatKey.slice(2));

  switch (action.type) {
    case 'send': {
      const draft = action.value;
      const res = isGroup
        ? await chatApi.sendGroupMessage({
            token,
            to_group_token: groupToken(),
            text: draft.text,
            attachment: draft.attachment,
            reply_to_id: draft.reply_to_id
          })
        : await chatApi.sendMessage({
            token,
            to_uid: peerUid(),
            text: draft.text,
            attachment: draft.attachment,
            reply_to_id: draft.reply_to_id
          });

      const newId = res.data.message_id;
      const attachment = res.data.attachment;
      return {
        newId,
        attachment,
        wsEvent: {
          action: 'new_message',
          payload: {
            message: {
              _id: newId,
              text: draft.text,
              replyMessage: draft.replyMessage,
              // -1 is sent in place of a missing attachment.
              attachment: attachment ? attachment : -1
            }
          }
        }
      };
    }

    case 'edit': {
      await (isGroup
        ? chatApi.editGroupMessage({
            token,
            group_token: groupToken(),
            message_id: msg.messageId!,
            text: action.value.text
          })
        : chatApi.editMessage({
            token,
            to_uid: peerUid(),
            message_id: msg.messageId!,
            text: action.value.text
          }));
      return {};
    }

    case 'delete': {
      await (isGroup
        ? chatApi.deleteGroupMessage({
            token,
            group_token: groupToken(),
            message_id: msg.messageId!
          })
        : chatApi.deleteMessage({
            token,
            conversation_with_user: peerUid(),
            message_id: msg.messageId!
          }));
      return { deleted: true };
    }

    case 'reaction': {
      await (isGroup
        ? chatApi.reactToGroupMessage({
            token,
            group_token: groupToken(),
            message_id: msg.messageId!,
            reaction: action.value
          })
        : chatApi.reactToMessage({
            token,
            conversation_with_user: peerUid(),
            message_id: msg.messageId!,
            reaction: action.value
          }));
      return {};
    }

    case 'unreaction': {
      await (isGroup
        ? chatApi.unreactToGroupMessage({
            token,
            group_token: groupToken(),
            message_id: msg.messageId!
          })
        : chatApi.unreactToMessage({
            token,
            conversation_with_user: peerUid(),
            message_id: msg.messageId!
          }));
      return {};
    }

    case 'read': {
      await (isGroup
        ? chatApi.groupMessagesRead({
            token,
            group_token: groupToken(),
            messages_id: action.value.messagesIds
          })
        : chatApi.messagesRead({
            token,
            from_user: peerUid(),
            messages_id: action.value.messagesIds
          }));
      return {};
    }
  }
}
/**
 * Inserts or updates server messages in the local `messages` collection.
 *
 * Each API message is matched to a local record by `chat_key` + `message_id`.
 * Existing records are updated, missing ones are created, and all prepared
 * changes are committed in a single batch.
 *
 * While a local record still has unpushed changes (`isDirty` or a non-empty
 * `dirtyActions`), its text is kept instead of being overwritten by the
 * server text.
 *
 * @param chatUid - Peer uid for a direct chat.
 * @param groupToken - Group token for a group chat.
 * @param apiMessages - Messages as returned by the API; nothing happens when empty.
 * @param avatar - For group chats only: a truthy value replaces every message's sender avatar.
 * @param name - For group chats only: a truthy value replaces every message's sender name.
 * @throws Error `Invalid chat identity` (from `makeChatKey`) when neither
 *   `chatUid` nor `groupToken` is usable.
 */
export async function upsertMessagesIntoDB({
  chatUid,
  groupToken,
  apiMessages,
  avatar = null,
  name = ''
}: {
  chatUid?: number;
  groupToken?: string;
  apiMessages: any[];
  avatar?: string | null;
  name?: string | null;
}) {
  if (!apiMessages?.length) return;

  const chatKey = makeChatKey({ chatUid, groupChatToken: groupToken });
  const isGroup = Boolean(groupToken);
  const col = database.get<Message>('messages');

  // The avatar/name overrides only apply to group chats.
  const avatarOverride = groupToken && avatar ? avatar : null;
  const nameOverride = groupToken && name ? name : null;
  const senderAvatarOf = (msg: any) => avatarOverride ?? msg.sender_avatar ?? null;
  const senderNameOf = (msg: any) => nameOverride ?? msg.sender_name ?? '';

  // `attachement` is the field name used by the API payload; -1 marks "no attachment".
  const hasAttachment = (msg: any) => Boolean(msg.attachement) && msg.attachement !== -1;

  await database.write(async () => {
    const batch: any[] = [];

    for (const msg of apiMessages) {
      const compositeId = `${chatKey}:${msg.id}`;
      const existing = await col
        .query(Q.where('chat_key', chatKey), Q.where('message_id', msg.id))
        .fetch();

      if (existing.length) {
        const record = existing[0];
        // Pending local changes live on the local record, never on the API payload.
        const hasDirty = Boolean(record.isDirty || record.dirtyActions);

        try {
          batch.push(
            record.prepareUpdate((r) => {
              r.messageId = msg.id;
              r.sentAt = msg.sent_datetime;
              r.receivedAt = msg.received_datetime ?? r.receivedAt;
              r.readAt = msg.read_datetime ?? r.readAt;
              if (!hasDirty) {
                r.text = msg.text;
              }
              if (hasAttachment(msg)) {
                // Preserve the locally stored file location across server refreshes.
                const prev = r.attachment ? JSON.parse(r.attachment) : {};
                r.attachment = JSON.stringify({
                  ...msg.attachement,
                  local_uri: prev?.local_uri ?? null
                });
              } else {
                r.attachment = null;
              }
              r.status = msg.status;
              r.isSending = false;
              r.replyToId = msg.reply_to_id;
              r.replyTo = msg.reply_to ? JSON.stringify(msg.reply_to) : null;
              r.senderAvatar = senderAvatarOf(msg);
              r.senderName = senderNameOf(msg);
              r.reactions = msg.reactions ?? '{}';
              r.edits = msg.edits ?? '{}';
              (r as any)._raw._status = 'synced';
              (r as any)._raw._changed = '';
            })
          );
        } catch (err) {
          // Best effort: skip this message but leave a trace instead of failing silently.
          console.warn('upsertMessagesIntoDB: failed to prepare update for', compositeId, err);
        }
      } else {
        try {
          batch.push(
            col.prepareCreate((r) => {
              r.chatKey = chatKey;
              r.isGroup = isGroup;
              r.messageId = msg.id;
              r.compositeId = compositeId;
              r.sentAt = msg.sent_datetime;
              r.receivedAt = msg.received_datetime ?? null;
              r.readAt = msg.read_datetime ?? null;
              r.senderId = msg.sender;
              r.recipientId = msg.recipient;
              r.text = msg.text;
              r.status = msg.status;
              r.isSending = false;
              r.reactions = msg.reactions ?? '{}';
              r.edits = msg.edits ?? '{}';
              r.attachment = hasAttachment(msg) ? JSON.stringify(msg.attachement) : null;
              r.encrypted = msg.encrypted ?? 0;
              r.replyToId = msg.reply_to_id ?? -1;
              r.replyTo = msg.reply_to ? JSON.stringify(msg.reply_to) : null;
              r.senderAvatar = senderAvatarOf(msg);
              r.senderName = senderNameOf(msg);
              r.isDirty = false;
              r.dirtyActions = null;
              (r as any)._raw._status = 'synced';
              (r as any)._raw._changed = '';
            })
          );
        } catch (err) {
          // Best effort: skip this message but leave a trace instead of failing silently.
          console.warn('upsertMessagesIntoDB: failed to prepare create for', compositeId, err);
        }
      }
    }

    if (batch.length) {
      await database.batch(batch);
    }
  });
}
/**
 * Removes local messages that the server no longer reports for a chat.
 *
 * Two modes:
 * - When the server returned exactly one message with `status === 4` and
 *   `isLatest` is set, every local message of the chat except that one is
 *   permanently destroyed.
 *   NOTE(review): status 4 presumably marks a cleared chat — confirm.
 * - Otherwise, local messages whose `messageId` lies between the smallest and
 *   largest server id but is absent from the server list are permanently
 *   destroyed. Records without a positive `messageId` and dirty records are
 *   kept.
 *
 * @param chatKey - Key of the chat to reconcile.
 * @param serverMessages - Messages returned by the server for the range; nothing happens when empty.
 * @param isLatest - Whether the range is the newest page of the chat.
 */
export async function reconcileChatRange(
  chatKey: string,
  serverMessages: any[],
  isLatest: boolean
) {
  if (serverMessages.length === 0) return;

  const messages = database.get<Message>('messages');

  const isSoleStatus4 =
    serverMessages.length === 1 && serverMessages[0].status === 4 && isLatest;

  if (isSoleStatus4) {
    const survivorId = `${chatKey}:${serverMessages[0].id}`;
    const everything = await messages.query(Q.where('chat_key', chatKey)).fetch();

    await database.write(async () => {
      for (const row of everything) {
        if (row.compositeId === survivorId) continue;
        await row.destroyPermanently();
      }
    });
    return;
  }

  const ids = serverMessages.map((m) => m.id);
  const knownIds = new Set(ids);
  const lowest = Math.min(...ids);
  const highest = Math.max(...ids);

  const inRange = await messages
    .query(Q.where('chat_key', chatKey), Q.where('message_id', Q.between(lowest, highest)))
    .fetch();

  await database.write(async () => {
    for (const row of inRange) {
      const id = row.messageId;
      // Records without a positive server id are never removed here.
      if (!id || !(id > 0)) continue;
      // Still on the server, or has pending local changes: keep it.
      if (knownIds.has(id) || row.isDirty) continue;
      await row.destroyPermanently();
    }
  });
}
/** Event handed to the `onWsEvent` callback after a message action was pushed. */
export type OutgoingWsEvent = {
  /** Event name, e.g. `new_message`. */
  action: string;
  /** Arbitrary event data keyed by string. */
  payload: { [key: string]: any };
};
/**
 * Pushes every pending local message action to the server.
 *
 * For each dirty message the queued actions are compacted and executed in
 * order. After each successful action the record is updated inside a writer:
 * - a `send` result assigns the server id, composite id and status, and
 *   removes other local records that already carry the same server id;
 * - a returned attachment replaces the stored one while keeping `local_uri`;
 * - the stored queue is rewritten to the actions that are still outstanding.
 *
 * The outstanding queue consists of the not yet executed compacted actions
 * plus any actions appended to the record while the network call was in
 * flight, so a failure or a concurrently queued action never loses work. The
 * record is only marked clean once nothing is left. A record whose queue
 * compacts to nothing is marked clean without any API call.
 *
 * NOTE(review): preserving concurrently queued actions relies on other code
 * only ever appending to `dirtyActions` (as `addMessageDirtyAction` does).
 *
 * @param token - Auth token forwarded to the API.
 * @param onWsEvent - Optional callback invoked after the record was persisted,
 *   with the event produced by a successful `send`.
 * @throws Whatever the API call or the database write throws; processing stops
 *   at the first failure and the remaining actions stay queued.
 */
export async function pushMessageChanges(
  token: string,
  onWsEvent?: (event: OutgoingWsEvent) => void
) {
  const col = database.get<Message>('messages');
  const dirty = await col.query(Q.where('is_dirty', true)).fetch();
  if (!dirty.length) return;

  const readQueue = (raw?: string | null): MessageDirtyAction[] => (raw ? JSON.parse(raw) : []);

  for (const msg of dirty) {
    const snapshot = readQueue(msg.dirtyActions);
    const compacted = compactMessageActions(snapshot);

    // Number of leading entries in the stored queue that this run accounts for.
    // Anything stored beyond that was appended while we were busy.
    let ownedCount = snapshot.length;

    // Rewrites the stored queue to `pending` plus whatever was appended meanwhile.
    const storeRemaining = (m: Message, pending: MessageDirtyAction[]) => {
      const addedMeanwhile = readQueue(m.dirtyActions).slice(ownedCount);
      const remaining = [...pending, ...addedMeanwhile];
      m.isDirty = remaining.length > 0;
      m.dirtyActions = remaining.length ? JSON.stringify(remaining) : null;
      (m as any)._raw._status = 'synced';
      (m as any)._raw._changed = '';
      ownedCount = pending.length;
    };

    if (!compacted.length) {
      // Nothing to send (e.g. a reaction cancelled by an unreaction): just clear the flag.
      await database.write(async () => {
        await msg.update((m) => storeRemaining(m, []));
      });
      continue;
    }

    for (let i = 0; i < compacted.length; i += 1) {
      const res = await performMessageAction(token, msg, compacted[i]);
      const pending = compacted.slice(i + 1);

      await database.write(async () => {
        if (res?.newId) {
          // The same message may already have arrived from the server; keep only this record.
          const duplicates = await col
            .query(Q.where('chat_key', msg.chatKey), Q.where('message_id', res.newId))
            .fetch();
          for (const d of duplicates) {
            if (d.id !== msg.id) {
              await d.destroyPermanently();
            }
          }
        }

        // Must be awaited so the change is committed before the writer ends.
        await msg.update((m) => {
          if (res?.newId) {
            m.messageId = res.newId;
            m.compositeId = `${msg.chatKey}:${res.newId}`;
            m.status = 1;
            m.isSending = false;
          }
          if (res?.attachment) {
            const prev = m.attachment ? JSON.parse(m.attachment) : {};
            m.attachment = JSON.stringify({
              ...res.attachment,
              local_uri: prev?.local_uri ?? null
            });
          }
          storeRemaining(m, pending);
        });
      });

      // Notify only after the local state was persisted, outside the record updater.
      if (res?.wsEvent && onWsEvent) {
        onWsEvent(res.wsEvent);
      }
    }
  }
}
// True while a push loop started by `triggerMessagePush` is running.
let pushInFlight = false;
// Set by calls that arrive during a running push to request one more pass.
let needsAnotherRun = false;

/**
 * Starts a push of pending message changes unless one is already running.
 *
 * A call made while a push is in flight returns immediately after recording
 * that another pass is needed; the running call then repeats
 * `pushMessageChanges` with its own `token` and `onWsEvent` until no further
 * pass was requested.
 *
 * @param token - Auth token forwarded to `pushMessageChanges`.
 * @param onWsEvent - Optional callback forwarded to `pushMessageChanges`.
 * @throws Whatever `pushMessageChanges` throws; the in-flight flag is cleared either way.
 */
export async function triggerMessagePush(
  token: string,
  onWsEvent?: (event: OutgoingWsEvent) => void
) {
  if (pushInFlight) {
    needsAnotherRun = true;
    return;
  }

  pushInFlight = true;
  try {
    for (;;) {
      needsAnotherRun = false;
      await pushMessageChanges(token, onWsEvent);
      if (!needsAnotherRun) break;
    }
  } finally {
    pushInFlight = false;
  }
}
/**
 * Maps a raw server message onto the field names used by the local message model.
 *
 * `attachment` is the JSON-serialized `attachement` payload, or `null` when the
 * server sent nothing usable (missing, `null`, or the `-1` placeholder).
 * `replyTo` is the JSON-serialized `reply_to` payload, or `null` when there is
 * no reply id (missing, falsy or `-1`) or no reply payload.
 *
 * @param s - Raw message object from the API.
 * @param chatKey - Key of the chat the message belongs to.
 * @param isGroup - Whether the chat is a group chat.
 * @returns A plain object with model field names; `isSending` is always `false`.
 */
export function normalizeServerMessage(s: any, chatKey: string, isGroup: boolean) {
  // `attachement` is the field name used by the API payload.
  const hasAttachment = Boolean(s.attachement) && s.attachement !== -1;
  const hasReply = Boolean(s.reply_to_id) && s.reply_to_id !== -1 && s.reply_to != null;

  return {
    messageId: s.id,
    compositeId: `${chatKey}:${s.id}`,
    chatKey,
    isGroup,
    senderId: s.sender,
    recipientId: s.recipient,
    text: s.text,
    sentAt: s.sent_datetime,
    receivedAt: s.received_datetime,
    readAt: s.read_datetime,
    status: s.status,
    reactions: s.reactions,
    edits: s.edits,
    // Without the guard a null payload would be stored as the string 'null'.
    attachment: hasAttachment ? JSON.stringify(s.attachement) : null,
    replyToId: s.reply_to_id ?? null,
    replyTo: hasReply ? JSON.stringify(s.reply_to) : null,
    encrypted: s.encrypted,
    senderName: s.sender_name ?? null,
    senderAvatar: s.sender_avatar ?? null,
    isSending: false
  };
}
/**
 * Pushes pending message changes when the device has a usable connection.
 *
 * Nothing happens when the device is offline, or when the speed test reports
 * a download speed below 0.2 or a ping above 1500. A failing speed test is
 * ignored and the push is attempted anyway.
 * NOTE(review): a reported value of exactly 0 is treated as "unknown" rather
 * than "too slow" because of the truthiness check — confirm this is intended.
 *
 * The push goes through `triggerMessagePush` so that it never runs in
 * parallel with a push that is already in flight; two parallel pushes would
 * read the same queue and send the same actions twice. If a push is already
 * running, this call only requests another pass and returns.
 *
 * @param token - Auth token forwarded to the push.
 * @throws Whatever the push throws.
 */
export async function syncMessagesIncremental(token: string) {
  const net = await NetInfo.fetch();
  if (!net.isConnected) return;

  try {
    const speed = await testConnectionSpeed();
    if ((speed?.downloadSpeed && speed.downloadSpeed < 0.2) || (speed?.ping && speed.ping > 1500)) {
      console.warn('Internet too slow for sync');
      return;
    }
  } catch {
    // Deliberately ignored: the speed test is only a hint, not a precondition.
  }

  await triggerMessagePush(token);
}
|