-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathreader.py
103 lines (84 loc) · 3.07 KB
/
reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import time
import nidaqmx
import numpy as np
from nidaqmx.constants import AcquisitionType
from nidaqmx.stream_readers import AnalogMultiChannelReader
from pyqtgraph.Qt import QtCore
from config import *
class SignalReader(QtCore.QThread):
"""
Captures signals on the input DAQ
Creates a new thread to constantly poll the buffer from the DAQ
"""
# signal that is emitted whenever buffer object has been filled and is ready
incoming_data = QtCore.pyqtSignal(object)
def __init__(self, sample_rate, sample_size, channels, dev_name="Dev2"):
super().__init__()
self.reader = None
self.is_running = False
self.is_paused = False
self.input_channels = channels
self.daq_in_name = dev_name
self.sample_rate = sample_rate
self.sample_size = sample_size
# actual data received from the DAQ
self.input = np.empty(shape=(len(CHANNEL_NAMES_IN), self.sample_size))
# called on start()
def run(self):
"""
Main thread loop
Whenver the reader has data available and ready, it will emit the incoming_data signal with the data
"""
self.is_running = True
self.create_task()
while self.is_running:
if not self.is_paused:
try:
self.reader.read_many_sample(
data=self.input, number_of_samples_per_channel=self.sample_size
)
self.incoming_data.emit(self.input)
except Exception as e:
print("Error with read_many_sample")
print(e)
break
self.task.close()
def create_task(self):
"""
Create a read task
"""
print("reader input channels:", self.input_channels)
try:
self.task = nidaqmx.Task("Reader Task")
except OSError:
print("DAQ is not connected, task could not be created")
return
try:
for ch in self.input_channels:
channel_name = self.daq_in_name + "/ai" + str(ch)
self.task.ai_channels.add_ai_voltage_chan(channel_name)
print(channel_name)
except Exception:
print("DAQ is not connected, channel could not be added")
return
self.task.timing.cfg_samp_clk_timing(
rate=self.sample_rate, sample_mode=AcquisitionType.CONTINUOUS
)
self.task.start()
self.reader = AnalogMultiChannelReader(self.task.in_stream)
def restart(self):
"""
Deletes the previous task and creates a new task again
"""
self.is_paused = True
self.task.close()
self.create_task()
self.is_paused = False
if __name__ == "__main__":
print("\nRunning demo for SignalReader\n")
# reader_thread = SignalReader(sample_rate=1000, sample_size=1000, channels=[])
# reader_thread.start()
# input("Press return to stop")
# reader_thread.is_running = False
# reader_thread.wait()
# print("\nTask done")