当前位置: 首页 > 面试题库 >

PiZero W连接到两个外围设备(GPIO和USB):如何同时连续从两个外围设备读取数据?

丁昌翰
2023-03-14
问题内容

我有一个覆盆子pizero
W,它通过GPIO引脚连接到流量计,通过USB连接到条形码扫描仪。我有python脚本,当检测到GPIO输入时,该脚本使用回调函数来发出警报。该python脚本需要在pizero上连续运行,以便识别何时激活流量计并处理输入。

问题是我还有一个通过USB连接到pizero的条形码扫描仪。我希望pizero也可以识别何时扫描条形码并处理该输入。

然后,pizero应该发送一条消息,其中包含来自流量计的信息和来自条形码扫描仪的信息。

有没有办法在相同的python脚本中执行此操作?我如何才能同时从两个输入进行pizero侦听和处理?将其分成两个不同的脚本是否更容易实现,如果可以,我是否可以同时运行它们,并以某种方式统一它们在第三个连续运行的脚本中提供的信息?

谢谢!

每个注释的一些说明(感谢您的输入):

  • 流量计的输入引脚GPIO 17是SPI连接
  • 还连接了5V电源和接地引脚。

该脚本需要在系统启动时运行。我会看一下,systemctl直到被提及之前我才听说过。

当未连接流量计时,Pi通常会将正在扫描的条形码识别为键盘输入(即,一系列数字后跟换行符)。

当我发送包含流量计和条形码信息的消息时,我需要从python发送一个JSON对象,该对象既包含信息,又包含接收信息的时间戳。

该JSON对象将通过wifi发送到与pizero相同的家庭网络上具有静态ip的树莓派服务器。raspberry
pi服务器可以访问Django数据库,该数据库应将JSON对象信息合并到数据库中。


问题答案:

更新的答案

我为条形码阅读器添加了一些代码。我这样做是为了使条形码读取器花费可变的时间,最多花费5秒来读取数据,而流量计花费恒定的0.5秒,因此您可以看到不同的线程以彼此独立的速率进行进程。

#!/usr/bin/env python3

from threading import Lock
import threading
import time
from random import seed
from random import random

# Dummy function to read SPI as I don't have anything attached
def readSPI():
    # Take 0.5s to read
    time.sleep(0.5)
    readSPI.static += 1
    return readSPI.static
readSPI.static=0

