手習ひデジタル信号処理(91) PythonでFMラジオデータを音声再生、RTL-SDR

Joseph Halfmoon

前々回前回と低価格なソフトウエア無線受信用USBドングルRTL-SDRのデータをPythonで読み取ったものをScilabへ「輸出」するためのPythonスクリプトを作ってました。今回はそのスクリプトにFM放送波形を復調して断片的ですが音響再生する機能を追加してみました。毒を食らわば皿まで?違うか。

※「手習ひデジタル信号処理」投稿順 Indexはこちら

※実験に使用したPythonソース全文は末尾に

FMラジオ放送の復調

前回、前々回とRTL-SDRで受信したIQデータおよびそれをデシメーションかけたデータをScilabへ輸出できるようにCSVファイル化してディスクにセーブしてきました。CSVなのでScilabに限らず各種の処理系で読み込めるじゃないかとおもいますが。

ファイルにセーブするだけではどんな信号なんだが不安なので、Pythonで処理するついでにPSD(パワースペクトル密度)プロットを表示するオプションなどをつけてグラフを眺めてました。

まあデシメーションかけるところまでやったならば、復調するところでやらんかい、ということで今回復調(demodulation)までやってみました。FMラジオ波形を復調したのであれば、グラフだけでは寂しいです。やっぱり音声にして耳で聞きたいです。

それについては以下に素晴らしい文献ありました。

CQ出版社 インタフェース2021年5月号 特設 ソフトウエア・ラジオで合点! 無線通信変復調技術 (藤井義巳様著)

のP.78にアナデバ製のSDRデバイス、ADALM-PLUTOをつかったFMラジオ受信のサンプルコードとその説明が掲載されています。RTL-SDRを使った場合の修正点についてもサジェスチョンがあり参考にさせて(パクらせて)いただきました。ありがとうございます。

前回、前々回のコードの修正

今回追加の復調と音声再生、音声波形プロット、音声波形のCSVセーブ機能は、新たに追加した以下のクラスの中にすべて押し込めてあります。

fmRadioDemodulator

以前からつかっていた rtlsdrBuffer クラスを使ってRTL-SDRから受信済のIQデータを上記クラスに引き渡した上で、以下のメソッドを呼び出せば目的を果たします。

    • playAudio 復調したのち音声再生
    • plotAudio 上記で復調した音声波形をプロット
    • writeAudio 上記で復調した音声波形をファイルにセーブ

なお、rtlsdrBufferクラスは断片的なIQデータを蓄えるだけのクラスなので、音声再生も断片的なものです。今回設定のデフォルト値で約1秒強ほどです。音楽が流れているとかいう程度は識別できるかと。

また、音声再生に対応したことで、デフォルト値をいくつか修正しています。データを整数比でリサンプリングした結果で音声再生用のサンプリング周波数44100Hzに帰着するようにしたためです。

※ソース全文は末尾に

動作確認

以下のようにコマンドライン引数を与えてスクリプトを起動してみました。

playAudioOpr

ちょっと時間あってから、パソコンのスピーカから音楽が流れてきました。約1秒くらい。その後、プロットオプションが発動して以下のようなグラフが画面にあらわれました。AudioWave

先ほど聞こえた音楽の波形のハズ。もちろんフォルダには、元のIQ波形データと、音声(実数)波形データの両方が生成されとります。まあいいんでないかい。

手習ひデジタル信号処理(90) PythonでRTL-SDRデータをデシメーション へ戻る

手習ひデジタル信号処理(92) Scilab、デシメーション処理、サンプルデータで動作確認 へ進む

実験に使用したPythonソース全文
#!/usr/bin/python
# coding: utf-8
### @file  pyrtlsdr2sci.py
### @brief save RTL-SDR IQ signal for scilab
###
### @author: Joseph Halfmoon
### @date:   June 7, 2023
"""
Save RTL-SDR IQ signal for scilab v0.3

input
=====
--FS s    sample rate (default=2.1168[MHz])
--FC c    center_freq (defalut=80[MHz]
--N  n    number of samples (defalut=2560, means 2560*1024)
--FNAME x File name to save (defalut=toScilab.csv)
--G  opt  'psd' 'decimated' 'audio'
--D  f    decimation factor
-A        play Audio
-V        VERSION
"""
import argparse
import csv
import math
import matplotlib.pyplot as plt
import numpy as np
import os.path
import scipy.signal
import sys
import pyaudio
from rtlsdr import RtlSdr
import wave

versionSTR = "pyrtlsdr2sci.py v0.3"

def parse2float(st):
    """parse string to float
    
    Returns at Fail: None
    """
    try:
        work = float(st)
    except:
        return None
    return work

def chkVal(arg, dval):
    vl = parse2float(arg)
    if vl is None:
        nLen = len(arg)
        if nLen > 1:
            bdy = arg[0:nLen-1]
            lastChar = arg[nLen-1]
            vl = parse2float(bdy)
        if vl is None:
            vl = dval
    else:
        lastChar = "0"
    return (vl, lastChar)

def freqIN(arg, dval):
    prefixes = 'mkMG'
    vl, lastChar = chkVal(arg, dval)
    pfx = prefixes.find(lastChar)
    if pfx == 0:
        ex = 1e-3
    elif pfx == 1:
        ex = 1e3
    elif pfx == 2:
        ex = 1e6
    elif pfx == 3:
        ex = 1e9
    else:
        ex = 1
    return vl * ex

