Skip to content

Commit

Permalink
Merge pull request #191 from SainsburyWellcomeCentre/json-lines
Browse files Browse the repository at this point in the history
Support formatting value sequences as JSON strings
  • Loading branch information
glopesdev authored Dec 5, 2023
2 parents 4212d97 + f2e0088 commit 997b676
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/Aeon.Acquisition/Aeon.Acquisition.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<PackageReference Include="LibGit2Sharp" Version="0.25.4" />
<PackageReference Include="Bonsai.Vision" Version="2.8.0" />
<PackageReference Include="Bonsai.Numerics" Version="0.9.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
</ItemGroup>

</Project>
20 changes: 20 additions & 0 deletions src/Aeon.Acquisition/FormatJson.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using System.ComponentModel;
using System.Linq;
using System.Reactive.Linq;
using Bonsai;
using Newtonsoft.Json;

namespace Aeon.Acquisition
{
[Combinator]
[Description("Formats a sequence of values into a sequence of JSON strings.")]
[WorkflowElementCategory(ElementCategory.Transform)]
public class FormatJson
{
public IObservable<string> Process<TSource>(IObservable<TSource> source)
{
return source.Select(value => JsonConvert.SerializeObject(value));
}
}
}
116 changes: 116 additions & 0 deletions src/Aeon.Acquisition/LogJson.bonsai
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
<?xml version="1.0" encoding="utf-8"?>
<WorkflowBuilder Version="2.8.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aeon="clr-namespace:Aeon.Acquisition;assembly=Aeon.Acquisition"
xmlns:rx="clr-namespace:Bonsai.Reactive;assembly=Bonsai.Core"
xmlns:io="clr-namespace:Bonsai.IO;assembly=Bonsai.System"
xmlns="https://bonsai-rx.org/2018/workflow">
<Description>Chunks and logs a generic timestamped data stream into the base data log using JSON lines.</Description>
<Workflow>
<Nodes>
<Expression xsi:type="WorkflowInput">
<Name>Source1</Name>
</Expression>
<Expression xsi:type="ExternalizedMapping">
<Property Name="Name" DisplayName="Heartbeats" Description="The source of heartbeat signals used as a timing signal for closing groups." Category="GroupClosing" />
</Expression>
<Expression xsi:type="SubscribeSubject">
<Name />
</Expression>
<Expression xsi:type="SubscribeSubject">
<Name>ChunkSize</Name>
</Expression>
<Expression xsi:type="PropertyMapping">
<PropertyMappings>
<Property Name="ChunkSize" />
</PropertyMappings>
</Expression>
<Expression xsi:type="ExternalizedMapping">
<Property Name="ClosingDuration" Category="GroupClosing" />
</Expression>
<Expression xsi:type="Combinator">
<Combinator xsi:type="aeon:GroupByTime">
<aeon:ChunkSize>1</aeon:ChunkSize>
</Combinator>
</Expression>
<Expression xsi:type="ExternalizedMapping">
<Property Name="Name" DisplayName="LogName" Description="The base name of the data log, without extension." />
</Expression>
<Expression xsi:type="rx:CreateObservable">
<Name>LogJson</Name>
<Workflow>
<Nodes>
<Expression xsi:type="WorkflowInput">
<Name>Source1</Name>
</Expression>
<Expression xsi:type="rx:AsyncSubject">
<Name>Data</Name>
</Expression>
<Expression xsi:type="SubscribeSubject">
<Name>Data</Name>
</Expression>
<Expression xsi:type="Combinator">
<Combinator xsi:type="rx:Merge" />
</Expression>
<Expression xsi:type="Combinator">
<Combinator xsi:type="aeon:FormatJson" />
</Expression>
<Expression xsi:type="SubscribeSubject">
<Name>Data</Name>
</Expression>
<Expression xsi:type="MemberSelector">
<Selector>Key</Selector>
</Expression>
<Expression xsi:type="ExternalizedMapping">
<Property Name="Name" />
</Expression>
<Expression xsi:type="IncludeWorkflow" Path="Aeon.Acquisition:FormatFileName.bonsai">
<Name>JsonData</Name>
<Extension>jsonl</Extension>
</Expression>
<Expression xsi:type="PropertyMapping">
<PropertyMappings>
<Property Name="FileName" />
</PropertyMappings>
</Expression>
<Expression xsi:type="io:CsvWriter">
<io:Append>false</io:Append>
<io:Overwrite>false</io:Overwrite>
<io:Suffix>None</io:Suffix>
<io:IncludeHeader>false</io:IncludeHeader>
</Expression>
<Expression xsi:type="WorkflowOutput" />
</Nodes>
<Edges>
<Edge From="0" To="1" Label="Source1" />
<Edge From="2" To="3" Label="Source1" />
<Edge From="3" To="4" Label="Source1" />
<Edge From="4" To="10" Label="Source1" />
<Edge From="5" To="6" Label="Source1" />
<Edge From="6" To="8" Label="Source1" />
<Edge From="7" To="8" Label="Source2" />
<Edge From="8" To="9" Label="Source1" />
<Edge From="9" To="10" Label="Source2" />
<Edge From="10" To="11" Label="Source1" />
</Edges>
</Workflow>
</Expression>
<Expression xsi:type="Combinator">
<Combinator xsi:type="rx:Switch" />
</Expression>
<Expression xsi:type="WorkflowOutput" />
</Nodes>
<Edges>
<Edge From="0" To="6" Label="Source1" />
<Edge From="1" To="2" Label="Source1" />
<Edge From="2" To="6" Label="Source2" />
<Edge From="3" To="4" Label="Source1" />
<Edge From="4" To="6" Label="Source3" />
<Edge From="5" To="6" Label="Source4" />
<Edge From="6" To="8" Label="Source1" />
<Edge From="7" To="8" Label="Source2" />
<Edge From="8" To="9" Label="Source1" />
<Edge From="9" To="10" Label="Source1" />
</Edges>
</Workflow>
</WorkflowBuilder>

0 comments on commit 997b676

Please sign in to comment.