paint-brush
Tạo Data Lakehouse bằng Apache Iceberg và MinIOtừ tác giả@minio
7,790 lượt đọc
7,790 lượt đọc

Tạo Data Lakehouse bằng Apache Iceberg và MinIO

từ tác giả MinIO12m2023/09/14
Read on Terminal Reader

dài quá đọc không nổi

Lời hứa của Data Lakehouses nằm ở khả năng xử lý dữ liệu có cấu trúc và không cấu trúc, tất cả đều nằm trong một giải pháp tập trung sử dụng Apache Iceberg và MinIO.
featured image - Tạo Data Lakehouse bằng Apache Iceberg và MinIO
MinIO HackerNoon profile picture
0-item
1-item
2-item

Trong bài đăng trước, tôi đã giới thiệu về Apache Iceberg và chỉ ra cách nó sử dụng MinIO để lưu trữ. Tôi cũng đã chỉ ra cách thiết lập một máy phát triển. Để thực hiện việc này, tôi đã sử dụng Docker Compose để cài đặt bộ chứa Apache Spark làm công cụ xử lý, danh mục REST và MinIO để lưu trữ. Tôi kết luận bằng một ví dụ rất đơn giản sử dụng Apache Spark để nhập dữ liệu và PyIceberg để truy vấn dữ liệu. Nếu bạn là người mới làm quen với Apache Iceberg hoặc nếu bạn cần thiết lập Apache Iceberg trên máy phát triển của mình thì hãy đọc bài đăng giới thiệu này.


Trong bài đăng này, tôi sẽ tiếp tục phần mà bài đăng trước của tôi đã dừng lại và điều tra một vấn đề phổ biến về dữ liệu lớn – nhu cầu về một giải pháp duy nhất để cung cấp bộ nhớ cho dữ liệu thô, dữ liệu phi cấu trúc và dữ liệu có cấu trúc (dữ liệu đã được quản lý từ dữ liệu thô). dữ liệu). Ngoài ra, giải pháp tương tự sẽ cung cấp một công cụ xử lý cho phép báo cáo hiệu quả đối với dữ liệu được quản lý. Đây là lời hứa của Data Lakehouses - khả năng của Kho dữ liệu đối với dữ liệu có cấu trúc và khả năng của Data Lakes đối với dữ liệu phi cấu trúc - tất cả đều có trong một giải pháp tập trung.


Hãy xem xét kịch bản dữ liệu lớn của chúng tôi chi tiết hơn.

Một vấn đề chung

Sơ đồ dưới đây mô tả một vấn đề phổ biến và một giải pháp giả định. Dữ liệu đang được đưa vào trung tâm dữ liệu từ nhiều vị trí và ở nhiều định dạng. Điều cần thiết là một giải pháp tập trung cho phép chuyển đổi dữ liệu thô sao cho công cụ xử lý có thể hỗ trợ hiệu quả hoạt động kinh doanh thông minh, phân tích dữ liệu và học máy. Đồng thời, giải pháp này cũng phải có khả năng lưu trữ dữ liệu phi cấu trúc (văn bản, hình ảnh, âm thanh và video) để khám phá dữ liệu và học máy. Nó cũng phải giữ lại mọi dữ liệu đã được chuyển đổi ở định dạng ban đầu trong trường hợp cần phát lại quá trình chuyển đổi hoặc cần điều tra vấn đề về tính toàn vẹn dữ liệu.


Dữ liệu có cấu trúc


Một ví dụ cụ thể, hãy tưởng tượng một ngân hàng giám sát toàn cầu đang quản lý quỹ tương hỗ cho khách hàng của mình. Dữ liệu thể hiện sổ sách kế toán và sổ sách đầu tư của từng quỹ cho mỗi khách hàng được truyền liên tục vào Data Lakehouse từ các khu vực địa lý trên khắp thế giới. Từ đó, cần phải tiến hành kiểm tra đường đi an toàn (mọi thứ đã gửi đã được nhận chưa) và cần phải tiến hành kiểm tra chất lượng dữ liệu. Cuối cùng, dữ liệu có thể được phân vùng và tải vào một cửa hàng khác sẽ hỗ trợ báo cáo đầu ngày và cuối ngày.


