# This is One of the Solutions - Full Content

> Complete blog content by Prabeesh Keezhathra. Each section below
> is one post with its title, URL, date, and full Markdown body.
> Generated: 2026-04-26
> Posts: 27

---

## ATtiny2313 USBtinyISP Notes

- URL: https://blog.prabeeshk.com/bonus/attiny2313-usb-programming-guide/
- Date: 2025-08-22
- Keywords: ATtiny2313, USBtinyISP, avrdude, USB programmer

A short follow-up to [How to Build a USBtinyISP](/blog/2012/07/04/simplest-and-low-cost-usb-avr/). The original post covers the build; these are two things I end up doing often enough to write down.

For anything fuse-related, the authoritative reference is the ATtiny2313 datasheet from Microchip; don't take anecdotal values from the internet at face value.

## Reading current fuses

Before you change fuses, dump what's actually on the chip:

```bash
avrdude -c usbasp -p t2313 \
  -U hfuse:r:-:h \
  -U lfuse:r:-:h \
  -U efuse:r:-:h
```

`:r:-:h` means read, output to stdout, format as hex. The ATtiny2313 has three fuse bytes: low, high, and extended.

## Won't enumerate over USB

If plugging in the freshly-built USBtinyISP produces nothing (no entry in `lsusb` on Linux, no device popup on Windows), walk this list:

1. Measure the voltage across the ATtiny2313 VCC and GND pins; it should read 5 V. If it's well below 5 V, the USB cable, a blown polyfuse, or the regulator is suspect.
2. Check the crystal is actually oscillating. With a scope, probe XTAL1 or XTAL2. Without a scope, swap the crystal and the two 22 pF loading caps - a cracked or wrong-value cap is the most common silent failure.
3. Confirm the firmware was flashed with the correct fuses for external crystal operation. The internal RC oscillator is too slow and too imprecise for the USB timing the USBtinyISP firmware relies on.
4. Check D+ and D- aren't swapped. On a hand-wired or perfboard build this is an easy mistake.
5. Try a different cable and a different host port. Cheap USB cables and flaky hubs are responsible for more "dead board" reports than actual dead boards.
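For step 3, it can help to look at the low fuse byte bit by bit rather than as a hex value. The sketch below is only an illustration: `decode_lfuse` is a hypothetical helper, and it assumes the clock-select bits (CKSEL3..0) occupy the low nibble of the low fuse byte and that values `1000` to `1111` select an external crystal or resonator. Check both assumptions against the fuse and clock tables in the datasheet before relying on the result.

```python
def decode_lfuse(hex_text):
    """Split a low-fuse byte, as printed by avrdude's :h format (e.g. '0xef'), into fields."""
    value = int(hex_text.strip(), 16)
    if not 0 <= value <= 0xFF:
        raise ValueError("a fuse byte must fit in 8 bits")
    # Assumption: CKSEL3..0 are the four least significant bits of the low fuse.
    cksel = value & 0x0F
    return {
        "bits": format(value, "08b"),  # on AVR, a 0 bit means "programmed"
        "cksel": cksel,
        # Assumption: CKSEL values 1000..1111 select an external crystal/resonator.
        "external_crystal": cksel >= 0b1000,
    }


print(decode_lfuse("0xef"))
```

Paste in the value that the fuse read printed for `lfuse`; if `external_crystal` comes back `False`, the chip is not clocked from the crystal and the fuse settings deserve a second look.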
## A useful avrdude flag

```bash
# Back up the full flash of a working board before touching fuses.
avrdude -c usbtiny -p t2313 -U flash:r:backup.hex:i
```

For everything else, the [Adafruit USBtinyISP guide](https://learn.adafruit.com/usbtinyisp) is a good second reference.

---

## Install Apache Spark 3.5 on Linux (Ubuntu, CentOS)

- URL: https://blog.prabeeshk.com/blog/2024/11/26/install-apache-spark-3-on-linux/
- Date: 2024-11-26
- Summary: To install Apache Spark 3.5 on Linux: install OpenJDK 17 and Python 3.8+, download the Spark binary from archive.apache.org/dist/spark, extract to /opt, set SPARK_HOME and PATH environment variables, then verify with spark-shell or pyspark. Optionally configure a standalone cluster with start-master.sh and start-worker.sh.
- Keywords: install Apache Spark, Spark installation Linux, Apache Spark Ubuntu, PySpark installation

To install Apache Spark 3.5 on Linux, install OpenJDK 17, download the Spark 3.5 binary from `archive.apache.org/dist/spark`, extract it to `/opt`, set `SPARK_HOME` and `PATH`, then verify with `spark-shell`. The full process takes about 15 minutes on a fresh Ubuntu or CentOS machine.

This is a walkthrough of installing Apache Spark 3.5 on modern Linux, from prerequisites through a working standalone cluster. Earlier versions are covered in [Install Apache Spark 1.0 on Ubuntu 14.04](/blog/2014/10/31/install-apache-spark-on-ubuntu-14-dot-04/) and [Install Apache Spark 2 on Ubuntu 16.04 and macOS](/blog/2016/12/07/install-apache-spark-2-on-ubuntu-16-dot-04-and-mac-os/).
## Prerequisites

| Requirement | Version / Recommendation |
| --- | --- |
| Java | OpenJDK 17 (Spark 3.5 supports 8, 11, 17) |
| Python | 3.8+ for PySpark |
| Memory | 4 GB minimum, 8 GB+ for comfortable work |
| Storage | 10 GB free for install + logs |
| OS | Ubuntu 20.04+, CentOS 7+, or equivalent |

### Install Java

```bash
# Ubuntu/Debian
sudo apt update
sudo apt install openjdk-17-jdk

# CentOS/RHEL
sudo yum install java-17-openjdk-devel

# Verify installation
java -version
javac -version
```

Set the `JAVA_HOME` environment variable:

```bash
# Add to ~/.bashrc or ~/.profile
# The path below is the Ubuntu/Debian default; CentOS/RHEL uses a different
# directory under /usr/lib/jvm
echo 'export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64' >> ~/.bashrc
echo 'export PATH=$PATH:$JAVA_HOME/bin' >> ~/.bashrc
source ~/.bashrc

# Verify
echo $JAVA_HOME
```

**Install Python and essential packages** (for PySpark):

```bash
# Ubuntu/Debian
sudo apt install python3 python3-pip python3-dev

# CentOS/RHEL
sudo yum install python3 python3-pip python3-devel

# Install essential Python packages
pip3 install py4j pandas numpy matplotlib
```

## Install Spark

### Option 1: binary distribution (recommended)

Download the binary distribution:

```bash
# Navigate to your preferred installation directory
cd /opt

# Download Spark 3.5+ (check spark.apache.org for latest version)
sudo wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

# Alternative: the same release built against Scala 2.13
# sudo wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3-scala2.13.tgz
```

Extract and set up the files:

```bash
# Extract the archive
sudo tar -xzf spark-3.5.0-bin-hadoop3.tgz

# Create symbolic link for easier management
sudo ln -sf spark-3.5.0-bin-hadoop3 spark

# Set proper permissions
sudo chown -R $USER:$USER /opt/spark-3.5.0-bin-hadoop3
```

Configure environment variables:

```bash
# Add to ~/.bashrc
echo 'export SPARK_HOME=/opt/spark' >> ~/.bashrc
echo 'export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin' >> ~/.bashrc
echo 'export PYSPARK_PYTHON=python3' >> ~/.bashrc
echo 'export PYSPARK_DRIVER_PYTHON=python3' >> ~/.bashrc

# Apply changes
source ~/.bashrc
```

### Option 2: build from source

For users needing specific configurations or latest development features:

```bash
# Install build dependencies
sudo apt install git maven scala

# Clone Spark repository
git clone https://github.com/apache/spark.git
cd spark

# Build with a specific Hadoop version (the Hadoop 3 profile is named hadoop-3)
./build/mvn -DskipTests clean package -Phadoop-3 -Dhadoop.version=3.3.4

# This process takes 30-60 minutes depending on your system
```

## Configuration

### Basic configuration

Create the configuration files from the bundled templates, plus the event log directory used below:

```bash
cd $SPARK_HOME/conf
cp spark-defaults.conf.template spark-defaults.conf
cp spark-env.sh.template spark-env.sh

# The event log directory must exist before the first job starts
mkdir -p /tmp/spark-events
```

Essential spark-defaults.conf settings:

```bash
# Edit spark-defaults.conf
nano spark-defaults.conf

# Add these essential configurations:
spark.master                     spark://localhost:7077
spark.eventLog.enabled           true
spark.eventLog.dir               /tmp/spark-events
spark.history.fs.logDirectory    /tmp/spark-events
spark.sql.warehouse.dir          /tmp/spark-warehouse

# Performance optimizations
spark.executor.memory            2g
spark.executor.cores             2
spark.executor.instances         2
spark.driver.memory              1g
spark.driver.maxResultSize       1g

# Enable dynamic allocation
# (without an external shuffle service, shuffle tracking must be on as well)
spark.dynamicAllocation.enabled                 true
spark.dynamicAllocation.shuffleTracking.enabled true
spark.dynamicAllocation.minExecutors            1
spark.dynamicAllocation.maxExecutors            4

# Kryo serialization for better performance
spark.serializer                 org.apache.spark.serializer.KryoSerializer
```

Configure spark-env.sh:

```bash
# Edit spark-env.sh
nano spark-env.sh

# Add essential environment variables:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
export SPARK_MASTER_HOST=localhost
export SPARK_WORKER_MEMORY=2g
export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=1
```

## Smoke tests

### Basic functionality

`spark-defaults.conf` above points `spark.master` at the standalone master, which is not running yet, so the shell tests pass `--master "local[*]"` explicitly.

**Test 1: Spark Shell (Scala)**

```bash
# Start Spark shell in local mode
spark-shell --master "local[*]"

# Run in Spark shell:
scala> val data = 1 to 10000
scala> val distData = sc.parallelize(data)
scala> distData.filter(_ < 10).collect()
scala> :quit
```

**Test 2: PySpark Shell**

```bash
# Start PySpark shell in local mode
pyspark --master "local[*]"

# Run in PySpark:
>>> data = range(1, 10000)
>>> distData = sc.parallelize(data)
>>> distData.filter(lambda x: x < 10).collect()
>>> exit()
```

**Test 3: Submit Application**

```bash
# Run the classic Pi estimation example
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[2] \
  $SPARK_HOME/examples/jars/spark-examples_2.12-3.5.0.jar \
  10
```

Expected output: a line starting with `Pi is roughly 3.14`. The remaining digits vary from run to run because the example is a Monte Carlo estimate.

### Performance validation tests

Test with different configurations:

```bash
# Local mode with four threads; executors share the driver JVM here,
# so --driver-memory is the setting that matters
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[4] \
  --driver-memory 2g \
  --executor-memory 1g \
  $SPARK_HOME/examples/jars/spark-examples_2.12-3.5.0.jar \
  100
```

## Cluster setup (optional but recommended)

### Standalone cluster configuration

Start Spark Master:

```bash
# Start master node
start-master.sh

# Verify master is running
# Open browser to http://localhost:8080
```

Start Spark Worker(s):

```bash
# Start worker node
start-worker.sh spark://localhost:7077

# For multiple workers on the same machine: stop the running worker,
# set SPARK_WORKER_INSTANCES=2 in spark-env.sh, then start again with
# per-worker limits. The master URL comes first; options follow it.
stop-worker.sh
start-worker.sh spark://localhost:7077 -c 1 -m 1g
```

Test cluster deployment:

```bash
# Submit job to cluster
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://localhost:7077 \
  --executor-memory 1g \
  --total-executor-cores 2 \
  $SPARK_HOME/examples/jars/spark-examples_2.12-3.5.0.jar \
  100
```

## Hadoop integration

### Working with HDFS

For existing Hadoop clusters:

```bash
# Ensure Spark is built with correct Hadoop version
# Check your Hadoop version
hadoop version

# Download Spark pre-built for your Hadoop version
# Example for Hadoop 3.3:
wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
```

Test HDFS integration:

```bash
# Start spark-shell with HDFS access
spark-shell

# Read from HDFS
scala> val textFile = sc.textFile("hdfs://namenode:9000/path/to/input.txt")
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
scala> wordCounts.saveAsTextFile("hdfs://namenode:9000/path/to/output")
```

## Development environment setup

### IDE integration

IntelliJ IDEA setup:

1. Install Scala plugin
2. Create new SBT project
3. Add Spark dependencies to build.sbt:

```scala
name := "SparkApplication"
version := "0.1"
scalaVersion := "2.12.17"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.5.0",
  "org.apache.spark" %% "spark-sql" % "3.5.0",
  "org.apache.spark" %% "spark-mllib" % "3.5.0"
)
```

VS Code setup for PySpark:

```bash
# Install Python extension
# Install Pylint for Python linting
pip3 install pylint

# Create virtual environment for Spark projects
python3 -m venv spark-env
source spark-env/bin/activate
pip install pyspark pandas numpy jupyter
```

## Production deployment considerations

### Security configuration

```bash
# Add to spark-defaults.conf

# Enable authentication
spark.authenticate               true
spark.authenticate.secret        yourSecretKey

# SSL configuration
spark.ssl.enabled                true
spark.ssl.keyStore               /path/to/keystore.jks
spark.ssl.keyStorePassword       yourKeystorePassword
```

### Monitoring and logging

```bash
# Enable history server
start-history-server.sh
# Access history UI at http://localhost:18080

# Configure log levels (Spark 3.3+ ships a Log4j 2 template)
cd $SPARK_HOME/conf
cp log4j2.properties.template log4j2.properties
# Edit log4j2.properties for appropriate log levels
```

### Resource management

```bash
# For YARN integration
export HADOOP_CONF_DIR=/path/to/hadoop/conf
spark-submit --master yarn --deploy-mode cluster your-application.jar

# For Kubernetes deployment
spark-submit \
  --master k8s://https://kubernetes-master-url:443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=spark-py:latest \
  your-application.py
```

## Troubleshooting common issues

### Memory issues

```bash
# Increase driver memory
spark-submit --driver-memory 4g your-app.jar

# Configure executor memory
spark-submit --executor-memory 2g your-app.jar
```

### Java version conflicts

```bash
# Ensure consistent Java version
sudo update-alternatives --config java
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
```

### Network binding issues

```bash
# Configure specific network interface
export SPARK_MASTER_HOST=192.168.1.100
export SPARK_LOCAL_IP=192.168.1.100
```

## Next steps

- [Performance tuning](/blog/2023/01/06/performance-tuning-on-apache-spark/) for a working cluster
- [Standalone Scala applications](/blog/2014/04/01/a-standalone-spark-application-in-scala/) as a starter template
- [PySpark with Docker](/blog/2015/06/19/pyspark-notebook-with-docker/) for a reproducible notebook environment

## Maintenance

```bash
# Clean up event logs periodically
find /tmp/spark-events -type f -mtime +30 -delete

# Master UI: http://localhost:8080
# History server UI: http://localhost:18080
```

## Frequently asked questions

### What Java version does Apache Spark 3.5 require?

Apache Spark 3.5 officially supports Java 8, 11, and 17. OpenJDK 17 is a sensible choice for new installs because it is the newest long-term-support release that Spark 3.5 supports.

### How do I install PySpark on Ubuntu?

Install Python 3.8+ with `sudo apt install python3 python3-pip`, then install Spark as described above and set `PYSPARK_PYTHON=python3`. You can also install the `pyspark` pip package (`pip install pyspark`) for local development without a full Spark installation.

### Can I run Spark on a single machine without a cluster?

Yes. Use `--master local[*]` when submitting jobs, which runs Spark in local mode using all available CPU cores. This is how most developers test and prototype before deploying to a cluster.

### How do I check which version of Spark is installed?

Run `spark-shell --version` or `pyspark --version` from the terminal. Both print the Spark version, Scala version, and Java version being used.
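Alongside the version check, a short Python preflight can confirm that the environment variables and commands from the install steps are actually visible to the current shell. This is a minimal sketch, not part of Spark: `preflight` is a hypothetical helper, and the list of variables and tools it checks is an assumption based on the steps in this post.

```python
import os
import shutil


def preflight(env=None, which=shutil.which):
    """Return a list of problems; an empty list means the basics look fine."""
    env = os.environ if env is None else env
    problems = []
    # Variables this walkthrough sets in ~/.bashrc
    for var in ("JAVA_HOME", "SPARK_HOME"):
        value = env.get(var)
        if not value:
            problems.append(f"{var} is not set")
        elif not os.path.isdir(value):
            problems.append(f"{var} points to a missing directory: {value}")
    # Commands that should resolve once PATH includes the bin directories
    for tool in ("java", "spark-submit"):
        if which(tool) is None:
            problems.append(f"{tool} not found on PATH")
    return problems


issues = preflight()
print("\n".join(issues) if issues else "Environment looks OK")
```

Run it in a fresh terminal; anything it reports usually means `~/.bashrc` was edited but not re-sourced, or a path has a typo.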
### What is the difference between standalone, YARN, and Kubernetes deploy modes?

Standalone mode is Spark's built-in cluster manager, simplest to set up. YARN integrates with an existing Hadoop cluster and shares resources with other Hadoop workloads. Kubernetes runs Spark executors as pods, which is the standard for cloud-native deployments. Choose standalone for learning and small teams, YARN for Hadoop shops, Kubernetes for containerized environments.

---

## Advanced AM Modulation Analysis with Matplotlib

- URL: https://blog.prabeeshk.com/bonus/advanced-am-modulation-analysis-with-matplotlib/
- Date: 2024-01-25
- Keywords: AM modulation analysis, matplotlib signal processing, FFT in Python, sideband analysis, modulation index

This post builds on [AM Wave Generation and Plotting with Matplotlib](/blog/2011/09/25/am-plot-matplotlib/). Once you can generate an AM waveform, the interesting question is: is it any good? That means measuring modulation index and inspecting the spectrum. Below is a small `AdvancedAMAnalyzer` class that handles both, plus a worked example of how sideband power redistributes with the modulation index.

## A reusable analyzer class

The class holds the sample rate and duration, and exposes three operations: generate a signal, calculate the modulation index from the envelope, and analyze the sideband spectrum via FFT.
```python
import matplotlib.pyplot as plt
import numpy as np
from scipy import signal
from scipy.fft import fft, fftfreq


class AdvancedAMAnalyzer:
    """AM signal analysis: modulation index and sideband spectrum."""

    def __init__(self, sampling_rate=10000, duration=1.0):
        self.sampling_rate = sampling_rate
        self.duration = duration
        # endpoint=False keeps the sample spacing at exactly 1 / sampling_rate,
        # so the FFT bins line up with the carrier and sideband frequencies
        self.time = np.linspace(0, duration, int(sampling_rate * duration), endpoint=False)

    def generate_am_signal(self, carrier_freq, message_freq, modulation_index,
                           message_amplitude=1.0, carrier_amplitude=1.0, noise_level=0.0):
        """Generate an AM signal with optional additive Gaussian noise."""
        carrier = carrier_amplitude * np.sin(2 * np.pi * carrier_freq * self.time)
        message = message_amplitude * np.sin(2 * np.pi * message_freq * self.time)
        am_signal = carrier * (1 + modulation_index * message)
        if noise_level > 0:
            am_signal += noise_level * np.random.normal(0, 1, len(self.time))
        return am_signal, carrier, message

    def calculate_modulation_index(self, am_signal):
        """Estimate modulation index from the signal envelope (Hilbert transform)."""
        envelope = np.abs(signal.hilbert(am_signal))
        max_env = np.max(envelope)
        min_env = np.min(envelope)
        m = (max_env - min_env) / (max_env + min_env)
        return m, envelope

    def analyze_sidebands(self, am_signal, carrier_freq, message_freq):
        """FFT-based power at the carrier and the two sideband frequencies."""
        fft_signal = fft(am_signal)
        freqs = fftfreq(len(self.time), 1 / self.sampling_rate)

        def power_at(f):
            # Power in the FFT bin closest to frequency f
            idx = np.argmin(np.abs(freqs - f))
            return np.abs(fft_signal[idx]) ** 2

        carrier_power = power_at(carrier_freq)
        upper_sb_power = power_at(carrier_freq + message_freq)
        lower_sb_power = power_at(carrier_freq - message_freq)
        return {
            'frequencies': freqs,
            'fft_signal': fft_signal,
            'carrier_power': carrier_power,
            'upper_sideband_power': upper_sb_power,
            'lower_sideband_power': lower_sb_power,
            'total_power': carrier_power + upper_sb_power + lower_sb_power,
        }


analyzer = AdvancedAMAnalyzer(sampling_rate=10000, duration=1.0)
```

The envelope is recovered with `scipy.signal.hilbert`, which returns the analytic signal; the absolute value of that is the amplitude envelope. The modulation index formula `(max - min) / (max + min)` then falls straight out of the definition of AM: the envelope varies between `A(1 - m)` and `A(1 + m)`.

For the spectrum, a plain `fft` + `fftfreq` pair gives you the frequency bins. Pick the bins closest to the carrier and the two sidebands (`f_c ± f_m`) and square the magnitudes to get power.

## Modulation index examples

The modulation index `m` controls how deeply the message modulates the carrier. Below `m = 1.0` the envelope stays positive; at `m = 1.0` it just touches zero; above `m = 1.0` the envelope flips sign and you get phase reversals that most receivers won't demodulate cleanly.

The function below plots several indices side by side, showing both the time-domain envelope and the frequency spectrum.

```python
def analyze_modulation_depth_examples():
    modulation_indices = [0.25, 0.5, 0.75, 1.0, 1.25]
    carrier_freq, message_freq = 1000, 100

    fig, axes = plt.subplots(len(modulation_indices), 2, figsize=(15, 12))
    fig.suptitle('AM Signal Analysis: Different Modulation Indices', fontsize=16)

    for i, mi in enumerate(modulation_indices):
        am_signal, _, _ = analyzer.generate_am_signal(carrier_freq, message_freq, mi)
        calculated_mi, envelope = analyzer.calculate_modulation_index(am_signal)

        # Time domain
        axes[i, 0].plot(analyzer.time[:1000], am_signal[:1000], 'b-',
                        label=f'AM Signal (MI={mi:.2f})')
        axes[i, 0].plot(analyzer.time[:1000], envelope[:1000], 'r--',
                        label=f'Envelope (Calculated MI={calculated_mi:.3f})')
        axes[i, 0].set_title(f'Modulation Index: {mi}')
        axes[i, 0].set_xlabel('Time (s)')
        axes[i, 0].set_ylabel('Amplitude')
        axes[i, 0].legend()
        axes[i, 0].grid(True)

        # Frequency domain (positive frequencies only)
        sb = analyzer.analyze_sidebands(am_signal, carrier_freq, message_freq)
        freqs = sb['frequencies']
        mag = np.abs(sb['fft_signal'])
        mask = freqs >= 0
        axes[i, 1].plot(freqs[mask], mag[mask], 'g-')
        axes[i, 1].set_title(f'Frequency Spectrum (MI={mi})')
        axes[i, 1].set_xlabel('Frequency (Hz)')
        axes[i, 1].set_ylabel('Magnitude')
        axes[i, 1].set_xlim(0, 2000)
        axes[i, 1].grid(True)

    plt.tight_layout()
    plt.show()


analyze_modulation_depth_examples()
```

Two things to watch in the output. For `m < 1` the red envelope curve stays above zero, and the spectrum shows a clean carrier with two smaller sidebands. For `m > 1` (overmodulation) the ideal envelope `1 + m·sin(...)` goes negative; the Hilbert magnitude folds it back above zero, so the calculated index saturates near 1.0 instead of reporting 1.25. The spectrum of the generated signal still contains only the carrier and the two sidebands, because the signal is still the product of two sinusoids. The extra harmonics that aren't in the original message appear at the output of an envelope detector, and that is the audible distortion you hear on a real receiver.

## Sideband power distribution

In AM, the total transmitted power splits between the carrier and the two sidebands. The carrier carries no information, so the share of power in the sidebands is a direct measure of transmission efficiency. Theory says the combined sideband power is `m² / 2` times the carrier power; this function sweeps the modulation index and plots the result.

```python
def sideband_power_analysis():
    carrier_freq, message_freq = 1000, 100
    modulation_indices = np.linspace(0.1, 1.5, 20)

    carrier_powers = []
    sideband_powers = []
    total_powers = []

    for mi in modulation_indices:
        am_signal, _, _ = analyzer.generate_am_signal(carrier_freq, message_freq, mi)
        a = analyzer.analyze_sidebands(am_signal, carrier_freq, message_freq)
        carrier_powers.append(a['carrier_power'])
        sideband_powers.append(a['upper_sideband_power'] + a['lower_sideband_power'])
        total_powers.append(a['total_power'])

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

    # Power vs modulation index
    ax1.plot(modulation_indices, carrier_powers, 'b-', label='Carrier', linewidth=2)
    ax1.plot(modulation_indices, sideband_powers, 'r-', label='Sidebands', linewidth=2)
    ax1.plot(modulation_indices, total_powers, 'g-', label='Total', linewidth=2)
    ax1.set_xlabel('Modulation Index')
    ax1.set_ylabel('Power')
    ax1.set_title('Power Distribution vs Modulation Index')
    ax1.legend()
    ax1.grid(True)

    # Sideband efficiency
    efficiency = np.array(sideband_powers) / np.array(total_powers) * 100
    ax2.plot(modulation_indices, efficiency, color='purple', linewidth=2)
    ax2.set_xlabel('Modulation Index')
    ax2.set_ylabel('Sideband Efficiency (%)')
    ax2.set_title('Share of Total Power in Sidebands')
    ax2.grid(True)

    plt.tight_layout()
    plt.show()


sideband_power_analysis()
```

The efficiency curve confirms the textbook result: at `m = 1.0`, only about 33% of the total power is in the sidebands (the useful part); the rest is in the carrier. That's why commercial AM systems sometimes use variants (SSB, DSB-SC) that suppress part or all of the carrier.

## Summary

With `numpy`, `scipy.signal`, and `scipy.fft`, the two core measurements for an AM signal, modulation index from the envelope and power distribution via FFT, take only a few dozen lines. The same envelope-detection and spectrum-inspection approach also works for demodulating real recordings, for example from an SDR capture, as long as you resample down to a manageable rate first.

For the starting point, see [AM Wave Generation and Plotting](/blog/2011/09/25/am-plot-matplotlib/).

---

## Advanced PySpark Performance Optimization Techniques

- URL: https://blog.prabeeshk.com/bonus/advanced-performance-optimization-techniques-for-pyspark-data-pipelines/
- Date: 2024-01-20
- Summary: The three key Spark 3.x performance features beyond basic tuning are Adaptive Query Execution (AQE, enabled by default since Spark 3.2), dynamic partition pruning (eliminates unnecessary partition reads in star-schema joins), and predicate pushdown (pushes filters to the data source). Enable them via spark.sql.adaptive.enabled, spark.sql.optimizer.dynamicPartitionPruning.enabled, and proper filter placement before joins.
- Keywords: PySpark AQE, dynamic partition pruning, predicate pushdown, PySpark caching, PySpark memory tuning

This builds on [Performance Tuning on Apache Spark](/blog/2023/01/06/performance-tuning-on-apache-spark/), which covers the fundamentals (spill, skew, shuffle, storage, serialization). Once those are under control, the next wins come from runtime-adaptive features. This post is a quick reference to the config keys, not a deep dive; read each one in the [Spark configuration docs](https://spark.apache.org/docs/latest/configuration.html) before flipping it.

## Adaptive Query Execution (AQE)

AQE re-optimizes the stages of a query that have not run yet, using runtime statistics from the shuffles that have already completed. Since Spark 3.2 the master switch defaults to on, but the related knobs are worth knowing:

```python
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```

| Key | What it does | Default (per Spark docs) |
| --- | --- | --- |
| `spark.sql.adaptive.enabled` | Master switch for AQE | `true` |
| `spark.sql.adaptive.coalescePartitions.enabled` | Merge small post-shuffle partitions | `true` |
| `spark.sql.adaptive.advisoryPartitionSizeInBytes` | Target partition size after coalescing | `64 MB` (inherits `spark.sql.adaptive.shuffle.targetPostShuffleInputSize`) |
| `spark.sql.adaptive.skewJoin.enabled` | Split skewed partitions during joins | `true` |
| `spark.sql.adaptive.skewJoin.skewedPartitionFactor` | A partition counts as skewed when it's this many times the median and also exceeds the threshold below | `5.0` |
| `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes` | Absolute size threshold for the same rule | `256 MB` |

Raise `advisoryPartitionSizeInBytes` (e.g. to 128-256 MB) on jobs that produce too many small output files. Lower the skew factor or threshold if `skewJoin.enabled` isn't catching the skew you see in the Spark UI.
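To get a feel for how the factor and the threshold interact, the skew rule can be replayed on partition sizes copied from the Spark UI. The sketch below is plain Python, not Spark's implementation: `skewed_partitions` is a hypothetical helper that applies the documented rule (larger than factor times the median, and larger than the byte threshold) to a list of sizes.

```python
from statistics import median

MB = 1024 * 1024


def skewed_partitions(sizes_bytes, factor=5.0, threshold_bytes=256 * MB):
    """Return the indices of partitions that the skew rule would flag."""
    med = median(sizes_bytes)
    return [
        i for i, size in enumerate(sizes_bytes)
        # Both conditions must hold: relative to the median AND absolute size
        if size > factor * med and size > threshold_bytes
    ]


# Example shuffle partition sizes (illustrative numbers, not measured data)
sizes = [64 * MB, 70 * MB, 60 * MB, 900 * MB, 300 * MB]

print(skewed_partitions(sizes))              # [3]
print(skewed_partitions(sizes, factor=3.0))  # [3, 4]
```

With the defaults only the 900 MB partition is flagged; the 300 MB one clears the 256 MB threshold but is under five times the 70 MB median. Dropping the factor to 3 catches it, which is the kind of adjustment the paragraph above describes.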
Always check the [Spark configuration docs](https://spark.apache.org/docs/latest/configuration.html) for the exact defaults in your Spark version.

## Dynamic Partition Pruning

When you filter a fact table by values from a dimension (the classic star-schema join), DPP pushes the implied filter back down into the fact-table scan at runtime. You stop reading partitions you don't need.

```python
# On by default in current Spark; shown here for completeness.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
```

DPP only actually kicks in when:

- The fact table is partitioned by the join key.
- The dimension table is small enough to broadcast, or is pre-filtered.
- The join is an equi-join between those two tables.

Confirm it's happening by calling `.explain()` on your query and looking for `PartitionFilters` with a subquery reference (it shows up as `dynamicpruningexpression`).

## Predicate pushdown

For Parquet (and ORC), predicate pushdown is on by default in current Spark. The relevant keys:

| Key | Default |
| --- | --- |
| `spark.sql.parquet.filterPushdown` | `true` |
| `spark.sql.parquet.enableVectorizedReader` | `true` |

So you usually don't need to set anything; what matters is writing pushdown-friendly predicates. Predicate pushdown only helps when the file format supports statistics (Parquet, ORC, Delta) and the predicate references columns that have min/max statistics in the file metadata. A `WHERE lower(name) = 'foo'` predicate will not push down; a `WHERE event_date = '2024-01-01'` will.

## Caching: measure before you cache

The basic advice in the [fundamentals guide](/blog/2023/01/06/performance-tuning-on-apache-spark/) still applies: `.cache()` and `.persist()` are not free. They cost memory and serialization time. A useful rule of thumb: cache a DataFrame only if it will be scanned at least twice, and only after you've checked the DAG in the Spark UI to confirm Spark isn't already reusing that work. AQE does not cache DataFrames, but the plan may show a `ReusedExchange` node where a shuffle result is shared between branches.
Storage levels worth knowing:

- `MEMORY_ONLY`: fastest, but if the DataFrame doesn't fit, partitions are recomputed on access. Rarely the right default.
- `MEMORY_AND_DISK` (default for `DataFrame.cache()`): spills to disk if memory fills. Safe choice.
- `MEMORY_AND_DISK_SER`: serializes before caching, which shrinks the footprint at a CPU cost on every read. Good for wide DataFrames you'll scan many times, when memory is tight. This level belongs to the Scala/Java API; PySpark's `StorageLevel` does not expose the `_SER` variants because Python data is always stored serialized.
- `DISK_ONLY`: only useful if recomputation is very expensive and memory is contended.

## Reading the query plan

Before adding hints or tuning anything, run `.explain(mode="formatted")` (Spark 3.x) on the final DataFrame and look for:

- `Exchange` nodes: each is a shuffle boundary. Fewer is better.
- `BroadcastExchange` vs `ShuffledHashJoin` vs `SortMergeJoin`: Spark usually picks correctly with AQE on, but a small table with missing or stale size statistics can force a shuffle join. `broadcast(df)` is the usual fix.
- `PartitionFilters` and `PushedFilters`: confirm your scans are actually pruning.

## Summary

Once the fundamentals are in place, the biggest remaining wins come from:

1. Leaving AQE on and tuning the skew-join knobs to your data.
2. Making sure dynamic partition pruning can fire on star-schema queries with partitioned facts.
3. Caching deliberately, after checking the plan, not by default.
4. Reading `.explain()` before guessing at hints.

None of these are free. Measure on your own workload.

---

## PySpark Design Patterns Quick Reference

- URL: https://blog.prabeeshk.com/bonus/pyspark-design-patterns-quick-reference/
- Date: 2024-01-20
- Keywords: PySpark patterns cheat sheet, PySpark design patterns reference card, data pipeline patterns reference

Minimal runnable snippets for the five core patterns. For the why and when, see [Implementing Design Patterns in PySpark Data Pipelines](/bonus/implementing-design-patterns-in-pyspark-data-pipelines/) and [Advanced PySpark Design Patterns](/bonus/advanced-pyspark-design-patterns-implementation/).
## Factory Pattern

Create data sources without specifying exact types:

```python
from abc import ABC, abstractmethod

# Placeholder products; replace with your real readers
class CSVDataSource:
    pass

class ParquetDataSource:
    pass

class DataSourceFactory(ABC):
    @abstractmethod
    def create_data_source(self):
        pass

class CSVFactory(DataSourceFactory):
    def create_data_source(self):
        return CSVDataSource()

class ParquetFactory(DataSourceFactory):
    def create_data_source(self):
        return ParquetDataSource()

# Usage
factory = CSVFactory()
source = factory.create_data_source()
```

## Singleton Pattern

Ensure only one instance exists:

```python
from threading import Lock

class SparkConfig:
    _instance = None
    _lock = Lock()

    def __new__(cls):
        # The lock keeps concurrent first calls from creating two instances
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance

# Usage
config1 = SparkConfig()
config2 = SparkConfig()
assert config1 is config2  # Same instance
```

## Builder Pattern

Construct complex objects step by step:

```python
class DataTransform:
    """Placeholder product that holds the collected settings."""
    def __init__(self, filters, aggregations):
        self.filters = filters
        self.aggregations = aggregations

class TransformBuilder:
    def __init__(self):
        self.filters = []
        self.aggregations = {}

    def add_filter(self, condition):
        self.filters.append(condition)
        return self  # returning self enables method chaining

    def add_aggregation(self, col, func):
        self.aggregations[col] = func
        return self

    def build(self):
        return DataTransform(self.filters, self.aggregations)

# Usage
transform = (TransformBuilder()
    .add_filter("status == 'active'")
    .add_aggregation("amount", "sum")
    .build())
```

## Observer Pattern

Notify multiple components of data changes:

```python
from abc import ABC, abstractmethod

class Observer(ABC):
    @abstractmethod
    def update(self, event):
        pass

class DataSubject:
    def __init__(self):
        self.observers = []

    def register(self, observer):
        self.observers.append(observer)

    def notify(self, event):
        for obs in self.observers:
            obs.update(event)

# Placeholder observer and event; replace with your own
class LoggingObserver(Observer):
    def update(self, event):
        print(f"event received: {event.payload}")

class DataEvent:
    def __init__(self, payload):
        self.payload = payload

# Usage
data = {"rows_written": 100}
subject = DataSubject()
subject.register(LoggingObserver())
subject.notify(DataEvent(data))
```

## Pipeline Pattern

Chain transformations sequentially:

```python
from abc import ABC, abstractmethod

class Transform(ABC):
    def __init__(self, next_transform=None):
        self.next = next_transform

    def set_next(self, transform):
        self.next = transform
        return transform  # returning the new stage lets calls chain

    @abstractmethod
    def process(self, data):
        pass

class CleanTransform(Transform):
    def process(self, data):
        cleaned = data.dropna()
        return self.next.process(cleaned) if self.next else cleaned

# Placeholder stages that pass the data through; add real logic here
class ValidateTransform(Transform):
    def process(self, data):
        return self.next.process(data) if self.next else data

class EnrichTransform(Transform):
    def process(self, data):
        return self.next.process(data) if self.next else data

# Usage (df is any DataFrame that provides dropna())
pipeline = CleanTransform()
pipeline.set_next(ValidateTransform()).set_next(EnrichTransform())
result = pipeline.process(df)
```

## Quick tips

- **Factory**: flexible object creation when you need to swap the concrete class
- **Singleton**: shared resources like the Spark session
- **Builder**: complex objects with many optional parameters
- **Observer**: event-driven data processing with fan-out
- **Pipeline**: ordered chain of transformations

Each snippet above is self-contained apart from the input DataFrame. Copy it into a new module, wire it into your pipeline, and adjust the class names to match your domain. For a deeper walkthrough of when and why to use each one, see the [full tutorial](/bonus/implementing-design-patterns-in-pyspark-data-pipelines/). For strategy, decorator, and template method patterns, see the [advanced patterns post](/bonus/advanced-pyspark-design-patterns-implementation/).

---

## Advanced PySpark Design Patterns: Implementation Examples

- URL: https://blog.prabeeshk.com/bonus/advanced-pyspark-design-patterns-implementation/
- Date: 2024-01-15
- Keywords: PySpark strategy pattern, PySpark decorator pattern, PySpark template method pattern

This builds on [basic design patterns in PySpark pipelines](/bonus/implementing-design-patterns-in-pyspark-data-pipelines/) (factory, singleton, builder, observer, pipeline). Once those are familiar, three more patterns cover more complex cases that come up in production: switching algorithms at runtime, adding cross-cutting concerns, and sharing skeleton logic across pipeline variants.

## Strategy: swap algorithms at runtime

The Strategy pattern defines a family of algorithms, encapsulates each one, and makes them interchangeable.
In data pipelines this is useful when the processing step varies by input characteristics or business requirements but the surrounding code should stay the same.

```python
from abc import ABC, abstractmethod
from typing import Dict, Any
from pyspark.sql import DataFrame, SparkSession

class DataProcessingStrategy(ABC):
    """Abstract strategy for data processing."""

    @abstractmethod
    def process(self, data: DataFrame) -> DataFrame:
        pass

    @abstractmethod
    def get_processing_info(self) -> Dict[str, Any]:
        pass

class AggregationStrategy(DataProcessingStrategy):
    """Strategy for aggregation-based processing."""

    def __init__(self, group_by_cols: list, agg_cols: Dict[str, str]):
        self.group_by_cols = group_by_cols
        self.agg_cols = agg_cols

    def process(self, data: DataFrame) -> DataFrame:
        return data.groupBy(self.group_by_cols).agg(self.agg_cols)

    def get_processing_info(self) -> Dict[str, Any]:
        return {
            "strategy_type": "aggregation",
            "group_by_columns": self.group_by_cols,
            "aggregation_columns": self.agg_cols
        }

class FilteringStrategy(DataProcessingStrategy):
    """Strategy for filtering-based processing."""

    def __init__(self, filter_condition: str):
        self.filter_condition = filter_condition

    def process(self, data: DataFrame) -> DataFrame:
        return data.filter(self.filter_condition)

    def get_processing_info(self) -> Dict[str, Any]:
        return {
            "strategy_type": "filtering",
            "filter_condition": self.filter_condition
        }

class DataProcessor:
    """Context class that uses different processing strategies."""

    def __init__(self, strategy: DataProcessingStrategy):
        self.strategy = strategy

    def set_strategy(self, strategy: DataProcessingStrategy):
        """Change the processing strategy at runtime."""
        self.strategy = strategy

    def process_data(self, data: DataFrame) -> DataFrame:
        return self.strategy.process(data)

    def get_strategy_info(self) -> Dict[str, Any]:
        return self.strategy.get_processing_info()

# Usage example
spark = SparkSession.builder.appName("StrategyPattern").getOrCreate()
data = spark.createDataFrame(
    [(1, "A", 100), (1, "B", 200), (2, "A", 150), (2, "B", 250)],
    ["id", "category", "value"],
)

agg_strategy = AggregationStrategy(
    group_by_cols=["id"],
    agg_cols={"value": "sum", "category": "count"},
)
processor = DataProcessor(agg_strategy)
processor.process_data(data).show()

# Switch to filtering at runtime
processor.set_strategy(FilteringStrategy("value > 150"))
processor.process_data(data).show()
```

When to use it: when the same context needs to pick between several distinct algorithms based on runtime state or config. The caller holds the `DataProcessor`; the concrete strategy is a swappable detail.

## Decorator: add orthogonal concerns to a transformation

The Decorator pattern wraps an object to add behaviour without changing its class. In a data pipeline this is the cleanest way to layer on cross-cutting concerns like logging, validation, or timing without polluting each transformation class.

```python
from abc import ABC, abstractmethod
from typing import Callable, Dict
import time
import logging

from pyspark.sql import DataFrame

class DataTransformation(ABC):
    """Abstract base class for data transformations."""

    @abstractmethod
    def transform(self, data: DataFrame) -> DataFrame:
        pass

class BaseTransformation(DataTransformation):
    """Trivial base transformation that just returns the data unchanged."""

    def __init__(self, name: str):
        self.name = name

    def transform(self, data: DataFrame) -> DataFrame:
        return data

class TransformationDecorator(DataTransformation):
    """Base decorator that delegates to the wrapped transformation."""

    def __init__(self, transformation: DataTransformation):
        self._transformation = transformation

    @property
    def name(self) -> str:
        return getattr(self._transformation, "name", "unknown")

    def transform(self, data: DataFrame) -> DataFrame:
        return self._transformation.transform(data)

class LoggingDecorator(TransformationDecorator):
    """Decorator that adds logging around the wrapped transformation."""

    def transform(self, data: DataFrame) -> DataFrame:
        start = time.time()
        logging.info(f"Starting transformation: {self.name}")
        result = self._transformation.transform(data)
        logging.info(f"Finished transformation: {self.name} ({time.time() - start:.2f}s)")
        return result

class ValidationDecorator(TransformationDecorator):
    """Decorator that runs column-level validation rules before transforming."""

    def __init__(self, transformation: DataTransformation, validation_rules: Dict[str, Callable]):
        super().__init__(transformation)
        self.validation_rules = validation_rules

    def transform(self, data: DataFrame) -> DataFrame:
        for column, validation_func in self.validation_rules.items():
            if column in data.columns:
                invalid_count = data.filter(~validation_func(data[column])).count()
                if invalid_count > 0:
                    logging.warning(f"Found {invalid_count} invalid values in column {column}")
        return self._transformation.transform(data)

# Usage
def is_positive(col):
    return col > 0

def is_not_null(col):
    return col.isNotNull()

base = BaseTransformation("data_cleaning")
validated = ValidationDecorator(base, {"value": is_positive, "id": is_not_null})
logged = LoggingDecorator(validated)
result = logged.transform(data)
```

Each decorator wraps one concern (logging, validation). Stack them in whatever order you need. Adding a new concern (e.g. timing, retry, caching) is a new class plus one more wrapper call at the construction site; the existing transformations don't change.

When to use it: cross-cutting concerns that apply to many transformations but don't belong inside any of them.

## Template Method: share a pipeline skeleton across variants

The Template Method pattern defines the skeleton of an algorithm in a base class and lets subclasses fill in specific steps. In a data pipeline this is the natural fit for "every job follows the same shape (validate → preprocess → apply logic → postprocess → validate output), but each job's steps differ."

```python
from abc import ABC, abstractmethod
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
import logging

class DataPipelineTemplate(ABC):
    """Template class for data pipeline workflows."""

    def run_pipeline(self, input_data: DataFrame) -> DataFrame:
        """Template method that fixes the order of pipeline stages."""
        try:
            validated_data = self.validate_input(input_data)
            preprocessed_data = self.preprocess_data(validated_data)
            processed_data = self.apply_business_logic(preprocessed_data)
            postprocessed_data = self.postprocess_data(processed_data)
            final_data = self.validate_output(postprocessed_data)
            self.log_results(final_data)
            return final_data
        except Exception as e:
            self.handle_error(e)
            raise

    @abstractmethod
    def validate_input(self, data: DataFrame) -> DataFrame: ...

    @abstractmethod
    def preprocess_data(self, data: DataFrame) -> DataFrame: ...

    @abstractmethod
    def apply_business_logic(self, data: DataFrame) -> DataFrame: ...

    @abstractmethod
    def postprocess_data(self, data: DataFrame) -> DataFrame: ...

    @abstractmethod
    def validate_output(self, data: DataFrame) -> DataFrame: ...

    def log_results(self, data: DataFrame):
        logging.info(f"Pipeline completed. Output rows: {data.count()}")

    def handle_error(self, error: Exception):
        logging.error(f"Pipeline failed: {error}")

class SalesDataPipeline(DataPipelineTemplate):
    """Concrete implementation for sales data."""

    REQUIRED_COLUMNS = ["sale_id", "product_id", "amount", "date"]

    def validate_input(self, data: DataFrame) -> DataFrame:
        missing = [c for c in self.REQUIRED_COLUMNS if c not in data.columns]
        if missing:
            raise ValueError(f"Missing required columns: {missing}")
        return data

    def preprocess_data(self, data: DataFrame) -> DataFrame:
        return (
            data.dropDuplicates(["sale_id"])
            .withColumn("date", F.to_date("date"))
            .withColumn("year", F.year("date"))
            .withColumn("month", F.month("date"))
        )

    def apply_business_logic(self, data: DataFrame) -> DataFrame:
        daily = data.groupBy("date").agg(
            F.sum("amount").alias("daily_total"),
            F.count("*").alias("daily_transactions"),
        )
        return data.join(daily, "date", "left")

    def postprocess_data(self, data: DataFrame) -> DataFrame:
        return data.withColumn("amount", F.round("amount", 2))

    def validate_output(self, data: DataFrame) -> DataFrame:
        negative = data.filter(F.col("amount") < 0).count()
        if negative > 0:
            logging.warning(f"Found {negative} rows with negative amounts")
        return data

# Usage
sales_pipeline = SalesDataPipeline()
result = sales_pipeline.run_pipeline(sales_df)
```

The base class fixes the stage order and the error-handling shape; every subclass only has to fill in what each stage means for its domain. Adding a second subclass (say `LogDataPipeline`) reuses the whole scaffold for free.

When to use it: any time you have several pipeline variants that share a common shape and differ only in the content of each stage.

## Summary

Strategy, decorator, and template method each solve a different problem: runtime algorithm swap, orthogonal concerns, and shared skeletons.
Combined with the five [basic patterns](/bonus/implementing-design-patterns-in-pyspark-data-pipelines/), they cover most of the structural shapes you'll reach for in a production pipeline. Start simple; only reach for an advanced pattern when the basic one stops fitting.

---

## PySpark Design Patterns for Data Pipelines

- URL: https://blog.prabeeshk.com/bonus/implementing-design-patterns-in-pyspark-data-pipelines/
- Date: 2023-01-14
- Summary: The five most useful design patterns for PySpark data pipelines are Factory (create readers/writers for different formats), Singleton (share SparkSession across modules), Builder (compose complex transformations step by step), Observer (monitor pipeline events), and Pipeline (chain transformation stages). Each keeps pipeline code modular and testable as complexity grows.
- Keywords: PySpark design patterns, PySpark tutorial, data pipeline design patterns, creational design patterns PySpark, PySpark pipeline architecture, PySpark factory singleton builder

The five most useful design patterns for PySpark data pipelines are Factory (swap data sources without changing pipeline code), Singleton (one shared SparkSession), Builder (compose transformations step by step), Observer (monitor pipeline events), and Pipeline (chain stages together). This tutorial shows each with a compact code skeleton you can adapt.

If you want to write PySpark data pipelines that stay clean as they grow, design patterns are the most reliable tool to reach for. Pipelines get complex quickly: new data sources appear, transformations multiply, one-off scripts turn into production systems. Classic software design patterns give you proven structures to keep that complexity under control.

This tutorial walks through the five most useful patterns for a PySpark pipeline, each with a compact code example:

- [Factory Pattern](#factory-pattern): swap data sources (CSV, Parquet, JSON) without changing pipeline code
- [Singleton Pattern](#singleton-pattern): share one SparkSession or sink across the whole pipeline
- [Builder Pattern](#builder-pattern): construct complex transforms with optional parameters
- [Observer Pattern](#observer-pattern): react to data changes across loosely coupled components
- [Pipeline Pattern](#pipeline-pattern): chain transforms into a clear, ordered flow

Most of the examples use Python's `abc` module to define abstract base classes and concrete subclasses. If you haven't used `abc.ABC` before: it lets you declare a class contract via `@abstractmethod`, and any subclass must implement those methods or Python will refuse to instantiate it.

## Quick reference: when to use each pattern

| Pattern | Use when you need to… | Example |
| --- | --- | --- |
| Factory | Create data sources or transforms without hard-coding the concrete type | `DataSourceFactory` → CSV, Parquet, JSON |
| Singleton | Share one expensive object (SparkSession, connection, sink) | Single `DataSink`, single `SparkSession` |
| Builder | Construct objects with many optional parameters | `DataTransformBuilder().set_param1(...).build()` |
| Observer | Let several components react when data changes | Multiple sinks notified on a new batch |
| Pipeline | Chain ordered stages in an ETL flow | `extract → clean → enrich → load` |

## Factory Pattern {#factory-pattern}

The factory pattern is a creational pattern: a parent class defines an interface for creating objects, and subclasses decide which concrete class to instantiate. In a data pipeline, this lets you plug in different input formats (CSV, Parquet, JSON) without the pipeline code caring about the underlying format.

```Python
from abc import ABC, abstractmethod

class DataSourceFactory(ABC):
    """Abstract factory for generating data sources."""

    @abstractmethod
    def create_data_source(self):
        pass

class CSVDataSourceFactory(DataSourceFactory):
    """Concrete factory for generating CSV data sources."""

    def create_data_source(self):
        return CSVDataSource()

class ParquetDataSourceFactory(DataSourceFactory):
    """Concrete factory for generating Parquet data sources."""

    def create_data_source(self):
        return ParquetDataSource()

class DataSource(ABC):
    """Abstract base class for data sources."""

    @abstractmethod
    def load_data(self):
        pass

class CSVDataSource(DataSource):
    """Concrete implementation of a CSV data source."""

    def load_data(self):
        # Use spark.read.csv(...) to load data from a CSV file
        pass

class ParquetDataSource(DataSource):
    """Concrete implementation of a Parquet data source."""

    def load_data(self):
        # Use spark.read.parquet(...) to load data from a Parquet file
        pass

# Example usage
factory = CSVDataSourceFactory()
data_source = factory.create_data_source()
data = data_source.load_data()
```

`DataSourceFactory` is the abstract factory; `CSVDataSourceFactory` and `ParquetDataSourceFactory` are concrete factories that each produce a specific `DataSource`. Pick the factory that matches the input format and call `create_data_source()`. The rest of the pipeline works against the `DataSource` interface and stays decoupled from the format.

When to use it: any time your pipeline needs to support more than one input format or runtime configuration.

## Singleton Pattern {#singleton-pattern}

The singleton pattern restricts a class to a single instance. In Spark, the canonical example is the `SparkSession`: you want exactly one, shared across the whole job. Output sinks and connection pools are similar cases.

```Python
from threading import Lock

class DataSink:
    """Class for writing data to a sink."""

    _instance = None
    _lock = Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance

    def write_data(self, data):
        # Write data to sink
        pass

# Example usage
sink1 = DataSink()
sink2 = DataSink()

# sink1 and sink2 are the same instance
assert sink1 is sink2
```

`__new__` checks whether an instance already exists and returns it if so. `_lock` makes it safe under concurrent access, which matters when multiple threads bootstrap the pipeline at once.

When to use it: anything expensive to create and safe to share (SparkSession, Kafka producer, HTTP client, metrics emitter).

## Builder Pattern {#builder-pattern}

The builder pattern separates construction of a complex object from its representation. It shines when a class has many optional parameters. Rather than a constructor with ten arguments, you get a fluent API the reader can scan top-to-bottom.

```Python
class DataTransform:
    """Class for transforming data."""

    def __init__(self, **kwargs):
        self.param1 = kwargs.get("param1")
        self.param2 = kwargs.get("param2")
        self.param3 = kwargs.get("param3")

    def transform(self, data):
        # Transform data using specified parameters
        pass

class DataTransformBuilder:
    """Builder for creating DataTransform objects."""

    def __init__(self):
        self.param1 = None
        self.param2 = None
        self.param3 = None

    def set_param1(self, param1):
        self.param1 = param1
        return self

    def set_param2(self, param2):
        self.param2 = param2
        return self

    def set_param3(self, param3):
        self.param3 = param3
        return self

    def build(self):
        return DataTransform(
            param1=self.param1,
            param2=self.param2,
            param3=self.param3,
        )

# Example usage
transform = (
    DataTransformBuilder()
    .set_param1("value1")
    .set_param3("value3")
    .build()
)
```

Each setter returns `self`, so the calls chain naturally and the caller sets only the parameters that matter for their use case.

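To make the fluent style a little more concrete, here is a minimal sketch of a builder that collects read options. `ReadOptions`, `ReadOptionsBuilder`, and all of their method names are hypothetical and not part of PySpark; the sketch assumes you would later hand the resulting dictionary to something like `spark.read.options(**opts.options)`, and it runs without Spark installed.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass(frozen=True)
class ReadOptions:
    """Immutable result produced by the builder (hypothetical, for illustration)."""
    fmt: str
    path: str
    options: Dict[str, str] = field(default_factory=dict)
    schema: Optional[str] = None


class ReadOptionsBuilder:
    """Collects optional settings and validates them once, in build()."""

    def __init__(self, fmt: str, path: str):
        self._fmt = fmt
        self._path = path
        self._options: Dict[str, str] = {}
        self._schema: Optional[str] = None

    def with_header(self, enabled: bool = True) -> "ReadOptionsBuilder":
        self._options["header"] = str(enabled).lower()
        return self  # returning self is what makes the calls chainable

    def with_option(self, key: str, value: str) -> "ReadOptionsBuilder":
        self._options[key] = value
        return self

    def with_schema(self, ddl: str) -> "ReadOptionsBuilder":
        self._schema = ddl
        return self

    def build(self) -> ReadOptions:
        if not self._path:
            raise ValueError("path must not be empty")
        # Copy the dict so later builder calls cannot mutate the built object
        return ReadOptions(self._fmt, self._path, dict(self._options), self._schema)


opts = (
    ReadOptionsBuilder("csv", "data/sales.csv")
    .with_header()
    .with_option("sep", ";")
    .with_schema("sale_id INT, amount DOUBLE")
    .build()
)
print(opts.options)
```

Validating in `build()` rather than in each setter keeps the setters trivial and gives one place to reject an inconsistent combination of options.
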
When to use it: data readers, writers, and transforms with 5+ optional knobs (partitioning, compression, schema overrides, retry policy, etc.).

## Observer Pattern {#observer-pattern}

The observer pattern sets up a one-to-many dependency: when a subject changes state, it notifies every registered observer. In a data pipeline, this is useful when one successful batch should trigger several follow-up actions (write to the warehouse, update a dashboard, emit a metric) without hard-wiring them into the pipeline.

```Python
from abc import ABC, abstractmethod

class DataEvent(ABC):
    """Abstract base class for data events."""

    @abstractmethod
    def get_data(self):
        pass

class DataUpdatedEvent(DataEvent):
    """Concrete event fired when data is updated."""

    def __init__(self, data):
        self.data = data

    def get_data(self):
        return self.data

class DataObserver(ABC):
    """Abstract base class for data observers."""

    @abstractmethod
    def update(self, event):
        pass

class DataTransformObserver(DataObserver):
    """Observer that applies a transform when data is updated."""

    def __init__(self, transform):
        self.transform = transform

    def update(self, event):
        data = event.get_data()
        transformed_data = self.transform.transform(data)
        # Do something with the transformed data

class DataSubject:
    """Manages observers and fires data events."""

    def __init__(self):
        self.observers = []

    def register_observer(self, observer):
        self.observers.append(observer)

    def remove_observer(self, observer):
        self.observers.remove(observer)

    def notify_observers(self, event):
        for observer in self.observers:
            observer.update(event)

# Example usage
# `transform` is any object with a transform(data) method; `data` is your DataFrame
subject = DataSubject()
transform_observer = DataTransformObserver(transform)
subject.register_observer(transform_observer)
subject.notify_observers(DataUpdatedEvent(data))
```

The `DataSubject` keeps the list of `DataObserver` instances and calls each one's `update()` when `notify_observers()` fires. Adding a new reaction means implementing a new observer and registering it; the subject and the existing observers don't change.

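As a sketch of what "implement a new observer and register it" can look like, the block below adds a row-count metric next to an audit log. `BatchEvent`, `AuditLogObserver`, and `RowCountObserver` are hypothetical names, and `Subject` is a stripped-down stand-in for the `DataSubject` above so that the snippet runs on its own without Spark.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List


@dataclass
class BatchEvent:
    """Hypothetical event: which table was written and how many rows."""
    table: str
    row_count: int


class Observer(ABC):
    @abstractmethod
    def update(self, event: BatchEvent) -> None:
        ...


class AuditLogObserver(Observer):
    """Existing reaction: records one line per batch."""

    def __init__(self) -> None:
        self.lines: List[str] = []

    def update(self, event: BatchEvent) -> None:
        self.lines.append(f"wrote {event.row_count} rows to {event.table}")


class RowCountObserver(Observer):
    """The new reaction: keeps a running total of rows written."""

    def __init__(self) -> None:
        self.total_rows = 0

    def update(self, event: BatchEvent) -> None:
        self.total_rows += event.row_count


class Subject:
    def __init__(self) -> None:
        self._observers: List[Observer] = []

    def register(self, observer: Observer) -> None:
        self._observers.append(observer)

    def notify(self, event: BatchEvent) -> None:
        for observer in self._observers:
            observer.update(event)


subject = Subject()
audit = AuditLogObserver()
metrics = RowCountObserver()
subject.register(audit)
subject.register(metrics)  # the only change needed where the pipeline is wired up

subject.notify(BatchEvent("sales", 120))
subject.notify(BatchEvent("sales", 80))
print(metrics.total_rows)
print(audit.lines)
```

Neither `Subject` nor `AuditLogObserver` had to change to accommodate the metric, which is the property the pattern is meant to buy you.
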
When to use it: fan-out reactions to a pipeline event (audit logs, cache invalidation, alerting).

## Pipeline Pattern {#pipeline-pattern}

The pipeline pattern (sometimes called chain-of-responsibility for data) chains a series of processing steps so the output of each stage feeds the next. It maps naturally onto ETL: extract, transform, load.

```Python
from abc import ABC, abstractmethod

class DataTransform(ABC):
    """Abstract base class for data transforms in a pipeline."""

    def __init__(self, next_transform=None):
        self.next = next_transform

    def set_next(self, next_transform):
        self.next = next_transform
        return next_transform

    @abstractmethod
    def transform(self, data):
        pass

class TransformA(DataTransform):
    def transform(self, data):
        # Apply transform A
        transformed_data = data
        if self.next:
            return self.next.transform(transformed_data)
        return transformed_data

class TransformB(DataTransform):
    def transform(self, data):
        # Apply transform B
        transformed_data = data
        if self.next:
            return self.next.transform(transformed_data)
        return transformed_data

class TransformC(DataTransform):
    def transform(self, data):
        # Apply transform C
        transformed_data = data
        if self.next:
            return self.next.transform(transformed_data)
        return transformed_data

# Example usage
transform_a = TransformA()
transform_b = TransformB()
transform_c = TransformC()
transform_a.set_next(transform_b).set_next(transform_c)

transformed_data = transform_a.transform(data)
```

`set_next()` returns the next stage so you can wire the chain in one readable line. Each concrete transform only knows about its own logic and the handoff, so adding, removing, or reordering steps changes the wiring line and nothing else.

When to use it: any multi-stage ETL/ELT flow, especially when stages need to be composed differently per job.

## Putting it all together

These five patterns combine well. A production pipeline typically looks like:

1. A singleton `SparkSession` shared by every component.
2. A factory that picks the right `DataSource` based on config.
3. A builder that assembles the transform chain with per-job parameters.
4. A pipeline that chains the transforms in order.
5. An observer that fires side effects (metrics, alerts, downstream notifications) after each successful run.

## Frequently asked questions

### What are design patterns in PySpark?

They are reusable solutions to recurring problems in PySpark code: for example, how to create data sources flexibly (factory), how to share one `SparkSession` (singleton), or how to compose transforms (pipeline). The same shapes show up across almost every non-trivial pipeline.

### Which pattern should I start with?

Start with factory and pipeline. Factory removes hard-coded data formats; pipeline gives you a clean ETL skeleton. The others slot in once those two are in place.

### What is the difference between the factory and singleton patterns?

Factory produces many instances of different concrete classes that share an interface. Singleton restricts a class to exactly one instance. They solve opposite problems and are often used together; a factory object itself may be implemented as a singleton.

### Is an abstract class the same as an `abc` class in Python?

Yes. In Python, abstract base classes live in the `abc` module; any class that inherits from `abc.ABC` and uses `@abstractmethod` is what people casually call an "abc class". It defines a contract that subclasses must implement.

### Where can I learn more PySpark design patterns?

Keep going: [Advanced PySpark Design Patterns: Implementation Examples](/bonus/advanced-pyspark-design-patterns-implementation/) covers strategy, decorator, and template method patterns. For a one-page cheat sheet, see the [PySpark Design Patterns Quick Reference](/bonus/pyspark-design-patterns-quick-reference/). And if you're tuning an existing pipeline, [Apache Spark Performance Tuning](/blog/2023/01/06/performance-tuning-on-apache-spark/) pairs well with these patterns.

## Conclusion

The five patterns above cover most production needs: flexible data sources (factory), shared resources (singleton), configurable transforms (builder), event fan-out (observer), and readable ETL chains (pipeline). The next step is to try each one on a small PySpark job of your own; nothing locks the lessons in faster than running the code.

---

## Apache Spark Performance Tuning: Spill, Skew, Shuffle, Storage

- URL: https://blog.prabeeshk.com/blog/2023/01/06/performance-tuning-on-apache-spark/
- Date: 2023-01-06
- Summary: The five main causes of slow Apache Spark jobs are spill (data doesn't fit in memory), skew (uneven partition sizes), shuffle (expensive cross-network data movement), storage (tiny files and inferred schemas), and serialization (Python UDF overhead). Fix them by enabling AQE, broadcasting small tables, salting skewed joins, using Parquet with explicit schemas, and replacing Python UDFs with SQL functions or Pandas UDFs.
- Keywords: Apache Spark performance tuning, Spark optimization, PySpark tutorial, Spark data skew, Spark shuffle optimization, Spark memory tuning

Performance tuning can decide whether a Spark job runs in 10 minutes or 10 hours. Most slowdowns you'll hit in production come from the same five areas: **spill, skew, shuffle, storage, and serialization**. This guide walks through each one with the cause, how to spot it in the Spark UI, and the PySpark code to fix it.

The examples use PySpark, but the concepts apply to Scala and Java Spark equally well.

## Prerequisites

You'll get the most out of this guide if you already know:

- The difference between a DataFrame and an RDD
- Basic PySpark operations (transformations vs. actions)
- The driver / executor / partition model
- How to open the Spark UI and look at a stage

If you're not set up yet, the [Apache Spark installation guide for Ubuntu and macOS](/blog/2016/12/07/install-apache-spark-2-on-ubuntu-16-dot-04-and-mac-os/) covers the setup.

## Quick reference

| Problem | Symptom in Spark UI | First thing to try |
| --- | --- | --- |
| Spill | "Spill (memory)" / "Spill (disk)" columns in stage metrics | Raise executor memory, use salted joins, enable AQE |
| Skew | One task takes 10× longer than the median | Salt the join key, enable AQE skew join, broadcast the small side |
| Shuffle | Wide transformations dominate the stage timeline | Repartition on join keys, broadcast small tables, prefer narrow ops |
| Storage | Lots of tiny files, slow reads | Compact with `coalesce`, specify schema, pick Parquet over CSV |
| Serialization | Python UDFs are the slowest stage | Replace UDFs with SQL functions or Pandas UDFs |

## 1. Spill

Spill happens when Spark can't fit an operation's working set in memory and starts writing temp files to disk. Disk I/O is orders of magnitude slower than memory, so heavy spill shows up directly in job runtime.

You'll see it in the Spark UI under the *Spill (memory)* and *Spill (disk)* columns of a stage's task list.

### Fix: give Spark enough memory

Executor and driver memory are launch-time settings. Calling `spark.conf.set` on a session that is already running does not resize its executors, so pass the values when the application starts.

```Python
from pyspark.sql import SparkSession

# Memory is fixed at launch, so set it on the builder or with
#   spark-submit --executor-memory 16g --driver-memory 8g
spark = (SparkSession.builder
    .config("spark.executor.memory", "16g")
    .config("spark.memory.fraction", "0.8")  # more of the heap for execution + storage
    .getOrCreate())
```

### Fix: salt a join that spills because of a hot key

A single hot key forces millions of rows through one task. Adding a salt spreads the work across many tasks, each small enough to fit in memory. Salt the skewed side randomly and replicate the other side once per salt value, so that every matching pair still meets in exactly one task; putting independent random salts on both sides would silently drop most matches.

```Python
from pyspark.sql import functions as F

NUM_SALTS = 10

# Skewed (large) side: tag each row with a random salt in [0, NUM_SALTS)
df1 = df1.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))
# Other side: replicate every row once per salt value so no match is lost
df2 = df2.withColumn("salt", F.explode(F.array(*[F.lit(i) for i in range(NUM_SALTS)])))

result = df1.join(df2, on=["key", "salt"], how="inner").drop("salt")
```

### Fix: enable Adaptive Query Execution

AQE is the simplest win for spill. It re-plans stages at runtime using real partition statistics.

```Python
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```

## 2. Skew

Skew is an uneven distribution of data across partitions. The job is only as fast as its slowest task, so a single skewed partition can dominate total runtime.

Spot it by sorting the task list in the Spark UI by duration: a healthy stage has a narrow spread; a skewed one has a long tail.

A small amount of skew (under ~20%) is normal and not worth chasing. Beyond that, fix it.

### Fix: bucket and repartition on the join key

```Python
# Rebalance on a key column before joining
df_rebalanced = df.repartition(10, "key")

# Or persist with bucketing for repeated reads
df.write.bucketBy(10, "key").sortBy("key").saveAsTable("my_bucketed_table")
```

### Fix: use AQE skew join

With `spark.sql.adaptive.skewJoin.enabled`, Spark detects skewed partitions at runtime and splits them into smaller tasks automatically. AQE, and with it skew-join handling, is on by default in Spark 3.2+.

## 3. Shuffle

Shuffles move data across the network between executors. They're the most expensive operations in Spark, so reducing shuffle is usually the biggest performance lever.

### Narrow vs. wide transformations

| Type | Examples | Shuffle? | Partition scope |
| --- | --- | --- | --- |
| Narrow | `map`, `filter`, `select`, `withColumn`, `union` | No | Within a single partition |
| Wide | `join`, `groupBy`, `orderBy`, `distinct`, `repartition` | Yes | Across partitions |

Rewriting a wide operation into narrow operations, or filtering rows out before a join rather than after it, directly cuts shuffle volume.

### Fix: pre-partition before a join

```Python
# Repartitioning both sides on the join key means data is already co-located
df1 = df1.repartition(10, "key")
df2 = df2.repartition(10, "key")

joined = df1.join(df2, "key")
```

### Fix: broadcast small tables

If one side of a join fits in memory on every executor (roughly < 10 MB by default, tunable via `spark.sql.autoBroadcastJoinThreshold`), broadcast it and skip the shuffle entirely.

```Python
from pyspark.sql.functions import broadcast

joined = large_df.join(broadcast(small_df), "key")
```

## 4. Storage

How data is laid out on disk affects both read performance and shuffle behavior. The most common problems are tiny files and inferred schemas.

### Fix: compact tiny files

When a Spark job writes one file per partition, you can end up with thousands of small files. Each file has per-open overhead, so reads get slow. Aim for part-files between 128 MB and 1 GB.

```Python
# Read many small files, write back as fewer larger ones.
# Pick the count so each output file lands in the 128 MB to 1 GB range.
num_files = 8  # e.g. total data size / target file size
df.coalesce(num_files).write.mode("overwrite").parquet("output_path")
```

Use `coalesce` when reducing partition count (it avoids a shuffle); use `repartition` when you need to balance size evenly and are OK with a shuffle.

| | `coalesce(n)` | `repartition(n)` |
| --- | --- | --- |
| Shuffles data? | No | Yes |
| Can increase partitions? | No (only decrease) | Yes |
| Partition size balance | Uneven | Even |
| Typical cost | Cheap | Expensive |
| Use when | You want fewer, larger output files | You need evenly-sized partitions for a downstream join/group |

### Fix: specify the schema explicitly

Schema inference scans the data, which is slow for large inputs and can guess column types wrong.
Declare the schema up front:

```Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(schema)
    .load("data.csv")
)
```

### Fix: pick Parquet over CSV when you can

Parquet is columnar, compressed, and stores the schema in the file. For anything you'll read more than once, the conversion pays for itself quickly.

## 5. Serialization

Serialization is how Spark moves data and code across the cluster. The biggest lever here is avoiding Python UDFs.

| Option | Serialization cost | Relative speed | When to use |
| --- | --- | --- | --- |
| SQL / DataFrame functions | None | Fastest | First choice whenever the logic is expressible in Spark functions |
| SQL higher-order functions (`transform`, `filter`, `aggregate`) | None | Fast | Array / map column transformations |
| Pandas UDF (vectorized) | Batched via Arrow | Fast | Custom logic that must run in Python, on large batches |
| Python UDF (row-at-a-time) | Per row, JVM ↔ Python | Slow | Avoid, last resort for custom Python logic |

### Avoid Python UDFs

A Python UDF forces Spark to serialize each row out of the JVM, run it through Python, then serialize the result back. That round trip is far slower than a native function.

```Python
from pyspark.sql import functions as F
from pyspark.sql.types import LongType

# Slow: row-at-a-time Python UDF (assumes "col" holds integers)
double_udf = F.udf(lambda x: x * 2, LongType())
df.select(double_udf(df["col"]).alias("doubled_col"))

# Fast: the same logic as a native column expression, no Python round trip
df.select((df["col"] * 2).alias("doubled_col"))
```

### Use Pandas / vectorized UDFs when you really need a UDF

Pandas UDFs send a batch of rows via Arrow, so the serialization cost is amortized across the batch instead of paid per row.

```Python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def double(x: pd.Series) -> pd.Series:
    return x * 2

df = df.withColumn("doubled_col", double(df["col"]))
```

### Use SQL higher-order functions

For operations on array or map columns, Spark's built-in higher-order functions (`transform`, `filter`, `aggregate`, etc.) run entirely in the JVM and skip the UDF round trip:

```Python
from pyspark.sql.functions import expr

# Double every element of an array column without a UDF
df.select(expr("transform(my_array, x -> x * 2) as doubled_array"))
```

### Name your jobs in the Spark UI

Not a performance win on its own, but it makes the UI dramatically easier to debug when you're tuning:

```Python
spark.sparkContext.setJobDescription("Processing data for analysis")
rows = df.filter(df.age > 30).collect()
```

## Putting it together

The five problems compound. A job that suffers from skew is often spilling as well; a shuffle-heavy job usually has a storage problem feeding it. In practice, tune in this order:

1. **Check the Spark UI first.** Every issue above has a distinct signal.
2. **Fix storage:** specify schema, compact tiny files, use Parquet.
3. **Enable AQE:** handles spill and skew for you automatically in most cases.
4. **Reduce shuffles:** broadcast small tables, pre-partition on join keys.
5. **Remove Python UDFs:** SQL functions or Pandas UDFs are almost always faster.

## Frequently asked questions

### How do I know if my Spark job is spilling?

Look at the task table for a stage in the Spark UI. If the *Spill (memory)* or *Spill (disk)* columns are non-zero, you're spilling. Any non-trivial spill is worth chasing.

### What's the difference between `coalesce` and `repartition`?

`coalesce(n)` reduces the number of partitions without a shuffle; it is fast but can leave you with uneven partitions. `repartition(n)` does a full shuffle to evenly rebalance.
Use `coalesce` for shrinking, `repartition` when you need even partition sizes. ### When should I use `broadcast()`? When one side of a join is small enough to fit in memory on every executor, roughly under 10 MB by default, controlled by `spark.sql.autoBroadcastJoinThreshold`. Broadcasting skips the shuffle on the large side entirely. ### Is AQE on by default? Yes, since Spark 3.2. On older versions you need `spark.sql.adaptive.enabled=true` and, for skew handling, `spark.sql.adaptive.skewJoin.enabled=true`. ### Why are Python UDFs slow? Each row is serialized out of the JVM, passed to a Python process, executed, then serialized back. That round trip dominates runtime. Pandas UDFs batch rows through Arrow, which amortizes the cost; SQL functions avoid Python entirely. ## Next steps - [Install Apache Spark 2 on Ubuntu 16.04 and macOS](/blog/2016/12/07/install-apache-spark-2-on-ubuntu-16-dot-04-and-mac-os/) if you haven't set up a local cluster yet. - [Self-contained PySpark applications](/blog/2015/04/07/self-contained-pyspark-application/) for moving from notebook experiments to production jobs. - [Run a PySpark notebook with Docker](/blog/2015/06/19/pyspark-notebook-with-docker/) for a reproducible tuning environment. --- ## Install Apache Spark 2 on Ubuntu 16.04 and macOS - URL: https://blog.prabeeshk.com/blog/2016/12/07/install-apache-spark-2-on-ubuntu-16-dot-04-and-mac-os/ - Date: 2016-12-07 - Keywords: Apache Spark 2 installation, Spark Ubuntu 16.04 setup, Spark macOS installation, Maven Spark build, Hadoop Spark integration Earlier posts covered [Spark 1.1.0 on Ubuntu 14.04](/blog/2014/10/31/install-apache-spark-on-ubuntu-14-dot-04/). This one walks through Spark 2.0.2 on Ubuntu 16.04 and Mac OS X Sierra. For the latest version, see [Install Apache Spark 3.5 on Linux](/blog/2024/11/26/install-apache-spark-3-on-linux/). Java must be installed first. 
On Ubuntu: ``` $ sudo apt-add-repository ppa:webupd8team/java $ sudo apt-get update $ sudo apt-get install oracle-java7-installer ``` On macOS, grab `jdk-7u79-macosx-x64.dmg` from the [Oracle download page](http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html), accept the license, and double-click the dmg to install. Verify: ``` $ java -version java version "1.7.0_72" Java(TM) SE Runtime Environment (build 1.7.0_72-b14) Java HotSpot(TM) 64-Bit Server VM (build 24.72-b04, mixed mode) ``` The build depends on git. On Ubuntu: ``` sudo apt-get install git ``` On macOS: ``` brew install git ``` Download and untar the Spark 2 distribution, for example into `/usr/local/share/spark`: ``` $ mkdir /usr/local/share/spark $ curl http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2.tgz | tar xvz -C /usr/local/share/spark ``` ### Build Maven is bundled with Spark, so you can build in-place: ``` $ cd /usr/local/share/spark/spark-2.0.2 $ ./build/mvn -DskipTests clean package ``` The build takes a while. Once it finishes, run a sample job to confirm: ``` $ ./bin/run-example SparkPi 10 ``` You'll see `Pi is roughly 3.14634` in the output. To build against a specific Hadoop version: ``` ./build/mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -DskipTests clean package ``` See the [official building docs](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version) for more options. For the older Spark 1 install walkthrough, see the [Ubuntu 14.04 post](/blog/2014/10/31/install-apache-spark-on-ubuntu-14-dot-04/). ## Environment variables After a successful build, make sure the shell knows where to find the binaries. Add these to `~/.bashrc`: ```bash export SPARK_HOME=/usr/local/share/spark/spark-2.0.2 export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin ``` Then reload with `source ~/.bashrc` and confirm with `spark-shell --version`. 
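If `spark-shell` is still not found after reloading, it can help to check the two variables programmatically. The sketch below is only an illustration: `check_spark_env` is a hypothetical helper, not part of Spark, and it assumes the layout set up by the exports above (`$SPARK_HOME/bin` and `$SPARK_HOME/sbin` on `PATH`).

```python
import os


def check_spark_env(env):
    """Return a list of problems with SPARK_HOME / PATH in an environment mapping."""
    spark_home = env.get("SPARK_HOME")
    if not spark_home:
        return ["SPARK_HOME is not set"]

    problems = []
    path_entries = env.get("PATH", "").split(os.pathsep)
    # The exports above add both bin/ and sbin/ under SPARK_HOME to PATH.
    for sub in ("bin", "sbin"):
        expected = os.path.join(spark_home, sub)
        if expected not in path_entries:
            problems.append(expected + " is not on PATH")
    return problems


if __name__ == "__main__":
    # Check the current shell environment and report anything missing.
    for line in check_spark_env(os.environ) or ["Spark environment looks consistent"]:
        print(line)
```

An empty list means both directories are on `PATH`; anything else points at the export line that did not take effect.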
## Next steps - [Performance tuning](/blog/2023/01/06/performance-tuning-on-apache-spark/) once you have jobs running - The [Spark 3.5 install guide](/blog/2024/11/26/install-apache-spark-3-on-linux/) if you want the latest version --- ## How to Run a PySpark Notebook with Docker - URL: https://blog.prabeeshk.com/blog/2015/06/19/pyspark-notebook-with-docker/ - Date: 2015-06-19 - Summary: To run PySpark in a Jupyter notebook with Docker, run 'docker run -d -t -p 8888:8888 jupyter/pyspark-notebook' and open http://localhost:8888 in your browser. For a current setup use the maintained jupyter/pyspark-notebook image rather than building from source. - Keywords: PySpark Jupyter, Spark Docker, PySpark notebook, Jupyter Spark To run PySpark in a Jupyter notebook with Docker, run `docker run -d -t -p 8888:8888 jupyter/pyspark-notebook` and open `http://localhost:8888` in your browser. No local Spark or Java installation is needed. Apache Spark works well in a Jupyter notebook: you get iterative development, inline plots, and the ability to poke at intermediate DataFrames. Docker makes the setup reproducible and removes the "works on my machine" problem. This post walks through running PySpark in Jupyter via the official `jupyter/pyspark-notebook` image. ## Installing Docker Docker is a containerization platform that allows you to package and deploy your applications in a predictable and isolated environment. To install Docker, use the following command. This command was run on an Ubuntu-14-04 instance, but you can find more options on the [Docker official site](https://docs.docker.com/). ```bash # This command installs Docker on your machine wget -qO- https://get.docker.com/ | sh ``` ### Running the PySpark Notebook To run the PySpark Notebook, use the following command on any machine with Docker installed. 
```bash # This command runs the pyspark-notebook Docker container and exposes port 8888 for access to the notebook docker run -d -t -p 8888:8888 prabeeshk/pyspark-notebook ``` After the pyspark-notebook Docker container is up and running, you can access the PySpark Notebook by pointing your web browser to [http://127.0.0.1:8888](http://127.0.0.1:8888) or [http://localhost:8888](http://localhost:8888). For more information on the Docker image, check out the [Dockerhub repository](https://registry.hub.docker.com/u/prabeeshk). The source code can be found in the [GitHub repository](https://github.com/prabeesh/pyspark-notebook). Below you will find the custom PySpark startup script and the `Dockerfile`. ```Python ## PySpark Startup Script # Import required modules import os import sys # Get the value of the SPARK_HOME environment variable spark_home = os.environ.get('SPARK_HOME', None) # If SPARK_HOME is not set, raise an error if not spark_home: raise ValueError('SPARK_HOME environment variable is not set') # Add the paths to the Python libraries for Spark and py4j to the system path sys.path.insert(0, os.path.join(spark_home, 'python')) sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip')) # Execute the pyspark shell script to launch PySpark (execfile is Python 2 only, matching the Python 2 interpreter this image installs) execfile(os.path.join(spark_home, 'python/pyspark/shell.py')) ``` This script reads `SPARK_HOME` from the environment (and fails fast if it is missing), adds the Spark Python libraries and py4j to `sys.path`, and then runs the PySpark shell initialiser so that `sc` (SparkContext) is available when the notebook starts. ```Dockerfile ## Dockerfile FROM ubuntu:trusty MAINTAINER Prabeesh Keezhathra.
# Update the package list and install Java RUN \ apt-get -y update &&\ echo "deb http://ppa.launchpad.net/webupd8team/java/ubuntu precise main" > /etc/apt/sources.list.d/webupd8team-java.list &&\ echo "deb-src http://ppa.launchpad.net/webupd8team/java/ubuntu precise main" >> /etc/apt/sources.list.d/webupd8team-java.list &&\ apt-key adv --keyserver keyserver.ubuntu.com --recv-keys EEA14886 &&\ apt-get -y update &&\ echo oracle-java7-installer shared/accepted-oracle-license-v1-1 select true | /usr/bin/debconf-set-selections &&\ apt-get install -y oracle-java7-installer &&\ apt-get install -y curl # Set the version of Spark to install and the installation directory ENV SPARK_VERSION 1.4.0 ENV SPARK_HOME /usr/local/src/spark-$SPARK_VERSION # Download and extract Spark to the installation directory and build Spark RUN \ mkdir -p $SPARK_HOME &&\ curl -s http://d3kbcqa49mib13.cloudfront.net/spark-$SPARK_VERSION.tgz | tar -xz -C $SPARK_HOME --strip-components=1 &&\ cd $SPARK_HOME &&\ build/mvn -DskipTests clean package # Set the Python path to include the Spark installation ENV PYTHONPATH $SPARK_HOME/python/:$PYTHONPATH # Install build essentials, Python, and the Python package manager pip RUN apt-get install -y build-essential \ python \ python-dev \ python-pip \ python-zmq # Install Python libraries for interacting with Spark RUN pip install py4j \ ipython[notebook]==3.2 \ jsonschema \ jinja2 \ terminado \ tornado # Create an IPython profile for PySpark RUN ipython profile create pyspark # Copy the custom PySpark startup script to the IPython profile directory COPY pyspark-notebook.py /root/.ipython/profile_pyspark/startup/pyspark-notebook.py # Create a volume for the notebook directory VOLUME /notebook # Set the working directory to the notebook directory WORKDIR /notebook # Expose port 8888 for the IPython Notebook server EXPOSE 8888 # Run IPython with the PySpark profile and bind to all interfaces CMD ipython notebook --no-browser --profile=pyspark --ip=* ``` The 
image is based on Ubuntu 14.04 (Trusty), installs Java 7, downloads and builds Spark 1.4.0, then layers on IPython Notebook 3.2 with a custom PySpark startup profile. Port 8888 is exposed for the notebook server. > **Note:** this Dockerfile targets Spark 1.4 on Ubuntu 14.04. For a current setup, see the [Spark 3 install post](/blog/2024/11/26/install-apache-spark-3-on-linux/) or use the maintained `jupyter/pyspark-notebook` Docker image. ## Frequently asked questions ### How do I run PySpark in a Jupyter notebook without installing Spark locally? Use Docker: `docker run -d -t -p 8888:8888 jupyter/pyspark-notebook`. This gives you a Jupyter environment with PySpark pre-installed. No local Java or Spark setup required. ### How do I persist my notebooks when using PySpark with Docker? Mount a local directory as a volume: `docker run -d -t -p 8888:8888 -v $(pwd)/notebooks:/home/jovyan/work jupyter/pyspark-notebook`. Files saved in the `/home/jovyan/work` directory inside the container will persist on your host machine. ### Which Docker image should I use for PySpark in 2024+? Use the official `jupyter/pyspark-notebook` image maintained by the Jupyter Docker Stacks project. It includes a current Spark version, JupyterLab, and common Python data science libraries. Avoid building from source unless you need a custom Spark build. --- ## Building Self-Contained PySpark Applications - URL: https://blog.prabeeshk.com/blog/2015/04/07/self-contained-pyspark-application/ - Date: 2015-04-07 - Keywords: standalone PySpark, spark-submit Python, self-contained PySpark, PySpark deployment The [earlier install post](/blog/2014/10/31/install-apache-spark-on-ubuntu-14-dot-04/) covered the Scala interactive shell. This one shows how to do the same in Python, then how to turn an experiment into a standalone application you can run with `spark-submit`. ``` ./bin/pyspark ``` Now you can enjoy Spark using Python interactive shell. 
This shell is sufficient for experimentation and development. For production, however, a standalone application is the better choice. I covered a standalone Spark application in Scala in a [previous post](/blog/2014/04/01/a-standalone-spark-application-in-scala/). Here is the same idea in Python, which the [official Spark site](https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications) calls a self-contained PySpark application. First, follow [the install post](/blog/2014/10/31/install-apache-spark-on-ubuntu-14-dot-04/) to build Spark using sbt assembly. Then add the PySpark library to the system Python path: ``` cd ~ vi .bashrc ``` Add the following exports at the end of the `.bashrc` file: ``` export SPARK_HOME= export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8-src.zip:$PYTHONPATH ``` PySpark depends on the `py4j` Python package, which lets the Python interpreter dynamically access Spark objects in the JVM. Don't forget to set `SPARK_HOME` to your Spark directory. Reload `.bashrc` once that is done: ``` . .bashrc ``` PySpark should now be importable from the system Python. After writing your Python code, you can run it with the plain `python` command; it then runs against a local Spark instance with the default configuration. ``` python ``` Use the `spark-submit` script instead if you want to pass configuration values at runtime. ``` ./bin/spark-submit --master local[*] ``` For more details, see the [Spark configuration docs](https://spark.apache.org/docs/latest/configuration.html). Configuration values can be passed at run time or set in the `conf/spark-defaults.conf` file. Settings in that file also apply when you run PySpark applications with the plain `python` command.
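For reference, a self-contained application can be as small as the sketch below. It is not the code from the original post: the file name `word_count.py`, the `split_words` helper, and the word-count task are assumptions chosen for illustration, and it uses only the long-standing `SparkConf` / `SparkContext` RDD calls.

```python
"""word_count.py: a minimal self-contained PySpark application (illustrative sketch)."""
import sys


def split_words(line):
    """Lower-case a line and split it on whitespace."""
    return line.lower().split()


def main(path):
    # Imported here so the helper above can be used without Spark installed.
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("WordCount")
    sc = SparkContext(conf=conf)
    counts = (
        sc.textFile(path)
        .flatMap(split_words)
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
    )
    # Print a small sample of (word, count) pairs.
    for word, n in counts.take(10):
        print(word, n)
    sc.stop()


if __name__ == "__main__" and len(sys.argv) == 2:
    main(sys.argv[1])
```

With the `PYTHONPATH` exports in place, a file like this could be started with the plain `python` command and a text file path as its only argument, or handed to `spark-submit` when the master or other settings need to be chosen at launch.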
At the time of writing there was no `pip install` for PySpark; the reasons are discussed in this [JIRA ticket](https://issues.apache.org/jira/browse/SPARK-1267). If you are a fan of IPython, you also have the option of running PySpark in an IPython notebook. See this [blog post](http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/) for more detail. --- ## Install Apache Spark on Ubuntu 14.04 - URL: https://blog.prabeeshk.com/blog/2014/10/31/install-apache-spark-on-ubuntu-14-dot-04/ - Date: 2014-10-31 - Keywords: install Spark Ubuntu 14.04, Spark 1.1.0 setup, sbt build Spark, spark-shell Update: For Spark 2 see the [Ubuntu 16.04 and macOS post](/blog/2016/12/07/install-apache-spark-2-on-ubuntu-16-dot-04-and-mac-os/). This post walks through Spark 1.1.0 on Ubuntu 14.04. For the latest version, see [Install Apache Spark 3.5 on Linux](/blog/2024/11/26/install-apache-spark-3-on-linux/). ## Prerequisites Start with Java: ``` $ sudo apt-add-repository ppa:webupd8team/java $ sudo apt-get update $ sudo apt-get install oracle-java7-installer ``` To check that the Java installation succeeded: ``` $ java -version ``` It prints the installed Java version: ` java version "1.7.0_72" Java(TM) SE Runtime Environment (build 1.7.0_72-b14) Java HotSpot(TM) 64-Bit Server VM (build 24.72-b04, mixed mode) ` The next step is to install Scala. First download Scala from the [Scala official website](http://www.scala-lang.org/download/2.10.4.html). Copy the downloaded file to some location, for example `/usr/local/src`, untar it, and set the path variable: ``` $ wget http://www.scala-lang.org/files/archive/scala-2.10.4.tgz $ sudo mkdir /usr/local/src/scala $ sudo tar xvf scala-2.10.4.tgz -C /usr/local/src/scala/ ``` ``` $ vi .bashrc ``` Add the following at the end of the file: ``` export SCALA_HOME=/usr/local/src/scala/scala-2.10.4 export PATH=$SCALA_HOME/bin:$PATH ``` Reload `.bashrc`: ``` $ .
.bashrc ``` To check that Scala is installed successfully: ``` $ scala -version ``` It prints the installed Scala version: ` Scala code runner version 2.10.4 -- Copyright 2002-2013, LAMP/EPFL ` Or just type `scala` to enter the Scala interactive shell: ``` $ scala scala> ``` Next, install git, because the Spark build depends on it. ``` sudo apt-get install git ``` Finally, [download the Spark 1.1.0 distribution](https://archive.apache.org/dist/spark/spark-1.1.0/spark-1.1.0.tgz): ``` $ wget https://archive.apache.org/dist/spark/spark-1.1.0/spark-1.1.0.tgz $ tar xvf spark-1.1.0.tgz ``` ## Building Spark Spark is built with SBT (Simple Build Tool), which is bundled with the distribution. To compile the code: ``` $ cd spark-1.1.0 $ sbt/sbt assembly ``` The build takes some time. Once it has finished packaging, you can test a sample program: ``` $ ./bin/run-example SparkPi 10 ``` The output contains a line like `Pi is roughly 3.14634`. Spark is now ready to use. For more detail, see the [Spark 1.1.1 documentation](http://spark.apache.org/docs/1.1.1/). ## Spark interactive shell You can run Spark interactively through the Scala shell: ``` $ ./bin/spark-shell ``` ```scala scala> val textFile = sc.textFile("README.md") scala> textFile.count() ``` You can also try out individual Spark modules from the shell. For example, to use MQTT interactively (the MQTT module lives under `external`), first build the package: ``` $ sbt/sbt "streaming-mqtt/package" ``` Then add this package to the classpath and import it in `spark-shell`: ``` $ bin/spark-shell --driver-class-path external/mqtt/target/scala-2.10/spark-streaming-mqtt_2.10-1.1.0.jar scala> import org.apache.spark.streaming.mqtt._ ``` This lets you check your code line by line. ## Accessing Hadoop filesystems If you have already built the source package, clean it first and then rebuild it against your Hadoop version: ``` $ sbt/sbt clean ``` Select the Hadoop version by setting the `SPARK_HADOOP_VERSION` variable.
This example uses Hadoop 2.0.0-mr1-cdh4.3.0 (CDH 4.3.0): ``` $ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.3.0 sbt/sbt assembly ``` After a successful build, you can read and write data on CDH 4.3.0 clusters. ``` $ ./bin/spark-shell ``` ```scala scala> val file = sc.textFile("hdfs://IP:8020/path/to/textfile.txt") scala> val count = file.flatMap(line => line.split(",")).map(word => (word, 1)).reduceByKey(_+_) scala> count.saveAsTextFile("hdfs://IP:8020/path/to/output") ``` See the [quick start](http://spark.apache.org/docs/1.1.1/quick-start.html) for more. --- ## Creating Uber JARs for Spark Projects with sbt-assembly - URL: https://blog.prabeeshk.com/blog/2014/04/08/creating-uber-jar-for-spark-project-using-sbt-assembly/ - Date: 2014-04-08 - Keywords: Spark uber jar, sbt-assembly, Spark fat jar, Spark deployment `sbt-assembly` packages a Spark project plus all its dependencies into one runnable "uber" JAR, which you can hand to `spark-submit` without worrying about classpath. This follows on from the [standalone Spark application in Scala](/blog/2014/04/01/a-standalone-spark-application-in-scala/) post. ## Adding the sbt-assembly plugin The first step in creating an assembled JAR for your Spark application is to add the sbt-assembly plugin.
To do this, add the following line to the `project/plugin.sbt` file: ```scala addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.1") ``` ## Configuring assembly settings Next, specify the sbt-assembly Git repository as a dependency in the `project/project/build.scala` file: ```scala import sbt._ object Plugins extends Build { lazy val root = Project("root", file(".")) dependsOn( uri("git://github.com/sbt/sbt-assembly.git#0.9.1") ) } ``` In the `build.sbt` file, add the following contents: ```scala import AssemblyKeys._ // put this at the top of the file,leave the next line blank assemblySettings ``` The following keys are available for configuring the assembly plugin; see the [sbt-assembly README](https://github.com/sbt/sbt-assembly) for details: ``` target assembly-jar-name test assembly-option main-class full-classpath dependency-classpath assembly-excluded-files assembly-excluded-jars ``` ## Configuring merge strategy If multiple files share the same relative path, the default strategy is to verify that all candidates have the same contents and error out otherwise. This behavior can be configured for Spark projects using the assembly-merge-strategy as follows: ```scala mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => { case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last case PathList("org", "apache", xs @ _*) => MergeStrategy.last case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last case "about.html" => MergeStrategy.rename case x => old(x) } } ``` ## Creating the fat JAR Once you have added the sbt-assembly plugin and configured the assembly settings and merge strategy, you can create the fat JAR for your Spark application. From the root folder of your project, run the following command: ``` sbt/sbt assembly ``` This will create the JAR file in the `target/scala-2.10/` directory. The name of the JAR file will be in the format of `-assembly-.jar`.
You can find an example project that builds an assembled JAR for a Spark application on [GitHub](https://github.com/prabeesh/SparkTwitterAnalysis). Once you have the fat JAR, see [Apache Spark Performance Tuning](/blog/2023/01/06/performance-tuning-on-apache-spark/) for tuning the job it runs. --- ## Standalone Spark Application in Scala: Twitter Streaming Example - URL: https://blog.prabeeshk.com/blog/2014/04/01/a-standalone-spark-application-in-scala/ - Date: 2014-04-01 - Keywords: Spark Streaming Scala, sbt Spark project, Eclipse Spark, Twitter streaming example This post walks through building a Spark Streaming application in Scala that extracts popular hashtags from the Twitter firehose, packaged with sbt, and runnable from the Eclipse IDE via the sbteclipse plugin. ## Building a Spark application using SBT This section builds a standalone Scala application against the Apache Spark API and packages it with sbt (Simple Build Tool). The starting point is the Twitter popular tags [example](https://github.com/apache/spark/blob/branch-0.9/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala). This program calculates popular hashtags (popular topics) over sliding 10 and 60 second windows from a Twitter stream. The stream is instantiated with credentials and, optionally, filters supplied as command line arguments. The code here is modified to take the Twitter authentication credentials from the command line, so the arguments are, in order: `master` `consumerKey` `consumerSecret` `accessToken` `accessTokenSecret` `filters`.
```Scala // Twitter Authentication credentials System.setProperty("twitter4j.oauth.consumerKey", args(1)) System.setProperty("twitter4j.oauth.consumerSecret", args(2)) System.setProperty("twitter4j.oauth.accessToken", args(3)) System.setProperty("twitter4j.oauth.accessTokenSecret", args(4)) ``` If you want to read Twitter authentication credentials from a file, see this [TwitterUtils example](https://github.com/pwendell/spark-twitter-collection/blob/master/TwitterUtils.scala). The sbt configuration file follows. For more detail about sbt, see the [sbt setup guide](http://www.scala-sbt.org/release/docs/Getting-Started/Setup.html). ```Scala name := "TwitterPopularTags" version := "0.1.0" scalaVersion := "2.10.3" libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "0.9.0-incubating", "org.apache.spark" %% "spark-streaming" % "0.9.0-incubating", "org.apache.spark" %% "spark-streaming-twitter" % "0.9.0-incubating") resolvers += "Akka Repository" at "http://repo.akka.io/releases/" ``` You can find [the project on GitHub](https://github.com/prabeesh/SparkTwitterAnalysis/tree/0.1.0). ## Spark programming in Eclipse With the sbteclipse plugin, an sbt project can be opened and run in the Eclipse IDE. For more details, see [SBT Eclipse](https://github.com/typesafehub/sbteclipse). Add the plugin: ```Scala addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.0") ``` Then run the following from the root folder of the project: ``` sbt/sbt eclipse ``` This command creates a project compatible with Eclipse. You can then import the project into Eclipse and run it against Spark from the IDE.
To avoid generating Eclipse source entries for the java directories and put all libs in `lib_managed` (so you can distribute Eclipse project files), add this to `build.sbt`: ```Scala retrieveManaged := true EclipseKeys.relativizeLibs := true (unmanagedSourceDirectories in Compile) <<= (scalaSource in Compile)(Seq(_)) (unmanagedSourceDirectories in Test) <<= (scalaSource in Test)(Seq(_)) ``` You can find [the sbt eclipse project here](https://github.com/prabeesh/SparkTwitterAnalysis/tree/0.2.0). Once you have the application running, see the [Uber JAR post](/blog/2014/04/08/creating-uber-jar-for-spark-project-using-sbt-assembly/) for packaging it into a single deployable JAR with sbt-assembly. --- ## Running Mesos 0.13 on Ubuntu 12.04 - URL: https://blog.prabeeshk.com/blog/2013/10/07/running-mesos-0130-on-ubuntu-1204/ - Date: 2013-10-07 - Keywords: Apache Mesos installation, Mesos Ubuntu 12.04, Mesos cluster setup Apache Mesos is a cluster manager that abstracts CPU, memory, and storage across machines so frameworks (like Spark, Marathon, or Chronos) can run workloads without worrying about which physical node they land on. This post covers installing 0.13 on Ubuntu 12.04 and bringing up a minimal master + slave cluster. ## Prerequisites Install the build dependencies and make sure Java is available: ``` $ sudo apt-get install python2.7-dev g++ libcppunit-dev libunwind7-dev git libcurl4-nss-dev ``` You need to have Java installed, or the `JAVA_HOME` environment variable pointing to a Java installation. Download the Mesos distribution from the [official website](http://www.apache.org/dyn/closer.cgi/mesos/0.13.0/), then untar the downloaded file: ``` $ tar xvf mesos-0.13.0.tar.gz ``` ## Building and Installing ``` $ cd mesos-0.13.0 $ mkdir build $ cd build $ sudo ../configure --prefix=/home/user/mesos $ sudo make $ sudo make check $ sudo make install ``` The `--prefix` option passed to `configure` controls where Mesos is installed.
For example, pass `--prefix=/home/user/mesos`. By default the prefix is `/usr/local`. Once the installation is done, you can start the Mesos cluster. Go into the directory where you built Mesos: ``` $ cd mesos-0.13.0/build/bin ``` Run the command to launch the master. ``` $ sh mesos-master.sh ``` Take note of the IP and port that the master is running on, which will look something like `[IP of the machine]:5050`. URL of master: `mesos://[IP of the machine]:5050`. View the master's web UI at `http://[IP of the machine]:5050`. Copy the `mesos-0.13.0` and `mesos` directories to the same paths on all the nodes in the cluster. To launch a slave, go to the following directory: ``` $ cd mesos-0.13.0/build/src ``` Run the command to launch the slave. ``` $ sh mesos-slave --master=[IP of the mesos master machine]:5050 ``` The slave will show up on the Mesos master's web UI. ## Mesos Client Copy `libmesos.so` from the prefix folder (`/home/user/mesos/lib`) of the Mesos master to `/usr/local/lib` on the client machine, and install the following package: ``` $ sudo apt-get install libunwind7-dev ``` Now you can run applications against the Mesos cluster from the client machine. --- ## MQTT Publisher and Subscriber in Scala with Eclipse Paho - URL: https://blog.prabeeshk.com/blog/2013/08/26/mqtt-scala-publisher-and-subscriber/ - Date: 2013-08-26 - Keywords: MQTT Scala, Eclipse Paho Scala, MQTT publisher subscriber, Mosquitto broker MQTT is a lightweight publish-subscribe protocol widely used in IoT and telemetry because it runs fine over slow, lossy networks. This post builds a minimal publisher and subscriber in Scala using the [Eclipse Paho](https://www.eclipse.org/paho/) library, talking to a local Mosquitto broker. The complete code is on [GitHub](https://github.com/prabeesh/MQTTScalaClient). ## How MQTT works MQTT uses a central **broker**. Publishers send messages to **topics**, and any subscribers listening to a matching topic pattern receive them.
Topics are hierarchical strings separated by `/`, for example `sensors/room1/temperature`, and subscriptions can use `+` (single level) and `#` (multi-level) wildcards. Three QoS levels control delivery: | QoS | Meaning | Use when | | --- | --- | --- | | 0 | At most once | Telemetry where the occasional drop is fine | | 1 | At least once | Most sensor or command messages | | 2 | Exactly once | When duplicates cause real problems | ## Installing the Mosquitto broker On Ubuntu or Debian: ```bash sudo apt-get install mosquitto mosquitto-clients sudo systemctl start mosquitto ``` Quick sanity check with the CLI tools: ```bash # Terminal 1 mosquitto_sub -t test/topic # Terminal 2 mosquitto_pub -t test/topic -m "Hello MQTT" ``` ## sbt setup `build.sbt`: ```scala name := "MQTTScalaClient" version := "0.1" scalaVersion := "2.10.4" libraryDependencies += "org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.0.2" resolvers += "Eclipse Paho Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/" ``` ## Publisher ```scala import org.eclipse.paho.client.mqttv3._ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence object Publisher { def main(args: Array[String]): Unit = { val brokerUrl = "tcp://localhost:1883" val topic = "sensors/temperature" val qos = 1 val clientId = "scala-publisher" val persistence = new MqttDefaultFilePersistence("/tmp/mqtt") val client = new MqttClient(brokerUrl, clientId, persistence) val opts = new MqttConnectOptions() opts.setCleanSession(true) client.connect(opts) val payload = """{"temperature": 22.5}""" val message = new MqttMessage(payload.getBytes("UTF-8")) message.setQos(qos) client.publish(topic, message) println(s"Published to $topic: $payload") client.disconnect() } } ``` The flow: create an `MqttClient`, connect with an `MqttConnectOptions`, build an `MqttMessage`, call `publish`, and disconnect. That's the whole publisher API surface. 
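As a side note on the `+` and `#` wildcards described under *How MQTT works*, the matching rule can be sketched in a few lines. This is only an illustration in Python, kept outside the Scala client code: `topic_matches` is a hypothetical helper, not part of Paho or Mosquitto, and it ignores edge cases such as `$`-prefixed system topics.

```python
def topic_matches(pattern, topic):
    """Return True if an MQTT subscription pattern matches a concrete topic."""
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(pattern_levels):
        if level == "#":
            # Multi-level wildcard: matches everything from here on,
            # but is only valid as the last level of the pattern.
            return i == len(pattern_levels) - 1
        if i >= len(topic_levels):
            # The pattern has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" matches any single level.
            return False

    # Without a trailing "#", both sides must have the same number of levels.
    return len(pattern_levels) == len(topic_levels)


for candidate in ("sensors/temperature", "sensors/room1/temperature"):
    print(candidate, topic_matches("sensors/+", candidate), topic_matches("sensors/#", candidate))
```

The `sensors/temperature` topic used by the publisher is matched by both `sensors/+` and `sensors/#`, while a deeper topic such as `sensors/room1/temperature` is matched only by `sensors/#`.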
## Subscriber ```scala import org.eclipse.paho.client.mqttv3._ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence object Subscriber { def main(args: Array[String]): Unit = { val brokerUrl = "tcp://localhost:1883" val topic = "sensors/+" // all sensors val qos = 1 val clientId = "scala-subscriber" val client = new MqttClient(brokerUrl, clientId, new MemoryPersistence()) client.setCallback(new MqttCallback { override def messageArrived(topic: String, message: MqttMessage): Unit = { val payload = new String(message.getPayload, "UTF-8") println(s"Received on $topic: $payload") } override def connectionLost(cause: Throwable): Unit = println(s"Connection lost: ${cause.getMessage}") override def deliveryComplete(token: IMqttDeliveryToken): Unit = {} }) client.connect() client.subscribe(topic, qos) println(s"Subscribed to $topic. Press Ctrl-C to exit.") // Keep the JVM alive so the callback can fire. Thread.currentThread().join() } } ``` `MqttCallback.messageArrived` fires on every message matching the subscription. Anything else (filtering, persistence, downstream processing) goes in there. ## Running the example Start the subscriber first so it's ready when the publisher fires: ```bash sbt "runMain Subscriber" ``` In another terminal: ```bash sbt "runMain Publisher" ``` The subscriber prints the JSON payload. You can also confirm with the CLI tools: ```bash mosquitto_sub -t "sensors/+" -v ``` ## A few practical notes - **Clean session**: `setCleanSession(false)` lets the broker queue messages for your client while it's offline. Pair with a stable client ID. - **Keep-alive**: `setKeepAliveInterval` controls how often the client pings the broker; lower values detect dropped connections faster but use more bandwidth. - **Last Will and Testament**: `opts.setWill(topic, payload, qos, retained)` lets the broker publish a message on your behalf if the client disconnects uncleanly. Useful for device presence tracking. 
- **TLS**: for anything beyond a local test, use `ssl://...` URLs and `setSocketFactory` with a properly configured `SSLContext`. For related Scala and data-processing content, see [Apache Spark performance tuning](/blog/2023/01/06/performance-tuning-on-apache-spark/). --- ## A Web Paint App with Flask and MongoDB - URL: https://blog.prabeeshk.com/blog/2013/03/31/paint-app-using-flask-with-mongodb/ - Date: 2013-03-31 - Keywords: Flask MongoDB, Python Flask tutorial, HTML5 canvas Flask, MongoDB CRUD Python Schema-less, document-shaped data like freehand drawings is a good fit for MongoDB: there's no rigid table to design around stroke coordinates that vary per drawing. This post ties the canvas front-end to a Flask backend that persists drawings in MongoDB. ## How the app works The user draws lines, rectangles, or circles on an HTML5 canvas. Each shape is stored in a JavaScript object with its coordinates and colour. When the user clicks "save", the frontend POSTs the entire drawing as a JSON string to Flask, which inserts it into MongoDB keyed by image name. Loading a drawing is a GET request: Flask queries MongoDB, passes the JSON back to the template, and the canvas redraws it. ## The Flask backend The entire server is one file. 
It connects to a local MongoDB instance, serves the canvas page, and handles save/load:

```python
from flask import Flask, request, render_template, Response
from pymongo import Connection

app = Flask(__name__)
connection = Connection()
collection = connection.paint.images


@app.route("/")
@app.route('/<imagename>', methods=['POST', 'GET'])
def mainpage(imagename=None):
    if request.method == 'GET':
        if imagename:
            # Load: look up the drawing by name and hand its JSON to the template
            rows = collection.find({'imgname': imagename})
            if rows:
                for row in rows:
                    imgdata = row["imgdata"]
                return render_template('paint.html', saved=imgdata)
            else:
                resp = Response('')
                return resp
        else:
            # No name in the URL: serve an empty canvas
            return render_template('paint.html')
    if request.method == 'POST':
        # Save: store the posted JSON string verbatim under the image name
        imgname = request.form['imagename']
        imgdata = request.form['string']
        collection.insert({"imgname": imgname, "imgdata": imgdata})
        return Response("saved")


if __name__ == '__main__':
    app.debug = True
    app.run()
```

A few things to note:

- `Connection()` (from the older pymongo API) connects to `localhost:27017` by default. In current pymongo you would use `MongoClient()`.
- The route handles both `/` (new drawing) and `/<imagename>` (load or save). The `methods` list lets the same URL respond to GET and POST.
- On save, the drawing data arrives as a JSON string in `request.form['string']` and gets stored verbatim in MongoDB. No schema needed.
- On load, Flask passes the stored JSON string into the Jinja template as `saved`, and the JavaScript parses it back into drawing objects.

## The MongoDB document

Each saved drawing produces one document in the `paint.images` collection:

```json
{
  "_id": ObjectId("..."),
  "imgname": "my-drawing",
  "imgdata": "{\"line\":[{\"beginx\":120,\"beginy\":80,\"endx\":400,\"endy\":300,\"color\":\"red\"}],\"rect\":[],\"circle\":[]}"
}
```

The `imgdata` field is a JSON string containing three arrays (one per shape type). Each shape stores its start/end coordinates and colour. This is the simplest possible approach: store the drawing as an opaque blob keyed by name.
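The document shape can be sketched without a database. This is a minimal illustration rather than the app's code: `make_document` and `load_shapes` are hypothetical helper names, and a plain dict stands in for the MongoDB document.

```python
import json


def make_document(imgname, shapes):
    """Build the per-drawing document: the name plus the shapes as one JSON string."""
    return {"imgname": imgname, "imgdata": json.dumps(shapes)}


def load_shapes(document):
    """Parse the stored JSON string back into the three shape arrays."""
    return json.loads(document["imgdata"])


shapes = {
    "line": [{"beginx": 120, "beginy": 80, "endx": 400, "endy": 300, "color": "red"}],
    "rect": [],
    "circle": [],
}
doc = make_document("my-drawing", shapes)
restored = load_shapes(doc)
print(restored["line"][0]["color"])
```

Because `imgdata` is a string, the database cannot query individual shapes by field. Storing the parsed object instead would allow that, at the cost of the opaque-blob simplicity.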
## The canvas frontend The template (`templates/paint.html`) sets up two stacked canvases: one for the active stroke being drawn, one for the committed shapes underneath. The drawing tools (line, rectangle, circle) are selected from a dropdown, and colour buttons set the stroke colour. The core interaction captures `mousedown`, `mousemove`, and `mouseup` events: ```javascript var data = {"line":[], "rect":[], "circle":[]}; canvas.addEventListener('mousedown', function(evt) { var mousePos = getMousePos(canvas, evt); select = 1; beginX = mousePos.x; beginY = mousePos.y; }, false); canvas.addEventListener('mousemove', function(evt) { var mousePos = getMousePos(canvas, evt); if (select && toolselect.value == 'line') { endX = mousePos.x; endY = mousePos.y; drawLine(beginX, beginY, endX, endY); } if (select && toolselect.value == 'rect') { sideX = mousePos.x - beginX; sideY = mousePos.y - beginY; drawRect(beginX, beginY, sideX, sideY); } if (select && toolselect.value == 'circle') { radius = mousePos.x - beginX; drawCircle(beginX, beginY, radius); } }, false); canvas.addEventListener('mouseup', function(evt) { select = 0; context1.drawImage(canvas, 0, 0); if (toolselect.value == 'line') { data.line.push({ beginx: beginX, beginy: beginY, endx: endX, endy: endY, color: currentColor }); } if (toolselect.value == 'rect') { data.rect.push({ beginx: beginX, beginy: beginY, sidex: sideX, sidey: sideY, color: currentColor }); } if (toolselect.value == 'circle') { data.circle.push({ beginx: beginX, beginy: beginY, radius: radius, color: currentColor }); } }, false); ``` On `mouseup`, the shape is committed to `canvas1` (the background layer) with `drawImage`, and its coordinates are pushed into the `data` object. That object is what gets serialised to JSON and sent to Flask on save. 
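To make the record layout concrete, here is a rough Python model of what the `mouseup` handler pushes into `data`. It is only a sketch: `shape_from_drag` is a made-up name, and drags are given as hypothetical `(x, y)` tuples instead of mouse events.

```python
def shape_from_drag(tool, begin, end, color):
    """Mirror the mouseup handler: turn a drag from `begin` to `end` into a shape record."""
    bx, by = begin
    ex, ey = end
    if tool == "line":
        return {"beginx": bx, "beginy": by, "endx": ex, "endy": ey, "color": color}
    if tool == "rect":
        return {"beginx": bx, "beginy": by, "sidex": ex - bx, "sidey": ey - by, "color": color}
    if tool == "circle":
        # As in the handler, the radius is the horizontal drag distance only.
        return {"beginx": bx, "beginy": by, "radius": ex - bx, "color": color}
    raise ValueError("unknown tool: " + tool)


data = {"line": [], "rect": [], "circle": []}
drags = [
    ("line", (120, 80), (400, 300)),
    ("rect", (10, 10), (60, 40)),
    ("circle", (200, 200), (150, 200)),  # dragged to the left
]
for tool, begin, end in drags:
    data[tool].append(shape_from_drag(tool, begin, end, "red"))
```

Note that the leftward circle drag yields a radius of -50; wrapping the difference in `abs()` is one way to guard against that.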
Save and load use jQuery to POST/redirect:

```javascript
function saveImage() {
    if (imagename.value == "")
        alert("Image name cannot be empty");
    else {
        $.post("/" + imagename.value,
               {imagename: imagename.value, string: JSON.stringify(data)});
        alert("saved");
    }
}

function loadImage() {
    if (imagename.value == "")
        alert("Image name cannot be empty");
    else
        document.location.href = "/" + imagename.value;
}
```

When loading, Flask passes the stored JSON to the template, and the `init()` function rehydrates it:

```javascript
function init() {
    var dataString = "{{ saved }}";
    if (dataString) {
        // Undo Jinja's escaping of double quotes before parsing
        data = JSON.parse(dataString.replace(/&#34;/g, '"'));
        drawAll();
    }
}
```

`&#34;` is the HTML character reference for `"`, which Jinja's autoescaping emits by default. The `replace` call converts it back before parsing.

## Running it

```bash
# Start MongoDB
mongod

# In another terminal
pip install flask pymongo
python test.py
```

Open `http://localhost:5000`, draw something, type a name, and click save. Navigate to `http://localhost:5000/your-name` to load it back.

The complete source is on [GitHub](https://github.com/prabeesh/Paintapp-Javascript-Canvas-Flask-MongoDB).

---

## HTML5 Canvas Paint Application with JavaScript

- URL: https://blog.prabeeshk.com/blog/2013/03/30/paint-app-using-javascript-and-canvas/
- Date: 2013-03-30
- Keywords: HTML5 canvas paint, JavaScript drawing app, canvas mouse events

A small browser-based drawing tool built on the HTML5 `<canvas>` element and plain JavaScript.

## How it works

The page contains a `<canvas>` element that listens for three mouse events: `mousedown`, `mousemove`, and `mouseup`. When you press and drag, the handler draws the currently selected shape (line, rectangle, or circle) in the chosen colour. A toolbar above the canvas lets you pick the tool and colour. All drawing state lives in the browser; nothing hits the server until you explicitly save.
## Saving and loading drawings

Each drawing is serialized as a JSON array of shape objects (type, start coordinates, end coordinates, colour). When you click **Save**, that JSON is sent to the server with a user-provided name. Loading a drawing fetches the JSON back and replays each shape onto a fresh canvas.

Two back-end implementations exist:

| Back-end | Storage | Notes |
| --- | --- | --- |
| Google App Engine | Google Datastore | Originally deployed on appspot.com (no longer running) |
| Flask | SQLite | Local or any WSGI host |

## Source code

- [App with GAE](https://github.com/prabeesh/Paintapp-Javascript-Canvas-GAE)
- [App with Flask](https://github.com/prabeesh/Paintapp-Javascript-Canvas-Flask)

---

## A Simple CUDA Program: Squaring 64 Numbers on the GPU

- URL: https://blog.prabeeshk.com/blog/2013/03/07/a-simple-cuda-program/
- Date: 2013-03-07
- Keywords: CUDA programming tutorial, CUDA kernel example, cudaMalloc cudaMemcpy, nvcc compiler, NVIDIA GPU programming

Following on from my [introduction to parallel programming](/blog/2013/02/22/introduction-to-parallel-programming/), this post walks through a simple CUDA program that computes the squares of 64 numbers on the GPU. The source is on [GitHub](https://github.com/prabeesh/CUDA-code-square/blob/master/square.cu).

## A typical GPU program

A typical CUDA program follows four steps:

1. CPU allocates storage on the GPU
2. CPU copies input data from CPU to GPU
3. CPU launches kernels on the GPU to process the data
4. CPU copies the result back from GPU to CPU

## Compiling

```bash
nvcc -o square square.cu
```

Instead of the regular C compiler, we use `nvcc`, the NVIDIA CUDA Compiler. The output is an executable called `square` and the input file is `square.cu`. `.cu` is the convention for CUDA source files.

## The host code

We'll walk through the CPU side first.
```c
int main(int argc, char ** argv) {
    const int ARRAY_SIZE = 64;
    const int ARRAY_BYTES = ARRAY_SIZE * sizeof(float);

    // generate the input array on the host
    float h_in[ARRAY_SIZE];
    for (int i = 0; i < ARRAY_SIZE; i++) {
        h_in[i] = float(i);
    }
    float h_out[ARRAY_SIZE];
    // ...
}
```

First we declare the array size and how many bytes it uses, then fill it with floating-point numbers where element `i` is set to `i`. All standard C so far, nothing GPU-specific.

One CUDA convention to note: names for data on the CPU (the host) start with `h_`; names for data on the GPU (the device) start with `d_`.

```c
// declare GPU memory pointers
float * d_in;
float * d_out;
```

Pointers on the GPU are declared the same way as on the CPU, just a `float *`. The difference is what they point to: if you access a CPU pointer as GPU memory (or vice versa), you'll have a bad time.

```c
// allocate GPU memory
cudaMalloc((void**) &d_in, ARRAY_BYTES);
cudaMalloc((void**) &d_out, ARRAY_BYTES);
```

`cudaMalloc` takes two arguments, the address of the pointer and the number of bytes to allocate. It allocates data on the GPU, whereas a plain `malloc` would allocate on the CPU.

```c
// transfer the array to the GPU
cudaMemcpy(d_in, h_in, ARRAY_BYTES, cudaMemcpyHostToDevice);
```

`cudaMemcpy` copies the array `h_in` from the CPU to the array `d_in` on the GPU. It's like a regular `memcpy`, but with a fourth argument that specifies the transfer direction. The three options are `cudaMemcpyHostToDevice`, `cudaMemcpyDeviceToHost`, and `cudaMemcpyDeviceToDevice`.

## Launching the kernel

```c
// launch the kernel
square<<<1, ARRAY_SIZE>>>(d_out, d_in);
```

This is the CUDA launch operator:

```
kernel<<<number_of_blocks, threads_per_block>>>(arguments);
```

The line says: launch the kernel `square` on one block of 64 elements. The arguments to the kernel are two pointers, `d_out` and `d_in`. This tells the CPU to launch, on the GPU, 64 copies of the kernel on 64 threads. The kernel can only be called on GPU data, not CPU data.

After the kernel runs, a second `cudaMemcpy` moves memory from device to host into `h_out`.
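To make the launch semantics concrete, here is a CPU-only Python model of a one-block launch. It is a sketch under loud assumptions: nothing here is CUDA, `launch` is a made-up helper, plain lists stand in for device memory, and the "threads" run one after another instead of in parallel.

```python
def square(thread_idx, d_out, d_in):
    """Kernel body: each thread squares the one element at its own index."""
    f = d_in[thread_idx]
    d_out[thread_idx] = f * f


def launch(kernel, threads_per_block, *args):
    """Model a single-block launch by calling the kernel once per thread index."""
    for thread_idx in range(threads_per_block):  # a GPU would run these concurrently
        kernel(thread_idx, *args)


ARRAY_SIZE = 64
h_in = [float(i) for i in range(ARRAY_SIZE)]

d_in = list(h_in)            # stands in for cudaMalloc + cudaMemcpy host -> device
d_out = [0.0] * ARRAY_SIZE   # stands in for cudaMalloc
launch(square, ARRAY_SIZE, d_out, d_in)
h_out = list(d_out)          # stands in for cudaMemcpy device -> host

print(h_out[:4])
```

The point of the model is that the kernel body never loops over the array: the launch supplies a different index to each copy.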
```c
// copy the result array back to the CPU
cudaMemcpy(h_out, d_out, ARRAY_BYTES, cudaMemcpyDeviceToHost);

// print out the resulting array
for (int i = 0; i < ARRAY_SIZE; i++) {
    printf("%f", h_out[i]);
    printf(((i % 4) != 3) ? "\t" : "\n");
}

cudaFree(d_in);
cudaFree(d_out);

return 0;
}
```

We walk through `h_out`, print four values per line, then free the GPU memory.

Most CUDA programs follow the same shape: create data on the CPU, allocate on the GPU, copy CPU -> GPU, launch kernels, copy GPU -> CPU, continue processing.

## The kernel

```c
__global__ void square(float * d_out, float * d_in) {
    int idx = threadIdx.x;   // this thread's index within the block
    float f = d_in[idx];
    d_out[idx] = f * f;
}
```

The kernel looks like a serial program that runs on one thread. The CPU is responsible for launching it on many parallel threads.

The `__global__` qualifier is how CUDA knows this is a kernel rather than CPU code. `void` means the kernel doesn't return a value; it writes its output into the pointer passed in.

The body:

- `threadIdx.x` gives each thread its index within the block. `threadIdx` is a built-in three-component vector (type `uint3`) with `.x`, `.y`, `.z` members. With 64 threads launched, the first instance returns 0, the next 1, up to 63 for the last.
- Each thread reads its array element from global memory into `f`, squares it, and writes the result back to the output array at the same index.

That's it. Everything else looks like straightforward C.

This post is from my notes while taking the Udacity [Intro to Parallel Programming](https://www.udacity.com/course/cs344) course.

---

## Introduction to Parallel Programming

- URL: https://blog.prabeeshk.com/blog/2013/02/22/introduction-to-parallel-programming/
- Date: 2013-02-22
- Keywords: GPU parallel programming, CUDA programming model, GPGPU computing, parallel computing GPU, Udacity parallel programming

This post focuses on parallel computing on the GPU. Parallel computing solves large problems by breaking them into smaller pieces and running those pieces at the same time.

## Why GPUs for parallel computing

Modern processors are made from transistors.
Each year those transistors get smaller. The feature size is the minimum size of a transistor on a chip. As it decreases, transistors get smaller, run faster, use less power, and you can fit more of them on a chip, giving more resources for computation every year. One of the primary features of processors is clock speed. For many years clock speeds went up steadily, but over the last decade they have essentially remained constant. Even though transistors continue to get smaller, faster, and more power-efficient per transistor, running a billion of them generates a lot of heat. Because we cannot keep all these processors cool, power has become a primary design constraint. Traditional CPUs have very complicated control hardware. That allows flexibility and performance, but as the control logic gets more complicated it becomes expensive in terms of power and design complexity. An alternative is to build simpler control structures and compensate with a large number of parallel compute units. Each unit is small, simple, and power-efficient, so you can put a large number of them on a single chip and program them to work together on complex problems. A high-end GPU contains over 3,000 ALUs that can run 3,000 arithmetic operations simultaneously. GPUs can have tens of thousands of active parallel threads, with up to 65,000 running concurrently. Together, all this computing power can solve problems faster than a single fast CPU core. To build a power-efficient processor you have two choices: minimise latency (the time to complete one task) or maximise throughput (tasks completed per unit time). Maximising throughput is the right approach because latency lags bandwidth. ## CUDA (Compute Unified Device Architecture) CUDA is a parallel programming platform and programming model created by NVIDIA. It gives developers access to the virtual instruction set and memory of the parallel computational elements in CUDA GPUs. 
Unlike CPUs, a GPU is a parallel throughput architecture that emphasises executing many concurrent threads slowly rather than executing a single thread very quickly. This approach to solving general-purpose problems on the GPU is known as GPGPU.

The CUDA programming model lets you program both the CPU and the GPU from one program. The typical workflow is:

1. The CPU allocates storage on the GPU.
2. The CPU copies input data from host memory to the GPU.
3. The CPU launches kernels that run on the GPU and process the data.
4. The CPU copies the results back from the GPU.

This post is a set of short notes from the Udacity [Intro to Parallel Programming](https://www.udacity.com/course/cs344) course.

---

## Building a Simple Game with HTML5 Canvas and JavaScript

- URL: https://blog.prabeeshk.com/blog/2013/02/09/developing-a-simple-game-with-html5-slash-canvas/
- Date: 2013-02-09
- Keywords: HTML5 canvas game, JavaScript game programming, canvas animation, keyboard input canvas

HTML5 introduced the `<canvas>` element for 2D drawing: a rectangular area on the page that JavaScript can paint into. This post walks through building a simple bird-shooting game using canvas drawing, `setInterval` timing, keyboard events, and basic collision detection.

Add a canvas element to the HTML document:

```html
```

To draw inside the canvas, use JavaScript. First find the canvas element using `getElementById`, then initialize the context.

```javascript
```

To draw text on a canvas, the most important property and method are:

- `font`: defines the font properties for text
- `fillText(text, x, y)`: draws filled text on the canvas

To set the font, size, and style of HTML5 canvas text, set the `font` property of the canvas context to a string containing the font style, size, and family separated by spaces.

```javascript
```

To draw an image on a canvas, use `drawImage(image, x, y, width, height)`:

```javascript
```

To rotate the HTML5 canvas, use the `rotate()` transform method.
The rotate transformation requires an angle in radians: `context.rotate(angle);`

```javascript
```

To receive events occurring in the document, register an event listener:

```javascript
document.addEventListener('keydown', WhichKey);
```

The game uses the following functions.

The `WhichKey()` function reads the keyboard input. The UP/DOWN arrow keys control gun movements. The Enter key starts the game. The space key fires.

```javascript
function WhichKey(event) {
    if (event.keyCode == 38)   // Up arrow
        gunUpMove();
    if (event.keyCode == 40)   // Down arrow
        gunDownMove();
    if (event.keyCode == 32)   // Space key
        firing();
    if (event.keyCode == 13) { // Enter key
        // start the game
    }
}
```

The `counter()` function sets up a countdown timer. Using `setInterval(function, timeInMs)`, the counter is called every 1000 ms (1 second).

```javascript
var count = 60;
clear = setInterval(counter, 1000);

function counter() {
    count -= 1;
    if (count <= 0) {
        clearInterval(clear);
    }
    // code to display the counter
}
```

The `birdAppearing()` function draws the bird at a random point on the canvas, clears it after 2 seconds, and then repeats.

```javascript
function birdAppearing() {
    if (start == 1) {
        var posx = Math.floor((Math.random() * 635) + 85);
        var posy = Math.floor((Math.random() * 305) + 85);
        context2.drawImage(bird, posx, posy, 40, 40);
        posX = posx;
        posY = posy;
        setTimeout(function() {
            context2.clearRect(0, 0, 800, 450);
            birdAppearing();
        }, 2000);
    }
}
```

The `firing()` function draws red dotted lines on the canvas. If the line's coordinates exceed the canvas bounds, it resets to the initial position. The end point of the dotted line is continually compared with the centre point of the bird image. If the distance between them is less than the radius of a circle centred on the bird image, the line resets to its initial position, the bird dies, and the player scores a point.
```javascript
function firing() {
    while (1) {
        context.strokeStyle = "red";
        context.lineWidth = 2;
        context.beginPath();
        context.moveTo(fireStart += 8, 14);
        context.lineTo(fireEnd += 8, 14);
        context.stroke();

        // Shot left the canvas: reset the line to its start position
        if ((fireEnd * Math.cos(angle * Math.PI / 60)) > 800 ||
            (fireEnd * Math.sin(angle * Math.PI / 60)) > 450) {
            setTimeout(function() {
                context.clearRect(85, 10, fireEnd, 6);
                fireStart = 85;
                fireEnd = 87;
            }, 16);
            return;
        }

        // Shot is within 20 px of the bird's centre: count it as a hit
        if (Math.abs(posX + 20 - (fireStart * Math.cos(angle * Math.PI / 60))) < 20 &&
            Math.abs(posY + 20 - (fireEnd * Math.sin(angle * Math.PI / 60))) < 20) {
            setTimeout(function() {
                context.clearRect(85, 10, fireEnd, 6);
                fireStart = 85;
                fireEnd = 87;
            }, 16);
            context2.clearRect(0, 0, 800, 450);
            context2.drawImage(deadBird, posX, posY, 60, 60);
            score += 1;
            context0.font = " 20px sans-serif";
            context0.textAlign = "left";
            context0.fillStyle = "black";
            context0.textBaseline = "top";
            context0.clearRect(200, 0, 800, 500);
            context0.fillText('Score :' + score, 700, 10);
            return;
        }
    }
}
```

The [source code is available here](https://github.com/prabeesh/Game-Javascript-Canvas-GAE).

---

## Measuring an RC Time Constant with an ATmega8

- URL: https://blog.prabeeshk.com/blog/2012/07/14/finding-rc-constant-using-atmega8/
- Date: 2012-07-14
- Keywords: RC time constant AVR, ATmega8 ADC, measuring capacitance microcontroller

The time constant of an RC circuit equals `R * C` (in seconds). It is the time for the capacitor to charge through the resistor to 63.2% of the supply voltage, or to discharge to 36.8% of its starting level.

## Approach

The ATmega8's ADC samples the voltage across the capacitor continuously. A digital output pin (PB0) supplies the input voltage to the RC network. The moment PB0 goes high, Timer1 starts counting. When the ADC reading hits 63.2% of the rail (roughly 161 out of 255 for a 5 V supply), we capture the timer value.

Timer1 is a 16-bit counter, clocked at 8 MHz with a 1024 prescaler. To convert timer ticks to seconds, multiply by `1024 / 8000000`.
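The tick arithmetic is easy to check off-chip. The following Python sketch is not part of the firmware; the function names are made up for illustration, and the constants come straight from the description above (8 MHz clock, 1024 prescaler, 8-bit ADC reading).

```python
F_CPU = 8_000_000      # CPU clock in Hz
PRESCALER = 1024       # Timer1 prescaler
ADC_FULL_SCALE = 255   # largest 8-bit ADC reading


def ticks_to_seconds(ticks):
    """Convert a Timer1 count to elapsed seconds."""
    return ticks * PRESCALER / F_CPU


def charge_threshold(full_scale=ADC_FULL_SCALE):
    """ADC reading that corresponds to 63.2% of full scale."""
    return round(0.632 * full_scale)


print(charge_threshold())                    # ADC value at which the timer is captured
print(round(ticks_to_seconds(846), 3))       # the 846-tick reading from the results table
print(round(ticks_to_seconds(0xFFFF), 2))    # longest interval before Timer1 overflows
```

With these settings one tick is 128 µs, and the 16-bit counter overflows after about 8.39 s, which bounds the largest time constant this setup can measure.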
An LCD shows both the raw tick count and the ADC reading. ## Test results | R | C | Calculated RC | Timer ticks | Measured RC | | --- | --- | --- | --- | --- | | 1 kOhm | 100 uF | 0.100 s | 846 | 0.108 s | | 2 kOhm | 100 uF | 0.200 s | 1864 | 0.239 s | Both within ~20% of the ideal value, which is typical for low-precision components. ## Code ```c #include #define F_CPU 8000000UL #include #define RS 6 //PD6 #define EN 7 //PD7 #define databits PORTC //PC0 to PC3 #define row1 cmd(0x80) #define row2 cmd(0xc0) void adc_init() { //select AVCC reference voltage , left alignment of data and ADC4 ADMUX=((1<> 4); LCD_STROBE(); databits = (c); LCD_STROBE(); } void cmd(unsigned char c) { PORTD &= ~(1 << RS); _delay_us(50); databits = (c >> 4); LCD_STROBE(); databits = (c); LCD_STROBE(); } void clear(void) { cmd(0x01); _delay_ms(5); } void lcd_init() { _delay_ms(15); cmd(0x30); _delay_ms(1); cmd(0x30); _delay_us(100); cmd(0x30); cmd(0x28); // Function set (4-bit interface, 2 lines, 5*7Pixels) cmd(0x28); // Function set (4-bit interface, 2 lines, 5*7Pixels) cmd(0x0c); // Make cursorinvisible clear(); // Clear screen cmd(0x6); // Set entry Mode(auto increment of cursor) } void print(char *p) { while(*p) data(*p++); } void main() { char a[5],b[5]; int c,d; DDRB=0x01; _delay_ms(50); TCCR1B|=(1< Programmer**, pick your ISP (e.g. USBasp). 4. Under **Tools > Board**, select ATmega8. ## Fuse-byte configuration The Arduino runtime assumes an 8 MHz clock. You need the fuse bytes set correctly before uploading any sketch. For the **internal 8 MHz RC oscillator** (no crystal required): ```c -U lfuse:w:0xa4:m -U hfuse:w:0xcc:m ``` For an **external 8 MHz crystal**: ```c -U lfuse:w:0xef:m ``` Once the fuses are set, use **Sketch > Upload Using Programmer** in the IDE. The sketch compiles, links, and flashes exactly as it would on a real board. ## Pin mapping The ATmega8 pins map to Arduino "digital pin" numbers. 
Refer to the diagram below when wiring peripherals: ![ATmega8 pin mapping](/images/arduino.png) ## Related - [How to build a USBtinyISP](/blog/2012/07/04/simplest-and-low-cost-usb-avr/) if you need a programmer - [Measuring an RC time constant with an ATmega8](/blog/2012/07/14/finding-rc-constant-using-atmega8/) for another bare-chip project --- ## How to Build a USBtinyISP: Low-Cost DIY AVR Programmer - URL: https://blog.prabeeshk.com/blog/2012/07/04/simplest-and-low-cost-usb-avr/ - Date: 2012-07-04 - Keywords: USBtinyISP, DIY AVR programmer, AVR microcontroller, ATtiny2313, ATmega8, USB programmer Atmel AVR chips power a lot of hobby and embedded projects, small, cheap, well-documented. To get code onto them you need a programmer, and commercial ISPs run $20-$40 for something that's basically an ATtiny running open-source firmware. The USBtinyISP flips that: it's a DIY programmer built around an ATtiny2313, costs a few dollars in parts, and works with almost any AVR target (ATtiny, ATmega, etc.). This post walks through building one. ## What you need - An ATtiny2313 (the microcontroller running the programmer firmware) - Passive components: resistors, capacitors, a 12 MHz crystal, a USB-B connector - A 6-pin ISP header to connect to target chips - A PCB or breadboard The reference design is well documented, [Adafruit's USBtinyISP guide](https://learn.adafruit.com/usbtinyisp) has the schematic, PCB layout, and firmware image. Follow that for the build. ![USBtinyISP circuit](/images/usbtiny_circuit.png) ## Bootstrapping: programming the ATtiny2313 There's a chicken-and-egg problem: to flash the USBtinyISP firmware onto the ATtiny2313, you need another programmer. Any working ISP (a friend's, a commercial unit, an Arduino-as-ISP) will do, you only need it once. Once the firmware is on the 2313, the board becomes self-sufficient. ## Setting fuse bits Fuse bits control clock source, brown-out detection, and reset behavior. 
For the USBtinyISP running at 12 MHz from an external crystal, set: ```bash avrdude -c usbasp -p t2313 \ -U hfuse:w:0xdf:m \ -U lfuse:w:0xef:m ``` If you're bootstrapping with a serial-port ISP instead of USB, swap `-c usbasp` for `-c stk200`. ## Using the programmer Once built and flashed, the USBtinyISP shows up as a USB device. avrdude talks to it out of the box: ```bash # Read the target chip's signature to confirm the wiring avrdude -c usbtiny -p m8 -n # Flash an ATmega8 with a compiled hex file avrdude -c usbtiny -p m8 -U flash:w:firmware.hex ``` Most AVR toolchains, Arduino IDE, PlatformIO, plain avr-gcc + avrdude, accept `usbtiny` as a programmer type, so you can use the same board across workflows. | Target family | Example chips | Notes | | --- | --- | --- | | ATtiny | ATtiny13, ATtiny85, ATtiny2313 | Fully supported | | ATmega (8-bit) | ATmega8, ATmega328P, ATmega32 | Fully supported | | AVR32 (32-bit) | AT32UC3 series | Check target datasheet first | ## My build Here's the board I ended up with: ![USBtinyISP board, top view](/images/040720129881.jpeg) ![USBtinyISP board, angled view](/images/04072012989.jpeg) ## Related - [Programming a standalone ATmega8 with Arduino code](/blog/2012/07/14/running-arduino-codes-in-stand-alone/) - [Finding the RC constant using an ATmega8](/blog/2012/07/14/finding-rc-constant-using-atmega8/) - [Introduction to AVR programming](/blog/2012/02/21/introduction-to-avr-programing/) --- ## MSP430 LCD Interfacing: Displaying ADC Readings - URL: https://blog.prabeeshk.com/blog/2012/07/04/lcd-interfacing-using-msp430/ - Date: 2012-07-03 - Keywords: MSP430 LCD, MSP430 ADC, LCD 4-bit interface, MSP430 LaunchPad Interfacing a 16x2 LCD with an MSP430 gives you cheap, on-board output for embedded projects. This post wires one up to a LaunchPad, samples a potentiometer with the ADC, and writes the measured voltage to the display. 
## Project Overview Our system reads analog voltage from a potentiometer connected to the MSP430's ADC channel and displays the digital value on an LCD screen in real-time. This fundamental pattern applies to countless sensor monitoring applications in embedded systems. ### System Components - MSP430 LaunchPad (main microcontroller board) - 16x2 LCD display for showing readings - Potentiometer as a variable voltage source for ADC input - External 5V power supply for the LCD ## Understanding MSP430 ADC Fundamentals The MSP430's 10-bit ADC provides precise analog-to-digital conversion essential for sensor interfacing. Here's how the conversion process works: ### ADC Resolution and Range The MSP430 LaunchPad operates at **3.6V**, giving us a working voltage range of **0V to 3.6V**. The 10-bit ADC resolution means: Minimum Input (0V): ``` Binary: 0000000000 Decimal: 0 ``` Maximum Input (3.6V): ``` Binary: 1111111111 Decimal: 1023 ``` This gives us a resolution of **3.6V ÷ 1023 = 3.52mV per step**, providing excellent precision for most sensing applications. ### Voltage Calculation Formula To convert ADC readings back to voltage: ``` Voltage = (ADC_Value × 3.6V) ÷ 1023 ``` ## Circuit Design and Connections ### Power Supply Considerations The MSP430 operates at 3.6V while standard LCD displays require 5V for reliable operation. 
This mixed-voltage design requires careful consideration:

- Potentiometer supply: powered from the MSP430's 3.6V rail
- LCD supply: powered from an external 5V source
- Data lines: the MSP430's 3.6V logic levels are compatible with 5V LCD inputs

### Pin Configuration

The LCD uses a 4-bit interface to minimize pin usage while maintaining functionality:

```c
// LCD Pin Assignments
#define LCM_PIN_RS BIT2 // P1.2 - Register Select
#define LCM_PIN_EN BIT1 // P1.1 - Enable Signal
#define LCM_PIN_D7 BIT7 // P1.7 - Data Bit 7
#define LCM_PIN_D6 BIT6 // P1.6 - Data Bit 6
#define LCM_PIN_D5 BIT5 // P1.5 - Data Bit 5
#define LCM_PIN_D4 BIT4 // P1.4 - Data Bit 4
```

**Potentiometer connection**: P1.0 (ADC Channel A0)

## Code Implementation Breakdown

### ADC Configuration

```c
void adc_init() {
    // Turn the ADC on, set the sample-and-hold time, use VCC/VSS as reference
    ADC10CTL0 = ADC10ON | ADC10SHT_2 | SREF_0;
    // Select input channel A0, single-channel single-conversion mode
    ADC10CTL1 = INCH_0 | SHS_0 | ADC10DIV_0 | ADC10SSEL_0 | CONSEQ_0;
    // Enable analog input on P1.0
    ADC10AE0 = BIT0;
    // Enable conversions
    ADC10CTL0 |= ENC;
}
```

Key Configuration Details:

- `ADC10SHT_2`: Sets the sample-and-hold time (16 ADC10CLK cycles) for accurate conversions
- `SREF_0`: Uses VCC and VSS as voltage references
- `INCH_0`: Selects input channel A0 (P1.0)

### LCD Communication Protocol

The LCD uses a parallel interface with specific timing requirements:

```c
void PulseLcm() {
    // LCD Enable pulse sequence for data transfer
    LCM_OUT &= ~LCM_PIN_EN;  // Pull EN low
    __delay_cycles(200);     // Wait for setup time
    LCM_OUT |= LCM_PIN_EN;   // Pull EN high
    __delay_cycles(200);     // Hold time
    LCM_OUT &= ~LCM_PIN_EN;  // Pull EN low
    __delay_cycles(200);     // Recovery time
}
```

The LCD requires precise enable pulse timing for reliable data transfer.
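In 4-bit mode each byte goes out as two enable pulses, high nibble first. The small Python sketch below only illustrates that split; `split_nibbles` is a made-up helper that mirrors the masking done by `SendByte` in the full listing, assuming the data lines sit on bits 4 to 7 as in the pin assignments above.

```python
def split_nibbles(byte):
    """Return the two 4-bit transfers for one byte, each aligned to bits 4-7 (D4-D7)."""
    high = byte & 0xF0          # upper nibble is already on bits 4-7
    low = (byte & 0x0F) << 4    # lower nibble is shifted up onto bits 4-7
    return high, low


for value in (0x28, ord("A")):
    high, low = split_nibbles(value)
    print(hex(value), "->", hex(high), hex(low))
```

Each of the two values is written to the port and latched with one `PulseLcm()` call, so a full byte costs two pulses.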
### Main Application Loop ```c void main(void) { adc_init(); int i, adc_value; char display_buffer[5]; WDTCTL = WDTPW + WDTHOLD; // Disable watchdog timer InitializeLcm(); while(1) { // Start ADC conversion start_conversion(); // Wait for conversion completion while(converting()); // Read conversion result adc_value = ADC10MEM; // Convert to ASCII string itoa(adc_value, display_buffer, 10); // Update LCD display ClearLcmScreen(); PrintStr(display_buffer); // Simple delay for display stability for(i = 0; i < 5000; i++); } } ``` ## Complete Implementation Code Here's the full implementation with comprehensive LCD and ADC handling: ```c #include // LCD Pin Definitions #define LCM_DIR P1DIR #define LCM_OUT P1OUT #define LCM_PIN_RS BIT2 // P1.2 #define LCM_PIN_EN BIT1 // P1.1 #define LCM_PIN_D7 BIT7 // P1.7 #define LCM_PIN_D6 BIT6 // P1.6 #define LCM_PIN_D5 BIT5 // P1.5 #define LCM_PIN_D4 BIT4 // P1.4 #define LCM_PIN_MASK ((LCM_PIN_RS | LCM_PIN_EN | LCM_PIN_D7 | LCM_PIN_D6 | LCM_PIN_D5 | LCM_PIN_D4)) #define FALSE 0 #define TRUE 1 // ADC Functions void adc_init() { ADC10CTL0 = ADC10ON | ADC10SHT_2 | SREF_0; ADC10CTL1 = INCH_0 | SHS_0 | ADC10DIV_0 | ADC10SSEL_0 | CONSEQ_0; ADC10AE0 = BIT0; ADC10CTL0 |= ENC; } void start_conversion() { ADC10CTL0 |= ADC10SC; } unsigned int converting() { return ADC10CTL1 & ADC10BUSY; } // LCD Functions void PulseLcm() { // pull EN bit low LCM_OUT &= ~LCM_PIN_EN; __delay_cycles(200); // pull EN bit high LCM_OUT |= LCM_PIN_EN; __delay_cycles(200); // pull EN bit low again LCM_OUT &= (~LCM_PIN_EN); __delay_cycles(200); } void SendByte(char ByteToSend, int IsData) { // clear out all pins LCM_OUT &= (~LCM_PIN_MASK); LCM_OUT |= (ByteToSend & 0xF0); if (IsData == TRUE) { LCM_OUT |= LCM_PIN_RS; } else { LCM_OUT &= ~LCM_PIN_RS; } PulseLcm(); LCM_OUT &= (~LCM_PIN_MASK); LCM_OUT |= ((ByteToSend & 0x0F) << 4); if (IsData == TRUE) { LCM_OUT |= LCM_PIN_RS; } else { LCM_OUT &= ~LCM_PIN_RS; } PulseLcm(); } void LcmSetCursorPosition(char Row, char Col) { 
char address; // construct address from (Row, Col) pair if (Row == 0) { address = 0; } else { address = 0x40; } address |= Col; SendByte(0x80 | address, FALSE); } void ClearLcmScreen() { // Clear display, return home SendByte(0x01, FALSE); SendByte(0x02, FALSE); } void InitializeLcm(void) { LCM_DIR |= LCM_PIN_MASK; LCM_OUT &= ~(LCM_PIN_MASK); __delay_cycles(100000); LCM_OUT &= ~LCM_PIN_RS; LCM_OUT &= ~LCM_PIN_EN; LCM_OUT = 0x20; PulseLcm(); SendByte(0x28, FALSE); SendByte(0x0E, FALSE); SendByte(0x06, FALSE); } void PrintStr(char *Text) { char *c; c = Text; while ((c != 0) && (*c != 0)) { SendByte(*c, TRUE); c++; } } void main(void) { adc_init(); int i, a; char b[5]; WDTCTL = WDTPW + WDTHOLD; // Stop watchdog timer InitializeLcm(); while(1) { start_conversion(); while(converting()); a = ADC10MEM; itoa(a, b, 10); // integer to ASCII ClearLcmScreen(); PrintStr(b); for(i = 0; i < 5000; i++); } } ``` ## Video Demonstration {{< youtube hsM_o5hNUmg >}} Watch the complete project in action, demonstrating real-time ADC value updates as the potentiometer is adjusted. --- ## Introduction to AVR Microcontroller Programming - URL: https://blog.prabeeshk.com/blog/2012/02/21/introduction-to-avr-programing/ - Date: 2012-02-20 - Keywords: AVR programming tutorial, Atmel microcontroller programming, AVR development setup, AVR-GCC compiler, embedded C programming, AVR LED blink program, USBASP programmer, microcontroller development tools The Atmel AVR family covers 8-bit and 32-bit microcontrollers with a compact instruction set that maps well to C. The chips are cheap, the toolchain is free (GCC-based), and a USB programmer like the USBasp lets you flash code in seconds. This post walks through setting up the Linux toolchain and flashing a first LED-blink program. ## Packages required on Linux - `binutils-avr`: assembler, linker, and object-file utilities targeting the AVR architecture. - `gcc-avr`: the GNU C cross-compiler for AVR. 
- `avr-libc`: C standard library, startup code, and header files for AVR targets.

## Sample program: blink an LED

```c
#ifndef F_CPU
#define F_CPU 1000000UL  /* assumed clock for _delay_ms(); match your board */
#endif

#include <avr/io.h>
#include <util/delay.h>

#define LED_BIT 0  /* assumed PORTC bit; use whichever pin drives your LED */

int main(void)
{
    DDRC |= 1 << LED_BIT;          /* configure the pin as an output */

    while (1) {
        PORTC |= 1 << LED_BIT;     /* LED on */
        _delay_ms(500);            /* assumed delay; adjust to taste */
        PORTC &= ~(1 << LED_BIT);  /* LED off */
        _delay_ms(500);
    }
}
```

The program sets a PORTC pin as an output and toggles it in a loop; the `_delay_ms()` calls from `<util/delay.h>` insert a timed pause between the two states.

---

## Generating and Plotting an AM Wave with Matplotlib

- URL: https://blog.prabeeshk.com/blog/2011/09/25/am-plot-matplotlib/
- Date: 2011-09-24
- Keywords: amplitude modulation, AM generation, Matplotlib Python, signal processing, analog communication

Amplitude Modulation (AM) is a type of analog communication in which the amplitude of a carrier signal is varied in proportion to the message signal. It's one of the oldest and most widely used forms of radio communication, and it remains in service in AM broadcasting and some aviation and military systems.

This post generates and plots an AM wave in Python, using NumPy for the math and Matplotlib for the plot. Familiarity with basic Python is useful; signal-processing knowledge isn't required.

## The approach

Import `matplotlib.pylab` and `numpy`, then define the carrier frequency `fc`, the message frequency `fm`, and a time array `t`. Generate the carrier and message signals with NumPy's `sin`, multiply the carrier by the offset message `(2 + m)` to form the AM signal, and plot the result. The offset keeps the envelope positive, so the message shape stays visible in the carrier's amplitude.

## The code

```Python
# Import the matplotlib.pylab and numpy modules
import matplotlib.pylab as plt
import numpy as np

# Set the carrier frequency and message frequency (Hz)
fc = 50
fm = 5

# Generate the time array: 1 second sampled every 1 ms
t = np.arange(0, 1, 0.001)

# Generate the carrier signal
c = np.sin(2 * np.pi * fc * t)

# Generate the message signal
m = np.sin(2 * np.pi * fm * t)

# Generate the AM signal; the envelope (2 + m) swings between 1 and 3
am = c * (2 + m)

# Plot the AM signal against the sample index
plt.plot(am, 'r')
plt.show()
```

This produces an AM wave with a 50 Hz carrier and a 5 Hz message, plotted via Matplotlib:

![AM waveform plotted in Matplotlib with a 50 Hz carrier and 5 Hz message signal](/images/am.png)

## Next steps

For more on Matplotlib, see the [Matplotlib Artist tutorial](https://matplotlib.org/stable/tutorials/artists.html).
From here, try adjusting `fc` and `fm` to see how the carrier and message frequencies interact, or add noise to the signal and see how the plot changes.
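
As a starting point for those experiments, here is a minimal sketch that wraps the same `c * (2 + m)` formula in a function and adds Gaussian noise. The helper names `am_signal` and `add_noise`, the `offset` parameter, and the noise level `sigma=0.2` are assumptions made for illustration; they are not part of the original script.

```python
import numpy as np


def am_signal(t, fc=50.0, fm=5.0, offset=2.0):
    """Return carrier * (offset + message), the same form as c * (2 + m)."""
    carrier = np.sin(2 * np.pi * fc * t)
    message = np.sin(2 * np.pi * fm * t)
    return carrier * (offset + message)


def add_noise(signal, sigma=0.2, seed=0):
    """Hypothetical helper: add zero-mean Gaussian noise with std `sigma`."""
    rng = np.random.default_rng(seed)  # seeded so runs are repeatable
    return signal + rng.normal(0.0, sigma, size=signal.shape)


# Same time base as the post: 1 second sampled every 1 ms
t = np.arange(0, 1, 0.001)

clean = am_signal(t)                 # 50 Hz carrier, 5 Hz message
faster = am_signal(t, fc=100, fm=10) # try other frequency pairs
noisy = add_noise(clean, sigma=0.2)  # assumed noise level
```

Passing `clean`, `faster`, or `noisy` to `plt.plot` in place of `am` shows each variant; raising `sigma` makes the envelope progressively harder to pick out.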