1 Tổng quan

Khi nói đến data engineering thì không thể không nói đến việc xây dựng các data pipeline. Xây một data pipeline có nghĩa là thiết kế và triển khai một loạt các bước hay các tiến trình (process) để lấy dữ liệu từ một hoặc nhiều nguồn khác nhau, lưu trữ và biến đổi chúng, rồi đưa chúng đến một hệ thống nào đó để các người dùng hoặc các ứng dụng khác có thể sử dụng. Nếu ta coi dữ liệu là nước thì các data pipeline cũng giống như các đường ống nước và hệ thống bơm, hút, hệ thống lọc nước để luân chuyển nước từ nơi này sang nơi khác.

Bài học này nói về một số những yếu tố và những điều cần thực hiện để đảm bảo data pipeline chạy tốt, có độ tin cậy cao.

2 Yêu cầu

Khi một bài toán được đặt ra cho data engineer, các yêu cầu của bài toán này có thể chia làm 2 loại là yêu cầu chức năng (functional requirement) và yêu cầu phi chức năng (non-functional requirement).

Tương tự như trong software engineering, các yêu cầu chức năng tập trung vào câu hỏi WHAT, nêu ra các tính năng, các hành động mà ứng dụng, hệ thống phải thực hiện được để đảm bảo nhu cầu của người dùng. Trong bối cảnh một bài toán về data engineering thì các yêu cầu chức năng thường thấy là lấy dũ liệu ở một số nguồn nhất định, lưu trữ, biến đổi dữ liệu và cung cấp dữ liệu cho người dùng/ứng dụng đầu cuối.

Các yêu cầu phi chức năng tập trung vào câu hỏi HOW, nêu ra yêu cầu về cách mà ứng dụng, hệ thống cần hoạt động. Trong bối cảnh data engineering thì các yêu cầu phi chức năng thường thấy liên quan tới performance (tần suất cung cấp dữ liệu, tốc độ xử lý và cung cấp dữ liệu, …), security (phân quyền cho các nhóm user phù hợp, …), scalability (phải scale được khi lượng dữ liệu tăng lên và đảm bảo performance, đảm bảo SLA đã cam kết, …), maintainability (code phải được tổ chức tốt, phải có data lineage, …), reliability (dữ liệu có chất lượng tốt và ổn định, lỗi dữ liệu được phát hiện kịp thời và xử lý đúng cách, …), v.v.

3 Data engineering lifecycle (Vòng đời của dữ liệu nhìn từ góc độ data engineering)

Để tạo được các data pipeline tốt thì ta cần có cái nhìn tổng quan về các bước trong vòng đời của dữ liệu, nhìn từ góc độ của một data engineer, gọi là data engineering lifecycle. Về cơ bản thì data engineering lifecycle là những giai đoạn mà dữ liệu trải qua mà người data engineer có tác động đến, từ khi được khởi tạo, thường là từ một hệ thống nguồn nào đó, rồi được luân chuyển qua các nơi khác nhau và được biến đổi trong quá trình luân chuyển đó, rồi kết thúc, được tiêu thụ tại các báo cáo, các phân tích, hoặc các ứng dụng nào đó. Cũng có khi dữ liệu từ điểm cuối lại được đưa quay trở lại nguồn (Reverse ETL trong hình vẽ dưới đây).

Dưới đây là một hình vẽ minh hoạ cho data engineering lifecycle. Về mặt kĩ thuật thì việc xây dựng data pipeline sẽ nằm phần lớn ở các bước Ingestion, Transformation, và Serving, tuy nhiên data engineer cần nắm được tổng thể toàn bộ vòng đời dữ liệu và hiểu được mình cần làm gì tại mỗi bước trong vòng đời này.

Nguồn: Fundamental of Data Engineering - Joe Reis, Matt Housley

Trong hình vẽ trên, các giai đoạn chính trong vòng đời dữ liệu bao gồm:

  • Generation: Dữ liệu được khởi tạo ra. Nguồn tạo dữ liệu rất đa dạng, có thể là một cơ sở dữ liệu của một hệ thống đang chạy tại doanh nghiệp của bạn, một thiết bị IoT, hoặc một ứng dụng message queue, v.v. Data Engineer sẽ lấy và sử dụng dữ liệu từ nguồn nhưng thường lại không phải là chủ sở hữu của nguồn đó. Data Engineer phải nắm được một số khía cạnh kĩ thuật của nguồn phát sinh ra dữ liệu, thường bao gồm:

    • Bản chất của nguồn (là thiết bị IoT hay một hệ thống CRM, …)

    • Loại dữ liệu (cấu trúc, bán cấu trúc, phi cấu trúc, file format, …)

    • Schema (gồm những bảng, cột nào với datatype là gì, nếu schema thay đổi thì thông báo tới các hệ thống mà tiêu thụ dữ liệu của nguồn như thế nào)

    • Cách nguồn sinh ra và lưu trữ dữ liệu (dữ liệu được lưu dưới dạng periodic snapshot hay có cung cấp cho ta dưới dạng CDC, … sau bao lâu thì dữ liệu bị xoá đi, có trường hợp một số bản ghi đến muộn hay hay bị thiếu không và nguồn có cơ chế kiểm soát cho các trường hợp này không, …)

    • Tần suất và tốc độ phát sinh dữ liệu

    Data Engineer phải thực hiện kiểm tra dữ liệu tại nguồn trước khi xây data pipeline.

  • Storage: Đây là nói đến nơi lưu trữ dữ liệu. Dữ liệu dù ở bước Ingestion - load dữ liệu từ hệ thống nguồn, hay bước Transformation - biến đổi dữ liệu, hay bước Serving - cung cấp dữ liệu cho các ứng dụng, báo cáo phân tích, thì cũng đều cần phải được lưu trữ ở đâu đó. Có thể dữ liệu ở mỗi bước này được lưu trữ ở một server khác nhau, hoặc cũng có thể được lưu trữ ở cùng một nơi, tuỳ thuộc vào công nghệ và bộ công cụ mà doanh nghiệp sử dụng, và cũng tuỳ thuộc vào yêu cầu, bài toán dữ liệu được đặt ra.

  • Ingestion: Đây là bước lấy dữ liệu từ các hệ thống nguồn. Tại bước này ta cần lưu ý một số nội dung quan trọng sau:

    • Dữ liệu được lấy về sẽ được dùng để làm gì và có cần phải sử dụng cho nhiều mục đích khác nữa không? lưu trữ vào đâu? tần suất cần load dữ liệu? lượng dữ liệu? định dạng dữ liệu? có cần biến đổi/làm sạch gì không? … –> Trả lời các câu hỏi này sẽ giúp ta xác định được kiến trúc của hệ thống dữ liệu cần sử dụng và hình thức load dữ liệu phù hợp

    • Batch vs Streaming: Dữ liệu nguồn thì thường luôn được sinh ra liên tục, nhưng khi ingest dữ liệu thì ta thường có 2 cách phổ biến là batch ingestion và streaming ingestion. Batch ingestion tức là load dữ liệu từ nguồn theo từng lô (batch), ví dụ như mỗi lần load dữ liệu thì load dữ liệu của một ngày hôm đó. Streaming ingestion là load dữ liệu theo thời gian thực (real-time) hoặc gần như thời gian thực (near real-time), hay nói đơn giản là khi có dữ liệu phát sinh mới tại nguồn thì chỉ một thời gian rất ngắn sau (ví dụ chỉ vài giây sau) là dữ liệu đã được cung cấp đến các hệ thống, ứng dụng đầu cuối. Batch ingestion từ xưa đến nay vẫn là phương thức load dữ liệu phổ biến, một phần do nhu cầu lấy dữ liệu phục vụ các hoạt động phân tích thường không cần phải ngay lập tức như trong các hệ thống OLTP mà nhiều khi quan trọng là có lượng dữ liệu lịch sử đủ nhiều và chất lượng đủ tốt cho phân tích, một phần do hạn chế về công nghệ của các hệ thống cũ. Ngày nay, với sự ra đời và phổ biến của các công nghệ dữ liệu streaming mới thì streaming ingestion đang dần ngày càng trở nên phổ biến. Data engineer cần cân nhắc một số điểm sau để xác định được pipeline cần load dữ liệu theo batch hay streaming:

      • Nhu cầu về dữ liệu để giải quyết bài toán kinh doanh đang đặt ra có cần thiết phải cập nhật real-time hay near real-time hay không? Nếu cần cập nhật dữ liệu liên tục thì cần nhanh đến đâu (đôi khi ta có thể chạy batch ingestion với tần suất lớn, ví dụ một lần/giờ hoặc vài phút một lần, chứ không nhất thiết phải streaming)

      • Nếu streaming real-time thì ứng dụng đầu cuối có xử lý được lượng dữ liệu với tần suất như vậy không? và giá trị gia tăng thêm của việc ingest dữ liệu real-time có lớn không?

      • Công nghệ batch/streaming và hạ tầng phần cứng cần sử dụng, cân đối năng lực của team, mức độ phức tạp của từng giải pháp, và cân đối chi phí.

      Nhìn chung thì batch ingestion thường là cách tiếp cận đơn giản, đỡ tốn kém hơn, và đáp ứng được hầu hết các nhu cầu về phân tích dữ liệu hay cho training các mô hình ML. Streaming thường phức tạp hơn để triển khai và có chi phí cao hơn, và thường được sử dụng khi ta đã xác định được rõ bài toán kinh doanh và thấy lợi ích đem lại cao hơn chi phí phải bỏ ra để triển khai streaming.

    • Push vs Pull: Một data pipeline sử dụng cách tiếp cận ETL dữ liệu (Extract, Transform, Load) truyền thống thì thường sẽ thực hiện bước Extract (chính là data ingestion) bằng cách query dữ liệu từ hệ thống nguồn. Đây là cách ingest dữ liệu theo kiểu “pull”. Ngược lại, có những trường hợp hệ thống nguồn sẽ “push” dữ liệu, ví dụ như hệ thống nguồn có thiết lập CDC (Change Data Capture) và mỗi khi có một dòng dữ liệu nào ở bảng được thay đổi thì hệ thống sẽ tạo và đẩy một message vào một nơi lưu trữ (thường ta gọi là queue) hoặc hệ thống lưu thay đổi đó vào các file log và data pipeline sẽ lấy dữ liệu từ queue hoặc các file log đó thay vì query trực tiếp vào database của nguồn.

  • Transformation: Sau khi ta đã lấy dữ liệu từ nguồn về rồi thì tiếp theo là bước biến đổi dữ liệu, thường bao gồm kiểm tra, đổi định dạng dữ liệu, làm sạch và chuẩn hoá dữ liệu, xoay chiều dữ liệu sao cho phù hợp với mục đích phân tích hoặc phù hợp với ứng dụng đầu cuối. Tại bước này, data engineer cần lưu ý một số điểm quan trọng sau:

    • Cần có những biến đổi dữ liệu nào và chúng đáp ứng các business rule nào?

    • Mỗi biến đổi dữ liệu nên tuân theo quy tắc Single Responsibility và càng đơn giản càng tốt. Nói cách khác, thay vì vừa clean dữ liệu, join với các bảng dữ liệu khác, format lại dữ liệu, rồi tính toán các chỉ số phức tạp cùng một lúc, ta nên chia thành các bước biến đổi dữ liệu, mỗi bước làm một việc rõ ràng. Mỗi biến đổi dữ liệu cũng nên độc lập (gọi là self-isolated hoặc self-contained), không phụ thuộc vào các bước biến đổi khác và các phần khác của data pipeline, tức là nó nhận dữ liệu đầu vào với những yêu cầu nhất định, xử lý biến đổi dữ liệu, và đưa ra kết quả đầu ra. Hiểu đơn giản là ví dụ ta viết một hàm biến đổi dữ liệu bằng Python, hàm này nhận vào input là một dataframe, thực hiện một biến đổi (ví dụ đổi tên cột từ camel case về snake case), sau đó trả ra một dataframe. Các logic biến đổi dữ liệu của hàm này độc lập và không phụ thuộc vào các hàm biến đổi dữ liệu khác. Nếu ta dùng lệnh SQL để biến đổi thay vì Python thì ta có thể sử dụng kết hợp các tính năng như store procedure, function, CTE, bảng tạm để chia logic biến đổi dữ liệu thành những bước rõ ràng thay vì viết 1 câu lệnh SQL dài ngoằng.

  • Serving: Đây là bước cuối cùng trong data engineering lifecycle, khi mà dữ liệu được đưa đến cho người dùng cuối, có thể là được dùng để tạo các báo cáo phân tích, hoặc dùng trong các mô hình ML, hoặc được đưa ngược trở lại các hệ thống nguồn, v.v. Tại bước này, data engineer cần lưu ý một số điểm quan trọng sau:

    • Ai là người sử dụng dữ liệu và dữ liệu được sử dụng vào mục đích gì? Cho dù đây là một câu hỏi liên quan tới bước Serving, ta nên đặt ra câu hỏi này ngay khi nhận được yêu cầu về dữ liệu và bắt đầu trao đổi với các bên liên quan. Data engineer không nên chỉ dừng ở việc biết rằng “dữ liệu này sẽ được dùng để tạo một báo cáo” mà nên biết sâu hơn về mục đích sử dụng thực sự, ví dụ như ai sẽ dùng báo cáo, họ sẽ dùng nó để ra quyết định gì, quyết định đó đem lại doanh thu hay giảm chi phí hay giúp phòng ngừa rủi ro cho doanh nghiệp như thế nào, v.v. Đây là cách tiếp cận data engineering chú trọng tới nhu cầu của người dùng và nhu cầu sử dụng dữ liệu, từ đó giúp ta xây dựng các data product tốt. Bạn đọc có thể tự tìm hiểu thêm về khái niệm data product, còn trong bài học này, ta hiểu đơn giản nó là một sản phẩm mà sử dụng dữ liệu để đạt được một mục đích nào đó, và nó bao gồm nhiều khía cạnh hơn là chỉ xây các data pipeline.

      Bên cạnh đó, người dùng cuối cũng có thể là một ứng dụng hoặc để phục vụ cho một hệ thống ML, AI.

      Nếu người dùng cuối là con người thì ta cũng cần biết cách thức sử dụng dữ liệu sẽ là self service (business user tự sử dụng dữ liệu để phân tích) hay sẽ có data analyst làm báo cáo.

      Tuỳ vào đối tượng và mục đích sử dụng mà data engineer sẽ cần đưa ra giải pháp cung cấp dữ liệu phù hợp.

    • Yêu cầu về hiệu năng (performance) và độ trễ (latency) của việc cung cấp dữ liệu là gì? Từ yêu cầu này mà ta có thể sẽ cần phải có phương án sử dụng công nghệ để cung cấp dữ liệu cho phù hợp và tối ưu performance của pipeline.

    • Cách thức cung cấp dữ liệu phù hợp là gì? Ta sẽ để người dùng trực tiếp query dữ liệu trong database hay stream dữ liệu hay sử dụng một hình thức chia sẻ file? v.v.

    • Làm sao để xây dựng niềm tin của người dùng cuối vào dữ liệu? Để đạt được điều này, ta thường cần kết hợp giữa triển khai các quy trình kiểm tra, đánh giá chất lượng dữ liệu, phát hiện lỗi dữ liệu kịp thời, đề ra và đảm bảo các cam kết SLA (Service Level Agreement) và SLO (Service Level Objective), v.v.

    • Các vấn đề liên quan tới bảo mật như tạo các nhóm quyền truy cập và cấp quyền truy cập phù hợp (thường tuân theo nguyên tắc quyến quyền tối thiểu - principle of least privilege).

