Skip to content

Commit

Permalink
Merge pull request #169 from SainsburyWellcomeCentre/issue-168
Browse files Browse the repository at this point in the history
Ensure timestamp is ready on source subscription
  • Loading branch information
glopesdev authored Oct 23, 2023
2 parents 0bc1887 + a8e2c2c commit 3676983
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/Aeon.Acquisition/Aeon.Acquisition.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<PackageTags>Bonsai Rx Project Aeon Acquisition</PackageTags>
<TargetFramework>net472</TargetFramework>
<VersionPrefix>0.5.0</VersionPrefix>
<VersionSuffix>build231009</VersionSuffix>
<VersionSuffix>build231010</VersionSuffix>
</PropertyGroup>

<ItemGroup>
Expand Down
10 changes: 1 addition & 9 deletions src/Aeon.Acquisition/MetadataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,7 @@ public virtual IObservable<TMetadata> Process()

public virtual IObservable<Timestamped<TMetadata>> Process(IObservable<HarpMessage> source)
{
return source.Publish(
ps => Process().Publish(
xs => xs.CombineLatest(ps, (data, message) => (data, message))
.Sample(xs.MergeUnit(ps.Take(1)))
.Select(x =>
{
var timestamp = x.message.GetTimestamp();
return Timestamped.Create(x.data, timestamp);
})));
return Process().Timestamp(source);
}
}
}
31 changes: 22 additions & 9 deletions src/Aeon.Acquisition/ObservableExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Bonsai.Harp;

Expand Down Expand Up @@ -55,15 +56,27 @@ public static IObservable<Unit> MergeUnit<TSource, TOther>(this IObservable<TSou

public static IObservable<Bonsai.Harp.Timestamped<TSource>> Timestamp<TSource>(this IObservable<TSource> source, IObservable<HarpMessage> clock)
{
return clock.Publish(
pc => source.Publish(
ps => ps.CombineLatest(pc, (data, tick) => (data, tick))
.Sample(ps.MergeUnit(pc.Take(1)))
.Select(x =>
{
var timestamp = x.tick.GetTimestamp();
return Bonsai.Harp.Timestamped.Create(x.data, timestamp);
})));
return Observable.Create<Bonsai.Harp.Timestamped<TSource>>(observer =>
{
var pc = clock.Publish();
var ps = source.Publish();
var sourceSubscription = new SingleAssignmentDisposable();
var trigger = Observer.Create<HarpMessage>(
_ => sourceSubscription.Disposable = ps.Connect(),
observer.OnError);
var result = ps.CombineLatest(pc, (data, message) => (data, message))
.Sample(ps.MergeUnit(pc.Take(1)))
.Select(x =>
{
var timestamp = x.message.GetTimestamp();
return Bonsai.Harp.Timestamped.Create(x.data, timestamp);
});
return new CompositeDisposable(
result.SubscribeSafe(observer),
pc.Take(1).SubscribeSafe(trigger),
sourceSubscription,
pc.Connect());
});
}
}
}

0 comments on commit 3676983

Please sign in to comment.