前々回、前回と低価格なソフトウエア無線受信用USBドングルRTL-SDRのデータをPythonで読み取ったものをScilabへ「輸出」するためのPythonスクリプトを作ってました。今回はそのスクリプトにFM放送波形を復調して断片的ですが音響再生する機能を追加してみました。毒を食らわば皿まで?違うか。
※「手習ひデジタル信号処理」投稿順 Indexはこちら
※実験に使用したPythonソース全文は末尾に
FMラジオ放送の復調
前回、前々回とRTL-SDRで受信したIQデータおよびそれをデシメーションかけたデータをScilabへ輸出できるようにCSVファイル化してディスクにセーブしてきました。CSVなのでScilabに限らず各種の処理系で読み込めるじゃないかとおもいますが。
ファイルにセーブするだけではどんな信号なんだが不安なので、Pythonで処理するついでにPSD(パワースペクトル密度)プロットを表示するオプションなどをつけてグラフを眺めてました。
まあデシメーションかけるところまでやったならば、復調するところでやらんかい、ということで今回復調(demodulation)までやってみました。FMラジオ波形を復調したのであれば、グラフだけでは寂しいです。やっぱり音声にして耳で聞きたいです。
それについては以下に素晴らしい文献ありました。
CQ出版社 インタフェース2021年5月号 特設 ソフトウエア・ラジオで合点! 無線通信変復調技術 (藤井義巳様著)
のP.78にアナデバ製のSDRデバイス、ADALM-PLUTOをつかったFMラジオ受信のサンプルコードとその説明が掲載されています。RTL-SDRを使った場合の修正点についてもサジェスチョンがあり参考にさせて(パクらせて)いただきました。ありがとうございます。
前回、前々回のコードの修正
今回追加の復調と音声再生、音声波形プロット、音声波形のCSVセーブ機能は、新たに追加した以下のクラスの中にすべて押し込めてあります。
fmRadioDemodulator
以前からつかっていた rtlsdrBuffer クラスを使ってRTL-SDRから受信済のIQデータを上記クラスに引き渡した上で、以下のメソッドを呼び出せば目的を果たします。
-
- playAudio 復調したのち音声再生
- plotAudio 上記で復調した音声波形をプロット
- writeAudio 上記で復調した音声波形をファイルにセーブ
なお、rtlsdrBufferクラスは断片的なIQデータを蓄えるだけのクラスなので、音声再生も断片的なものです。今回設定のデフォルト値で約1秒強ほどです。音楽が流れているとかいう程度は識別できるかと。
また、音声再生に対応したことで、デフォルト値をいくつか修正しています。データを整数比でリサンプリングした結果で音声再生用のサンプリング周波数44100Hzに帰着するようにしたためです。
※ソース全文は末尾に
動作確認
以下のようにコマンドライン引数を与えてスクリプトを起動してみました。
ちょっと時間あってから、パソコンのスピーカから音楽が流れてきました。約1秒くらい。その後、プロットオプションが発動して以下のようなグラフが画面にあらわれました。
先ほど聞こえた音楽の波形のハズ。もちろんフォルダには、元のIQ波形データと、音声(実数)波形データの両方が生成されとります。まあいいんでないかい。
手習ひデジタル信号処理(90) PythonでRTL-SDRデータをデシメーション へ戻る
手習ひデジタル信号処理(92) Scilab、デシメーション処理、サンプルデータで動作確認 へ進む
実験に使用したPythonソース全文
#!/usr/bin/python # coding: utf-8 ### @file pyrtlsdr2sci.py ### @brief save RTL-SDR IQ signal for scilab ### ### @author: Joseph Halfmoon ### @date: June 7, 2023 """ Save RTL-SDR IQ signal for scilab v0.3 input ===== --FS s sample rate (default=2.1168[MHz]) --FC c center_freq (defalut=80[MHz] --N n number of samples (defalut=2560, means 2560*1024) --FNAME x File name to save (defalut=toScilab.csv) --G opt 'psd' 'decimated' 'audio' --D f decimation factor -A play Audio -V VERSION """ import argparse import csv import math import matplotlib.pyplot as plt import numpy as np import os.path import scipy.signal import sys import pyaudio from rtlsdr import RtlSdr import wave versionSTR = "pyrtlsdr2sci.py v0.3" def parse2float(st): """parse string to float Returns at Fail: None """ try: work = float(st) except: return None return work def chkVal(arg, dval): vl = parse2float(arg) if vl is None: nLen = len(arg) if nLen > 1: bdy = arg[0:nLen-1] lastChar = arg[nLen-1] vl = parse2float(bdy) if vl is None: vl = dval else: lastChar = "0" return (vl, lastChar) def freqIN(arg, dval): prefixes = 'mkMG' vl, lastChar = chkVal(arg, dval) pfx = prefixes.find(lastChar) if pfx == 0: ex = 1e-3 elif pfx == 1: ex = 1e3 elif pfx == 2: ex = 1e6 elif pfx == 3: ex = 1e9 else: ex = 1 return vl * ex def parse2int(st): """parse string to int Returns at Fail: None """ try: work = int(st) except: return None return work class rtlsdrBuffer(object): """rtlsdr buffer class read & save RTL-SDR IQ signal Fs : sampling Freq[Hz] Fc : center Freq[Hz] """ def __init__(self, Fs, Fc, N): self.sdr = RtlSdr() self.sdr.sample_rate = Fs self.sdr.center_freq = Fc self.sdr.freq_correction = 60 self.sdr.gain = 'auto' self.N = N self.buffer = np.zeros(self.N, dtype=np.complex128) def read(self): self.buffer = self.sdr.read_samples(self.N) self.sdr.close() def writeSCI(self, fnam, x): self.fnam = fnam np.savetxt(self.fnam, x, fmt='%.18e %+.18ej') def plotPSD(self, x): plt.psd(x, NFFT=1024, Fs=self.sdr.sample_rate/1e6, Fc=self.sdr.center_freq/1e6) plt.xlabel('Frequency [MHz]') plt.ylabel('Relative power [dB]') plt.show() def decimation(self, N): self.decimated = scipy.signal.decimate(self.buffer, N) class fmRadioDemodulator(object): """FM Radio Demodulator class with Audio output Buf: modulatedData Fs : sampling Freq[Hz] Ft : 1st. downSampling target Freq[Hz] """ def __init__(self, Buf, Fs, Ft): self.buf = Buf self.Fs = Fs self.Ft = Ft self.AudioFreq = 44100 self.audioObj = pyaudio.PyAudio() self.audioStream = self.audioObj.open(format=pyaudio.paInt16, channels=1, rate=self.AudioFreq, output=True) self.audioData = None def playAudio(self): d1data = scipy.signal.resample(self.buf, int(len(self.buf)*self.Ft/self.Fs)) demodulated = np.angle(d1data[1:]*np.conj(d1data[:-1])) d2data = scipy.signal.resample(demodulated, int(len(demodulated)*self.AudioFreq/self.Ft)) self.audioData = (d2data/math.pi * 32768.0).astype(np.int16) self.audioStream.write(self.audioData) def plotAudio(self): if self.audioData is not None: plt.plot(self.audioData) plt.show() else: print("ERROR: No audio data available.") def writeAudio(self, fnam): if self.audioData is not None: self.fnam = fnam np.savetxt(self.fnam, self.audioData) else: print("ERROR: No audio data available.") def main(): """main program. process options """ parser = argparse.ArgumentParser(description=versionSTR) parser.add_argument('--FS', nargs=1, help='sample rate (default=2.1168[MHz])') parser.add_argument('--FC', nargs=1, help='center_freq (defalut=80[MHz])') parser.add_argument('--N', nargs=1, help='number of samples (defalut=256, means 256*1024)') parser.add_argument('--FNAME', nargs=1, help='File name to save (defalut=toScilab.csv)') parser.add_argument('--G', nargs=1, help='select options: psd, decimated, audio(w/-A only)') parser.add_argument('--D', nargs=1, help='decimation factor (defalut=8)') parser.add_argument('-A', dest='AUDIO', help='Play Audio, then save', action='store_true', default=False) parser.add_argument('-V', dest='VERSION', help='Show Version, then exit', action='store_true', default=False) args = parser.parse_args() if args.VERSION: print( versionSTR ) sys.exit(0) Fa = 44100 #sampling Frequency, Audio Output Fs = Fa * 48 #2.1168MHz (OLD:2.048e6) if args.FS is not None: Fs = freqIN(args.FS[0], Fs) Fc = 80e6 if args.FC is not None: Fc = freqIN(args.FC[0], Fc) N = 2560*1024 #10x OLD N if args.N is not None: tmpN = parse2int(args.N[0]) if tmpN is not None: N = tmpN * 1024 FNAME = "toScilab.csv" if args.FNAME is not None: FNAME = args.FNAME[0] fb, fext = os.path.splitext(FNAME) FNAME_D = fb + "_decimated" + fext FNAME_A = fb + "_audio" + fext sdr = rtlsdrBuffer(Fs, Fc, N) sdr.read() sdr.writeSCI(FNAME, sdr.buffer) if args.AUDIO: audioplayer = fmRadioDemodulator(sdr.buffer, Fs, Fa*6) audioplayer.playAudio() audioplayer.writeAudio(FNAME_A) if args.D is not None: D = parse2int(args.D[0]) if D is not None: sdr.decimation(D) sdr.writeSCI(FNAME_D, sdr.decimated) if args.G is not None: if args.G[0].startswith('psd'): sdr.plotPSD(sdr.buffer) if args.G[0].startswith('decimated'): sdr.plotPSD(sdr.decimated) if args.G[0].startswith('audio') and args.AUDIO: audioplayer.plotAudio() sys.exit(0) if __name__ == "__main__": main()