4 ETL và ELT

Khi xây dựng các data pipeline thì có 2 cách tiếp cận đó là ETL (Extract, Transform, Load) và ELT (Extract, Load, Transform):

4.1 ETL

ETL là cách tiếp cận truyền thống khi xây dựng data pipeline, bao gồm 3 giai đoạn chính:

  1. Extract: Lấy dữ liệu từ một hoặc nhiều nguồn khác nhau như từ các cơ sở dữ liệu quan hệ, từ API, các loại file, … Các data engineer thường phải làm việc với nhiều loại hình dữ liệu và loại nguồn dữ liệu khác nhau, và đảm bảo lấy được đầy đủ và chính xác dữ liệu.

  2. Transform: Sau khi lấy dữ liệu từ nguồn về, data engineer sẽ làm sạch và biến đổi dữ liệu. Việc làm sạch và biến đổi dữ liệu này được thực hiện bên ngoài hệ thống lưu trữ dữ liệu cuối (tức là dữ liệu được làm sạch và biến đổi cho phù hợp trước rồi mới được tải vào và lưu trữ ở một hệ thống đầu cuối, ví dụ như Data Warehouse chẳng hạn, để sử dụng). Việc biến đổi dữ liệu này thường sẽ được thực hiện tại một server ETL riêng hoặc tại một hệ thống trung gian nào đó. Quá trình này giúp dữ liệu sạch hơn, dễ dàng sử dụng và dễ duy trì hơn, cũng như tối ưu cho việc truy vấn dữ liệu cho việc phân tích, làm báo cáo, hoặc cho các hệ thống sử dụng dữ liệu khác.

  3. Load: Sau khi đã được biến đổi, dữ liệu được tải vào trong hệ thống lưu trữ cuối. Tại đây, dữ liệu đã sẵn sàng để được sử dụng bởi các ứng dụng, công cụ BI hoặc các truy vấn dữ liệu từ người dùng (ví dụ như data analyst, data scientist, …)

4.2 ELT

ETL là cách tiếp cận mà dữ liệu sẽ được trích xuất và lưu trữ luôn vào hệ thống lưu trữ dữ liệu cuối trước khi thực hiện xử lý dữ liệu. Cách tiếp cận này thường được sử dụng với các đơn vị có dùng giải pháp lưu trữ trên đám mây (vd: GCP, AWS, Azure, …) và thường dữ liệu sẽ được lưu vào data lake, lake house, hoặc một data warehouse trên cloud. Dù là lưu trữ vào đâu trên cloud thì mục tiêu đều là lưu trữ luôn dữ liệu từ các nguồn khác nhau lên cloud để tận dụng ưu thế của các data lake và data warehouse trên cloud về lưu trữ và xử lý dữ liệu lớn.

  1. Extract: Dữ liệu được lấy từ các nguồn khác nhau (tương tự như cách tiếp cận ETL) nhưng thường không có xử lý gì hoặc xử lý rất ít, và dữ liệu được giữ nguyên bản, giống như ở nguồn nhất có thể.

  2. Load: Thay vì biến đổi dữ liệu sau khi trích xuất từ nguồn thì dữ liệu được lưu trữ luôn vào hệ thống lưu trữ cuối (data lake, cloud-based data warehouse, …). Với những bộ dữ liệu lớn thì việc load dữ liệu từ nguồn về đến đầu cuối sẽ nhanh hơn cách tiếp cận ETL do không phải biến đổi dữ liệu gì nhiều. Và do dữ liệu từ nguồn được lưu về hệ thống lưu trữ cuối khá sớm, kể cả khi data engineer chưa kịp xây dựng các data pipeline hoàn chỉnh để xử lý chúng thì về lý thuyết các data analyst, data scientist vẫn có thể truy cập vào hệ thống, xem dữ liệu raw và thực hiện một số công việc được.

  3. Transform: Sau khi dữ liệu đã được lưu trữ tại hệ thống lưu trữ cuối thì lúc này data engineer mới làm sạch và biến đổi chúng sao cho phù hợp.

5 ACID transactions

Để tạo được pipeline đáng tin cậy thì ta phải dùng 1 công nghệ lưu trữ dữ liệu đáng tin cậy, cụ thể ở đây là ta cần chọn sử dụng một loại database (vd: SQL Server, Oracle, PostgreSQL) hoặc một công nghệ lưu trữ dữ liệu nào đó có hỗ trợ ACID transaction (vd: Delta Lake, Apache Iceberg).

ACID là viết tắt của bốn tính chất: Atomicity, Consistency, Isolation, và Durability. Bốn tính chất này giúp đảm bảo tính tin cậy của database khi xử lý dữ liệu.

  • Atomicity: Một transaction bao gồm một hoặc nhiều tác vụ. Mỗi transaction bắt đầu bằng một tác vụ và kết thúc khi tất cả các tác vụ đều được thực hiện thành công. Atomicity đảm bảo là một transaction sẽ được coi là một unit duy nhất. Nếu có bất kỳ tác vụ nào thất bại thì cả transaction sẽ thất bại (fail). Vì vậy, một transaction chỉ có thể có một trong hai kết quả là thành công hoặc thất bại (chứ không có thành công một phần). Việc database có hỗ trợ transaction giúp đảm bảo không xảy ra việc lưu dữ liệu không hoàn chỉnh vào database.

  • Consistency: Consistency tức là khi các transaction thực hiện thay đổi trạng thái của database (ví dụ thêm dữ liệu, xoá dữ liệu, update dữ liệu, …) thì vẫn phải tuân thủ tất cả các quy tắc và constraints của database.

  • Isolation: Các transaction xảy ra đồng thời thì không can thiệp gây ảnh hưởng tới nhau. Ví dụ như khi ta đang có một ETL job ghi dữ liệu vào bảng, và cùng lúc đó có những người dùng đang query dữ liệu từ bảng đó. Với isolation thì người dùng sẽ chỉ thấy dữ liệu của bảng lúc trước hoặc lúc sau khi việc ghi dữ liệu xảy ra, chứ không bao giờ thấy dữ liêu nửa vời khi job ETL đang ghi dở dữ liệu vào bảng cả. Việc đọc được dữ liệu mà chưa được commit gọi là dirty reads hay read uncommited data. Bạn đọc cũng có thể đọc thêm về non-repeatable reads và phantom reads. Tuy nhiên ta cũng lưu ý là các database cũng có những mức độ isolation khác nhau để ta lựa chọn. Mức độ isolation thấp thì performance cao hơn nhưng đổi lại là mức độ consistency và accuracy của dữ liệu lại thấp hơn. Mức độ isolation cao thì ta chấp nhận thiệt về performance để đổi lấy consistency và accuracy về dữ liệu.

  • Durability: Một khi các thay đổi với database trong transaction đã được commit xong thì chúng sẽ không bị mất đi (cho dù server có bị crash, mất kết nối, v.v.).

ACID transaction chính là nền tảng công nghệ quan trọng để ta có thể thực hiện được rất nhiều thông lệ tốt (good practices) trong data engineering và xây dựng những data pipeline đáng tin cậy.

6 Data profiling

Đây là việc kiểm tra dữ liệu raw tại nguồn hoặc dữ liệu gần với dữ liệu raw ở nguồn (ví dụ dữ liệu ở bảng staging hoặc bảng bronze nếu ta theo kiến truc medallion, là dữ liệu load từ nguồn về và chưa được biến đổi hoặc hầu như chưa được biến đổi gì). Việc kiểm tra này nhằm tìm hiểu về cấu trúc, nội dung và chất lượng dữ liệu của nguồn trước khi viết các logic xử lý dữ liệu.

Kiểm tra cấu trúc của dữ liệu nguồn:

  • Kiểm tra schema: Có những bảng nào, cột nào và ý nghĩa của chúng, datatype và nullability của các cột, mối quan hệ giữa các bảng (khoá chính, khoá ngoại)

  • Kiểm tra nội dung và chất lượng dữ liệu:

    • Kiểm tra data completeness và data distribution (tính đầy đủ và sự phân phối của dữ liệu)

    • Kiểm tra data accuracy (tính chính xác của dữ liệu)

    • Kiểm tra data consistency (tính nhất quán của dữ liệu)

    • Kiểm tra data insert/update mechanism (cơ chế cập nhật dữ liệu)

Xem chi tiết về cách thực hiện data profiling tại Lesson 1 của khoá học Tổng ôn SQL cho. người mới bắt đầu.

7 Data ingestion

7.1 Full load vs Incremental load

Khi trích xuất dữ liệu từ nguồn và lưu vào bảng đích thì có hai chiến lược thường được sử dụng đó là full loadincremental load. Mỗi chiến lược phù hợp với những mục đích và hoàn cảnh khác nhau. Một số chiến lược khác (ví dụ như data replication) sẽ chưa được đề cập trong bài học này.

Full load tức là lấy tất cả dữ liệu hiện có từ nguồn, xử lý và lưu trữ vào đích mỗi khi chạy một ETL hoặc ELT job.

Incremental load tức là mỗi lần chạy ETL hoặc ELT job thì ta chỉ lấy những dữ liệu tại nguồn mà mới phát sinh hoặc đã bị thay đổi tính từ lần lấy dữ liệu gần nhất.

Ta cần hiểu được các điểm căn bản về từng chiến lược và khi nào thì sử dụng chúng để tối ưu được các data pipeline.

7.1.1 Full load

Dù ta sử dụng chiến lược nào thì thường lần load dữ liệu đầu tiên cũng đều là full load, vì đây là lúc ta đưa tất cả dữ liệu hiện có (và có thể bao gồm tất cả dữ liệu lịch sử) vào hệ thống lưu trữ dữ liệu cuối.

Khi thực hiện full load, ta thường hoặc là lưu đè dữ liệu (overwrite) hoặc là ghi thêm dữ liệu (append).

7.1.1.1 Overwrite

Ta thường dùng cách thực hiện này khi load dữ liệu cho 1 bảng mà bảng đó thể hiện trạng thái mới nhất của dữ liệu (bảng đó là một snapshot của dữ liệu) và không có nhu cầu thể hiện thông tin lịch sử của các dòng dữ liệu.

Ví dụ: giả sử ta có bảng Customer ở trong data warehouse với mỗi dòng là một khách hàng (khoá chính là cột customer_id). Ta chỉ có nhu cầu thể hiện thông tin mới nhất của từng khách hàng trong bảng này, bao gồm họ và tên, địa chỉ và email. Ta không cần bảng Customer phải thể hiện các thông tin này trong quá khứ của khách hàng đã thay đổi như thế nào. Vậy nên ta lựa chọn thực hiện full load cho bảng Customer với cách thực hiện là kiểu Overwrite. Mỗi một lần pipeline chạy ta sẽ load các bản ghi thể hiện thông tin cập nhật nhất của từng khách hàng tại thời điểm chạy pipeline tại hệ thống nguồn, làm sạch, biến đổi dữ liệu cho phù hợp, sau đó ghi đè lên bảng Customer (tức là xoá hết dữ liệu cũ tại bảng Customer đi rồi ghi toàn bộ dữ liệu mới vào).

Ưu điểm:

Cách thực hiện full load theo kiểu Overwrite đơn giản, dễ làm nên thường được chọn sử dụng khi ta chỉ quan tâm tới dữ liệu hiện tại (không quan tâm đến lịch sử thay đổi dữ liệu) hoặc không có cách nào để nhận biết được các bản ghi mới hoặc cần có dữ liệu nhanh (như khi cần xây một sản phẩm thử nghiệm hoặc khi xây dựng phiên bản đầu tiên của sản phẩm).

Nhược điểm:

  • Vấn đề về khối lượng dữ liệu: Ta phải thường xuyên load và xử lý một lượng dữ liệu lớn và tăng dần theo thời gian. Nếu lượng dữ liệu này đột ngột tăng lên đáng kể mà phần cứng được sử dụng để ETL dữ liệu không được bổ sung kịp thời (không có các tính năng như auto scaling chẳng hạn) thì pipeline của ta sẽ chạy chậm hoặc có thể bị fail do không đủ phần cứng.

  • Vấn đề về data consistency: Khi ta overwrite dữ liệu thì nếu có lỗi xảy ra thì sẽ có thể dẫn tới việc toàn bộ dữ liệu sẽ có nhiều sai khác so với phiên bản trước lỗi. Thường khi việc này xảy ra thì ta phải khôi phục lại dữ liệu về thời điểm trước khi lỗi xảy ra, có thể sử dụng tính năng “time travel” của một số nền tảng như Delta Lake của Databricks hay BigQuery của Google Cloud Platform, … hoặc ở các cơ sở dữ liệu quan hệ như SQL Server hay MySQL thì ta restore lại riêng bảng đó theo bản backup gần nhất của database, hoặc dùng các tính năng restore khác.

