I have an RSocket endpoint set up with Spring:
@MessageMapping("chat.{chatId}")
Flux<Message> getChats(@DestinationVariable String chatId) {
    Mono<Chat> data = chatRepository.findById(chatId);
    return data.map(chatGroup -> chatGroup.getMessages())
            .flatMapMany(Flux::fromIterable);
}

public interface ChatRepository extends FirestoreReactiveRepository<Chat> {
}
A JavaScript client (RSocket 1.x) connects to it over WebSocket:
const makeConnector = () => {
  return new RSocketConnector({
    setup: {
      dataMimeType: 'application/json',
      keepAlive: 100000,
      lifetime: 100000,
      metadataMimeType: 'message/x.rsocket.routing.v0',
    },
    transport: new WebsocketClientTransport({
      url: 'ws://localhost:7000/rsocket',
    }),
  });
};
makeConnector()
  .connect()
  .then((socket) => {
    const requester = socket.requestStream(
      {
        data: undefined,
        // Routing metadata: one length byte followed by the route itself
        metadata: Buffer.concat([
          Buffer.from(String.fromCharCode('chat.chatId'.length)),
          Buffer.from('chat.chatId'),
        ]),
      },
      10, // initial number of items requested
      {
        onError: (e) => console.log('error getting data', e),
        onNext: (payload, isComplete) => {
          const parsedData: Chat = JSON.parse(payload.data.toString());
          requester.request(5); // request the next 5 chats
        },
        onComplete: () => {
          console.log('complete');
        },
        onExtension: () => {
          console.log('on extension');
        },
      }
    );
  });
Once the requestStream completes (that is, once 'complete' is printed to the console), no new events are received. Is there a way to keep listening for new events after the stream has completed?
I tried working around this by polling on an interval on the server side (code below), but that delivers duplicate chats. Filtering them out on the frontend would work, but I believe there must be a better approach.
return Flux.interval(Duration.ofSeconds(5)).flatMap(x -> {
    return chatService.findChats(matchId);
});
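
For reference, the frontend filtering mentioned above would look roughly like the sketch below. It assumes every message carries a unique `id`; the `ChatMessage` shape and the `createDeduplicator` helper are hypothetical names used only for illustration and are not part of the code above.

```typescript
// Hypothetical message shape: assumes each chat message has a unique `id`.
interface ChatMessage {
  id: string;
  text: string;
}

// Returns a predicate that accepts each message id only the first time it is seen.
function createDeduplicator(): (message: ChatMessage) => boolean {
  const seenIds = new Set<string>();
  return (message: ChatMessage): boolean => {
    if (seenIds.has(message.id)) {
      return false; // already delivered by an earlier poll
    }
    seenIds.add(message.id);
    return true;
  };
}

// Simulate two interval ticks: the second repeats "m1" and "m2" and adds "m3".
const firstPoll: ChatMessage[] = [
  { id: 'm1', text: 'hi' },
  { id: 'm2', text: 'hello' },
];
const secondPoll: ChatMessage[] = firstPoll.concat([{ id: 'm3', text: 'new' }]);

const isNew = createDeduplicator();
const delivered: ChatMessage[] = firstPoll.concat(secondPoll).filter((m) => isNew(m));
console.log(delivered.map((m) => m.id));
```

In the client above, the same predicate would be applied inside `onNext` before handing a parsed message to the UI. It hides the duplicates, but the server still re-sends the whole chat every 5 seconds, which is why I am looking for a better approach.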