BigQuery Data Integration

1. Overview

1.1. Purpose

The data integration module pulls execution data from the ad platforms (Meta, TikTok, Google) into the system in order to:

  • Track spend and KPIs in real time
  • Calculate profit/loss
  • Trigger the auto-budget guard
  • Generate reports

1.2. Data Flow

Ads Platform (Meta/TikTok/Google)
↓ (API/Webhook)
Raw Data Staging
↓ (ETL)
BigQuery Normalized Tables
↓ (Parsing & Mapping)
System Database (Campaign Execution)
↓ (Aggregation)
Dashboards & Reports

2. Data Sources

2.1. Meta Ads (Facebook & Instagram)

API Endpoints

GET /v18.0/{ad_account_id}/insights

Fields

{
  "date_start": "2026-01-15",
  "date_stop": "2026-01-15",
  "campaign_name": "Kewpie-KWP2026-FB01-FB-Lead-Office-P1",
  "impressions": 150000,
  "clicks": 4500,
  "spend": 2100000,
  "actions": [
    {"action_type": "lead", "value": 135}
  ],
  "ctr": 3.0,
  "cpc": 466.67,
  "cpa": 15555.56
}

Data Normalization

normalized = {
    "date": row["date_start"],
    "platform": "FB",
    "campaign_name": row["campaign_name"],
    "impressions": int(row["impressions"]),
    "clicks": int(row["clicks"]),
    "spend": float(row["spend"]),
    "conversions": get_action_value(row["actions"], "lead"),
    "ctr": float(row.get("ctr", 0)),
    "cpc": float(row.get("cpc", 0)),
    "data_source": "Meta"
}
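
The normalization above relies on a `get_action_value` helper to pull a named action out of Meta's `actions` array. The helper name comes from the snippet above; the body below is a minimal sketch, not the actual implementation:

```python
def get_action_value(actions, action_type):
    """Return the numeric value of the first action matching action_type,
    or 0.0 when the action is absent or actions is None."""
    for action in actions or []:
        if action.get("action_type") == action_type:
            return float(action.get("value", 0))
    return 0.0
```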

2.2. TikTok Ads

API Endpoints

GET /open_api/v1.3/report/integrated/get/

Fields Mapping

TikTok Field       System Field
stat_time_day      date
campaign_name      campaign_name
spend              spend
impressions        impressions
clicks             clicks
conversion         conversions
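
Applying the mapping table gives a normalization step analogous to the Meta one. This is a sketch under the assumption that report rows arrive as flat dicts keyed exactly as in the left column:

```python
def normalize_tiktok_row(row):
    """Map a TikTok integrated-report row onto the normalized schema,
    using the field mapping in the table above (assumed row shape)."""
    return {
        "date": row["stat_time_day"][:10],  # "2026-01-15 00:00:00" -> "2026-01-15"
        "platform": "TikTok",
        "campaign_name": row["campaign_name"],
        "impressions": int(row["impressions"]),
        "clicks": int(row["clicks"]),
        "spend": float(row["spend"]),
        "conversions": float(row["conversion"]),
        "data_source": "TikTok",
    }
```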

2.3. Google Ads

API

GoogleAdsService.search()

Query

SELECT
  segments.date,
  campaign.name,
  metrics.impressions,
  metrics.clicks,
  metrics.cost_micros,
  metrics.conversions
FROM campaign
WHERE segments.date >= '2026-01-01'

Normalization

normalized = {
    "date": row.segments.date,
    "platform": "Google",
    "campaign_name": row.campaign.name,
    "spend": row.metrics.cost_micros / 1000000,  # micros -> currency units
    "conversions": row.metrics.conversions,
    ...
}

3. BigQuery Architecture

3.1. Table Structure

Raw Data Tables (Staging)

-- meta_ads_raw
CREATE TABLE ads_data.meta_ads_raw (
  id STRING,
  date DATE,
  ad_account_id STRING,
  campaign_id STRING,
  campaign_name STRING,
  impressions INT64,
  clicks INT64,
  spend FLOAT64,
  actions STRING, -- JSON
  raw_json STRING,
  ingested_at TIMESTAMP
);

-- tiktok_ads_raw
CREATE TABLE ads_data.tiktok_ads_raw (
... similar structure ...
);

-- google_ads_raw
CREATE TABLE ads_data.google_ads_raw (
... similar structure ...
);

Normalized Table

CREATE TABLE ads_data.ads_daily_normalized (
  id STRING NOT NULL,
  date DATE NOT NULL,
  platform STRING NOT NULL, -- FB, TikTok, Google
  campaign_name STRING NOT NULL,

  -- Metrics
  impressions INT64,
  clicks INT64,
  spend FLOAT64,
  conversions FLOAT64,

  -- Calculated
  ctr FLOAT64,
  cpc FLOAT64,
  cpa FLOAT64,

  -- Parsed from the campaign name
  client STRING,
  contract_code STRING,
  scope_code STRING,
  channel STRING,
  objective STRING,
  segment STRING,
  phase STRING,

  -- Metadata
  data_source STRING,
  synced_at TIMESTAMP,

  -- BigQuery primary keys must be declared NOT ENFORCED
  PRIMARY KEY (id) NOT ENFORCED
)
PARTITION BY date
CLUSTER BY contract_code, scope_code, platform;

3.2. ETL Process

Step 1: Extract (API calls)

def extract_meta_ads(ad_account_id, date_range):
    """Pull daily campaign-level insights from the Meta Ads API."""
    fields = [
        'campaign_name',
        'impressions',
        'clicks',
        'spend',
        'actions',
    ]

    params = {
        'time_range': {
            'since': date_range['start'],
            'until': date_range['end'],
        },
        'level': 'campaign',
        'fields': ','.join(fields),
        'time_increment': 1,  # one row per day
    }

    insights = AdAccount(ad_account_id).get_insights(params=params)
    return insights

Step 2: Transform

def transform_to_normalized(raw_data, platform):
    """Transform raw rows into the normalized schema."""
    normalized = []

    for row in raw_data:
        # Parse the campaign name into its naming-convention tokens
        tokens = parse_campaign_name(row['campaign_name'])

        # Extract conversions based on platform
        conversions = extract_conversions(row, platform)

        # Derived metrics, guarded against division by zero
        ctr = (row['clicks'] / row['impressions'] * 100) if row['impressions'] > 0 else 0
        cpc = (row['spend'] / row['clicks']) if row['clicks'] > 0 else 0
        cpa = (row['spend'] / conversions) if conversions > 0 else 0

        normalized.append({
            'id': generate_id(platform, row['campaign_name'], row['date']),
            'date': row['date'],
            'platform': platform,
            'campaign_name': row['campaign_name'],
            'impressions': row['impressions'],
            'clicks': row['clicks'],
            'spend': row['spend'],
            'conversions': conversions,
            'ctr': ctr,
            'cpc': cpc,
            'cpa': cpa,
            'client': tokens.get('client'),
            'contract_code': tokens.get('contract_code'),
            'scope_code': tokens.get('scope_code'),
            'channel': tokens.get('channel'),
            'objective': tokens.get('objective'),
            'segment': tokens.get('segment'),
            'phase': tokens.get('phase'),
            'data_source': platform,
            'synced_at': datetime.now()
        })

    return normalized
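
The transform calls a `generate_id` helper that is not shown. One reasonable sketch makes the id deterministic over (platform, campaign, date), so re-running a sync for the same day overwrites rather than duplicates rows; the hashing scheme here is an assumption:

```python
import hashlib

def generate_id(platform, campaign_name, date):
    """Deterministic row id: identical inputs always produce the same id,
    so repeated syncs upsert instead of creating duplicate rows."""
    key = f"{platform}|{campaign_name}|{date}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()[:32]
```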

Step 3: Load

def load_to_bigquery(normalized_data, table_id):
    """Load normalized rows into BigQuery via the streaming insert API."""
    client = bigquery.Client()

    errors = client.insert_rows_json(table_id, normalized_data)
    if errors:
        raise Exception(f"Failed to load data: {errors}")

    return len(normalized_data)

4. Campaign Name Parsing

4.1. Parsing Function (BigQuery SQL)

CREATE OR REPLACE FUNCTION ads_data.parse_campaign_name(campaign_name STRING)
RETURNS STRUCT<
  client STRING,
  contract_code STRING,
  scope_code STRING,
  channel STRING,
  objective STRING,
  segment STRING,
  phase STRING
>
AS (
  STRUCT(
    SPLIT(campaign_name, '-')[SAFE_OFFSET(0)] AS client,
    SPLIT(campaign_name, '-')[SAFE_OFFSET(1)] AS contract_code,
    SPLIT(campaign_name, '-')[SAFE_OFFSET(2)] AS scope_code,
    SPLIT(campaign_name, '-')[SAFE_OFFSET(3)] AS channel,
    SPLIT(campaign_name, '-')[SAFE_OFFSET(4)] AS objective,
    SPLIT(campaign_name, '-')[SAFE_OFFSET(5)] AS segment,
    SPLIT(campaign_name, '-')[SAFE_OFFSET(6)] AS phase
  )
);

4.2. Usage

SELECT
  campaign_name,
  ads_data.parse_campaign_name(campaign_name).*
FROM ads_data.ads_daily_normalized
WHERE date = '2026-01-15';
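
The ETL transform in section 3.2 also calls a Python `parse_campaign_name`. A sketch that mirrors the SQL function's SAFE_OFFSET behavior (positions past the end map to None rather than raising):

```python
CAMPAIGN_NAME_FIELDS = [
    "client", "contract_code", "scope_code",
    "channel", "objective", "segment", "phase",
]

def parse_campaign_name(campaign_name):
    """Split a convention-formatted campaign name into its named tokens;
    missing positions become None, mirroring SAFE_OFFSET in the SQL UDF."""
    tokens = campaign_name.split("-")
    return {
        field: (tokens[i] if i < len(tokens) else None)
        for i, field in enumerate(CAMPAIGN_NAME_FIELDS)
    }
```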

5. Mapping Engine

5.1. Join with the Campaign Plan

CREATE OR REPLACE VIEW ads_data.campaign_execution_view AS
SELECT
  -- From BigQuery
  n.date,
  n.campaign_name,
  n.platform,
  n.spend,
  n.conversions,
  n.impressions,
  n.clicks,
  n.cpa,

  -- From the Campaign Plan (system DB)
  cp.campaign_plan_id,
  cp.scope_id,
  cp.budget_allocated,
  cp.kpi_target,
  cp.break_even_cost,

  -- Running totals
  SUM(n.spend) OVER (
    PARTITION BY n.campaign_name
    ORDER BY n.date
  ) AS spend_to_date,

  SUM(n.conversions) OVER (
    PARTITION BY n.campaign_name
    ORDER BY n.date
  ) AS kpi_to_date,

  cp.budget_allocated - SUM(n.spend) OVER (
    PARTITION BY n.campaign_name
    ORDER BY n.date
  ) AS budget_remaining,

  SAFE_DIVIDE(
    SUM(n.spend) OVER (PARTITION BY n.campaign_name ORDER BY n.date),
    cp.budget_allocated
  ) * 100 AS budget_usage_pct

FROM ads_data.ads_daily_normalized n
JOIN `system_db.campaign_plan` cp
  ON n.campaign_name = cp.campaign_code;

5.2. Aggregate to Scope Level

CREATE OR REPLACE VIEW ads_data.scope_performance_view AS
SELECT
  s.scope_id,
  s.contract_code,
  s.scope_code,
  s.service_type,

  -- Budget
  s.budget AS budget_allocated,
  SUM(e.spend) AS spend_to_date,
  s.budget - SUM(e.spend) AS budget_remaining,
  SAFE_DIVIDE(SUM(e.spend), s.budget) * 100 AS budget_usage_pct,

  -- KPI
  s.kpi_target,
  SUM(e.conversions) AS kpi_achieved,
  s.kpi_target - SUM(e.conversions) AS kpi_remaining,
  SAFE_DIVIDE(SUM(e.conversions), s.kpi_target) * 100 AS kpi_completion_pct,

  -- Cost
  SAFE_DIVIDE(SUM(e.spend), SUM(e.conversions)) AS actual_cpa,

  -- Profit
  s.revenue AS expected_revenue,
  SUM(e.conversions) * s.unit_price AS actual_revenue,
  SUM(e.spend) AS actual_cost,
  (SUM(e.conversions) * s.unit_price) - SUM(e.spend) AS actual_profit,
  SAFE_DIVIDE(
    (SUM(e.conversions) * s.unit_price) - SUM(e.spend),
    SUM(e.conversions) * s.unit_price
  ) * 100 AS actual_margin

FROM `system_db.scope` s
LEFT JOIN ads_data.campaign_execution_view e
  ON s.scope_id = e.scope_id
-- Group by every non-aggregated column. The positional form
-- GROUP BY 1,2,3,4,5,9,13 is invalid here: position 13 is an
-- aggregate (actual_cpa), and s.unit_price was omitted.
GROUP BY s.scope_id, s.contract_code, s.scope_code, s.service_type,
         s.budget, s.kpi_target, s.revenue, s.unit_price;

6. Sync Schedule

6.1. Data Freshness Requirements

Metric          Freshness    Method
Spend           4 hours      Scheduled API pull
KPI             4 hours      Scheduled API pull
Budget alerts   Real-time    Event-driven (webhooks)
Reports         Daily        Batch at 2 AM

6.2. Sync Jobs

Job 1: Meta Ads Sync (Every 4 hours)

def sync_meta_ads():
    """Sync Meta Ads data every 4 hours."""
    # Re-pull yesterday and today so late-arriving attribution is captured
    date_range = {
        'start': (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d'),
        'end': datetime.now().strftime('%Y-%m-%d'),
    }

    # Extract
    raw_data = extract_meta_ads(AD_ACCOUNT_ID, date_range)

    # Transform
    normalized = transform_to_normalized(raw_data, 'FB')

    # Load to BigQuery
    load_to_bigquery(normalized, 'ads_data.ads_daily_normalized')

    # Sync to system DB
    sync_to_system_db(normalized)

    # Check alerts
    check_budget_alerts()

# Schedule: 0 */4 * * * (every 4 hours)

Job 2: Daily Report (2 AM)

def generate_daily_reports():
    """Generate daily performance reports."""
    yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')

    # Run BigQuery aggregation
    query = f"""
        SELECT * FROM ads_data.scope_performance_view
        WHERE date = '{yesterday}'
    """
    results = bq_client.query(query).result()

    # Update system DB
    for row in results:
        update_scope_performance(row)

    # Send email reports
    send_daily_report_email(results)

# Schedule: 0 2 * * * (daily at 2 AM)

7. Real-time Event Processing

7.1. Meta Webhooks (Optional)

Setup

@app.route('/webhooks/meta', methods=['POST'])
def meta_webhook():
    """Handle Meta Ads webhooks for real-time updates."""
    data = request.json

    for entry in data.get('entry', []):
        for change in entry.get('changes', []):
            if change['field'] == 'campaign_status':
                handle_campaign_status_change(change['value'])
            elif change['field'] == 'adset':
                handle_adset_change(change['value'])

    return jsonify({'status': 'ok'})
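
Meta signs each webhook delivery with an `X-Hub-Signature-256` header, an HMAC-SHA256 of the raw request body keyed with the app secret; verifying it before processing is advisable. A minimal check (the function name is ours, not part of any SDK):

```python
import hashlib
import hmac

def verify_meta_signature(payload: bytes, signature_header: str, app_secret: str) -> bool:
    """Return True when the X-Hub-Signature-256 header matches the
    HMAC-SHA256 of the raw request body keyed with the app secret."""
    expected = "sha256=" + hmac.new(
        app_secret.encode("utf-8"), payload, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature_header or "")
```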

Event Types

  • Campaign status changed (paused, active)
  • Budget updated
  • Performance threshold reached

8. Data Quality & Validation

8.1. Validation Rules

Campaign Name Format

def validate_campaign_name(campaign_name):
    """Validate that a campaign name follows the naming convention."""
    pattern = r'^[A-Za-z0-9]+-[A-Z0-9]+-[A-Z0-9]+-[A-Z]+-[A-Za-z]+-[A-Za-z0-9]+-[A-Z0-9]+$'

    if not re.match(pattern, campaign_name):
        return False, "Invalid format"

    tokens = campaign_name.split('-')
    if len(tokens) != 7:
        return False, f"Expected 7 tokens, got {len(tokens)}"

    # Check that the contract exists
    contract_code = tokens[1]
    if not contract_exists(contract_code):
        return False, f"Contract {contract_code} not found"

    # Check that the scope exists
    scope_code = tokens[2]
    if not scope_exists(contract_code, scope_code):
        return False, f"Scope {scope_code} not found in {contract_code}"

    return True, "Valid"
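
The format regex alone can be exercised against the sample campaign name from section 2.1; the DB-backed checks (`contract_exists`, `scope_exists`) are skipped in this standalone sketch:

```python
import re

CAMPAIGN_NAME_PATTERN = r'^[A-Za-z0-9]+-[A-Z0-9]+-[A-Z0-9]+-[A-Z]+-[A-Za-z]+-[A-Za-z0-9]+-[A-Z0-9]+$'

def check_format(campaign_name):
    """Format-only check: regex match plus exactly 7 hyphen-separated tokens."""
    return bool(re.match(CAMPAIGN_NAME_PATTERN, campaign_name)) \
        and len(campaign_name.split('-')) == 7
```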

Data Completeness

-- Check for missing data over the last 7 days
SELECT
  date,
  COUNT(*) AS campaigns,
  SUM(CASE WHEN spend IS NULL THEN 1 ELSE 0 END) AS missing_spend,
  SUM(CASE WHEN conversions IS NULL THEN 1 ELSE 0 END) AS missing_conversions
FROM ads_data.ads_daily_normalized
WHERE date >= CURRENT_DATE() - 7
GROUP BY date
ORDER BY date DESC;

8.2. Data Anomaly Detection

def detect_anomalies(campaign_data):
    """Detect unusual patterns in campaign data (expects a pandas DataFrame
    of daily rows with 'spend', 'cpa', and 'conversions' columns)."""
    anomalies = []

    # Check 1: spend spike vs the 7-day rolling average
    avg_spend = campaign_data['spend'].rolling(7).mean()
    if campaign_data['spend'].iloc[-1] > avg_spend.iloc[-1] * 2:
        anomalies.append({
            'type': 'spend_spike',
            'message': "Spend is 2x higher than the 7-day average"
        })

    # Check 2: CPA spike vs the 7-day rolling average
    avg_cpa = campaign_data['cpa'].rolling(7).mean()
    if campaign_data['cpa'].iloc[-1] > avg_cpa.iloc[-1] * 1.5:
        anomalies.append({
            'type': 'cpa_spike',
            'message': "CPA is 50% higher than the 7-day average"
        })

    # Check 3: spend with zero conversions
    if campaign_data['conversions'].iloc[-1] == 0 and campaign_data['spend'].iloc[-1] > 0:
        anomalies.append({
            'type': 'zero_conversions',
            'message': f"Campaign spent {campaign_data['spend'].iloc[-1]} but got 0 conversions"
        })

    return anomalies

9. Performance Optimization

9.1. BigQuery Optimization

Partitioning

-- Partition by date for faster date range queries
CREATE TABLE ads_data.ads_daily_normalized (...)
PARTITION BY date;

Clustering

-- Cluster by frequently filtered fields
CREATE TABLE ads_data.ads_daily_normalized (...)
PARTITION BY date
CLUSTER BY contract_code, scope_code, platform;

Materialized Views

-- Pre-aggregate commonly queried data
CREATE MATERIALIZED VIEW ads_data.campaign_daily_summary AS
SELECT
  date,
  campaign_name,
  contract_code,
  scope_code,
  SUM(spend) AS total_spend,
  SUM(conversions) AS total_conversions,
  AVG(cpa) AS avg_cpa
FROM ads_data.ads_daily_normalized
GROUP BY 1, 2, 3, 4;

9.2. Caching Strategy

from functools import lru_cache
from datetime import datetime

@lru_cache(maxsize=100)
def _get_cached_performance(campaign_code, date):
    """Cached lookup; only used for immutable historical rows."""
    return fetch_from_db(campaign_code, date)

def get_campaign_performance(campaign_code, date):
    """Serve historical data from the cache; today's data is still
    changing, so fetch it fresh and never cache it."""
    if date >= datetime.now().date():
        return fetch_from_db(campaign_code, date)
    return _get_cached_performance(campaign_code, date)

# lru_cache has no TTL; historical rows are immutable, so plain LRU
# eviction is sufficient here.

10. Error Handling & Retry Logic

10.1. API Call Retry

import backoff
import requests

@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, RateLimitError),
    max_tries=5
)
def call_meta_api_with_retry(endpoint, params):
    """Call the Meta API with exponential-backoff retry."""
    response = requests.get(endpoint, params=params)

    if response.status_code == 429:
        # Rate limited; raise so backoff retries the call
        raise RateLimitError("Rate limit exceeded")

    response.raise_for_status()
    return response.json()
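
If adding the third-party `backoff` package is undesirable, equivalent retry behavior can be sketched with the standard library only (function and parameter names here are illustrative):

```python
import random
import time

def retry_with_backoff(fn, max_tries=5, base_delay=1.0):
    """Call fn(), retrying on any exception with exponential backoff
    and proportional jitter; re-raise after the final failed attempt."""
    for attempt in range(max_tries):
        try:
            return fn()
        except Exception:
            if attempt == max_tries - 1:
                raise
            # base_delay, 2*base_delay, 4*base_delay, ... plus jitter
            time.sleep(base_delay * (2 ** attempt + random.random()))
```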

10.2. Data Sync Failure Handling

def sync_with_error_handling(date_range):
    """Sync data for a date range with comprehensive error handling."""
    failed_syncs = []

    try:
        data = extract_meta_ads(AD_ACCOUNT_ID, date_range)
        load_to_bigquery(data, TABLE_ID)

    except Exception as e:
        # Log the error
        logger.error(f"Sync failed: {e}")

        # Record the failed sync for later inspection
        failed_syncs.append({
            'timestamp': datetime.now(),
            'error': str(e),
            'date_range': date_range,
        })

        # Alert the team
        send_alert(f"Data sync failed: {e}")

        # Retry in 1 hour
        schedule_retry(date_range, delay=3600)

    return failed_syncs

11. Monitoring & Alerting

11.1. Data Pipeline Health Check

def check_pipeline_health():
    """Check whether the data pipeline is healthy."""
    checks = []

    # Check 1: data freshness
    latest_data = get_latest_sync_timestamp()
    if datetime.now() - latest_data > timedelta(hours=6):
        checks.append({
            'status': 'warning',
            'message': 'Data is more than 6 hours old'
        })

    # Check 2: data completeness
    yesterday_rows = count_rows_for_date(datetime.now() - timedelta(days=1))
    expected_rows = get_active_campaign_count()
    if yesterday_rows < expected_rows * 0.9:
        checks.append({
            'status': 'error',
            'message': f'Only {yesterday_rows}/{expected_rows} campaigns synced'
        })

    # Check 3: sync error rate
    error_rate = get_sync_error_rate_last_24h()
    if error_rate > 0.1:
        checks.append({
            'status': 'warning',
            'message': f'Sync error rate: {error_rate * 100:.1f}%'
        })

    return checks

11.2. Alerting Rules

  • Data freshness > 6 hours → Warning email
  • Data completeness < 90% → Error email + Slack
  • Sync error rate > 10% → Warning
  • API rate limit hit → Info notification
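
The rules above can be encoded as a small routing function. The `status` values match the ones emitted by `check_pipeline_health`; the channel names and the mapping itself are an assumption:

```python
def route_alert(check):
    """Map a health-check result to notification channels per the
    alerting rules above (channel names are illustrative)."""
    routes = {
        "error": ["email", "slack"],
        "warning": ["email"],
        "info": ["log"],
    }
    return routes.get(check.get("status"), ["log"])
```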

12. Security & Access Control

12.1. BigQuery IAM

roles:
  - role: roles/bigquery.dataViewer
    members:
      - serviceAccount:[email protected]
      - group:[email protected]

  - role: roles/bigquery.dataEditor
    members:
      - serviceAccount:[email protected]

  - role: roles/bigquery.admin
    members:
      - user:[email protected]

12.2. API Credentials Management

from google.cloud import secretmanager

def get_meta_access_token():
    """Fetch the Meta access token from Secret Manager."""
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{PROJECT_ID}/secrets/meta-access-token/versions/latest"
    response = client.access_secret_version(request={"name": name})
    return response.payload.data.decode('UTF-8')

13. Documentation

13.1. Data Dictionary

Maintain comprehensive documentation for all BigQuery tables:

  • Table purpose
  • Column definitions
  • Data types
  • Sample queries
  • Update frequency

13.2. Runbook

Document common issues and solutions:

  • Data sync failures → Check API credentials, rate limits
  • Missing data → Check campaign naming, verify campaigns active
  • Incorrect mapping → Validate campaign name format