7.1.1.2 Append/Snapshotting

Ta thường dùng cách thực hiện này khi thực hiện full load cho 1 bảng mà vẫn muốn thể hiện dữ liệu của các snapshot trong quá khứ của bảng đó. Khi pipeline chạy nó sẽ load một snapshot mới nhất của tất cả dữ liệu tại nguồn và lưu thêm vào bảng đích mà không xoá dữ liệu cũ ở bảng đích đi.

Ví dụ: Giả sử ta có bảng dữ liệu daily_account_balance theo dõi số dư cuối ngày trong các tài khoản của khách hàng của một ngân hàng. Thường ngân hàng cần theo dõi lịch sử thay đổi số dư cuối ngày của khách hàng để phục vụ công việc kinh doanh của họ và cũng để đảm bảo các yêu cầu tuân thủ của các cơ quan quản lý. Vì vậy, ta có thể xây pipeline full load dữ liệu với cách thực hiện là append. Với cách này, mỗi khi pipeline chạy nó sẽ lấy dữ liệu tại nguồn về số dư cuối ngày mới nhất của tất cả các tài khoản và ghi vào bảng đích là daily_account_balance mà không xoá dữ liệu cũ trong bảng này đi. Xem ví dụ về dữ liệu của bảng này như dưới đây:

Ưu điểm:

  • Đơn giản, dễ làm

  • Nếu nguồn không có cơ chế lưu trữ lịch sử thay đổi của dữ liệu thì ta có thể tự theo dõi lịch sử thay đổi của dữ liệu dựa vào các snapshot đã lưu.

Nhược điểm:

  • Gây dư thừa dữ liệu (giả sử pipeline chạy 1 lần/ngày thì sau một tháng ta sẽ có 30 snapshot, số lượng bản ghi đã tăng lên gấp 30 lần). Cho dù chi phí phần cứng lưu trữ dữ liệu ngày nay đã trở nên rẻ hơn rất nhiều, nếu lượng dữ liệu của mỗi snapshot lớn thì vẫn sẽ có thể gây lãng phí sau một thời gian dài. Ngoài ra, nếu không được lưu trữ hợp lý (vd như thiếu index hoặc partition dữ liệu phù hợp), việc truy cập một snapshot bất kỳ trong quá khứ hoặc build một bảng lịch sử thay đổi dữ liệu từ các snapshot sẽ gặp khó khăn về performance.

7.1.2 Incremental load

Để thực hiện được incremental load thì ta cần phải có các điều kiện sau đây (lấy ví dụ khi làm việc với dữ liệu có cấu trúc cho dễ hiểu):

  • Ta phải hiểu được bản chất dữ liệu và cơ chế ghi và cập nhật dữ liệu của nguồn. Ví dụ, nếu nguồn là một bảng dữ liệu thì ta cần biết:

    • mỗi bản ghi sau khi được INSERT vào bảng nguồn thì có thể bị thay đổi (UPDATE) không? Ví dụ với dữ liệu dạng transaction hay event thì thường mỗi bản ghi sẽ được insert vào bảng và sau đó không bao giờ thay đổi nữa (trừ phi nguồn thực hiện sửa lại dữ liệu quá khứ).

    • nếu dữ liệu của mỗi bản ghi có thể được thay đổi sau khi được INSERT vào bảng thì cơ chế để ghi nhận sự thay đổi này là gì? Xem đầu mục tiếp theo dưới đây để biết thêm chi tiết.

  • Có cơ chế để ghi nhận sự thay đổi dữ liệu tại nguồn (để mỗi khi pipeline chạy thì ta chỉ load những thay đổi dữ liệu mới nhất thay vì load tất cả dữ liệu). Ta thường gặp các cơ chế ghi nhận sự thay đổi dữ liệu sau đây (chúng cũng là các data design pattern thường gặp khi ta cần ingest dữ liệu):

    • Sử dụng “delta column”: Dùng một cột ở dữ liệu nguồn để giúp ta xác định được đâu là những bản ghi mới phát sinh hoặc mới được update kể từ lần gần nhất pipeline chạy. Cột này thường là những cột chỉ thời gian, ví dụ như “insert_timestamp” để thể hiện thời gian bản ghi được insert vào bảng, “last_modified_timestamp” hay “last_updated_datetime”, … để thể hiện thời gian gần nhất (ngày, giờ) mà dữ liệu của mỗi bản ghi được cập nhật. Trong một số trường hợp khác bảng nguồn có thể không dùng các cột thời gian mà thay vào đó dùng các cột dạng như “version_number” để ghi nhận sự thay đổi dữ liệu.

      Ta cần lưu ý là cách sử dụng cột “delta column” này không xử lý được trường hợp dữ liệu nguồn xoá bản ghi. Nếu một bản ghi ở nguồn bị xoá thì bản ghi đó vẫn sẽ luôn tồn tại ở đích. Tuy nhiên, nếu nguồn sử dụng “soft delete”, tức là thay vì xoá hẳn bản ghi đi (tức là dùng operation DELETE), bảng nguồn sẽ đánh dấu bản ghi đó là “deleted” hoặc “inactive” (tức là dùng operation UPDATE), thì cách sử dụng cột “delta column” vẫn sẽ hoạt động được.

    • Change Data Capture (CDC): Khi nguồn có bật tính năng CDC thì sẽ lưu lại những thay đổi đối với dữ liệu tại nguồn, như insert, update, delete, cùng một số metadata, thường là ghi vào các file log. Sau đó, ta đọc các file log này (hoặc đọc từ bảng dữ liệu được tạo từ các file này) để biết được từ lần chạy pipeline gần nhất đến nay thì bản ghi nào ở bảng nguồn được thêm mới, bản ghi nào bị update, bản ghi nào bị xoá. Các hệ thống cơ sở dữ liệu quan hệ truyền thống phổ biến như SQL Server, Oracle, PostgreSQL, MySQL, … hay các data analytics platform mới hiện nay như Google BigQuery, Databricks, … đều có tính năng CDC.

      Ta cần lưu ý là việc triển khai CDC sẽ phức tạp hơn so với sử dụng “delta column” vì thường Data Engineer sẽ cần hỗ trợ từ DevOps hoặc DBA hoặc một team nào đó mà quản trị server chứa dữ liệu nguồn để setup tính năng CDC tại server nguồn và bật tính năng đó lên cho bảng nguồn. Vì vậy mà ta cũng chỉ có thông tin CDC kể từ khi tính năng này được bật lên, còn nếu ta muốn có thông tin về việc thay đổi dữ liệu từ trước thời điểm đó thì ta phải kết hợp CDC với các phương pháp load dữ liệu khác. Ngoài ra, khi ta đọc những dữ liệu CDC này, chúng đi kèm với các metadata và ta sẽ phải viết thêm code để giữ lại những thông tin cần thiết và loại bỏ những thông tin không cần thiết, việc này cũng tăng thêm đôi chút mức độ phức tạp khi triển khai. Nếu thay vì tiêu thụ (consume) dữ liệu CDC theo batch, ta tiêu thụ theo thời gian thực (streaming) thì cũng làm tăng thêm mức độ phức tạp do đòi hỏi các kiến thức về streaming và cách xử lý dữ liệu khi streaming.

  • Dữ liệu phải có khoá chính (primary key) để giúp ta xác định đâu là bản ghi được thêm mới, được chỉnh sửa hoặc bị xoá đi tại bảng nguồn để insert hoặc update, delete chúng tương ứng ở bảng đích.

  • Những thay đổi về dữ liệu tại nguồn phải được cung cấp đầy đủ (nếu hệ thống nguồn gửi thiếu thông tin về một sự thay đổi dữ liệu của một bản ghi nào đó thì ta sẽ mãi không có thông tin mới nhất của bản ghi đó).

  • Có cơ chế để ghi nhận trạng thái của lần load dữ liệu gần nhất của pipeline. Điều này giúp mỗi lần chạy thì pipeline chỉ load các dữ liệu thay đổi mới nhất tính từ trạng thái đó mà thôi (xem ví dụ dưới đây để mường tượng được cụ thể hơn). Việc này thường được thực hiện bằng các cách sau:

    • Metadata Table/ETL Log Table: Ta có thể tạo một bảng lưu lại thông tin timestamp hoặc version hoặc ID của lần load dữ liệu gần nhất. Mỗi lần pipeline chạy thì nó sẽ query dữ liệu từ bảng này để biết cần load tiếp dữ liệu từ thời điểm, version, hoặc từ ID nào.

    • Configuration Files: Thay vì lưu vào bảng metadata/etl log như trên, ta có thể lưu thông tin của lần chạy gần nhất vào file.

    • Checkpoint: thường thấy trong các công nghệ/công cụ dùng cho streaming, hệ thống sẽ thực hiện incremental load bằng cách dựa vào checkpoint để biết được trạng thái của pipeline sau lần chạy gần nhất và chạy tiếp tục từ đó.

Ưu điểm:

  • Incremental load giúp ta giảm bớt lượng dữ liệu phải xử lý mỗi khi pipeline chạy, giúp pipeline chạy nhanh và ổn định hơn.

  • “Delta column” nhìn chung dễ triển khai hơn CDC, còn CDC thì có thể giúp ta đạt được low latency (vẫn có phụ thuộc vào công nghệ CDC sử dụng) vì bên cạnh việc chạy theo batch, ta có thể stream các thay đổi về dữ liệu để update các bảng đích.

Nhược điểm:

  • Logic để triển khai incremental load thường phức tạp hơn full load

  • Có thể gặp phải vấn đề late-arriving records (các bản ghi đến muộn) và có thể cần tạo thêm logic xử lý vấn đề này. Full load không gặp phải vấn đề này. Xem thêm về vấn đề bản ghi đến muộn ở phần Data quality issues trong bài học này.

7.1.2.1 Ví dụ về một incremental load data pipeline

Giả sử ta có bảng lưu dữ liệu thông tin cá nhân của khách hàng tại hệ thống nguồn là một cơ sở dữ liệu quan hệ, ví dụ như MySQL, tên schema là “src” (nghĩa là “source”, đặt tên như vậy để ta dễ mường tượng rằng đây là nguồn dữ liệu) và tên bảng là “customer”. Ta cần load dữ liệu từ bảng nguồn này, xử lý dữ liệu, sau đó lưu vào bảng đích. Trong trường hợp này ta lấy ví dụ bảng đích cũng nằm trên MySQL nhưng ở 1 schema khác, tên là “dwh” (data warehouse).

Để thực hiện incremental load ta có thể thiết kế ETL pipeline như sau:

Bước 1: Bật tính năng Change Data Capture (CDC) của bảng nguồn src.customer. Trong ví dụ này ta sử dụng CDC theo dạng trigger. Mỗi khi có một sự thay đổi về dữ liệu tại bảng src.customer thì dữ liệu thay đổi đó sẽ được ghi vào bảng src.customer_cdc. Tuy đây không phải là cách thiết lập CDC tối ưu nhất (vì có rất nhiều write operation diễn ra ở bảng src.customer_cdc) nhưng nó đơn giản để thiết lập và phù hợp cho mục đích của bài giảng này. Cụ thể như ví dụ dưới đây:

Tạo các bảng cần thiết và bật CDC

USE src;

DROP TABLE IF EXISTS customer;
DROP TABLE IF EXISTS customer_cdc;

CREATE TABLE customer (
    cust_id INT PRIMARY KEY,
    name VARCHAR(255),
    age INT
);

CREATE TABLE customer_cdc (
    id INT PRIMARY KEY AUTO_INCREMENT,
    cust_id INT,
    name VARCHAR(255),
    age INT,
    update_datetime DATETIME,
    operation_type ENUM('INSERT', 'UPDATE', 'DELETE')
);

-- Trigger for INSERT
CREATE TRIGGER after_insert_customer
AFTER INSERT ON customer
FOR EACH ROW
INSERT INTO customer_cdc (cust_id, name, age, update_datetime, operation_type)
VALUES (NEW.cust_id, NEW.name, NEW.age, NOW(), 'INSERT');

-- Trigger for UPDATE
CREATE TRIGGER after_update_customer
AFTER UPDATE ON customer
FOR EACH ROW
INSERT INTO customer_cdc (cust_id, name, age, update_datetime, operation_type)
VALUES (NEW.cust_id, NEW.name, NEW.age, NOW(), 'UPDATE');

-- Trigger for DELETE
CREATE TRIGGER after_delete_customer
AFTER DELETE ON customer
FOR EACH ROW
INSERT INTO customer_cdc (cust_id, name, age, update_datetime, operation_type)
VALUES (OLD.cust_id, OLD.name, OLD.age, NOW(), 'DELETE');

Insert dữ liệu vào bảng src.customer và kiểm tra xem dữ liệu tương ứng có được ghi vào bảng src.customer_cdc hay không

-- Insert dữ liệu lần đầu tiên
INSERT INTO src.customer
VALUES
    (1, 'Tuan', 32),
  (2, 'Nam', 32),
  (3, 'Minh', 32);
-- Kiểm tra dữ liệu ở bảng customer
SELECT * FROM src.customer;
cust_id name age
1 Tuan 32
2 Nam 32
3 Minh 32
-- Kiểm tra dữ liệu ở bảng customer_cdc
SELECT * FROM src.customer_cdc ORDER BY update_datetime DESC;
id cust_id name age update_datetime operation_type
1 1 Tuan 32 2024-11-17 23:45:53 INSERT
2 2 Nam 32 2024-11-17 23:45:53 INSERT
3 3 Minh 32 2024-11-17 23:45:53 INSERT

Bước 2: Sau khi đã thiết lập CDC xong ở nguồn, ta tiến hành xây dựng các logic cho data pipeline để load và xử lý dữ liệu. Để thực hiện được incremental load thì ta cần kiểm soát được giá trị update_datetime của lần chạy pipeline thành công gần nhất (có nghĩa là lần gần nhất pipeline chạy thành công thì đã load và xử lý đến bản ghi có update_datetime là bao nhiêu ở bảng src.customer_cdc rồi, để lần chạy tiếp theo sẽ xử lý tiếp từ sau update_datetime đó trở đi). Trong ví dụ này ta sẽ dùng phương pháp tạo 1 bảng metadata để lưu lại giá trị update_datetime này mỗi khi chạy pipeline thành công.

