这好像你需要的工作流,而不是接收。似乎根据你的其他问题*您要采取什么看起来非常适合一个ProducerConsumer排队的工作流程方案,并迫使它变成接收。
This seems like you need a workflow instead of Rx. It seems based on your other questions* you are trying to take what looks very well suited to a ProducerConsumer queuing workflow scenario, and forcing it into Rx.
它看起来像你
- 要直到收到一个值,那么
重新订阅的队列和块读取。只要使用BlockingCollection的排队功能。作为值到达他们可以从任何线程推入收集。
- 不想要实际上有一个值序列,但可以
的工作流失败上的任何给定值,该转移,然后处理下一个值。
只使用一个队列。过程中的每个值,并把结果放到下一个适当的队列
(失败/成功) - >请参见企业集成模式,
专门无效信息频道和死信通道。
- 潜在的并行处理这些值。见或
BlockingCollection.GetConsumingEnumerable的干扰器为
高性能的实现。有了这些工具,你可以有很多生产和消费者的青睐。当然,你可以用接收做到这一点,但它仅仅是轮询和占用一个线程这样做。我觉得这是更好地更明确一些这样的事情
- want to Read from a Queue and block until a value is received, then
resubscribe. Just use Queuing features of BlockingCollection. As values arrive they can be pushed into the collection from any thread.
- dont want to actually have a sequence of values, but a workflow that can
fail on any given value, divert that and then process the next value.
Just use a queue. Process each value and put the results into the next appropriate queue
(Failure/Success) ->See Enterprise Integration Patterns,
specifically Invalid Message Channel and Dead Letter channel.
- potentially process these values in parallel. See
BlockingCollection.GetConsumingEnumerable or the Disruptor for a
high performance implementation. With these tools you can have many produces and consumers. Sure you can do this with Rx, but it is just polling and tying up a thread doing so. I think it is better to be explicit about this kind of thing
我觉得你接收的使用应支付本身,你不该找不到自己拼斗(像任何其他技术或框架)。
I think your usage of Rx should pay for itself and you shouldn't find yourself fighting it (like any other technology or framework).
*像
的在无扩展处理异常不停止序列
和
的How序列化观测量到云和背部