def parse2int(st):
    """parse string to int
    
    Returns at Fail: None
    """
    try:
        work = int(st)
    except:
        return None
    return work

class rtlsdrBuffer(object):
    """rtlsdr buffer class
    
    read & save RTL-SDR IQ signal
    Fs : sampling Freq[Hz]
    Fc : center Freq[Hz]
    """
    def __init__(self, Fs, Fc, N):
        self.sdr = RtlSdr()
        self.sdr.sample_rate = Fs
        self.sdr.center_freq = Fc
        self.sdr.freq_correction = 60
        self.sdr.gain = 'auto'
        self.N = N
        self.buffer = np.zeros(self.N, dtype=np.complex128)

    def read(self):
        self.buffer = self.sdr.read_samples(self.N)
        self.sdr.close()

    def writeSCI(self, fnam, x):
        self.fnam = fnam
        np.savetxt(self.fnam, x, fmt='%.18e %+.18ej')
    
    def plotPSD(self, x):
        plt.psd(x, NFFT=1024, Fs=self.sdr.sample_rate/1e6, Fc=self.sdr.center_freq/1e6)
        plt.xlabel('Frequency [MHz]')
        plt.ylabel('Relative power [dB]')
        plt.show()

    def decimation(self, N):
        self.decimated = scipy.signal.decimate(self.buffer, N)

class fmRadioDemodulator(object):
    """FM Radio Demodulator class with Audio output
    
    Buf: modulatedData
    Fs : sampling Freq[Hz]
    Ft : 1st. downSampling target Freq[Hz]
    """
    def __init__(self, Buf, Fs, Ft):
        self.buf = Buf
        self.Fs = Fs
        self.Ft = Ft
        self.AudioFreq = 44100
        self.audioObj = pyaudio.PyAudio()
        self.audioStream = self.audioObj.open(format=pyaudio.paInt16, channels=1, rate=self.AudioFreq, output=True)
        self.audioData = None

    def playAudio(self):
        d1data = scipy.signal.resample(self.buf, int(len(self.buf)*self.Ft/self.Fs))
        demodulated = np.angle(d1data[1:]*np.conj(d1data[:-1]))
        d2data = scipy.signal.resample(demodulated, int(len(demodulated)*self.AudioFreq/self.Ft))
        self.audioData = (d2data/math.pi * 32768.0).astype(np.int16)
        self.audioStream.write(self.audioData)

    def plotAudio(self):
        if self.audioData is not None:
            plt.plot(self.audioData)
            plt.show()
        else:
            print("ERROR: No audio data available.")

    def writeAudio(self, fnam):
        if self.audioData is not None:
            self.fnam = fnam
            np.savetxt(self.fnam, self.audioData)
        else:
            print("ERROR: No audio data available.")


def main():
    """main program.
    
    process options
    """
    parser = argparse.ArgumentParser(description=versionSTR)
    parser.add_argument('--FS', nargs=1, help='sample rate (default=2.1168[MHz])')
    parser.add_argument('--FC', nargs=1, help='center_freq (defalut=80[MHz])')
    parser.add_argument('--N',  nargs=1, help='number of samples (defalut=256, means 256*1024)')
    parser.add_argument('--FNAME',  nargs=1, help='File name to save (defalut=toScilab.csv)')
    parser.add_argument('--G', nargs=1, help='select options: psd, decimated, audio(w/-A only)')
    parser.add_argument('--D', nargs=1, help='decimation factor (defalut=8)')
    parser.add_argument('-A', dest='AUDIO', help='Play Audio, then save', action='store_true', default=False)
    parser.add_argument('-V', dest='VERSION', help='Show Version, then exit', action='store_true', default=False)
    args = parser.parse_args()

    if args.VERSION:
        print( versionSTR )
        sys.exit(0)

    Fa = 44100 #sampling Frequency, Audio Output
    Fs = Fa * 48 #2.1168MHz (OLD:2.048e6)
    if args.FS is not None:
        Fs = freqIN(args.FS[0], Fs)
    Fc = 80e6
    if args.FC is not None:
        Fc = freqIN(args.FC[0], Fc)
    N = 2560*1024 #10x OLD N
    if args.N is not None:
        tmpN = parse2int(args.N[0])
        if tmpN is not None:
            N = tmpN * 1024
    FNAME = "toScilab.csv"
    if args.FNAME is not None:
        FNAME = args.FNAME[0]
    fb, fext = os.path.splitext(FNAME)
    FNAME_D = fb + "_decimated" + fext
    FNAME_A = fb + "_audio" + fext
 
    sdr = rtlsdrBuffer(Fs, Fc, N)
    sdr.read()
    sdr.writeSCI(FNAME, sdr.buffer)

    if args.AUDIO:
        audioplayer = fmRadioDemodulator(sdr.buffer, Fs, Fa*6)
        audioplayer.playAudio()
        audioplayer.writeAudio(FNAME_A)

    if args.D is not None:
        D = parse2int(args.D[0])
        if D is not None:
            sdr.decimation(D)
            sdr.writeSCI(FNAME_D, sdr.decimated)

    if args.G is not None:
        if args.G[0].startswith('psd'):
            sdr.plotPSD(sdr.buffer)
        if args.G[0].startswith('decimated'):
            sdr.plotPSD(sdr.decimated)
        if args.G[0].startswith('audio') and args.AUDIO:
            audioplayer.plotAudio()

    sys.exit(0)

if __name__ == "__main__":
    main()