class FlowMeter(threading.Thread):
    def __init__(self):
        super(FlowMeter, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read flowmeter and safely update self.currentReading
        while True:
            value = readSPI()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

# Dummy function to read Barcode as I don't have anything attached
def readBarcode():
    # Take variable time, 0..5 seconds, to read
    time.sleep(random()*5)
    result = "BC" + str(int(random()*1000))
    return result

class BarcodeReader(threading.Thread):
    def __init__(self):
        super(BarcodeReader, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read barcode and safely update self.currentReading
        while True:
            value = readBarcode()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

if __name__ == '__main__':

    # Generate repeatable random numbers
    seed(42)

    # Instantiate and start flow meter manager thread
    fmThread = FlowMeter()
    fmThread.daemon = True
    fmThread.start()

    # Instantiate and start barcode reader thread
    bcThread = BarcodeReader()
    bcThread.daemon = True
    bcThread.start()

    # Now you can do other things in main, but always get access to latest readings
    for i in range(20):
        fmReading = fmThread.read()
        bcReading = bcThread.read()
        print(f"Main: i = {i} FlowMeter reading = {fmReading}, Barcode={bcReading}")
        time.sleep(1)

样本输出

Main: i = 0 FlowMeter reading = 0, Barcode=0
Main: i = 1 FlowMeter reading = 1, Barcode=0
Main: i = 2 FlowMeter reading = 3, Barcode=0
Main: i = 3 FlowMeter reading = 5, Barcode=0
Main: i = 4 FlowMeter reading = 7, Barcode=BC25
Main: i = 5 FlowMeter reading = 9, Barcode=BC223
Main: i = 6 FlowMeter reading = 11, Barcode=BC223
Main: i = 7 FlowMeter reading = 13, Barcode=BC223
Main: i = 8 FlowMeter reading = 15, Barcode=BC223
Main: i = 9 FlowMeter reading = 17, Barcode=BC676
Main: i = 10 FlowMeter reading = 19, Barcode=BC676
Main: i = 11 FlowMeter reading = 21, Barcode=BC676
Main: i = 12 FlowMeter reading = 23, Barcode=BC676
Main: i = 13 FlowMeter reading = 25, Barcode=BC86
Main: i = 14 FlowMeter reading = 27, Barcode=BC86
Main: i = 15 FlowMeter reading = 29, Barcode=BC29
Main: i = 16 FlowMeter reading = 31, Barcode=BC505
Main: i = 17 FlowMeter reading = 33, Barcode=BC198
Main: i = 18 FlowMeter reading = 35, Barcode=BC198
Main: i = 19 FlowMeter reading = 37, Barcode=BC198

原始答案

我建议您研究一下systemdsystemctl在每次系统启动时启动您的应用程序-这里的示例。

至于同时监视两件事,我建议您使用Python的 线程
模块。这是一个简单的示例,我创建了一个子类threading,通过不断读取该对象并将其当前值保存在主程序可以随时读取的变量中来管理您的流量计。您可以启动另一个类似的程序来管理您的条形码读取器,然后并行运行它们。我不想这样做,并且会使您的代码倍增。

#!/usr/bin/env python3

from threading import Lock
import threading
import time

# Dummy function to read SPI as I don't have anything attached
def readSPI():
    readSPI.static += 1
    return readSPI.static
readSPI.static=0

class FlowMeter(threading.Thread):
    def __init__(self):
        super(FlowMeter, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read flowmeter and safely update self.currentReading
        while True:
            value = readSPI()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()
            time.sleep(0.01)

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

if __name__ == '__main__':

    # Instantiate and start flow meter manager thread
    fmThread = FlowMeter()
    fmThread.start()

    # Now you can do other things in main, but always get access to latest reading
    for i in range(100000):
        fmReading = fmThread.read()
        print(f"Main: i = {i} FlowMeter reading = {fmReading}")
        time.sleep(1)

您可以考虑使用logging来协调和统一调试和日志记录消息-参见此处。

您可以看一下events让其他线程知道某些事情达到临界水平时需要做的事情-
例如此处。



 类似资料:
  • 当建立蓝牙连接(BLE)时,外围设备是否有办法获得中央设备的名称?我不确定Bleno是否有必要的工具来得到这个。

  • 这听起来很基本,但我是Android BLE开发的初学者。到目前为止,我能够创建我的Nexus9设备作为外围设备和Moto G作为中心设备。而且我正在成功连接设备。但我不知道当我从中央设备发送一个特性时,它将从外设接收到哪里?广告回调仅在广告启动成功而不是(在我的例子中是成功的)时才返回 这是我的外设代码 我正在从中央使用连接的Gatt的writeCharacteristic命令,但不知道如何从外

  • 在iOS中,无法获取CBP外围对象的mac地址。现在,我有很多不同UUID但外设名称相同的BLE设备。用户必须首先通过注册命令注册到BLE设备,该BLE设备的MAC地址将在二维码扫描时获得。但是,我如何才能获得设备用户正在进行注册,因为在iOS中,我在外围设备中没有获得MAC地址?

  • 问题内容: 我正在使用在Xcode上使用Swift的应用程序,该应用程序连接到蓝牙BLE外设。我已经建立了与设备的连接,并想从特定特征(特别是服务UUID FFF0中的FFF1)读取一些数据。 如果要查找其信息的特征是,我可以使用以下代码请求特征的读取: 我想知道的是:如何检查此读取值是否是我要的值。我希望能够执行if语句,以针对该特征的发现值检查我的值。 例如:如果发现的值为X,则执行其他操作;

  • 我们可以从adb shell获取连接到android设备的设备列表吗?

  • 我正在尝试检测已经连接到android的usb设备。我知道有动作可以检测USB何时连接或断开。但是我真的不知道如何在将usb设备连接到android后检查设备。另外,我发现每个USB设备都有它的设备类别代码,但是我怎么知道连接的是哪种设备呢?例如,我需要检测usb鼠标和键盘;我如何区分它们?