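"""Fetch postal codes for Canadian addresses listed in a CSV file.

Addresses are geocoded with OpenStreetMap Nominatim or the Google Geocoding
API and the resulting postal codes are written to a POSTAL_CODE column.
"""
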
import pandas as pd
|
||
|
|
import requests
|
||
|
|
import time
|
||
|
|
import json
|
||
|
|
from typing import Optional, Dict, Any
|
||
|
|
import logging
|
||
|
|
|
||
|
|
# Set up logging
# basicConfig configures the root logger at INFO level as an import-time side
# effect; `logger` is the module-level logger used by the code in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
class PostalCodeFetcher:
    """Fetch postal codes for Canadian addresses using various geocoding services.

    Two back ends are implemented: OpenStreetMap Nominatim (free, no key)
    and the Google Geocoding API (requires an API key). Addresses are always
    formatted as Calgary, AB, Canada addresses (see ``format_address``).
    """

    # Services that ``get_postal_code`` knows how to dispatch to.
    SUPPORTED_SERVICES = ('nominatim', 'google')

    def __init__(self, api_key: Optional[str] = None, service: str = 'nominatim'):
        """
        Initialize the postal code fetcher

        Args:
            api_key: API key for paid services (only Google is implemented)
            service: Geocoding service to use ('nominatim' or 'google'),
                matched case-insensitively. Any other value is accepted for
                backward compatibility, but every lookup then returns None.
        """
        self.api_key = api_key
        self.service = service.lower()
        self.session = requests.Session()
        self.rate_limit_delay = 1.0  # seconds between requests

    @staticmethod
    def _clean_component(value: Any) -> str:
        """Return one address component as clean text, or '' when missing.

        Missing values (None, NaN, the literal text 'nan') become ''.
        Integral floats such as 531.0 are rendered as '531'; pandas produces
        them when an integer column also contains missing values.
        """
        if isinstance(value, str):
            text = value.strip()
        elif value is None or pd.isna(value):
            return ''
        elif isinstance(value, float) and value.is_integer():
            text = str(int(value))
        else:
            text = str(value).strip()
        return '' if text == 'nan' else text

    @staticmethod
    def _coerce_coordinate(value: Any) -> Optional[float]:
        """Convert a raw latitude/longitude value to float, or None if unusable.

        None, blank or non-numeric strings, NaN and zero are all treated as
        "no coordinate". Zero is rejected on purpose: it keeps the behaviour
        of the truthiness checks this helper replaces, and a zero coordinate
        cannot be a real location for the Calgary addresses handled here.
        """
        try:
            coord = float(value)
        except (TypeError, ValueError):
            return None
        if pd.isna(coord) or coord == 0.0:
            return None
        return coord

    def format_address(self, row: Dict[str, Any]) -> str:
        """
        Format address from CSV row data

        Args:
            row: Dictionary containing address components. The keys read are
                HOUSE_NUMBER, HOUSE_ALPHA, STREET_NAME, STREET_TYPE and
                STREET_QUAD; missing keys and NaN values are skipped.

        Returns:
            Formatted address string: the non-empty parts joined by ", " and
            always ending in "Calgary, AB, Canada"
        """
        # Extract components
        house_num = self._clean_component(row.get('HOUSE_NUMBER'))
        house_alpha = self._clean_component(row.get('HOUSE_ALPHA'))
        street_name = self._clean_component(row.get('STREET_NAME'))
        street_type = self._clean_component(row.get('STREET_TYPE'))
        street_quad = self._clean_component(row.get('STREET_QUAD'))

        # Build address
        address_parts = []

        # House number and alpha (e.g. "10" + "A" -> "10A")
        house_part = house_num + house_alpha
        if house_part:
            address_parts.append(house_part)

        # Street name and type. The type is skipped only when it already
        # appears as a whole word of the name ("MAIN ST" + "ST"); a substring
        # test would wrongly drop "DR" for a street named "ANDREWS".
        street_part = street_name
        if street_type and street_type.upper() not in street_name.upper().split():
            street_part = f"{street_name} {street_type}".strip()
        if street_part:
            address_parts.append(street_part)

        # Quadrant
        if street_quad:
            address_parts.append(street_quad)

        # Add city and province
        address_parts.extend(["Calgary", "AB", "Canada"])

        return ", ".join(address_parts)

    def get_postal_code_nominatim(self, address: str, lat: Optional[float] = None,
                                  lon: Optional[float] = None) -> Optional[str]:
        """
        Get postal code using OpenStreetMap Nominatim (free)

        When both coordinates are given (and non-zero) the reverse endpoint
        is queried; otherwise the address is looked up with the search
        endpoint. Only one request is made per call.

        Args:
            address: Full address string
            lat: Latitude (optional, for reverse geocoding)
            lon: Longitude (optional, for reverse geocoding)

        Returns:
            Postal code if found, None otherwise. Any error is logged and
            reported as None rather than raised.
        """
        try:
            # Use reverse geocoding if coordinates are available
            if lat and lon:
                url = "https://nominatim.openstreetmap.org/reverse"
                params = {
                    'format': 'json',
                    'lat': lat,
                    'lon': lon,
                    'zoom': 18,
                    'addressdetails': 1
                }
            else:
                # Forward geocoding
                url = "https://nominatim.openstreetmap.org/search"
                params = {
                    'q': address,
                    'format': 'json',
                    'addressdetails': 1,
                    'limit': 1,
                    'countrycodes': 'ca'
                }

            # NOTE(review): the contact address below is a placeholder;
            # replace it with a real one before heavy use of the public API.
            headers = {
                'User-Agent': 'PostalCodeFetcher/1.0 (your-email@domain.com)'
            }

            response = self.session.get(url, params=params, headers=headers, timeout=10)
            response.raise_for_status()

            data = response.json()
            if not data:
                return None

            # The search endpoint returns a list of matches, the reverse
            # endpoint a single object; normalise both to one result dict.
            result = data[0] if isinstance(data, list) else data

            # Extract postal code
            address_details = result.get('address', {})
            return address_details.get('postcode')

        except Exception as e:
            # Best-effort lookup: one failing row must not abort a batch.
            logger.error(f"Nominatim error for {address}: {e}")
            return None

    def get_postal_code_google(self, address: str) -> Optional[str]:
        """
        Get postal code using Google Geocoding API (requires API key)

        Args:
            address: Full address string

        Returns:
            Postal code if found, None otherwise. None is also returned when
            no API key is configured or when the request fails; both cases
            are logged.
        """
        if not self.api_key:
            logger.error("Google API key required")
            return None

        try:
            url = "https://maps.googleapis.com/maps/api/geocode/json"
            params = {
                'address': address,
                'key': self.api_key,
                'region': 'ca'
            }

            response = self.session.get(url, params=params, timeout=10)
            response.raise_for_status()

            data = response.json()
            if data.get('status') != 'OK' or not data.get('results'):
                return None

            result = data['results'][0]

            # Extract postal code from address components
            for component in result.get('address_components', []):
                if 'postal_code' in component.get('types', []):
                    return component.get('long_name')

            return None

        except Exception as e:
            # HTTP error messages from requests include the full request URL,
            # and with it the ``key`` query parameter; redact it so the API
            # key never reaches the logs.
            message = str(e).replace(str(self.api_key), '***')
            logger.error(f"Google API error for {address}: {message}")
            return None

    def get_postal_code(self, row: Dict[str, Any]) -> Optional[str]:
        """
        Get postal code for a single address row

        Args:
            row: Dictionary containing address data. Besides the address
                keys used by ``format_address``, optional 'latitude' and
                'longitude' keys are read for the Nominatim service.

        Returns:
            Postal code if found, None otherwise
        """
        # Fail fast for services that have no implementation, instead of
        # sleeping for the rate-limit delay without ever making a request.
        if self.service not in self.SUPPORTED_SERVICES:
            logger.error(f"Unsupported geocoding service: {self.service}")
            return None

        # Format the address
        address = self.format_address(row)
        logger.info(f"Fetching postal code for: {address}")

        # Try different services
        if self.service == 'nominatim':
            # Extract coordinates if available (strings are converted to float)
            lat = self._coerce_coordinate(row.get('latitude'))
            lon = self._coerce_coordinate(row.get('longitude'))
            postal_code = self.get_postal_code_nominatim(address, lat, lon)
        else:
            postal_code = self.get_postal_code_google(address)

        # Rate limiting
        time.sleep(self.rate_limit_delay)

        return postal_code

    def process_csv(self, csv_file_path: str, output_file_path: Optional[str] = None) -> pd.DataFrame:
        """
        Process CSV file and add postal codes

        Args:
            csv_file_path: Path to input CSV file
            output_file_path: Path to output CSV file (optional); when
                omitted nothing is written to disk

        Returns:
            DataFrame with postal codes added in a POSTAL_CODE column
            (None where no postal code was found)
        """
        logger.info(f"Processing CSV file: {csv_file_path}")

        # Read CSV
        df = pd.read_csv(csv_file_path)
        logger.info(f"Loaded {len(df)} addresses")

        # Add postal code column
        df['POSTAL_CODE'] = None

        # Process each row; row numbers are 1-based positions taken from
        # enumerate so they do not depend on the index labels.
        for row_number, (index, row) in enumerate(df.iterrows(), start=1):
            try:
                postal_code = self.get_postal_code(row.to_dict())
                df.at[index, 'POSTAL_CODE'] = postal_code

                if postal_code:
                    logger.info(f"Row {row_number}: Found postal code {postal_code}")
                else:
                    logger.warning(f"Row {row_number}: No postal code found")

            except Exception as e:
                logger.error(f"Error processing row {row_number}: {e}")
                df.at[index, 'POSTAL_CODE'] = None

        # Save results
        if output_file_path:
            df.to_csv(output_file_path, index=False)
            logger.info(f"Results saved to: {output_file_path}")

        # Summary
        found_count = df['POSTAL_CODE'].notna().sum()
        logger.info(f"Successfully found postal codes for {found_count}/{len(df)} addresses")

        return df
|
||
|
|
|
||
|
|
# Example usage functions
|
||
|
|
def fetch_postal_codes_free(csv_file_path: str, output_file_path: str = None) -> pd.DataFrame:
    """
    Add postal codes to a CSV of addresses via the free Nominatim service

    Builds a PostalCodeFetcher configured for 'nominatim' (no API key) and
    runs its process_csv over the given file.

    Args:
        csv_file_path: Path to CSV file with address data
        output_file_path: Optional output file path, passed through to
            process_csv

    Returns:
        DataFrame with postal codes
    """
    return PostalCodeFetcher(service='nominatim').process_csv(
        csv_file_path,
        output_file_path,
    )
|
||
|
|
|
||
|
|
def fetch_postal_codes_google(csv_file_path: str, api_key: str, output_file_path: str = None) -> pd.DataFrame:
    """
    Add postal codes to a CSV of addresses via the Google Geocoding API

    Builds a PostalCodeFetcher configured for 'google' with the supplied key
    and runs its process_csv over the given file.

    Args:
        csv_file_path: Path to CSV file with address data
        api_key: Google API key
        output_file_path: Optional output file path, passed through to
            process_csv

    Returns:
        DataFrame with postal codes
    """
    google_fetcher = PostalCodeFetcher(service='google', api_key=api_key)
    result = google_fetcher.process_csv(csv_file_path, output_file_path)
    return result
|
||
|
|
|
||
|
|
# Sample usage
|
||
|
|
if __name__ == "__main__":
    # Example 1: Using free Nominatim service
    # Reads ./Address.csv and writes the rows plus a POSTAL_CODE column to
    # addresses_with_postal_codes.csv.
    df = fetch_postal_codes_free('./Address.csv', 'addresses_with_postal_codes.csv')

    # Example 2: Using Google API (requires API key)
    # df = fetch_postal_codes_google('addresses.csv', 'YOUR_GOOGLE_API_KEY', 'addresses_with_postal_codes.csv')

    # Example 3: Manual usage (look up a single row without a CSV file)
    #fetcher = PostalCodeFetcher(service='nominatim')

    # Test with sample data
    #sample_row = {
    #    'HOUSE_NUMBER': '531',
    #    'STREET_NAME': 'NORTHMOUNT',
    #    'STREET_TYPE': 'DR',
    #    'STREET_QUAD': 'NW',
    #    'latitude': 51.0893695,
    #    'longitude': -114.08514
    #}

    #postal_code = fetcher.get_postal_code(sample_row)
    #print(f"Postal code: {postal_code}")
|