import { combineLatest } from 'rxjs';
const obs1 = getAndCreateObservableOne();
const obs2 = getAndCreateObservableTwo();
console.log('Initial output', obs1, obs2);
obs1.subscribe(e => console.log('Second output', e));
obs2.subscribe(e => console.log('Third output', e));
const combinedObs = combineLatest(obs1, obs2);
console.log('Fourth output', combinedObs);
combinedObs.subscribe(e => console.log('Fifth output', e)); // Throws TypeError warning, not functioning
I encountered a situation where my code worked fine as described above. However, yesterday after making changes to what I believed was unrelated code, it stopped working.
The Initial output
confirms that both variables are indeed of type Observable
.
The outputs from Second output
and Third output
confirm that each observable is emitting at least one value.
The Fourth output
shows that combineLatest()
is returning an Observable
.
The Fifth output
never fires, and commenting out that line removes the warning. Whenever I attempt to use combined.subscribe()
, it throws a TypeError warning stating:
TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
What possibly could be causing this issue? I have tried every troubleshooting step I could think of without success. If I'm passing two valid Observables to combineLatest()
and they emit values, shouldn't it work accordingly?
As requested, below are the sources for the two observables:
import { BehaviorSubject } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';
const getAndCreateObservableOne = () => {
const subject = new BehaviorSubject<string>(null);
// Subject.next(...) is used elsewhere
return subject.asObservable().pipe(distinctUntilChanged());
};
const getAndCreateObservableTwo = () => {
// Where store represents a redux Store
const storeSubject = new BehaviorSubject(store.getState());
store.subscribe(() => {
storeSubject.next(store.getState());
});
const stream = storeSubject.asObservable();
const customObservable = stream.pipe(
map(state => {
return formatData(state) || [];
})
);
return customObservable;
};
Update:
This recent development is truly baffling to me and makes no sense at all. I inserted the following line of code within getAndCreateObservableTwo()
before returning customObservable
, and oddly enough, everything started working perfectly. If I remove that line, the code stops working again. How does this even make sense? I don't save the new observable to a variable or utilize it in any way. It seems that simply combining them allows the chain to function properly.
combineLatest(customObservable, of(true));