Skip to content

Commit

Permalink
Merge pull request #1554 from thingsboard/feature/report-strategy
Browse files Browse the repository at this point in the history
Report strategy feature
  • Loading branch information
imbeacon authored Nov 1, 2024
2 parents e930483 + db92e6d commit 6495820
Show file tree
Hide file tree
Showing 72 changed files with 2,031 additions and 1,134 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ mmh3
protobuf<=3.20.0
cachetools
paho-mqtt~=2.1.0
tb-mqtt-client>=1.10.8
tb-mqtt-client>=1.10.9
service-identity
psutil
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
include_package_data=True,
python_requires=">=3.7",
packages=['thingsboard_gateway', 'thingsboard_gateway.gateway', 'thingsboard_gateway.gateway.proto', 'thingsboard_gateway.gateway.grpc_service',
'thingsboard_gateway.storage', 'thingsboard_gateway.storage.memory', 'thingsboard_gateway.gateway.shell',
'thingsboard_gateway.storage', 'thingsboard_gateway.storage.memory', 'thingsboard_gateway.gateway.shell', 'thingsboard_gateway.gateway.report_strategy',
'thingsboard_gateway.storage.file', 'thingsboard_gateway.storage.sqlite', 'thingsboard_gateway.gateway.entities',
'thingsboard_gateway.connectors', 'thingsboard_gateway.connectors.ble', 'thingsboard_gateway.connectors.socket',
'thingsboard_gateway.connectors.mqtt', 'thingsboard_gateway.connectors.xmpp', 'thingsboard_gateway.connectors.modbus_async',
Expand Down Expand Up @@ -70,7 +70,7 @@
'grpcio',
'protobuf',
'cachetools',
'tb-mqtt-client>=1.10.8',
'tb-mqtt-client>=1.10.9',
'packaging==23.1',
'service-identity',
'psutil'
Expand Down
2 changes: 1 addition & 1 deletion tb_mqtt_client
2 changes: 2 additions & 0 deletions tests/base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
LOG.level = logging.DEBUG
stream_handler = logging.StreamHandler(stdout)
LOG.addHandler(stream_handler)
LOG.trace = LOG.debug

class BaseTest(TestCase):
TIMEOUT = 600 # 10 minutes in seconds

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.log = LOG
self.log.trace = self.log.debug

@classmethod
def setUpClass(cls):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@


LOG = logging.getLogger("TEST")
LOG.trace = LOG.debug


@unittest.skip('Flaky test')
Expand Down
1 change: 1 addition & 0 deletions tests/blackbox/connectors/modbus/test_modbus_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
GENERAL_TIMEOUT = 6

LOG = logging.getLogger("TEST")
LOG.trace = LOG.debug


class ModbusRpcTest(BaseTest):
Expand Down
68 changes: 48 additions & 20 deletions tests/blackbox/connectors/modbus/test_modbus_uplink_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
GENERAL_TIMEOUT = 6

LOG = logging.getLogger("TEST")
LOG.trace = LOG.debug


class ModbusUplinkMessagesTest(BaseTest):
Expand Down Expand Up @@ -116,29 +117,23 @@ def change_connector_configuration(cls, config_file_path):
sleep(GENERAL_TIMEOUT)
return config, response

def test_send_only_on_data_changed(self):
"""
Test the send_only_on_data_changed method.
This method tests the behavior of the send_only_on_data_changed method in the MyClass class.
The method performs the following steps:
1. Changes the connector configuration using the change_connector_configuration method with
the specified configuration file.
2. Retrieves the telemetry keys from the modified configuration.
3. Retrieves the latest timeseries data for the device.
4. Pauses the execution for 5 seconds.
5. Retrieves the latest timeseries data for the device again.
6. Compares the timestamps of the two sets of timeseries data for each telemetry key.
Parameters:
- self: The instance of the ModbusUplinkMessagesTest class.
def test_send_on_received_report_strategy(self):
(config, _) = self.change_connector_configuration(
self.CONFIG_PATH + 'configs/initial_modbus_uplink_converter_report_strategy_on_received.json')
telemetry_keys = [key['tag'] for slave in config['Modbus']['configurationJson']['master']['slaves'] for key in
slave['timeseries']]
actual_values = self.client.get_latest_timeseries(self.device.id, ','.join(telemetry_keys))
sleep(GENERAL_TIMEOUT)
latest_ts_1 = self.client.get_latest_timeseries(self.device.id, ','.join(telemetry_keys))

Returns:
None.
"""
# check that timestamps are equal
for ts_key in telemetry_keys:
self.assertNotEqual(actual_values[ts_key][0]['ts'], latest_ts_1[ts_key][0]['ts'],
f'Timestamps are equal for the next telemetry key: {ts_key}')

def test_send_on_change_report_strategy(self):
(config, _) = self.change_connector_configuration(
self.CONFIG_PATH + 'configs/initial_modbus_uplink_converter_only_on_change_config.json')
self.CONFIG_PATH + 'configs/initial_modbus_uplink_converter_report_strategy_on_change.json')
telemetry_keys = [key['tag'] for slave in config['Modbus']['configurationJson']['master']['slaves'] for key in
slave['timeseries']]
actual_values = self.client.get_latest_timeseries(self.device.id, ','.join(telemetry_keys))
Expand All @@ -150,6 +145,39 @@ def test_send_only_on_data_changed(self):
self.assertEqual(actual_values[ts_key][0]['ts'], latest_ts_1[ts_key][0]['ts'],
f'Timestamps are not equal for the next telemetry key: {ts_key}')

