1、订阅序列
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Reactive; using System.Reactive.Linq; using System.Reactive.Subjects; namespace SimpleSequence { class Program { static void Main(string[] args) { IObservable<int> source = Observable.Range(1, 10); IObserver<int> obsvr = Observer.Create<int>( x => Console.WriteLine("OnNext: {0}", x), ex => Console.WriteLine("OnError: {0}", ex.Message), () => Console.WriteLine("OnCompleted")); IDisposable subscription = source.Subscribe(obsvr); Console.WriteLine("Press ENTER to unsubscribe..."); Console.ReadLine(); subscription.Dispose(); } } }
2、Timer使用
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Reactive; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Threading; namespace RxConsole { class Program { static void Main(string[] args) { //等待10s后开始以2s为时间区间调用 var source = Observable.Timer(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2)) .Timestamp(); using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp))) { Console.WriteLine("Press any key to unsubscribe"); Console.ReadKey(); } Console.WriteLine("Press any key to exit"); Console.ReadKey(); } } }
显示
3、待续
例子来自 https://docs.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242977(v%3dvs.103)