Ngoài ra, có thể sơ đồ này thể hiện một kịch bản IOT trong đó các trạm thời tiết đang gửi nhiệt độ và các dữ liệu liên quan đến thời tiết khác. Bất kể kịch bản nào, điều cần thiết là cách lưu trữ dữ liệu một cách an toàn ở định dạng ban đầu, sau đó chuyển đổi và xử lý bất kỳ dữ liệu nào cần được lưu trữ theo cách có cấu trúc hơn - tất cả trong một giải pháp tập trung. Đây là lời hứa của Data Lakehouse - giải pháp tốt nhất của Kho dữ liệu và Hồ dữ liệu được kết hợp thành một giải pháp tập trung.


Hãy biến giải pháp giả định được mô tả ở trên thành hiện thực. Điều này được mô tả trong sơ đồ dưới đây.


Tạo một Lakehouse dữ liệu


Có hai thành phần logic trong Data Lakehouse của chúng tôi. Đầu tiên là việc triển khai Apache Iceberg cho dữ liệu có cấu trúc - tương đương với Kho dữ liệu. (Đây là những gì tôi đã xây dựng trong bài viết trước - vì vậy tôi sẽ không đi sâu vào chi tiết ở đây.) Thành phần logic thứ hai là MinIO cho dữ liệu phi cấu trúc - phía Data Lake của Data Lakehouse của chúng tôi. Tất cả dữ liệu vào Lakehouse đều được gửi tới phiên bản logic này của MinIO. Trên thực tế, hai phiên bản logic của MinIO được hiển thị ở trên có thể là cùng một phiên bản của MinIO trong trung tâm dữ liệu của bạn. Nếu cụm bạn đang chạy MinIO có thể xử lý việc nhập tất cả dữ liệu đến và các yêu cầu xử lý của Apache Iceberg thì việc triển khai như vậy sẽ tiết kiệm tiền. Trên thực tế, đây là những gì tôi sẽ làm trong bài viết này. Tôi sẽ sử dụng một nhóm trong phiên bản MinIO của Apache Iceberg để chứa tất cả dữ liệu thô và phi cấu trúc.


Hãy bắt đầu thử nghiệm với dữ liệu bằng cách giới thiệu tập dữ liệu mà tôi sẽ sử dụng cho bài tập này và nhập nó vào MinIO.

Tóm tắt toàn cầu về bộ dữ liệu ngày

Tập dữ liệu mà chúng tôi sẽ thử nghiệm trong bài đăng này là tập dữ liệu công khai được gọi là Tóm tắt bề mặt toàn cầu trong ngày (GSOD), được quản lý bởi Cơ quan Khí quyển và Đại dương Quốc gia (NOAA). NOAA hiện duy trì dữ liệu từ hơn 9000 trạm trên khắp thế giới và bộ dữ liệu GSOD chứa thông tin tóm tắt mỗi ngày từ các trạm này. Bạn có thể tải dữ liệu tại đây . Có một tệp gzip mỗi năm. Nó bắt đầu vào năm 1929 và kết thúc vào năm 2022 (tại thời điểm viết bài này). Để xây dựng Data Lakehouse của chúng tôi, tôi đã tải xuống tệp hàng năm và đặt nó vào phiên bản MinIO đang được sử dụng cho Data Lakehouse của chúng tôi. Tôi đặt tất cả các tệp vào một nhóm có tên `lake.` Hai nhóm trong phiên bản MinIO của chúng tôi được hiển thị bên dưới. Nhóm `kho` được tạo khi chúng tôi cài đặt Apache Iceberg.


Bộ dữ liệu ngày



Tôi đã sử dụng bảng điều khiển MinIO để nhập dữ liệu thô theo cách thủ công. Trong một quy trình chuyên nghiệp, bạn sẽ muốn thực hiện việc này một cách tự động. Hãy xem Cách thiết lập Kafka và truyền dữ liệu tới MinIO trong Kubernetes để biết cách sử dụng Kafka và Kubernetes để lấy dữ liệu vào MinIO.


