Data Processing

Cleaning, validation, and transformation

Objective

In this lab, we focus on data processing, which is the foundation of any MLOps workflow.

By the end of this lab, you will be able to:

  • [Part B] Perform data cleaning and transformation to produce a model-ready dataset
  • [Part C] Perform EDA to understand data distributions and relationships
Note: Scope

This section intentionally stops before model training. The processed output will later be used for:

  • Supervised learning (Linear Regression)
  • Unsupervised learning (K-Means clustering)

Part B - Data Transformation

What you need

  • An Azure ML workspace with a running compute instance
  • Data ingested into Azure Blob Storage (from Part A)
  • Connection string stored securely in Azure Key Vault
Warning: Azure Credits

Always stop your Azure ML compute instance when not in use to avoid unnecessary credit consumption.


Step 1 - Install Dependencies

!/anaconda/envs/azureml_py310_sdkv2/bin/pip install azure-keyvault-secrets azure-identity azure-storage-blob pandas pyarrow seaborn
Tip: Kernel Restart

After installing new packages, restart the Jupyter kernel before proceeding. Some packages (like azure-keyvault-secrets) won't be available until the kernel is restarted.


Step 2 - Securely Retrieve the Connection String from Key Vault

Instead of hardcoding credentials, we retrieve the Blob Storage connection string from Azure Key Vault. This keeps secrets out of the notebook, so it can be committed to version control safely.

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from azure.storage.blob import BlobServiceClient
from pathlib import Path
import pandas as pd

# Retrieve connection string securely from Key Vault
credential = DefaultAzureCredential()
vault_url = "https://<your-keyvault-name>.vault.azure.net/"
secret_client = SecretClient(vault_url=vault_url, credential=credential)
CONNECTION_STRING = secret_client.get_secret("blob-connection-string").value

print("Connection string retrieved securely from Key Vault!")
Important: Key Vault Access

Make sure your user has Get, List, and Set secret permissions under the Key Vault's Access Policies. Without them, the get_secret call will fail with a Forbidden (403) error.


Step 3 - Download Raw Data from Blob Storage

We pull the raw parquet files from the nyc-tlc-raw Blob container into the compute instance.

CONTAINER_NAME = "nyc-tlc-raw"

LOCAL_DIR = Path("data_raw")
LOCAL_DIR.mkdir(exist_ok=True)

# Specify the monthly files to download
files = [
    "yellow_tripdata_2025-09.parquet",
    "yellow_tripdata_2025-10.parquet",
    "yellow_tripdata_2025-11.parquet"
]

blob_service = BlobServiceClient.from_connection_string(CONNECTION_STRING)

for f in files:
    blob_client = blob_service.get_blob_client(container=CONTAINER_NAME, blob=f)
    local_path = LOCAL_DIR / f
    with open(local_path, "wb") as fh:
        fh.write(blob_client.download_blob().readall())
    print(f"Downloaded: {f}")

print("All files downloaded!")

Step 4 - Load and Combine Data

We load all three monthly files and combine them into a single DataFrame for unified processing.

# Load all parquet files and combine into one DataFrame
df = pd.concat(
    [pd.read_parquet(LOCAL_DIR / f) for f in files],
    ignore_index=True
)

print("Shape:", df.shape)
print("Columns:", df.columns.tolist())
df.head()

Expected output: 12,861,158 rows × 20 columns


Step 5 - Initial Exploration

Before cleaning, we inspect the data to understand what we're working with.

# Explore data types, null counts, duplicates, and summary stats
print("Data types:\n", df.dtypes)
print("\nNull counts:\n", df.isnull().sum())
print("\nDuplicates:", df.duplicated().sum())
df.describe()
Note: Key Findings

From the initial exploration:

  • ~3 million nulls in passenger_count, RatecodeID, store_and_fwd_flag, congestion_surcharge, and Airport_fee
  • Datetime columns are properly typed as datetime64
  • Some numeric columns have suspicious min/max values (negative fares, zero distances)

Step 6 - Drop Unnecessary Columns

We remove columns that aren't needed for downstream analysis or modeling.

# Drop columns not needed for analysis
columns_to_drop = ['store_and_fwd_flag', 'RatecodeID', 'congestion_surcharge',
                   'Airport_fee', 'improvement_surcharge', 'mta_tax',
                   'extra', 'tolls_amount', 'cbd_congestion_fee']

df = df.drop(columns=columns_to_drop)
print("Remaining columns:", df.columns.tolist())
print("Shape after drop:", df.shape)

Result: 20 columns → 11 columns


Step 7 - Handle Null Values

Rather than dropping ~3 million rows with null passenger_count, we fill with the median value to preserve data volume.

# Fill null passenger_count with median instead of dropping rows
print("Nulls before fill:", df['passenger_count'].isnull().sum())
df['passenger_count'] = df['passenger_count'].fillna(df['passenger_count'].median())
print("Nulls after fill:", df['passenger_count'].isnull().sum())
print("Shape:", df.shape)
Tip: Why Median?

We use the median instead of the mean because passenger count is a discrete value and the median (1.0) better represents a typical taxi ride. Dropping 3M rows would unnecessarily reduce our dataset by ~24%.
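To make the median-vs-mean distinction concrete, here is a minimal sketch on a toy series (illustrative values, not the actual trip records) that mimics the skewed, discrete shape of passenger_count:

```python
import pandas as pd

# Toy series mimicking passenger_count: mostly 1, a few larger groups, some nulls
s = pd.Series([1, 1, 1, 1, 2, 2, 5, None, None], dtype="float64")

print("mean:  ", round(s.mean(), 2))   # ~1.86, a value no real trip can have
print("median:", s.median())           # 1.0, a valid discrete count

# Filling with the median keeps every value a plausible passenger count
filled = s.fillna(s.median())
print(filled.tolist())
```

Because the rare large groups drag the mean upward, filling with the mean would introduce fractional passenger counts; the median stays on a value the column can actually take.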


Step 8 - Remove Invalid and Outlier Rows

We filter out rows that are clearly erroneous or represent extreme outliers.

# Remove rows with zero/negative values and extreme outliers
df = df[df['fare_amount'] > 0]
df = df[df['trip_distance'] > 0]
df = df[df['passenger_count'] > 0]
df = df[df['fare_amount'] < 500]
df = df[df['trip_distance'] < 100]
df = df[df['passenger_count'] <= 6]
df = df[df['total_amount'] > 0]

print("Shape after removing outliers:", df.shape)
| Filter | Rationale |
|---|---|
| fare_amount > 0 | Negative/zero fares are data entry errors |
| trip_distance > 0 | Zero distance means the trip didn't happen |
| passenger_count > 0 | A trip with 0 passengers is invalid |
| fare_amount < 500 | Fares above $500 are almost certainly errors |
| trip_distance < 100 | Trips over 100 miles are extreme outliers |
| passenger_count <= 6 | Standard taxis seat a maximum of 6 passengers |
| total_amount > 0 | Negative totals indicate refunds or errors |
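The sequential filters above work, but each one allocates an intermediate DataFrame. They can equivalently be combined into a single boolean mask; a sketch on a toy DataFrame (hypothetical rows, not the real data):

```python
import pandas as pd

# Toy rows: one valid trip, one negative fare, one zero-distance trip
df = pd.DataFrame({
    "fare_amount":     [12.5, -4.0, 9.0],
    "trip_distance":   [3.1,   2.0, 0.0],
    "passenger_count": [1,     1,   2],
    "total_amount":    [15.0,  1.0, 11.0],
})

# All seven conditions combined into one mask; only rows passing every test survive
mask = (
    df["fare_amount"].between(0, 500, inclusive="neither")
    & (df["trip_distance"] > 0) & (df["trip_distance"] < 100)
    & df["passenger_count"].between(1, 6)   # inclusive on both ends
    & (df["total_amount"] > 0)
)
clean = df[mask]
print(clean.shape)  # only the first row survives -> (1, 4)
```

The single-mask form is a readability and performance choice, not a behavioral change: it produces the same rows as the chained filters in Step 8.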

Step 9 - Feature Engineering

We create new columns derived from existing data to enrich the dataset for modeling.

# Create trip duration and time-based features
df['trip_duration_min'] = (
    df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']
).dt.total_seconds() / 60

# Remove nonsensical durations (less than 1 min or more than 3 hours)
df = df[(df['trip_duration_min'] > 1) & (df['trip_duration_min'] < 180)]

# Extract time-based features
df['pickup_hour'] = df['tpep_pickup_datetime'].dt.hour
df['pickup_dayofweek'] = df['tpep_pickup_datetime'].dt.dayofweek

print("Shape after feature engineering:", df.shape)
df.head()
| New Column | Description |
|---|---|
| trip_duration_min | Trip length in minutes, computed from pickup and dropoff timestamps |
| pickup_hour | Hour of the day (0–23) when the trip started |
| pickup_dayofweek | Day of the week (0 = Monday, 6 = Sunday) |
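The datetime arithmetic behind these features can be seen end to end on a couple of toy timestamps (hypothetical values, same dtype as the real pickup/dropoff columns):

```python
import pandas as pd

# Two hypothetical trips: a Monday morning ride and a Saturday-night ride
df = pd.DataFrame({
    "tpep_pickup_datetime":  pd.to_datetime(["2025-09-01 08:15:00",
                                             "2025-09-06 23:50:00"]),
    "tpep_dropoff_datetime": pd.to_datetime(["2025-09-01 08:40:00",
                                             "2025-09-07 00:10:00"]),
})

# Timedelta -> minutes; note the second trip correctly spans midnight
df["trip_duration_min"] = (
    df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]
).dt.total_seconds() / 60

# .dt accessors extract the hour and weekday directly from datetime64 columns
df["pickup_hour"] = df["tpep_pickup_datetime"].dt.hour
df["pickup_dayofweek"] = df["tpep_pickup_datetime"].dt.dayofweek

print(df[["trip_duration_min", "pickup_hour", "pickup_dayofweek"]])
# durations 25.0 and 20.0 min; 2025-09-01 is a Monday, so dayofweek is 0
```

Because the subtraction yields a Timedelta series, `.dt.total_seconds()` handles trips that cross midnight (or month boundaries) without any special casing.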

Step 10 - Verify Cleaned Data

A final sanity check to confirm the dataset is clean and ready.

# Final check on cleaned dataset
print("Final shape:", df.shape)
print("\nNull counts:\n", df.isnull().sum())
df.describe()

Expected output: 11,320,592 rows × 14 columns, zero nulls
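Beyond eyeballing `describe()`, the cleaning rules from Steps 7-9 can be re-asserted programmatically. Below is a sketch of a hypothetical helper (`check_cleaned` is our own name, not part of the lab) that raises if any invariant is violated; the tiny DataFrame at the end is illustrative only:

```python
import pandas as pd

def check_cleaned(df: pd.DataFrame) -> None:
    """Raise AssertionError if any cleaning invariant from Steps 7-9 is violated."""
    assert df.isnull().sum().sum() == 0, "unexpected nulls remain"
    assert (df["fare_amount"] > 0).all() and (df["fare_amount"] < 500).all()
    assert (df["trip_distance"] > 0).all() and (df["trip_distance"] < 100).all()
    assert df["passenger_count"].between(1, 6).all()
    # Durations must be strictly between 1 and 180 minutes
    assert df["trip_duration_min"].between(1, 180, inclusive="neither").all()
    print("All sanity checks passed")

# Toy example that satisfies every rule; run check_cleaned(df) on the real data
check_cleaned(pd.DataFrame({
    "fare_amount": [12.5], "trip_distance": [3.1],
    "passenger_count": [1.0], "trip_duration_min": [25.0],
}))
```

Calling this once before Step 11 turns the manual sanity check into a repeatable gate: if a future data refresh breaks an assumption, the save step never runs on bad data.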


Step 11 - Save Cleaned Data

We save the processed dataset as a parquet file for reuse in future modeling labs.

# Save cleaned data as parquet for future use
output_path = Path("data_cleaned")
output_path.mkdir(exist_ok=True)

df.to_parquet(output_path / "yellow_tripdata_cleaned.parquet", index=False)
print("Saved cleaned data to:", output_path / "yellow_tripdata_cleaned.parquet")
print("File size: {:.2f} MB".format(
    (output_path / "yellow_tripdata_cleaned.parquet").stat().st_size / 1e6
))

Transformation Summary

| Stage | Rows | Columns | Action |
|---|---|---|---|
| Raw data loaded | 12,861,158 | 20 | Combined 3 monthly parquet files |
| Drop columns | 12,861,158 | 11 | Removed 9 unused columns |
| Fill nulls | 12,861,158 | 11 | Filled passenger_count nulls with median |
| Remove outliers | 11,546,705 | 11 | Filtered invalid/extreme rows |
| Feature engineering | 11,320,592 | 14 | Added duration, hour, and day-of-week columns |

Part C - Exploratory Data Analysis (EDA)

Now that the data is clean, we perform EDA to understand patterns and distributions before modeling.

import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style("whitegrid")

Fare Amount Distribution

# Distribution of fare amounts
fig, ax = plt.subplots(figsize=(10, 5))
df['fare_amount'].clip(upper=100).hist(bins=50, ax=ax, edgecolor='black')
ax.set_title('Distribution of Fare Amount (capped at $100)')
ax.set_xlabel('Fare Amount ($)')
ax.set_ylabel('Frequency')
plt.tight_layout()
plt.show()
Note: Observation

The distribution is right-skewed with most fares concentrated between $5–$20. There is a noticeable spike around $70, which corresponds to the JFK airport flat-rate fare.


Trip Distance Distribution

# Distribution of trip distances
fig, ax = plt.subplots(figsize=(10, 5))
df['trip_distance'].clip(upper=30).hist(bins=50, ax=ax, edgecolor='black')
ax.set_title('Distribution of Trip Distance (capped at 30 miles)')
ax.set_xlabel('Trip Distance (miles)')
ax.set_ylabel('Frequency')
plt.tight_layout()
plt.show()
Note: Observation

The majority of trips are under 5 miles, typical of short urban rides. A small bump around 17–20 miles corresponds to airport trips (JFK/EWR).


Trips by Hour of Day

# Number of trips by hour of day
fig, ax = plt.subplots(figsize=(10, 5))
df['pickup_hour'].value_counts().sort_index().plot(kind='bar', ax=ax, color='steelblue')
ax.set_title('Number of Trips by Hour of Day')
ax.set_xlabel('Hour')
ax.set_ylabel('Trip Count')
plt.tight_layout()
plt.show()
Note: Observation

Trip volume peaks at 6 PM (evening rush hour) and is lowest between 4–5 AM. Demand rises steadily from 7 AM through the afternoon.


Correlation Heatmap

# Correlation heatmap of numeric columns
numeric_cols = ['trip_distance', 'fare_amount', 'tip_amount',
                'total_amount', 'trip_duration_min', 'passenger_count']

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(df[numeric_cols].corr(), annot=True, cmap='coolwarm',
            fmt='.2f', ax=ax, vmin=-1, vmax=1)
ax.set_title('Correlation Heatmap')
plt.tight_layout()
plt.show()
Note: Key Correlations

  • trip_distance ↔ fare_amount: 0.90, a very strong positive correlation, as expected
  • trip_duration_min ↔ fare_amount: 0.79, so longer trips cost more
  • fare_amount ↔ total_amount: 0.97, since fare is the dominant component of the total
  • passenger_count: near-zero correlation with all other features, so it does not meaningfully affect fare

Fare vs Trip Distance

# Scatter plot of fare vs distance (sampled for performance)
sample = df.sample(n=10000, random_state=42)

fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(sample['trip_distance'], sample['fare_amount'], alpha=0.3, s=5)
ax.set_title('Fare Amount vs Trip Distance (10K sample)')
ax.set_xlabel('Trip Distance (miles)')
ax.set_ylabel('Fare Amount ($)')
ax.set_xlim(0, 40)
ax.set_ylim(0, 150)
plt.tight_layout()
plt.show()
Note: Observation

A clear linear relationship between distance and fare is visible. The horizontal band at ~$70 for trips between 15–20 miles represents the JFK airport flat-rate fare. We sample 10,000 points for visualization performance.


Average Fare by Hour of Day

# Average fare by hour of day
fig, ax = plt.subplots(figsize=(10, 5))
df.groupby('pickup_hour')['fare_amount'].mean().plot(kind='line', ax=ax,
                                                       marker='o', color='green')
ax.set_title('Average Fare by Hour of Day')
ax.set_xlabel('Hour')
ax.set_ylabel('Average Fare ($)')
plt.tight_layout()
plt.show()
Note: Observation

The highest average fares occur at 5 AM (~$25), likely driven by longer early-morning airport trips. Fares dip to their lowest around 2–3 AM, then remain relatively stable during daytime hours ($19–$22).


EDA Summary

| Insight | Finding |
|---|---|
| Fare distribution | Right-skewed; most fares between $5–$20, spike at $70 (airport flat rate) |
| Trip distance | Majority under 5 miles; bump at 17–20 miles (airport trips) |
| Peak hours | Highest volume at 6 PM, lowest at 4–5 AM |
| Highest avg fare | 5 AM (~$25), driven by early-morning airport trips |
| Strongest predictor | Trip distance (0.90 correlation with fare) |
| Passenger count | Near-zero correlation; minimal impact on fare |
Note: Next Steps

The cleaned and explored dataset is now ready for:

  • Supervised learning: predicting fare amount with Linear Regression
  • Unsupervised learning: identifying taxi demand clusters with K-Means