Advanced PySpark Design Patterns: Implementation Examples
This builds on basic design patterns in PySpark pipelines (factory, singleton, builder, observer, pipeline). Once those …
Minimal runnable snippets for the five core patterns. For the why and when, see Implementing Design Patterns in PySpark Data Pipelines and Advanced PySpark Design Patterns.
Factory pattern: create data sources without specifying their concrete types:
```python
from abc import ABC, abstractmethod

# Minimal concrete sources so the snippet runs; real ones would wrap
# spark.read.csv / spark.read.parquet.
class CSVDataSource:
    pass

class ParquetDataSource:
    pass

class DataSourceFactory(ABC):
    @abstractmethod
    def create_data_source(self):
        pass

class CSVFactory(DataSourceFactory):
    def create_data_source(self):
        return CSVDataSource()

class ParquetFactory(DataSourceFactory):
    def create_data_source(self):
        return ParquetDataSource()

# Usage
factory = CSVFactory()
source = factory.create_data_source()
```
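A natural next step, not shown in the snippet above, is picking a factory from a file extension. The registry dict and `factory_for` helper here are illustrative assumptions, not part of the original tutorial:

```python
import os

# Minimal sources and factories, as in the snippet above.
class CSVDataSource: ...
class ParquetDataSource: ...

class CSVFactory:
    def create_data_source(self):
        return CSVDataSource()

class ParquetFactory:
    def create_data_source(self):
        return ParquetDataSource()

# Hypothetical registry mapping file extensions to factories.
FACTORIES = {".csv": CSVFactory, ".parquet": ParquetFactory}

def factory_for(path):
    # Pick a factory based on the file's extension.
    ext = os.path.splitext(path)[1]
    return FACTORIES[ext]()

source = factory_for("events.parquet").create_data_source()
```

Callers now depend only on the registry, so adding a JSON source means registering one new factory rather than touching call sites.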
Singleton pattern: ensure only one instance exists:
```python
from threading import Lock

class SparkConfig:
    _instance = None
    _lock = Lock()

    def __new__(cls):
        # The lock prevents two threads from racing past the None check.
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
            return cls._instance

# Usage
config1 = SparkConfig()
config2 = SparkConfig()
assert config1 is config2  # Same instance
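To see why the lock matters, here is a small concurrency check (the thread count is arbitrary): every thread constructing `SparkConfig` ends up holding the same object.

```python
from threading import Lock, Thread

class SparkConfig:
    _instance = None
    _lock = Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
            return cls._instance

instances = []
threads = [Thread(target=lambda: instances.append(SparkConfig())) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# All eight threads received the same instance.
assert all(inst is instances[0] for inst in instances)
```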
Builder pattern: construct complex objects step by step:
```python
# Placeholder result object; a real DataTransform would apply its filters
# and aggregations to a DataFrame.
class DataTransform:
    def __init__(self, filters, aggregations):
        self.filters = filters
        self.aggregations = aggregations

class TransformBuilder:
    def __init__(self):
        self.filters = []
        self.aggregations = {}

    def add_filter(self, condition):
        self.filters.append(condition)
        return self  # returning self enables method chaining

    def add_aggregation(self, col, func):
        self.aggregations[col] = func
        return self

    def build(self):
        return DataTransform(self.filters, self.aggregations)

# Usage
transform = (TransformBuilder()
    .add_filter("status == 'active'")
    .add_aggregation("amount", "sum")
    .build())
```
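One way the built object could actually run is sketched below. It operates on plain dict rows so it executes without Spark; a PySpark version would use `df.filter(...)` and `df.groupBy(...).agg(...)` instead. The `apply` method and predicate-callable filters are assumptions for illustration:

```python
# Hypothetical DataTransform that applies filters, then reduces columns.
class DataTransform:
    def __init__(self, filters, aggregations):
        self.filters = filters            # list of predicate callables
        self.aggregations = aggregations  # {column: reducer function}

    def apply(self, rows):
        for pred in self.filters:
            rows = [r for r in rows if pred(r)]
        return {col: func(r[col] for r in rows)
                for col, func in self.aggregations.items()}

rows = [
    {"status": "active", "amount": 10},
    {"status": "inactive", "amount": 5},
    {"status": "active", "amount": 7},
]
transform = DataTransform([lambda r: r["status"] == "active"], {"amount": sum})
result = transform.apply(rows)  # {"amount": 17}
```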
Observer pattern: notify multiple components of data changes:
```python
from abc import ABC, abstractmethod

class Observer(ABC):
    @abstractmethod
    def update(self, event):
        pass

class DataSubject:
    def __init__(self):
        self.observers = []

    def register(self, observer):
        self.observers.append(observer)

    def notify(self, event):
        for obs in self.observers:
            obs.update(event)

# Minimal event and observer so the snippet runs end to end.
class DataEvent:
    def __init__(self, payload):
        self.payload = payload

class LoggingObserver(Observer):
    def update(self, event):
        print(f"event received: {event.payload}")

# Usage
subject = DataSubject()
subject.register(LoggingObserver())
subject.notify(DataEvent({"rows": 100}))
```
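In a pipeline, observers usually do more than log. The sketch below adds a hypothetical `MetricsObserver` that records row counts per stage; the event shape (a plain dict) and the observer's fields are assumptions, not part of the original:

```python
from abc import ABC, abstractmethod

class Observer(ABC):
    @abstractmethod
    def update(self, event): ...

class DataSubject:
    def __init__(self):
        self.observers = []

    def register(self, observer):
        self.observers.append(observer)

    def notify(self, event):
        for obs in self.observers:
            obs.update(event)

# Hypothetical observer that accumulates row counts across stages.
class MetricsObserver(Observer):
    def __init__(self):
        self.row_counts = []

    def update(self, event):
        self.row_counts.append(event["rows"])

subject = DataSubject()
metrics = MetricsObserver()
subject.register(metrics)
subject.notify({"stage": "clean", "rows": 120})
subject.notify({"stage": "enrich", "rows": 118})
# metrics.row_counts == [120, 118]
```

Because the subject knows nothing about its observers, logging, metrics, and alerting can each be added or removed independently.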
Pipeline pattern: chain transformations sequentially:
```python
from abc import ABC, abstractmethod

class Transform(ABC):
    def __init__(self, next_transform=None):
        self.next = next_transform

    def set_next(self, transform):
        self.next = transform
        return transform  # return the new link so set_next calls chain

    @abstractmethod
    def process(self, data):
        pass

class CleanTransform(Transform):
    def process(self, data):
        cleaned = data.dropna()  # works for pandas and PySpark DataFrames
        return self.next.process(cleaned) if self.next else cleaned

# Minimal pass-through stages so the chain runs; real ones would validate
# schemas and join enrichment data.
class ValidateTransform(Transform):
    def process(self, data):
        return self.next.process(data) if self.next else data

class EnrichTransform(Transform):
    def process(self, data):
        return self.next.process(data) if self.next else data

# Usage (df is an existing DataFrame)
pipeline = CleanTransform()
pipeline.set_next(ValidateTransform()).set_next(EnrichTransform())
result = pipeline.process(df)
```
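The same sequential chaining can also be expressed functionally with `functools.reduce`, which avoids the linked-list plumbing when stages need no shared state. The transform names here are illustrative assumptions:

```python
from functools import reduce

# Hypothetical pipeline stages, shown on plain Python rows.
def clean(rows):
    # Drop missing records (the analogue of dropna above).
    return [r for r in rows if r is not None]

def enrich(rows):
    # Tag each surviving row.
    return [{**r, "enriched": True} for r in rows]

def run_pipeline(rows, steps):
    # Feed each step's output into the next, left to right.
    return reduce(lambda data, step: step(data), steps, rows)

result = run_pipeline([{"id": 1}, None, {"id": 2}], [clean, enrich])
# result == [{"id": 1, "enriched": True}, {"id": 2, "enriched": True}]
```

PySpark's own `DataFrame.transform` method supports the same style directly on DataFrames, so either form fits naturally into a Spark pipeline.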
Each snippet above is self-contained. Copy it into a new module, wire it into your pipeline, and adjust the class names to match your domain. For a deeper walkthrough of when and why to use each one, see the full tutorial. For strategy, decorator, and template method patterns, see the advanced patterns post.