Hey there, I'm trying to use the combineLatest operator to merge two streams in Angular, but I keep getting an error stating that "combineLatest does not exist on type". I've tried moving the code into a .pipe() call, but that doesn't resolve the issue. Should I use map before combining the streams with combineLatest? I'd appreciate any help you can provide!
import { Injectable } from '@angular/core';
import {Subject, BehaviorSubject, Observable, combineLatest} from 'rxjs';
import { filter, map, scan } from 'rxjs/operators';
// Importing models and services
import {Thread} from './thread.model';
import {Message} from '../message/message.model';
import {MessagesService} from '../message/messages.service';
import * as _ from 'lodash';
/**
 * Derives thread-oriented streams from the flat message stream exposed by
 * `MessagesService.messages`:
 *
 * - `threads`: thread id -> Thread lookup, rebuilt on each messages emission.
 * - `orderedThreads`: the same threads as an array, most recent message first.
 * - `currentThread`: the currently selected thread (set via `setCurrentThread`).
 * - `currentThreadMessages`: messages belonging to the current thread.
 */
@Injectable({
  providedIn: 'root'
})
export class ThreadsService {
  /** Map of thread id to Thread, recomputed from every emission of the messages stream. */ // (a)
  threads: Observable<{ [key: string]: Thread }>;

  /** Threads sorted by their last message's `sentAt`, newest first. */ // (a1)
  orderedThreads: Observable<Thread[]>;

  /** Currently selected thread; starts with an empty `new Thread()`. */ // (a2)
  currentThread: Subject<Thread> = new BehaviorSubject<Thread>(new Thread());

  /** Messages whose thread id matches the currently selected thread. */ // (a3)
  currentThreadMessages: Observable<Message[]>;

  /**
   * Wires up all derived streams.
   *
   * @param messagesService source of the `messages` stream and of
   *   `markThreadAsRead`, which receives every `currentThread` emission.
   */
  constructor(public messagesService: MessagesService) {
    // (b) Collapse the message list into a lookup keyed by thread id.
    this.threads = messagesService.messages.pipe(
      map((messages: Message[]) => {
        const threads: { [key: string]: Thread } = {}; // (b1)

        // (b2) forEach, not map: the callback is run only for its side effects.
        messages.forEach((message: Message) => {
          const threadId = message.thread.id;

          // Register the thread in the LOCAL accumulator on first sight.
          // (Writing to `this.threads` here would target the Observable field
          // and leave the accumulator empty.)
          const messagesThread: Thread = threads[threadId] || message.thread; // (c)
          threads[threadId] = messagesThread;

          // Keep the most recent message of each thread as its lastMessage.
          if (!messagesThread.lastMessage || messagesThread.lastMessage.sentAt < message.sentAt) {
            messagesThread.lastMessage = message;
          }
        });

        return threads;
      })
    );

    // (d) Flatten the lookup into an array ordered newest-first.
    this.orderedThreads = this.threads.pipe(
      map((threadGroups: { [key: string]: Thread }) => {
        const threads: Thread[] = _.values(threadGroups);
        // sortBy is ascending; reverse() puts the latest thread first.
        // Every thread produced by (b) has lastMessage set by step (c).
        return _.sortBy(threads, (t: Thread) => t.lastMessage.sentAt).reverse();
      })
    );

    // (e1) Forward each selected thread to markThreadAsRead.
    // NOTE(review): this presumably relies on markThreadAsRead being a
    // Subject/observer; if it is a plain method it is passed unbound here
    // and would lose its `this` — confirm against MessagesService.
    this.currentThread.subscribe(this.messagesService.markThreadAsRead);

    // combineLatest([...]) emits ONE array value per update, so the projector
    // must destructure a tuple rather than declare two positional parameters.
    this.currentThreadMessages = combineLatest([this.currentThread, messagesService.messages]).pipe(
      map(([currentThread, messages]: [Thread, Message[]]) => {
        if (currentThread && messages.length > 0) {
          return _.chain(messages)
            .filter((message: Message) => message.thread.id === currentThread.id) // (f1)
            .map((message: Message) => {
              // Side effect kept from the original: messages of the current
              // thread are flagged as read in place.
              message.isRead = true;
              return message;
            })
            .value();
        }
        return [];
      })
    );
  }

  /**
   * Selects a new current thread by pushing it onto `currentThread`.
   *
   * @param newThread the thread to make current.
   */
  setCurrentThread(newThread: Thread): void { // (e)
    this.currentThread.next(newThread);
  }
}