USE dwh;

-- Tạo bảng metadata
CREATE TABLE IF NOT EXISTS customer_etl_metadata (
    id INT PRIMARY KEY AUTO_INCREMENT,
    last_update_datetime DATETIME
);

-- Insert dữ liệu để phục vụ cho lần chạy pipeline đầu tiên (sẽ full load toàn bộ dữ liệu từ bảng nguồn)
INSERT INTO customer_etl_metadata (last_update_datetime)
SELECT (CAST('1900-01-01 00:00:00' AS DATETIME))
WHERE NOT EXISTS (SELECT 1 FROM customer_etl_metadata);

SELECT * FROM customer_etl_metadata;
id last_update_datetime
1 1900-01-01 00:00:00

Bước 3: Load dữ liệu từ src.customer_cdc (chỉ lấy các bản ghi mới tính từ lần chạy pipeline thành công gần nhất) vào dwh.stg_customer_cdc, sau đó biến đổi dữ liệu và lưu vào bảng tạm tmp_customer_cdc, rồi merge dữ liệu từ bảng tạm vào bảng đích cuối cùng dwh.customer.

-- Lấy datetime của lần chạy pipeline thành công gần nhất
SET @last_update_datetime := (SELECT last_update_datetime FROM customer_etl_metadata ORDER BY id DESC LIMIT 1);
SELECT @last_update_datetime;
X.last_update_datetime
1900-01-01 00:00:00
-- Load chỉ những bản ghi ở src.customer_cdc mà phát sinh sau lần chạy pipeline thành công gần nhất
CREATE TABLE IF NOT EXISTS dwh.stg_customer_cdc(
    id INT PRIMARY KEY,
    cust_id INT,
    name VARCHAR(255),
    age INT,
    update_datetime DATETIME,
    operation_type ENUM('INSERT', 'UPDATE', 'DELETE')
);

CREATE TABLE IF NOT EXISTS dwh.customer (
    cust_id INT PRIMARY KEY,
    name VARCHAR(255),
    age INT
);

INSERT INTO dwh.stg_customer_cdc(
    id,
    cust_id,
    name,
    age,
    update_datetime,
    operation_type
)
SELECT
    id,
    cust_id,
    name,
    age,
    update_datetime,
    operation_type
FROM src.customer_cdc
WHERE update_datetime > @last_update_datetime;

-- Kiểm tra dữ liệu được load vào dwh.stg_customer_cdc
SELECT *
FROM dwh.stg_customer_cdc ORDER BY update_datetime DESC;
id cust_id name age update_datetime operation_type
1 1 Tuan 32 2024-11-17 23:45:53 INSERT
2 2 Nam 32 2024-11-17 23:45:53 INSERT
3 3 Minh 32 2024-11-17 23:45:53 INSERT
-- # Biến đổi dữ liệu ở bảng dwh.stg_customer_cdc (chuyển tên thành viết hoa) và lưu kết quả vào 1 bảng tạm
-- Chỉ load và biến đổi các bản ghi mới tính từ lần gần nhất pipeline chạy thành công
DROP TABLE IF EXISTS tmp_customer_cdc;

CREATE TEMPORARY TABLE tmp_customer_cdc AS
SELECT
    id,
    cust_id, 
    UPPER(name) AS name, 
    age,
    update_datetime,
    operation_type
FROM dwh.stg_customer_cdc
WHERE update_datetime > @last_update_datetime;

-- Kiểm tra dữ liệu ở bảng tạm
SELECT * FROM tmp_customer_cdc ORDER BY update_datetime DESC;
id cust_id name age update_datetime operation_type
1 1 TUAN 32 2024-11-17 23:45:53 INSERT
2 2 NAM 32 2024-11-17 23:45:53 INSERT
3 3 MINH 32 2024-11-17 23:45:53 INSERT
-- # Merge (insert, update, delete) dữ liệu từ bảng tạm vào bảng đích cuối cùng dwh.customer

-- Insert dữ liệu của các cust_id mới
INSERT INTO dwh.customer (cust_id, name, age)
SELECT cust_id, name, age
FROM tmp_customer_cdc
WHERE operation_type = 'INSERT';

-- Update dữ liệu các cust_id cũ trong dwh.customer dựa trên dữ liệu mới từ bảng tạm
UPDATE dwh.customer AS target
LEFT JOIN tmp_customer_cdc AS source
ON target.cust_id = source.cust_id
SET target.name = source.name,
    target.age = source.age
WHERE source.operation_type = 'UPDATE';

-- Xoá các bản ghi khỏi dwh.customer dựa vào dữ liệu từ bảng tạm
DELETE target
FROM dwh.customer AS target
LEFT JOIN tmp_customer_cdc AS source
ON target.cust_id = source.cust_id
WHERE source.operation_type = 'DELETE';

-- Kiểm tra dữ liệu ở bảng đích cuối cùng
SELECT * FROM dwh.customer;
cust_id name age
1 TUAN 32
2 NAM 32
3 MINH 32

Pipeline đã chạy thành công, giờ ta cập nhật giá trị last_update_datetime của bảng customer_etl_metadata

-- Cập nhật last_update_datetime
SET @new_last_update_datetime := (SELECT max(update_datetime) FROM tmp_customer_cdc);
INSERT INTO customer_etl_metadata (last_update_datetime)
VALUES (@new_last_update_datetime);

SELECT * FROM customer_etl_metadata;
id last_update_datetime
1 1900-01-01 00:00:00
2 2024-11-17 23:45:53

Tiếp theo, ta thực hiện cập nhật giá trị của 1 bản ghi và xoá 1 bản ghi trong bảng src.customer, sau đó kiểm tra kết quả ở trong bảng src.customer_cdc.

UPDATE src.customer
SET age = 20 WHERE cust_id = 2;

DELETE FROM src.customer WHERE cust_id = 3;
SELECT * FROM src.customer;
cust_id name age
1 Tuan 32
2 Nam 20
SELECT * FROM src.customer_cdc ORDER BY update_datetime DESC;
id cust_id name age update_datetime operation_type
5 3 Minh 32 2024-11-18 00:29:52 DELETE
4 2 Nam 20 2024-11-18 00:29:50 UPDATE
1 1 Tuan 32 2024-11-17 23:45:53 INSERT
2 2 Nam 32 2024-11-17 23:45:53 INSERT
3 3 Minh 32 2024-11-17 23:45:53 INSERT

Ta chạy data pipeline (các bước và logic xử lý giống hệt lần chạy đầu tiên)

USE dwh;

-- Tạo bảng metadata
CREATE TABLE IF NOT EXISTS customer_etl_metadata (
    id INT PRIMARY KEY AUTO_INCREMENT,
    last_update_datetime DATETIME
);

-- Insert dữ liệu để phục vụ cho lần chạy pipeline đầu tiên (sẽ full load toàn bộ dữ liệu từ bảng nguồn)
INSERT INTO customer_etl_metadata (last_update_datetime)
SELECT (CAST('1900-01-01 00:00:00' AS DATETIME))
WHERE NOT EXISTS (SELECT 1 FROM customer_etl_metadata);

-- Lấy datetime của lần chạy pipeline thành công gần nhất
SET @last_update_datetime := (SELECT last_update_datetime FROM customer_etl_metadata ORDER BY id DESC LIMIT 1);
SELECT @last_update_datetime;
X.last_update_datetime
2024-11-17 23:45:53
-- Load chỉ những bản ghi ở src.customer_cdc mà phát sinh sau lần chạy pipeline thành công gần nhất
CREATE TABLE IF NOT EXISTS dwh.stg_customer_cdc(
    id INT PRIMARY KEY,
    cust_id INT,
    name VARCHAR(255),
    age INT,
    update_datetime DATETIME,
    operation_type ENUM('INSERT', 'UPDATE', 'DELETE')
);

CREATE TABLE IF NOT EXISTS dwh.customer (
    cust_id INT PRIMARY KEY,
    name VARCHAR(255),
    age INT
);

INSERT INTO dwh.stg_customer_cdc(
    id,
    cust_id,
    name,
    age,
    update_datetime,
    operation_type
)
SELECT
    id,
    cust_id,
    name,
    age,
    update_datetime,
    operation_type
FROM src.customer_cdc
WHERE update_datetime > @last_update_datetime;

-- # Biến đổi dữ liệu ở bảng dwh.stg_customer_cdc (chuyển tên thành viết hoa) và lưu kết quả vào 1 bảng tạm
-- Chỉ load và biến đổi các bản ghi mới tính từ lần gần nhất pipeline chạy thành công
DROP TABLE IF EXISTS tmp_customer_cdc;

CREATE TEMPORARY TABLE tmp_customer_cdc AS
SELECT
    id,
    cust_id, 
    UPPER(name) AS name, 
    age,
    update_datetime,
    operation_type
FROM dwh.stg_customer_cdc
WHERE update_datetime > @last_update_datetime;

-- Kiểm tra dữ liệu ở bảng tạm
SELECT * FROM tmp_customer_cdc ORDER BY update_datetime DESC;
id cust_id name age update_datetime operation_type
5 3 MINH 32 2024-11-18 00:29:52 DELETE
4 2 NAM 20 2024-11-18 00:29:50 UPDATE
-- # Merge (insert, update, delete) dữ liệu từ bảng tạm vào bảng đích cuối cùng dwh.customer

-- Insert dữ liệu của các cust_id mới
INSERT INTO dwh.customer (cust_id, name, age)
SELECT cust_id, name, age
FROM tmp_customer_cdc
WHERE operation_type = 'INSERT';

-- Update dữ liệu các cust_id cũ trong dwh.customer dựa trên dữ liệu mới từ bảng tạm
UPDATE dwh.customer AS target
LEFT JOIN tmp_customer_cdc AS source
ON target.cust_id = source.cust_id
SET target.name = source.name,
    target.age = source.age
WHERE source.operation_type = 'UPDATE';

-- Xoá các bản ghi khỏi dwh.customer dựa vào dữ liệu từ bảng tạm
DELETE target
FROM dwh.customer AS target
LEFT JOIN tmp_customer_cdc AS source
ON target.cust_id = source.cust_id
WHERE source.operation_type = 'DELETE';

-- Kiểm tra dữ liệu ở bảng đích cuối cùng
SELECT * FROM dwh.customer;
cust_id name age
1 TUAN 32
2 NAM 20

Vậy là dữ liệu sau 2 lần chạy của data pipeline theo chiến lược incremental load đều đã chính xác.

Lưu ý: Trong thực tế, thường ta sẽ dùng 1 công cụ orchestration tool nào đó (VD: Apache Airflow) để tổ chức và sắp xếp thứ tự thực hiện các logic load, ghi và xử lý dữ liệu bên trên, cũng như quản lý và đặt lịch chạy các job ETL dữ liệu.

8 Error management

Khi xây dựng các data pipeline, chắc chắn ta sẽ gặp lỗi, điều đó là không thể tránh khỏi. Một hệ thống đáng tin cậy (a resilient system, a fault-tolerant system) là một hệ thống mà có sự chuẩn bị, dự đoán trước lỗi và khi lỗi xảy ra thì hệ thống vẫn chạy chính xác (vẫn hoạt động đúng mục đích, vẫn đảm bảo performance và bảo mật, …). Việc ta cần làm khi xây dựng các data pipeline là chuẩn bị sẵn và triển khai các phương án để hạn chế tối đa lỗi phát sinh, và xử lý lỗi khi phát sinh sao cho đảm bảo được các yêu cầu chức năng và các yêu cầu phi chức năng.

Ta có thể chia các loại lỗi thành 2 loại chính sau:

8.1 Transient errors

Transient error nhìn chung là những lỗi xảy ra tạm thời, thường liên quan tới các hệ thống, service bên ngoài (ví dụ như các hệ thống nguồn hoặc API mà ta cần sử dụng, kết nối tới để lấy dữ liệu) như lỗi mất kết nối mạng hoặc một hệ thống, service nào đó tạm thời không sẵn sàng, hay timeout do service bận, chạm rate limit của API, …

Hướng xử lý:

Đây về bản chất là những lỗi xảy ra tạm thời và chiến lược xử lý thông dụng với loại lỗi này là “retry with backoff”. Cách làm là ta chạy lại (retry) hành động mà bị dừng do lỗi, tuy nhiên nếu retry quá dồn dập có thể làm quá tải băng thông mạng (overload network bandwidth) và tranh chấp tài nguyên hệ thống (resource contention). Vì vậy mà sau mỗi lần retry ta nên cài đặt một khoảng thời gian chờ (wait time) trước khi thử lại lần nữa. Nếu thời gian chờ giữa các lần thử lại này tăng dần theo hàm mũ thì đó chính là ta đang triển khai retry theo pattern “retry with exponential backoff”, một pattern phổ biến trong xử lý các transient error. Lưu ý là ta cũng cần phải cài đặt số lần retry tối đa được thực hiện chứ không phải là cứ để retry mãi.

Tham khảo ví dụ của AWS và Google về triển khai retry with exponential backoff:

https://docs.aws.amazon.com/prescriptive-guidance/latest/cloud-design-patterns/retry-backoff.html

https://developers.google.com/analytics/devguides/config/userdeletion/v3/errors#backoff

Pipeline phải đảm bảo tính Idempotency: Một pipeline được gọi là idempotent khi mà ta chạy pipeline đó nhiều hơn 1 lần thì không có tình trạng bị trùng lặp dữ liệu. Điều này là rất quan trọng với mọi data pipeline chứ không chỉ là với transient error, nhưng vì cách xử lý chính của transient error là tự động retry, nên việc pipeline phải idempotent lại càng quan trọng. Xem thêm về cách tạo idempotent pipeline ở phần Idempotency dưới đây.

8.2 Non-transient errors

Non-transient hay persistent error là những lỗi mà không tự khôi phục được (tức là retry sẽ không có tác dụng) và sẽ không tự mất đi trừ phi ta can thiệp thủ công vào để sửa lỗi. Các lỗi persistent error ta thường gặp bao gồm sai schema (thừa hoặc thiếu cột, sai datatype), không đủ quyền truy cập một số tài nguyên hoặc có một số thiết lập không đúng hoặc thiếu (dẫn đến không truy cập được dữ liệu hoặc không truy cập được nơi lưu trữ dữ liệu, file để ghi dữ liệu, …), vi phạm một số constraint (ví dụ ghi dữ liệu bị lặp vào một cột có constraint unique), vi phạm một số business rule và validation rule (ví dụ end_date lại là ngày trước ngày start_date, email thiếu @, …), lỗi logic khi biến đổi dữ liệu, …

