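What Is Scrapling?

Scrapling is an adaptive Python web scraping framework created by the developer D4Vinci. It rose quickly on GitHub Trending in 2026 and has collected more than 57,000 stars, making it a prominent example of the new generation of scraping tools.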

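Why Scrapling?

Traditional scraping tools (such as Scrapy and BeautifulSoup) share three pain points:

  1. Site changes break code: a CSS selector or XPath expression stops matching as soon as the page is redesigned
  2. Anti-bot systems block requests: protections such as Cloudflare Turnstile and Akamai answer plain HTTP requests with a 403
  3. Poor scalability: moving from small-scale fetching to large-scale concurrent crawling means rewriting a lot of code

Scrapling's core design philosophy is "One library, zero compromises":

  • Adaptive parser: learns the page structure and relocates elements automatically after the page changes
  • Built-in anti-bot evasion: works out of the box against mainstream protections such as Cloudflare Turnstile
  • Scrapy-like API: developers who know Scrapy can migrate with little friction
  • From a single request to a full crawl: the same API covers simple fetches and large-scale concurrent crawlers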

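Scrapling vs. Traditional Scraping Tools

Feature | BeautifulSoup | Scrapy | Scrapling
Learning curve | | |
Adaptive parsing | | |
Cloudflare bypass | | Requires a plugin | ✅ Built in
Concurrent crawling | | |
Dynamic page support | | Requires middleware | ✅ Built in (Playwright)
Pause/resume | | Requires an extension | ✅ Built in
Streaming output | | |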

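Installing Scrapling

Requirements

  • Python 3.8+
  • The pip package manager

Quick Install

pip install scrapling

Verify the Installation

from scrapling.fetchers import Fetcher

# Smoke test: fetch a page and print its title
page = Fetcher.get('https://example.com')
print(page.css('title::text').get())  # prints the page title

Optional Dependencies

To render dynamic (JavaScript-driven) pages, install Playwright as well:

pip install playwright
playwright install chromium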

Quick Start: Your First Scraper

Example 1: Fetching a Simple Page

Let's start with a simple example: scraping the top stories on Hacker News.

from scrapling.fetchers import Fetcher

# Send an HTTP GET request
page = Fetcher.get('https://news.ycombinator.com/')

# Extract data with a CSS selector
stories = page.css('.titleline > a')

for story in stories[:5]:  # first 5 stories only
    title = story.text
    link = story.attrib.get('href', '')
    print(f"Title: {title}")
    print(f"Link: {link}")
    print("-" * 40)

Sample output:

Title: Show HN: I built a real-time code collaboration tool
Link: https://github.com/example/collab-tool
----------------------------------------
Title: Ask HN: What's your favorite Python library in 2026?
Link: https://news.ycombinator.com/item?id=123456
----------------------------------------

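Example 2: Adaptive Parsing (auto_save)

Adaptive parsing is Scrapling's core feature. On the first scrape, pass auto_save=True so that Scrapling records the characteristics of the matched elements. After the site is redesigned, pass adaptive=True and it looks up the target elements from the saved characteristics.

from scrapling.fetchers import Fetcher

page = Fetcher.get('https://quotes.toscrape.com/')

# First scrape: save the characteristics of the matched elements
quotes = page.css('.quote', auto_save=True)

for quote in quotes[:3]:
    text = quote.css('.text::text').get()
    author = quote.css('.author::text').get()
    print(f"{text} - {author}")

If the site structure changes:

# Later scrapes: pass adaptive=True
page = Fetcher.get('https://quotes.toscrape.com/')
quotes = page.css('.quote', adaptive=True)  # relocates the elements in the new structure

for quote in quotes[:3]:
    text = quote.css('.text::text').get()
    author = quote.css('.author::text').get()
    print(f"{text} - {author}")

💡 How it works: Scrapling records several characteristics of each element (tag type, nearby text, attribute patterns, and so on), so it can relocate the element from the remaining characteristics even when the CSS class name changes.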
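
To make the idea of relocating an element from saved characteristics concrete, here is a minimal sketch in plain Python. It is not Scrapling's actual algorithm: the `fingerprint`, `similarity`, and `relocate` helpers and the equal weighting of the three scores are assumptions chosen purely for illustration.

```python
from difflib import SequenceMatcher


def fingerprint(tag, attrs, text):
    """Collect a few features of an element into a plain dict (hypothetical format)."""
    return {"tag": tag, "attrs": dict(attrs), "text": text.strip()}


def similarity(saved, candidate):
    """Score how closely a candidate resembles the saved fingerprint (0.0 to 1.0)."""
    tag_score = 1.0 if saved["tag"] == candidate["tag"] else 0.0
    text_score = SequenceMatcher(None, saved["text"], candidate["text"]).ratio()
    saved_attrs = set(saved["attrs"].items())
    cand_attrs = set(candidate["attrs"].items())
    union = saved_attrs | cand_attrs
    # Jaccard overlap of (name, value) attribute pairs
    attr_score = len(saved_attrs & cand_attrs) / len(union) if union else 1.0
    return (tag_score + text_score + attr_score) / 3


def relocate(saved, candidates):
    """Return the candidate most similar to the saved fingerprint."""
    return max(candidates, key=lambda c: similarity(saved, c))


# Fingerprint saved on the first run, when the element still had class="quote"
saved = fingerprint(
    "div",
    {"class": "quote", "itemtype": "http://schema.org/CreativeWork"},
    "The world as we have created it",
)

# Elements found after a redesign renamed the class to "q-card"
candidates = [
    fingerprint(
        "div",
        {"class": "q-card", "itemtype": "http://schema.org/CreativeWork"},
        "The world as we have created it",
    ),
    fingerprint("div", {"class": "footer"}, "Quotes by GoodReads.com"),
    fingerprint("span", {"class": "quote"}, "Login"),
]

best = relocate(saved, candidates)
print(best["attrs"]["class"])
```

The renamed element still wins because its tag, text, and one attribute match, even though the class that the original selector relied on is gone.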


Advanced Features

1. Bypassing Cloudflare Turnstile

Many sites use Cloudflare Turnstile or other anti-bot systems. Scrapling's StealthyFetcher can handle these protections automatically.

from scrapling.fetchers import StealthyFetcher

# Enable adaptive mode
StealthyFetcher.adaptive = True

# Fetch a page behind Cloudflare
page = StealthyFetcher.fetch(
    'https://example-protected-site.com',
    headless=True,       # headless browser mode
    network_idle=True    # wait for the network to go idle
)

# Extract data as usual
products = page.css('.product-item')
for product in products:
    name = product.css('.name::text').get()
    price = product.css('.price::text').get()
    print(f"{name}: {price}")

Key parameters:

  • headless=True: runs a headless browser that simulates real user behavior
  • network_idle=True: waits for all network requests to finish before extracting (useful for SPAs)
  • adaptive=True: enables adaptive parsing

2. Dynamic Page Rendering (Playwright)

For sites that need JavaScript rendering, use DynamicFetcher.

from scrapling.fetchers import DynamicFetcher

page = DynamicFetcher.fetch(
    'https://spa-example.com',
    wait_selector='.content-loaded',  # wait for a specific element to appear
    timeout=30000                     # timeout in milliseconds
)

# Extract the dynamically loaded content
articles = page.css('article')
for article in articles:
    title = article.css('h2::text').get()
    summary = article.css('.summary::text').get()
    print(f"{title}\n{summary}\n")

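3. Asynchronous Concurrent Fetching

Use AsyncFetcher together with asyncio to fetch many pages concurrently.

import asyncio
from scrapling.fetchers import AsyncFetcher

async def fetch_multiple_pages():
    urls = [
        'https://example.com/page/1',
        'https://example.com/page/2',
        'https://example.com/page/3',
        'https://example.com/page/4',
        'https://example.com/page/5',
    ]

    # Allow at most 3 requests in flight at a time
    semaphore = asyncio.Semaphore(3)

    async def fetch(url):
        async with semaphore:
            return await AsyncFetcher.get(url)

    # Send the requests concurrently; failures come back as exception objects
    pages = await asyncio.gather(*(fetch(url) for url in urls), return_exceptions=True)

    for url, page in zip(urls, pages):
        if isinstance(page, Exception):
            print(f"{url}: request failed")
        else:
            title = page.css('title::text').get()
            print(f"{url}: {title}")

asyncio.run(fetch_multiple_pages())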

The Spider Framework: Large-Scale Crawling

Scrapling provides a Scrapy-like Spider framework that supports large-scale concurrent crawling.

A Basic Spider

from scrapling.spiders import Spider, Response

class QuoteSpider(Spider):
    name = "quotes"
    start_urls = ["https://quotes.toscrape.com/"]

    async def parse(self, response: Response):
        # Extract the quotes on the current page
        for quote in response.css('.quote'):
            yield {
                "text": quote.css('.text::text').get(),
                "author": quote.css('.author::text').get(),
                "tags": quote.css('.tag::text').getall()
            }

        # Follow the link to the next page
        next_page = response.css('.next a::attr(href)').get()
        if next_page:
            yield self.follow(next_page, callback=self.parse)

# Start the spider
QuoteSpider().start()
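
The `href` taken from the "next" link is usually relative (for example `/page/2/`), so it has to be resolved against the current page URL before it can be requested. The sketch below shows that step with the standard library only; `resolve_next_page` is a hypothetical helper for illustration and says nothing about how Scrapling resolves links internally.

```python
from urllib.parse import urljoin


def resolve_next_page(current_url, href):
    """Turn a possibly relative href into an absolute URL, or None if there is no link."""
    if not href:
        return None
    return urljoin(current_url, href)


# Root-relative link found on page 2
print(resolve_next_page("https://quotes.toscrape.com/page/2/", "/page/3/"))

# Path-relative link found on a tag page
print(resolve_next_page("https://quotes.toscrape.com/tag/love/", "page/2/"))

# Last page: no "next" link was found
print(resolve_next_page("https://quotes.toscrape.com/page/10/", None))
```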

Configuring Concurrency

class MultiPageSpider(Spider):
    name = "multi-page"
    start_urls = [f"https://example.com/page/{i}" for i in range(1, 101)]

    # Concurrency settings
    custom_settings = {
        "concurrency": 5,           # maximum number of concurrent requests
        "download_delay": 1,        # delay between downloads (seconds)
        "robots_txt_obey": True,    # respect robots.txt
    }

    async def parse(self, response: Response):
        title = response.css('h1::text').get()
        yield {"url": response.url, "title": title}

MultiPageSpider().start()

Pause and Resume

Scrapling supports checkpoint-based persistence: after a graceful shutdown with Ctrl+C, the next run resumes automatically from where the crawl stopped.

class LongRunningSpider(Spider):
    name = "long-crawl"
    start_urls = ["https://large-site.com/"]

    custom_settings = {
        "checkpoint_dir": "./checkpoints",  # checkpoint directory
    }

    async def parse(self, response: Response):
        # Extract data...
        yield {"data": "..."}

        # Keep crawling
        for link in response.css('a::attr(href)').getall():
            yield self.follow(link, callback=self.parse)

LongRunningSpider().start()
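
As a rough illustration of what checkpoint-based persistence involves, the sketch below saves a crawl frontier (the pending URLs and the set of URLs already seen) to a JSON file and reloads it on the next start. The `Frontier` class and its file layout are assumptions made up for this example; Scrapling's own checkpoint format is not shown here.

```python
import json
import os
import tempfile
from collections import deque


class Frontier:
    """Hypothetical crawl frontier that persists its state to a JSON file."""

    def __init__(self, path, start_urls):
        self.path = path
        if os.path.exists(path):
            # A checkpoint exists: resume from it instead of starting over
            with open(path, encoding="utf-8") as f:
                state = json.load(f)
            self.pending = deque(state["pending"])
            self.seen = set(state["seen"])
        else:
            self.pending = deque(start_urls)
            self.seen = set(start_urls)

    def add(self, url):
        """Queue a URL unless it has been seen before."""
        if url not in self.seen:
            self.seen.add(url)
            self.pending.append(url)

    def pop(self):
        """Return the next URL to crawl, or None when the queue is empty."""
        return self.pending.popleft() if self.pending else None

    def save(self):
        """Write the state to a temp file, then rename it over the checkpoint."""
        tmp = self.path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump({"pending": list(self.pending), "seen": sorted(self.seen)}, f)
        os.replace(tmp, self.path)


# First run: crawl the start page, discover two links, then shut down
path = os.path.join(tempfile.mkdtemp(), "frontier.json")
first_run = Frontier(path, ["https://example.com/"])
first_run.pop()
first_run.add("https://example.com/a")
first_run.add("https://example.com/b")
first_run.save()

# Second run: the start URL is ignored because a checkpoint exists
resumed = Frontier(path, ["https://example.com/"])
resumed.add("https://example.com/")  # already seen, so not queued again
print(list(resumed.pending))
```

Writing to a temporary file and renaming it keeps the checkpoint intact if the process dies mid-write. A fuller implementation would also track requests that were in flight at shutdown so they are retried on resume.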

Real-World Examples

Case 1: Scraping GitHub Trending

from scrapling.fetchers import Fetcher

def scrape_github_trending():
    page = Fetcher.get('https://github.com/trending')

    repos = page.css('.Box-row')

    trending = []
    for repo in repos[:10]:
        name = repo.css('h2 a::text').get('').strip()
        description = repo.css('p.col-9::text').get('').strip()
        stars = repo.css('[href$=stargazers] span::text').get('').strip()
        language = repo.css('[itemprop=programmingLanguage]::text').get('').strip()

        trending.append({
            "name": name,
            "description": description,
            "stars": stars,
            "language": language
        })

    return trending

if __name__ == "__main__":
    results = scrape_github_trending()
    for repo in results:
        print(f"📦 {repo['name']}")
        print(f"   {repo['description'][:80]}...")
        print(f"   ⭐ {repo['stars']} | 📝 {repo['language']}")
        print()

Case 2: E-Commerce Price Monitoring

from scrapling.fetchers import StealthyFetcher
import json
from datetime import datetime

def monitor_prices():
    urls = [
        "https://amazon.com/dp/B08N5WRWNW",
        "https://amazon.com/dp/B0BSHF7WHW",
        "https://amazon.com/dp/B09G9FPHY6",
    ]

    results = []

    for url in urls:
        page = StealthyFetcher.fetch(url, headless=True)

        title = page.css('#productTitle::text').get('').strip()
        price = page.css('.a-price .a-offscreen::text').get('').strip()

        results.append({
            "url": url,
            "title": title,
            "price": price,
            "timestamp": datetime.now().isoformat()
        })

        print(f"✅ {title[:50]}... - {price}")

    # Save to JSON
    with open('price_monitor.json', 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2)

    print(f"\n📊 Saved {len(results)} price records to price_monitor.json")

if __name__ == "__main__":
    monitor_prices()
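
The monitor above stores each price as display text, so detecting a change between two runs requires turning that text into a number first. The sketch below is one way to do it; `parse_price` and `price_changes` are hypothetical helpers, and the regular expression assumes US-style formatting (comma thousands separator, dot decimal point).

```python
import re
from decimal import Decimal


def parse_price(text):
    """Extract a Decimal amount from text such as '$1,299.99'; return None if absent."""
    match = re.search(r"\d[\d,]*(?:\.\d+)?", text)
    if not match:
        return None
    return Decimal(match.group().replace(",", ""))


def price_changes(previous, current):
    """Return {url: (old, new)} for URLs whose parsed price differs between runs."""
    changes = {}
    for url, new_text in current.items():
        old = parse_price(previous.get(url, ""))
        new = parse_price(new_text)
        if old is not None and new is not None and old != new:
            changes[url] = (old, new)
    return changes


# Two snapshots keyed by product URL (made-up values for illustration)
yesterday = {"https://amazon.com/dp/B08N5WRWNW": "$1,299.99"}
today = {
    "https://amazon.com/dp/B08N5WRWNW": "$1,199.00",
    "https://amazon.com/dp/B0BSHF7WHW": "$49.99",  # new product, no previous price
}

for url, (old, new) in price_changes(yesterday, today).items():
    print(f"{url}: {old} -> {new}")
```

Decimal avoids the rounding surprises of binary floats when comparing currency amounts.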

Case 3: Streaming Output (Real-Time Processing)

For long-running spiders, streaming mode lets you process items as they are scraped.

from scrapling.spiders import Spider, Response

class StreamingSpider(Spider):
    name = "streaming-demo"
    start_urls = ["https://quotes.toscrape.com/"]

    async def parse(self, response: Response):
        for quote in response.css('.quote'):
            item = {
                "text": quote.css('.text::text').get(),
                "author": quote.css('.author::text').get()
            }
            yield item  # emitted immediately, without waiting for the crawl to finish

        next_page = response.css('.next a::attr(href)').get()
        if next_page:
            yield self.follow(next_page, callback=self.parse)

# Consume the stream
async def main():
    spider = StreamingSpider()

    async for item in spider.stream():
        # Handle each item as it arrives
        print(f"Received: {item['text'][:50]}... by {item['author']}")
        # It could be written to a database or pushed to a message queue right away

import asyncio
asyncio.run(main())
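
The `async for` consumption pattern above rests on Python's async generators. The following sketch shows the mechanism without any network access; `crawl` is a hypothetical stand-in for a spider, and `asyncio.sleep(0)` takes the place of waiting for a response.

```python
import asyncio


async def crawl(pages):
    """Hypothetical stand-in for a spider: yields items one page at a time."""
    for page_number, quotes in enumerate(pages, start=1):
        await asyncio.sleep(0)  # stands in for waiting on the network
        for text in quotes:
            yield {"page": page_number, "text": text}


async def consume(pages):
    """Handle each item as soon as the generator produces it."""
    received = []
    async for item in crawl(pages):
        # A real consumer might write to a database or a queue here
        received.append(item)
    return received


items = asyncio.run(consume([["first quote", "second quote"], ["third quote"]]))
for item in items:
    print(item)
```

Because the generator suspends at each `yield`, the consumer sees the items from page 1 before page 2 has been requested, which is what keeps memory use flat on long crawls.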

Advanced Tips

1. Proxy Rotation

from scrapling.spiders import Spider, Response

class ProxySpider(Spider):
    name = "proxy-spider"
    start_urls = ["https://httpbin.org/ip"]

    custom_settings = {
        "proxy_list": [
            "http://proxy1.example.com:8080",
            "http://proxy2.example.com:8080",
            "http://proxy3.example.com:8080",
        ],
        "proxy_rotation": "per_request",  # switch proxy on every request
    }

    async def parse(self, response: Response):
        ip = response.json().get('origin')
        print(f"Current IP: {ip}")
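
Per-request rotation boils down to walking a list of proxies in a loop. The sketch below shows that round-robin logic on its own; `ProxyRotator` is a hypothetical class for illustration and is not part of Scrapling.

```python
from itertools import cycle


class ProxyRotator:
    """Hypothetical round-robin selector: each call returns the next proxy in the list."""

    def __init__(self, proxies):
        if not proxies:
            raise ValueError("at least one proxy is required")
        self._pool = cycle(proxies)

    def next_proxy(self):
        return next(self._pool)


rotator = ProxyRotator([
    "http://proxy1.example.com:8080",
    "http://proxy2.example.com:8080",
    "http://proxy3.example.com:8080",
])

# Four requests with three proxies: the fourth wraps around to the first
assigned = [rotator.next_proxy() for _ in range(4)]
for number, proxy in enumerate(assigned, start=1):
    print(f"request {number} -> {proxy}")
```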

2. Custom Export Pipeline

from scrapling.spiders import Spider, Response
import csv

class CSVSpider(Spider):
    name = "csv-export"
    start_urls = ["https://quotes.toscrape.com/"]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.csv_file = open('quotes.csv', 'w', newline='', encoding='utf-8')
        self.writer = csv.writer(self.csv_file)
        self.writer.writerow(['Text', 'Author', 'Tags'])

    async def parse(self, response: Response):
        for quote in response.css('.quote'):
            text = quote.css('.text::text').get()
            author = quote.css('.author::text').get()
            tags = ', '.join(quote.css('.tag::text').getall())

            self.writer.writerow([text, author, tags])

        next_page = response.css('.next a::attr(href)').get()
        if next_page:
            yield self.follow(next_page, callback=self.parse)

    def close(self, reason):
        self.csv_file.close()
        print("✅ Data saved to quotes.csv")

3. Development Mode (Cached Responses)

While debugging parsing logic, cache responses so you do not hit the server on every run.

from scrapling.spiders import Spider, Response

class DevSpider(Spider):
    name = "dev-mode"
    start_urls = ["https://example.com"]

    custom_settings = {
        "dev_mode": True,              # enable development mode
        "cache_dir": "./http_cache",   # cache directory
    }

    async def parse(self, response: Response):
        # First run: responses are cached to disk
        # Later runs: responses are read from disk, with no network requests
        title = response.css('h1::text').get()
        yield {"title": title}
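
The caching behaviour described in the comments can be pictured as a small read-through cache keyed by URL. The sketch below is a minimal stand-alone version under assumptions of my own: the `DiskCache` class, the SHA-256 file naming, and the `fake_download` function are all hypothetical and do not reflect Scrapling's cache layout.

```python
import hashlib
import os
import tempfile


class DiskCache:
    """Hypothetical read-through cache: one file per URL, named by the URL's hash."""

    def __init__(self, cache_dir):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)

    def _path(self, url):
        digest = hashlib.sha256(url.encode("utf-8")).hexdigest()
        return os.path.join(self.cache_dir, digest + ".html")

    def fetch(self, url, download):
        """Return the cached body if present; otherwise download and store it."""
        path = self._path(url)
        if os.path.exists(path):
            with open(path, encoding="utf-8") as f:
                return f.read()
        body = download(url)
        with open(path, "w", encoding="utf-8") as f:
            f.write(body)
        return body


calls = []


def fake_download(url):
    """Stand-in for a real HTTP request; records every call it receives."""
    calls.append(url)
    return "<h1>Example Domain</h1>"


cache = DiskCache(tempfile.mkdtemp())
first = cache.fetch("https://example.com", fake_download)
second = cache.fetch("https://example.com", fake_download)
print(f"downloads performed: {len(calls)}")
```

The second call is served from disk, so the download function runs only once. A production cache would also need to key on request method and headers and to expire stale entries.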

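FAQ

Q1: How does Scrapling differ from Scrapy?

A: Scrapling can be seen as a modernized, extended take on Scrapy:

  • Adaptive parsing: Scrapy selectors stop matching after a redesign, while Scrapling can adapt automatically
  • Built-in anti-bot evasion: Scrapy needs extra middleware to get past Cloudflare, while Scrapling handles it out of the box
  • A simpler API: Scrapling's Fetcher API is better suited to quick, small-scale scraping

If you already know Scrapy, moving to Scrapling has almost no learning cost.

Q2: How do I scrape pages behind a login?

A: Use StealthyFetcher or DynamicFetcher to simulate the login: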

from scrapling.fetchers import DynamicFetcher

def log_in(page):
    # `page` is the Playwright page object that Scrapling passes to page_action
    page.fill('#username', 'your_username')
    page.fill('#password', 'your_password')
    page.click('#login-button')
    page.wait_for_selector('#dashboard')  # wait for the post-login redirect
    return page

# Open the login page and run the login steps inside the browser
page = DynamicFetcher.fetch(
    'https://example.com/login',
    headless=True,
    page_action=log_in,
)

# The response now holds the post-login content
data = page.css('.private-data::text').getall()

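Q3: How do I limit the crawl rate to avoid getting blocked?

A: Configure a download delay and a concurrency limit in the Spider:

from scrapling.spiders import Spider

class PoliteSpider(Spider):
    name = "polite"

    custom_settings = {
        "concurrency": 2,           # lower the number of concurrent requests
        "download_delay": 2,        # wait 2 seconds between requests
        "robots_txt_obey": True,    # respect robots.txt
    }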
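
To show what a concurrency cap and a download delay do together, here is a small asyncio sketch. It is an illustration of the general technique, not Scrapling's scheduler; the `Throttle` class and its parameter names are assumptions made for this example.

```python
import asyncio
import time


class Throttle:
    """Hypothetical limiter: caps in-flight jobs and spaces out their start times."""

    def __init__(self, concurrency, delay):
        self._semaphore = asyncio.Semaphore(concurrency)
        self._lock = asyncio.Lock()
        self._delay = delay
        self._next_start = 0.0

    async def run(self, job):
        async with self._semaphore:          # at most `concurrency` jobs at once
            async with self._lock:           # one job at a time picks a start slot
                wait = self._next_start - time.monotonic()
                if wait > 0:
                    await asyncio.sleep(wait)
                self._next_start = time.monotonic() + self._delay
            return await job()


async def demo():
    throttle = Throttle(concurrency=2, delay=0.1)
    starts = []

    async def job():
        # Stand-in for a request: just record when it started
        starts.append(time.monotonic())

    await asyncio.gather(*(throttle.run(job) for _ in range(3)))
    return starts


starts = asyncio.run(demo())
gaps = [later - earlier for earlier, later in zip(starts, starts[1:])]
print([round(gap, 2) for gap in gaps])
```

The semaphore bounds how many requests overlap, while the shared start slot enforces the minimum spacing between them, so slow responses cannot cause a burst once they complete.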

Q4: Which selectors does Scrapling support?

A: Both CSS selectors and XPath:

# CSS selectors
page.css('.class-name::text').get()
page.css('#id-name').getall()

# XPath
page.xpath('//div[@class="example"]/text()').get()

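Summary

Scrapling is one of the Python scraping frameworks most worth watching in 2026. It combines adaptive parsing, anti-bot evasion, and a Scrapy-like API, so developers can build stable scrapers with little code.

Key strengths at a glance:

  • ✅ Adaptive parser: relocates elements automatically after a page redesign
  • ✅ Built-in anti-bot evasion: gets past Cloudflare Turnstile out of the box
  • ✅ From a single request to a full crawl: one API covers every scenario
  • ✅ Concurrency, pause/resume, and streaming output: the features a production crawler needs

Resources:

  • GitHub repository
  • Official documentation
  • Discord community

If you found this article helpful, feel free to share it with other developers!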