Advanced AM Modulation Analysis with Matplotlib: Real-World Applications and Advanced Techniques
Building upon our foundational guide on AM Wave Generation and Plotting with Matplotlib, this bonus article explores advanced modulation-analysis techniques and the tooling needed to apply them to real-world signals.
Building upon our foundation in AM wave generation and plotting, this advanced guide explores sophisticated signal processing techniques using Python for professional applications in audio processing, biomedical signal analysis, and sensor data processing.
1import numpy as np
2import matplotlib.pyplot as plt
3from scipy import signal, fft
4from scipy.io import wavfile
5import pandas as pd
6from typing import Tuple, Optional, Dict, List
7import warnings
8warnings.filterwarnings('ignore')
9
class AdvancedSignalProcessor:
    """Comprehensive signal processing toolkit for real-world applications."""

    def __init__(self, sampling_rate: float = 44100):
        """Create a processor for signals sampled at ``sampling_rate`` Hz."""
        self.fs = sampling_rate
        self.nyquist = sampling_rate / 2

        # Named filter designs produced by design_filter(), keyed by filter_name.
        self.filter_bank: Dict[str, Dict] = {}

        # Audit trail of operations applied through this instance.
        self.processing_history: List[Dict] = []

    def design_filter(self, filter_type: str, cutoff,
                      order: int = 5, filter_name: str = 'default') -> Tuple[np.ndarray, np.ndarray]:
        """Design a digital IIR filter and store it in the filter bank.

        Parameters
        ----------
        filter_type : one of 'butterworth_lowpass', 'butterworth_highpass',
            'butterworth_bandpass', 'chebyshev_lowpass', 'elliptic_lowpass',
            'bessel_lowpass' (case-insensitive).
        cutoff : scalar cutoff frequency in Hz, or a [low, high] pair (Hz)
            for the bandpass type.
        order : filter order.
        filter_name : key under which the coefficients are stored.

        Returns
        -------
        (b, a) : transfer-function numerator/denominator coefficients.

        Raises
        ------
        ValueError for an unknown filter type or a malformed bandpass cutoff.
        """
        kind = filter_type.lower()

        if kind == 'butterworth_bandpass':
            # Bandpass takes a [low, high] pair. Validate BEFORE normalizing:
            # dividing a list by the Nyquist frequency would raise TypeError.
            if isinstance(cutoff, (list, tuple)) and len(cutoff) == 2:
                normalized_cutoff = [f / self.nyquist for f in cutoff]
                b, a = signal.butter(order, normalized_cutoff, btype='band')
            else:
                raise ValueError("Bandpass filter requires [low_cutoff, high_cutoff]")
        else:
            # All remaining designs take a single scalar cutoff,
            # normalized to the Nyquist frequency.
            normalized_cutoff = cutoff / self.nyquist
            if kind == 'butterworth_lowpass':
                b, a = signal.butter(order, normalized_cutoff, btype='low')
            elif kind == 'butterworth_highpass':
                b, a = signal.butter(order, normalized_cutoff, btype='high')
            elif kind == 'chebyshev_lowpass':
                # 1 dB of passband ripple.
                b, a = signal.cheby1(order, 1, normalized_cutoff, btype='low')
            elif kind == 'elliptic_lowpass':
                # 1 dB passband ripple, 40 dB stopband attenuation.
                b, a = signal.ellip(order, 1, 40, normalized_cutoff, btype='low')
            elif kind == 'bessel_lowpass':
                b, a = signal.bessel(order, normalized_cutoff, btype='low')
            else:
                raise ValueError(f"Unknown filter type: {filter_type}")

        # Store the design so apply_filter() can reuse it by name.
        self.filter_bank[filter_name] = {'b': b, 'a': a, 'type': filter_type}

        return b, a

    def apply_filter(self, data: np.ndarray, filter_name: str = 'default',
                     method: str = 'filtfilt') -> np.ndarray:
        """Apply a previously designed filter to ``data``.

        method : 'filtfilt' for zero-phase forward-backward filtering,
            'lfilter' for standard causal filtering.

        Raises ValueError if the filter name or method is unknown.
        """
        if filter_name not in self.filter_bank:
            raise ValueError(f"Filter '{filter_name}' not found in filter bank")

        design = self.filter_bank[filter_name]
        b, a = design['b'], design['a']

        if method == 'filtfilt':
            # Zero-phase filtering (forward-backward): no group delay.
            filtered_data = signal.filtfilt(b, a, data)
        elif method == 'lfilter':
            # Standard causal filtering (introduces phase delay).
            filtered_data = signal.lfilter(b, a, data)
        else:
            raise ValueError("Method must be 'filtfilt' or 'lfilter'")

        # Log the processing step for later inspection.
        self.processing_history.append({
            'operation': 'filter',
            'filter_name': filter_name,
            'method': method,
            'input_shape': data.shape,
            'output_shape': filtered_data.shape
        })

        return filtered_data

    def adaptive_filter(self, input_signal: np.ndarray, desired_signal: np.ndarray,
                        filter_length: int = 32, mu: float = 0.01) -> Tuple[np.ndarray, np.ndarray]:
        """Adaptive LMS (Least Mean Squares) filter.

        Parameters
        ----------
        input_signal : reference input x[n].
        desired_signal : target d[n] the filter tries to track
            (must be at least as long as input_signal).
        filter_length : number of taps.
        mu : LMS step size; too large a value makes adaptation diverge.

        Returns
        -------
        (output, error) arrays, each the length of input_signal.
        """
        N = len(input_signal)

        weights = np.zeros(filter_length)
        output = np.zeros(N)
        error = np.zeros(N)

        # Tapped delay line: newest sample at index 0.
        input_buffer = np.zeros(filter_length)

        for n in range(N):
            # Shift the delay line and insert the new sample.
            input_buffer[1:] = input_buffer[:-1]
            input_buffer[0] = input_signal[n]

            # Filter output is the dot product of weights and the delay line.
            output[n] = np.dot(weights, input_buffer)

            # Instantaneous error against the desired signal.
            error[n] = desired_signal[n] - output[n]

            # LMS weight update: w <- w + 2*mu*e[n]*x[n].
            weights += 2 * mu * error[n] * input_buffer

        return output, error

    def spectral_analysis(self, data: np.ndarray, window: str = 'hann',
                          nperseg: Optional[int] = None) -> Dict:
        """Comprehensive time- and frequency-domain analysis of a signal.

        Returns a dict with time statistics, spectral statistics, the
        one-sided FFT magnitude/phase, Welch PSD, and a spectrogram.
        """
        # Time-domain statistics.
        rms = np.sqrt(np.mean(data ** 2))
        peak = np.max(np.abs(data))
        time_stats = {
            'mean': np.mean(data),
            'std': np.std(data),
            'rms': rms,
            'peak': peak,
            # Guard against an all-zero signal (rms == 0).
            'crest_factor': peak / rms if rms > 0 else 0.0
        }

        # Full FFT; only the positive-frequency half is reported.
        N = len(data)
        freqs = np.fft.fftfreq(N, 1 / self.fs)
        fft_data = np.fft.fft(data)
        magnitude = np.abs(fft_data)
        phase = np.angle(fft_data)

        half = N // 2
        pos_freqs = freqs[:half]
        pos_magnitude = magnitude[:half]

        # Welch PSD; default segment length balances resolution vs. variance.
        if nperseg is None:
            nperseg = min(N // 8, 1024)

        f_psd, psd = signal.welch(data, fs=self.fs, window=window, nperseg=nperseg)

        # Spectrogram for time-frequency structure.
        f_spec, t_spec, spectrogram = signal.spectrogram(data, fs=self.fs,
                                                         window=window, nperseg=nperseg)

        # Spectral shape features over positive frequencies only;
        # the centroid is computed once and reused for the bandwidth.
        total_magnitude = np.sum(pos_magnitude)
        centroid = np.sum(pos_freqs * pos_magnitude) / total_magnitude
        bandwidth = np.sqrt(np.sum(((pos_freqs - centroid) ** 2) * pos_magnitude)
                            / total_magnitude)

        spectral_stats = {
            'dominant_frequency': pos_freqs[np.argmax(pos_magnitude)],
            'spectral_centroid': centroid,
            'spectral_bandwidth': bandwidth,
            'spectral_rolloff': self._calculate_spectral_rolloff(pos_freqs, pos_magnitude),
            'zero_crossing_rate': self._calculate_zero_crossings(data)
        }

        return {
            'time_stats': time_stats,
            'spectral_stats': spectral_stats,
            'frequencies': pos_freqs,
            'magnitude': pos_magnitude,
            'phase': phase[:half],
            'psd_frequencies': f_psd,
            'psd': psd,
            'spectrogram_frequencies': f_spec,
            'spectrogram_times': t_spec,
            'spectrogram': spectrogram
        }

    def _calculate_spectral_rolloff(self, freqs: np.ndarray, magnitude: np.ndarray,
                                    threshold: float = 0.85) -> float:
        """Frequency below which ``threshold`` of the spectral energy lies."""
        total_energy = np.sum(magnitude)
        cumulative_energy = np.cumsum(magnitude)
        rolloff_index = np.where(cumulative_energy >= threshold * total_energy)[0]

        if len(rolloff_index) > 0:
            return freqs[rolloff_index[0]]
        # Degenerate spectrum: fall back to the highest frequency.
        return freqs[-1]

    def _calculate_zero_crossings(self, data: np.ndarray) -> float:
        """Zero-crossing rate: sign changes per sample."""
        zero_crossings = np.sum(np.diff(np.sign(data)) != 0)
        return zero_crossings / len(data)

    def envelope_detection(self, data: np.ndarray, method: str = 'hilbert') -> np.ndarray:
        """Extract the signal envelope.

        method : 'hilbert' (analytic-signal magnitude), 'peak'
            (interpolated peak envelope), or 'moving_average'
            (smoothed rectified signal).

        Raises ValueError for an unknown method.
        """
        if method == 'hilbert':
            # Magnitude of the analytic signal.
            analytic_signal = signal.hilbert(data)
            envelope = np.abs(analytic_signal)

        elif method == 'peak':
            peaks, _ = signal.find_peaks(np.abs(data))
            if len(peaks) == 0:
                # No interior peaks (e.g. constant or monotonic data):
                # np.interp would fail on an empty sample set, so fall
                # back to the rectified signal itself.
                envelope = np.abs(data)
            else:
                envelope = np.interp(np.arange(len(data)), peaks, np.abs(data[peaks]))

        elif method == 'moving_average':
            # Rectify and smooth with a 10 ms moving-average window.
            window_size = int(0.01 * self.fs)
            envelope = np.convolve(np.abs(data),
                                   np.ones(window_size) / window_size, mode='same')

        else:
            raise ValueError("Method must be 'hilbert', 'peak', or 'moving_average'")

        return envelope

    def cross_correlation_analysis(self, signal1: np.ndarray, signal2: np.ndarray) -> Dict:
        """Normalized cross-correlation analysis between two signals.

        Returns the full correlation sequence, its lags, the peak
        correlation, and the corresponding delay in samples and seconds.

        NOTE(review): inputs are z-score normalized, so a constant signal
        (std == 0) divides by zero — callers should avoid that case.
        """
        signal1_norm = (signal1 - np.mean(signal1)) / np.std(signal1)
        signal2_norm = (signal2 - np.mean(signal2)) / np.std(signal2)

        correlation = signal.correlate(signal1_norm, signal2_norm, mode='full')
        lags = signal.correlation_lags(len(signal1_norm), len(signal2_norm), mode='full')

        # Strongest alignment (by absolute correlation) and its lag.
        max_corr_idx = np.argmax(np.abs(correlation))
        max_correlation = correlation[max_corr_idx]
        optimal_lag = lags[max_corr_idx]

        # Convert the lag from samples to seconds.
        time_delay = optimal_lag / self.fs

        return {
            'correlation': correlation,
            'lags': lags,
            'max_correlation': max_correlation,
            'optimal_lag': optimal_lag,
            'time_delay_seconds': time_delay,
            'lag_times': lags / self.fs
        }
237
238# Biomedical Signal Processing Specialization
class BiomedicalSignalProcessor(AdvancedSignalProcessor):
    """Specialized signal processor for biomedical applications (ECG/EEG/EMG)."""

    def __init__(self, sampling_rate: float = 1000):  # typical for bio signals
        super().__init__(sampling_rate)

        # Conventional frequency bands (Hz) for common biomedical signals.
        self.frequency_bands = {
            'eeg_delta': (0.5, 4),
            'eeg_theta': (4, 8),
            'eeg_alpha': (8, 13),
            'eeg_beta': (13, 30),
            'eeg_gamma': (30, 100),
            'ecg_baseline': (0.05, 0.5),
            'ecg_main': (0.5, 40),
            'emg_low': (10, 100),
            'emg_high': (100, 500)
        }

    def ecg_preprocessing(self, ecg_data: np.ndarray, powerline_freq: float = 50) -> Dict:
        """Comprehensive ECG signal preprocessing.

        Parameters
        ----------
        ecg_data : raw single-lead ECG samples.
        powerline_freq : mains interference frequency to notch out
            (50 Hz default; pass 60 for North American recordings).

        Returns
        -------
        Dict with the signal at each stage: 'original', 'detrended',
        'notch_filtered', 'final_filtered'.
        """
        # Remove baseline wander with a 0.5 Hz high-pass filter.
        self.design_filter('butterworth_highpass', 0.5, order=4, filter_name='baseline_removal')
        ecg_detrended = self.apply_filter(ecg_data, 'baseline_removal')

        # Remove power line interference with a narrow notch filter.
        Q = 30  # quality factor: narrow notch, minimal distortion of ECG content
        b_notch, a_notch = signal.iirnotch(powerline_freq, Q, self.fs)
        ecg_clean = signal.filtfilt(b_notch, a_notch, ecg_detrended)

        # Low-pass at 40 Hz to suppress high-frequency (e.g. EMG) noise.
        self.design_filter('butterworth_lowpass', 40, order=4, filter_name='noise_removal')
        ecg_filtered = self.apply_filter(ecg_clean, 'noise_removal')

        return {
            'original': ecg_data,
            'detrended': ecg_detrended,
            'notch_filtered': ecg_clean,
            'final_filtered': ecg_filtered
        }

    def detect_qrs_complexes(self, ecg_data: np.ndarray) -> Dict:
        """QRS complex detection using a simplified Pan-Tompkins pipeline.

        Returns a dict with detected peak indices, RR intervals (s), heart
        rate (BPM), heart-rate variability (ms), and the integrated
        detection signal.
        """
        # Filter chain: baseline removal, notch, low-pass.
        ecg_processed = self.ecg_preprocessing(ecg_data)['final_filtered']

        # Derivative emphasizes the steep QRS slopes.
        # NOTE: np.diff shortens the signal by one sample, so peak indices
        # are offset by one relative to the raw input.
        derivative = np.diff(ecg_processed)

        # Squaring rectifies and accentuates large slopes.
        squared = derivative ** 2

        # Moving-window integration over ~80 ms (typical QRS width).
        window_size = int(0.08 * self.fs)
        integrated = np.convolve(squared, np.ones(window_size), mode='same')

        # Peak detection: threshold at 30% of max, 200 ms refractory period.
        peaks, properties = signal.find_peaks(integrated,
                                              height=np.max(integrated) * 0.3,
                                              distance=int(0.2 * self.fs))

        if len(peaks) > 1:
            rr_intervals = np.diff(peaks) / self.fs             # seconds
            heart_rate = 60 / np.mean(rr_intervals)             # BPM
            heart_rate_variability = np.std(rr_intervals) * 1000  # ms
        else:
            # Fewer than two beats found: rate statistics are undefined.
            rr_intervals = np.array([])
            heart_rate = 0
            heart_rate_variability = 0

        return {
            'qrs_peaks': peaks,
            'rr_intervals': rr_intervals,
            'heart_rate_bpm': heart_rate,
            'hrv_ms': heart_rate_variability,
            'processed_signal': integrated
        }

    def eeg_band_power_analysis(self, eeg_data: np.ndarray) -> Dict:
        """Analyze absolute and relative power in each EEG frequency band.

        Returns a dict keyed by band name ('delta', 'theta', ...), each
        entry holding 'power', 'relative_power', and 'filtered_signal'.
        """
        band_powers = {}

        for band_name, (low_freq, high_freq) in self.frequency_bands.items():
            if not band_name.startswith('eeg_'):
                continue  # skip ECG/EMG bands

            # Bandpass filter for this band. NOTE(review): gamma's 100 Hz
            # upper edge requires fs > 200 Hz — confirm sampling rate upstream.
            self.design_filter('butterworth_bandpass', [low_freq, high_freq],
                               order=4, filter_name=f'{band_name}_filter')

            filtered_eeg = self.apply_filter(eeg_data, f'{band_name}_filter')

            # Mean squared amplitude as the band power estimate.
            band_power = np.mean(filtered_eeg ** 2)

            band_powers[band_name.replace('eeg_', '')] = {
                'power': band_power,
                'relative_power': 0,  # filled in below once the total is known
                'filtered_signal': filtered_eeg
            }

        # Normalize to relative power; guard against an all-zero input
        # (total power of 0 would otherwise divide by zero).
        total_power = sum(bp['power'] for bp in band_powers.values())
        if total_power > 0:
            for band in band_powers:
                band_powers[band]['relative_power'] = band_powers[band]['power'] / total_power

        return band_powers
352
353# Audio Signal Processing Specialization
class AudioSignalProcessor(AdvancedSignalProcessor):
    """Specialized signal processor for audio applications."""

    def __init__(self, sampling_rate: float = 44100):
        super().__init__(sampling_rate)

    def load_audio_file(self, filepath: str) -> Tuple[np.ndarray, int]:
        """Load a WAV file and return (normalized mono float signal, sample rate).

        Side effect: updates self.fs AND self.nyquist to the file's sample
        rate so subsequent filter designs normalize against the right rate.

        Raises ValueError if the file cannot be read.
        """
        try:
            sample_rate, audio_data = wavfile.read(filepath)

            # Convert integer PCM to float in [-1, 1).
            if audio_data.dtype == np.int16:
                audio_data = audio_data.astype(np.float32) / 32768.0
            elif audio_data.dtype == np.int32:
                audio_data = audio_data.astype(np.float32) / 2147483648.0

            # Downmix stereo/multichannel to mono by channel averaging.
            if len(audio_data.shape) > 1:
                audio_data = np.mean(audio_data, axis=1)

            self.fs = sample_rate
            # Keep the cached Nyquist frequency consistent with the new
            # rate — design_filter() normalizes cutoffs against it.
            self.nyquist = sample_rate / 2
            return audio_data, sample_rate

        except Exception as e:
            raise ValueError(f"Could not load audio file: {e}") from e

    def extract_audio_features(self, audio_data: np.ndarray,
                               frame_size: int = 1024, hop_size: int = 512) -> Dict:
        """Extract a dictionary of audio features.

        Parameters
        ----------
        audio_data : mono audio samples.
        frame_size : analysis frame length for the MFCC placeholder.
        hop_size : accepted for API compatibility; the current framing is
            non-overlapping, so it is not used yet.
        """
        features = {}

        # Global spectral analysis (FFT, PSD, spectrogram, statistics).
        spectral_data = self.spectral_analysis(audio_data)

        # MFCC-like coefficients (simplified placeholder implementation).
        features['mfccs'] = self._calculate_mfccs(audio_data, frame_size)

        # Energy-based tempo and beat estimates.
        tempo, beats = self._estimate_tempo(audio_data)
        features['tempo'] = tempo
        features['beats'] = beats

        # Autocorrelation-based pitch estimate (Hz).
        features['pitch'] = self._estimate_pitch(audio_data)

        # Scalar spectral/time features copied from the analysis results.
        features.update({
            'spectral_centroid': spectral_data['spectral_stats']['spectral_centroid'],
            'spectral_bandwidth': spectral_data['spectral_stats']['spectral_bandwidth'],
            'spectral_rolloff': spectral_data['spectral_stats']['spectral_rolloff'],
            'zero_crossing_rate': spectral_data['spectral_stats']['zero_crossing_rate'],
            'rms_energy': spectral_data['time_stats']['rms']
        })

        return features

    def _calculate_mfccs(self, audio_data: np.ndarray, frame_size: int = 1024) -> np.ndarray:
        """Simplified MFCC-like features.

        NOTE: this is a placeholder — it returns the first 13 power-spectrum
        bins per frame rather than true mel-warped cepstral coefficients.
        Use librosa (or similar) for real MFCCs.
        """
        # Pre-emphasis boosts high frequencies before analysis.
        pre_emphasis = 0.97
        emphasized_signal = np.append(audio_data[0],
                                      audio_data[1:] - pre_emphasis * audio_data[:-1])

        # Split into non-overlapping frames, discarding the remainder.
        frame_count = len(emphasized_signal) // frame_size
        frames = emphasized_signal[:frame_count * frame_size].reshape(frame_count, frame_size)

        # Hamming window reduces spectral leakage.
        windowed_frames = frames * np.hamming(frame_size)

        # Per-frame power spectrum.
        fft_frames = np.fft.fft(windowed_frames, axis=1)
        power_spectrum = np.abs(fft_frames) ** 2

        # First 13 coefficients as a stand-in for mel-filtered cepstra.
        return power_spectrum[:, :13]

    def _estimate_tempo(self, audio_data: np.ndarray) -> Tuple[float, np.ndarray]:
        """Rough tempo (BPM) and beat-frame estimation from frame energy."""
        hop_length = 512
        frame_length = 2048

        # Short-time energy as a simple onset-strength proxy.
        energy = np.array([np.sum(audio_data[i:i + frame_length] ** 2)
                           for i in range(0, len(audio_data) - frame_length, hop_length)])

        # Peaks in energy approximate beat locations; minimum spacing ~100 ms.
        peaks, _ = signal.find_peaks(energy, distance=int(self.fs / hop_length * 0.1))

        if len(peaks) > 1:
            # BPM from the mean inter-peak interval (measured in frames).
            tempo = 60 * self.fs / hop_length / np.mean(np.diff(peaks))
        else:
            tempo = 120  # fallback when too few peaks for an estimate

        return tempo, peaks

    def _estimate_pitch(self, audio_data: np.ndarray) -> float:
        """Single overall pitch estimate (Hz) via autocorrelation.

        Searches the 50-500 Hz range; returns 0 when the search range is
        empty (signal shorter than one 50 Hz period).
        """
        # Autocorrelation; keep only non-negative lags.
        autocorr = signal.correlate(audio_data, audio_data, mode='full')
        autocorr = autocorr[autocorr.size // 2:]

        min_pitch_samples = int(self.fs / 500)  # 500 Hz upper bound
        max_pitch_samples = int(self.fs / 50)   # 50 Hz lower bound

        search_range = autocorr[min_pitch_samples:max_pitch_samples]
        if len(search_range) > 0:
            pitch_sample = np.argmax(search_range) + min_pitch_samples
            pitch_hz = self.fs / pitch_sample
        else:
            pitch_hz = 0

        return pitch_hz
485
486# Visualization utilities
class SignalVisualizer:
    """Advanced signal visualization toolkit"""

    @staticmethod
    def plot_signal_analysis(data: np.ndarray, analysis_results: Dict,
                             sampling_rate: float, title: str = "Signal Analysis"):
        """Render a 3x2 dashboard: time trace, magnitude spectrum, PSD,
        spectrogram, phase spectrum, and a text panel of statistics.

        Expects ``analysis_results`` in the shape produced by
        AdvancedSignalProcessor.spectral_analysis(); returns the figure.
        """
        fig, axes = plt.subplots(3, 2, figsize=(15, 12))
        fig.suptitle(title, fontsize=16)
        (ax_time, ax_freq), (ax_psd, ax_spec), (ax_phase, ax_stats) = axes

        def decorate(ax, plot_title, xlabel, ylabel):
            # Shared title/label/grid styling for the line plots.
            ax.set_title(plot_title)
            ax.set_xlabel(xlabel)
            ax.set_ylabel(ylabel)
            ax.grid(True)

        # Time-domain trace.
        time_axis = np.arange(len(data)) / sampling_rate
        ax_time.plot(time_axis, data)
        decorate(ax_time, 'Time Domain Signal', 'Time (s)', 'Amplitude')

        # Magnitude spectrum in dB (small offset avoids log10(0)).
        ax_freq.plot(analysis_results['frequencies'],
                     20 * np.log10(analysis_results['magnitude'] + 1e-10))
        decorate(ax_freq, 'Frequency Domain (dB)', 'Frequency (Hz)', 'Magnitude (dB)')

        # Welch power spectral density on a log scale.
        ax_psd.semilogy(analysis_results['psd_frequencies'], analysis_results['psd'])
        decorate(ax_psd, 'Power Spectral Density', 'Frequency (Hz)', 'PSD')

        # Spectrogram heat map in dB.
        spec_times = analysis_results['spectrogram_times']
        spec_freqs = analysis_results['spectrogram_frequencies']
        im = ax_spec.imshow(10 * np.log10(analysis_results['spectrogram'] + 1e-10),
                            aspect='auto', origin='lower',
                            extent=[spec_times[0], spec_times[-1],
                                    spec_freqs[0], spec_freqs[-1]])
        ax_spec.set_title('Spectrogram (dB)')
        ax_spec.set_xlabel('Time (s)')
        ax_spec.set_ylabel('Frequency (Hz)')
        plt.colorbar(im, ax=ax_spec)

        # Unwrapped phase spectrum.
        ax_phase.plot(analysis_results['frequencies'],
                      np.unwrap(analysis_results['phase']))
        decorate(ax_phase, 'Phase Spectrum', 'Frequency (Hz)', 'Phase (rad)')

        # Text panel with headline statistics.
        t_stats = analysis_results['time_stats']
        s_stats = analysis_results['spectral_stats']
        stats_text = f"""
        Mean: {t_stats['mean']:.4f}
        Std: {t_stats['std']:.4f}
        RMS: {t_stats['rms']:.4f}
        Peak: {t_stats['peak']:.4f}
        Crest Factor: {t_stats['crest_factor']:.2f}

        Dominant Freq: {s_stats['dominant_frequency']:.1f} Hz
        Spectral Centroid: {s_stats['spectral_centroid']:.1f} Hz
        Zero Crossing Rate: {s_stats['zero_crossing_rate']:.4f}
        """
        ax_stats.text(0.1, 0.5, stats_text, fontsize=10, verticalalignment='center')
        ax_stats.set_title('Signal Statistics')
        ax_stats.axis('off')

        plt.tight_layout()
        return fig
559
560# Example usage
if __name__ == "__main__":
    # Demonstration: analyze a noisy two-tone test signal.
    sample_rate = 1000  # Hz
    time_axis = np.linspace(0, 2, 2 * sample_rate, False)

    # 50 Hz + 120 Hz tones buried in Gaussian noise.
    test_signal = (np.sin(2 * np.pi * 50 * time_axis)
                   + 0.5 * np.sin(2 * np.pi * 120 * time_axis)
                   + 0.2 * np.random.normal(0, 1, len(time_axis)))

    # Run the full spectral analysis pipeline.
    analyzer = AdvancedSignalProcessor(sample_rate)
    results = analyzer.spectral_analysis(test_signal)

    # Render the analysis dashboard.
    fig = SignalVisualizer.plot_signal_analysis(
        test_signal, results, sample_rate,
        "Advanced Signal Processing Example")
    plt.show()
This advanced signal processing framework provides professional-grade tools for analyzing complex signals across multiple domains. The modular architecture supports biomedical, audio, and general signal processing applications with comprehensive analysis and visualization capabilities.
For foundational signal processing concepts, see our AM wave generation tutorial. For related data analysis techniques, explore our data visualization guides.