Nguyên tắc chung khi xử lý các non-transient error:

  • Phòng bệnh hơn chữa bệnh

  • Không thực hiện retry để xử lý các lỗi loại này vì sẽ lãng phí tài nguyên hoặc có thể làm lỗi nghiêm trọng hơn.

  • Nên tạo các chốt kiểm tra để phát hiện các lỗi non-transient error. Tất nhiên nếu ta không tạo các chốt kiểm tra này thì có thể ta vẫn sẽ phát hiện ra lỗi vì pipeline bị dừng hoạt động, tuy nhiên ta muốn tạo các chốt kiểm tra vì một số lí do sau:

    • Nếu ta để lỗi phát sinh và pipeline dừng hoạt động (pipeline fail) thì rất có thể ta sẽ lãng phí thời gian và tài nguyên hệ thống. VD: pipeline chạy được vài tiếng rồi mới fail do một dòng dữ liệu có vấn đề, vậy là ta phải kiểm tra, sửa lỗi, rồi chạy lại và mất thêm vài tiếng nữa, Ta cần hạn chế những trường hợp như vậy vì tốn tài nguyên và thời gian.

    • Khi pipeline fail thì ta thường nhận được một thông báo chung chung từ hệ thống và việc xác định chính xác nguyên nhân gây lỗi đôi khi gặp khó khăn. Nếu ta tạo một bước kiểm tra cụ thể và có một thông báo cụ thể và rõ ràng hơn thì việc kiểm tra và sửa lỗi sẽ dễ dàng hơn.

    • Có những lỗi không làm pipeline fail, nhất là những lỗi liên quan đến tính chính xác của dữ liệu. VD: giá trị của cột percentage bị lớn hơn 1 do có lỗi xảy ra từ dữ liệu nguồn. Lỗi này sẽ không làm dừng pipeline nên nếu ta không có các bước kiểm tra thì dữ liệu kém chất lượng sẽ đi vào các bảng và làm sai lệch kết quả phân tích.

    • User có thể sẽ thấy dữ liệu không toàn vẹn (có thể chỉ thấy một phần dữ liệu mới nhất hoặc có những phần dữ liệu mới nhưng có những phần khác lại cũ, …) từ đó mà cảm thấy nghi ngờ, không tin tưởng vào các kết quả phân tích, báo cáo.

  • Hệ thống lưu trữ dữ liệu (database, datalake, …), platform hoặc công cụ sử dụng cho ETL pipeline cần hỗ trợ khả năng rollback nhanh chóng để xử lý kịp thời khi có lỗi ảnh hưởng trọng yếu tới dữ liệu production. VD1: Pipeline của ta cập nhật dữ liệu cho một bảng trên production và sau đó ta phát hiện ra dữ liệu đã bị cập nhật sai, vì bảng này phục vụ các báo cáo real time quan trọng cho khách hàng, ta cần ngay lập tức dùng tính năng time travel (có trên các nền tảng dùng kiến trúc Data lakehouse như Delta Lake, Snowflake, Bigquery, …) để trả lại dữ liệu của bảng về thời điểm trước khi cập nhật. VD2: Ta sử dụng SQL Server và một bảng dữ liệu production quan trọng bị cập nhật sai, nếu bảng này được tạo dưới dạng một “System-Versioned Temporal Table”, ta có thể rollback lại dữ liệu tại một thời điểm trong quá khứ, tương tự như tính năng time travel trên các data lakehouse, hoặc ta lấy lại dữ liệu bằng cách sử dụng point-in-time recovery (nếu database được set up full recovery model), hoặc dùng database snapshot gần nhất, …

  • Cần có cơ chế logging để lưu lại và thông báo các lỗi một cách rõ ràng, giúp việc kiểm tra và phát hiện lỗi được nhanh chóng và chính xác hơn.

  • Không nên coi nhẹ kiến trúc dữ liệu (data architecture). Kiến trúc dữ liệu hiểu đơn giản là bản thiết kế thể hiện các công nghệ, công cụ được sử dụng và cách mà ta thu thập, xử lý, luân chuẩn dữ liệu qua các bước trong vòng đời dữ liệu. Một kiến trúc dữ liệu được thiết kế tốt là bước đầu tiên giúp ta hạn chế khả năng phát sinh lỗi, đặc biệt là các lỗi non-transient error. Tuy nhiên, chủ đề này sẽ không được đề cập trong bài học này do đây là một chủ đề đòi hỏi nhiều kinh nghiệm và kiến thức rộng, nên chưa phù hợp với bài học này, và chính tôi cũng chưa thực sự đủ kiến thức và kinh nghiệm để viết về nó được sâu sắc. Xin hẹn bạn đọc vào một dịp khác.

8.2.1 Xử lý các non-transient error thường gặp

8.2.1.1 Schema mismatch/Schema drift

Đây là vấn đề thường xảy ra khi schema của dữ liệu đầu vào từ nguồn thay đổi, thường là:

  • Thêm cột: Thường thì đây không phải là vấn đề lớn, tuy nhiên nếu trong pipeline của ta có những đoạn SELECT * thì có thể sẽ phát sinh lỗi.

  • Xoá cột, đổi tên cột: Gần như chắc chắn sẽ gây lỗi và làm dừng pipeline vì logic xử lý trong pipeline không còn tìm thấy cột cũ nữa.

  • Đổi datatype hoặc định dạng của cột: Ví dụ một cột chứa các mã code đang ở dạng integer (12345) bỗng bị chuyển sang dạng string (012345 hoặc 12345-001, …) thì có thể sẽ gây lỗi, hoặc có thể datatype được tự động thay đổi nhưng trong quá trình đó làm sai lệch dữ liệu (ví dụ mã code 012345 bị tự động cast sang integer thành 12345).

  • Đổi về ý nghĩa và nội dung các giá trị trong cột (datatype và tên cột giữ nguyên): Đây gọi là semantic change hay semantic drift, và có thể sẽ không gây lỗi. Đây là một loại thay đổi nguy hiểm và pipeline không fail nên ta không phát hiện ra, chỉ đến khi thấy các tính toán, phân tích đưa ra kết quả sai thì mới phát hiện được. VD1: ta dùng query sau để lấy dữ liệu một số giao dịch từ hệ thống nguồn: SELECT col1, col2, status FROM tbl1 WHERE status <> 'Pending' . Giờ bảng nguồn thay đổi giá trị “Pending” thành “Awaiting Response” và ta không biết điều này, vì vậy pipeline của ta sẽ bị load dữ liệu của cả các giao dịch pending và làm sai lệch các chỉ số trên báo cáo phân tích. VD2: Ta load dữ liệu những khách hàng “Active” từ nguồn bằng query sau: SELECT col1, col2 FROM customer_table WHERE customer_status = "Active", tuy nhiên giờ hệ thống nguồn đã thay đổi định nghĩa về khách hàng “Active” từ chỗ là khách hàng có giao dịch trong vòng 3 tháng gần nhất thành trong vòng 6 tháng gần nhất tuy nhiên ta không biết điều này, và trong việc tính toán số lượng và giá trị giao dịch trung bình ta vẫn đang chỉ tính toán cho 3 tháng gần nhất, dẫn tới kết quả của báo cáo phân tích bị sai.

Cách phòng tránh và hướng xử lý lỗi:

  • Data contract: Nếu ta lấy dữ liệu từ một hệ thống nội bộ của doanh nghiệp thì ta có thể trao đổi và thống nhất với đơn vị quản lý hệ thống đó để thống nhất schema, cách thức cung cấp dữ liệu cho data team, … đảm bảo không xảy ra trường hợp schema của nguồn bị thay đổi mà data engineer không biết. Đây gọi là data contract (vì nó giống như một bản hợp đồng, giao ước giữa 2 bên). Khi chuẩn bị có sự thay đổi về schema thì data team phải được thông báo kịp thời để đưa ra ý kiến và đánh giá tác động tới các data pipeline.

  • Schema evolution: Khi lấy dữ liệu từ nguồn thì trong nhiều trường hợp ta sẽ lưu vào một bảng staging hoặc nếu theo mô hình medallion thì gọi là bảng bronze (tức là bảng chứa dữ liệu thô hầu như là chưa được xử lý, không phải là bảng để dùng cho phân tích, cho các ứng dụng đầu cuối), ta có thể dùng tính năng của data platform đang sử dụng hoặc một công cụ của bên thứ 3 để bảng này có thể tự động thay đổi schema khi schema của dữ liệu nguồn thay đổi, giúp ta không phải can thiệp thủ công quá nhiều mỗi khi schema của nguồn thay đổi.

  • Schema enforcement (cách tiếp cận kiểu schema on write): Ta kiểm tra schema của dữ liệu nguồn, nếu thấy khác với schema của bảng staging/bronze thì cho dừng pipeline luôn, không cho ghi dữ liệu vào bảng staging/bronze. Khi pipeline dừng, thông báo (alert/notification) sẽ được gửi đi và ta sẽ tiến hành trao đổi với các bên liên quan để xử lý (thường nếu nguồn không điều chỉnh lại schema thì ta sẽ phải điều chỉnh thủ công lại schema của bảng staging/bronze và các bảng sau đó nếu cần, điều chỉnh code để type cast dữ liệu). Schema enforcement hoàn toàn có thể thực hiện kèm với data contract (vì khi đã có data contract thì việc không cho phép những thay đổi schema mà chưa được trao đổi, thống nhất đi vào hệ thống dữ liệu của ta lại càng trở nên hợp lý).

  • Schema on read: Cách này thường được sử dụng khi ta nhận dữ liệu dưới dạng file (ví dụ file JSON trả ra từ API). Theo cách này, ta lưu lại các file dữ liệu nhận được, sau đó khi ta đọc dữ liệu từ các file này, ta viết lệnh để lấy ra những cột cần sử dụng, và ta type-cast các cột đó về datatype phù hợp. Schema được xác định khi ta đọc (read) dữ liệu, vì vậy mà được gọi là schema on read.

Tương tự như với việc ingest dữ liệu từ nguồn, bảng đích của ta cũng sẽ là bảng nguồn của các ứng dụng, báo cáo phân tích, hệ thống nào đó (gọi chung là các downstream consumer), cho nên ta cũng cần phải đảm bảo schema của bảng đích của ta không thay đổi, để tránh ảnh hưởng tới các downstream consumer này. Nói cách khác, ta cũng nên có data contract cho bảng đích.

8.2.1.2 Permission & configuration

Thường data pipeline sẽ cần có quyền truy cập đến một số tài nguyên (vd: truy cập database, schema, bảng dữ liệu ở nguồn, truy cập folder chứa file ở datalake, …) và thực hiện một số hành động trên các tài nguyên đó (đọc dữ liệu, ghi dữ liệu, …). Nếu pipeline không được cấp đủ quyền phù hợp thì sẽ xảy ra lỗi.

Các thiết lập về môi trường để chạy pipeline nếu không thiết lập đúng cũng có thể gây ra lỗi (vd: cài đặt thiếu thư viện python cần để chạy code, lỗi typo của một biến global được cài đặt cho pipeline, …).

Cách phòng tránh và hướng xử lý lỗi:

  • Không hard-code các thông tin truy cập và thông tin cài đặt mà dùng các công cụ để quản lý các thông tin này, ví dụ như các environment/configuration file (.env, .yaml, … Các file này thường được để vào .gitignore và ta không muốn git track chúng), các environment variable được khai báo trong công cụ ETL/platform ETL mà ta sử dụng (vd: các variable khai báo ở Apache Airflow, Azure Data Factory, …), các công cụ quản lý secret (vd: AWS Secret Manager, Azure Key Vault, …), … Sử dụng các công cụ này sẽ giúp ta quản lý thông tin truy cập và cài đặt một cách tập trung, từ đó tránh tối đa lỗi phát sinh và khi có lỗi phát sinh thì xử lý cũng nhanh và chính xác hơn. Ngoài ra, việc hard-code các thông tin này cũng gây rủi ro về bảo mật và không phải là thông lệ tốt.

  • Có quy định và quy trình để thường xuyên thay đổi và thay đổi kịp thời các key và password cần sử dụng trong pipeline trước khi chúng hết hạn.

  • Việc thay đổi các cài đặt hệ thống nếu được thực hiện qua code, có review (infrastructure as code) thì sẽ an toàn hơn và hạn chế lỗi phát sinh.

8.2.1.3 Data quality issues

Các lỗi chất lượng dữ liệu là những lỗi rất nguy hiểm vì chúng làm sai lệch dữ liệu dẫn tới đưa ra các insight không chính xác, ảnh hưởng tới việc ra quyết định trong doanh nghiệp. Các lỗi này có thể đơn giản là những lỗi về định dạng dữ liệu (format), thiếu dữ liệu, trùng lặp dữ liệu mà ta có thể dễ dàng phát hiện, nhưng nhiều khi cũng có thể là những lỗi liên quan tới kiến thức nghiệp vụ kinh doanh mà ta khó phát hiện ra được nếu không làm data profiling tốt và không có sự phối hợp với các bên nghiệp vụ và các data analyst, và thường chỉ được phát hiện ra khi dữ liệu đã đi vào các ứng dụng đầu cuối và các báo cáo phân tích, đôi khi thậm chí sau một thời gian dài sử dụng mới được phát hiện ra.