def test_send_on_report_period_report_strategy(self):
(config, _) = self.change_connector_configuration(
self.CONFIG_PATH + 'configs/initial_modbus_uplink_converter_report_strategy_on_report_period.json')
telemetry_keys = [key['tag'] for slave in config['Modbus']['configurationJson']['master']['slaves'] for key in
slave['timeseries']]
actual_values = self.client.get_latest_timeseries(self.device.id, ','.join(telemetry_keys))
sleep(GENERAL_TIMEOUT)
latest_ts_1 = self.client.get_latest_timeseries(self.device.id, ','.join(telemetry_keys))
sleep(GENERAL_TIMEOUT * 2)
latest_ts_2 = self.client.get_latest_timeseries(self.device.id, ','.join(telemetry_keys))

for ts_key in telemetry_keys:
self.assertTrue(latest_ts_1[ts_key][0]['ts'] - actual_values[ts_key][0]['ts'] >= 3000,
f'Second update of telemetry was less in 3 seconds for key: {ts_key}')

for ts_key in telemetry_keys:
self.assertTrue(latest_ts_2[ts_key][0]['ts'] - actual_values[ts_key][0]['ts'] >= 10000,
f'Next update of telemetry was less in 10 seconds, expected - ~15 seconds for key: {ts_key}')

def test_send_on_change_or_report_period_report_strategy_on_period_factor(self):
(config, _) = self.change_connector_configuration(
self.CONFIG_PATH + 'configs/initial_modbus_uplink_converter_report_strategy_on_change_or_report_period_on_period_factor.json')
telemetry_keys = [key['tag'] for slave in config['Modbus']['configurationJson']['master']['slaves'] for key in
slave['timeseries']]
actual_values = self.client.get_latest_timeseries(self.device.id, ','.join(telemetry_keys))
sleep(GENERAL_TIMEOUT*2)
latest_ts_1 = self.client.get_latest_timeseries(self.device.id, ','.join(telemetry_keys))

# check that timestamps are not equal
for ts_key in telemetry_keys:
self.assertTrue(latest_ts_1[ts_key][0]['ts'] - actual_values[ts_key][0]['ts'] >= 5000,
f'Second update of telemetry was less in 5 seconds for key: {ts_key}')

def test_input_register_reading_little_endian(self):
"""
Test the input register reading in little endian format.
Expand Down
1 change: 1 addition & 0 deletions tests/blackbox/connectors/opcua/test_base_opcua.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
DEVICE_CREATION_TIMEOUT = 120

LOG = logging.getLogger("TEST")
LOG.trace = LOG.debug


class BaseOpcuaTest(BaseTest):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
"pollPeriod": 1000,
"unitId": 1,
"deviceName": "Temp Sensor",
"sendDataOnlyOnChange": true,
"reportStrategy": {
"type": "ON_CHANGE"
},
"connectAttemptTimeMs": 5000,
"connectAttemptCount": 5,
"waitAfterFailedAttemptsMs": 300000,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
{
"Modbus": {
"name": "Modbus",
"type": "modbus",
"logLevel": "DEBUG",
"configuration": "modbus.json",
"configurationJson": {
"name": "Modbus",
"logLevel": "DEBUG",
"master": {
"slaves": [
{
"host": "localhost",
"port": 5021,
"type": "tcp",
"method": "socket",
"timeout": 35,
"byteOrder": "LITTLE",
"wordOrder": "LITTLE",
"retries": true,
"retryOnEmpty": true,
"retryOnInvalid": true,
"pollPeriod": 1000,
"unitId": 1,
"deviceName": "Temp Sensor",
"reportStrategy": {
"type": "ON_CHANGE_OR_REPORT_PERIOD",
"reportPeriod": 5000
},
"connectAttemptTimeMs": 5000,
"connectAttemptCount": 5,
"waitAfterFailedAttemptsMs": 300000,
"attributes": [
{
"tag": "string_read",
"type": "string",
"functionCode": 4,
"objectsCount": 2,
"address": 0
},
{
"tag": "bits_read",
"type": "bits",
"functionCode": 4,
"objectsCount": 1,
"address": 5
},
{
"tag": "8int_read",
"type": "8int",
"functionCode": 4,
"objectsCount": 1,
"address": 6
},
{
"tag": "16int_read",
"type": "16int",
"functionCode": 4,
"objectsCount": 1,
"address": 7
},
{
"tag": "32int_read_divider",
"type": "32int",
"functionCode": 4,
"objectsCount": 2,
"address": 8,
"divider": 10
},
{
"tag": "8int_read_multiplier",
"type": "8int",
"functionCode": 4,
"objectsCount": 1,
"address": 10,
"multiplier": 10
},
{
"tag": "32int_read",
"type": "32int",
"functionCode": 4,
"objectsCount": 2,
"address": 11
},
{
"tag": "64int_read",
"type": "64int",
"functionCode": 4,
"objectsCount": 4,
"address": 13
}
],
"timeseries": [
{
"tag": "8uint_read",
"type": "8uint",
"functionCode": 4,
"objectsCount": 1,
"address": 17
},
{
"tag": "16uint_read",
"type": "16uint",
"functionCode": 4,
"objectsCount": 2,
"address": 18
},
{
"tag": "32uint_read",
"type": "32uint",
"functionCode": 4,
"objectsCount": 4,
"address": 20
},
{
"tag": "16float_read",
"type": "16float",
"functionCode": 4,
"objectsCount": 1,
"address": 25
}
],
"attributeUpdates": [],
"rpc": []
}
]
}
}
}
}
Loading

0 comments on commit 6495820

Please sign in to comment.