A conceptual Python code example illustrating key concepts in Data Pipeline Efficiency for large datasets, focusing on Preprocessing, Caching, and Streaming, often uses libraries like pandas, scikit-learn, and dask or custom generators.

Step 1: Setup and Conceptual Preprocessing

This step sets up a simulated large dataset and applies common preprocessing steps like feature scaling.


# Step 1: Conceptual Setup and Preprocessing Definition

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
import time
import os

# --- Configuration ---
DATASET_SIZE = 10**6 # 1 million rows for simulation
N_FEATURES = 10

# --- Simulate Large Data Creation ---
print("1. Simulating large dataset...")
data = {f'feature_{i}': np.random.rand(DATASET_SIZE) * (i + 1)
    for i in range(N_FEATURES)}
data['target'] = np.random.randint(0, 2, DATASET_SIZE)
df_full = pd.DataFrame(data)

# --- Define Preprocessing Function ---
scaler = StandardScaler()

def preprocess_chunk(chunk_df: pd.DataFrame) -> np.ndarray:
    """Applies necessary transformations (e.g., scaling) to a data chunk."""
    features = chunk_df.drop(columns=['target']).values
    # In a real pipeline, the scaler must be 'fit' on training data
    # and then used to 'transform' both training and inference data.
    return scaler.fit_transform(features) # Conceptual fit_transform

print("Preprocessing defined.")

Step 2: Caching (Conceptual Simulation using Disk)

Caching the processed data prevents redundant computation. Instead of reprocessing the raw data every time, we load the cleaned, transformed data from disk.


# Step 2: Conceptual Caching

CACHE_FILE = 'processed_data_cache.npy'

def process_and_cache(df):
    """Processes the full dataset and saves the result to a cache file."""
    print("2a. Preprocessing full dataset (Time-consuming step)...")
    start_time = time.time()

    # Process the data (using a conceptual batch approach if needed)
    processed_data = preprocess_chunk(df)

    end_time = time.time()
    print(f" -> Processing time: {end_time - start_time:.4f} seconds")

    # Save the processed data (Cache write)
    np.save(CACHE_FILE, processed_data)
    print(f" -> Data saved to cache: {CACHE_FILE}")

def load_or_process(df):
    """Loads from cache if available, otherwise processes and caches."""
    if os.path.exists(CACHE_FILE):
        print("2b. Cache HIT: Loading processed data from disk (Fast step)...")
        start_time = time.time()
        # Cache read
        data = np.load(CACHE_FILE, mmap_mode='r') # Use mmap for large files
        end_time = time.time()
        print(f" -> Cache load time: {end_time - start_time:.4f} seconds")
        return data
    else
        # Cache MISS
        process_and_cache(df)
        return load_or_process(df) # Load the newly created cache

# --- Demonstration of Caching ---
processed_data_1 = load_or_process(df_full) # Run 1 (Cache MISS, heavy processing)

processed_data_2 = load_or_process(df_full) # Run 2 (Cache HIT, fast load)

if os.path.exists(CACHE_FILE):
    os.remove(CACHE_FILE) # Clean up the conceptual cache file

print("Caching demonstration complete.")

Step 3: Streaming Large Datasets (Chunking)

Streaming (or chunking) is vital when the full dataset doesn't fit into memory. Instead of loading everything at once, we process it in manageable parts using an iterator.


# Step 3: Streaming Large Datasets (Chunking)

CHUNK_SIZE = 100000 # 100k rows per chunk

def data_generator(file_path_or_df: pd.DataFrame, chunk_size):
    """
    A generator function to stream the data in chunks.
    In a real scenario, this would use pandas.read_csv(..., chunksize=...)
    """
    n_rows = len(file_path_or_df)
    for start in range(0, n_rows, chunk_size):
        end = start + chunk_size
        # Yield a manageable chunk of data
        yield file_path_or_df.iloc[start:end]

# --- Demonstration of Streaming Processing ---
print("\n3. Demonstrating Data Streaming (Processing in chunks)...")
total_processed_chunks = 0
start_stream_time = time.time()

# The generator yields one chunk at a time, keeping memory usage low
for chunk in data_generator(df_full, CHUNK_SIZE):
    # Apply preprocessing to the current chunk
    processed_chunk = preprocess_chunk(chunk)
    
    # --- Conceptual use of the chunk ---
    # In reality, this chunk would be used for:
    # 1. Training a model iteratively (e.g., stochastic gradient descent).
    # 2. Writing results to a database/file.
    
    total_processed_chunks += 1
    # print(f"   -> Processed Chunk {total_processed_chunks}. Shape: {processed_chunk.shape}")

end_stream_time = time.time()

print(f"   -> Total Chunks Processed: {total_processed_chunks}")
print(f"   -> Total Streaming Time: {end_stream_time - start_stream_time:.4f} seconds")

# KEY CONCEPT: Streaming ensures the memory footprint remains small 
# regardless of the total dataset size.