I am currently investigating the functionality of publishReplay
in rxjs. I have encountered an example where it behaves as expected:
const source = new Subject()
const sourceWrapper = source.pipe(
publishReplay(1),
refCount()
)
const subscribeTest1 = sourceWrapper
.subscribe(
(data: any) => {
console.log('subscriber1 received a value:', data)
}
)
source.next(1)
source.next(2)
setTimeout(() => {
const subscribeTest2 = sourceWrapper
.subscribe(
(data: any) => {
console.log('subscriber2 received a value:', data)
}
)
}, 5000)
We have a subject and a wrapper on it with publushReplay(1), refCount()
included.
We establish the initial subscription and then emit 2 values. As a consequence, the following appears in the console:
subscriber1 received a value: 1
subscriber1 received a value: 2
After 5 seconds, we initiate another subscription, receiving the most recent buffered value from the ReplaySubject
created by publishReplay(1)
. Consequently, after 5 seconds, an additional message will be shown in the console:
subscriber2 received a value: 2
Everything seems to be functioning correctly so far.
However, I have come across another scenario where I attempt to implement this concept in an Angular application. Here is the link.
This setup consists of a single module containing two components: app.component.ts
, hello.component.ts
, and one service: test.service.ts
test.service.ts
@Injectable({
providedIn: 'root'
})
export class TestService {
private _test$: Subject<string> = new Subject()
public get test$(): Observable<string> {
return this._test$.pipe(
publishReplay(1),
refCount()
)
}
public pushToStream(val: string): void {
this._test$.next(val)
}
}
app.component.ts
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
public name = 'Angular';
constructor(private testService: TestService) {}
public ngOnInit(): void {
this.testService.test$.subscribe((data: string) => {
console.log('Subscriber in app.component received a value: ', data)
})
this.testService.pushToStream('new value')
this.testService.pushToStream('one more value')
}
}
app.component.html
<hello name="{{ name }}"></hello>
<p>
Start editing to see some magic happen :)
</p>
hello.component.ts
@Component({
selector: 'hello',
template: `<h1>Hello {{name}}!</h1>`,
styles: [`h1 { font-family: Lato; }`]
})
export class HelloComponent implements OnInit {
@Input() name: string;
constructor(private testService: TestService) { }
public ngOnInit(): void {
setTimeout(() => {
this.testService.test$.subscribe((data: string) => {
console.log('Subscriber in hello component received a value:', data)
})
}, 4000)
}
}
In this case, we follow the same principle: maintaining the source in a service, which acts as a singleton, creating 1 subscription in app.component.ts
, emitting 2 values, and setting up another subscription with a delay in hello.component.ts
. As seen in the previous example, the second subscription should retrieve the latest buffered value, but it does not. Only the following messages are displayed in the console:
Subscriber in app.component received a value: new value
Subscriber in app.component received a value: one more value
What could be the missing piece causing it to fail?