手習ひデジタル信号処理(90) PythonでRTL-SDRデータをデシメーション

Joseph Halfmoon

前回、ソフトウエア無線受信用USBドングルRTL-SDRのデータをPythonで読み取ったものをScilabへ「輸出」。これでScilabでもRTL-SDRのデータを処理できる(リアルタイムじゃないけど)とほくそ笑みました。今回は「輸出」に使ったPythonスクリプトに手を入れデシメーション機能など追加。益々繁盛?

※「手習ひデジタル信号処理」投稿順 Indexはこちら

Pythonからの複素数データの出力方法をチョイとScilabに合わせるだけでRTL-SDRのIQ信号をScilabで読み取れるようになりました。ただ「生データ」を素のままで送るのも芸がない上、サイズもデカいです。今回は、前回作成スクリプトのオプションとしてデシメーション処理した後のデータを出力する機能を追加してみました。これでデータ量は何分の1だかに削減可能。ついでに取得したデータのPSDプロットを表示するオプションも追加してみました。

Scipy使ったデシメーション処理

デシメーションそのものは一定の間隔でデータを間引く処理なのでPythonでは一撃ですが、メンドイのがデシメーション・フィルタです。デシメーションによりサンプリング周波数は 1/n (nは整数)に低下してしまうので、信号帯域を新しいサンプリング周波数の1/2(ナイキスト周波数)以下に制限するためにLPFかける必要があるからです。

ところがPythonでの定番Scipyモジュールの信号処理機能の中にデシメーション・フィルタも「ついでにかけてくれる」機能を発見。これ使えばお楽じゃね。

Signal processing (scipy.signal)

デシメーション機能については以下です。

scipy.signal.decimate

それにしてもデシメーション、恐ろしい言葉なんだが。。。

機能追加したPythonスクリプト

前回スクリプトに対して追加した機能は以下です。

    • “–D f ” オプション、デシメーション・ファクタfを指定します。このオプションがついていたらデシメーション済のScilab可読なファイルも生成します。ファイル名は、”–FNAME” で指定したファイル名が”test.csv”だとすると”test_decimated.csv” のようになります。
    • “–G opt” オプション、opt=psdのとき元のIQ信号のPSD(パワースペクトル密度)プロット、opt=decivatedのときデシメーション済のIQ信号のPSDプロットを表示します。

なお、コマケー話ですが以下変更点です。

    • “–N n” サンプルデータ点数の指定。前回は素のnどおりの点数でしたが、今回からは n * 1024 と解釈するようにいたしました。
#!/usr/bin/python
# coding: utf-8
### @file  pyrtlsdr2sci.py
### @brief save RTL-SDR IQ signal for scilab
###
### @author: Joseph Halfmoon
### @date:   June 7, 2023
"""
Save RTL-SDR IQ signal for scilab v0.2

input
=====
--FS s    sample rate (default=2.048[MHz])
--FC c    center_freq (defalut=80[MHz]
--N  n    number of samples (defalut=256, means 256*1024)
--FNAME x File name to save (defalut=toScilab.csv)
--G  opt  'psd' 'decimated'
--D  f    decimation factor
-V        VERSION
"""
import argparse
import csv
import matplotlib.pyplot as plt
import numpy as np
import os.path
import scipy.signal
import sys
from rtlsdr import RtlSdr

versionSTR = "pyrtlsdr2sci.py v0.2"

def parse2float(st):
    """parse string to float
    
    Returns at Fail: None
    """
    try:
        work = float(st)
    except:
        return None
    return work

def chkVal(arg, dval):
    vl = parse2float(arg)
    if vl is None:
        nLen = len(arg)
        if nLen > 1:
            bdy = arg[0:nLen-1]
            lastChar = arg[nLen-1]
            vl = parse2float(bdy)
        if vl is None:
            vl = dval
    else:
        lastChar = "0"
    return (vl, lastChar)

def freqIN(arg, dval):
    prefixes = 'mkMG'
    vl, lastChar = chkVal(arg, dval)
    pfx = prefixes.find(lastChar)
    if pfx == 0:
        ex = 1e-3
    elif pfx == 1:
        ex = 1e3
    elif pfx == 2:
        ex = 1e6
    elif pfx == 3:
        ex = 1e9
    else:
        ex = 1
    return vl * ex

