Manually loading data export
This article is for data engineers
Learn how to manually load Attribution data export.
If you're going to work with the Attribution data export, you can either use our ETL service or build a data pipeline manually. This article contains sample queries that create the table structure and ingest the data. Please note that this example was prepared using Google BigQuery, but the syntax should be similar (if not the same) for other databases.
Create table structure
-- Destination table for the exported "users" data.
CREATE TABLE IF NOT EXISTS attribution_dataset_2707.users (
    id                  BIGINT    NOT NULL,
    identifier          STRING    NOT NULL,
    created_at          TIMESTAMP NOT NULL,
    project_id          INTEGER   NOT NULL,
    updated_at          TIMESTAMP NOT NULL,
    original_created_at TIMESTAMP
);
-- Destination table for the exported "events" data.
CREATE TABLE IF NOT EXISTS attribution_dataset_2707.events (
    id             BIGINT       NOT NULL,
    name           STRING(64)   NOT NULL,
    ip             STRING(64),
    created_at     TIMESTAMP    NOT NULL,
    user_id        BIGINT,
    browser_id     BIGINT,
    project_id     INTEGER      NOT NULL,
    time           TIMESTAMP    NOT NULL,
    referring_url  STRING(8192),
    referring_host STRING(256),
    revenue        BIGINT,
    visitor_id     BIGINT,
    updated_at     TIMESTAMP    NOT NULL,
    uri            STRING(8192),
    uri_path       STRING(4096),
    self_referral  BOOLEAN,
    message_id     STRING(60),
    source         STRING(60),
    type           STRING(1)
);
-- Destination table for the exported "params" data.
CREATE TABLE IF NOT EXISTS attribution_dataset_2707.params (
    key        STRING(128)  NOT NULL,
    value      STRING(2048),
    project_id INTEGER      NOT NULL,
    time       TIMESTAMP    NOT NULL,
    event_id   BIGINT,
    updated_at TIMESTAMP    NOT NULL,
    id         BIGINT       NOT NULL
);
-- Destination table for the exported "amounts" data.
CREATE TABLE IF NOT EXISTS attribution_dataset_2707.amounts (
    id                    BIGINT        NOT NULL,
    value                 BIGINT,
    created_at            TIMESTAMP     NOT NULL,
    filter_id             BIGINT,
    source                STRING(256),
    date                  DATE,
    updated_at            TIMESTAMP     NOT NULL,
    project_id            INTEGER,
    amount_range_id       BIGINT,
    deleted               BOOLEAN       NOT NULL,
    original_value        BIGINT,
    original_currency     STRING(255),
    conversion_rate       NUMERIC(18,6),
    currency_converted_at TIMESTAMP
);
-- Destination table for the exported "browsers" data.
CREATE TABLE IF NOT EXISTS attribution_dataset_2707.browsers (
    id         BIGINT      NOT NULL,
    cookie_id  STRING(64)  NOT NULL,
    created_at TIMESTAMP   NOT NULL,
    user_agent STRING(512),
    updated_at TIMESTAMP   NOT NULL,
    project_id INTEGER
);
-- Destination table for the exported "visitors" data.
CREATE TABLE IF NOT EXISTS attribution_dataset_2707.visitors (
    id                  BIGINT      NOT NULL,
    user_id             BIGINT,
    browser_id          BIGINT,
    project_id          INTEGER     NOT NULL,
    updated_at          TIMESTAMP   NOT NULL,
    ip                  STRING(255),
    traits              STRING,
    email               STRING(255),
    company_id          BIGINT,
    migrated_to         BIGINT,
    original_created_at TIMESTAMP
);
-- Destination table for the exported "companies" data.
CREATE TABLE IF NOT EXISTS attribution_dataset_2707.companies (
    id         BIGINT      NOT NULL,
    project_id INTEGER     NOT NULL,
    identifier STRING(64)  NOT NULL,
    updated_at TIMESTAMP,
    name       STRING(255),
    traits     STRING
);
-- Destination table for the exported "properties" data.
CREATE TABLE IF NOT EXISTS attribution_dataset_2707.properties (
    id         BIGINT      NOT NULL,
    key        STRING(200) NOT NULL,
    value      STRING(200) NOT NULL,
    project_id INTEGER     NOT NULL,
    event_id   BIGINT      NOT NULL,
    updated_at TIMESTAMP   NOT NULL
);
-- Destination table for the exported "impressions" data.
CREATE TABLE IF NOT EXISTS attribution_dataset_2707.impressions (
    id         BIGINT    NOT NULL,
    value      BIGINT,
    filter_id  BIGINT,
    source     STRING,
    date       DATE,
    updated_at TIMESTAMP NOT NULL,
    project_id INTEGER,
    deleted    BOOLEAN,
    clicks     BIGINT
);
-- Destination table for the exported "visits_2707" data (every column is nullable).
CREATE TABLE IF NOT EXISTS attribution_dataset_2707.visits_2707 (
    id                  BIGINT,
    visitor_id          BIGINT,
    visit_time          TIMESTAMP,
    filter              BIGINT,
    company_id          BIGINT,
    visit_type          STRING(1),
    user_id             BIGINT,
    original_created_at TIMESTAMP,
    path                STRING(255)
);
-- Destination table for the exported "x2707_filters" data.
CREATE TABLE IF NOT EXISTS attribution_dataset_2707.x2707_filters (
    id                  BIGINT NOT NULL,
    parent_group_id     BIGINT,
    top_parent_group_id BIGINT,
    type                STRING NOT NULL,
    name                STRING NOT NULL,
    label               STRING,
    integration         STRING,
    path                STRING
);
-- Destination table for the exported "visits_2707_costs" data (every column is nullable).
CREATE TABLE IF NOT EXISTS attribution_dataset_2707.visits_2707_costs (
    cpv         FLOAT64
    ,visit_count INTEGER
    ,visit_date  DATE
    ,filter      INTEGER
    ,amount      INTEGER
);
Ingest single data export
The queries below show how to load a single data export. Each export is placed in a separate folder for each table, and the folder is named in the format YYYYMMDD_hhmmss (in the example below it's 20240422_190936). Please note that this set of queries needs to be run for each data export, in alphabetical order of the export folder names. The queries below are of two types:
- Ingest data into the temporary table ext_table from cloud storage (a bucket). The syntax for this varies from database to database; please consult the documentation of the product you are using.
- Merge data from ext_table into the destination tables - this updates existing rows and inserts new ones. Note that some tables do not support incremental merge and are instead truncated and loaded in full.
-- Point the staging external table at the "users" Parquet files of this export.
CREATE OR REPLACE EXTERNAL TABLE attribution_dataset_2707.ext_table
OPTIONS (
    format = "PARQUET",
    uris = ["gs://attribution-export-2707/users/20240422_190936/users.*.parquet"]
);
-- Upsert by id: matching rows are overwritten, unmatched rows are inserted.
-- The staging table is dataset-qualified so the statement does not depend on
-- a default dataset being configured.
MERGE attribution_dataset_2707.users AS target
USING attribution_dataset_2707.ext_table AS source
    ON target.id = source.id
WHEN MATCHED THEN
    UPDATE SET
        target.id = source.id,
        target.identifier = source.identifier,
        target.created_at = source.created_at,
        target.project_id = source.project_id,
        target.updated_at = source.updated_at,
        target.original_created_at = source.original_created_at
WHEN NOT MATCHED THEN
    INSERT (
        id, identifier, created_at, project_id, updated_at, original_created_at
    )
    VALUES (
        source.id, source.identifier, source.created_at, source.project_id,
        source.updated_at, source.original_created_at
    );
-- Point the staging external table at the "events" Parquet files of this export.
CREATE OR REPLACE EXTERNAL TABLE attribution_dataset_2707.ext_table
OPTIONS (
    format = "PARQUET",
    uris = ["gs://attribution-export-2707/events/20240422_190936/events.*.parquet"]
);
-- Upsert by id: matching rows are overwritten, unmatched rows are inserted.
-- The staging table is dataset-qualified so the statement does not depend on
-- a default dataset being configured.
MERGE attribution_dataset_2707.events AS target
USING attribution_dataset_2707.ext_table AS source
    ON target.id = source.id
WHEN MATCHED THEN
    UPDATE SET
        target.id = source.id,
        target.name = source.name,
        target.ip = source.ip,
        target.created_at = source.created_at,
        target.user_id = source.user_id,
        target.browser_id = source.browser_id,
        target.project_id = source.project_id,
        target.time = source.time,
        target.visitor_id = source.visitor_id,
        target.referring_url = source.referring_url,
        target.referring_host = source.referring_host,
        target.revenue = source.revenue,
        target.updated_at = source.updated_at,
        target.uri = source.uri,
        target.uri_path = source.uri_path,
        target.self_referral = source.self_referral,
        target.message_id = source.message_id,
        target.source = source.source,
        target.type = source.type
WHEN NOT MATCHED THEN
    INSERT (
        id, name, ip, created_at, user_id, browser_id, project_id, time,
        visitor_id, referring_url, referring_host, revenue, updated_at,
        uri, uri_path, self_referral, message_id, source, type
    )
    VALUES (
        source.id, source.name, source.ip, source.created_at, source.user_id,
        source.browser_id, source.project_id, source.time, source.visitor_id,
        source.referring_url, source.referring_host, source.revenue,
        source.updated_at, source.uri, source.uri_path, source.self_referral,
        source.message_id, source.source, source.type
    );
-- Point the staging external table at the "params" Parquet files of this export.
CREATE OR REPLACE EXTERNAL TABLE attribution_dataset_2707.ext_table
OPTIONS (
    format = "PARQUET",
    uris = ["gs://attribution-export-2707/params/20240422_190936/params.*.parquet"]
);
-- Upsert by id: matching rows are overwritten, unmatched rows are inserted.
-- The staging table is dataset-qualified so the statement does not depend on
-- a default dataset being configured.
MERGE attribution_dataset_2707.params AS target
USING attribution_dataset_2707.ext_table AS source
    ON target.id = source.id
WHEN MATCHED THEN
    UPDATE SET
        target.id = source.id,
        target.key = source.key,
        target.value = source.value,
        target.project_id = source.project_id,
        target.time = source.time,
        target.event_id = source.event_id,
        target.updated_at = source.updated_at
WHEN NOT MATCHED THEN
    INSERT (
        id, key, value, project_id, time, event_id, updated_at
    )
    VALUES (
        source.id, source.key, source.value, source.project_id,
        source.time, source.event_id, source.updated_at
    );
-- Point the staging external table at the "amounts" Parquet files of this export.
CREATE OR REPLACE EXTERNAL TABLE attribution_dataset_2707.ext_table
OPTIONS (
    format = "PARQUET",
    uris = ["gs://attribution-export-2707/amounts/20240422_190936/amounts.*.parquet"]
);
-- Upsert by id: matching rows are overwritten, unmatched rows are inserted.
-- The staging table is dataset-qualified so the statement does not depend on
-- a default dataset being configured.
MERGE attribution_dataset_2707.amounts AS target
USING attribution_dataset_2707.ext_table AS source
    ON target.id = source.id
WHEN MATCHED THEN
    UPDATE SET
        target.id = source.id,
        target.value = source.value,
        target.created_at = source.created_at,
        target.filter_id = source.filter_id,
        target.source = source.source,
        target.date = source.date,
        target.updated_at = source.updated_at,
        target.project_id = source.project_id,
        target.amount_range_id = source.amount_range_id,
        target.deleted = source.deleted,
        target.original_value = source.original_value,
        target.original_currency = source.original_currency,
        target.conversion_rate = source.conversion_rate,
        target.currency_converted_at = source.currency_converted_at
WHEN NOT MATCHED THEN
    INSERT (
        id, value, created_at, filter_id, source, date, updated_at,
        project_id, amount_range_id, deleted, original_value,
        original_currency, conversion_rate, currency_converted_at
    )
    VALUES (
        source.id, source.value, source.created_at, source.filter_id,
        source.source, source.date, source.updated_at, source.project_id,
        source.amount_range_id, source.deleted, source.original_value,
        source.original_currency, source.conversion_rate,
        source.currency_converted_at
    );
-- Point the staging external table at the "browsers" Parquet files of this export.
CREATE OR REPLACE EXTERNAL TABLE attribution_dataset_2707.ext_table
OPTIONS (
    format = "PARQUET",
    uris = ["gs://attribution-export-2707/browsers/20240422_190936/browsers.*.parquet"]
);
-- Upsert by id: matching rows are overwritten, unmatched rows are inserted.
-- The staging table is dataset-qualified so the statement does not depend on
-- a default dataset being configured.
MERGE attribution_dataset_2707.browsers AS target
USING attribution_dataset_2707.ext_table AS source
    ON target.id = source.id
WHEN MATCHED THEN
    UPDATE SET
        target.id = source.id,
        target.cookie_id = source.cookie_id,
        target.created_at = source.created_at,
        target.user_agent = source.user_agent,
        target.updated_at = source.updated_at,
        target.project_id = source.project_id
WHEN NOT MATCHED THEN
    INSERT (
        id, cookie_id, created_at, user_agent, updated_at, project_id
    )
    VALUES (
        source.id, source.cookie_id, source.created_at, source.user_agent,
        source.updated_at, source.project_id
    );
-- Point the staging external table at the "visitors" Parquet files of this export.
CREATE OR REPLACE EXTERNAL TABLE attribution_dataset_2707.ext_table
OPTIONS (
    format = "PARQUET",
    uris = ["gs://attribution-export-2707/visitors/20240422_190936/visitors.*.parquet"]
);
-- Upsert by id: matching rows are overwritten, unmatched rows are inserted.
-- The staging table is dataset-qualified so the statement does not depend on
-- a default dataset being configured.
MERGE attribution_dataset_2707.visitors AS target
USING attribution_dataset_2707.ext_table AS source
    ON target.id = source.id
WHEN MATCHED THEN
    UPDATE SET
        target.id = source.id,
        target.user_id = source.user_id,
        target.browser_id = source.browser_id,
        target.project_id = source.project_id,
        target.updated_at = source.updated_at,
        target.ip = source.ip,
        target.traits = source.traits,
        target.email = source.email,
        target.company_id = source.company_id,
        target.migrated_to = source.migrated_to,
        target.original_created_at = source.original_created_at
WHEN NOT MATCHED THEN
    INSERT (
        id, user_id, browser_id, project_id, updated_at, ip, traits, email,
        company_id, migrated_to, original_created_at
    )
    VALUES (
        source.id, source.user_id, source.browser_id, source.project_id,
        source.updated_at, source.ip, source.traits, source.email,
        source.company_id, source.migrated_to, source.original_created_at
    );
-- Point the staging external table at the "companies" Parquet files of this export.
CREATE OR REPLACE EXTERNAL TABLE attribution_dataset_2707.ext_table
OPTIONS (
    format = "PARQUET",
    uris = ["gs://attribution-export-2707/companies/20240422_190936/companies.*.parquet"]
);
-- Upsert by id: matching rows are overwritten, unmatched rows are inserted.
-- The staging table is dataset-qualified so the statement does not depend on
-- a default dataset being configured.
MERGE attribution_dataset_2707.companies AS target
USING attribution_dataset_2707.ext_table AS source
    ON target.id = source.id
WHEN MATCHED THEN
    UPDATE SET
        target.id = source.id,
        target.project_id = source.project_id,
        target.identifier = source.identifier,
        target.updated_at = source.updated_at,
        target.name = source.name,
        target.traits = source.traits
WHEN NOT MATCHED THEN
    INSERT (
        id, project_id, identifier, updated_at, name, traits
    )
    VALUES (
        source.id, source.project_id, source.identifier, source.updated_at,
        source.name, source.traits
    );
-- Point the staging external table at the "properties" Parquet files of this export.
CREATE OR REPLACE EXTERNAL TABLE attribution_dataset_2707.ext_table
OPTIONS (
    format = "PARQUET",
    uris = ["gs://attribution-export-2707/properties/20240422_190936/properties.*.parquet"]
);
-- Upsert by id: matching rows are overwritten, unmatched rows are inserted.
-- The staging table is dataset-qualified so the statement does not depend on
-- a default dataset being configured.
MERGE attribution_dataset_2707.properties AS target
USING attribution_dataset_2707.ext_table AS source
    ON target.id = source.id
WHEN MATCHED THEN
    UPDATE SET
        target.id = source.id,
        target.key = source.key,
        target.value = source.value,
        target.project_id = source.project_id,
        target.event_id = source.event_id,
        target.updated_at = source.updated_at
WHEN NOT MATCHED THEN
    INSERT (
        id, key, value, project_id, event_id, updated_at
    )
    VALUES (
        source.id, source.key, source.value, source.project_id,
        source.event_id, source.updated_at
    );
-- Point the staging external table at the "impressions" Parquet files of this export.
CREATE OR REPLACE EXTERNAL TABLE attribution_dataset_2707.ext_table
OPTIONS (
    format = "PARQUET",
    uris = ["gs://attribution-export-2707/impressions/20240422_190936/impressions.*.parquet"]
);
-- Upsert by id: matching rows are overwritten, unmatched rows are inserted.
-- The staging table is dataset-qualified so the statement does not depend on
-- a default dataset being configured.
MERGE attribution_dataset_2707.impressions AS target
USING attribution_dataset_2707.ext_table AS source
    ON target.id = source.id
WHEN MATCHED THEN
    UPDATE SET
        target.id = source.id,
        target.value = source.value,
        target.filter_id = source.filter_id,
        target.source = source.source,
        target.date = source.date,
        target.updated_at = source.updated_at,
        target.project_id = source.project_id,
        target.deleted = source.deleted,
        target.clicks = source.clicks
WHEN NOT MATCHED THEN
    -- The column list must name the impressions columns, one per value, in
    -- the same order as the VALUES list.
    INSERT (
        id, value, filter_id, source, date, updated_at, project_id, deleted, clicks
    )
    VALUES (
        source.id, source.value, source.filter_id, source.source, source.date,
        source.updated_at, source.project_id, source.deleted, source.clicks
    );
-- Point the staging external table at the "visits_2707" Parquet files of this export.
CREATE OR REPLACE EXTERNAL TABLE attribution_dataset_2707.ext_table
OPTIONS (
    format = "PARQUET",
    uris = ["gs://attribution-export-2707/visits_2707/20240422_190936/visits_2707.*.parquet"]
);
-- Full reload: empty the destination table, then copy every staged row.
-- TRUNCATE and INSERT are separate statements, each terminated with ";".
TRUNCATE TABLE attribution_dataset_2707.visits_2707;
-- NOTE(review): INSERT without a column list maps columns by position, so
-- this relies on the export's column order matching the table definition.
INSERT INTO attribution_dataset_2707.visits_2707
SELECT *
FROM attribution_dataset_2707.ext_table;
-- Point the staging external table at the "x2707_filters" Parquet files of this export.
CREATE OR REPLACE EXTERNAL TABLE attribution_dataset_2707.ext_table
OPTIONS (
    format = "PARQUET",
    uris = ["gs://attribution-export-2707/x2707_filters/20240422_190936/x2707_filters.*.parquet"]
);
-- Full reload: empty the destination table, then copy every staged row.
-- TRUNCATE and INSERT are separate statements, each terminated with ";".
TRUNCATE TABLE attribution_dataset_2707.x2707_filters;
-- NOTE(review): INSERT without a column list maps columns by position, so
-- this relies on the export's column order matching the table definition.
INSERT INTO attribution_dataset_2707.x2707_filters
SELECT *
FROM attribution_dataset_2707.ext_table;
-- Point the staging external table at the "visits_2707_costs" Parquet files of this export.
CREATE OR REPLACE EXTERNAL TABLE attribution_dataset_2707.ext_table
OPTIONS (
    format = "PARQUET",
    uris = ["gs://attribution-export-2707/visits_2707_costs/20240422_190936/visits_2707_costs.*.parquet"]
);
-- Full reload: empty the destination table, then copy every staged row.
-- TRUNCATE and INSERT are separate statements, each terminated with ";".
TRUNCATE TABLE attribution_dataset_2707.visits_2707_costs;
-- NOTE(review): INSERT without a column list maps columns by position, so
-- this relies on the export's column order matching the table definition.
INSERT INTO attribution_dataset_2707.visits_2707_costs
SELECT *
FROM attribution_dataset_2707.ext_table;
Updated 6 months ago