程式設計師帶娃有多“恐怖” ?!

語言: CN / TW / HK

(給 伯樂線上 加星標,看經典文章

英文:Fabio Manganiello

翻譯:

Python開發者 / 字串拼接工程師

本文已獲授權轉載

我有一陣子沒更新文章了。是因為我當爹啦,必須放下手頭的工作,轉而處理一些尚未自動化的育兒任務。

換個角度想,這些沒自動化的任務,是否可以做成自動化的呢?雖然機器人換尿布還要好幾年才能實現,但是目前有一些比較簡單的育兒任務可以自動化掉。

當了爹後我發現,寶寶真的經常哭。即使我在家裡,我可能也不會總是在附近聽到我兒子的哭聲。商用嬰兒監視器通常會填補這一空白,它們就像對講機,即使你在其他房間也能聽到寶寶的聲音。但我很快意識到,商用嬰兒監視器比我想要的理想裝置要笨得多。它們並不能檢測寶寶的哭聲,只是像對講機一樣把聲音從聲源傳到揚聲器。父母在不同屋子裡活動的時候必須帶著揚聲器,否則在其他房間 就聽不到聲音了。商用嬰兒監視器通常帶有低功率的揚聲器,而且還不能連線到外接揚聲器 —— 這意味著如果我在另一個房間裡播放音樂,就算我帶著監視器,我也可能會聽不到寶寶的哭聲。

我理想中的嬰兒監控器是這樣的:

  • 它要在廉價裝置上執行,比如外接廉價 USB 麥克風的樹莓派。

  • 它要能夠檢測嬰兒哭聲,並在他開始或停止哭的時候通知我(最好通知發手機上)、或者把哭聲記錄到儀表盤上、或者做任何我想做的哭聲監控。

  • 它應該能夠在任何裝置上播放音訊,比如:我自己的揚聲器、智慧手機、電腦等等裝置。無論聲源和揚聲器之間有多遠,都可以播放,不需要我在屋子裡移動揚聲器。

  • 它還應該帶有一個攝像頭,這樣就可以實時檢查寶寶的情況。或者在寶寶開始哭時,我可以得到嬰兒床的照片或短影片。

接下來我們來看看如何用開源工具處理上述需求。

錄音取樣

首先要搞一個樹莓派跑 Tensorflow 模型,把 Linux 作業系統裝到 SD 卡上,最好用 樹莓派3 及以上的版本。另外還要一個可相容的麥克風。

然後安裝依賴:

[sudo] apt-get install ffmpeg lame libatlas-base-dev alsa-utils
[sudo] pip3 install tensorflow

這第一步就是要錄足夠多的嬰兒哭的音訊樣本,要讓檢測模型識別嬰兒是不是在哭。

注意:在這個例子中,我將展示如何使用聲音檢測識別嬰兒的哭聲,但也可以檢測其他型別的聲音(比如警報聲或鄰居的電鑽聲),前提是有足夠長的時間和足夠響亮的聲音。

先看看能不能識別音訊輸入裝置:

arecord -l

我的樹莓派上輸出如下(我裝了兩個 USB 麥克風):

**** List of CAPTURE Hardware Devices ****
card 1: Device [USB PnP Sound Device], device 0: USB Audio [USB Audio]
Subdevices: 0/1
Subdevice #0: subdevice #0
card 2: Device_1 [USB PnP Sound Device], device 0: USB Audio [USB Audio]
Subdevices: 0/1
Subdevice #0: subdevice #0

我想用第二個麥克風 card 2, device 0 錄音頻。ALSA (Advanced Linux Sound Architecture)識別第二個麥克風的引數是 hw:2,0 (這個引數直接訪問裝置硬體)或 plughw:2,0 (這個是聲明瞭取樣率和格式轉換外掛)。確定下SD卡有足夠的儲存空間,或者外接外部USB儲存裝置。開始錄製音訊:

arecord -D plughw:2,0 -c 1 -f cd | lame - audio.mp3

錄幾分鐘到幾小時嬰兒房間聲音,最好這期間有足夠長的安靜、嬰兒哭啼聲音和其他無關聲音。錄好後 Ctrl C 結束錄音。一天或者幾天裡重複錄音幾次。

給音訊樣本打標籤

一旦錄好足夠多音訊樣本,就可以把音訊複製到電腦上訓練模型了。不論是用 scp 還是直接從SD卡或者usb裝置拷貝都行。

先把拷貝音訊樣本放到同一個目錄下,比如 ~/datasets/sound-detect/audio 。另外建立一個新目錄放樣本,每個目錄下包含一個命名為 audio.mp3 的音訊檔案和命名為 labels.json 的標籤檔案,標籤檔案裡標記音訊段落的正向/負向。目錄結構大概是這樣:

~/datasets/sound-detect/audio
-> sample_1
-> audio.mp3
-> labels.json
-> sample_2
-> audio.mp3
-> labels.json
...

現在要給音訊檔案打標籤了,如果音訊裡錄著寶寶幾個小時的哭聲,這個過程就很自虐。用任何播放器或是 Audacity 播放器開啟音訊播放,另外在每個目錄下都建立一個 labels.json 標籤檔案。識別哭聲開始結束的準確時間,在 labels.json 裡用 時間->標籤 的格式的鍵值對格式記錄,比如:

{
"00:00": "negative",
"02:13": "positive",
"04:57": "negative",
"15:41": "positive",
"18:24": "negative"
}

上面的例子裡 00:00 到 02:12 的音訊會被判定為負向,02:13 到 04:56的音訊會被判定為正向,以此類推。

生成資料集

一旦給所有音訊都打好標籤,就可以著手生成給 tensorflow 訓練模型的資料集了。我建立了一個名為 micmon 的通用聲音監控庫和一套實用程式。安裝:

git clone git@github.com:/BlackLight/micmon.git
cd micmon
[sudo] pip3 install -r requirements.txt
[sudo] python3 setup.py build install

該模型旨在處理頻率樣本,而不是處理原始音訊。因為如果我們想檢測特定的聲音,該聲音將具有特定的“頻譜”特徵,即基頻(或基頻通常可能下降的狹窄範圍)和通過特定比率與基頻相關聯的特定諧波集。這些頻率之間的比率既不受振幅的影響,無論輸入音量如何,頻率比率都是恆定的;也不受相位的影響,無論何時開始錄製,連續的聲音都將具有相同的頻譜特徵。與簡單地將原始音訊樣本饋送到模型的情況相比,這種幅度和時間不變的特性,使得這種方法更有可能訓練健壯的聲音檢測模型。另外該模型可以更簡單、更輕量,而且不會過擬合。簡單是指可以在不影響效能的情況下輕鬆地將頻率分組到頻段中,從而可以有效地執行降維;輕量指將有 50 到 100 個頻帶作為輸入值,而不考慮樣本持續時間,而一秒鐘的原始音訊通常包含 44100 個數據點,並且輸入的長度隨著樣本持續時間的增加而增加。

micmon 提供了在一些音訊樣本上計算 FFT(快速傅立葉變換)的邏輯,使用低通和高通濾波器將結果頻譜分組後把結果儲存到一組 numpy 壓縮( .npz )檔案中。通過命令列工具 micmon-datagen 進行操作:

micmon-datagen \
--low 250 --high 2500 --bins 100 \
--sample-duration 2 --channels 1 \
~/datasets/sound-detect/audio ~/datasets/sound-detect/data

上面的例子中,用 ~/dataset/sound-detect/audio 目錄裡的原始音訊生成了一組資料集,存在 ~/datasets/sound-detect/data 目錄下。

--low--high 引數分別代表指定結果頻譜中的最低和最高頻率,預設之分別是 20Hz (最低人耳朵可以識別到的頻率)和 20kHz(最高健康年輕人耳朵識別到的頻率)。你可能要自己調整這個引數,以儘可能多地捕捉您想要檢測的聲音並儘量限制任何其他型別的背景音和不相關的諧波。我這裡是 250–2500Hz 這個範圍就可以檢測嬰兒哭聲了。嬰兒哭聲頻率很高(歌劇女高音最高可以達到最高 1000Hz),通常可以至少將頻率提高一倍,來獲得足夠高次諧波(諧波是實際上給聲音帶來音色的較高頻率)、但不能太高,否則其他背景音的諧波會汙染頻譜。我忽略了低於 250Hz 的聲音,因為嬰兒的哭聲不會再這麼低的頻率上發生,這些聲音會扭曲檢測。推薦通過 Audacity 或其他任何均衡器或頻譜分析儀中開啟正向音訊樣本,檢查哪些頻率在正向樣本中占主導地位,將資料圍繞這些頻率對齊。

--bins 引數指定頻率空間的組數,預設值 100。更高 bins 配置意味著更高頻率解析度/粒度,但如果太高,會是模型容易過擬合。

上面的指令碼將原始音訊分割成更小的片段,並計算每個片段的頻譜“簽名”。 --sample-duration 指這些分段應有多長,預設 2 秒。越高數值和更長的聲音匹配,但是高數值會縮小檢測的時間長度,而且在短音上會失效。低數值給短音使用越好,但是如果聲音較長,捕獲的片段可能沒有足夠的資訊來可靠地識別聲音。

除了呼叫 micmon-datagen ,還有另一個方法可以生成資料集,即呼叫 micmon 提供的python api:

import os

from micmon.audio import AudioDirectory, AudioPlayer, AudioFile
from micmon.dataset import DatasetWriter

basedir = os.path.expanduser('~/datasets/sound-detect')
audio_dir = os.path.join(basedir, 'audio')
datasets_dir = os.path.join(basedir, 'data')
cutoff_frequencies = [250, 2500]

# Scan the base audio_dir for labelled audio samples
audio_dirs = AudioDirectory.scan(audio_dir)

# Save the spectrum information and labels of the samples to a
# different compressed file for each audio file.
for audio_dir in audio_dirs:
dataset_file = os.path.join(datasets_dir, os.path.basename(audio_dir.path) + '.npz')
print(f'Processing audio sample {audio_dir.path}')

with AudioFile(audio_dir) as reader, \
DatasetWriter(dataset_file,
low_freq=cutoff_frequencies[0],
high_freq=cutoff_frequencies[1]) as writer:
for sample in reader:
writer += sample

無論用 micmon-datagen 還是 micmon python api,最後都要在 ~/datasets/sound-detect/data 目錄下生成 .npz 檔案,每個原始音訊生成一個標記檔案。使用這個資料集來訓練我們的神經網路進行聲音檢測。

訓練模型

micmon 用Tensorflow+Keras定義和訓練模型,用已有的python api很容易做:

import os
from tensorflow.keras import layers

from micmon.dataset import Dataset
from micmon.model import Model

# This is a directory that contains the saved .npz dataset files
datasets_dir = os.path.expanduser('~/datasets/sound-detect/data')

# This is the output directory where the model will be saved
model_dir = os.path.expanduser('~/models/sound-detect')

# This is the number of training epochs for each dataset sample
epochs = 2

# Load the datasets from the compressed files.
# 70% of the data points will be included in the training set,
# 30% of the data points will be included in the evaluation set
# and used to evaluate the performance of the model.
datasets = Dataset.scan(datasets_dir, validation_split=0.3)
labels = ['negative', 'positive']
freq_bins = len(datasets[0].samples[0])

# Create a network with 4 layers (one input layer, two intermediate layers and one output layer).
# The first intermediate layer in this example will have twice the number of units as the number
# of input units, while the second intermediate layer will have 75% of the number of
# input units. We also specify the names for the labels and the low and high frequency range
# used when sampling.
model = Model(
[
layers.Input(shape=(freq_bins,)),
layers.Dense(int(2 * freq_bins), activation='relu'),
layers.Dense(int(0.75 * freq_bins), activation='relu'),
layers.Dense(len(labels), activation='softmax'),
],
labels=labels,
low_freq=datasets[0].low_freq,
high_freq=datasets[0].high_freq
)

# Train the model
for epoch in range(epochs):
for i, dataset in enumerate(datasets):
print(f'[epoch {epoch+1}/{epochs}] [audio sample {i+1}/{len(datasets)}]')
model.fit(dataset)
evaluation = model.evaluate(dataset)
print(f'Validation set loss and accuracy: {evaluation}')

# Save the model
model.save(model_dir, overwrite=True)

跑完這些程式碼後,看看模型的準確率, ~/models/sound-detect 儲存著有新的模型。我這裡,從寶寶房間收集大約5個小時的聲音,並定義一個好的頻率範圍來訓練出準確率大於96%的模型就可以了。

在電腦上訓練好模型後複製到樹莓派。

使用模型做檢測

做一個指令碼,使用之前訓練好的模型來處理麥克風傳來的實時音訊資料,在寶寶哭鬧時提醒我們:

import os

from micmon.audio import AudioDevice
from micmon.model import Model

model_dir = os.path.expanduser('~/models/sound-detect')
model = Model.load(model_dir)
audio_system = 'alsa' # Supported: alsa and pulse
audio_device = 'plughw:2,0' # Get list of recognized input devices with arecord -l

with AudioDevice(audio_system, device=audio_device) as source:
for sample in source:
source.pause() # Pause recording while we process the frame
prediction = model.predict(sample)
print(prediction)
source.resume() # Resume recording

在樹莓派上跑起來指令碼,如果2秒內沒有哭鬧發生,會列印 negative ,否則列印 positive

指令碼僅僅列印嬰兒哭鬧情況是不夠的,我們需要通知。通知的功能通過Platypush實現。這個例子中,我們使用pushbullet,在檢測到嬰兒哭鬧時傳送訊息到我們的手機。

安裝Redis(Platypush用Redis接收訊息)、Platypush的Http與Pushbullet整合:

[sudo] apt-get install redis-server
[sudo] systemctl start redis-server.service
[sudo] systemctl enable redis-server.service
[sudo] pip3 install 'platypush[http,pushbullet]'

在智慧手機上安裝Pushbullet應用,去pushbullet.com上取一個api token。建立 ~/.config/platypush/config.yaml 檔案,開啟Http與Pushbullet整合:

backend.http:
enabled: True
pushbullet:
token: YOUR_TOKEN

修改之前的指令碼,不再列印一個訊息,改為呼叫Platypush可以捕捉到的CustomEvent

#!/usr/bin/python3

import argparse
import logging
import os
import sys

from platypush import RedisBus
from platypush.message.event.custom import CustomEvent

from micmon.audio import AudioDevice
from micmon.model import Model

logger = logging.getLogger('micmon')


def get_args():
parser = argparse.ArgumentParser()
parser.add_argument('model_path', help='Path to the file/directory containing the saved Tensorflow model')
parser.add_argument('-i', help='Input sound device (e.g. hw:0,1 or default)', required=True, dest='sound_device')
parser.add_argument('-e', help='Name of the event that should be raised when a positive event occurs', required=True, dest='event_type')
parser.add_argument('-s', '--sound-server', help='Sound server to be used (available: alsa, pulse)', required=False, default='alsa', dest='sound_server')
parser.add_argument('-P', '--positive-label', help='Model output label name/index to indicate a positive sample (default: positive)', required=False, default='positive', dest='positive_label')
parser.add_argument('-N', '--negative-label', help='Model output label name/index to indicate a negative sample (default: negative)', required=False, default='negative', dest='negative_label')
parser.add_argument('-l', '--sample-duration', help='Length of the FFT audio samples (default: 2 seconds)', required=False, type=float, default=2., dest='sample_duration')
parser.add_argument('-r', '--sample-rate', help='Sample rate (default: 44100 Hz)', required=False, type=int, default=44100, dest='sample_rate')
parser.add_argument('-c', '--channels', help='Number of audio recording channels (default: 1)', required=False, type=int, default=1, dest='channels')
parser.add_argument('-f', '--ffmpeg-bin', help='FFmpeg executable path (default: ffmpeg)', required=False, default='ffmpeg', dest='ffmpeg_bin')
parser.add_argument('-v', '--verbose', help='Verbose/debug mode', required=False, action='store_true', dest='debug')
parser.add_argument('-w', '--window-duration', help='Duration of the look-back window (default: 10 seconds)', required=False, type=float, default=10., dest='window_length')
parser.add_argument('-n', '--positive-samples', help='Number of positive samples detected over the window duration to trigger the event (default: 1)', required=False, type=int, default=1, dest='positive_samples')

opts, args = parser.parse_known_args(sys.argv[1:])
return opts


def main():
args = get_args()
if args.debug:
logger.setLevel(logging.DEBUG)

model_dir = os.path.abspath(os.path.expanduser(args.model_path))
model = Model.load(model_dir)
window = []
cur_prediction = args.negative_label
bus = RedisBus()

with AudioDevice(system=args.sound_server,
device=args.sound_device,
sample_duration=args.sample_duration,
sample_rate=args.sample_rate,
channels=args.channels,
ffmpeg_bin=args.ffmpeg_bin,
debug=args.debug) as source:
for sample in source:
source.pause() # Pause recording while we process the frame
prediction = model.predict(sample)
logger.debug(f'Sample prediction: {prediction}')
has_change = False

if len(window) < args.window_length:
window += [prediction]
else:
window = window[1:] + [prediction]

positive_samples = len([pred for pred in window if pred == args.positive_label])
if args.positive_samples <= positive_samples and \
prediction == args.positive_label and \
cur_prediction != args.positive_label:
cur_prediction = args.positive_label
has_change = True
logging.info(f'Positive sample threshold detected ({positive_samples}/{len(window)})')
elif args.positive_samples > positive_samples and \
prediction == args.negative_label and \
cur_prediction != args.negative_label:
cur_prediction = args.negative_label
has_change = True
logging.info(f'Negative sample threshold detected ({len(window)-positive_samples}/{len(window)})')

if has_change:
evt = CustomEvent(subtype=args.event_type, state=prediction)
bus.post(evt)

source.resume() # Resume recording


if __name__ == '__main__':
main()

把上面的指令碼存到 ~/bin/micmon_detect.py 。這個指令碼只在 window_length 長度的滑動視窗內檢測到發生了 positive_samples ,只在當前的檢測從負向變成正向或正向變成負向的時候出發提示事件。提示事件通過 RedisBus 傳送給 Platypush。這個指令碼很通用,不僅可以檢測嬰兒哭音模型,還使用於任何聲音模型、任何正向負向標籤、任何頻率範圍、任何型別的輸出的場景。

再來建立一個響應事件和傳送推送到裝置的 Platypush 鉤子。首先,準備 Platypush 指令碼目錄:

mkdir -p ~/.config/platypush/scripts
cd ~/.config/platypush/scripts
# Define the directory as a module
touch __init__.py
# Create a script for the baby-cry events
vi babymonitor.py

babymonitor.py 的程式碼如下:

from platypush.context import get_plugin
from platypush.event.hook import hook
from platypush.message.event.custom import CustomEvent


@hook(CustomEvent, subtype='baby-cry', state='positive')
def on_baby_cry_start(event, **_):
pb = get_plugin('pushbullet')
pb.send_note(title='Baby cry status', body='The baby is crying!')


@hook(CustomEvent, subtype='baby-cry', state='negative')
def on_baby_cry_stop(event, **_):
pb = get_plugin('pushbullet')
pb.send_note(title='Baby cry status', body='The baby stopped crying - good job!')

為 Platypush 建立一個服務檔案,並啟動和啟用該服務,這樣它將在終止或重新啟動時自動重新啟動:

mkdir -p ~/.config/systemd/user
wget -O ~/.config/systemd/user/platypush.service \
http://raw.githubusercontent.com/BlackLight/platypush/master/examples/systemd/platypush.service
systemctl --user start platypush.service
systemctl --user enable platypush.service

另外建立為嬰兒監控一個service:

~/.config/systemd/user/babymonitor.service

[Unit]
Description=Monitor to detect my baby's cries
After=network.target sound.target
[Service]
ExecStart=/home/pi/bin/micmon_detect.py -i plughw:2,0 -e baby-cry -w 10 -n 2 ~/models/sound-detect
Restart=always
RestartSec=10
[Install]
WantedBy=default.target

這個 service 會開啟 ALSA設 備 plughw:2,0 的麥克風監控,如果在過去 10 秒內檢測到至少 2 個正向的 2 秒樣本,並且之前的狀態為負向,則它將觸發一個 baby-cry 事件、配置 state=positive ;如果在過去 10 秒內檢測到少於 2 個正向樣本,並且之前的狀態為正向,則配置 state=negative

然後啟動和啟用該服務:

systemctl --user start babymonitor.service
systemctl --user enable babymonitor.service

嬰兒一哭,你就能在手機上收到通知。如果沒有收到,要檢查應用於音訊樣本的標籤、神經網路的架構和引數,或者樣本長度/視窗/頻率引數。

你也可以把這個事情當作一個基本的自動化的例子,新增任意多自動化任務。例如向其他帶有 tts 外掛的 Platypush 裝置傳送請求,提示嬰兒在哭。還可以擴充套件 micmon_detect.py ,讓捕獲的音訊樣本也用 http 做流式傳輸,例如用 Flask wrapper 傳送、ffmpeg 進行音訊轉換。另一個有趣的用例是當嬰兒開始/停止啼哭時,將資料點發送到您的本地資料庫,這是一組有用的資料,可以跟蹤嬰兒何時睡覺、何時醒來或何時需要餵養。參考如何使用 Platypush + PostgreSQL + Moscoitto + Grafana 建立靈活的儀表板。

監控我的寶寶是我開發 micmon 的主要動機,但本文中同樣的程式碼也可以用來訓練和使用模型來檢測任何型別的聲音。

最後注意,要使用一個好的電源或一塊鋰電池供電。

嬰兒攝像頭

一旦有了音訊流和檢測音訊開始和結束的方法,就可以新增一個影片流觀察孩子的情況了。我在用於音訊檢測的同一個樹莓派3上安裝了PiCamera,但是這種配置比較不切實際。樹莓派3加電池加相機,體積很龐大,不容易安裝在支架上。最後我還是選了樹莓派Zero,配小電池和帶外殼的PiCamera。

我的嬰兒監控攝像頭模組的第一個原型

和在其他裝置上一樣,還是在 sd 卡上裝一個樹莓派適用的系統。然後在插槽中插入一個與樹莓派相容的攝像頭,確定攝像頭模組已在 raspi-config 中啟用,並安裝帶有 PiCamera 整合的 Platypush:

[sudo] pip3 install 'platypush[http,camera,picamera]'

在配置檔案 ~/.config/platypush/config.yaml 里加攝像頭配置:

camera.pi:
listen_port: 5001

配置完成後重啟,可以通過http請求檢視攝像頭影象:

wget http://raspberry-pi:8008/camera/pi/photo.jpg

或者開啟瀏覽器看攝像頭傳來的影片流:

http://raspberry-pi:8008/camera/pi/video.mjpg

或者建立一個鉤子函式、在服務啟動時,使用Tcp和H264來看影片流:

mkdir -p ~/.config/platypush/scripts
cd ~/.config/platypush/scripts
touch __init__.py
vi camera.py

camera.py程式碼:

from platypush.context import get_plugin
from platypush.event.hook import hook
from platypush.message.event.application import ApplicationStartedEvent


@hook(ApplicationStartedEvent)
def on_application_started(event, **_):
cam = get_plugin('camera.pi')
cam.start_streaming()

配置完成後可以通過 vlc 看影片流:

vlc tcp/h264://raspberry-pi:5001

也可以在手機上通過 vlc 應用或者類似樹莓派攝像頭檢視器這種 app 看影片流。

音訊監控

最後一步是建立一個麥克風音訊流,把寶寶的樹莓派連結到任何客戶端。雖然 Tensorflow 做了檢測可以提示到你嬰兒啼哭,但是機器學習檢測模型不是 100% 精準。有時候還是需要聽一聽/看一看在孩子房間裡發生了什麼。

我為此製作了一個名為 micstream 的工具,可以用於任何您想要通過 HTTP/mp3 從麥克風取音訊流的場景。

注意:一個麥克風向 Tensorflow 提供音訊樣本,需要另外一個麥克風進行流式音訊傳輸。

把工具克隆下來,安裝軟體(只有一個 ffmpeg 依賴需要安裝):

git clone http://github.com/BlackLight/micstream.git
cd micstream
[sudo] python3 setup.py install

執行 micstream --help 獲得可用的命令列選項。

舉個例子,如果想要在第三個音訊輸入裝置上設定音訊流( arecord -l 看所有音訊裝置)、在 /baby.mp3 檔案上、監聽 8088 埠、96 kbps 位元率,命令如下:

micstream -i plughw:3,0 -e '/baby.mp3' -b 96 -p 8088

這時候瀏覽器或音訊播放器開啟 http://your-rpi:8088/baby.mp3 ,就可以聽到實時嬰兒聲音監控了。

- EOF -

看完本文有收穫?請分享給更多人

推薦關注「伯樂線上」,看 精選 IT 職場文章

點贊和在看就是最大的支援:heart: