Skip to content

Commit

Permalink
Ensure source subscription is disposed
Browse files Browse the repository at this point in the history
  • Loading branch information
glopesdev committed Oct 23, 2023
1 parent 4ffa38c commit 26cadeb
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions src/Aeon.Acquisition/ObservableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ public static IObservable<Unit> MergeUnit<TSource, TOther>(this IObservable<TSou
{
var pc = clock.Publish();
var ps = source.Publish();
var sourceSubscription = new SingleAssignmentDisposable();
var trigger = Observer.Create<HarpMessage>(
_ => ps.Connect(),
_ => sourceSubscription.Disposable = ps.Connect(),
observer.OnError);
var result = ps.CombineLatest(pc, (data, message) => (data, message))
.Sample(ps.MergeUnit(pc.Take(1)))
Expand All @@ -73,7 +74,8 @@ public static IObservable<Unit> MergeUnit<TSource, TOther>(this IObservable<TSou
return new CompositeDisposable(
result.SubscribeSafe(observer),
pc.Take(1).SubscribeSafe(trigger),
pc.Connect());
pc.Connect(),
sourceSubscription);
});
}
}
Expand Down

0 comments on commit 26cadeb

Please sign in to comment.