def parse2int(st):
    """parse string to int
    
    Returns at Fail: None
    """
    try:
        work = int(st)
    except:
        return None
    return work

class rtlsdrBuffer(object):
    """rtlsdr buffer class
    
    read & save RTL-SDR IQ signal
    Fs : sampling Freq[Hz]
    Fc : center Freq[Hz]
    """
    def __init__(self, Fs, Fc, N):
        self.sdr = RtlSdr()
        self.sdr.sample_rate = Fs
        self.sdr.center_freq = Fc
        self.sdr.freq_correction = 60
        self.sdr.gain = 'auto'
        self.N = N
        self.buffer = np.zeros(self.N, dtype=np.complex128)

    def read(self):
        self.buffer = self.sdr.read_samples(self.N)
        self.sdr.close()

    def writeSCI(self, fnam, x):
        self.fnam = fnam
        np.savetxt(self.fnam, x, fmt='%.18e %+.18ej')
    
    def plotPSD(self, x):
        plt.psd(x, NFFT=1024, Fs=self.sdr.sample_rate/1e6, Fc=self.sdr.center_freq/1e6)
        plt.xlabel('Frequency [MHz]')
        plt.ylabel('Relative power [dB]')
        plt.show()

    def decimation(self, N):
        self.decimated = scipy.signal.decimate(self.buffer, N)

def main():
    """main program.
    
    process options
    """
    parser = argparse.ArgumentParser(description=versionSTR)
    parser.add_argument('--FS', nargs=1, help='sample rate (default=2.048[MHz])')
    parser.add_argument('--FC', nargs=1, help='center_freq (defalut=80[MHz])')
    parser.add_argument('--N',  nargs=1, help='number of samples (defalut=256, means 256*1024)')
    parser.add_argument('--FNAME',  nargs=1, help='File name to save (defalut=toScilab.csv)')
    parser.add_argument('--G', nargs=1, help='select options: psd, decimated')
    parser.add_argument('--D', nargs=1, help='decimation factor (defalut=8)')
    parser.add_argument('-V', dest='VERSION', help='Show Version, then exit', action='store_true', default=False)
    args = parser.parse_args()

    if args.VERSION:
        print( versionSTR )
        sys.exit(0)

    Fs = 2.048e6
    if args.FS is not None:
        Fs = freqIN(args.FS[0], Fs)
    Fc = 80e6
    if args.FC is not None:
        Fc = freqIN(args.FC[0], Fc)
    N = 256*1024
    if args.N is not None:
        tmpN = parse2int(args.N[0])
        if tmpN is not None:
            N = tmpN * 1024
    FNAME = "toScilab.csv"
    if args.FNAME is not None:
        FNAME = args.FNAME[0]
    f1, fext = os.path.splitext(FNAME)
    f1 = f1 + "_decimated"
    FNAME_D = f1 + fext

    sdr = rtlsdrBuffer(Fs, Fc, N)
    sdr.read()
    sdr.writeSCI(FNAME, sdr.buffer)

    if args.D is not None:
        D = parse2int(args.D[0])
        if D is not None:
            sdr.decimation(D)
            sdr.writeSCI(FNAME_D, sdr.decimated)

    if args.G is not None:
        if args.G[0].startswith('psd'):
            sdr.plotPSD(sdr.buffer)
        if args.G[0].startswith('decimated'):
            sdr.plotPSD(sdr.decimated)

    sys.exit(0)

if __name__ == "__main__":
    main()
実行結果

以下のコマンドラインから上記スクリプトを実行することでScilab 可読のファイル2つを生成できます。v02_update

また、デシメーション済のデータのPSDプロットも以下のように表示されます。plotDecimatedSignal

元のIQファイルのサンプリング周波数2.048MHzに対して、8:1のデシメーションかけているのでデシメーション後のIQファイルのサンプリング周波数は256kHzです。

手習ひデジタル信号処理(89) PythonからScilabへRTL-SDRデータをお裾分け へ戻る

手習ひデジタル信号処理(91) PythonでFMラジオデータを音声再生、RTL-SDR へ進む