-
Notifications
You must be signed in to change notification settings - Fork 1
/
message.py
150 lines (114 loc) · 3.76 KB
/
message.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""Harp protocol message classes."""
from struct import pack_into, unpack_from
from binascii import hexlify
from micropython import const
from microharp.types import HarpTypes
class HarpMessage():
    """Abstract base class, defines Harp message semantics and translates to/from native types.

    Subclasses must provide ``self.buffer``, a bytearray holding one frame.
    The accessors below read and write the frame at these positions:

        byte 0       message type
        byte 1       length
        byte 2       address
        byte 3       port
        byte 4       payload type (may include HarpTypes.HAS_TIMESTAMP)
        bytes 5-10   timestamp, only when HAS_TIMESTAMP is set
        byte 5 / 11  first payload element (see offset())
        last byte    checksum

    Multi-byte fields are always packed little-endian with standard sizes
    and no padding ('<' struct prefix), so the wire image does not depend
    on the byte order or C type sizes of the host interpreter.
    """

    # Message type codes stored in byte 0.
    READ = const(1)
    WRITE = const(2)
    EVENT = const(3)
    # Deliberately not listed in messageTypes. Presumably a flag OR-ed into
    # the message type of an error reply -- confirm against the protocol code.
    ERROR = const(0x08)
    messageTypes = (READ, WRITE, EVENT)

    # Number of frame bytes not counted by the length field: transmit buffers
    # are allocated as length + LENGTH_BYTE bytes.
    LENGTH_BYTE = const(2)

    # struct format character for one payload element of each Harp type.
    formats = {
        HarpTypes.U8: 'B',
        HarpTypes.S8: 'b',
        HarpTypes.U16: 'H',
        HarpTypes.S16: 'h',
        HarpTypes.U32: 'I',
        HarpTypes.S32: 'i',
        HarpTypes.U64: 'Q',
        HarpTypes.S64: 'q',
        HarpTypes.FLOAT: 'f'
    }

    @property
    def format(self):
        """Return the struct format character of one payload element.

        The HAS_TIMESTAMP flag is masked off before the lookup. Raises
        KeyError if the remaining type code is not a key of ``formats``.
        """
        return self.formats[self.payloadType & ~HarpTypes.HAS_TIMESTAMP]

    @property
    def messageType(self):
        """Message type, byte 0 of the frame."""
        return self.buffer[0]

    @messageType.setter
    def messageType(self, value):
        self.buffer[0] = value

    @property
    def length(self):
        """Length field, byte 1 of the frame."""
        return self.buffer[1]

    @length.setter
    def length(self, value):
        self.buffer[1] = value

    @property
    def address(self):
        """Address, byte 2 of the frame."""
        return self.buffer[2]

    @address.setter
    def address(self, value):
        self.buffer[2] = value

    @property
    def port(self):
        """Port, byte 3 of the frame."""
        return self.buffer[3]

    @port.setter
    def port(self, value):
        self.buffer[3] = value

    @property
    def payloadType(self):
        """Payload type, byte 4 of the frame, including any HAS_TIMESTAMP flag."""
        return self.buffer[4]

    @payloadType.setter
    def payloadType(self, value):
        self.buffer[4] = value

    @property
    def timestamp(self):
        """Timestamp as a 2-tuple: an unsigned 32-bit and an unsigned 16-bit value.

        Read from the six bytes starting at index 5. The accessor does not
        check HAS_TIMESTAMP; the caller must know the field is present.
        """
        # Explicit little-endian: native mode would follow the host byte order.
        return unpack_from('<IH', self.buffer, 5)

    @timestamp.setter
    def timestamp(self, value):
        # value is unpacked into the two fields, so it must hold exactly two items.
        pack_into('<IH', self.buffer, 5, *value)

    @property
    def payload(self):
        """Payload elements as a tuple of native values.

        The element count is derived from the length field and the element
        size reported by HarpTypes.size().
        """
        start = HarpMessage.offset(self.payloadType)
        # The frame is length + LENGTH_BYTE bytes long and ends with a one-byte
        # checksum, which leaves length - start + 1 bytes of payload.
        count = (self.length - start + 1) // HarpTypes.size(self.payloadType)
        return unpack_from(f'<{count}{self.format}', self.buffer, start)

    @payload.setter
    def payload(self, value):
        # Writes len(value) elements in place; the length field is not modified.
        start = HarpMessage.offset(self.payloadType)
        count = len(value)
        pack_into(f'<{count}{self.format}', self.buffer, start, *value)

    @property
    def checksum(self):
        """Checksum, the last byte of the frame."""
        return self.buffer[-1]

    @checksum.setter
    def checksum(self, value):
        self.buffer[-1] = value

    def to_string(self):
        """Return the frame as '-'-separated hex byte pairs, for display.

        hexlify() returns a bytes object, so the str() conversion yields its
        repr form (wrapped in b'...').
        """
        return str(hexlify(self.buffer, '-'))

    @staticmethod
    def offset(payloadType):
        """Return the index of the first payload byte: 5 without a timestamp, 11 with one."""
        return 5 if payloadType & HarpTypes.HAS_TIMESTAMP == 0 else 11

    @staticmethod
    def resize(payloadType):
        """Return 0 if payloadType already has HAS_TIMESTAMP set, otherwise 6.

        Presumably the number of bytes a frame must grow by to make room for
        a timestamp -- confirm against the callers.
        """
        return 0 if payloadType & HarpTypes.HAS_TIMESTAMP != 0 else 6
class HarpRxMessage(HarpMessage):
    """Receive message buffer and helper functions.

    The buffer starts out empty; the caller allocates it dynamically while a
    message is being received.
    """

    def __init__(self):
        self.buffer = bytearray()

    def has_valid_message_type(self):
        """Return True if the message type is one of HarpMessage.messageTypes."""
        received = self.messageType
        return received in HarpMessage.messageTypes

    def has_valid_checksum(self):
        """Return True if the last byte equals the low 8 bits of the sum of all bytes before it."""
        expected = sum(self.buffer[:-1]) & 0xff
        return self.checksum == expected
class HarpTxMessage(HarpMessage):
    """Transmit message buffer and helper functions.

    The whole buffer is allocated once, in the constructor.
    """

    def __init__(self, messageType, length, address, payloadType, timestamp=None):
        """Allocate the frame and fill in its header fields.

        The buffer holds ``length + HarpMessage.LENGTH_BYTE`` bytes. The port
        is always set to 0xff. With a timestamp, HAS_TIMESTAMP is set in the
        payload type and the timestamp is written; without one, the flag is
        cleared. Payload and checksum are left for the caller to fill in.
        """
        frame_size = length + HarpMessage.LENGTH_BYTE
        self.buffer = bytearray(frame_size)
        self.messageType = messageType
        self.length = length
        self.address = address
        self.port = 0xff
        if timestamp is not None:
            self.payloadType = payloadType | HarpTypes.HAS_TIMESTAMP
            self.timestamp = timestamp
        else:
            self.payloadType = payloadType & ~HarpTypes.HAS_TIMESTAMP

    def calc_set_checksum(self):
        """Store the low 8 bits of the sum of all bytes but the last in the last byte."""
        total = sum(self.buffer[:-1])
        self.checksum = total & 0xff