diff --git a/src/Aeon.Acquisition/CreateGroup.cs b/src/Aeon.Acquisition/CreateGroup.cs new file mode 100644 index 0000000..fe0d1f3 --- /dev/null +++ b/src/Aeon.Acquisition/CreateGroup.cs @@ -0,0 +1,108 @@ +using System; +using System.Collections.Generic; +using System.ComponentModel; +using System.Linq; +using System.Linq.Expressions; +using System.Reactive.Linq; +using Bonsai; +using Bonsai.Expressions; + +namespace Aeon.Acquisition +{ + [Description("Creates a sequence of grouped observables for each key in the input sequence.")] + public class CreateGroup : WorkflowExpressionBuilder + { + static readonly Range argumentRange = Range.Create(lowerBound: 1, upperBound: 1); + + public CreateGroup() + : this(new ExpressionBuilderGraph()) + { + } + + public CreateGroup(ExpressionBuilderGraph workflow) + : base(workflow) + { + } + + public override Range ArgumentRange => argumentRange; + + public override Expression Build(IEnumerable arguments) + { + var source = arguments.FirstOrDefault(); + if (source == null) + { + throw new InvalidOperationException("There must be at least one input to the create group operator."); + } + + Type keyType, elementType; + var sourceType = source.Type.GetGenericArguments()[0]; + var selectorParameter = Expression.Parameter(source.Type); + if (sourceType.IsGenericType && sourceType.GetGenericTypeDefinition() == typeof(IGroupedObservable<,>)) + { + var sourceTypeArguments = sourceType.GetGenericArguments(); + keyType = sourceTypeArguments[0]; + elementType = sourceTypeArguments[1]; + } + else + { + keyType = sourceType; + elementType = null; + } + + return BuildWorkflow(new[] { selectorParameter }, null, selectorBody => + { + var selector = Expression.Lambda(selectorBody, selectorParameter); + var resultType = selectorBody.Type.GetGenericArguments()[0]; + + return Expression.Call( + typeof(CreateGroup), + nameof(Process), + elementType != null + ? new[] { keyType, elementType, resultType } + : new[] { keyType, resultType }, + source, selector); + }); + } + + static IObservable> Process( + IObservable source, + Func, IObservable> selector) + { + return source.Select(key => new GroupedObservable( + key, + selector(Observable.Return(key)))); + } + + static IObservable> Process( + IObservable> source, + Func>, IObservable> selector) + { + return source.Select(group => new GroupedObservable( + group.Key, + selector(Observable.Return(group)))); + } + + class GroupedObservable : IGroupedObservable + { + public GroupedObservable(TKey key, IObservable source) + { + if (source is null) + { + throw new ArgumentNullException(nameof(source)); + } + + Key = key; + Source = source.Publish().RefCount(); + } + + public TKey Key { get; } + + private IObservable Source { get; } + + public IDisposable Subscribe(IObserver observer) + { + return Source.Subscribe(observer); + } + } + } +} diff --git a/src/Aeon.Acquisition/SynchronizerMonitor.bonsai b/src/Aeon.Acquisition/SynchronizerMonitor.bonsai index c6ad44b..2cf2145 100644 --- a/src/Aeon.Acquisition/SynchronizerMonitor.bonsai +++ b/src/Aeon.Acquisition/SynchronizerMonitor.bonsai @@ -1,5 +1,5 @@  - Item2 + + + Timestamp + false + + new( Item1 as Stats, -Item2 as Names) +Item2 as Names, +Item3 as SyncTimestamp) HeartbeatSources @@ -111,7 +118,7 @@ Item2 as Names) new( Item1.Stats.Mean as MeanTimestamp, -DateTime(1904, 1, 1) + TimeSpan.FromSeconds(Item1.Stats.Mean) as MeanUtcTimestamp, +DateTime(1904, 1, 1) + TimeSpan.FromSeconds(Double.IsNan(Item1.Stats.Mean) ? Item1.SyncTimestamp : Item1.Stats.Mean) as MeanUtcTimestamp, Item2 as ExpectedDeviceCount, Item1.Stats.Count as DeviceCount, Item1.Stats.Maximum - Item1.Stats.Minimum as MaxDifference, @@ -128,18 +135,20 @@ string.Join("\t", Item1.Names) as Elements) + - - - - - - - + + + + + + + + \ No newline at end of file