Các tệp này được đóng gói để thuận tiện cho việc tải xuống – nếu bạn cố gắng sử dụng chúng trực tiếp để tạo báo cáo hoặc biểu đồ thì đó sẽ là một hoạt động tiêu tốn rất nhiều IO (và có thể tiêu tốn nhiều CPU). Hãy tưởng tượng rằng bạn muốn lập biểu đồ nhiệt độ trung bình mỗi năm từ một trạm được chỉ định. Để thực hiện việc này, bạn phải mở mọi tệp và tìm kiếm qua từng hàng, tìm các mục nhập khớp với trạm của bạn vào ngày quan tâm. Tùy chọn tốt hơn là sử dụng khả năng Data Lakehouses của chúng tôi để quản lý dữ liệu và báo cáo về dữ liệu được quản lý. Bước đầu tiên là thiết lập sổ ghi chép Jupyter mới.

Thiết lập sổ ghi chép Jupyter

Đầu tiên, điều hướng đến máy chủ Jupyter Notebook được cài đặt trong công cụ xử lý Apache Spark. Nó có thể được tìm thấy tại http://localhost:8888 . Tạo một sổ ghi chép mới và trong ô đầu tiên, thêm nội dung nhập được hiển thị bên dưới. (Tất cả các sổ ghi chép hoàn chỉnh được tạo trong bài đăng này có thể được tìm thấy ở đây .)


 from collections import namedtuple import csv import json import logging import tarfile from time import time from typing import List from minio import Minio from minio.error import S3Error import pandas as pd import pyarrow as pa import pyarrow.parquet as pq pd.options.mode.chained_assignment = None bucket_name = 'lake'


Lưu ý rằng chúng tôi đang nhập thư viện MinIO. Sổ ghi chép mà chúng tôi đang xây dựng là một đường dẫn ETL từ bộ lưu trữ phi cấu trúc (Hồ dữ liệu MinIO) đến bộ lưu trữ có cấu trúc (Apache Iceberg, sử dụng MinIO cơ bản.) Phần đầu của sổ ghi chép của bạn sẽ trông như thế này.


Nhập thư viện MinIO


Bây giờ, chúng ta có thể tạo cơ sở dữ liệu và bảng Iceberg cho dữ liệu của mình.

Tạo bảng và cơ sở dữ liệu Iceberg

Việc tạo cơ sở dữ liệu và bảng cho tập dữ liệu GSOD rất đơn giản. Đoạn script bên dưới sẽ tạo cơ sở dữ liệu mà chúng ta sẽ đặt tên là `noaa`. Thêm phần này vào một ô sau khi nhập.


 %%sql CREATE DATABASE IF NOT EXISTS noaa;


Đoạn script dưới đây sẽ tạo bảng gsod .

 %%sql CREATE TABLE IF NOT EXISTS noaa.gsod ( station string, date timestamp, latitude double, longitude double, name string, temp double ) USING iceberg PARTITIONED BY (station)


Khi chơi với Apache Iceberg, bạn thường muốn bỏ một bàn xuống để có thể bắt đầu lại thử nghiệm. Đoạn script bên dưới sẽ loại bỏ bảng gsod nếu bạn muốn thay đổi bất cứ điều gì về thiết lập của nó.


 %%sql DROP TABLE IF EXISTS noaa.gsod;

Nhập dữ liệu từ MinIO vào Iceberg

