Advanced PySpark Design Patterns: Real-World Implementation Examples
This quick reference provides concise code snippets for the five essential design patterns in PySpark data pipelines. For detailed explanations, see Implementing Design Patterns in PySpark Data Pipelines.
Factory Pattern: create data sources without specifying their exact types:
from abc import ABC, abstractmethod

class DataSourceFactory(ABC):
    @abstractmethod
    def create_data_source(self):
        pass

class CSVFactory(DataSourceFactory):
    def create_data_source(self):
        return CSVDataSource()

class ParquetFactory(DataSourceFactory):
    def create_data_source(self):
        return ParquetDataSource()

# Usage (CSVDataSource and ParquetDataSource are your concrete source classes)
factory = CSVFactory()
source = factory.create_data_source()
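As an illustrative variation (not from the original article), the same idea can be driven by a registry that dispatches on file extension; `CSVDataSource`, `ParquetDataSource`, and the registry below are hypothetical stand-ins for your real reader classes:

```python
from abc import ABC, abstractmethod

class DataSource(ABC):
    @abstractmethod
    def describe(self):
        pass

class CSVDataSource(DataSource):
    def describe(self):
        return "csv"

class ParquetDataSource(DataSource):
    def describe(self):
        return "parquet"

# Registry mapping file extensions to data source classes
_REGISTRY = {"csv": CSVDataSource, "parquet": ParquetDataSource}

def create_data_source(path):
    """Pick a data source implementation based on the file extension."""
    ext = path.rsplit(".", 1)[-1].lower()
    try:
        return _REGISTRY[ext]()
    except KeyError:
        raise ValueError(f"Unsupported format: {ext}")

source = create_data_source("sales.parquet")  # returns a ParquetDataSource
```

New formats then only require a registry entry, not a new factory subclass.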
Singleton Pattern: ensure only one instance exists:
from threading import Lock

class SparkConfig:
    _instance = None
    _lock = Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance

# Usage
config1 = SparkConfig()
config2 = SparkConfig()
assert config1 is config2  # Same instance
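The lock is what makes this safe under concurrency. A quick sketch (illustrative, not from the article) that constructs the singleton from several threads and confirms they all got the same object:

```python
from threading import Lock, Thread

class SparkConfig:
    _instance = None
    _lock = Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance

instances = []

def grab():
    instances.append(SparkConfig())

# Hammer the constructor from eight threads at once
threads = [Thread(target=grab) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every thread observed the same instance
assert all(obj is instances[0] for obj in instances)
```

Without the lock, two threads could both see `_instance is None` and each create an object.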
Builder Pattern: construct complex objects step by step:
class TransformBuilder:
    def __init__(self):
        self.filters = []
        self.aggregations = {}

    def add_filter(self, condition):
        self.filters.append(condition)
        return self

    def add_aggregation(self, col, func):
        self.aggregations[col] = func
        return self

    def build(self):
        # DataTransform is your product class that applies the collected steps
        return DataTransform(self.filters, self.aggregations)

# Usage
transform = (TransformBuilder()
             .add_filter("status == 'active'")
             .add_aggregation("amount", "sum")
             .build())
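The snippet assumes a `DataTransform` product class. One minimal, hypothetical shape for it, shown here on plain dicts with filters as predicates (a real version would translate the collected steps into DataFrame operations):

```python
class DataTransform:
    """Hypothetical product: filters are predicates, aggregations map column -> function name."""
    def __init__(self, filters, aggregations):
        self.filters = filters
        self.aggregations = aggregations

    def apply(self, rows):
        # Keep only rows that pass every filter predicate
        for pred in self.filters:
            rows = [r for r in rows if pred(r)]
        # Aggregate the surviving rows, column by column
        result = {}
        for col, func in self.aggregations.items():
            values = [r[col] for r in rows]
            result[col] = sum(values) if func == "sum" else values
        return result

rows = [
    {"status": "active", "amount": 10},
    {"status": "closed", "amount": 99},
    {"status": "active", "amount": 5},
]
transform = DataTransform([lambda r: r["status"] == "active"], {"amount": "sum"})
print(transform.apply(rows))  # {'amount': 15}
```

The builder stays the same regardless of how the product executes its steps.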
Observer Pattern: notify multiple components of data changes:
from abc import ABC, abstractmethod

class Observer(ABC):
    @abstractmethod
    def update(self, event):
        pass

class DataSubject:
    def __init__(self):
        self.observers = []

    def register(self, observer):
        self.observers.append(observer)

    def notify(self, event):
        for obs in self.observers:
            obs.update(event)

# Usage (LoggingObserver and DataEvent are your concrete observer and event types)
subject = DataSubject()
subject.register(LoggingObserver())
subject.notify(DataEvent(data))
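A self-contained run with two hypothetical observers (an event counter and a log collector), to show the notification fan-out end to end:

```python
from abc import ABC, abstractmethod

class Observer(ABC):
    @abstractmethod
    def update(self, event):
        pass

class CountingObserver(Observer):
    """Counts how many events it has seen."""
    def __init__(self):
        self.count = 0

    def update(self, event):
        self.count += 1

class LoggingObserver(Observer):
    """Collects a log line per event."""
    def __init__(self):
        self.log = []

    def update(self, event):
        self.log.append(f"event: {event}")

class DataSubject:
    def __init__(self):
        self.observers = []

    def register(self, observer):
        self.observers.append(observer)

    def notify(self, event):
        for obs in self.observers:
            obs.update(event)

counter, logger = CountingObserver(), LoggingObserver()
subject = DataSubject()
subject.register(counter)
subject.register(logger)
subject.notify("rows_written")
subject.notify("job_done")
# counter.count is now 2; logger.log holds both messages
```

The subject never needs to know what each observer does with an event.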
Chain of Responsibility Pattern: chain transformations sequentially:
from abc import ABC, abstractmethod

class Transform(ABC):
    def __init__(self, next_transform=None):
        self.next = next_transform

    def set_next(self, transform):
        self.next = transform
        return transform  # returning the new link lets set_next calls chain

    @abstractmethod
    def process(self, data):
        pass

class CleanTransform(Transform):
    def process(self, data):
        cleaned = data.dropna()
        return self.next.process(cleaned) if self.next else cleaned

# Usage (ValidateTransform and EnrichTransform are further Transform subclasses)
pipeline = CleanTransform()
pipeline.set_next(ValidateTransform()).set_next(EnrichTransform())
result = pipeline.process(df)
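To see the chain run end to end without Spark, here is an illustrative version over plain lists; the `dropna` analogue simply drops `None` values, and `ValidateTransform` and `EnrichTransform` are hypothetical steps:

```python
from abc import ABC, abstractmethod

class Transform(ABC):
    def __init__(self, next_transform=None):
        self.next = next_transform

    def set_next(self, transform):
        self.next = transform
        return transform  # returning the new link lets set_next calls chain

    @abstractmethod
    def process(self, data):
        pass

    def forward(self, data):
        # Hand off to the next link, or return the data if this is the last one
        return self.next.process(data) if self.next else data

class CleanTransform(Transform):
    def process(self, data):
        return self.forward([x for x in data if x is not None])

class ValidateTransform(Transform):
    def process(self, data):
        return self.forward([x for x in data if x >= 0])

class EnrichTransform(Transform):
    def process(self, data):
        return self.forward([x * 2 for x in data])

pipeline = CleanTransform()
pipeline.set_next(ValidateTransform()).set_next(EnrichTransform())
print(pipeline.process([1, None, -3, 4]))  # [2, 8]
```

Each link only knows its own step and its successor, so steps can be reordered or swapped independently.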
These patterns help create maintainable, testable, and scalable PySpark pipelines. Choose the pattern that best fits your specific use case.