Lập trình hướng đối tượng (Object-Oriented Programming - OOP) là một mô hình lập trình mà tổ chức code bằng cách sử dụng các đối tượng (object). Các đối tượng này là những thể hiện (instance) của các lớp (class). Ta có thể hiệu một class giống như một bản thiết kế (một blueprint hay một template) và một object giống như một phiên bản cụ thể được tạo ra từ bản thiết kế đó.
OOP giúp data engineer:
Chia nhỏ và tổ chức code thành các thành phần khác nhau một cách logic, từ đó mà ta có thể tăng khả năng tái sử dụng code. Các keywords bạn có thể tham khảo thêm là “code modularity” và “reusable code”.
Duy trì, bảo trì code (maintain code) dễ dàng hơn nhờ các tính chất căn bản trong OOP như encapsulation, abstraction, inheritance, và polymorphism. Ta có thể thực hiện những việc như là sửa logic tại một nơi và thay đổi đó sẽ được áp dụng cho tất cả các nơi khác, hoặc ta có thể thay đổi logic hoạt động bên trong một cấu phần nào đó mà không ảnh hưởng tới phần còn lại của hệ thống/pipeline.
Mở rộng code dễ dàng hơn. Vì các business rules, data schema, data format thường thay đổi nên việc có thể mở rộng code một cách dễ dàng cũng rất quan trọng. Thông qua các tính chất cơ bản của OOP, có thể kết hợp cùng sử dụng các design pattern phù hợp thì việc mở rộng code sẽ đơn giản hơn, hạn chế sai sót và tránh việc phải thực hiện regression test quá nhiều. Dự án càng lớn và càng phức tạp thì codebase càng lớn và khi đó vai trò của OOP càng được thể hiện rõ hơn so với việc chỉ viết các logic ra một cách đơn giản trong các jupyter notebook.
OOP cũng giúp việc viết các Unit test dễ dàng hơn. Có các unit test là cần thiết để đảm bảo các logic quan trọng hoạt động chính xác.
Một Python class (một lớp trong Python) là một bản thiết kế hay mẫu mà từ đó ta có thể tạo các object cùng kiểu. Class bao gồm các thành phần chính sau:
Ta dùng từ khoá class để khởi tạo 1 class (tên class
thường ở dạng CamelCase theo thông lệ trong PEP-8).
Ví dụ: class MyClass:
Attribute (thuộc tính) là các biến dùng để lưu trữ dữ liệu liên quan tới class hoặc instance của class
Class Attribute: được chia sẻ và sử dụng chung bởi tất cả các instance của class. Class attribute được định nghĩa trực tiếp trong phần body của class chứ không nằm trong bất kỳ một method nào.
Instance Attribute: là các thuộc tính riêng của mỗi
instance. Instance attribute được định nghĩa trong phương thức
__init__ bằng cách sử dụng cú pháp
self.attribute_name. Method __init__ là một
method đặc biệt trong Python, nó tự động được gọi khi một instance được
khởi tạo.
Là các hàm (functions) được định nghĩa trong class và hoạt động trên dữ liệu của class.
Class Method: hoạt động ở level của class thay vì là
ở một instance cụ thể. Để tạo một class method, ta dùng decorator
@classmethod. Một class method phải có tối thiểu 1 tham số
(parameter), tham số đầu tiên bao giờ cũng là “cls” (tất nhiên bạn có
thể chọn một cái tên bất kì, nhưng tốt nhất ta nên theo convention và
đặt tên như vậy). “cls” ở đây là tham chiếu tới chính class mà chứa
class method đó. Khi ta gọi class method thì Python sẽ tự động truyền
chính class đó vào tham số đầu tiên của method.
class TestClass():
@classmethod
def print_something(cls):
print("something")
TestClass.print_something()## something
Instance Method: được định nghĩa ở class và hoạt
động ở level của một instance cụ thể của class. Một instance method phải
có tối thiểu 1 tham số và tham số đầu tiên theo convention bao giờ cũng
là “self”, tham chiếu tới chính instance đó. Khi gọi instance method (cú
pháp instance.instance_method()), Python sẽ gọi method đó
từ class và truyền instance vào tham số đầu tiên của method.
class User:
def __init__(self, name):
self.name = name
def greet(self): # <-- this is an instance method
print(f"Hello, my name is {self.name}")
user = User("Tuan")
user.greet()## Hello, my name is Tuan
Static Method: chỉ đơn giản là một hàm nằm trong
class. Python không tự truyền class hay instance vào tham số của static
method. Ta thường tạo các utility functions dưới dạng static method.
Class lúc này giống như một cách để nhóm các utility function đó lại. Để
tạo static method, ta dùng decorator @staticmethod.
# Define DataValidator class containing utility methods
class DataValidator:
@staticmethod
def is_valid_email(email):
return "@" in email and "." in email
@staticmethod
def is_valid_phone(phone):
pass
@staticmethod
def is_non_empty(value):
pass
# List of sample emails (e.g., from a CSV or API)
emails = [
"tuan.phan@example.com",
"some_one.com",
"tuan@company",
"tuan.phan@domain.org"
]
# Filter valid emails using the static method
valid_emails = [
email for email in emails if DataValidator.is_valid_email(email)
# hoặc ta có thể gọi static method từ instance cũng được, như sau: DataValidator().is_valid_email(email)
]
print("Valid emails:")## Valid emails:
## - tuan.phan@example.com
## - tuan.phan@domain.org
__init__Đây là một method đặc biệt trong Python, thường được gọi là constructor (hoặc đúng hơn thì là initialiser). Nó được tự động gọi sau khi một instance của class được khởi tạo. Mục đích là ta muốn khởi tạo một instance với những giá trị nhất định.
Method này luôn phải có tham số đầu tiên là self, là
tham chiếu tới instance mà đang được khởi tạo. Ta có thể có thêm các
tham số khác cho method này và khi khởi tạo instance thì ta có thể
truyền giá trị vào cho các tham số đó (trong lập trình thì các giá trị
này gọi là đối số - argument).
class DataSource:
def __init__(self, file_path, format):
self.file_path = file_path
self.format = format
source_1 = DataSource("/data/users.csv", "csv")
print(source_1.file_path)## /data/users.csv
## csv
selfLà một tham chiếu (reference) đến instance của class. Ta dùng self để truy cập và thay đổi các attribute của instance và để truy cập các method của instance.
Mọi object đều được tạo ra từ một class nào đó (tức là đều là một instance của một class nào đó). Mỗi object thì lại có một bộ các instance attributes riêng.
## <class 'int'>
## <class 'str'>
## <class 'function'>
## <class 'type'>
Đây là việc ta nhóm dữ liệu (chính là các thuộc tính - attributes) và các phương thức (methods) mà hoạt động trên những dữ liệu đó vào trong cùng một object (chính là class và các instance của class) và hạn chế khả năng truy cập đến một vài thành phần bên trong của object đó.
Lợi ích của encapsulation:
Bảo vệ các thành phần bên trong object khỏi những thay đổi ngoài ý muốn
Giúp code trở nên dễ mantain và ít phát sinh lỗi hơn.
Cho phép ta thay đổi các thành phần, logic bên trong object mà không ảnh hưởng đến phần code mà dùng để cho object đó tương tác với các object khác bên ngoài.
Ví dụ: Ta có một class DataIngestionJob như code dưới
đây.
import pandas as pd
import numpy as np
from pandas import DataFrame
class DataIngestionJob:
def __init__(self, source_name: str) -> None:
self.source_name = source_name
self.__status = "initialized" # private attribute
self.__records_loaded = 0 # private attribute
def run(self, df: DataFrame):
if df.empty:
self.__status = "failed"
else:
self.__records_loaded = df.shape[0]
self.__status = "completed"
@property
def status(self):
return self.__status
@property
def records_loaded(self):
return self.__records_loadedTa có một pandas dataframe như sau:
df = pd.DataFrame({
'id': range(1, 5),
'name': [f'Name_{i}' for i in range(1, 5)],
'age': np.random.randint(20, 60, size=4)
})| id | name | age |
|---|---|---|
| 1 | Name_1 | 50 |
| 2 | Name_2 | 43 |
| 3 | Name_3 | 59 |
| 4 | Name_4 | 40 |
Ta khởi tạo một instance của class DataIngestionJob, sau
đó sử dụng method run bằng cách truyền dataframe
df vào tham số của method đó.
## completed
## 4
Hãy cùng xem tính đóng gói được thể hiện như thế nào:
Bảo vệ trạng thái bên trong của object: Ta thấy rằng có 2 private
attribute được tạo đó là __status và
__records_loaded . Hai attribute này khó có thể bị trực
tiếp thay đổi từ bên ngoài class. Nói là “khó có thể” vì thực ra thì 2
attribute này không thực sự hẳn là private mà khi tạo các attribute bắt
đầu với 2 dấu gạch dưới liên tiếp nhau thì python tự động sử dụng “name
mangling” để thay đổi tên của chúng, để hạn chế việc sơ ý thay đổi giá
trị của các attribute này.
## AttributeError: 'DataIngestionJob' object has no attribute '__status'
print(job._DataIngestionJob__status) # do name mangling, attribute __status đã bị đổi tên thành dạng _[tên Class]__status## completed
Ta sử dụng decorator @property để biến
2 method status() và records_loaded() thành
một read-only interface để truy cập giá trị của 2 private attribute.
Hiểu một cách đơn giản thì với việc dùng @property, ta có thể gọi 2 method
status() và records_loaded() với syntax như là
gọi các public attribute. Về bản chất thì ta đang dùng method để truy
cập giá trị của các private attribute (vì ta muốn bảo vệ các giá trị này
không cho ai thay đổi chúng).
Nếu ta thử thay đổi chúng thì sẽ bị báo lỗi như dưới đây.
## AttributeError: property 'status' of 'DataIngestionJob' object has no setter
Điều này giúp tránh trường hợp các attribute này bị thay đổi do nhầm lẫn (có thể do sơ suất khi viết code) hoặc bị một đoạn code từ một ai đó, hoặc một ứng dụng nào đó khác thay đổi.
# Giờ giả sử nếu có đoạn code sau đây được chạy nhằm
# cố để thay đổi giá trị của status, Python sẽ tạo ra
# 1 attribute mới tên là __status. Attribute này sẽ
# không được sử dụng trong method run() và status()
# vì vậy property job.status của chúng ta vẫn an toàn,
# vẫn giữ nguyên giá trị của nó
job.__status = "a new value"
print(job.status)## completed
## a new valueGiúp code dễ maintain và ít lỗi hơn: việc thay đổi giá trị của
status được tập trung vào method run() và nếu ta muốn thay
đổi logic gì liên quan tới status thì ta chỉ việc thay đổi ở trong
method này là được.
Cho phép ta thay đổi các thành phần, logic bên trong object mà
không ảnh hưởng đến phần code mà dùng để cho object đó tương tác với các
object khác bên ngoài: Giả sử ta muốn lưu lại số bản ghi được xử lý sau
mỗi lần chạy method run() và ta muốn property
records_loaded trả ra kết quả là tổng số bản ghi đã được xử
lý tính đến thời điểm hiện tại thay vì là số bản ghi của lần xử lý gần
nhất. Vậy ta chỉ cần làm 2 thao tác như sau: 1) thêm private attribute
self.__records_log = [] và 2) đổi logic của method
records_loaded() để trả ra
sum(self.__records_log) thay vì trả ra
self.__records_loadded như trước.
import pandas as pd
import numpy as np
from pandas import DataFrame
class DataIngestionJob:
def __init__(self, source_name: str) -> None:
self.source_name = source_name
self.__status = "initialized" # private attribute
self.__records_loaded = 0 # private attribute
self.__records_log = [] # thêm private attribute này vào
def run(self, df: DataFrame):
if df.empty:
self.__status = "failed"
self.__records_log.append(0)
else:
self.__records_loaded = df.shape[0]
self.__status = "completed"
self.__records_log.append(self.__records_loaded)
@property
def status(self):
return self.__status
@property
def records_loaded(self):
return sum(self.__records_log) # thay đổi logic để trả ra tổng records luỹ kế
# Giờ ta sẽ ingest 2 dataframe và xem tổng số bản ghi luỹ kế trả ra có chính xác không
df1 = pd.DataFrame({ # dataframe có 4 bản ghi
'id': range(1, 5),
'name': [f'Name_{i}' for i in range(1, 5)],
'age': np.random.randint(20, 60, size=4)
})
df2 = pd.DataFrame({ # dataframe có 2 bản ghi
'id': range(1, 3),
'name': [f'Name_{i}' for i in range(1, 3)],
'age': np.random.randint(20, 60, size=2)
})
job = DataIngestionJob("user_logs")
job.run(df1)
print(job.records_loaded)## 4
## 6
Như vậy tuy rằng logic bên trong đã thay đổi nhưng code để sử dụng
property vẫn không đổi, vẫn là job.records_loaded.
Lưu ý:
public attribute: Trong Python, public attribute là một attribute mà có thể được truy cập từ cả trong lẫn ngoài class. Đây là loại attribute mặc định của Python và khi đặt tên ta không dùng dấu gạch dưới ở trước tên.
protected attribute: Trong Python, protected
attribute theo thông lệ được đặt tên với một dấu gạch dưới ở trước (vd:
_my_attribute). Các protected attribute có mục đích là để
được sử dụng bên trong class hoặc sử dụng bởi các subclass.
private attribute: Trong Python, private
attribute theo thông lệ được đặt tên với hai dấu gạch dưới ở trước (vd:
__my_attribute). Private attribute có mục đích là để chỉ
được sử dụng bên trong class mà tạo ra nó.
Các public method, protected method, private method cũng tương tự như attribute đã nêu ở trên.
Đây là việc ẩn đi các chi tiết về cách triển khai và chỉ thể hiện ra các tính năng cần thiết của một đối tượng. Tính trừu tượng giúp ta tập trung vào những gì mà đối tượng làm thay vì quan tâm đến cách nó thực hiện. Thông thường, ta sẽ tạo các “interface” (chính là các lớp trừu tượng - “abstract class”) để chỉ ra các hoạt động cần thực hiện, sau đó ta sẽ tạo các lớp cụ thể - “concrete class” để thực hiện các hoạt động đó theo cách riêng của chúng.
Những lợi ích của abstraction:
Giúp ta chia code thành các cấu phần riêng biệt và tăng khả năng tái sử dụng của code (divide code into modules and increase code reusability): Khi ta tạo một abstract class thì chính là ta đang tạo một interface (giao diện) hay một contract (hợp đồng). Trong OOP, interface định nghĩa những việc cần làm chứ không phải làm như thế nào. Sau khi ta đã tạo interface thì việc tiếp theo là tạo các concrete class (các class con, kế thừa từ class mẹ là interface) và triển khai các logic cụ thể ở đó. Dù logic của mỗi concrete class khác nhau và có phức tạp đến đâu thì khi tương tác với các class đó ta cũng đều dùng các method đã được định nghĩa ở interface. Mỗi concrete class có thể coi như là một module và có thể unit test được, nhưng chúng đều dùng chung một interface nên code có tính tái sử dụng.
Separation of concerns (SoC): Việc chia code thành các phần riêng biệt như đề cập bên trên giúp ta viết code (develop), kiểm thử (test), vận hành (operate and maintain) và mở rộng code (extend) dễ hơn vì mỗi cấu phần chỉ tập trung giải quyết một vấn đề riêng biệt.
Loose coupling: Abstraction giúp cho các hệ thống không bị phụ thuộc chặt chẽ vào nhau, thay vào đó thì giao tiếp với nhau thông qua interface đã được thiết lập, giống như thông qua một hợp đồng (contract) đã được ký kết giữa các bên.
Tăng khả năng kiểm thử cho hệ thống (improve testability): Như đã đề cập ở trên, nhờ việc code được chia thành các phần riêng biệt, ta có thể mock dữ liệu (tự tạo ra dữ liệu để kiểm thử thay vì dùng dữ liệu thật từ nguồn) để kiểm thử từng cấu phần đó mà không phải phụ thuộc vào production database hay các ứng dụng bên thứ ba khác, hay network access, … Việc mock dữ liệu cũng giúp ta dễ dàng tạo ra và kiểm tra xem hệ thống ETL có xử lý được các edge case hay không (các edge case là các tình huống đặc biệt, phức tạp, ít xảy ra nhưng nếu xảy ra mà không được xử lý đúng thì thường phát sinh vấn đề).
Tăng khả năng phối hợp làm việc: Các thành viên khác nhau trong nhóm phát triển, hoặc các nhóm khác nhau có thể làm việc với các cấu phần khác nhau trong cùng một thời điểm. Ta chỉ cần thống nhất về interface là các thành viên, các team khác nhau có thể bắt đầu làm việc mà chưa cần quan tâm tới cách triển khai, logic triển khai cụ thể như thế nào.
Giúp chuẩn hoá cách xử lý và quản trị dữ liệu: Việc thiết kế và sử dụng các abstract class/interface thường sẽ dẫn tới kết quả là ta có cách thức xử lý dữ liệu, xử lý lỗi, lưu log, bảo mật, … chung được sử dụng trong toàn bộ nhóm Data Engineer hoặc thậm chí trong data team, góp phần giúp đưa các thông lệ thực hành tốt (good practices) tới nhiều thành viên trong tổ chức hơn và việc chuẩn hoá này giúp ta quản trị dữ liệu dễ hơn.
Ví dụ: Ta cần viết một số logic xử lý dữ liệu cơ bản. Thay vì hard
code một loạt các logic xử lý dữ liệu khác nhau, ta sẽ tạo một abstract
class tên là Transformer. Đây sẽ là interface của chúng ta.
Sau đó ta sẽ tạo các concrete class để triển khai các loại logic xử lý
dữ liệu khác nhau. Bằng việc sử dụng class ABC và decorator @abstractmethod, Python sẽ đảm bảo rằng nếu có
một concrete class nào mà không triển khai abstract method
transform() thì class đó sẽ không khởi tạo instance
được.
import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
from pandas import DataFrame
class Transformer(ABC):
"""
Abstract class that defines the interface for transforming data
"""
@abstractmethod
def transform(self, data: DataFrame):
"""
Apply transformation rules and return the transformed dataframe
"""
pass
class LowerCaseTransformer(Transformer):
def transform(self, df: DataFrame) -> DataFrame:
"""
Lowercase all text columns in the dataframe
"""
transformed_df = df.copy()
for col in transformed_df.columns:
if transformed_df[col].dtype == 'object' or transformed_df[col].dtype == 'string':
transformed_df[col] = transformed_df[col].str.lower()
return transformed_df
class NaFillTransformer(Transformer):
def __init__(self, na_fill_config: dict):
"""
Initialize the NaFillTransformer with a dictionary specifying the fill value for each data type
Sample dictionary: {np.number: 0, 'object': "N/A", 'string': "N/A"}
"""
self.na_fill_config = na_fill_config
def transform(self, df: DataFrame) -> DataFrame:
"""
Fill NA values in the dataframe based on the provided configuration
"""
transformed_df = df.copy()
for dtype, fill_value in self.na_fill_config.items():
# Select columns of the current data type
cols_to_fill = transformed_df.dtypes[transformed_df.dtypes == dtype].index.tolist()
# Fill NA values in the selected columns
transformed_df[cols_to_fill] = transformed_df[cols_to_fill].fillna(fill_value)
return transformed_df
def apply_transformation(df: DataFrame, transformer: Transformer):
return transformer.transform(df)data = {'col_num': [1, 2, None, 4], 'col_text': ['A', 'B', None, 'D'], 'col_text_2': ['D', None, 'E', 'F']}
df = pd.DataFrame(data)
print("Original DataFrame:")## Original DataFrame:
## col_num col_text col_text_2
## 0 1.0 A D
## 1 2.0 B None
## 2 NaN None E
## 3 4.0 D F
na_fill_config = {
np.number: 0,
'object': "N/A",
'string': "N/A"
}
na_filler = NaFillTransformer(na_fill_config)
lower_caser = LowerCaseTransformer()
transformed_df = apply_transformation(df, na_filler)
transformed_df = apply_transformation(transformed_df, lower_caser)
print("\nDataFrame after transformation:")##
## DataFrame after transformation:
## col_num col_text col_text_2
## 0 1.0 a d
## 1 2.0 b n/a
## 2 0.0 n/a e
## 3 4.0 d f
Hãy cùng xem tính trừu trượng được thể hiện như thế nào:
Chỉ thể hiện ra các tính năng cần thiết của đối tượng: abstract
class Transformer đóng vai trò là một interface, đưa ra các
method cần thiết để biến đổi dữ liệu (ở đây chỉ có duy nhất method
transform). LowerCaseTransformer và
NaFillTransformer là các concrete implementation của
abstract class Transformer, là nơi mà ta viết các logic xử
lý dữ liệu cụ thể.
Ẩn đi các chi tiết về cách triển khai: Khi cần biến đổi dữ liệu
của dataframe, ta sử dụng function apply_transformation.
Function này chỉ quan tâm tới việc method transform() có
tồn tại hay không chứ không quan tâm đến các logic bên trong method đó
của concrete class.
Loose coupling: Ta có thể truyền vào function
apply_transformation các instance của các concrete
class/subclass khác nhau và method transform() vẫn hoạt
động một cách bình thường (interface giữ nguyên, không thay đổi) và ta
không cần phải viết lại các ETL logic.
Separation of concerns: Khi logic về xử lý giá trị NULL thay đổi,
ta chỉ cần update logic trong class NaFillTransformer mà
không cần thay đổi gì khác. Ngoài ra, bên cạnh class
Transformer trong ví dụ trên, ta cũng có thể có các
interface (class) khác nữa như Ingestor,
Logger, … Khi cần thay đổi logic load dữ liệu, ta cập nhật
code trong Ingestor, khi cần thay đổi logic biến đổi dữ
liệu, ta cập nhật code trong Transformer, v.v.
Đây là việc tạo ra một class mới (class con) dựa trên một class đã có sẵn (class cha / superclass). Class con sẽ được “kế thừa” / thừa hưởng các method, property và attribute của class cha. Bên cạnh đó, class con cũng có thể có các method, property và attribute riêng của nó.
Những lợi ích của tính ké thừa:
Tăng khả năng tái sử dụng code (code reuse) và duy trì, chỉnh sửa code cũng dễ hơn (maintainability): Các logic mà cần sử dụng ở nhiều class thì được đưa vào một nơi duy nhất là ở base class (class cha), và các class con tự động được thừa hưởng, tránh việc phải lặp lại code, từ đó tránh phát sinh lỗi. Ngoài ra thì việc sửa lỗi nếu có cũng sẽ dễ hơn vì ta chỉ cần sửa ở base class. Điều này tuân theo quy tắc DRY (don’t repeat yourself) trong lập trình.
Tính kế thừa kết hợp cùng với tính trừu tượng (abstraction) đã nêu trên và tính đa hình (polymorphism) sẽ nêu ở dưới đây giúp ta sử dụng các object của các class khác nhau thông qua chung một interface.
Ta sẽ xem ví dụ về tính kế thừa ở phần tính đa hình dưới đây vì chúng thường đi cùng với nhau (nhớ là “thường” chứ không phải bắt buộc đi cùng với nhau).
Đây là việc mà các đối tượng thuộc các lớp khác nhau có khả năng hiểu cùng một thông điệp (message - trong OOP, khi mà client code gọi một method của một object thì ta hay gọi việc đó là “send a message”) theo các cách khác nhau.
Để cho dễ hiểu thì ta sẽ xem một ví dụ dưới đây trong Python:
import pandas as pd
from abc import ABC, abstractmethod
class DataValidator(ABC):
"""
Abstract base class for validating rows in a pandas DataFrame using column-level rules.
This class defines the interface for applying validation logic to a DataFrame. Subclasses must
implement the `evaluate_rules` method, which returns a list of boolean Series — each representing
the result of a rule applied to all rows in the DataFrame.
The `validate` method combines these boolean Series using a logical AND to produce a final Series
indicating which rows are valid (meaning which rows passed all rules).
Methods:
validate(df: pd.DataFrame) -> pd.Series:
Applies all defined rules to the DataFrame and returns a boolean mask of valid rows.
evaluate_rules(df: pd.DataFrame) -> list[pd.Series]:
Abstract method that must be implemented by subclasses to return a list of boolean Series,
each representing a rule applied to the DataFrame.
"""
def validate(self, df: pd.DataFrame) -> pd.Series:
valid_mask = pd.Series(True, index=df.index) # valid_mask is a Series with all True values by default
for rule in self.evaluate_rules(df):
valid_mask &= rule # this means that all rules must be satisfied for mask to be True
return valid_mask
@abstractmethod
def evaluate_rules(self, df: pd.DataFrame) -> list:
"""Return a list of boolean Series indicating rule pass/fail for each row."""
pass
class CSVDataValidator(DataValidator):
"""
Concrete implementation of DataValidator for validating customer data loaded from CSV files.
"""
def evaluate_rules(self, df: pd.DataFrame):
return [
df['name'].notna(),
df['age'].between(0, 120),
df['email'].str.contains('@', na=False)
]
class APIDataValidator(DataValidator):
"""
Concrete implementation of DataValidator for validating customer data loaded from API.
"""
def evaluate_rules(self, df: pd.DataFrame):
return [
df['timestamp'].notna(),
df['status'].isin(['active', 'inactive'])
]
# Create sample dataframes. In reality, these dataframes will be loaded from csv files and from API
csv_df = pd.DataFrame([
{'name': 'Nguyen Thi A', 'age': 30, 'email': 'nguyenthiA@example.com'},
{'name': None, 'age': 25, 'email': 'noemail@example.com'},
{'name': "Le Van C", 'age': -5, 'email': 'noemail@example.com'},
{'name': "Le Van D", 'age': 30, 'email': 'example.com'},
])
api_df = pd.DataFrame([
{'id': 1, 'timestamp': '2023-07-19T10:00:00Z', 'status': None},
{'id': 2, 'timestamp': '2023-07-19T10:00:00Z', 'status': 'not sure'},
{'id': 3, 'timestamp': '2023-07-19T10:00:00Z', 'status': 'inactive'},
{'id': 4, 'timestamp': None, 'status': 'invalid'}
])
def run_validation(validator: DataValidator, df: pd.DataFrame) -> pd.DataFrame:
"""
Applies a DataValidator to a pandas DataFrame.
This function uses the provided DataValidator instance to compute a boolean mask indicating
which rows in the DataFrame pass all validation rules. The mask is added to the DataFrame
as a new column called 'is_valid', and the resulting DataFrame is returned.
Args:
validator (DataValidator): An instance of a subclass of DataValidator that implements the validation logic.
df (pd.DataFrame): The DataFrame to validate.
Example:
run_validation(CSVDataValidator(), csv_df)
"""
valid_mask = validator.validate(df)
df['is_valid'] = valid_mask
return df
# Run validation on each dataframe
validated_csv_df = run_validation(CSVDataValidator(), csv_df)
validated_api_df = run_validation(APIDataValidator(), api_df)
# Show validated customer data from csv
validated_csv_df## name age email is_valid
## 0 Nguyen Thi A 30 nguyenthiA@example.com True
## 1 None 25 noemail@example.com False
## 2 Le Van C -5 noemail@example.com False
## 3 Le Van D 30 example.com False
## id timestamp status is_valid
## 0 1 2023-07-19T10:00:00Z None False
## 1 2 2023-07-19T10:00:00Z not sure False
## 2 3 2023-07-19T10:00:00Z inactive True
## 3 4 None invalid False
Hãy cùng xem tính kế thừa và tính đa hình được thể hiện như thế nào:
Ta có base class là DataValidator với một common
method là validate() và một abstract method là
evaluate_rules. Các class con là
APIDataValidator và CSVDataValidator kế thừa 2
method nói trên của base class (tính kế thừa). Mỗi class con triển khai
abstract method evaluate_rules() theo một cách riêng (tính
đa hình).
Method run_validation() nhận argument là bất kỳ một
object nào thuộc loại DataValidator (cho dù object đó có là
APIDataValidator hay CSVDataValidator) và tại
thời điểm chạy code (at runtime) thì gọi method
evaluate_rules() tương ứng tuỳ thuộc vào loại object được
truyền vào. Đây là một ví dụ điển hình của “runtime
polymorphism”.
Những lợi ích của tính đa hình:
Tăng khả năng tái sử dụng code. Ta có thể viết function mà làm việc được với nhiều object của các class khác nhau, miễn là chúng đều có chung một interface.
Tăng khả năng mở rộng của code (extensibility): Vì class con thừa hưởng các method, attribute từ class cha nên ta có thể mở rộng các tính năng của code khá dễ dàng thông qua việc thay đổi hoặc thêm code vào trong các method, attribute ở class con mà được thừa hưởng từ class cha, hoặc tạo method, attribute mới mà không phải sửa code cũ ở class cha, hoặc thêm các class con mới. Điều này tuân theo nguyên tắc thứ 2 của 5 nguyên lý thiết kế hướng đối tượng (SOLID) là “Open/Closed principle - OCP”.
Loose coupling: Kết hợp với tính trừu tượng (abstraction), tính đa hình giúp cho các hệ thống không bị phụ thuộc chặt chẽ vào nhau, thay vào đó thì giao tiếp với nhau thông qua interface đã được thiết lập, giống như thông qua một hợp đồng (contract) đã được ký kết giữa các bên.
Duy trì (maintain) code dễ dàng hơn. Các thay đổi ở subclass sẽ không ảnh hưởng đến code của base class. Thông thường, ta duy trì một interface (được tạo ra ở base class/class cha) và khi cần mở rộng tính năng thì ta thực hiện ở các subclass (class con).
Thông thường, để quyết định xem ta cần dùng class attribute hay instance attribute thì ta xem dữ liệu mà cần được lưu trữ bởi attribute đó và hành vi mà ta đang muốn thiết lập sẽ được dùng chung bởi các instance của class hay là sẽ có sự khác biệt giữa các instance.
Một số trường hợp ta thường dùng class attribute có thể kể đến như:
khi ta cần lưu trữ hằng số (constant) mà được dùng chung cho tất cả các instance của class
khi ta cần có một bộ đếm (counter) chung cho tất cả các instance của class, hoặc lưu trữ một dữ liệu (data) chung nào đó
khi ta cần tạo một thiết lập (configuration) nào đó và các instance của class đều sẽ thừa hưởng thiết lập đó
…
Một số trường hợp ta thường dùng instance attribute có thể kể đến như:
khi ta cần lưu trữ dữ liệu riêng cho từng object
khi mỗi object có hành vi khác nhau
…
import logging
from logging.handlers import RotatingFileHandler
import os
class ETLLogger:
# Class-level attribute: shared logging configuration
log_directory = "logs"
log_level = logging.INFO
max_bytes = 10 * 1024 * 1024 # 10 MB
backup_count = 5
_shared_logger = None
def __init__(self, job_name: str):
# Instance-level attribute: unique to each ETL job
self.job_name = job_name
self.logger = self._get_or_create_logger()
@classmethod
def _initialize_logger(cls):
"""Initializes a shared logger with file rotation."""
if cls._shared_logger:
return cls._shared_logger
os.makedirs(cls.log_directory, exist_ok=True)
log_file_path = os.path.join(cls.log_directory, "etl_pipeline.log")
logger = logging.getLogger("ETLLogger")
logger.setLevel(cls.log_level)
logger.propagate = False
formatter = logging.Formatter(
"%(asctime)s | %(levelname)s | %(job_name)s | %(message)s",
"%Y-%m-%d %H:%M:%S"
)
handler = RotatingFileHandler(
log_file_path, maxBytes=cls.max_bytes, backupCount=cls.backup_count
)
handler.setFormatter(formatter)
if not logger.handlers:
logger.addHandler(handler)
cls._shared_logger = logger
return logger
def _get_or_create_logger(self):
logger = self._initialize_logger()
# Use LoggerAdapter to inject job-specific context
# This return a logging.LoggerAdapter object instead of the base logger
# to avoid client code accidentally modifying the base logger object
return logging.LoggerAdapter(logger, {"job_name": self.job_name})
def info(self, message: str):
self.logger.info(message)
def error(self, message: str):
self.logger.error(message)
def warning(self, message: str):
self.logger.warning(message)
def debug(self, message: str):
self.logger.debug(message)def run_etl_job_1():
logger = ETLLogger("ETL job 1")
logger.info("ETL job started")
try:
# Your ETL logic here
logger.debug("Fetching data from source")
# ...
logger.info("ETL job completed successfully")
except Exception as e:
logger.error(f"ETL job failed: {e}")
def run_etl_job_2():
logger = ETLLogger("ETL job 2")
logger.info("ETL job started")
try:
# Your ETL logic here
logger.debug("Fetching data from source")
# ...
logger.info("ETL job completed successfully")
except Exception as e:
logger.error(f"ETL job failed: {e}")
run_etl_job_1()
run_etl_job_2()Sau lần chạy đầu tiên thì file “etl_pipeline.log” sẽ có nội dung như sau:
2025-07-23 05:04:08 | INFO | ETL job 1 | ETL job started
2025-07-23 05:04:08 | INFO | ETL job 1 | ETL job completed successfully
2025-07-23 05:04:08 | INFO | ETL job 2 | ETL job started
2025-07-23 05:04:08 | INFO | ETL job 2 | ETL job completed successfully
Phân tích:
Ta dùng class attribute _shared_logger để lưu logger
dùng chung cho mọi instance của class ETLLogger. Ta thấy ở
code bên trên, với etl job 1 và etl job 2 ta khởi tạo 2 instance khác
nhau của class ETLLogger, tuy nhiên cả 2 instance này đều
được thừa hưởng những thiết lập mà đã được tạo sẵn ở base logger (bao
gồm nơi chứa file log - log_directory, tên file log, log level, …).
Việc lựa chọn sử dụng class method hay instance method thường dựa vào việc hành vi của method đó được gắn với cả class hay là với từng instance cụ thể.
Ta thường dùng instance method khi:
Method cần sử dụng hoặc chỉnh sửa các instance attribute
Hành vi của method đối với từng instance có thể khác nhau
Ta thường dùng class method khi:
Method cần sử dụng hoặc chỉnh sửa các class attribute
Method chứa các logic cần được áp dụng cho toàn bộ các instance của class
Trong ví dụ về class ETLLogger ở phần Class attribute vs
Instance attribute bên trên, method
_initialize_logger() là class method vì ta đang muốn tạo ra
một logger dùng chung cho toàn bộ class, và method này cần truy cập được
tới và chỉnh sửa class attribute là _shared_logger. Method
_get_or_create_logger() thì lại là một instance method vì
nó có tác dụng tạo ra một LoggerAdapter object riêng cho mỗi instance
của class dựa trên base logger object.
Chia nhỏ các quy trình phức tạp và đóng gói các logic vào trong các class
Tuân theo 5 nguyên lý thiết kế hướng đối tượng (SOLID), đặc biệt là SRP (Single Responsibility Principle) và OCP (Open/Closed principle):
SRP: Nên tạo các class nhỏ và mỗi class làm một việc
OCP: Code có thể được mở rộng mà không phải chỉnh sửa các logic cũ
Sử dụng class và instance attribute, method một cách phù hợp (xem các nội dung đã trình bày ở các mục phía trên)
Tránh tạo một hệ thống thứ bậc của các class với quá nhiều tầng lớp (deep hierarchy of classses). Ví dụ: Class A kế thừa từ class B, class B kế thừa từ class C, class C lại kế thừa từ class D, … Điều này sẽ:
làm tăng việc phụ thuộc của các class vào nhau, qua thời gian base class sẽ dễ bị phình to ra và chứa quá nhiều tác vụ khác nhau.
làm code thiếu linh hoạt hơn vì sau này khi ta muốn thừa hưởng (inherit) một logic nào đó từ base class thì ta lại phải thừa hưởng tất cả các logic khác của base class đó, mà base class lại to rồi nên sẽ phải thừa hưởng lại nhiều logic thừa không cần thiết.
làm việc kiểm thử (test) class khó hơn
Viết code sao cho dễ kiểm thử, bao gồm cả việc tạo các class: Khi viết code (dù có theo OOP hay không) thì ta cũng nên chia code thành các phần (module, function) để dễ dàng kiểm thử các logic. Khi tạo các class, ta cũng cần tạo class sao cho ta có thể dễ dàng tạo mock data và kiểm thử. Ví dụ:
class Transformer:
def clean(self, df):
return df.dropna().drop_duplicates()
class ETLJob:
def __init__(self, reader, writer, transformer):
self.reader = reader
self.writer = writer
self.transformer = transformer
def run(self, input_path, output_table):
df = self.reader.read(input_path)
df = self.transformer.clean(df)
self.writer.write(df, output_table)Với code trên thì class Transformer có thể được kiểm thử
dễ dàng bằng cách dùng một mock pandas dataframe. Các yếu tố phụ thuộc
(dependencies) như đường dẫn đến nơi chứa dữ liệu nguồn (input_path) và
nơi chứa dữ liệu cho bảng đích (output_table) không bị hardcode mà sẽ
được truyền vào các biến của method run() cũng giúp ta kiểm
thử dễ dàng hơn.
# Global variable
metrics = {"processed": 0, "errors": 0}
class DataProcessor:
def process(self, data):
if data:
# some processing logics here
metrics["processed"] += 1
else:
metrics["errors"] += 1Với code trên thì giá trị của biến global “metric” sẽ thay đổi khi ta
kiểm thử method process trong class DataProcessor. Kết quả
của các lần test sẽ phụ thuộc vào biến global này, và nếu ta không để ý
là biến global này chưa được reset trước khi thực hiện thì kết quả test
sẽ không chính xác.
Viết docstrings và dùng type hint để code dễ đọc và dễ hiểu hơn:
Docstrings là những đoạn text đặc biệt được viết ra trong các function, class và method để mô tả:
code thực hiện những việc gì
nhận những argument nào
trả ra kết quả là gì
Ngoài người lập trình viên ra thì Docstring còn được sử dụng bởi các công cụ như IDE, linter (công cụ phân tích để tìm lỗi trong code), các công cụ tự động tạo document (documentation generators).
Type hints giúp ta nêu rõ loại dữ liệu (data type) được sử dụng cho
các biến (đứng độc lập hoặc các biến của function, method) và các giá
trị được trả ra (return value) của các function, method đó. Để hỗ trợ
cho việc sử dụng type hints, python có module typing. Đọc
thêm về module này trong Python tại đây: https://docs.python.org/3/library/typing.html
Type hints giúp:
code rõ ràng và dễ hiểu hơn, nhất là khi codebase lớn và có nhiều file, nhiều class
giúp autocompletion của IDE chính xác hơn
Hãy thử so sánh 2 đoạn code python dưới đây, một đoạn có docstring và type hint, và một đoạn không có, xem bạn cảm thấy đoạn code nào dễ hiểu hơn và nếu bạn bị yêu cầu phải tiếp nhận lại và vận hành code, thì đoạn code nào giúp bạn cảm thấy yên tâm hơn nhé:
# Không có docstring và type hints
class ETLJob:
def run(self, config):
return self.execute_pipeline(config["src"], config["dst"])# Có docstring và type hints
from typing import Dict
class ETLJob:
def run(self, config: Dict[str, str]) -> bool:
"""
Runs the ETL job using provided configuration.
Args:
config (Dict[str, str]): Must include 'src' and 'dst' keys.
Returns:
bool: True if job succeeded, False otherwise.
"""
return self.execute_pipeline(config["src"], config["dst"])Nếu thích tìm hiểu và có thời gian thì các bạn có thể đọc thêm về decorator @property trong Python.
Decorator @property trong Python được dùng để định nghĩa một method là một property. Khi đó ta sẽ truy cập method theo cách giống như một attribute. Property giống như là một loại attribute đặc biệt. Thường @property được dùng trong một số trường hợp như khi ta muốn có logic để validate dữ liệu được gán cho attribute, hoặc muốn tạo một read-only attribute, …
Decorator @abstractmethod được dùng để định nghĩa một method là abstract method (như ta đã thấy trong các ví dụ ở những phần trước). Các subclass sẽ buộc phải triển khai abstract method này nếu không khi khởi tạo instance python sẽ báo lỗi.
Khi ta kết hợp 2 decorator @property và @abstractmethod thì ta sẽ định nghĩa được một abstract property. Các subclass sẽ buộc phải triển khai property này.
Ví dụ:
from abc import ABC, abstractmethod
# Create the base class
class AttributeMapping(ABC):
"""
Abstract base class for creating attribute mapping classes.
Attribute mapping means conforming values (E.g. U.S.A to be conformed to The United States of America)
"""
@property
@abstractmethod
def schema(self) -> str:
pass
@property
@abstractmethod
def data(self) -> list:
pass
# Create the subclass
class CountryName(AttributeMapping):
"""
A class storing details of the country attribute mappings
"""
schema = "xxx"
data = []
class FlagName(AttributeMapping):
"""
A class storing details of the flag attribute mappings
"""
def __init__(self, data, schema) -> None:
self._data = data
self._schema = schema
@property
def schema(self) -> str:
return self._schema
@property
def data(self) -> list:
return self._data
# Create instances
country = CountryName()
flag = FlagName([], "yyy")
print(country.schema)## xxx
## yyy
## <class 'str'>
## <class 'property'>
Trong ví dụ trên thì schema và data được
định nghĩa là các abstract properties. Bất kỳ subclass nào của class
AttributeMapping đều phải triển khai các properties này nếu
không ta sẽ gặp lỗi TypeError khi khởi tạo instance của
subclass.
Lưu ý: Thực ra nếu để ý kĩ thì với @abstractmethod thì nó chỉ kiểm tra xem trong subclass/concrete class có object nào với tên là “schema” và “data” hay không, chứ không quan trọng đấy là method hay attribute hay property, cho nên nếu ở subclass mà ta chỉ tạo class attribute tên là “data” và “schema” (chứ không phải method) thì vẫn không bị báo lỗi. Tuy nhiên, ta không nên làm vậy vì nó làm sai lệch đi mục đích của việc sử dụng @abstractmethod decorator.
Đề bài: Ta cần tạo một data pipeline để load dữ liệu từ một só nguồn dữ liệu khác nhau (giả sử là file csv và MySQL database). Dữ liệu sau đó được xử lý, biến đổi tiếp trước khi có thể cung cấp cho các người dùng hay ứng dụng khác sử dụng. Trong phạm vi bài tập này, ta sẽ viết code python cho phần load dữ liệu từ file csv và MySQL database và thực hiện một biến đổi duy nhất đó là đổi tên các cột về snake case.
Hướng thực hiện:
Sử dụng một design pattern tên là factory method để tạo ra các interface cho việc load dữ liệu (đọc thêm về factory method tại đây: https://refactoring.guru/design-patterns/factory-method).
Ta sẽ có cấu trúc folder chứa các file python như sau:
project_root/
├── read_data_from_source.py
├── data/
│ └── inventory.csv
└── reader/
├── base_creator.py
├── csv_creator.py
├── mysql_creator.py
├── base_reader.py
├── csv_reader.py
└── mysql_reader.py
Xem chi tiết các file tại đây: https://github.com/tuanphan92/datatute-datasets/tree/main/data-engineering-booster/exercise1
Dữ liệu raw từ file /data/inventory.csv như sau:
Product key,Warehouse key,Date-key,Beginning inventory quantity,Ending inventory quantity,Quantity sold,Quantity received,Quantity adjusted,Inventory turnover rate,Average inventory value,Stockout days
567,12,20240131,500,300,250,100,-50,0.625,"$2,500",0
568,15,20240131,"1,000",850,200,100,-50,0.222,"$5,400",2
569,12,20240131,750,700,100,50,0,0.133,"$3,200",1
Dữ liệu raw từ bảng customer ở MySQL database như
sau:
Ta chạy file `read_data_from_source.py” (đây chính là file chứa client code) có nội dung như dưới đây và được kết quả như sau:
from reader.csv_creator import CSVCreator, CSVConfig
from reader.mysql_creator import MySQLCreator, MySQLConfig
# Declare configs for source data
inventory_data_config: CSVConfig = {"file_path": "data/inventory.csv"}
customer_data_config: MySQLConfig = {
"server": "localhost",
"database": "mydb",
"username": "root",
"password": "123456",
"query": "SELECT * FROM customer",
}
# Create Creators objects for loading and transforming data
inventory_creator = CSVCreator(inventory_data_config)
customer_creator = MySQLCreator(customer_data_config)
# Load data
inventory_df = inventory_creator.read_data()
customer_df = customer_creator.read_data()
print(inventory_df.head())## product_key warehouse_key ... average_inventory_value stockout_days
## 0 567 12 ... $2,500 0
## 1 568 15 ... $5,400 2
## 2 569 12 ... $3,200 1
##
## [3 rows x 11 columns]
## customer_id name phone_number
## 0 1 Nguyen Van A 0911111111
## 1 2 Nguyen Van B 0911222222
## 2 3 Nguyen Van C 0911333333
Vậy ta thấy dữ liệu từ file csv và từ MySQL database đều đã được load và tên cột đã được chuẩn hoá về dạng snake case.
Nhiệm vụ của bạn đọc đối với bài tập này như sau:
Clone github project về máy của mình
Xem cách dùng type hints trong các file
Đọc về design pattern “factory”
Nhận diện đâu là các interface cho việc load dữ liệu từ nguồn, xem cách sử dụng decorator @abstractmethod
Nhận diện cách mà các logic liên quan tới load dữ liệu từ các
nguồn khác nhau (csv, MySQL) được đóng gói vào các class và client code
sẽ chỉ cần sử dụng method read_data() mà không cần quan tâm
tới logic xử lý bên trong cho từng nguồn dữ liệu
Tự thử nghiệm, setup một MySQL db và chạy các file đã clone về, sau đó làm một project tương tự, áp dụng các kiến thức đã học về OOP