Giờ đây, chúng tôi đã có các tệp zip thô dựa trên năm trong Lakehouse, chúng tôi có thể trích xuất, chuyển đổi và tải chúng vào Data Lakehouse của mình. Trước tiên hãy giới thiệu một số chức năng trợ giúp. Hàm bên dưới sẽ trả về danh sách các đối tượng MinIO trong một nhóm được chỉ định khớp với tiền tố.


 def get_object_list(bucket_name: str, prefix: str) -> List[str]: ''' Gets a list of objects from a bucket. ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_list = [] objects = client.list_objects(bucket_name, prefix=prefix, recursive=True) for obj in objects: object_list.append(obj.object_name) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return object_list


Lưu ý rằng trong đoạn mã trên, cần có tệp thông tin xác thực MinIO. Điều này có thể được lấy từ bảng điều khiển MinIO. Nếu bạn không biết cách lấy thông tin xác thực MinIO thì có một phần trong bài đăng này hướng dẫn cách tạo và tải chúng xuống.


Tiếp theo, chúng ta cần một hàm để lấy một đối tượng từ MinIO. Vì các đối tượng là tệp tar nên chúng tôi cũng cần chức năng này để trích xuất dữ liệu ra khỏi kho lưu trữ tar và chuyển đổi nó thành Pandas DataFrame. Điều này được thực hiện bằng cách sử dụng chức năng dưới đây.


 def tar_to_df(bucket_name: str, object_name: str) -> pd.DataFrame: ''' This function will take a tarfile reference in MinIO and do the following: - unzip the tarfile - turn the data into a single DataFrame object ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Temp file to use for processing the tar files. temp_file_name = 'temp.tar.gz' # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_info = client.fget_object(bucket_name, object_name, temp_file_name) Row = namedtuple('Row', ('station', 'date', 'latitude', 'longitude', 'elevation', 'name', 'temp', 'temp_attributes', 'dewp', 'dewp_attributes', 'slp', 'SLP_attributes', 'stp', 'stp_attributes', 'visib', 'visib_attributes', 'wdsp', 'wdsp_attributes', 'mxspd', 'gust', 'max', 'max_attributes', 'min', 'min_attributes', 'prcp', 'prcp_attributes', 'sndp', 'frshtt')) # Columns of interest and their data types. dtypes={ 'station': 'string', 'date': 'datetime64[ns]', 'latitude': 'float64', 'longitude': 'float64', 'name': 'string', 'temp': 'float64' } tar = tarfile.open(temp_file_name, 'r:gz') all_rows = [] for member in tar.getmembers(): member_handle = tar.extractfile(member) byte_data = member_handle.read() decoded_string = byte_data.decode() lines = decoded_string.splitlines() reader = csv.reader(lines, delimiter=',') # Get all the rows in the member. Skip the header. _ = next(reader) file_rows = [Row(*l) for l in reader] all_rows += file_rows df = pd.DataFrame.from_records(all_rows, columns=Row._fields) df = df[list(dtypes.keys())] for c in df.columns: if dtypes[c] == 'float64': df[c] = pd.to_numeric(df[c], errors='coerce') df = df.astype(dtype=dtypes) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return df


Cả hai chức năng này đều là những tiện ích chung có thể được sử dụng lại bất kể bạn đang làm gì với MinIO. Hãy cân nhắc đưa chúng vào bộ sưu tập đoạn mã cá nhân của bạn hoặc Github Gist của tổ chức bạn.


Bây giờ, chúng tôi đã sẵn sàng gửi dữ liệu đến kho của Lakehouse. Điều này có thể được thực hiện bằng mã bên dưới, mã này bắt đầu phiên Spark, lặp qua tất cả các tệp tar GSOD, trích xuất, chuyển đổi và gửi nó đến bảng Iceberg của chúng tôi.


 from pyspark.sql import SparkSession spark = SparkSession.builder.appName('Jupyter').getOrCreate() objects = get_object_list(bucket_name, 'noaa/gsod') for obj in reversed(objects): print(obj) df = tar_to_df(bucket_name, obj) table = pa.Table.from_pandas(df) pq.write_table(table, 'temp.parquet') df = spark.read.parquet('temp.parquet') df.write.mode('append').saveAsTable('noaa.gsod')


Mã trong phần này được tải dữ liệu theo cách thủ công từ nhóm MinIO. Trong môi trường sản xuất, bạn sẽ muốn triển khai mã này trong một dịch vụ và sử dụng Sự kiện nhóm MinIO để nhập tự động.

Truy vấn Iceberg Data Lakehouse bằng PyIceberg

Hãy bắt đầu một cuốn sổ mới để báo cáo. Ô bên dưới nhập các tiện ích mà chúng ta sẽ cần. Cụ thể, chúng tôi sẽ sử dụng PyIceberg để truy xuất dữ liệu, Pandas để sắp xếp dữ liệu và Seaborn để trực quan hóa dữ liệu.


 from pyiceberg.catalog import load_catalog from pyiceberg.expressions import GreaterThanOrEqual, EqualTo import pandas as pd import seaborn as sns pd.options.mode.chained_assignment = None catalog = load_catalog('default')


