Data Processing
Cleaning, validation, and transformation
Objective
In this lab, we focus on data processing, which is the foundation of any MLOps workflow.
By the end of this lab, you will be able to:
- [Part B] Perform data cleaning and transformation to produce a model-ready dataset
- [Part C] Perform EDA to understand data distributions and relationships
This section intentionally stops before model training. The processed output will later be used for:
- Supervised learning (Linear Regression)
- Unsupervised learning (K-Means clustering)
Part B - Data Transformation
What you need
- An Azure ML workspace with a running compute instance
- Data ingested into Azure Blob Storage (from Part A)
- Connection string stored securely in Azure Key Vault
Always stop your Azure ML compute instance when not in use to avoid unnecessary credit consumption.
Step 1 – Install Dependencies

```python
!/anaconda/envs/azureml_py310_sdkv2/bin/pip install azure-keyvault-secrets azure-identity azure-storage-blob pandas pyarrow seaborn
```

After installing new packages, restart the Jupyter kernel before proceeding. Some packages (such as azure-keyvault-secrets) won't be available until the kernel is restarted.
Step 2 – Securely Retrieve Connection String from Key Vault
Instead of hardcoding credentials, we retrieve the Blob Storage connection string from Azure Key Vault. This keeps secrets out of your codebase and safe for version control.
```python
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from azure.storage.blob import BlobServiceClient
from pathlib import Path
import pandas as pd

# Retrieve connection string securely from Key Vault
credential = DefaultAzureCredential()
vault_url = "https://<your-keyvault-name>.vault.azure.net/"
secret_client = SecretClient(vault_url=vault_url, credential=credential)
CONNECTION_STRING = secret_client.get_secret("blob-connection-string").value
print("Connection string retrieved securely from Key Vault!")
```

Make sure your user has Get, List, and Set permissions under the Key Vault's Access Policies. Without them, the notebook will throw a Forbidden error.
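For local experimentation outside Azure, it can help to fall back to an environment variable when Key Vault is unreachable. This is a small sketch, not part of the lab's required setup; the environment variable name `AZURE_STORAGE_CONNECTION_STRING` and the helper function are assumptions for illustration:

```python
import os

def get_connection_string(fetch_from_vault):
    """Try Key Vault first; fall back to an environment variable.

    fetch_from_vault: a zero-argument callable, e.g.
    lambda: secret_client.get_secret("blob-connection-string").value
    """
    try:
        return fetch_from_vault()
    except Exception:
        # Assumed variable name for local development only -- never
        # commit a real connection string to version control.
        value = os.environ.get("AZURE_STORAGE_CONNECTION_STRING")
        if value is None:
            raise RuntimeError("No connection string available")
        return value
```

Even with a fallback, the Key Vault path should remain the default on the compute instance.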
Step 3 – Download Raw Data from Blob Storage
We pull the raw parquet files from the nyc-tlc-raw Blob container into the compute instance.
```python
CONTAINER_NAME = "nyc-tlc-raw"
LOCAL_DIR = Path("data_raw")
LOCAL_DIR.mkdir(exist_ok=True)

# Specify the monthly files to download
files = [
    "yellow_tripdata_2025-09.parquet",
    "yellow_tripdata_2025-10.parquet",
    "yellow_tripdata_2025-11.parquet"
]

blob_service = BlobServiceClient.from_connection_string(CONNECTION_STRING)
for f in files:
    blob_client = blob_service.get_blob_client(container=CONTAINER_NAME, blob=f)
    local_path = LOCAL_DIR / f
    with open(local_path, "wb") as fh:
        fh.write(blob_client.download_blob().readall())
    print(f"Downloaded: {f}")
print("All files downloaded!")
```

Step 4 – Load and Combine Data
We load all three monthly files and combine them into a single DataFrame for unified processing.
```python
# Load all parquet files and combine into one DataFrame
df = pd.concat(
    [pd.read_parquet(LOCAL_DIR / f) for f in files],
    ignore_index=True
)

print("Shape:", df.shape)
print("Columns:", df.columns.tolist())
df.head()
```

Expected output: 12,861,158 rows × 20 columns
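As a minimal illustration of why `ignore_index=True` matters when stacking monthly files (toy frames here, not the real taxi data):

```python
import pandas as pd

sep = pd.DataFrame({"fare_amount": [10.0, 12.5]})  # stand-in for September
oct_ = pd.DataFrame({"fare_amount": [9.0]})        # stand-in for October

# Without ignore_index=True the original indices repeat: 0, 1, 0.
# With it, the combined frame gets a fresh 0..n-1 index.
combined = pd.concat([sep, oct_], ignore_index=True)
print(combined.index.tolist())  # [0, 1, 2]
```

A duplicated index would silently break positional assumptions in later filtering and sampling steps.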
Step 5 – Initial Exploration
Before cleaning, we inspect the data to understand what we're working with.
```python
# Explore data types, null counts, duplicates, and summary stats
print("Data types:\n", df.dtypes)
print("\nNull counts:\n", df.isnull().sum())
print("\nDuplicates:", df.duplicated().sum())
df.describe()
```

From the initial exploration:

- ~3 million nulls in `passenger_count`, `RatecodeID`, `store_and_fwd_flag`, `congestion_surcharge`, and `Airport_fee`
- Datetime columns are properly typed as `datetime64`
- Some numeric columns have suspicious min/max values (negative fares, zero distances)
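Raw null counts are easier to reason about as percentages of the dataset. A quick sketch on a toy frame (column names are illustrative):

```python
import pandas as pd
import numpy as np

toy = pd.DataFrame({
    "passenger_count": [1.0, np.nan, 2.0, np.nan],
    "fare_amount": [10.0, 12.0, 9.5, 20.0],
})

# isnull() gives a boolean frame; the column-wise mean is the null fraction
null_pct = toy.isnull().mean() * 100
print(null_pct)
# passenger_count    50.0
# fare_amount         0.0
```

On the real data, ~3 million nulls out of ~12.9 million rows is roughly 24% of the dataset, which motivates imputation over row deletion in Step 7.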
Step 6 – Drop Unnecessary Columns
We remove columns that arenβt needed for downstream analysis or modeling.
```python
# Drop columns not needed for analysis
columns_to_drop = ['store_and_fwd_flag', 'RatecodeID', 'congestion_surcharge',
                   'Airport_fee', 'improvement_surcharge', 'mta_tax',
                   'extra', 'tolls_amount', 'cbd_congestion_fee']
df = df.drop(columns=columns_to_drop)

print("Remaining columns:", df.columns.tolist())
print("Shape after drop:", df.shape)
```

Result: 20 columns → 11 columns
Step 7 – Handle Null Values
Rather than dropping ~3 million rows with null passenger_count, we fill with the median value to preserve data volume.
```python
# Fill null passenger_count with median instead of dropping rows
print("Nulls before fill:", df['passenger_count'].isnull().sum())
df['passenger_count'] = df['passenger_count'].fillna(df['passenger_count'].median())
print("Nulls after fill:", df['passenger_count'].isnull().sum())
print("Shape:", df.shape)
```

We use the median instead of the mean because passenger count is a discrete value and the median (1.0) better represents a typical taxi ride. Dropping 3M rows would unnecessarily reduce our dataset by ~24%.
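To see concretely why the median is the safer fill value for a discrete, skewed column, here is a toy example (invented numbers, not the real distribution):

```python
import pandas as pd
import numpy as np

counts = pd.Series([1, 1, 1, 1, 6, np.nan])
print(counts.mean())    # 2.0 -- dragged up by the single 6-passenger ride
print(counts.median())  # 1.0 -- the typical ride

# Filling with the median keeps imputed values realistic and discrete
filled = counts.fillna(counts.median())
print(filled.isnull().sum())  # 0
```

Filling with the mean here would invent rides with 2 passengers where 1 is by far the most common value.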
Step 8 – Remove Invalid and Outlier Rows
We filter out rows that are clearly erroneous or represent extreme outliers.
```python
# Remove rows with zero/negative values and extreme outliers
df = df[df['fare_amount'] > 0]
df = df[df['trip_distance'] > 0]
df = df[df['passenger_count'] > 0]
df = df[df['fare_amount'] < 500]
df = df[df['trip_distance'] < 100]
df = df[df['passenger_count'] <= 6]
df = df[df['total_amount'] > 0]

print("Shape after removing outliers:", df.shape)
```

| Filter | Rationale |
|---|---|
| `fare_amount > 0` | Negative/zero fares are data entry errors |
| `trip_distance > 0` | Zero distance means the trip didn't happen |
| `passenger_count > 0` | A trip with 0 passengers is invalid |
| `fare_amount < 500` | Fares above $500 are almost certainly errors |
| `trip_distance < 100` | Trips over 100 miles are extreme outliers |
| `passenger_count <= 6` | Standard taxis seat a maximum of 6 passengers |
| `total_amount > 0` | Negative totals indicate refunds or errors |
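The seven sequential filters can equivalently be combined into a single boolean mask, which keeps the retention logic in one place. A sketch on a toy frame:

```python
import pandas as pd

toy = pd.DataFrame({
    "fare_amount":     [12.0, -5.0, 600.0, 8.0],
    "trip_distance":   [2.0,  1.0,  3.0,   0.0],
    "passenger_count": [1,    2,    1,     1],
    "total_amount":    [15.0, 3.0,  700.0, 9.0],
})

# One mask encoding all validity rules from the table above
mask = (
    (toy["fare_amount"] > 0) & (toy["fare_amount"] < 500)
    & (toy["trip_distance"] > 0) & (toy["trip_distance"] < 100)
    & (toy["passenger_count"] > 0) & (toy["passenger_count"] <= 6)
    & (toy["total_amount"] > 0)
)
clean = toy[mask]
print(len(clean))  # 1 -- only the first row passes every rule
```

The result is identical to applying the filters one at a time; the combined form simply documents all constraints together.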
Step 9 – Feature Engineering
We create new columns derived from existing data to enrich the dataset for modeling.
```python
# Create trip duration and time-based features
df['trip_duration_min'] = (
    df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']
).dt.total_seconds() / 60

# Remove nonsensical durations (under 1 minute or over 3 hours)
df = df[(df['trip_duration_min'] > 1) & (df['trip_duration_min'] < 180)]

# Extract time-based features
df['pickup_hour'] = df['tpep_pickup_datetime'].dt.hour
df['pickup_dayofweek'] = df['tpep_pickup_datetime'].dt.dayofweek

print("Shape after feature engineering:", df.shape)
df.head()
```

| New Column | Description |
|---|---|
| `trip_duration_min` | Trip length in minutes, computed from pickup and dropoff timestamps |
| `pickup_hour` | Hour of the day (0–23) when the trip started |
| `pickup_dayofweek` | Day of the week (0 = Monday, 6 = Sunday) |
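The three derived columns can be verified on a single hand-checked timestamp pair (a toy frame, not the real data):

```python
import pandas as pd

trips = pd.DataFrame({
    "tpep_pickup_datetime":  pd.to_datetime(["2025-09-01 08:30:00"]),
    "tpep_dropoff_datetime": pd.to_datetime(["2025-09-01 08:45:30"]),
})

# Same expressions as in the lab, applied to one known trip
trips["trip_duration_min"] = (
    trips["tpep_dropoff_datetime"] - trips["tpep_pickup_datetime"]
).dt.total_seconds() / 60
trips["pickup_hour"] = trips["tpep_pickup_datetime"].dt.hour
trips["pickup_dayofweek"] = trips["tpep_pickup_datetime"].dt.dayofweek

print(trips["trip_duration_min"].iloc[0])  # 15.5
print(trips["pickup_hour"].iloc[0])        # 8
print(trips["pickup_dayofweek"].iloc[0])   # 0 -- 2025-09-01 is a Monday
```

Spot-checking derived features this way catches unit mistakes (seconds vs minutes) before they reach the model.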
Step 10 – Verify Cleaned Data
A final sanity check to confirm the dataset is clean and ready.
```python
# Final check on cleaned dataset
print("Final shape:", df.shape)
print("\nNull counts:\n", df.isnull().sum())
df.describe()
```

Expected output: 11,320,592 rows × 14 columns, zero nulls
Step 11 – Save Cleaned Data
We save the processed dataset as a parquet file for reuse in future modeling labs.
```python
# Save cleaned data as parquet for future use
output_path = Path("data_cleaned")
output_path.mkdir(exist_ok=True)
df.to_parquet(output_path / "yellow_tripdata_cleaned.parquet", index=False)

print("Saved cleaned data to:", output_path / "yellow_tripdata_cleaned.parquet")
print("File size: {:.2f} MB".format(
    (output_path / "yellow_tripdata_cleaned.parquet").stat().st_size / 1e6
))
```

Transformation Summary
| Stage | Rows | Columns | Action |
|---|---|---|---|
| Raw data loaded | 12,861,158 | 20 | Combined 3 monthly parquet files |
| Drop columns | 12,861,158 | 11 | Removed 9 unused columns |
| Fill nulls | 12,861,158 | 11 | Filled passenger_count nulls with median |
| Remove outliers | 11,546,705 | 11 | Filtered invalid/extreme rows |
| Feature engineering | 11,320,592 | 14 | Added duration, hour, day of week |
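For reuse in the later modeling labs, the fill, filter, and feature-engineering stages can be wrapped in one function. This is a sketch, not part of the lab's required code; column names match the dataset, and the thresholds are the ones chosen above:

```python
import pandas as pd

def clean_trips(df: pd.DataFrame) -> pd.DataFrame:
    """Apply the lab's fill, filter, and feature steps to a raw trip frame."""
    out = df.copy()
    # Step 7: impute passenger_count with the median
    out["passenger_count"] = out["passenger_count"].fillna(
        out["passenger_count"].median()
    )
    # Step 8: drop invalid and extreme rows
    out = out[
        (out["fare_amount"] > 0) & (out["fare_amount"] < 500)
        & (out["trip_distance"] > 0) & (out["trip_distance"] < 100)
        & (out["passenger_count"] > 0) & (out["passenger_count"] <= 6)
        & (out["total_amount"] > 0)
    ].copy()
    # Step 9: trip duration plus duration sanity filter
    out["trip_duration_min"] = (
        out["tpep_dropoff_datetime"] - out["tpep_pickup_datetime"]
    ).dt.total_seconds() / 60
    out = out[(out["trip_duration_min"] > 1) & (out["trip_duration_min"] < 180)]
    return out
```

Packaging the steps as a function makes the same cleaning reproducible when new monthly files arrive.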
Part C - Exploratory Data Analysis (EDA)
Now that the data is clean, we perform EDA to understand patterns and distributions before modeling.
```python
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_style("whitegrid")
```

Fare Amount Distribution
```python
# Distribution of fare amounts
fig, ax = plt.subplots(figsize=(10, 5))
df['fare_amount'].clip(upper=100).hist(bins=50, ax=ax, edgecolor='black')
ax.set_title('Distribution of Fare Amount (capped at $100)')
ax.set_xlabel('Fare Amount ($)')
ax.set_ylabel('Frequency')
plt.tight_layout()
plt.show()
```

The distribution is right-skewed with most fares concentrated between $5–$20. There is a noticeable spike around $70, which corresponds to the JFK airport flat-rate fare.
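Note that `clip(upper=100)` only caps values for the plot; it does not drop rows the way the Step 8 filters do. A quick check of the behavior:

```python
import pandas as pd

fares = pd.Series([8.0, 15.0, 70.0, 250.0])
capped = fares.clip(upper=100)
print(capped.tolist())            # [8.0, 15.0, 70.0, 100.0]
print(len(capped) == len(fares))  # True -- every row is kept, just capped
```

This keeps the histogram readable without silently discarding the long tail.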
Trip Distance Distribution
```python
# Distribution of trip distances
fig, ax = plt.subplots(figsize=(10, 5))
df['trip_distance'].clip(upper=30).hist(bins=50, ax=ax, edgecolor='black')
ax.set_title('Distribution of Trip Distance (capped at 30 miles)')
ax.set_xlabel('Trip Distance (miles)')
ax.set_ylabel('Frequency')
plt.tight_layout()
plt.show()
```

The majority of trips are under 5 miles, typical of short urban rides. A small bump around 17–20 miles corresponds to airport trips (JFK/EWR).
Trips by Hour of Day
```python
# Number of trips by hour of day
fig, ax = plt.subplots(figsize=(10, 5))
df['pickup_hour'].value_counts().sort_index().plot(kind='bar', ax=ax, color='steelblue')
ax.set_title('Number of Trips by Hour of Day')
ax.set_xlabel('Hour')
ax.set_ylabel('Trip Count')
plt.tight_layout()
plt.show()
```

Trip volume peaks at 6 PM (evening rush hour) and is lowest between 4–5 AM. Demand rises steadily from 7 AM through the afternoon.
Correlation Heatmap
```python
# Correlation heatmap of numeric columns
numeric_cols = ['trip_distance', 'fare_amount', 'tip_amount',
                'total_amount', 'trip_duration_min', 'passenger_count']

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(df[numeric_cols].corr(), annot=True, cmap='coolwarm',
            fmt='.2f', ax=ax, vmin=-1, vmax=1)
ax.set_title('Correlation Heatmap')
plt.tight_layout()
plt.show()
```

- `trip_distance` ↔ `fare_amount`: 0.90 – very strong positive correlation, as expected
- `trip_duration_min` ↔ `fare_amount`: 0.79 – longer trips cost more
- `fare_amount` ↔ `total_amount`: 0.97 – fare is the dominant component of total
- `passenger_count`: near-zero correlation with all other features – does not meaningfully affect fare
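`DataFrame.corr()` computes pairwise Pearson correlations. A toy example showing the extreme case (the 0.90/0.79/0.97 values above come from the real taxi data):

```python
import pandas as pd

toy = pd.DataFrame({
    "distance": [1.0, 2.0, 3.0, 4.0],
    "fare":     [5.0, 7.0, 9.0, 11.0],  # perfectly linear in distance
    "noise":    [3.0, 1.0, 4.0, 1.0],   # unrelated column
})
corr = toy.corr()
# An exactly linear relationship yields a correlation of 1.0
print(round(corr.loc["distance", "fare"], 2))
```

The matrix is symmetric with 1.0 on the diagonal, which is why the heatmap mirrors across it.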
Fare vs Trip Distance
```python
# Scatter plot of fare vs distance (sampled for performance)
sample = df.sample(n=10000, random_state=42)

fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(sample['trip_distance'], sample['fare_amount'], alpha=0.3, s=5)
ax.set_title('Fare Amount vs Trip Distance (10K sample)')
ax.set_xlabel('Trip Distance (miles)')
ax.set_ylabel('Fare Amount ($)')
ax.set_xlim(0, 40)
ax.set_ylim(0, 150)
plt.tight_layout()
plt.show()
```

A clear linear relationship between distance and fare is visible. The horizontal band at ~$70 for trips between 15–20 miles represents the JFK airport flat-rate fare. We sample 10,000 points for visualization performance.
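Fixing `random_state` makes the sampled scatter reproducible across notebook runs, so collaborators see the same points. A minimal demonstration:

```python
import pandas as pd

df_toy = pd.DataFrame({"x": range(1000)})

# Two calls with the same seed select exactly the same rows
s1 = df_toy.sample(n=10, random_state=42)
s2 = df_toy.sample(n=10, random_state=42)
print(s1.index.tolist() == s2.index.tolist())  # True
```

Without `random_state`, each run would draw a different sample and the plot would subtly change.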
Average Fare by Hour of Day
```python
# Average fare by hour of day
fig, ax = plt.subplots(figsize=(10, 5))
df.groupby('pickup_hour')['fare_amount'].mean().plot(kind='line', ax=ax,
                                                     marker='o', color='green')
ax.set_title('Average Fare by Hour of Day')
ax.set_xlabel('Hour')
ax.set_ylabel('Average Fare ($)')
plt.tight_layout()
plt.show()
```

The highest average fares occur at 5 AM (~$25), likely driven by longer early-morning airport trips. Fares dip to their lowest around 2–3 AM, then remain relatively stable during daytime hours ($19–$22).
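The hourly average is a plain `groupby` mean; on toy data the mechanics look like this:

```python
import pandas as pd

toy = pd.DataFrame({
    "pickup_hour": [5, 5, 18, 18],
    "fare_amount": [30.0, 20.0, 12.0, 14.0],
})

# Mean fare per hour: groups rows by pickup_hour, averages fare_amount
avg = toy.groupby("pickup_hour")["fare_amount"].mean()
print(avg.to_dict())  # {5: 25.0, 18: 13.0}
```

The same pattern (`groupby` + aggregation) underlies most of the per-hour and per-day summaries in this lab.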
EDA Summary
| Insight | Finding |
|---|---|
| Fare distribution | Right-skewed, most fares between $5–$20, spike at $70 (airport flat rate) |
| Trip distance | Majority under 5 miles, bump at 17–20 miles (airport trips) |
| Peak hours | Highest volume at 6 PM, lowest at 4–5 AM |
| Highest avg fare | 5 AM (~$25) – early morning airport trips |
| Strongest predictor | Trip distance (0.90 correlation with fare) |
| Passenger count | Near-zero correlation – minimal impact on fare |
The cleaned and explored dataset is now ready for:
- Supervised learning – predicting fare amount using Linear Regression
- Unsupervised learning – identifying taxi demand clusters using K-Means