[TOOLS] 5 分鐘閱讀OraCore 編輯部

BigQuery Arrow 向量化 Python UDF 實作

這篇教你在 BigQuery 啟用 Arrow RecordBatch 向量化 Python UDF,完成連線、建立函式、SQL 呼叫與批次驗證。

分享 LinkedIn
BigQuery Arrow 向量化 Python UDF 實作

這篇教你在 BigQuery 啟用 Arrow RecordBatch 向量化 Python UDF,完成連線、建立函式、SQL 呼叫與批次驗證。

這篇給 BigQuery 開發者看,特別是想把 Python UDF 從逐列處理改成批次處理的人。照著做完,你會拿到一個可執行的 Arrow RecordBatch UDF、可直接呼叫的 SQL 查詢,以及一套確認它真的在批次模式運作的方法。

內容會以 BigQuery release notes 與 python-bigquery GitHub repo 的新 UDF 路徑為主,目標是讓你把資料處理成本降到更適合批次工作的方式。

開始之前

訂閱 AI 趨勢週報

每週精選模型發布、工具應用與深度分析,直送信箱。不定期,不騷擾。

不會寄垃圾信,隨時可取消。

  • Google Cloud 專案,且已啟用 BigQuery
  • 專案已開啟 Billing
  • 可使用 BigQuery Studio,或具備執行 SQL jobs 的權限
  • BigQuery Python UDF 的 Cloud resource connection
  • Python 3.10+
  • Apache Arrow 14+
  • Google Cloud CLI 470+

Step 1: 確認向量化 UDF 功能

目的:先確認你對應的是 BigQuery 已公開的 Python UDF 路徑,避免在舊版行為上白做工。

BigQuery Arrow 向量化 Python UDF 實作

打開 BigQuery release notes,找到 Python UDF 一般可用的公告,並確認新路徑是透過 Apache Arrow RecordBatch 來接收批次資料。

驗收:你應該看到 Python UDF GA 的更新說明,並且能打開 BigQuery 查詢編輯器,代表專案與權限都可用。

Step 2: 建立 Cloud resource connection

目的:替 BigQuery Python UDF 準備安全的執行連線,讓函式能在正確區域與權限下運作。

BigQuery Arrow 向量化 Python UDF 實作

在 BigQuery 建立或重用一個 Cloud resource connection,位置要和 dataset 相同。接著把這個 connection 對應的 service account 授予 UDF 執行所需的 IAM 權限。

-- 範例:請用 BigQuery Console 或 gcloud 依你的區域建立 connection

驗收:你應該看到 connection 已出現在 BigQuery 中,且其 service account 已具備預期角色。

Step 3: 撰寫 Arrow RecordBatch UDF

目的:建立真正吃批次輸入的 Python UDF,這是向量化效能的核心。

先寫一個最小版本,讓 Python 函式接收 Apache Arrow RecordBatch,做簡單轉換,再回傳 BigQuery 可接受的結果格式。先不要加第三方套件,先把資料流驗證通。

CREATE OR REPLACE FUNCTION `my_dataset.normalize_text_batch`(input STRING)
RETURNS STRING
LANGUAGE PYTHON
OPTIONS (
  runtime_version = 'python-3.11',
  entry_point = 'normalize_text_batch',
  packages = ['pyarrow']
)
AS r'''
import pyarrow as pa

def normalize_text_batch(batch):
    # Batch-oriented logic goes here
    return batch
''';

驗收:你應該看到函式成功儲存,且 dataset 裡已出現這個 UDF,編輯器也接受 runtime 與 package 設定。

Step 4: 用 SQL 呼叫 UDF

目的:把 UDF 放進真實查詢中,確認 BigQuery 會在查詢執行時呼叫批次 Python 程式

先對小型 sample table 做 SELECT,再擴大到正式資料集。這樣可以先驗證輸出,再觀察批次路徑是否正常。

SELECT
  my_dataset.normalize_text_batch(col) AS normalized_value
FROM my_dataset.sample_table
LIMIT 100;

驗收:你應該看到查詢完成,結果格內出現轉換後的值,且 job details 裡沒有 Python runtime error。

Step 5: 驗證批次行為與效能

目的:確認這個 UDF 不只是能跑,而是真的用批次方式降低每列開銷。

把這次查詢的 job details 跟逐列版 baseline 比較,觀察 Python 呼叫次數、每列開銷與總耗時。若你的邏輯偏 CPU 密集,請用更大的樣本,批次優勢會更明顯。

驗收:你應該看到相同的輸出結果,但在相同資料量下,批次版的執行特徵更好。

指標基準/優化前結果/優化後
Python 執行方式逐列 UDFArrow RecordBatch 向量化 UDF
資料傳輸單位單列批次
每列額外開銷較高較低
適合觀察的資料量小樣本不明顯較大樣本更容易看出差異

常見錯誤

  • connection、dataset、UDF 不在同一區域。修法:把三者放到同一 location,再重新部署。
  • Python runtime 與套件版本不相容。修法:先固定 runtime_version,再挑支援該版本的 pyarrow 與其他套件。
  • 拿很小的查詢測速度。修法:改用較大的表或重複多次的工作負載,才看得到批次優勢。

接下來可以看什麼

下一步可以把這個 UDF 擴充成可處理第三方套件的版本,加入錯誤處理,並和 SQL 原生轉換比較,找出每種工作負載最適合的做法。