Những gì chúng tôi muốn làm là tính nhiệt độ trung bình mỗi năm cho một trạm thời tiết nhất định. Điều này cho chúng ta một con số mỗi năm và tính đến tất cả các mùa trong năm. Bước đầu tiên là truy vấn Iceberg về tất cả dữ liệu của một trạm nhất định. Điều này được thực hiện dưới đây bằng cách sử dụng PyIceberg.


 tbl = catalog.load_table('noaa.gsod') sc = tbl.scan(row_filter="station == '72502014734'") df = sc.to_arrow().to_pandas() df.head(10)


Id trạm được sử dụng trong mã ở trên là dành cho trạm đặt tại Sân bay Quốc tế Newark Liberty, NJ, Hoa Kỳ. Nó đã hoạt động từ năm 1973 (gần 50 năm dữ liệu). Khi mã chạy, bạn sẽ nhận được kết quả đầu ra bên dưới. (Tôi đang sử dụng hàm DataFrame head() để lấy mẫu.)


đầu ra


Tiếp theo, chúng ta cần nhóm theo năm và tính giá trị trung bình. Sử dụng Pandas, đây là một vài dòng mã. Không cần vòng lặp.


 df['year'] = df['date'].dt.year df = df[['year','temp']] grouped_by_year = df.groupby('year') average_by_year = grouped_by_year.mean() average_by_year


Khi ô này chạy, bạn sẽ thấy một giá trị duy nhất cho mỗi năm. Một vài năm hàng đầu được hiển thị dưới đây.


Phân nhóm theo năm


Cuối cùng, chúng ta có thể hình dung mức trung bình hàng năm của mình. Chúng tôi sẽ sử dụng Seaborn để tạo một biểu đồ đường. Điều này chỉ mất một dòng mã.


 sns.lineplot(data=df, x="year", y="temp", errorbar=None)


Biểu đồ dòng được hiển thị dưới đây.


Sơ đồ đường


Dưới đây là một lệnh khác mà bạn phải luôn chạy sau khi chạy báo cáo lần đầu tiên.


 [task.file.file_path for task in sc.plan_files()]


Đây là cách hiểu danh sách sẽ cung cấp cho bạn danh sách tất cả các tệp dữ liệu trong Apache Iceberg có dữ liệu khớp với truy vấn của bạn. Sẽ có rất nhiều, mặc dù siêu dữ liệu của Iceberg có thể lọc ra rất nhiều. Xem tất cả các tệp liên quan sẽ giúp bạn nhận ra rằng việc lưu trữ đối tượng tốc độ cao là một phần quan trọng của Lakehouse.

Bản tóm tắt

Trong bài đăng này, chúng tôi đã xây dựng Data Lakehouse bằng cách sử dụng MinIO và Apache Iceberg. Chúng tôi đã thực hiện việc này bằng cách sử dụng bộ dữ liệu GSOD. Đầu tiên, dữ liệu thô được tải lên Lake side của Data Lakehouse (MinIO) của chúng tôi. Từ đó, chúng tôi đã tạo cơ sở dữ liệu và bảng trong Apache Iceberg (phía Kho dữ liệu của Data Lakehouse của chúng tôi). Sau đó, chúng tôi đã xây dựng một đường dẫn ETL đơn giản để di chuyển dữ liệu từ Hồ đến Kho trong Data Lakehouse.


Sau khi có đầy đủ dữ liệu về Apache Iceberg, chúng tôi có thể tạo báo cáo nhiệt độ trung bình hàng năm và trực quan hóa nó.


Hãy nhớ rằng nếu bạn muốn xây dựng Data Lakehouse trong sản xuất thì bạn sẽ cần các tính năng dành cho doanh nghiệp của MinIO. Hãy xem xét việc xem xét Quản lý vòng đời đối tượng , Thực tiễn bảo mật tốt nhất , Phát trực tuyến KafkaSự kiện nhóm .


Cũng được xuất bản ở đây .