Building a Sitemap URL Extractor Pipeline with Python, dlt, BigQuery & Paradime Bolt

We'll walk through how to build a fully automated sitemap URL extraction pipeline using Python, dlt (data load tool), Google BigQuery, and Paradime Bolt as the orchestrator — all wired together as code.

Kaustav Mitra · Feb 28, 2026 · 8 min read

[Image: paradime bolt extract sitemaps]

TL;DR: In this post, we'll walk through how to build a fully automated sitemap URL extraction pipeline using Python, dlt (data load tool), Google BigQuery, and Paradime Bolt as the orchestrator — all wired together as code. We'll also show how to close the loop with a Zapier workflow that pings your marketing team on Slack the moment new content is published on a competitor or partner site.

🧭 The Problem: You're Missing What Your Competitors Are Publishing

Marketing and content teams often struggle with a deceptively simple question: "What has [competitor / partner / industry publication] published this week?"

The usual answer involves someone manually checking websites, setting up Google Alerts that miss things, or paying for expensive SEO tools that are overkill for this one job.

But every public website worth monitoring has a sitemap — a structured XML file that lists every published URL along with when it was last modified. Sitemaps are the canonical, machine-readable source of truth for a site's content inventory.

The opportunity for data teams is clear:

Treat competitor and industry publication sitemaps as a data source. Extract them on a schedule, load them into your warehouse, and let your marketing team query and alert on new content — automatically.

This pipeline does exactly that. Here's what it delivers:

  1. Extract every URL (and its lastmod date) from any website's sitemap — including nested sitemap indexes.

  2. Filter by content categories relevant to your team (e.g. /blog, /thought-leadership, /research).

  3. Load results incrementally into BigQuery using dlt.

  4. Alert your marketing team on Slack via a Zapier workflow when new articles are detected.

  5. Orchestrate the whole thing weekly using Paradime Bolt — scheduled as code, no manual babysitting required.

💡 Why Does This Matter for Marketing Teams?

Before diving into the technical build, let's ground this in the real problem it solves.

The Thought Leadership & Content Gap Problem

B2B marketing teams — especially in data, tech, and professional services — live or die by thought leadership. The goal isn't just to produce content; it's to produce the right content at the right time, filling gaps that your competitors haven't covered yet.

Without systematic monitoring, content teams face three recurring pain points:

1. 🕳️ Reactive Content Strategy

Teams only discover what competitors are writing about after those articles start ranking or circulating on LinkedIn. By then, the window to be first — or to offer a contrarian take — has closed. A pipeline that surfaces new competitor content within days of publication puts your content team in a proactive position.

2. 📊 No Data on Publishing Cadence

How often does a key industry publication post? Which categories are heating up? Are competitors doubling down on AI governance while you're still writing about data quality basics? Without a queryable dataset of URLs and publish dates, these questions can only be answered with gut feel.

With a sitemap pipeline feeding BigQuery, you can run queries like:

-- How many articles did each content category receive in the last 90 days?
SELECT
  REGEXP_EXTRACT(url, r'https://[^/]+/([^/]+)/') AS category,
  COUNT(*) AS article_count,
  MIN(DATE(last_modified)) AS first_seen,
  MAX(DATE(last_modified)) AS last_seen
FROM `your_project.internal_tools.sitemap_urls`
WHERE DATE(last_modified) >= CURRENT_DATE() - 90
GROUP BY 1
ORDER BY 2 DESC

3. 🤝 Missed Partnership & Outreach Windows

If a publication just ran a feature on a topic you specialise in, that's a warm outreach window — for a guest post pitch, a data collaboration, or a quote contribution. But that window is short. Knowing within 48–72 hours that a relevant article was published can be the difference between a timely, relevant pitch and one that lands cold.

What Your Marketing Team Can Do With This Data

Once URLs and publish dates are in BigQuery and Slack alerts are flowing, the use cases multiply quickly:

| Use Case | How the Pipeline Enables It |
| --- | --- |
| Content gap analysis | Query which topics competitors cover that you don't |
| Publishing frequency benchmarks | Track how often key publications post by category |
| Timely outreach | Get Slack alerts within days of relevant articles going live |
| Editorial calendar alignment | Identify content clusters forming in your industry |
| Share-of-voice tracking | Monitor which topics dominate a publication over time |
| Backlink prospecting | Identify freshly published pages worth targeting for links |

🏗️ Stack Overview

| Layer | Tool |
| --- | --- |
| Extraction | Python + requests + xml.etree.ElementTree |
| Loading | dlt (data load tool) |
| Destination | Google BigQuery |
| Orchestration | Paradime Bolt |
| Scheduling as Code | paradime_schedules.yml |
| Alerting | Zapier → Slack |

📁 Project Structure

The script lives alongside your dbt™ project, making it easy to co-locate your ingestion and transformation layers in a single monorepo. Based on the paths referenced throughout this post, the layout looks like:

dbt/
├── paradime_schedules.yml
└── python/
    ├── trigger_zapier_webhook.py
    └── dlthub/
        └── extract-sitemap-url/
            └── extract.py
🔍 Step 1: Extracting URLs from the Sitemap

Sitemaps come in two flavours:

  • Sitemap Index — a top-level XML file pointing to child sitemaps (common on large sites).

  • URL Set — a sitemap containing actual page URLs with optional metadata like lastmod.
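To make the two flavours concrete, here's a minimal URL-set sitemap (the sample URL is invented) parsed with the same namespace-aware ElementTree approach the extractor uses:

```python
import xml.etree.ElementTree as ET

# A minimal URL-set sitemap (invented sample content).
SAMPLE = """<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
  <url>
    <loc>https://example.com/blog/hello-world</loc>
    <lastmod>2026-02-21</lastmod>
  </url>
</urlset>"""

ns = {'sm': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
root = ET.fromstring(SAMPLE)

# Without the namespace mapping, findall('.//url') would match nothing.
for url_elem in root.findall('.//sm:url', ns):
    loc = url_elem.find('sm:loc', ns).text
    lastmod = url_elem.find('sm:lastmod', ns).text
    print(loc, lastmod)  # → https://example.com/blog/hello-world 2026-02-21
```

A sitemap index looks the same, except the children are `<sitemap><loc>…</loc></sitemap>` elements pointing at further sitemap files.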

Our extractor handles both automatically using a breadth-first processing queue that recursively discovers and fetches child sitemaps:

import hashlib
import xml.etree.ElementTree as ET
from typing import Dict, List

import requests


def extract_urls_from_sitemap(sitemap_url: str) -> List[Dict[str, str]]:
    url_data = {}
    sitemaps_to_process = [sitemap_url]
    processed_sitemaps = set()

    while sitemaps_to_process:
        current_sitemap = sitemaps_to_process.pop(0)

        if current_sitemap in processed_sitemaps:
            continue

        processed_sitemaps.add(current_sitemap)

        response = requests.get(current_sitemap, timeout=30)
        response.raise_for_status()

        root = ET.fromstring(response.content)
        namespaces = {'sm': 'http://www.sitemaps.org/schemas/sitemap/0.9'}

        # If this is a sitemap index, queue its children
        sitemap_elements = root.findall('.//sm:sitemap/sm:loc', namespaces)
        for sitemap_elem in sitemap_elements:
            child_sitemap_url = sitemap_elem.text.strip()
            if child_sitemap_url not in processed_sitemaps:
                sitemaps_to_process.append(child_sitemap_url)

        # Extract regular page URLs
        for url_elem in root.findall('.//sm:url', namespaces):
            loc = url_elem.find('sm:loc', namespaces)
            lastmod = url_elem.find('sm:lastmod', namespaces)

            if loc is not None and loc.text:
                url = loc.text.strip()
                url_id = hashlib.md5(url.encode('utf-8')).hexdigest()
                url_data[url] = {
                    'unique_id': url_id,
                    'url': url,
                    'last_modified': lastmod.text.strip() if lastmod is not None and lastmod.text else None
                }

    return list(url_data.values())

Key design decisions:

  • Deduplication: URLs are stored in a dictionary keyed by URL string — no duplicates across nested sitemaps.

  • MD5 hashing: Each URL gets a stable unique_id derived from an MD5 hash of the URL string. This is used as the merge key in BigQuery.

  • Namespace-aware parsing: The sitemap XML namespace (http://www.sitemaps.org/schemas/sitemap/0.9) is handled explicitly to ensure compatibility across all compliant sitemaps.
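A quick check of the uid behaviour (the URL is a made-up example): MD5 is deterministic, so re-running the pipeline produces the same merge key for the same URL.

```python
import hashlib

def url_uid(url: str) -> str:
    """Stable 32-char hex id for a URL — the merge key used in BigQuery."""
    return hashlib.md5(url.encode('utf-8')).hexdigest()

a = url_uid('https://example.com/blog/post-1')
b = url_uid('https://example.com/blog/post-1')
c = url_uid('https://example.com/blog/post-2')
print(a == b, a == c, len(a))  # → True False 32
```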

🔎 Step 2: Filtering to Relevant Content Categories

Large publication sites can have tens of thousands of URLs — including author profiles, tag pages, pagination, and other non-article pages you don't care about. The --filter-paths argument lets you narrow scope to only the content categories that matter:

def filter_urls_by_paths(
    url_data: List[Dict[str, str]],
    path_filters: List[str]
) -> List[Dict[str, str]]:
    if not path_filters:
        return url_data

    filtered_data = []
    for item in url_data:
        url = item['url']
        for path_filter in path_filters:
            if path_filter in url:
                filtered_data.append(item)
                break

    return filtered_data

For an industry publication, you might filter to editorial content paths only:

--filter-paths /blog,/thought-leadership,/research

This keeps your BigQuery table clean and query-performant — only the signal, none of the noise.
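As a quick illustration (sample URLs invented), the substring matching keeps any URL that contains one of the filters:

```python
# Hypothetical sample rows; real rows come from extract_urls_from_sitemap().
rows = [
    {'url': 'https://example.com/blog/post-1'},
    {'url': 'https://example.com/authors/jane'},
    {'url': 'https://example.com/research/report'},
]

filters = ['/blog', '/research']

# Same substring logic as filter_urls_by_paths: keep a row if any filter
# appears anywhere in its URL.
kept = [r for r in rows if any(f in r['url'] for f in filters)]
print([r['url'] for r in kept])
# → ['https://example.com/blog/post-1', 'https://example.com/research/report']
```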

🚀 Step 3: Loading to BigQuery with dlt

dlt makes loading data to BigQuery dead simple. It handles schema inference, table creation, and incremental loading out of the box — no boilerplate infrastructure code required.

The dlt Resource

We wrap our URL data in a dlt resource generator that stamps each record with a loaded_on timestamp:

from datetime import datetime
from typing import Dict, Iterator, List


def sitemap_urls_resource(url_data: List[Dict[str, str]]) -> Iterator[Dict[str, str]]:
    loaded_on = datetime.utcnow().isoformat()

    for item in url_data:
        yield {
            'loaded_on': loaded_on,
            'uid': item['unique_id'],
            'url': item['url'],
            'last_modified': item['last_modified']
        }

The dlt Pipeline

The pipeline is configured with write_disposition="merge" and primary_key="uid" — meaning on every run, dlt will upsert rows into BigQuery. New URLs are inserted; existing ones are updated if their last_modified date has changed.

import dlt

pipeline = dlt.pipeline(
    pipeline_name="sitemap_urls",
    destination="bigquery",
    dataset_name=dataset_name,
)

sitemap_resource = dlt.resource(
    sitemap_urls_resource(url_data),
    name=table_name,
    write_disposition="merge",
    primary_key="uid",
)

load_info = pipeline.run(sitemap_resource)

This gives you a clean, deduplicated table in BigQuery that grows incrementally over time — a permanent, queryable history of every published URL and when it was last modified.

📊 What Lands in BigQuery

After the first successful run, you'll have a table with this schema:

| Column | Type | Description |
| --- | --- | --- |
| loaded_on | TIMESTAMP | When the row was loaded by the pipeline |
| uid | STRING | MD5 hash of the URL (primary key for upserts) |
| url | STRING | The full page URL |
| last_modified | STRING | The lastmod value from the sitemap (if present) |

Because we use write_disposition="merge" with primary_key="uid", each pipeline run will:

  • Insert new URLs discovered since the last run.

  • 🔄 Update existing URLs if their metadata has changed.

  • Never duplicate rows.
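Conceptually, the merge behaves like a keyed upsert. This toy sketch (plain Python, not dlt, with invented rows) mimics what happens to the table across two runs:

```python
# Toy model of write_disposition="merge" keyed on uid — NOT dlt itself,
# just an illustration of upsert semantics across two runs.
table = {}  # uid -> row, standing in for the BigQuery table

def merge_run(rows):
    for row in rows:
        table[row['uid']] = row  # insert new uid, overwrite existing uid

# Run 1: one article discovered.
merge_run([{'uid': 'a', 'url': 'https://example.com/p1', 'last_modified': '2026-02-20'}])

# Run 2: the same article with a newer lastmod, plus a brand-new one.
merge_run([
    {'uid': 'a', 'url': 'https://example.com/p1', 'last_modified': '2026-02-25'},  # updated
    {'uid': 'b', 'url': 'https://example.com/p2', 'last_modified': '2026-02-26'},  # new
])

print(len(table), table['a']['last_modified'])  # → 2 2026-02-25
```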

🔐 Step 4: Authentication — Reusing dbt™ Profile Credentials

The script integrates with your existing dbt™ profile credentials via a ProfileConnectionCredentialsLoader utility. This means you don't need to manage a separate set of secrets for your ingestion layer — it reuses whatever BigQuery service account is already configured for your dbt™ project:

import os


def setup_bigquery_credentials(dbt_project_dir, dbt_profiles_dir, ...) -> None:
    loader = ProfileConnectionCredentialsLoader(
        project_dir=dbt_project_dir,
        profiles_dir=dbt_profiles_dir
    )

    dbt_creds = loader.get_credentials_by_environment(non_prod_target='ingestion')
    keyfile_json = dbt_creds.get('keyfile_json')
    bq_location = dbt_creds.get('location') or 'EU'

    if isinstance(keyfile_json, dict):
        os.environ["DESTINATION__BIGQUERY__CREDENTIALS__PROJECT_ID"] = (
            bigquery_project or keyfile_json.get("project_id", "")
        )
        os.environ["DESTINATION__BIGQUERY__CREDENTIALS__PRIVATE_KEY"] = keyfile_json.get("private_key", "")
        os.environ["DESTINATION__BIGQUERY__CREDENTIALS__CLIENT_EMAIL"] = keyfile_json.get("client_email", "")
        os.environ["DESTINATION__BIGQUERY__LOCATION"] = bq_location

dlt picks up BigQuery credentials automatically from the DESTINATION__BIGQUERY__CREDENTIALS__* environment variables — no separate config files or credential juggling needed.

🎛️ Step 5: CLI Interface

The script is fully driven by command-line arguments, making it trivial to plug into any orchestrator:

python extract.py \
  --sitemap-url https://www.industry-publication.com/sitemap.xml \
  --bq-dataset competitive_intel \
  --bq-table sitemap_urls \
  --bq-project your-gcp-project \
  --filter-paths /blog,/thought-leadership,/research

| Argument | Description |
| --- | --- |
| --sitemap-url | The root sitemap URL to crawl |
| --bq-dataset | BigQuery dataset name |
| --bq-table | BigQuery table name |
| --bq-project | Override the BigQuery project ID |
| --filter-paths | Comma-separated URL path filters |
| --test | Test mode — only loads first 10 rows |
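A minimal argparse wiring for these flags might look like this (a sketch; the actual extract.py may differ in defaults and help text, and the required/optional split is an assumption):

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    # Flags mirror the CLI table above; which ones are required is assumed.
    p = argparse.ArgumentParser(description="Extract sitemap URLs into BigQuery")
    p.add_argument("--sitemap-url", required=True, help="Root sitemap URL to crawl")
    p.add_argument("--bq-dataset", required=True, help="BigQuery dataset name")
    p.add_argument("--bq-table", required=True, help="BigQuery table name")
    p.add_argument("--bq-project", help="Override the BigQuery project ID")
    p.add_argument("--filter-paths", help="Comma-separated URL path filters")
    p.add_argument("--test", action="store_true", help="Only load the first 10 rows")
    return p

# Parse a sample command line (no side effects).
args = build_parser().parse_args([
    "--sitemap-url", "https://example.com/sitemap.xml",
    "--bq-dataset", "competitive_intel",
    "--bq-table", "sitemap_urls",
    "--filter-paths", "/blog,/research",
])
print(args.filter_paths.split(","))  # → ['/blog', '/research']
```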

⚙️ Step 6: Orchestrating with Paradime Bolt

Now comes the part that ties it all together — scheduling this pipeline to run automatically using Paradime Bolt.

Bolt supports schedules as code, meaning your pipeline schedules live in a paradime_schedules.yml file right inside your dbt™ project directory — version-controlled, PR-reviewed, and deployed automatically.

Why Paradime Bolt?

  • 📦 Schedules as code — pipeline config lives in Git alongside your dbt™ models, reviewed like any other code change.

  • 🔁 Flexible triggers — cron schedules, run-completion chains, merge triggers, or API-driven execution.

  • 📣 Built-in notifications — Slack, email, and Microsoft Teams alerts without any extra tooling.

  • 🔒 Production environment isolation — runs against your production warehouse credentials securely.

  • 👀 Full observability — logs, run history, and SLA monitoring in the Bolt UI.

The paradime_schedules.yml Configuration

This is the real-world schedule configuration — running every Friday at 8 AM UTC, monitoring an industry publication for new content across a defined set of content categories, then triggering a Zapier webhook to fan out Slack alerts:

# dbt/paradime_schedules.yml

schedules:

  - name: extract-industry-publication-sitemap
    description: >-
      This pipeline extracts URLs from an industry publication website so we
      can detect new content and build effective outreach or content pipelines
      around it. Data is fetched from the website sitemap and written to
      BigQuery, then we trigger a Zapier workflow to send alerts to Slack on
      new content published.
    owner_email: data-team@yourcompany.com
    environment: production
    git_branch: main
    # Runs every Friday at 8:00 AM UTC — a weekly content intelligence brief
    schedule: "0 8 * * 5"
    timezone: UTC
    sla_minutes: 120

    commands:
      # Step 1: Extract sitemap URLs and load to BigQuery
      - >-
        python ./python/dlthub/extract-sitemap-url/extract.py
        --sitemap-url https://www.industry-publication.com/sitemap.xml
        --bq-project your-gcp-project
        --bq-dataset internal_tools
        --bq-table sitemap_urls
        --filter-paths /opinion,/research,/interviews,/thought-leadership,/podcasts,/leadership

      # Step 2: Trigger Zapier webhook to fire Slack alerts
      - >-
        python ./python/trigger_zapier_webhook.py
        --webhook-url https://hooks.zapier.com/hooks/catch/YOUR_HOOK_ID/

    notifications:
      slack_channels:
        - channel: '#analytics-alerts'
          events:


How Bolt Deploys It

Once you merge paradime_schedules.yml to your default branch (main or master), Paradime automatically picks up the schedule within 10 minutes. No manual configuration in the UI required.

You can also trigger an immediate refresh by navigating to the Bolt UI and clicking Parse Schedules.

🔔 Step 7: Closing the Loop — Slack Alerts via Zapier

Loading data into BigQuery is only half the value. The other half is getting that information in front of your marketing team without them having to remember to check a dashboard.

This is where a Zapier workflow bridges the gap between your data warehouse and your team's Slack channel.
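The trigger_zapier_webhook.py helper called from the Bolt schedule isn't shown in the post; a stdlib-only sketch might look like this (the payload fields are invented — Zapier catch hooks accept arbitrary JSON):

```python
import argparse
import json
import urllib.request
from datetime import datetime, timezone


def build_payload() -> dict:
    # Payload fields are invented; Zapier only needs the POST itself.
    return {
        "source": "paradime-bolt",
        "triggered_at": datetime.now(timezone.utc).isoformat(),
    }


def trigger(webhook_url: str, timeout: int = 30) -> int:
    """POST the payload to the Zapier catch hook and return the HTTP status."""
    req = urllib.request.Request(
        webhook_url,
        data=json.dumps(build_payload()).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status


def parse_args(argv=None) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument("--webhook-url", required=True)
    return parser.parse_args(argv)


# Example invocation (argument parsing only — no network call here):
args = parse_args(["--webhook-url", "https://hooks.zapier.com/hooks/catch/YOUR_HOOK_ID/"])
print(args.webhook_url)
```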

The Zapier Workflow

The workflow has 8 steps that together form a complete "new content detected → Slack notification" loop:

[1] Webhook Trigger (Catch Hook)
        │
        ▼
[2] BigQuery Query
    └─ SELECT urls modified in the last 6 days
    └─ Exclude top-level category index pages
        │
        ▼
[3] Filter: Only continue if totalRows > 0
    └─ Skip silently if no new content found
        │
        ▼
[4] Format: Today's Date  (DD-MM-YYYY)
        │
        ▼
[5] Format: Date 6 Days Ago  (DD-MM-YYYY)
        │
        ▼
[6] Slack: Post summary message to channel
    └─ "📚 Found X new articles posted between [date] and [today]"
        │
        ▼
[7] Loop: Iterate over each URL row
        │
        ▼
[8] Slack: Post each URL as a threaded reply
    └─ "<url> was posted on [last_modified_on]"


The BigQuery Query Inside Zapier

The heart of the workflow is this query, which runs inside Zapier's BigQuery action. It fetches only articles published in the last 6 days, filtering out top-level category index pages (which appear in the sitemap but aren't articles):

SELECT
  DATE(last_modified) AS last_modified_on,
  url
FROM `your-project.internal_tools.sitemap_urls`
WHERE
  -- Only look at content from the last 6 days (matching the weekly cadence)
  DATE(last_modified) BETWEEN CURRENT_DATE() - 6 AND CURRENT_DATE()
  -- Exclude top-level category index pages — these aren't articles
  AND url NOT IN (
    'https://www.industry-publication.com/'
  )
ORDER BY 1 DESC

What Your Team Sees in Slack

Every Friday morning, the marketing channel receives:

📚 industry-publication.com — Found 7 new articles posted between 21-02-2026 and 28-02-2026 (today)

Followed by a thread of individual links:

https://www.industry-publication.com/research/data-talent-report-q1 was posted on 2026-02-24 (and so on...)

No dashboard login. No manual checking. Just a weekly digest landing directly in the channel where your content team already works.
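The date window and summary line (steps 4–6 of the Zap) are easy to reproduce in code. A sketch of the formatting, with an invented article count:

```python
from datetime import date, timedelta

def summary_line(site: str, count: int, today: date) -> str:
    """Build the weekly digest header in the DD-MM-YYYY format the Zap uses."""
    start = today - timedelta(days=6)  # matches the 6-day BigQuery window
    fmt = "%d-%m-%Y"
    return (f"📚 {site} — Found {count} new articles posted between "
            f"{start.strftime(fmt)} and {today.strftime(fmt)} (today)")

print(summary_line("industry-publication.com", 7, date(2026, 2, 28)))
```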

🧪 Testing the Pipeline Locally

Before scheduling in Bolt, test the script locally with the --test flag to load only the first 10 rows:

# Set your dbt project environment variables
export DBT_PROJECT_DIR=/path/to/dbt
export DBT_PROFILES_DIR=/path/to/.dbt

# Run in test mode
python dbt/python/dlthub/extract-sitemap-url/extract.py \
  --sitemap-url https://www.industry-publication.com/sitemap.xml \
  --bq-dataset internal_tools_dev \
  --bq-table sitemap_urls \
  --test

Validate your paradime_schedules.yml with the Paradime CLI:

cd


Simulate execution without touching BigQuery or Zapier:

paradime schedule run extract-industry-publication-sitemap --dry-run

🔄 The Full End-to-End Flow

Putting it all together, here's what happens every Friday at 8 AM UTC:

08:00 UTC — Friday
    │
    ▼
[Paradime Bolt] triggers the schedule
    │
    ├──► python extract.py
    │         └─ Fetches sitemap XML (recursively)
    │         └─ Filters to editorial content paths
    │         └─ Upserts into BigQuery: internal_tools.sitemap_urls
    │
    └──► python trigger_zapier_webhook.py
              └─ HTTP POST to Zapier webhook
                      │
                      ▼
              [Zapier] runs BigQuery query
                      └─ Finds articles modified in last 6 days
                      └─ Filters: totalRows > 0?
                              │
                              ▼ (if yes)
                      [Slack]


✅ Summary

Here's what this pipeline gives you end-to-end:

Extraction: Python recursively crawls any sitemap, including nested sitemap indexes.

Filtering: Path-based filters keep only relevant editorial content categories.

Deduplication: MD5-hashed primary keys ensure no duplicate rows in BigQuery.

Incremental Loading: dlt upserts into BigQuery — inserts new, updates changed, never duplicates.

Authentication: Credentials are reused from the dbt™ profile — no extra secrets management.

Scheduling: Paradime Bolt runs the pipeline on a cron schedule via paradime_schedules.yml.

Alerting: Zapier queries BigQuery and posts a Slack digest for new content.

Marketing Value: Content teams get a weekly brief on new articles — no dashboards required.
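
The deduplication step works because the primary key is deterministic: the same URL always hashes to the same key, so dlt's merge write disposition updates the existing row instead of inserting a new one. A sketch of that keying, assuming an MD5 hash of the URL as described above:

```python
import hashlib

def row_key(url: str) -> str:
    """Deterministic primary key: the same URL always yields the same key."""
    return hashlib.md5(url.encode("utf-8")).hexdigest()

# Re-running the pipeline on this URL produces an identical 32-char key,
# so the upsert refreshes last_modified rather than duplicating the row.
print(row_key("https://www.industry-publication.com/research/data-talent-report-q1"))
```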

The beauty of this architecture is that it's entirely low-cost, low-maintenance, and high-signal. Sitemaps are publicly available on almost every major website. The pipeline requires no scraping, no authentication against third-party APIs, and no paid SEO tools. Just Python, dlt, BigQuery, Bolt, and Zapier — tools most data teams already have.

🔗 Resources

Interested to Learn More?
Try Out the Free 14-Day Trial


Experience Analytics for the AI-Era

Start your 14-day trial today - it's free and no credit card needed


Copyright © 2026 Paradime Labs, Inc.

Made with ❤️ in San Francisco ・ London

*dbt® and dbt Core® are federally registered trademarks of dbt Labs, Inc. in the United States and various jurisdictions around the world. Paradime is not a partner of dbt Labs. All rights therein are reserved to dbt Labs. Paradime is not a product or service of or endorsed by dbt Labs, Inc.
