BigQuery Data Integration
1. Overview
1.1. Purpose
The data integration module ingests execution data from the ads platforms (Meta, TikTok, Google) into the system in order to:
- Track spend and KPIs in near real-time
- Calculate profit/loss
- Trigger the auto-budget guard
- Generate reports
1.2. Data Flow
Ads Platform (Meta/TikTok/Google)
↓ (API/Webhook)
Raw Data Staging
↓ (ETL)
BigQuery Normalized Tables
↓ (Parsing & Mapping)
System Database (Campaign Execution)
↓ (Aggregation)
Dashboards & Reports
2. Data Sources
2.1. Meta Ads (Facebook & Instagram)
API Endpoints
GET /v18.0/{ad_account_id}/insights
Sample Response
{
  "date_start": "2026-01-15",
  "date_stop": "2026-01-15",
  "campaign_name": "Kewpie-KWP2026-FB01-FB-Lead-Office-P1",
  "impressions": 150000,
  "clicks": 4500,
  "spend": 2100000,
  "actions": [
    {"action_type": "lead", "value": 135}
  ],
  "ctr": 3.0,
  "cpc": 466.67,
  "cpa": 15555.56
}
Data Normalization
normalized = {
    "date": row["date_start"],
    "platform": "FB",
    "campaign_name": row["campaign_name"],
    "impressions": int(row["impressions"]),
    "clicks": int(row["clicks"]),
    "spend": float(row["spend"]),
    "conversions": get_action_value(row["actions"], "lead"),
    "ctr": float(row.get("ctr", 0)),
    "cpc": float(row.get("cpc", 0)),
    "data_source": "Meta"
}
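The get_action_value helper used above is referenced but not defined in this section; a minimal sketch of what it is assumed to do (pick one action_type out of Meta's actions array):

def get_action_value(actions, action_type):
    """Return the value of the given action_type from Meta's actions array, or 0."""
    for action in actions or []:
        if action.get("action_type") == action_type:
            return float(action.get("value", 0))
    return 0.0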
2.2. TikTok Ads
API Endpoints
GET /open_api/v1.3/report/integrated/get/
Fields Mapping
| TikTok Field | System Field |
|---|---|
| stat_time_day | date |
| campaign_name | campaign_name |
| spend | spend |
| impressions | impressions |
| clicks | clicks |
| conversion | conversions |
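A minimal sketch of applying this mapping to a TikTok report row (normalize_tiktok_row is an illustrative helper; field names follow the table above, and the output mirrors the Meta normalization):

TIKTOK_FIELD_MAP = {
    "stat_time_day": "date",
    "campaign_name": "campaign_name",
    "spend": "spend",
    "impressions": "impressions",
    "clicks": "clicks",
    "conversion": "conversions",
}

def normalize_tiktok_row(row):
    """Rename TikTok report fields to the system schema."""
    normalized = {TIKTOK_FIELD_MAP[k]: v for k, v in row.items() if k in TIKTOK_FIELD_MAP}
    normalized["platform"] = "TikTok"
    normalized["data_source"] = "TikTok"
    return normalized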
2.3. Google Ads
API
GoogleAdsService.search()
Query
SELECT
segments.date,
campaign.name,
metrics.impressions,
metrics.clicks,
metrics.cost_micros,
metrics.conversions
FROM campaign
WHERE segments.date >= '2026-01-01'
Normalization
normalized = {
    "date": row.segments.date,
    "platform": "Google",
    "campaign_name": row.campaign.name,
    "spend": row.metrics.cost_micros / 1_000_000,  # Google reports cost in micros
    "conversions": row.metrics.conversions,
    "impressions": row.metrics.impressions,
    "clicks": row.metrics.clicks,
    "data_source": "Google"
}
3. BigQuery Architecture
3.1. Table Structure
Raw Data Tables (Staging)
-- meta_ads_raw
CREATE TABLE ads_data.meta_ads_raw (
id STRING,
date DATE,
ad_account_id STRING,
campaign_id STRING,
campaign_name STRING,
impressions INT64,
clicks INT64,
spend FLOAT64,
actions STRING, -- JSON
raw_json STRING,
ingested_at TIMESTAMP
);
-- tiktok_ads_raw
CREATE TABLE ads_data.tiktok_ads_raw (
... similar structure ...
);
-- google_ads_raw
CREATE TABLE ads_data.google_ads_raw (
... similar structure ...
);
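For reference, a hedged sketch of writing a raw Meta payload into the staging table (column names follow the DDL above; stage_meta_raw is an illustrative helper, and rows are assumed to be plain dicts, e.g. [dict(r) for r in insights]):

import json
from datetime import datetime, timezone
from google.cloud import bigquery

def stage_meta_raw(rows, table_id="ads_data.meta_ads_raw"):
    """Persist raw Meta rows, keeping the full payload in raw_json for replay."""
    client = bigquery.Client()
    staged = [{
        "id": f"{r['campaign_id']}_{r['date_start']}",
        "date": r["date_start"],
        "ad_account_id": r.get("account_id"),
        "campaign_id": r.get("campaign_id"),
        "campaign_name": r.get("campaign_name"),
        "impressions": int(r.get("impressions", 0)),
        "clicks": int(r.get("clicks", 0)),
        "spend": float(r.get("spend", 0)),
        "actions": json.dumps(r.get("actions", [])),
        "raw_json": json.dumps(r),
        "ingested_at": datetime.now(timezone.utc).isoformat(),
    } for r in rows]
    errors = client.insert_rows_json(table_id, staged)
    if errors:
        raise RuntimeError(f"Staging load failed: {errors}")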
Normalized Table
CREATE TABLE ads_data.ads_daily_normalized (
id STRING NOT NULL,
date DATE NOT NULL,
platform STRING NOT NULL, -- FB, TikTok, Google
campaign_name STRING NOT NULL,
-- Metrics
impressions INT64,
clicks INT64,
spend FLOAT64,
conversions FLOAT64,
-- Calculated
ctr FLOAT64,
cpc FLOAT64,
cpa FLOAT64,
-- Parsing (from campaign name)
client STRING,
contract_code STRING,
scope_code STRING,
channel STRING,
objective STRING,
segment STRING,
phase STRING,
-- Metadata
data_source STRING,
synced_at TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED  -- BigQuery key constraints are informational only
)
PARTITION BY date
CLUSTER BY contract_code, scope_code, platform;
3.2. ETL Process
Step 1: Extract (API calls)
from facebook_business.adobjects.adaccount import AdAccount

def extract_meta_ads(ad_account_id, date_range):
    """Pull campaign-level daily insights from the Meta Ads API"""
    fields = [
        'campaign_name',
        'impressions',
        'clicks',
        'spend',
        'actions'
    ]
    params = {
        'time_range': {
            'since': date_range['start'],
            'until': date_range['end']
        },
        'level': 'campaign',
        'fields': ','.join(fields),
        'time_increment': 1  # daily
    }
    insights = AdAccount(ad_account_id).get_insights(params=params)
    return insights
Step 2: Transform
from datetime import datetime

def transform_to_normalized(raw_data, platform):
    """Transform raw rows into the normalized schema.

    Assumes each raw row exposes a common 'date' key (mapped upstream from
    date_start / stat_time_day / segments.date) plus the metric fields.
    """
    normalized = []
    for row in raw_data:
        # Parse campaign name into its 7 tokens
        tokens = parse_campaign_name(row['campaign_name'])
        # Extract conversions based on platform
        conversions = extract_conversions(row, platform)
        # Calculate derived metrics, guarding against division by zero
        ctr = (row['clicks'] / row['impressions'] * 100) if row['impressions'] > 0 else 0
        cpc = (row['spend'] / row['clicks']) if row['clicks'] > 0 else 0
        cpa = (row['spend'] / conversions) if conversions > 0 else 0
        normalized.append({
            'id': generate_id(platform, row['campaign_name'], row['date']),
            'date': row['date'],
            'platform': platform,
            'campaign_name': row['campaign_name'],
            'impressions': row['impressions'],
            'clicks': row['clicks'],
            'spend': row['spend'],
            'conversions': conversions,
            'ctr': ctr,
            'cpc': cpc,
            'cpa': cpa,
            'client': tokens.get('client'),
            'contract_code': tokens.get('contract_code'),
            'scope_code': tokens.get('scope_code'),
            'channel': tokens.get('channel'),
            'objective': tokens.get('objective'),
            'segment': tokens.get('segment'),
            'phase': tokens.get('phase'),
            'data_source': platform,
            'synced_at': datetime.now().isoformat()  # keep JSON-serializable for insert_rows_json
        })
    return normalized
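The helpers used above (parse_campaign_name, extract_conversions, generate_id) are referenced but not defined in this section; minimal sketches under the same assumptions (7-token campaign names per section 4, the per-platform conversion fields from section 2, and "lead" as the example Meta objective from 2.1):

import hashlib

TOKEN_KEYS = ["client", "contract_code", "scope_code", "channel",
              "objective", "segment", "phase"]

def parse_campaign_name(campaign_name):
    """Python counterpart of the BigQuery UDF in section 4.1."""
    tokens = campaign_name.split("-")
    return {key: (tokens[i] if i < len(tokens) else None)
            for i, key in enumerate(TOKEN_KEYS)}

def extract_conversions(row, platform):
    """Per-platform conversion extraction; field names follow section 2."""
    if platform == "FB":
        return get_action_value(row.get("actions"), "lead")
    if platform == "TikTok":
        return float(row.get("conversion", 0))
    return float(row.get("conversions", 0))  # Google and others

def generate_id(platform, campaign_name, date):
    """Stable identifier for a (platform, campaign, day) row."""
    return hashlib.md5(f"{platform}|{campaign_name}|{date}".encode()).hexdigest()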
Step 3: Load
from google.cloud import bigquery

def load_to_bigquery(normalized_data, table_id):
    """Load normalized data to BigQuery via streaming inserts"""
    client = bigquery.Client()
    errors = client.insert_rows_json(table_id, normalized_data)
    if errors:
        raise Exception(f"Failed to load data: {errors}")
    return len(normalized_data)
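Chaining the three steps together for one platform might look like this (a sketch; AD_ACCOUNT_ID is a placeholder, and scheduling is covered in section 6):

def run_meta_etl(date_range):
    """Extract -> Transform -> Load for Meta, for the given date range."""
    raw = extract_meta_ads(AD_ACCOUNT_ID, date_range)
    normalized = transform_to_normalized(raw, "FB")
    return load_to_bigquery(normalized, "ads_data.ads_daily_normalized")

# e.g. run_meta_etl({"start": "2026-01-15", "end": "2026-01-15"})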
4. Campaign Name Parsing
4.1. Parsing Function (BigQuery SQL)
CREATE OR REPLACE FUNCTION ads_data.parse_campaign_name(campaign_name STRING)
RETURNS STRUCT<
client STRING,
contract_code STRING,
scope_code STRING,
channel STRING,
objective STRING,
segment STRING,
phase STRING
>
AS (
STRUCT(
SPLIT(campaign_name, '-')[SAFE_OFFSET(0)] AS client,
SPLIT(campaign_name, '-')[SAFE_OFFSET(1)] AS contract_code,
SPLIT(campaign_name, '-')[SAFE_OFFSET(2)] AS scope_code,
SPLIT(campaign_name, '-')[SAFE_OFFSET(3)] AS channel,
SPLIT(campaign_name, '-')[SAFE_OFFSET(4)] AS objective,
SPLIT(campaign_name, '-')[SAFE_OFFSET(5)] AS segment,
SPLIT(campaign_name, '-')[SAFE_OFFSET(6)] AS phase
)
);
4.2. Usage
SELECT
campaign_name,
ads_data.parse_campaign_name(campaign_name).*
FROM ads_data.ads_daily_normalized
WHERE date = '2026-01-15';
5. Mapping Engine
5.1. Join with Campaign Plan
CREATE OR REPLACE VIEW ads_data.campaign_execution_view AS
SELECT
-- From BigQuery
n.date,
n.campaign_name,
n.platform,
n.spend,
n.conversions,
n.impressions,
n.clicks,
n.cpa,
-- From Campaign Plan (System DB)
cp.campaign_plan_id,
cp.scope_id,
cp.budget_allocated,
cp.kpi_target,
cp.break_even_cost,
-- Calculated
SUM(n.spend) OVER (
PARTITION BY n.campaign_name
ORDER BY n.date
) AS spend_to_date,
SUM(n.conversions) OVER (
PARTITION BY n.campaign_name
ORDER BY n.date
) AS kpi_to_date,
cp.budget_allocated - SUM(n.spend) OVER (
PARTITION BY n.campaign_name
ORDER BY n.date
) AS budget_remaining,
SAFE_DIVIDE(
SUM(n.spend) OVER (PARTITION BY n.campaign_name ORDER BY n.date),
cp.budget_allocated
) * 100 AS budget_usage_pct
FROM ads_data.ads_daily_normalized n
JOIN `system_db.campaign_plan` cp  -- assumes campaign_plan is replicated/federated into BigQuery
  ON n.campaign_name = cp.campaign_code;
5.2. Aggregate to Scope Level
CREATE OR REPLACE VIEW ads_data.scope_performance_view AS
SELECT
s.scope_id,
s.contract_code,
s.scope_code,
s.service_type,
-- Budget
s.budget AS budget_allocated,
SUM(e.spend) AS spend_to_date,
s.budget - SUM(e.spend) AS budget_remaining,
SAFE_DIVIDE(SUM(e.spend), s.budget) * 100 AS budget_usage_pct,
-- KPI
s.kpi_target,
SUM(e.conversions) AS kpi_achieved,
s.kpi_target - SUM(e.conversions) AS kpi_remaining,
SAFE_DIVIDE(SUM(e.conversions), s.kpi_target) * 100 AS kpi_completion_pct,
-- Cost
SAFE_DIVIDE(SUM(e.spend), SUM(e.conversions)) AS actual_cpa,
-- Profit
s.revenue AS expected_revenue,
SUM(e.conversions) * s.unit_price AS actual_revenue,
SUM(e.spend) AS actual_cost,
(SUM(e.conversions) * s.unit_price) - SUM(e.spend) AS actual_profit,
SAFE_DIVIDE(
(SUM(e.conversions) * s.unit_price) - SUM(e.spend),
SUM(e.conversions) * s.unit_price
) * 100 AS actual_margin
FROM `system_db.scope` s
LEFT JOIN ads_data.campaign_execution_view e
ON s.scope_id = e.scope_id
GROUP BY s.scope_id, s.contract_code, s.scope_code, s.service_type, s.budget, s.kpi_target, s.revenue, s.unit_price;
6. Sync Schedule
6.1. Data Freshness Requirements
| Metric | Freshness | Method |
|---|---|---|
| Spend | 4 hours | Scheduled API pull |
| KPI | 4 hours | Scheduled API pull |
| Budget alerts | Real-time | Event-driven (webhooks) |
| Reports | Daily | Batch at 2 AM |
6.2. Sync Jobs
Job 1: Meta Ads Sync (Every 4 hours)
def sync_meta_ads():
    """Sync Meta Ads data every 4 hours"""
    # Pull data for yesterday and today to catch late-arriving conversions
    date_range = {
        'start': (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d'),
        'end': datetime.now().strftime('%Y-%m-%d')
    }
    # Extract
    raw_data = extract_meta_ads(AD_ACCOUNT_ID, date_range)
    # Transform
    normalized = transform_to_normalized(raw_data, 'FB')
    # Load to BigQuery
    load_to_bigquery(normalized, 'ads_data.ads_daily_normalized')
    # Sync to system DB
    sync_to_system_db(normalized)
    # Check alerts
    check_budget_alerts()

# Schedule: 0 */4 * * * (every 4 hours)
Job 2: Daily Report (2 AM)
def generate_daily_reports():
    """Generate daily performance reports"""
    yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
    # Run BigQuery aggregation. scope_performance_view is cumulative and has
    # no date column, so it is read as-of the report run rather than filtered by day.
    query = "SELECT * FROM ads_data.scope_performance_view"
    results = list(bq_client.query(query).result())  # materialize: iterated twice below
    # Update system DB
    for row in results:
        update_scope_performance(row)
    # Send email reports, labelled with the report date
    send_daily_report_email(results, report_date=yesterday)

# Schedule: 0 2 * * * (daily at 2 AM)
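One way to wire both jobs to the cron expressions above (a sketch using APScheduler; Cloud Scheduler or Airflow would work equally well):

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()
# Every 4 hours: 0 */4 * * *
scheduler.add_job(sync_meta_ads, 'cron', hour='*/4', minute=0)
# Daily at 2 AM: 0 2 * * *
scheduler.add_job(generate_daily_reports, 'cron', hour=2, minute=0)
scheduler.start()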
7. Real-time Event Processing
7.1. Meta Webhooks (Optional)
Setup
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/webhooks/meta', methods=['POST'])
def meta_webhook():
    """Handle Meta Ads webhooks for real-time updates"""
    data = request.json
    if data.get('entry'):
        for entry in data['entry']:
            for change in entry.get('changes', []):
                if change['field'] == 'campaign_status':
                    handle_campaign_status_change(change['value'])
                elif change['field'] == 'adset':
                    handle_adset_change(change['value'])
    return jsonify({'status': 'ok'})
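Meta signs webhook payloads with an X-Hub-Signature-256 header (HMAC-SHA256 over the raw body using the app secret); a verification sketch that could guard the handler above (the app secret is assumed to come from Secret Manager, see section 12.2):

import hashlib
import hmac

def verify_meta_signature(request, app_secret):
    """Reject payloads whose HMAC-SHA256 signature does not match."""
    signature = request.headers.get('X-Hub-Signature-256', '')
    expected = 'sha256=' + hmac.new(
        app_secret.encode(), request.get_data(), hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(signature, expected)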
Event Types
- Campaign status changed (paused, active)
- Budget updated
- Performance threshold reached
8. Data Quality & Validation
8.1. Validation Rules
Campaign Name Format
import re

def validate_campaign_name(campaign_name):
    """Validate that a campaign name follows the naming convention"""
    pattern = r'^[A-Za-z0-9]+-[A-Z0-9]+-[A-Z0-9]+-[A-Z]+-[A-Za-z]+-[A-Za-z0-9]+-[A-Z0-9]+$'
    if not re.match(pattern, campaign_name):
        return False, "Invalid format"
    tokens = campaign_name.split('-')
    if len(tokens) != 7:
        return False, f"Expected 7 tokens, got {len(tokens)}"
    # Check that the contract exists
    contract_code = tokens[1]
    if not contract_exists(contract_code):
        return False, f"Contract {contract_code} not found"
    # Check that the scope exists under the contract
    scope_code = tokens[2]
    if not scope_exists(contract_code, scope_code):
        return False, f"Scope {scope_code} not found in {contract_code}"
    return True, "Valid"
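For example, run against the sample campaign name from section 2.1:

ok, reason = validate_campaign_name("Kewpie-KWP2026-FB01-FB-Lead-Office-P1")
print(ok, reason)  # True "Valid", assuming the contract and scope exist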
Data Completeness
-- Check for missing data
SELECT
date,
COUNT(*) as campaigns,
SUM(CASE WHEN spend IS NULL THEN 1 ELSE 0 END) as missing_spend,
SUM(CASE WHEN conversions IS NULL THEN 1 ELSE 0 END) as missing_conversions
FROM ads_data.ads_daily_normalized
WHERE date >= CURRENT_DATE() - 7
GROUP BY date
ORDER BY date DESC;
8.2. Data Anomaly Detection
def detect_anomalies(campaign_data):
    """Detect unusual patterns in campaign data.

    campaign_data is a pandas DataFrame of daily rows sorted by date,
    with 'spend', 'cpa' and 'conversions' columns.
    """
    anomalies = []
    # Check 1: Spend spike vs 7-day rolling average
    avg_spend = campaign_data['spend'].rolling(7).mean()
    if campaign_data['spend'].iloc[-1] > avg_spend.iloc[-1] * 2:
        anomalies.append({
            'type': 'spend_spike',
            'message': "Spend is 2x higher than 7-day average"
        })
    # Check 2: CPA spike vs 7-day rolling average
    avg_cpa = campaign_data['cpa'].rolling(7).mean()
    if campaign_data['cpa'].iloc[-1] > avg_cpa.iloc[-1] * 1.5:
        anomalies.append({
            'type': 'cpa_spike',
            'message': "CPA is 50% higher than 7-day average"
        })
    # Check 3: Spend with zero conversions
    if campaign_data['conversions'].iloc[-1] == 0 and campaign_data['spend'].iloc[-1] > 0:
        anomalies.append({
            'type': 'zero_conversions',
            'message': f"Campaign spent {campaign_data['spend'].iloc[-1]} but got 0 conversions"
        })
    return anomalies
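A usage sketch with pandas (illustrative data; the function needs at least 7 daily rows so the rolling averages are defined):

import pandas as pd

history = pd.DataFrame({
    "spend":       [100, 110, 95, 105, 100, 98, 102, 260],
    "cpa":         [15, 16, 14, 15, 15, 16, 15, 40],
    "conversions": [7, 7, 7, 7, 7, 6, 7, 6],
})
for anomaly in detect_anomalies(history):
    print(anomaly["type"], "-", anomaly["message"])
# spend_spike and cpa_spike both fire for this series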
9. Performance Optimization
9.1. BigQuery Optimization
Partitioning
-- Partition by date for faster date range queries
CREATE TABLE ads_data.ads_daily_normalized (...)
PARTITION BY date;
Clustering
-- Cluster by frequently filtered fields
CREATE TABLE ads_data.ads_daily_normalized (...)
PARTITION BY date
CLUSTER BY contract_code, scope_code, platform;
Materialized Views
-- Pre-aggregate commonly queried data
CREATE MATERIALIZED VIEW ads_data.campaign_daily_summary
AS
SELECT
date,
campaign_name,
contract_code,
scope_code,
SUM(spend) as total_spend,
SUM(conversions) as total_conversions,
AVG(cpa) as avg_cpa
FROM ads_data.ads_daily_normalized
GROUP BY 1,2,3,4;
9.2. Caching Strategy
from functools import lru_cache
from datetime import datetime

@lru_cache(maxsize=100)
def _get_historical_performance(campaign_code, date):
    """Cache historical performance; past days are immutable."""
    return fetch_from_db(campaign_code, date)

def get_campaign_performance(campaign_code, date):
    # Today's data changes throughout the day, so bypass the cache for it
    if date >= datetime.now().date():
        return fetch_from_db(campaign_code, date)
    # Only immutable historical data goes through lru_cache (which has no TTL)
    return _get_historical_performance(campaign_code, date)
10. Error Handling & Retry Logic
10.1. API Call Retry
import backoff
import requests

class RateLimitError(Exception):
    """Raised when the API returns HTTP 429."""

@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, RateLimitError),
    max_tries=5
)
def call_meta_api_with_retry(endpoint, params):
    """Call Meta API with exponential backoff retry"""
    response = requests.get(endpoint, params=params)
    if response.status_code == 429:
        # Rate limited: raise so backoff retries with exponential delay
        raise RateLimitError("Rate limit exceeded")
    response.raise_for_status()
    return response.json()
10.2. Data Sync Failure Handling
def sync_with_error_handling(date_range):
    """Sync data for the given date range with comprehensive error handling"""
    failed_syncs = []
    try:
        # Try to sync
        data = extract_meta_ads(AD_ACCOUNT_ID, date_range)
        load_to_bigquery(data, TABLE_ID)
    except Exception as e:
        # Log error
        logger.error(f"Sync failed: {str(e)}")
        # Save failed sync info
        failed_syncs.append({
            'timestamp': datetime.now(),
            'error': str(e),
            'date_range': date_range
        })
        # Send alert
        send_alert(f"Data sync failed: {str(e)}")
        # Retry later
        schedule_retry(date_range, delay=3600)  # 1 hour
    return failed_syncs
11. Monitoring & Alerting
11.1. Data Pipeline Health Check
def check_pipeline_health():
    """Check if the data pipeline is healthy"""
    checks = []
    # Check 1: Data freshness
    latest_data = get_latest_sync_timestamp()
    if datetime.now() - latest_data > timedelta(hours=6):
        checks.append({
            'status': 'warning',
            'message': 'Data is more than 6 hours old'
        })
    # Check 2: Data completeness
    yesterday_rows = count_rows_for_date(datetime.now() - timedelta(days=1))
    expected_rows = get_active_campaign_count()
    if yesterday_rows < expected_rows * 0.9:
        checks.append({
            'status': 'error',
            'message': f'Only {yesterday_rows}/{expected_rows} campaigns synced'
        })
    # Check 3: Error rate
    error_rate = get_sync_error_rate_last_24h()
    if error_rate > 0.1:
        checks.append({
            'status': 'warning',
            'message': f'Sync error rate: {error_rate * 100:.1f}%'
        })
    return checks
11.2. Alerting Rules
- Data freshness > 6 hours → Warning email
- Data completeness < 90% → Error email + Slack
- Sync error rate > 10% → Warning
- API rate limit hit → Info notification
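A sketch tying the health checks from 11.1 to these rules (send_email, send_slack and log_info are assumed notification helpers):

def dispatch_alerts(checks):
    """Route health-check results to channels per the rules above."""
    for check in checks:
        if check['status'] == 'error':
            send_email(check['message'])
            send_slack(check['message'])  # completeness failures also page Slack
        elif check['status'] == 'warning':
            send_email(check['message'])
        else:
            log_info(check['message'])

dispatch_alerts(check_pipeline_health())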
12. Security & Access Control
12.1. BigQuery IAM
bindings:
  - role: roles/bigquery.dataViewer
    members:
      - serviceAccount:[email protected]
      - group:[email protected]
  - role: roles/bigquery.dataEditor
    members:
      - serviceAccount:[email protected]
  - role: roles/bigquery.admin
    members:
      - user:[email protected]
12.2. API Credentials Management
from google.cloud import secretmanager
def get_meta_access_token():
"""Get Meta access token from Secret Manager"""
client = secretmanager.SecretManagerServiceClient()
name = f"projects/{PROJECT_ID}/secrets/meta-access-token/versions/latest"
response = client.access_secret_version(request={"name": name})
return response.payload.data.decode('UTF-8')
13. Documentation
13.1. Data Dictionary
Maintain comprehensive documentation for all BigQuery tables:
- Table purpose
- Column definitions
- Data types
- Sample queries
- Update frequency
13.2. Runbook
Document common issues and solutions:
- Data sync failures → Check API credentials, rate limits
- Missing data → Check campaign naming, verify campaigns active
- Incorrect mapping → Validate campaign name format