手習ひデジタル信号処理(94) Python、やり直し、中間データもScilabへ輸出。

Joseph Halfmoon

前回デシメーション処理が「出来た気」がしたので、後はPythonで行っている処理をScilabへ移植していけば音声でるんでないの?しかし、いけません。ザーという雑音。どこが悪いの?もう一度Pythonに戻ってステップバイステップで比較できるよう、中間データもScilabへ輸出することにいたしました。やり直しね。

※「手習ひデジタル信号処理」投稿順 Indexはこちら

※実験に使用したPythonソース全文は末尾に

音声ファイルのScilab上での再生

C:¥var¥Tx.csvなるファイルに、音声(16ビット符合付整数がならんだCSV形式)があった場合、以下にて音声再生することができます。

aFnam=fullfile("C:\var", "Tx.csv");
audio=csvRead(aFnam);
playsnd(audio, Fa);

なお、今回オーディオ信号のサンプル周波数Faは44100Hzとしてます(Python上で素で再生したときよりファイルでScilabもってきた後の方が雑音きつくなった気がするのは何故?別途要調査。)

Scilab上での復調処理?不調。Pythonに戻る。

同様な変数に同様なデータが格納されていればplaysnd関数にて音声再生が可能です。前回の結果をうけて、後続のFM復調処理から再度のリサンプリング、そしてオーディオ形式への変換をチョコチョコっと行ってみたところ、イケません。

雑音しか聞こえんがや

原因追及方法は、Python上での処理とScilab上での処理をステップバイステップで比較することしか考えつかん、ということで、前近代的な手法で行くことにいたしました。そのため、以下の回で作成してあったRTL-SDRを使ってデータ断片を取得するPythonプログラムに再び手直しすることにいたしました(ソース全文は末尾に。)

手習ひデジタル信号処理(91) PythonでFMラジオデータを音声再生、RTL-SDR

修正後のPythonプログラムを使って、RTL-SDRで再びTokyo FM様の音声断片のデータを取得。プロフラム引数のデフォルトがTokyo FM様に向けてあるので何も指定しないとTokyo FMとなりまする。

RTLSDRagain

今回取得の復調済音声データを再生すると以下が聞こえました。

『高速道路の下りは』(Tokyo FM様)

これをScilab上で聞こえるようにするのが目標ね。

取得した中間データを含むファイル群が以下に。files

なんとか音声聞こえるようにならんかのう。

手習ひデジタル信号処理(93) Scilab、デシメーション処理、実データで動作確認 へ戻る

手習ひデジタル信号処理(95) Scilab、Tokyo FMの再生、1秒ちょっと へ進む

※以下ソースは2023年7月27日改定

#!/usr/bin/python
# coding: utf-8
### @file  pyrtlsdr2sci.py
### @brief save RTL-SDR IQ signal for scilab
###
### @author: Joseph Halfmoon
### @date:   June 7, 2023
"""
Save RTL-SDR IQ signal for scilab v0.3

input
=====
--FS s    sample rate (default=2.1168[MHz])
--FC c    center_freq (defalut=80[MHz]
--N  n    number of samples (defalut=2560, means 2560*1024)
--FNAME x File name to save (defalut=toScilab.csv)
--G  opt  'psd' 'decimated' 'audio'
--D  f    decimation factor
-A        play Audio
-V        VERSION
"""
import argparse
import csv
import math
import matplotlib.pyplot as plt
import numpy as np
import os.path
import scipy.signal
import sys
import pyaudio
from rtlsdr import RtlSdr
import wave

versionSTR = "pyrtlsdr2sci.py v0.4 (July 20, 2023)"

def parse2float(st):
    """parse string to float
    
    Returns at Fail: None
    """
    try:
        work = float(st)
    except:
        return None
    return work

def chkVal(arg, dval):
    vl = parse2float(arg)
    if vl is None:
        nLen = len(arg)
        if nLen > 1:
            bdy = arg[0:nLen-1]
            lastChar = arg[nLen-1]
            vl = parse2float(bdy)
        if vl is None:
            vl = dval
    else:
        lastChar = "0"
    return (vl, lastChar)

def freqIN(arg, dval):
    prefixes = 'mkMG'
    vl, lastChar = chkVal(arg, dval)
    pfx = prefixes.find(lastChar)
    if pfx == 0:
        ex = 1e-3
    elif pfx == 1:
        ex = 1e3
    elif pfx == 2:
        ex = 1e6
    elif pfx == 3:
        ex = 1e9
    else:
        ex = 1
    return vl * ex

def parse2int(st):
    """parse string to int
    
    Returns at Fail: None
    """
    try:
        work = int(st)
    except:
        return None
    return work

