-
Notifications
You must be signed in to change notification settings - Fork 88
/
dd.py
executable file
·177 lines (136 loc) · 6.91 KB
/
dd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TFC - Onion-routed, endpoint secure messaging system
Copyright (C) 2013-2024 Markus Ottela
This file is part of TFC.
TFC is free software: you can redistribute it and/or modify it under the terms
of the GNU General Public License as published by the Free Software Foundation,
either version 3 of the License, or (at your option) any later version.
TFC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with TFC. If not, see <https://www.gnu.org/licenses/>.
"""
import multiprocessing.connection
import socket
import sys
import time
from multiprocessing import Process, Queue
from typing import Any, Dict, Tuple
from src.common.misc import get_terminal_height, get_terminal_width, ignored, monitor_processes
from src.common.output import clear_screen
from src.common.statics import (DATA_FLOW, DD_ANIMATION_LENGTH, DD_OFFSET_FROM_CENTER, DST_DD_LISTEN_SOCKET,
DST_LISTEN_SOCKET, EXIT_QUEUE, IDLE, LOCALHOST, NC, NCDCLR, NCDCRL, RP_LISTEN_SOCKET,
SCNCLR, SCNCRL, SRC_DD_LISTEN_SOCKET)
def draw_frame(argv:    str,          # Arguments for the simulator position/orientation
               message: str,          # Status message to print
               high:    bool = False  # Determines the signal's state (high/low)
               ) -> None:
    """Render one frame of the data diode animation.

    A frame consists of the status message, a direction arrow, and a
    three-row box drawing whose middle row carries the two side labels
    and the signal indicator. `argv` selects the labels and the
    direction symbols. The indicator is shown only when `high` is set,
    and the arrow is blanked when `message` equals IDLE.
    """
    orientations = {NCDCLR: ('Rx', '<', '←', 'Tx'),
                    SCNCLR: ('Tx', '>', '→', 'Rx'),
                    NCDCRL: ('Tx', '>', '→', 'Rx'),
                    SCNCRL: ('Rx', '<', '←', 'Tx')}
    left_label, indicator, arrow, right_label = orientations[argv]

    if not high:
        indicator = ' '
    if message == IDLE:
        arrow = ' '

    width = get_terminal_width()

    # Leading newlines move the frame down towards the middle of the terminal.
    print('\n' * ((get_terminal_height() // 2) - DD_OFFSET_FROM_CENTER))

    frame_rows = (message,
                  arrow,
                  "────╮ " + ' ' + " ╭────",
                  f" {left_label} │ " + indicator + f" │ {right_label} ",
                  "────╯ " + ' ' + " ╰────")

    # Every row is centered horizontally using the width measured above.
    for row in frame_rows:
        print(row.center(width))
def animate(argv: str) -> None:
    """Blink the transmission indicator, then return the display to idle.

    Draws DD_ANIMATION_LENGTH frames labelled DATA_FLOW, pausing 0.04
    seconds after each one. The signal is drawn high on even-numbered
    frames and low on odd-numbered ones. A final IDLE frame is drawn
    after the sequence ends.
    """
    for frame_number in range(DD_ANIMATION_LENGTH):
        signal_is_high = not (frame_number % 2)

        clear_screen()
        draw_frame(argv, DATA_FLOW, high=signal_is_high)
        time.sleep(0.04)

    clear_screen()
    draw_frame(argv, IDLE)
def rx_loop(io_queue:     'Queue[Any]',  # Queue through which to push datagrams through
            input_socket: int            # Socket number for Transmitter/Relay Program
            ) -> None:
    """Receive datagrams from a transmitting program and queue them.

    Listens on (LOCALHOST, input_socket), accepts one connection, and
    puts every object received from that connection into `io_queue`.
    The process exits with status 0 once receiving raises EOFError.
    """
    server = multiprocessing.connection.Listener((LOCALHOST, input_socket))
    sender = server.accept()

    while True:
        try:
            io_queue.put(sender.recv())
        except EOFError:
            sys.exit(0)
        except KeyboardInterrupt:
            # An interrupt must not stop the relay; go back to receiving.
            continue
def tx_loop(io_queue:      'Queue[Any]',  # Queue through which to push datagrams through
            output_socket: int,           # Socket number for the Relay/Receiver Program
            argv:          str,           # Arguments for the simulator position/orientation
            unit_test:     bool = False   # Break out from the loop during unit testing
            ) -> None:
    """Forward queued datagrams to a receiving program.

    Draws an idle frame, then keeps trying to connect to
    (LOCALHOST, output_socket), sleeping 0.01 seconds after every
    socket error, until a connection is established. After that, each
    round waits for `io_queue` to become non-empty, plays the transfer
    animation, and sends one datagram to the receiver.
    """
    draw_frame(argv, IDLE)

    address  = (LOCALHOST, output_socket)
    receiver = None
    while receiver is None:
        try:
            receiver = multiprocessing.connection.Client(address)
        except socket.error:
            # The receiving program is not listening yet; retry shortly.
            time.sleep(0.01)

    keep_running = True
    while keep_running:
        # NOTE(review): `ignored` presumably suppresses the listed exceptions
        # so that the loop carries on with the next round -- confirm.
        with ignored(EOFError, KeyboardInterrupt):
            while not io_queue.qsize():
                time.sleep(0.01)

            # The datagram is taken from the queue only after the animation
            # has been played.
            animate(argv)
            receiver.send(io_queue.get())

        keep_running = not unit_test
def process_arguments() -> Tuple[str, int, int]:
    """Resolve the simulator settings from the first command line argument.

    Returns the argument together with the input and output socket
    numbers assigned to it. If the argument is missing or is not one
    of the four recognized options, the screen is cleared, the usage
    text is printed, and the program exits with status 1.
    """
    source_side      = (SRC_DD_LISTEN_SOCKET, RP_LISTEN_SOCKET)
    destination_side = (DST_DD_LISTEN_SOCKET, DST_LISTEN_SOCKET)

    # Both orientations of the same data diode share the same socket pair.
    sockets_for = {SCNCLR: source_side,
                   SCNCRL: source_side,
                   NCDCLR: destination_side,
                   NCDCRL: destination_side}

    try:
        argv = str(sys.argv[1])
        input_socket, output_socket = sockets_for[argv]
    except (IndexError, KeyError):
        usage = (f"\nUsage: python3 dd.py [OPTION]\n\n"
                 f"\nMandatory arguments"
                 f"\n Argument Simulate data diode between..."
                 f"\n {SCNCLR} Source Computer and Networked Computer (left to right)"
                 f"\n {SCNCRL} Source Computer and Networked Computer (right to left)"
                 f"\n {NCDCLR} Networked Computer and Destination Computer (left to right)"
                 f"\n {NCDCRL} Networked Computer and Destination Computer (right to left)")
        clear_screen()
        print(usage)
        sys.exit(1)

    return argv, input_socket, output_socket
def main(queues: Dict[bytes, 'Queue[Any]']) -> None:
    """\
    Launch the data diode simulator selected by the command line argument.

    The data diode simulator visualizes data transfer inside data diode
    #1 (between the Source Computer and the Networked Computer) or
    inside data diode #2 (between the Networked Computer and the
    Destination Computer). Local testing terminal multiplexer
    configurations that use data diode simulators run two instances of
    this program.

    Data passing from one program to another is shown with a blinking
    indicator ('<' or '>'). The simulator offers none of the endpoint
    security properties of hardware data diodes.

    The visualization deliberately makes the transfer slower than it
    would be over actual serial interfaces, so that the user can follow
    the data moving from one program to another with their eyes.

    One process is started for receiving datagrams and another for
    forwarding them; the two share a single queue. Both processes are
    then handed to `monitor_processes` together with `queues`.
    """
    time.sleep(0.5)  # Wait for the terminal multiplexer size to stabilize

    argv, input_socket, output_socket = process_arguments()

    io_queue: 'Queue[Any]' = Queue()

    receiver     = Process(target=rx_loop, args=(io_queue, input_socket))
    transmitter  = Process(target=tx_loop, args=(io_queue, output_socket, argv))
    process_list = [receiver, transmitter]

    for process in process_list:
        process.start()

    monitor_processes(process_list, NC, queues, error_exit_code=0)
if __name__ == '__main__':  # pragma: no cover
    # Script entry point: start the simulator with a new queue stored
    # under the EXIT_QUEUE key.
    exit_queues = {EXIT_QUEUE: Queue()}
    main(exit_queues)