Background
Have you run into this frustration? You write a simple scraping script, only to find it runs maddeningly slowly. What should be a small job of scraping a few thousand web pages ends up taking several hours. That's a terrible experience, isn't it?
I recently encountered this issue while refactoring an old scraping project. The project needed to scrape product data from an e-commerce platform, and the original synchronous code performed poorly when handling large numbers of requests. After research and practice, I discovered that asynchronous concurrent scraping was key to solving this type of problem. Today I'll share my insights from this process.
Pain Points
What problems exist with traditional synchronous scrapers? Let's look at a specific example. Suppose we need to scrape 1000 product pages, with an average response time of 0.5 seconds per page. Using a synchronous approach would take at least 500 seconds to complete. And that's with good network conditions - it would take even longer with network fluctuations.
Let's look at some traditional synchronous scraping code:
import requests
import time

def fetch_page(url):
    response = requests.get(url)
    return response.text

def main():
    urls = [f"http://example.com/item/{i}" for i in range(1000)]
    start_time = time.time()
    for url in urls:
        content = fetch_page(url)
        # Process page content
    end_time = time.time()
    print(f"Total time: {end_time - start_time} seconds")

if __name__ == "__main__":
    main()
Breakthrough
So how can we improve this? The answer is asynchronous concurrency. The async/await syntax introduced in Python 3.5 gives us an elegant way to write asynchronous code, and the aiohttp library lets us make asynchronous HTTP requests.
Here's the code rewritten using an asynchronous approach:
import asyncio
import aiohttp
import time
from typing import List

async def fetch_page(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [f"http://example.com/item/{i}" for i in range(1000)]
    start_time = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_page(session, url) for url in urls]
        pages = await asyncio.gather(*tasks)
    end_time = time.time()
    print(f"Total time: {end_time - start_time} seconds")

if __name__ == "__main__":
    asyncio.run(main())
Deep Dive
The advantages of asynchronous concurrent scraping aren't just about speed. Through practical testing, I found it has clear advantages in resource utilization, error handling, and code maintainability.
Let's explore several key features of asynchronous scraping:
- Event Loop Mechanism
Python's asynchronous programming is built on the event loop. When a coroutine is waiting for an I/O operation, the event loop can switch to other coroutines to continue execution - this is the core reason why asynchronous programming is efficient.
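To make that switching concrete, here is a minimal, self-contained sketch (not from the original project) with two coroutines: while one is suspended in asyncio.sleep, the event loop runs the other, so the total runtime is roughly the longest delay rather than the sum.
import asyncio

async def worker(name: str, delay: float):
    # Every await hands control back to the event loop,
    # which can then resume another coroutine.
    print(f"{name}: start")
    await asyncio.sleep(delay)  # stand-in for an I/O wait
    print(f"{name}: done after {delay}s")

async def demo():
    # Both workers run concurrently: total time is about 2s, not 3s.
    await asyncio.gather(worker("A", 1), worker("B", 2))

if __name__ == "__main__":
    asyncio.run(demo())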
Here's a more complete example:
import asyncio
import aiohttp
import time
from typing import List, Dict, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AsyncCrawler:
    def __init__(self, concurrency: int = 10):
        self.concurrency = concurrency
        self.semaphore = asyncio.Semaphore(concurrency)
        self.session: Optional[aiohttp.ClientSession] = None
        self.results: List[Dict] = []

    async def fetch_with_semaphore(self, url: str) -> Dict:
        # The semaphore caps how many requests are in flight at once.
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    if response.status == 200:
                        return {
                            'url': url,
                            'content': await response.text(),
                            'status': response.status
                        }
                    else:
                        logger.warning(f"Failed to fetch {url}, status: {response.status}")
                        return {'url': url, 'error': f'Status {response.status}'}
            except Exception as e:
                logger.error(f"Error fetching {url}: {str(e)}")
                return {'url': url, 'error': str(e)}

    async def crawl(self, urls: List[str]) -> List[Dict]:
        async with aiohttp.ClientSession() as session:
            self.session = session
            tasks = [self.fetch_with_semaphore(url) for url in urls]
            self.results = await asyncio.gather(*tasks)
            return self.results

    @staticmethod
    async def main(urls: List[str], concurrency: int = 10):
        crawler = AsyncCrawler(concurrency)
        start_time = time.time()
        results = await crawler.crawl(urls)
        end_time = time.time()
        success_count = len([r for r in results if 'error' not in r])
        logger.info(f"Crawled {len(results)} urls in {end_time - start_time:.2f} seconds")
        logger.info(f"Success: {success_count}, Failed: {len(results) - success_count}")
        return results
- Concurrency Control
In practical applications, we need to limit the number of concurrent requests so we don't overwhelm the target server. asyncio.Semaphore lets us do this elegantly, as the crawler above shows. In my projects I typically set the concurrency between 10 and 20, with the exact value depending on how much load the target website can handle.
- Error Handling
Error handling requires special attention in asynchronous programming. We need to ensure that the failure of a single request doesn't affect the entire scraper's operation. Through the use of try/except and appropriate logging, we can build more robust scraping systems.
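Besides wrapping each request in try/except as above, asyncio.gather accepts return_exceptions=True, which collects exceptions in the results list instead of letting one failure cancel the whole batch. A minimal sketch of that pattern (not the project's code; the URLs are placeholders):
import asyncio
import aiohttp

async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.text()

async def main(urls):
    async with aiohttp.ClientSession() as session:
        # With return_exceptions=True, a failed request yields its exception
        # object in the results list instead of aborting the other tasks.
        results = await asyncio.gather(
            *(fetch(session, url) for url in urls),
            return_exceptions=True
        )
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f"{url} failed: {result}")
        else:
            print(f"{url} returned {len(result)} characters")

if __name__ == "__main__":
    asyncio.run(main(["http://example.com/item/1", "http://example.com/item/2"]))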
- Performance Optimization
In practice, I found the following points helpful in improving asynchronous scraper performance:
- Using connection pools to reuse HTTP connections
- Implementing request retry mechanisms
- Setting appropriate timeout values
- Using compressed transfers
Here's an optimized example:
import asyncio
import aiohttp
import time
from typing import List, Dict, Optional
import logging
from aiohttp import TCPConnector
from tenacity import retry, stop_after_attempt, wait_exponential

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class OptimizedAsyncCrawler:
    def __init__(self, concurrency: int = 10):
        self.concurrency = concurrency
        self.semaphore = asyncio.Semaphore(concurrency)
        self.session: Optional[aiohttp.ClientSession] = None
        self.results: List[Dict] = []

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def fetch_with_retry(self, url: str) -> Dict:
        timeout = aiohttp.ClientTimeout(total=30)
        async with self.session.get(url, timeout=timeout) as response:
            if response.status == 200:
                return {
                    'url': url,
                    'content': await response.text(),
                    'status': response.status
                }
            else:
                # Raising here lets tenacity retry the request.
                raise aiohttp.ClientError(f"Status {response.status}")

    async def fetch_with_semaphore(self, url: str) -> Dict:
        async with self.semaphore:
            try:
                return await self.fetch_with_retry(url)
            except Exception as e:
                logger.error(f"Error fetching {url}: {str(e)}")
                return {'url': url, 'error': str(e)}

    async def crawl(self, urls: List[str]) -> List[Dict]:
        session_timeout = aiohttp.ClientTimeout(total=3600)
        # Create the connector inside the running event loop; its connection
        # pool (limit=100) reuses TCP connections and caches DNS lookups.
        connector = TCPConnector(limit=100, ttl_dns_cache=300)
        async with aiohttp.ClientSession(
            connector=connector,
            timeout=session_timeout,
            # Responses are decompressed automatically by aiohttp.
            headers={'Accept-Encoding': 'gzip, deflate'}
        ) as session:
            self.session = session
            tasks = [self.fetch_with_semaphore(url) for url in urls]
            self.results = await asyncio.gather(*tasks)
            return self.results
Practice
In real projects, we need to consider many details. For example:
- Data Persistence
When dealing with large amounts of scraped data, we need to save it to a database in a timely manner. I recommend using asynchronous database drivers like aiopg (PostgreSQL) or aiomysql (MySQL).
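As an illustration, here is a minimal sketch of saving results with aiomysql; the connection settings and the products table are assumptions made for this example, not part of the original project:
import aiomysql

async def save_results(results):
    # Connection settings and the `products` table are hypothetical.
    pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306,
        user='crawler', password='secret', db='scraping'
    )
    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.executemany(
                    "INSERT INTO products (url, content) VALUES (%s, %s)",
                    [(r['url'], r['content']) for r in results if 'error' not in r]
                )
            await conn.commit()
    finally:
        pool.close()
        await pool.wait_closed()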
- Proxy Pool Management
To avoid IP bans, we usually need to maintain a proxy pool. Special attention needs to be paid to proxy usage in asynchronous scrapers:
import aiohttp
import asyncio
from typing import List, Dict
import random

class ProxyPool:
    def __init__(self, proxies: List[str]):
        self.proxies = proxies

    def get_random_proxy(self) -> str:
        return random.choice(self.proxies)

class ProxiedAsyncCrawler:
    def __init__(self, proxy_pool: ProxyPool, concurrency: int = 10):
        self.proxy_pool = proxy_pool
        self.concurrency = concurrency
        self.semaphore = asyncio.Semaphore(concurrency)

    async def fetch_with_proxy(self, url: str) -> Dict:
        # Pick a different proxy for each request and cap concurrency.
        proxy = self.proxy_pool.get_random_proxy()
        async with self.semaphore:
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, proxy=proxy) as response:
                        return {
                            'url': url,
                            'content': await response.text(),
                            'proxy': proxy
                        }
            except Exception as e:
                return {'url': url, 'error': str(e), 'proxy': proxy}
- Queue Management
For large-scale scraping tasks, using a queue to manage URLs to be scraped is essential:
import asyncio
from typing import List, Dict
import aiohttp
from asyncio import Queue

class QueuedCrawler:
    def __init__(self, concurrency: int = 10):
        self.concurrency = concurrency
        self.queue: Queue = Queue()
        self.results: List[Dict] = []

    async def producer(self, urls: List[str]):
        # Feed URLs into the queue for the consumers to pick up.
        for url in urls:
            await self.queue.put(url)

    async def consumer(self):
        while True:
            url = await self.queue.get()
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url) as response:
                        result = {
                            'url': url,
                            'content': await response.text()
                        }
                        self.results.append(result)
            except Exception as e:
                print(f"Error processing {url}: {str(e)}")
            finally:
                # Mark the item as processed whether it succeeded or failed.
                self.queue.task_done()

    async def crawl(self, urls: List[str]):
        # Start producer
        producer = asyncio.create_task(self.producer(urls))
        # Start consumers
        consumers = [
            asyncio.create_task(self.consumer())
            for _ in range(self.concurrency)
        ]
        # Wait for producer to finish
        await producer
        # Wait for queue to be empty
        await self.queue.join()
        # Cancel consumers
        for consumer in consumers:
            consumer.cancel()
        return self.results
Reflections
Through this period of practice, I've gained a deeper understanding of asynchronous scraping. It's not just a tool for improving efficiency, but a shift in how you think about your programs. When designing a scraping system, we need to balance speed against being considerate to the target site, and find the sweet spot between throughput and stability.
While asynchronous concurrent scraping is powerful, it's not a silver bullet. In some scenarios, such as tasks that involve heavy CPU-bound processing of the scraped data, multiprocessing may be the better choice; a small sketch of combining the two follows below. My suggestion is to pick the technical approach that fits your specific requirements.
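For example, here is a hedged sketch (not from the original project) of offloading CPU-heavy parsing to a process pool while the downloads stay asynchronous; parse_page is a hypothetical CPU-bound function:
import asyncio
from concurrent.futures import ProcessPoolExecutor
from typing import Dict, List

def parse_page(html: str) -> Dict:
    # Hypothetical CPU-bound parsing/analysis of a downloaded page.
    return {'length': len(html)}

async def process_pages(pages: List[str]) -> List[Dict]:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # run_in_executor hands the CPU-bound work to worker processes
        # so it does not block the event loop.
        tasks = [loop.run_in_executor(pool, parse_page, page) for page in pages]
        return await asyncio.gather(*tasks)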
Finally, here are a few lessons I've learned from running asynchronous scrapers:
- Keep concurrency at a reasonable level; don't sacrifice the target site's stability for a short-term speed gain
- Implement solid error handling and logging so problems are easy to locate and the system is easy to maintain
- Release resources promptly, especially database connections and HTTP sessions
- Regularly check and update the proxy pool to keep the scraper running reliably
- Respect each site's robots.txt rules and be an ethical scraping engineer (a quick check is sketched below)
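As a minimal illustration of that last point, the standard library's urllib.robotparser can check whether a URL is allowed before it is queued; note that RobotFileParser.read() is a blocking call, so it is best done once at startup rather than inside a coroutine (example.com is a placeholder):
from urllib.robotparser import RobotFileParser

def build_robots_checker(base_url: str, user_agent: str = "*"):
    # Fetch and parse robots.txt once, then reuse the parser for every URL.
    parser = RobotFileParser()
    parser.set_url(f"{base_url}/robots.txt")
    parser.read()  # blocking; call this before starting the event loop
    return lambda url: parser.can_fetch(user_agent, url)

# Example: filter the URL list before crawling
allowed = build_robots_checker("http://example.com")
urls = [f"http://example.com/item/{i}" for i in range(1000)]
urls_to_crawl = [url for url in urls if allowed(url)]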
What areas do you think asynchronous scraping can be improved in? Feel free to share your thoughts and experiences in the comments.