且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

如何组织与.NET RX数据处理器序列

更新时间:2023-12-05 22:41:34

这好像你需要的工作流,而不是接收。似乎根据你的其他问题*您要采取什么看起来非常适合一个ProducerConsumer排队的工作流程方案,并迫使它变成接收。

This seems like you need a workflow instead of Rx. It seems based on your other questions* you are trying to take what looks very well suited to a ProducerConsumer queuing workflow scenario, and forcing it into Rx.

它看起来像你


  • 要直到收到一个值,那么
    重新订阅的队列和块读取。只要使用BlockingCollection的排队功能。作为值到达他们可以从任何线程推入收集。

  • 不想要实际上有一个值序列,但可以
    的工作流失败上的任何给定值,该转移,然后处理下一个值。
    只使用一个队列。过程中的每个值,并把结果放到下一个适当的队列
    (失败/成功) - >请参见企业集成模式
    专门无效信息频道和死信通道。

  • 潜在的并行处理这些值。见或
    BlockingCollection.GetConsumingEnumerable的干扰器
    高性能的实现。有了这些工具,你可以有很多生产和消费者的青睐。当然,你可以用接收做到这一点,但它仅仅是轮询和占用一个线程这样做。我觉得这是更好地更明确一些这样的事情

  • want to Read from a Queue and block until a value is received, then resubscribe. Just use Queuing features of BlockingCollection. As values arrive they can be pushed into the collection from any thread.
  • dont want to actually have a sequence of values, but a workflow that can fail on any given value, divert that and then process the next value. Just use a queue. Process each value and put the results into the next appropriate queue (Failure/Success) ->See Enterprise Integration Patterns, specifically Invalid Message Channel and Dead Letter channel.
  • potentially process these values in parallel. See BlockingCollection.GetConsumingEnumerable or the Disruptor for a high performance implementation. With these tools you can have many produces and consumers. Sure you can do this with Rx, but it is just polling and tying up a thread doing so. I think it is better to be explicit about this kind of thing

我觉得你接收的使用应支付本身,你不该找不到自己拼斗(像任何其他技术或框架)。

I think your usage of Rx should pay for itself and you shouldn't find yourself fighting it (like any other technology or framework).

*像
在无扩展处理异常不停止序列

How序列化观测量到云和背部