class rtlsdrBuffer(object):
    """rtlsdr buffer class
    
    read & save RTL-SDR IQ signal
    Fs : sampling Freq[Hz]
    Fc : center Freq[Hz]
    """
    def __init__(self, Fs, Fc, N):
        self.sdr = RtlSdr()
        self.sdr.sample_rate = Fs
        self.sdr.center_freq = Fc
        self.sdr.freq_correction = 60
        self.sdr.gain = 'auto'
        self.N = N
        self.buffer = np.zeros(self.N, dtype=np.complex128)

    def read(self):
        self.buffer = self.sdr.read_samples(self.N)
        self.sdr.close()

    def writeSCI(self, fnam, x):
        self.fnam = fnam
        np.savetxt(self.fnam, x, fmt='%.18e %+.18ej')
    
    def plotPSD(self, x):
        plt.psd(x, NFFT=1024, Fs=self.sdr.sample_rate/1e6, Fc=self.sdr.center_freq/1e6)
        plt.xlabel('Frequency [MHz]')
        plt.ylabel('Relative power [dB]')
        plt.show()

    def decimation(self, N):
        self.decimated = scipy.signal.decimate(self.buffer, N)

class fmRadioDemodulator(object):
    """FM Radio Demodulator class with Audio output
    
    Buf: modulatedData
    Fs : sampling Freq[Hz]
    Ft : 1st. downSampling target Freq[Hz]
    """
    def __init__(self, Buf, Fs, Ft):
        self.buf = Buf
        self.Fs = Fs
        self.Ft = Ft
        self.AudioFreq = 44100
        self.audioObj = pyaudio.PyAudio()
        self.audioStream = self.audioObj.open(format=pyaudio.paInt16, channels=1, rate=self.AudioFreq, output=True)
        self.audioData = None
        self.d1data = None
        self.d2data = None
        self.demodulated = None

    def playAudio(self):
        self.d1data = scipy.signal.resample(self.buf, int(len(self.buf)*self.Ft/self.Fs))
        self.demodulated = np.angle(self.d1data[1:]*np.conj(self.d1data[:-1]))
        self.d2data = scipy.signal.resample(self.demodulated, int(len(self.demodulated)*self.AudioFreq/self.Ft))
        self.audioData = (self.d2data/math.pi * 32768.0).astype(np.int16)
        self.audioStream.write(self.audioData)

    def plotAudio(self):
        if self.audioData is not None:
            plt.plot(self.audioData)
            plt.show()
        else:
            print("ERROR: No audio data available.")

    def writeAudio(self, fb, fext):
        if self.audioData is not None:
            FNAME_D1 = fb + "_d1d" + fext
            FNAME_DM = fb + "_demod" + fext
            FNAME_D2 = fb + "_d2d" + fext
            FNAME_A = fb + "_audio" + fext
            self.fnam = fb
            self.fext = fext
            np.savetxt(FNAME_D1, self.d1data, fmt='%.18e %+.18ej')
            np.savetxt(FNAME_DM, self.demodulated)
            np.savetxt(FNAME_D2, self.d2data)
            np.savetxt(FNAME_A, self.audioData)
        else:
            print("ERROR: No audio data available.")


def main():
    """main program.
    
    process options
    """
    parser = argparse.ArgumentParser(description=versionSTR)
    parser.add_argument('--FS', nargs=1, help='sample rate (default=2.1168[MHz])')
    parser.add_argument('--FC', nargs=1, help='center_freq (defalut=80[MHz])')
    parser.add_argument('--N',  nargs=1, help='number of samples (defalut=256, means 256*1024)')
    parser.add_argument('--FNAME',  nargs=1, help='File name to save (defalut=toScilab.csv)')
    parser.add_argument('--G', nargs=1, help='select options: psd, decimated, audio(w/-A only)')
    parser.add_argument('--D', nargs=1, help='decimation factor (defalut=8)')
    parser.add_argument('-A', dest='AUDIO', help='Play Audio, then save', action='store_true', default=False)
    parser.add_argument('-V', dest='VERSION', help='Show Version, then exit', action='store_true', default=False)
    args = parser.parse_args()

    if args.VERSION:
        print( versionSTR )
        sys.exit(0)

    Fa = 44100 #sampling Frequency, Audio Output
    Fs = Fa * 48 #2.1168MHz (OLD:2.048e6)
    if args.FS is not None:
        Fs = freqIN(args.FS[0], Fs)
    Fc = 80e6
    if args.FC is not None:
        Fc = freqIN(args.FC[0], Fc)
    N = 2560*1024 #10x OLD N
    if args.N is not None:
        tmpN = parse2int(args.N[0])
        if tmpN is not None:
            N = tmpN * 1024
    FNAME = "toScilab.csv"
    if args.FNAME is not None:
        FNAME = args.FNAME[0]
    fb, fext = os.path.splitext(FNAME)
    FNAME_D = fb + "_decimated" + fext
 
    sdr = rtlsdrBuffer(Fs, Fc, N)
    sdr.read()
    sdr.writeSCI(FNAME, sdr.buffer)

    if args.AUDIO:
        audioplayer = fmRadioDemodulator(sdr.buffer, Fs, Fa*6)
        audioplayer.playAudio()
        audioplayer.writeAudio(fb, fext)

    if args.D is not None:
        D = parse2int(args.D[0])
        if D is not None:
            sdr.decimation(D)
            sdr.writeSCI(FNAME_D, sdr.decimated)

    if args.G is not None:
        if args.G[0].startswith('psd'):
            sdr.plotPSD(sdr.buffer)
        if args.G[0].startswith('decimated'):
            sdr.plotPSD(sdr.decimated)
        if args.G[0].startswith('audio') and args.AUDIO:
            audioplayer.plotAudio()

    sys.exit(0)

if __name__ == "__main__":
    main()