i'm trying feels should straightforward, proving surprisingly difficult.
i have function subscribe rabbitmq queue. concretely, channel.consume function here: http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
it returns promise resolved subscription id - needed unsubscribe later - , has callback argument invoke when messages pulled off queue.
when want unsubscribe queue, i'd need cancel consumer using channel.cancel function here: http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel. takes returned subscription id.
i want wrap of stuff in observable subscribes queue when observable subscribed to, , cancels subscription when observable unsubscribed from. however, proving hard due 'double-asynchronous' nature of calls (i mean have both callback , return promise).
ideally, code i'd able write is:
return new rx.observable(async (subscriber) => { var consumeresult = await channel.consume(queuename, (message) => subscriber.next(message)); return async () => { await channel.cancel(consumeresult.consumertag); }; });
however, isn't possible constructor doesn't support async subscriber functions or teardown logic.
i've not been able figure 1 out. missing here? why hard?
cheers, alex
the created observable not need wait channel.consume
promise resolve, observer (it's observer that's passed, not subscriber) called within function provide.
however, unsubscribe function return have wait promise resolve. , can internally, this:
return new rx.observable((observer) => { var consumeresult = channel.consume(queuename, (message) => observer.next(message)); return () => { consumeresult.then(() => channel.cancel(consumeresult.consumertag)); }; });
Comments
Post a Comment