Cách phòng tránh và hướng xử lý lỗi:

  • Trước khi xây data pipeline thì ta cần thực hiện kiểm tra dữ liệu nguồn (data profiling). Việc này giúp ta hiểu bản chất dữ liệu và phát hiện được các vấn đề hiện có cũng như lường trước được các vấn đề có thể phát sinh trong tương lai, từ đó giúp ta tạo các logic trong pipeline để kiểm tra chất lượng dữ liệu.

  • Cần có các bước kiểm tra chất lượng dữ liệu trong data pipeline (khác với việc kiểm tra chất lượng dữ liệu khi pipeline đã chạy xong) như đếm và so sánh số dòng, đếm giá trị null, kiểm tra một số business rule, kiểm tra referential integrity (kiểm tra xem có giá trị nào của foreign key ở bảng A mà không tồn tại ở primary key ở bảng B không), … để đảm bảo dữ liệu đưa vào các hệ thống báo cáo phân tích, các ứng dụng đầu cuối được chính xác, giúp doanh nghiệp đưa ra quyết định đúng đắn. Có hai chỗ quan trọng trong pipeline mà ta cần kiểm tra chất lượng dữ liệu đó là:

    • Kiểm tra dữ liệu raw ngay sau khi lấy từ nguồn về (chưa hoặc hầu như chưa trải qua biến đổi gì) để phát hiện sớm các lỗi dữ liệu phát sinh do nguồn. Đây là cách làm theo kiểu “fail fast”, tức là phát hiện lỗi và dừng pipeline sớm để tránh tiêu tốn tài nguyên hệ thống không cần thiết cũng như ngăn chặn sớm dữ liệu kém chất lượng đi vào các bảng quan trọng hơn ở phía sau.

    • Kiểm tra dữ liệu ngay trước bước ghi dữ liệu vào bảng đích. Việc này giúp đảm bảo rằng ta chỉ ghi dữ liệu vào bảng đích khi dữ liệu đạt đủ các yêu cầu về chất lượng. Bước kiểm tra này cũng sẽ giúp phát hiện lỗi dữ liệu phát sinh do logic biến đổi dữ liệu của ta.

  • Data engineer cần phối hợp với các bên nghiệp vụ và data analyst/business analyst để thống nhất các business rule để kiểm tra chất lượng dữ liệu.

  • Nếu ta sử dụng cơ sở dữ liệu quan hệ (SQL Server, MySQL, Oracle, …) thì ta nên tận dụng các tính năng có sẵn của chúng (vd: các constraint) để đảm bảo chất lượng của dữ liệu được đưa vào bảng.

  • Idempotency: Nếu pipeline của ta idempotent thì khi có lỗi xảy ra và ta đã can thiệp thủ công để sửa lỗi xong rồi, ta chỉ cần chạy lại pipeline cho batch dữ liệu mà bị lỗi. Nếu pipeline không idempotent thì khi chạy lại sẽ không chạy được hoặc làm sai lệch dữ liệu, hoặc ta phải viết thêm script để xử lý dữ liệu trước rồi mới chạy lại pipeline được (ví dụ phải viết script xoá dữ liệu đã bị insert vào bảng đích từ lần chạy lỗi đi rồi mới chạy lại pipeline).

  • Sử dụng Write-Audit-Publish (WAP) pattern: Pattern này về cơ bản là ta sẽ không ghi dữ liệu trực tiếp vào bảng đích mà sẽ ghi dữ liệu vào 1 bảng staging table trước (Write). Bảng này có schema giống hệt bảng đích và người dùng cuối sẽ không có quyền truy cập bảng này. Sau đó, ta chạy code kiểm tra dữ liệu ở bảng staging này (Audit), nếu dữ liệu đã đảm bảo thì lúc đó ta chuyển đưa dữ liệu từ bảng staging sang bảng production (Publish). Đối với các ETL pipeline truyền thống thì pattern này không có gì mới, chỉ đơn giản là ta lưu dữ liệu tại một bảng staging (thường là ở một database hoặc schema khác so với bảng production), thực hiện các biến đổi dữ liệu và kiểm tra chất lượng dữ liệu tại đó, khi đã đảm bảo dữ liệu tốt thì ta load vào bảng đích (INSERT, MERGE, …). Hoặc ta cũng có thể dùng partition exchange (nếu database có tính năng này) để thay đổi metadata và biến bảng staging thành một partition của bảng đích, như vậy dữ liệu thực tế không bị di chuyển hay bị copy thêm ra, mà chỉ có metadata thay đổi nên việc chuyển dữ liệu vào bảng đích diễn ra rất nhanh. Trên các nền tảng data lake trên cloud thì việc lưu dữ liệu là dưới dạng file, nhưng cách làm thì cũng tương tự, vẫn theo nguyên tắc là lưu dữ liệu staging (ở đây là lưu dưới dạng các file), rồi kiểm tra chất lượng dữ liệu staging, kiểm tra xong thì load dữ liệu vào bảng đích (bằng cách copy và tạo các file mới) hoặc thay đổi metadata để hệ thống nhận diện các file staging là thuộc về dữ liệu của bảng đích thay vì bảng staging, từ đó tránh được việc di chuyển, copy file (tương tự như tính năng partition exchange được nói đến ở bên trên). Nếu dữ liệu staging không qua được các bài kiểm tra chất lượng thì sẽ không được chuyển vào bảng đích. Lúc này dữ liệu bảng đích không bị ảnh hưởng, và data engineer vào bảng staging để kiểm tra các lỗi dữ liệu. Tựu chung lại thì pattern WAP này giúp ta đảm bảo các downstream consumer không bao giờ truy cập được dữ liệu mà chưa được qua kiểm tra chất lượng.

    Góc giải lao:

    Xin chia sẻ thêm với bạn đọc một bài hát về WAP pattern: https://getdbt.wistia.com/medias/a46jumcltv

    Lyrics: https://github.com/voiceoftheanalyst/analyst_jams/blob/main/WAP_write_audit_publish

  • Sử dụng Dead letter queue / Quarantine pattern: Pattern này về cơ bản là ta sẽ kiểm tra chất lượng dữ liệu và nếu phát hiện các bản ghi bị lỗi thì thay vì dừng pipeline, ta lưu các bản ghi lỗi này vào một nơi nào đó, và để pipeline tiếp tục chạy, tiếp tục xử lý và luân chuyển các bản ghi không bị lỗi sang các bước tiếp theo. Cách làm này giúp ta kiểm tra và điều tra lỗi dữ liệu dễ dàng (vì ta chỉ cần kiểm tra các bản ghi bị lỗi đã được lưu riêng vào một chỗ, và ghi lưu các bản ghi bị lỗi ta cũng có thể lưu luôn chi tiết của lỗi để giúp việc kiểm tra càng thêm thuận tiện) và giúp pipeline vẫn hoạt động liên tục mà không cần can thiệp thủ công. Vì vậy, cách làm này khá hữu ích cho các streaming pipeline, và vẫn áp dụng hoàn toàn bình thường được với pipeline theo kiểu batch load. Tuy nhiên, cách làm này sẽ khiến dữ liệu cung cấp cho các ứng dụng đầu cuối không được đầy đủ, và ta cần trao đổi với các bên liên quan để quyết định xem điều này có chấp nhận được hay không. Tuỳ vào bản chất của dữ liệu và nhu cầu sử dụng dữ liệu mà ta có thể sẽ cần tạo một “replay pipeline” để đưa các bản ghi lỗi này trở lại data pipeline chính và đi vào các bảng dữ liệu, tuy nhiên vì batch chứa các bản ghi lỗi này đã được xử lý xong mất rồi, ta có thể phải backfill dữ liệu ở các ứng dụng đầu cuối.

  • Có các bước deduplicate dữ liệu: Việc dữ liệu bị trùng lặp rất thường xuyên xảy ra, có thể do phát sinh tại nguồn, hoặc do logic xử lý, biến đổi dữ liệu của ta. Để loại bỏ trùng lặp (deduplicate) dữ liệu thì về cơ bản ta cần xác định được trường hoặc bộ các trường giúp định danh duy nhất cho từng bản ghi (uniquely identify each record), tiêu chí deduplicate (lấy bản ghi nào trong những cái bị trùng nhau?), và phạm vi deduplicate (trong phạm vi batch dữ liệu xử lý hay trong vài batch gần đây hay trong tất cả các bản ghi hiện có, …). Với batch ingestion thì thường phạm vi sẽ là deduplicate dữ liệu trong batch đó, còn với streaming ingestion thì ta cần xác định một khoảng thời gian cho việc kiểm tra và deduplicate dữ liệu. Nếu khoảng thời gian quá ngắn thì ta có thể bỏ sót một số trường hợp trùng lặp dữ liệu, nếu khoảng thời gian quá dài thì ta phải lưu trữ và xử lý một lượng dữ liệu lớn, làm tốn tài nguyên và thời gian.

  • Phát hiện các bản ghi đến muộn (late-arriving records/late data): Thường thì sau khi chạy pipeline một thời gian, ta kiểm tra dữ liệu của các bảng trung gian và bảng đích thì mới biết được là có hiện tượng bản ghi đến muộn. Trước khi bàn đến cách xử lý thì ta cần bàn về các cách để pipeline mỗi khi chạy load được các bản ghi này về và cách để xác định bản ghi nào là bản ghi đến muộn trong số các bản ghi được load về:

    • Điểm mấu chốt là ta cần phân biệt được giữa “arrival time” và “event time”. Arrival time là thời gian mà dữ liệu xuất hiện/được ghi vào ở hệ thống nguồn (nếu nguồn là bảng thì đây chính là các cột thể hiện thời gian mỗi bản ghi được insert vào hoặc update ở trong bảng, thường là các cột như insert_timestamp, last_update_timestamp, …, nếu nguồn là file thì nó là thời gian mà file xuất hiện ở nơi lưu trữ file (vd: trường file_arrival_datetime ở metadata của file). Event time là thời gian mà một sự kiện kinh doanh/vận hành của doanh nghiệp xảy ra, ví dụ như thời gian mà giao dịch mua hàng diễn ra, thời gian bắt đầu hiệu lực của hợp đồng, … Khi incremental load dữ liệu từ nguồn sử dụng delta column, ta nên dùng cột “arrival time” để làm delta column, thì sẽ đảm bảo ta luôn lấy được tất cả các bản ghi mới nhất, bao gồm cả các bản ghi đến muộn. Nếu ta sử dụng cột “event time” làm delta column thì khi các bản ghi đến muộn phát sinh, chúng sẽ có “event time” trong quá khứ và vì vậy pipeline incremental load của ta sẽ bỏ qua các bản ghi này (vì ta chỉ load các bản ghi có delta column lớn hơn giá trị delta column của lần chạy pipeline gần nhất). Nếu ta incremental load bằng cách đọc dữ liệu CDC (Change Data Capture) của nguồn hoặc sử dụng các file dữ liệu (vd: file json, incremental load dựa trên file_arrival_datetime) do nguồn trả ra thì ta sẽ luôn load được các bản ghi đến muộn.

      Khi đã load được dữ liệu rồi thì ta cần có cách xác định bản ghi nào trong batch dữ liệu thuộc loại bản ghi đến muộn. Ta thường chọn một trường thông tin chỉ thời gian (chính là trường “event time”) làm căn cứ xác định. Bản ghi nào có event time < MAX(event time) của lần chạy pipeline gần nhất thì là bản ghi đến muộn.

      Ta cũng lưu ý một khái niệm nữa đó là “processing time”. Đây là thời gian mà bản ghi được data pipeline của ta xử lý (thời gian này chắc chắn sẽ sau “event time”). Ở các bảng staging và bảng đích của ta đều nên có các cột thể hiện thông tin processing time này, để ta theo dõi được thời gian mà mỗi dòng dữ liệu được ghi mới, cập nhật vào bảng. Ta thường đặt tên các cột này là insert_timestamp, last_update_timestamp, etl_datetime, … Đó là với data pipeline của ta, còn nếu nói đến hệ thống nguồn thì “arrival time” mà ta nói đến bên trên cũng có thể coi là “processing time” của hệ thống nguồn.

    • Nếu data pipeline của ta dùng chiến lược full load thì khi các bản ghi đến muộn xuất hiện chúng sẽ tự động được load về (vì ta luôn load tất cả dữ liệu tại nguồn mỗi khi pipeline chạy). Nếu trong lần chạy pipeline này mà các bản ghi đó chưa xuất hiện thì ta chỉ cần chờ đến những lần chạy tiếp theo kiểu gì chúng cũng sẽ được load về. Thường với những bảng dùng full load thì ta cũng không có nhu cầu quan tâm tới việc xác định bản ghi nào là bản ghi đến muộn.

    • Streaming: Trong các streaming pipeline, thường ta có thực hiện hành động tổng hợp dữ liệu (aggregation, ví dụ như tính tổng, tính trung bình, tính max, tính min, …) theo từng khoảng thời gian (window of time). Ví dụ: đếm số lượng giao dịch phát sinh trong các khoảng thời gian 15 phút (ví dụ: 1h - 1h15, 1h15 - 1h30, 1h30 - 1h45, …). Khi phát hiện các bản ghi đến muộn của khoảng thời gian nào thì hệ thống sẽ cập nhật giá trị tính toán (aggregate value) của khoảng thời gian đó. Tuy nhiên, ta không thể để hệ thống chờ các bản ghi đến muộn mãi được vì làm sao hệ thống biết được là đến bao giờ thì sẽ không còn bản ghi đến muộn nữa để mà chốt danh sách các bản ghi và thực hiện tính toán và trả ra kết quả. Ngoài ra, hệ thống phải lưu trữ trạng thái/kết quả tính toán aggregation của các khoảng thời gian (window) đã diễn ra, thường là lưu trữ trong memory, và nếu ta cứ để hệ thống phải ghi nhớ kết quả tính toán của tất cả mọi khoảng thời gian đã xảy ra như vậy thì hệ thống sẽ phải lưu trữ quá nhiều, sẽ đến lúc bị tràn bộ nhớ. Vì vậy, trong streaming người ta có một khái niệm và tính năng gọi là watermark, tức là ta thiết lập một khoảng thời gian chờ tối đa (delay threshold/allowed lateness/…), thường hệ thống sẽ lấy MAX(event time) của các bản ghi/event đã nhận được rồi trừ đi thời gian chờ này để tính toán ra một mốc thời gian gọi là watermark. Mốc thời gian watermark này liên tục được tính toán lại khi có thêm các bản ghi/event mới. Các bản ghi có event time từ mốc watermark đến hiện tại được hệ thống tính là không muộn và chắc chắn sẽ được sử dụng khi thực hiện aggregation. Các bản ghi có event time xảy ra trước mốc watermark sẽ được hệ thống tính là bản ghi đến muộn.

  • Xử lý các bản ghi đến muộn:

    • Bỏ qua bản ghi đến muộn: Đây là cách đơn giản nhất, tuy nhiên chỉ dùng được trong trường hợp số lượng bản ghi đến muộn là rất nhỏ, không đáng kể, việc xảy ra bản ghi đến muộn cũng rất hãn hữu, và việc bỏ các bản ghi này đi không làm ảnh hưởng đáng kể đến mục đích sử dụng dữ liệu của các downstream consumer (thường là mục đích phân tích, thống kê, thì thiếu một vài bản ghi trong số hàng triệu bản ghi sẽ không làm thay đổi đáng kể các chỉ số thống kê).

    • Sử dụng “static lookback window”: Ta xác định một khoảng thời gian cố định, gọi là static lookback window và mỗi khi pipeline chạy, ta sẽ process dữ liệu mới của lần chạy đó và process lại dữ liệu của tất cả các lần chạy trước trong khoảng thời gian lookback window đó. Ví dụ: pipeline của ta chạy 1 lần/ngày, hôm nay là ngày 20/10, static lookback window của ta là 7 ngày, thì pipeline hôm nay chạy sẽ process dữ liệu ngày 20/10 và process lại dữ liệu từ ngày 13/10 đến ngày 19/10. Khoảng thời gian lookback window này là bao lâu thì tuỳ thuộc vào các yếu tố như yêu cầu của đơn vị sử dụng dữ liệu, tình trạng dữ liệu đến muộn mà ta theo dõi được, … Ta lưu ý là cách làm này sẽ có thể gây lãng phí tài nguyên vì lần chạy nào ta cũng process lại dữ liệu trong khoảng thời gian lookback window, và có thể là có những lần chạy mà không có bản ghi đến muộn nào trong khoảng thời gian đó cả. Ngoài ra, khi ta process lại dữ liệu thì ta cần lưu ý là các downstream consumer mà sử dụng dữ liệu của ta cung cấp rất dễ là cũng sẽ phải process lại dữ liệu của họ (gọi là reprocessing hoặc backfilling data), và ta backfill dữ liệu càng nhiều thì downstream consumer của ta cũng phải backfill lại càng nhiều, và nếu các consumer này cũng có các downstream consumer khác tiêu thụ dữ liệu của họ thì nó sẽ thành ra một hiệu ứng domino và khiến toàn bộ hệ thống phải backfill rất nhiều.

    • Sử dụng “dynamic lookback window”: Ta xác định vùng dữ liệu nào có xuất hiện bản ghi đến muộn và chỉ process lại các vùng dữ liệu đó. Vd: Chỉ reprocess lại ngày 11, 15 và 16 của tháng này do các bản ghi đến muộn có event date rơi vào những ngày đó. Tuy nhiên, ta lưu ý là với các stateful pipeline (tức là kết quả của lần chạy này phụ thuộc vào kết quả của lần chạy trước) thì ta sẽ phải process lại tất cả các ngày kể từ ngày MIN(event date) của các bản ghi đến muộn (trong ví dụ này là ngày 11) chứ không phải là chỉ process lại những ngày có bản ghi đến muộn. VD: ta có pipeline load dữ liệu hành vi người dùng trên website của ta, tối muộn ngày 11 người dùng mở trang chủ website và sau đó ta thấy không làm gì nữa cả, và sáng hôm sau thấy lại có hoạt động trên website, xem một số sản phẩm và mua hàng. Logic trong pipeline của ta xác định người dùng này đã thực hiện 2 session, một session tối ngày 11 được tính là inactive (vì không có hoạt động gì ngoài truy cập vào trang chủ) và một session active vào rạng sáng ngày 12. Tuy nhiên, sau đó ta nhận được một số bản ghi đến muộn, thể hiện người dùng có các hoạt động xem sản phẩm và bỏ sản phẩm vào giỏ hàng trong tối ngày 11. Vậy có nghĩa là thực chất người dùng chỉ thực hiện 1 session trên website từ tối ngày 11 đến rạng sáng ngày 12, và ta cần process lại dữ liệu cả 2 ngày này để thể hiện điều đó, nếu chỉ process lại dữ liệu ngày 11 thì kết quả vẫn sẽ trả ra là 2 session, như vậy là không chính xác. Vậy ta có thể thấy vấn đề đối với dynamic lookback window đó là nó không có ngưỡng chặn nào về thời gian cho lookback window, và nếu có một bản ghi đến muộn với event date từ rất xa trong quá khứ thì pipeline của ta có thể sẽ phải process lại một lượng dữ liệu rất lớn.

Tất nhiên, vẫn còn có những biện pháp khác nữa để phát hiện và xử lý các vấn đề về chất lượng dữ liệu mà trong khuôn khổ một bài học này tôi cho rằng nếu đưa vào sẽ là quá nhiều, gây ngợp cho bạn đọc, và chắc chắn cũng có những biện pháp mà tôi chưa được biết đến. Hi vọng bạn đọc luôn giữ một tinh thần cởi mở, ham tìm tòi, khám phá và hãy nhắn cho tôi biết những cách làm mà bạn và đơn vị của bạn đang sử dụng nhé!

8.2.1.4 Transformation logic errors

Lỗi trong data pipeline cũng có thể đến từ các sai sót khi ta tạo các logic biến đổi dữ liệu. Điều này xảy ra khá thường xuyên, và khi các logic biến đổi càng nhiều và càng phức tạp thì tỉ lệ xảy ra sai sót càng lớn, ngay kể cả với những data engineer đã nhiều kinh nghiệm.

Cách phòng tránh và hướng xử lý lỗi:

  • Tạo các chốt kiểm tra dữ liệu (data quality checks hay gọi là data validation checks) trong pipeline tương tự như đã đề cập ở phần data quality issues bên trên. Các logic kiểm tra dữ liệu này sẽ giúp ta phát hiện được các lỗi dữ liệu phát sinh sau khi thực hiện biến đổi.

  • Thực hiện “unit test”: Đọc về unit test ở mục Testing dưới đây

  • Dưới đây là ví dụ về một số lỗi phổ biến khi thực hiện biến đổi dữ liệu mà ta cần lưu ý:

    • Xử lý một số loại datatype không chính xác. Vd: biến đổi date, datetime với các timezone khác nhau không chính xác, các định dạng date không chính xác (MM/DD/YYYY, DD/MM/YYYY, …), …, các phép tính toán bị tự động làm tròn, không xử lý tình huống chia cho số 0, …, độ dài string vượt quá độ dài tối đa cho phép của cột dẫn tới string tự động bị cắt bớt (string truncating), encoding chưa phù hợp gây phát sinh các ký tự lạ trong string, …

    • Row explosion: Khi ta join 2 bảng với nhau nhưng một bảng có tính trạng trùng lặp dữ liệu (duplicated data) thì dễ dẫn đến tình trạng số dòng dữ liệu sau khi join bị nhân lên nhiều lần

    • Xử lý các giá trị NULL không chính xác. VD: lệnh SELECT * FROM orders WHERE status != 'Cancelled' sẽ bỏ qua các bản ghi có status NULL, nếu muốn lấy cả các bản ghi này thì một trong các cách sửa là: SELECT * FROM orders WHERE status != 'Cancelled' OR status IS NULL

8.3 Testing

Kiểm thử (testing) là vô cùng quan trọng đối với data engineering để hạn chế tối đa sai sót, gây ảnh hưởng tới các ứng dụng, báo cáo mà sử đụng dữ liệu do ta cung cấp. Có một số loại kiểm thử thường được sử dụng, bao gồm unit test, integration test, end-to-end test, user acceptance test, và regression test.

Các data engineer sẽ quan tâm nhiều nhất tới 3 loại test đầu tiên với thứ tự thực hiện các bài test này thường là: Unit test -> Integration test -> End-to-end test.

Regression test được thực hiện khi ta muốn kiểm tra xem các pipeline cũ (hiện có) có bị ảnh hưởng gì bởi các thay đổi hay tính năng mới mà ta đang phát triển hay không.

User acceptance test thường được thực hiện bởi business stakeholder.

Dưới đây là sơ lược về từng loại test này.

8.3.1 Unit test

Unit testing tức là kiểm thử (test) một đơn vị code (unit of code), và thường nó là đơn vị code nhỏ nhất và đơn vị code này thực hiện một chức năng nào đó. Trong tiếng Anh ta gọi là testing the smallest functional unit of code. Thường đơn vị code này là một function hoặc method. Hiểu một cách đơn giản thì khi viết các logic xử lý dữ liệu trong data pipeline, ta nên cố gắng tư duy theo hướng mô đun hoá (modular thinking), đưa các logic xử lý dữ liệu vào các function và method, và để đảm bảo các function, method này hoạt động chính xác như ta mong muốn thì ta viết các unit test (các đoạn code) để kiểm thử chúng.

Khi thực hiện unit test, các bước cơ bản sẽ là:

  • Viết code sao cho ta có thể unit test được một cách dễ dàng. VD: Chia logic xử lý dữ liệu thành các function, script, tuân thủ nguyên tắc single responsibility, không hard code dependencies (vd: database connection, API client, …) trong function mà thay vào đó pass các dependencies này dưới dạng argument vào function (vì như vậy ta sẽ dễ dàng mock các dependencies này).

  • Tạo mock input data và expected output data: Mock data tức là tạo dữ liệu giả (không phải là dữ liệu lấy từ hệ thống production của ta) chỉ phục vụ cho việc kiểm thử. Ta tạo một dataset dữ liệu đầu vào và một dataset dữ liệu đầu ra (chính là kết quả cần đạt được sau khi biến đổi dataset dữ liệu đầu vào)

  • Ta gọi function, method biến đổi dữ liệu mà ta cần test để nó biến đổi dataset đầu vào. Sau đó ta kiểm tra (assert) kết quả trả ra có giống hệt với expected output dataset hay không. Nếu giống thì tức là function, method đó đã pass unit test.

8.3.2 Integration test

Đây là việc kiểm tra xem các cấu phần (module) của một hệ thống phần mềm khi hoạt động cùng với nhau có phát sinh lỗi gì hay không. Về cơ bản là kiểm tra xem các cấu phần kết nối với nhau có lỗi gì không. Nếu như với unit test thì ta thường phải mock các dataset, thì trong integration test thường ta sẽ tạo các database hoặc các service tạm thời (có thể dùng Docker) để xem khi chạy code kết nối, tương tác với các database, service đó thì có lỗi gì không, và sau khi chạy test xong thì chúng sẽ được huỷ bỏ đi.

Integration test rất phổ biến trong phát triển phần mềm (software engineering), tuy nhiên trong data engineering, với kinh nghiệm và hiểu biết còn hạn chế của tôi, tôi chưa thấy loại hình kiểm thử này được sử dụng rộng rãi.

8.3.3 End-to-end test

Đây là việc chạy toàn bộ pipeline từ đầu đến cuối trên dữ liệu thật. Đây là bước test cuối cùng để đảm bảo pipeline chạy chính xác và không phát sinh lỗi gì. Thường ta sẽ phát triển (viết code) ở môi trường dev, và test end-to-end thì được thực hiện ở môi trường staging. Bước end-to-end test này cũng là nơi ta phát hiện ra các vấn đề liên quan tới orchestration (vì ta dùng orchestration tool để chạy pipeline) ví dụ như truyền sai giá trị vào biến của job ETL, để phát hiện các vấn đề liên quan tới performance (vì ta chạy trên dữ liệu thật, giống hoặc gần giống với dữ liệu trên production), và để thực hiện sanity check (đại khái nghĩa là check dữ liệu xem có thấy nó trông hợp lý không. VD: có bảng nào hay cột nào null hoàn toàn không, số dòng trông có hợp lý không, các giá trị, tính toán có phù hợp với business logic không, …).

Để cho dễ nhớ thì ta có thể nhớ là unit test là để check các logic, integration test là để check các kết nối (connection) và tương tác giữa các cấu phần, và end-to-end test là để chạy thử toàn bộ pipeline từ đầu tới cuối.

8.3.4 User acceptance test (UAT)

Nếu như unit test, integration test và end-to-end test được thực hiện bởi data engineer hoặc bộ phận QA (quality assurance) thì UAT được thực hiện bởi các business stakeholder (những người dùng dữ liệu, thường là data analyst, data scientist, nhân sự của các team khác trong doanh nghiệp như tài chính kế toán, marketing, nhân sự, …).

Thường pipeline sẽ được deploy và chạy trong một môi trường gần giống với production nhất có thể để phục vụ cho UAT (có thể là môi trường staging). Tại đây người dùng dữ liệu sẽ kiểm tra dữ liệu và phát hiện các vấn đề mà các bài test tự động không phát hiện ra được. Thường người test sẽ có các test case (các bài test để đảm bảo dữ liệu đúng với yêu cầu sử dụng của họ) và thực hiện các test case này. Nếu các test case pass thì sẽ có tài liệu ghi lại kết quả test và xác nhận của người test. Sau đó, pipeline đã sẵn sàng để được deploy lên production.

8.3.5 Regression test

Regression test được thực hiện khi ta muốn kiểm tra xem các pipeline cũ (hiện có) có bị ảnh hưởng gì bởi các thay đổi hay tính năng mới mà ta đang phát triển hay không. VD: Ta đang muốn đổi tên một cột ở bảng này thì có làm ảnh hưởng đến pipeline nào đang sử dụng bảng đó hay không; ta cần thay đổi INNER JOIN thành LEFT JOIN trong một pipeline thì có ảnh hưởng gì và có phải sửa gì ở các pipeline khác không, về performance có ảnh hưởng gì và cần điều chính gì không.

8.4 Idempotency

Một idempotent data pipeline là pipeline mà khi ta cho pipeline chạy nhiều hơn 1 lần thì kết quả trả ra cũng vẫn như khi chạy chỉ 1 lần. Điều này là rất quan trọng vì trong nhiều trường hợp ta cần chạy lại data pipeline (ví dụ như đã nói đến trong phần transient errors phía trên) và nếu pipeline không idempotent thì khi chạy lại sẽ làm sai lệch hoặc trùng lặp dữ liệu.

Khi xây pipeline sử dụng chiến lược ingestion là full load overwrite như nêu ở phần Overwrite bên trên thì pipeline của ta luôn idempotent.

Khi xây pipeline sử dụng chiến lược ingestion là full load append như nêu ở phần Append/Snapshotting hoặc incremental load như nêu ở phần Incremental load bên trên thì ta có thể dùng các pattern về ghi dữ liệu sau để đảm bảo pipeline được idempotent:

  • Delete-write: Xoá (delete) một vùng dữ liệu ở bảng đích, sau đó ghi (insert/write) dữ liệu mới vào. Điều này đảm bảo là kể cả ta có chạy lại pipeline thì cũng không bị trùng lặp dữ liệu.

    VD:

    BEGIN TRANSACTION;
      -- 1. Delete existing data (if any)
      DELETE FROM sales_table WHERE sale_date = '2025-01-27';
    
      -- 2. Insert new data. 
      -- Staging table only has data loaded for this batch, in this case '2025-01-27'
      INSERT INTO sales_table SELECT * FROM staging_sales;
    COMMIT;
  • Upsert (insert + update): Tuỳ vào cơ sở dữ liệu hoặc hệ thống xử lý dữ liệu ta sử dụng mà sẽ có các lệnh upsert hoặc merge. Các lệnh này hoạt động như sau. Ta có một tập dữ liệu (một bảng hoặc dataframe, ta tạm gọi là bảng nguồn) và ta muốn upsert/merge vào bảng đích. Hệ thống sẽ kiểm tra xem những bản ghi nào ở bảng nguồn mà chưa tồn tại ở bảng đích (dựa vào điều kiện join giữa 2 bảng) thì sẽ insert vào bảng đích, những bản ghi nào tồn tại ở cả 2 bảng thì lấy dữ liệu ở bảng nguồn update vào bảng đích.

Nếu đích đến của việc ghi dữ liệu là các bảng trong database thì ta nên đưa code xử lý dữ liệu và ghi dữ liệu vào trong một transaction (như ví dụ ở phần Delete-write bên trên và như đã trình bày trong nội dung của bài học số 4 của khoá học này) giúp đảm bảo ETL pipeline không lưu dữ liệu không hoàn chỉnh vào database, nên đảm bảo tính idempotency.

Khi viết các câu lệnh DDL (data definition language) ta cũng cần viết chúng sao cho đảm bảo idempotency. Cách làm về cơ bản là check sự tồn tại của object trong database trước khi thực hiện tạo, xoá hay chỉnh sửa. VD:

CREATE TABLE IF NOT EXISTS table_name (...);

CREATE OR REPLACE TABLE table_name (...);

DROP TABLE IF EXISTS table_name;

IF OBJECT_ID('MyProcedure','U') IS NULL
    EXEC('CREATE PROCEDURE MyProcedure AS SET NOCOUNT ON;')
GO
ALTER PROCEDURE MyProcedure
...
;

9 Data orchestration

Đây là việc quản lý và điều phối các luồng dữ liệu đi qua các hệ thống, các dịch vụ, ứng dụng khác nhau. Các công cụ về data orchestration giúp data engineer tổ chức và tối ưu hoá việc chạy tự động các data pipeline, data job.

9.1 Tools

Một trong những nền tảng, công cụ orchestration phổ biến nhất hiện nay là Apache Airflow. Đây là một công nghệ được open source và các bug được cộng đồng fix khá nhanh. Cộng đồng sử dụng Airflow lớn và hoạt động sôi nổi, nên ta có thể dễ dàng tìm kiếm câu trả lời cho các lỗi hay vấn đề gặp phải khi sử dụng. Với các bạn fresher hoặc junior data engineer mà chưa từng sử dụng một công cụ data orchestration nào thì rất nên học sử dụng Airflow vì nó mạnh mẽ, phổ biến, miễn phí, và cũng giúp bạn phần nào đó thực hành viết code Python.

Trên thị trường cũng có những công cụ data orchestration khác của các hãng lớn, và tuỳ vào doanh nghiệp mà bạn làm việc đang sử dụng công cụ gì mà bạn sẽ phải học công cụ đó. Tuy nhiên các công cụ cũng đều có một số những tính năng, đặc điểm quan trọng mà bạn cần nắm được, được nêu ở các phần tiếp theo đây.

9.2 Dependency management

Đây là khía cạnh quan trọng nhất của data orchestration. Về cơ bản thì bất kì một công cụ data orchestration tốt nào cũng cần phải giúp ta thiết lập được là trong một job thì task nào chạy trước task nào chạy sau, task nào phải chờ task nào xong rồi mới được chạy, các task nào được chạy cùng một lúc, task nào chạy khi có một event nhất định xảy ra (ví dụ khi có file mới xuất hiện trong thư mục lưu trữ), … và thậm chí là các job nào phụ thuộc vào job nào, v.v. Đây chính là khía cạnh dependency management. Có những công cụ tự động hoá (automation tool) chỉ có thể giúp ta đặt lịch chạy tự động cho một task riêng lẻ chứ không giúp ta thiết lập trình tự chạy của các task, các job, và sự phụ thuộc lẫn nhau giữa chúng. Một orchestration tool sẽ giúp ta làm được điều đó. Thường các orchestration tool làm điều này thông qua DAG (directed acyclic graph). Dưới đây là hình ảnh ví dụ về một DAG trong Airflow (lấy từ tài liệu hướng dẫn của Airflow).

9.3 Data lineage

Data lineage cho ta thấy chi tiết đường đi của dữ liệu (qua các task, job khác nhau, các bảng, nơi lưu trữ file khác nhau, xuất phát từ đâu, được biến đổi ở đâu, kết thúc được lưu trữ tại đâu, v.v.). Các công cụ orchestration tiên tiến thường cung cấp thông tin data lineage cho ta cả dưới dạng text lẫn đồ thị (visualization). Data lineage rất hữu ích đối với các data engineer vì nó giúp ta xác định được dependencies giữa các task, các data asset (bảng, cột, file, …), xác định được luồn đi của dữ liệu, từ đó mà kiểm tra và xác định nguồn gốc phát sinh lỗi dễ hơn, rà soát tác động của một thay đổi mới cũng dễ hơn, và khi cần cung cấp thông tin về nguồn gốc và cách xử lý, lưu trữ dữ liệu để phục vụ các yêu cầu về quản trị hoặc yêu cầu tuân thủ (của các cơ quan nhà nước chẳng hạn) thì cũng dễ dàng hơn.

Dưới đây là ví dụ về data lineage trong Databricks. Bạn có thể xem thử một demo rất ngắn gọn và trực quan của bên Databricks về data lineage tại đây: https://app.getreprise.com/launch/MnqjQDX/

9.4 Workflow automation

Các công cụ data orchestration đều có tính năng giúp ta tự động hoá việc chạy các task, job (một job gồm một hoặc nhiều task).

9.4.1 Schedule

Đây là cách chạy tự động phổ biến cho batch ingestion. Công cụ orchestration sẽ cho phép ta đặt lịch chạy job ETL dữ liệu vào một khung giờ nhất định và với một tần suất nhất định (ví dụ chạy 4 lần một ngày, bắt đầu từ 0h sáng, và cứ cách 6 tiếng chạy một lần).

9.4.2 Trigger

Về cơ bản thì trigger là cách mà ta cho công cụ orchestration biết là khi nào thì một job được phép chạy. Nếu ta đặt lịch chạy tự động cho job như đề cập ở phần “Schedule” bên trên thì đó cũng là một loại trigger, gọi là schedule-based trigger.

Ngoài ra thì ta còn có event-based trigger, tức là khi có một event nhất định nào đó xảy ra thì sẽ kích hoạt job chạy. Đây là cách kích hoạt job theo kiểu ad-hoc chứ không phải là chạy thường xuyên theo lịch như với schedule-based trigger. Một số loại event-based trigger phổ biến là:

  • Manual trigger: Đây là việc ta kích hoạt một lần chạy của job bằng cách bấm nút chạy trên UI.

  • API trigger: Một số công cụ orchestration cho phép ta gửi lệnh kích hoạt chạy job thông qua API.

  • Trigger from another job: Một số công cụ orchestration cho phép ta tạo một task ở trong job để kịch hoạt một job khác chạy (ví dụ trong các task ở job A thì có kích hoạt job B chạy). Việc kích hoạt này có thể được thực hiện qua việc viết code để gọi API, hoặc là có một loại task đặc biệt được công cụ orchestration cung cấp dùng để kích hoạt job.

  • File arrival: Khi công cụ orchestration phát hiện có file mới hoặc một file thoả mãn điều kiện nào đó xuất hiện ở một nơi lưu trữ mà ta đã chỉ định trước thì công cụ sẽ kích hoạt chạy job.

  • Database change: Khi một bảng mà ta chỉ định trước có xuất hiện thêm dòng mới hoặc được update dữ liệu thì job sẽ được kích hoạt chạy.

  • External service completion: Một số công cụ cho phép ta kích hoạt job chạy sau khi một service nào đó của một công cụ khác chạy xong (vd: Airflow cho phép kích hoạt job chạy dựa trên kết quả chạy job từ một công cụ khác như Azure Data Factory, SageMaker, dbt Cloud, …)

9.5 Integration capabilities

Các công cụ orchestration tốt cần có khả năng kết nối được với các hệ thống sẵn có của doanh nghiệp, và chúng cũng thường đi kèm sẵn với nhiều connector để giúp ta kết nối được với nhiều nguồn dữ liệu khác nhau (các loại database khác nhau, các loại file, v.v.).

9.6 Monitoring & logging

Một trong những ưu điểm rất lớn của các công cụ orchestration là chúng có UI (user interface) và các tính năng để ta nhìn thấy các task, các job chạy một cách trực quan và theo dõi tiến độ và trạng thái chạy của các job (thường là real time) để phát hiện nhanh chóng các task và job bị lỗi.

Việc monitor còn thường được thực hiện với các server dùng để chạy công cụ orchestration và với các server lưu trữ dữ liệu (nơi chứa các file hoặc/và các database mà các job ETL đọc, ghi dữ liệu vào) để kiểm tra và phát hiện kịp thời các vấn đề về performance của các server này.

Các công cụ orchestration cũng thường có log lưu lại các thông tin chi tiết về mỗi lần chạy của task, job như là các thông báo liên quan tới success, failure, thời gian chạy, tài nguyên hệ thống được sử dụng, v.v.

Một số công cụ cho ta tích hợp với các hệ thống khác để gửi thông báo (notification) như email, Microsoft Teams, Slack, … giúp cho việc theo dõi được thuận tiện và kịp thời hơn.

Khi nói đến việc vận hành và theo dõi (monitor) các job ETL, ta thường nghe thấy khái niệm DataOps. Có những nơi thì có một bộ phận DataOps riêng, có những nơi thì data engineer làm luôn công việc này. Các bạn có thể đọc thêm về DataOps tại đây: https://datakitchen.io/what-is-dataops/

9.7 Error handling & alert

Như đã đề cập ở phần Error management bên trên, việc lên kế hoạch trước, phòng tránh và xử lý các lỗi phát sinh khi các pipeline, job ETL chạy là vô cùng quan trọng. Các công cụ orchestration có những tính năng để hỗ trợ ta trong việc này, tiêu biểu là:

  • Chạy lại (rerun) job hoặc chạy lại một vài task trong job. Một số công cụ cho phép ta thiết lập tự động retry/rerun task, job dựa trên một logic nào đó (thường là set số lần retry tối đa, thời gian chờ giữa các lần retry, v.v.)

  • Thiết lập thời gian “execution timeout” cho task hoặc job. Đây là thời gian tối đa mà hệ thống sẽ cho phép task hoặc job được chạy, nếu chạy quá thời gian này thì hệ thống sẽ dừng task/job.

  • Gửi alert/notification khi một số event xảy ra, thường là khi job chạy thành công, job chạy thất bại, job chạy quá lâu (quá một warning threshold đã được thiết lập trước), v.v. hoặc có thể gửi alert dựa trên một logic nào đó (vd: gửi alert khi bảng dữ liệu khách hàng có khách hàng VIP nào được cập nhật thông tin số điện thoại). Alert/notification thường có thể được gửi qua email hoặc gửi vào Microsoft Teams, Slack, hoặc một tool nội bộ nào đó khác của doanh nghiệp.

Lưu ý là ta vẫn cần viết các logic trích xuất dữ liệu, xử lý, lưu trữ dữ liệu đúng cách, tuân theo các thông lệ tốt, pipeline của ta vẫn phải idempotent, chứ các công cụ orchestration thì không thể giúp thay thế được tất cả những điều này, mà chỉ có thể hỗ trợ thêm mà thôi.

9.8 Các chủ điểm khác chưa đề cập tới

Bài học này tập trung vào các chủ điểm mà tôi cho rằng quan trọng và cần thiết giúp cho data engineer phát triển các data pipeline đáng tin cậy. Tuy nhiên, vẫn còn những chủ điểm kiến thức quan trọng khác như “git”, “security & access control”, “CI/CD”, “design for scale”, “Linux command”, v.v. Các chủ điểm này sẽ cần thiết khi ta muốn xây dựng những data pipeline/hệ thống data chuyên nghiệp, chạy được trên môi trường production thực tế. Xin hẹn bạn đọc trong những khoá học hoặc bài viết khác của Data tử tế.

Trong thời gian chờ đợi, bạn đọc hoàn toàn có thể tự tìm hiểu theo những keyword nêu trên, và nên tập trung vào “git” và cách viết code sao cho phù hợp với CI/CD trước, còn chủ điểm “security & access control” có lẽ sẽ phù hợp cho việc “learn on the job” và trong công việc thực tế nếu bạn cần biết sâu thêm thì hẵng học, và vì cũng có thể tại cơ quan của bạn đã có một bộ phận khác đảm nhiệm các việc liên quan tới chủ điểm này rồi. Tương tự với chủ điểm “design for scale” hay các nội dung về tối ưu performance của pipeline, các bạn cũng có thể chưa cần đào sâu ngay (vì có thể các bài toán ở cơ quan của bạn chưa dùng nhiều dữ liệu đến mức cần phải tối ưu nhiều hay phải design để scale lên nhiều lần).