From Notes to Networks: Decoding the Musical DNA of Classical Composers through Symbolic Sequence Learning¶

Composer Classification Using Deep Learning¶

Composer Classification Using Deep Learning

University of San Diego AAI-511: Neural Networks and Deep Learning


Professor: Rod Albuyeh

Section: 5
Group: 2
Contributors:

  • Ashley Moore
  • Kevin Pooler
  • Swapnil Patil

August, 2025


Index¶

  1. Introduction
  2. Objective
  3. Dataset
  4. Data Cleaning and Preprocessing
  5. Feature Extraction
  6. Exploratory Data Analysis
  7. Model Building:
    • LSTM
    • CNN
  8. Model Training
  9. Model Evaluation
  10. Model Optimization
  11. Model Prediction
  12. Deployment
  13. Recommendations
  14. Use Cases
  15. Conclusion
  16. References

Streamlit Application¶

Application URL

1. Introduction: ⇧¶


Project Overview¶

Classical music represents one of humanity's most sophisticated artistic achievements, with master composers like Bach, Beethoven, Chopin, and Mozart developing distinctive musical signatures that have influenced centuries of musical evolution. Each composer possesses unique characteristics in their compositional style from Bach's intricate counterpoint and mathematical precision to Chopin's romantic expressiveness and innovative harmonies. These stylistic fingerprints, embedded within the very fabric of their musical works, present an fascinating opportunity for artificial intelligence to learn and recognize patterns that define musical authorship.

The Challenge¶

Traditional musicological analysis relies heavily on human expertise to identify compositional styles, requiring years of specialized training to distinguish between composers based on harmonic progressions, melodic patterns, rhythmic structures, and formal organization. However, with the advent of digital music representation and deep learning technologies, we can now approach this problem computationally, extracting and analyzing musical features at a scale and precision impossible for human analysis alone.

Approach¶

This project introduces a novel hybrid deep learning architecture that combines the sequential pattern recognition capabilities of Long Short-Term Memory (LSTM) networks with the spatial feature extraction power of Convolutional Neural Networks (CNNs) to automatically classify musical compositions by their composers. Our methodology integrates three distinct feature extraction approaches:

  1. Musical Features: Traditional music theory elements including pitch distributions, harmonic progressions, tempo variations, and rhythmic patterns
  2. Harmonic Features: Advanced harmonic analysis capturing chord progressions, key relationships, and tonal structures
  3. Sequential Features: Time-series representation of musical events, preserving the temporal relationships crucial to compositional style

Innovation and Impact¶

By processing MIDI files through sophisticated feature engineering and applying state-of-the-art neural network architectures, this system demonstrates how machine learning can complement musicological research, offering new insights into compositional analysis while providing practical applications for music education, digital humanities, and automated music cataloging.

This work represents a significant step toward understanding how artificial intelligence can capture and interpret the subtle nuances that define artistic style, with implications extending beyond music to other domains of creative expression and cultural analysis.

2. Objective: ⇧¶


Objective¶

Develop a hybrid LSTM-CNN deep learning model to classify classical music compositions by composer (Bach, Beethoven, Chopin, Mozart) using multi-modal feature extraction from MIDI files.

Key Components¶

  • Feature Engineering: Musical, harmonic, and sequential feature extraction
  • Architecture: Hybrid LSTM-CNN fusion network for temporal and spatial pattern recognition
  • Evaluation: Cross-validation with accuracy, precision, recall, and F1-score metrics
  • Target: >85% classification accuracy with model interpretability analysis

Deliverables¶

  1. Multi-modal feature extraction pipeline
  2. Hybrid neural network architecture
  3. Trained composer classification model
  4. Performance evaluation and optimization results

3. Dataset:¶


The project will use a dataset consisting of musical scores from various composers.

https://www.kaggle.com/datasets/blanderbuss/midi-classic-music

The dataset contains the midi files of compositions from well-known classical composers like Bach, Beethoven, Chopin, and Mozart. The dataset should be labeled with the name of the composer for each score. Please only do your prediction only for below composers, therefore you need to select the required composers from the given dataset above.

  1. Bach
  2. Beethoven
  3. Chopin
  4. Mozart

4. Data Cleaning and Pre-processing: ⇧¶


In [52]:
%pip install -q -r requirements.txt
%load_ext autoreload
%autoreload 2
^C
Note: you may need to restart the kernel to use updated packages.
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
[notice] A new release of pip is available: 25.1.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip
In [ ]:
# Toggle the following flags to control the behavior of the notebook
EXTRACT_FEATURES = False
SAMPLING = False
SAMPLING_FILES_PER_COMPOSER = 20
In [ ]:
# System and utility imports
import sys
import os
import time
import warnings
import pickle
import hashlib
from functools import lru_cache
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import cpu_count
from tqdm import tqdm

# Data processing libraries
import numpy as np
import pandas as pd

# Visualization libraries
import matplotlib.pyplot as plt
import seaborn as sns
from mpl_toolkits.mplot3d import Axes3D

# Music processing libraries
import music21
from music21 import (
    converter, note, chord, roman, key, tempo, instrument,
    interval, pitch, stream
)
import pypianoroll
import pretty_midi
import librosa
import librosa.display

# Deep learning libraries
import torch
import tensorflow as tf


# Add parent directory to path 
warnings.filterwarnings('ignore')
sys.path.append(os.path.abspath(os.path.join(os.pardir, 'src')))
In [ ]:
class EnvironmentSetup:
    """
    Centralized environment configuration class for faster music classification.
    Handles all environment setup including GPU configuration, reproducibility,
    and optimization settings.
    """

    def __init__(self):
        self.random_seed = 42
        self.optimal_workers = None
        self.gpu_configured = False
        self.cuda_available = False
        self.config_summary = []
        self.force_gpu = True  # New flag to force GPU usage when available
        self.cuda_device_id = 0  # Default CUDA device ID to use
        self.color_pallete = "viridis"

    def configure_environment(self):
        """Configure all environment settings for optimal performance and
        reproducibility."""
        # Configure reproducibility
        self._configure_reproducibility()

        # Configure GPU and memory settings
        self._configure_gpu_and_memory()

        # Configure CPU workers
        self._configure_cpu_workers()

        # Configure visualization settings
        self._configure_visualization()

        # Configure pandas settings
        self._configure_pandas()

        # Configure warnings
        self._configure_warnings()

        # Print final configuration summary
        self._print_configuration_summary()

    def _configure_reproducibility(self):
        """Set random seeds for reproducible results."""
        np.random.seed(self.random_seed)
        tf.random.set_seed(self.random_seed)
        self.config_summary.append(f"Random seeds set to {self.random_seed} for reproducibility")

    def _configure_gpu_and_memory(self):
        """Configure GPU settings and memory management for optimal performance."""

        try:
            # Check for CUDA availability first
            self.cuda_available = torch.cuda.is_available()
            if self.cuda_available:
                cuda_version = torch.version.cuda
                device_count = torch.cuda.device_count()
                self.config_summary.append(f"CUDA is available (version: {cuda_version}, devices: {device_count})")

                # Set CUDA device
                if self.cuda_device_id < device_count:
                    torch.cuda.set_device(self.cuda_device_id)
                    self.config_summary.append(f"Set active CUDA device to: {torch.cuda.get_device_name(self.cuda_device_id)}")

                # Set seeds for reproducibility
                torch.cuda.manual_seed(self.random_seed)
                torch.cuda.manual_seed_all(self.random_seed)
                torch.backends.cudnn.deterministic = True
                torch.backends.cudnn.benchmark = True  # Enable for improved performance with fixed input sizes

                self.config_summary.append(f"CUDA random seed set to {self.random_seed}")
                self.config_summary.append(f"CUDA deterministic mode: enabled")
                self.config_summary.append(f"CUDA benchmark mode: enabled")

                # Export CUDA device for other libraries
                os.environ["CUDA_VISIBLE_DEVICES"] = str(self.cuda_device_id)
                self.config_summary.append(f"Set CUDA_VISIBLE_DEVICES={self.cuda_device_id}")
            else:
                self.config_summary.append("CUDA is not available")

            # Check for GPU availability in TensorFlow
            gpus = tf.config.experimental.list_physical_devices('GPU')

            if gpus:
                self.config_summary.append(f"Found {len(gpus)} GPU(s)")

                if self.force_gpu:
                    # Force TensorFlow to use GPU
                    tf.config.set_visible_devices(gpus, 'GPU')
                    self.config_summary.append("Forced TensorFlow to use GPU")

                    # Configure memory growth to prevent TensorFlow from allocating all GPU memory
                    for gpu in gpus:
                        tf.config.experimental.set_memory_growth(gpu, True)
                        self.config_summary.append(f"Configured memory growth for GPU: {gpu.name}")

                    # Set mixed precision for better performance on modern GPUs
                    tf.config.optimizer.set_jit(True)  # Enable XLA compilation
                    policy = tf.keras.mixed_precision.Policy('mixed_float16')
                    tf.keras.mixed_precision.set_global_policy(policy)
                    self.config_summary.append("Enabled mixed precision training (float16)")
                    self.config_summary.append("Enabled XLA compilation for faster execution")

                    # Configure GPU device placement
                    tf.config.experimental.set_device_policy('explicit')
                    self.config_summary.append("Set explicit device placement policy")

                    self.gpu_configured = True
                else:
                    self.config_summary.append("GPU available but not forced (force_gpu=False)")

            else:
                self.config_summary.append("No GPU found in TensorFlow. Using CPU for computation.")
                # Configure CPU optimizations
                tf.config.threading.set_intra_op_parallelism_threads(0)  # Use all available cores
                tf.config.threading.set_inter_op_parallelism_threads(0)  # Use all available cores
                self.config_summary.append("Configured CPU threading for optimal performance")

        except Exception as e:
            self.config_summary.append(f"Error configuring GPU: {e}")
            self.config_summary.append("Falling back to default CPU configuration")

    def set_cuda_device(self, device_id=0):
        self.cuda_device_id = device_id

    def toggle_force_gpu(self, force=True):
        self.force_gpu = force

    def _configure_cpu_workers(self):
        """Configure optimal number of CPU workers for parallel processing."""

        # Set optimal number of workers based on CPU cores
        self.optimal_workers = min(cpu_count(), 12)  # Cap at 12 to avoid memory issues

        # Make it globally accessible
        globals()['OPTIMAL_WORKERS'] = self.optimal_workers

        self.config_summary.append(f"Configured {self.optimal_workers} workers for parallel processing (CPU cores: {cpu_count()})")

    def _configure_visualization(self):
        """Configure matplotlib and seaborn settings."""
        sns.set(style='whitegrid')
        plt.style.use('default')
        plt.ioff()  # Turn off interactive mode for better performance
        self.config_summary.append("Visualization settings configured (seaborn whitegrid, matplotlib default)")
        self.color_pallete = sns.color_palette("viridis")

    def _configure_pandas(self):
        """Configure pandas display options."""
        pd.set_option('display.max_columns', None)
        pd.set_option('display.width', None)
        pd.set_option('display.max_rows', 100)
        self.config_summary.append("Pandas display options configured (max_columns=None, max_rows=100)")

    def _configure_warnings(self):
        """Suppress unnecessary warnings."""
        warnings.filterwarnings('ignore')
        self.config_summary.append("Warning filters configured (suppressed for cleaner output)")

    def _print_configuration_summary(self):
        """Print a summary of the current configuration."""
        print("=" * 60)
        print("ENVIRONMENT CONFIGURATION SUMMARY")
        print("=" * 60)

        for item in self.config_summary:
            print(item)

        print("\nCONFIGURATION DETAILS:")
        print(f"Random Seed: {self.random_seed}")
        print(f"CPU Workers: {self.optimal_workers}")
        print(f"GPU Configured: {self.gpu_configured}")
        print(f"CUDA Available: {self.cuda_available}")
        print(f"Force GPU: {self.force_gpu}")

        if self.cuda_available:
            print(f"Active CUDA Device: {torch.cuda.get_device_name(self.cuda_device_id)}")
            print(f"CUDA Memory Allocated: {torch.cuda.memory_allocated(self.cuda_device_id) / 1024**2:.2f} MB")
            print(f"CUDA Memory Reserved: {torch.cuda.memory_reserved(self.cuda_device_id) / 1024**2:.2f} MB")

        if self.gpu_configured:
            gpus = tf.config.experimental.list_physical_devices('GPU')
            for i, gpu in enumerate(gpus):
                print(f"GPU {i}: {gpu.name}")

        print(f"TensorFlow Version: {tf.__version__}")
        print(f"PyTorch Version: {torch.__version__}")
        print(f"NumPy Version: {np.__version__}")
        print(f"Pandas Version: {pd.__version__}")
        print("=" * 60)
In [ ]:
# Initialize and configure environment
env_config = EnvironmentSetup()
env_config.configure_environment()
OPTIMAL_WORKERS = env_config.optimal_workers
============================================================
ENVIRONMENT CONFIGURATION SUMMARY
============================================================
Random seeds set to 42 for reproducibility
CUDA is not available
No GPU found in TensorFlow. Using CPU for computation.
Configured CPU threading for optimal performance
Configured 12 workers for parallel processing (CPU cores: 20)
Visualization settings configured (seaborn whitegrid, matplotlib default)
Pandas display options configured (max_columns=None, max_rows=100)
Warning filters configured (suppressed for cleaner output)

CONFIGURATION DETAILS:
Random Seed: 42
CPU Workers: 12
GPU Configured: False
CUDA Available: False
Force GPU: True
TensorFlow Version: 2.20.0-rc0
PyTorch Version: 2.7.1+cpu
NumPy Version: 2.2.6
Pandas Version: 2.3.1
============================================================

The EnvironmentSetup class centralizes system configuration for optimal deep learning performance, automatically detecting and configuring GPU/CPU resources, setting reproducibility seeds, and optimizing memory management. This ensures consistent results across different hardware environments while maximizing computational efficiency for our neural network training.# Initialize and configure environment

In [ ]:
try:
    from google.colab import drive
    COLAB_DRIVE_PATH = '/content/drive'
    drive.mount(COLAB_DRIVE_PATH)
    DATA_DIRECTORY = f"{COLAB_DRIVE_PATH}/MyDrive/data"
    MUSIC_DATA_DIRECTORY = f"{DATA_DIRECTORY}/midiclassics"
    IS_COLAB = True
    print(f"Google Drive mounted at {COLAB_DRIVE_PATH}")
except:
    DATA_DIRECTORY = "data"
    MUSIC_DATA_DIRECTORY = f"{DATA_DIRECTORY}\\midiclassics"
    IS_COLAB = False
    print("Running in local environment, not Google Colab")

MUSICAL_FEATURES_DF_PATH = f'{DATA_DIRECTORY}/features/musical_features_df.pkl'
HARMONIC_FEATURES_DF_PATH = f'{DATA_DIRECTORY}/features/harmonic_features_df.pkl'
NOTE_SEQUENCES_PATH = f'{DATA_DIRECTORY}/features/note_sequences.npy'
NOTE_SEQUENCES_LABELS_PATH = f'{DATA_DIRECTORY}/features/sequence_labels.npy'
NOTE_MAPPING_PATH = f'{DATA_DIRECTORY}/features/note_mapping.pkl'
MODEL_ARTIFACTS_PATH = f'{DATA_DIRECTORY}/model/composer_classification_model_artifacts.pkl'
MODEL_HISTORY_PATH = f'{DATA_DIRECTORY}/model/composer_classification_model_history.pkl'
MODEL_PATH = f'{DATA_DIRECTORY}/model/composer_classification_model.keras'
MODEL_BEST_PATH = f'{DATA_DIRECTORY}/model/composer_classification_model_best.keras'

# List of target composers for classification
TARGET_COMPOSERS = ['Bach', 'Beethoven', 'Chopin', 'Mozart']
print(f"Target composers for classification: {TARGET_COMPOSERS}")
Running in local environment, not Google Colab
Target composers for classification: ['Bach', 'Beethoven', 'Chopin', 'Mozart']

This project aims to develop an accurate deep learning–based system for composer classification, using MIDI data as the primary input. To achieve this, we implement a MidiProcessor class (below) that streamlines the end-to-end preprocessing pipeline. It includes the following:


  1. Scanning the MIDI dataset

    • load_midi_files() searches through composer-specific folders, recursively finding .mid and .midi files.
    • Uses parallel processing via ThreadPoolExecutor to scan multiple composers at once.
    • Returns a pandas DataFrame containing file paths, composer labels, and filenames for downstream processing.
  2. Extracting notes from MIDI files

    • extract_notes_from_midi() parses a single MIDI file, extracting individual notes and chords (converted into pitch strings).
    • Limits extraction to a maximum number of notes for efficiency.
    • Includes fallback handling for flat MIDI structures and skips empty or corrupted files.
    • extract_notes_from_midi_parallel() enables batch note extraction in parallel with a progress bar for large datasets.
  3. Extracting detailed musical features

    • extract_musical_features() analyzes each MIDI file to gather rich metadata:
      • Total notes, total duration, time signature, key and confidence, tempo.
      • Pitch statistics (average, range, standard deviation), intervals, note durations.
      • Structural features like number of parts, measures, chords, rests, rest ratios, and average notes per measure.
      • Instrument detection and chord type analysis.
    • extract_musical_features_parallel() runs this process in parallel for scalability.
  4. Creating note sequences for deep learning models

    • create_note_sequences() builds a vocabulary of unique notes from the dataset.
    • Converts notes into integer representations and slices them into fixed-length sequences (e.g., 100-note windows) for LSTM/CNN model training.
    • Includes batching and memory safeguards to handle large datasets efficiently.
  5. Preprocessing and filtering the dataset

    • preprocess_midi_files() filters out invalid, duplicate, corrupted, too-short, or low-key-confidence MIDI files.
    • Uses file hashing to detect duplicates and minimum thresholds for duration and key confidence.
    • Tracks processing statistics to provide a clear summary of dataset quality after filtering.
In [ ]:
class MidiProcessor:

    def __init__(self):
        self._file_cache = {}  # Cache for parsed files
        self._feature_cache = {}  # Cache for extracted features

    def load_midi_files(self, data_dir, max_files_per_composer=None, n_jobs=None):

        if n_jobs is None:
            n_jobs = OPTIMAL_WORKERS

        start_time = time.time()

        # Get all composer directories
        composers = [c for c in os.listdir(data_dir)
                    if os.path.isdir(os.path.join(data_dir, c))]

        def scan_composer(composer):
            composer_path = os.path.join(data_dir, composer)
            try:
                # Get all MIDI files with recursive search
                midi_files = []
                for root, dirs, files in os.walk(composer_path):
                    for file in files:
                        if file.lower().endswith(('.mid', '.midi')):
                            # Get path relative to composer_path
                            rel_path = os.path.relpath(os.path.join(root, file), composer_path)
                            midi_files.append(rel_path)

                # Apply limit if specified
                if max_files_per_composer:
                    midi_files = midi_files[:max_files_per_composer]

                # Create file info list
                return [
                    {
                        'file_path': os.path.join(composer_path, midi_file),
                        'composer': composer,
                        'filename': midi_file
                    }
                    for midi_file in midi_files
                ]
            except Exception as e:
                print(f"Error scanning {composer}: {e}")
                return []

        # Parallel composer scanning
        all_files = []
        with ThreadPoolExecutor(max_workers=n_jobs) as executor:
            futures = [executor.submit(scan_composer, composer) for composer in composers]
            for future in as_completed(futures):
                composer_files = future.result()
                all_files.extend(composer_files)

        elapsed = time.time() - start_time
        print(f"Loaded {len(all_files)} MIDI files from {len(composers)} composers in {elapsed:.2f}s")

        return pd.DataFrame(all_files)

    @staticmethod
    def extract_notes_from_midi(midi_path, max_notes=500):
        try:
            # Parse once and extract efficiently
            midi = converter.parse(midi_path)
            notes = []

            # Note extraction with limits
            if hasattr(midi, 'parts') and midi.parts:
                # Process only first few parts for speed
                for part in midi.parts[:2]:  # Limit to 2 parts
                    part_notes = part.recurse().notes
                    for element in part_notes:
                        if len(notes) >= max_notes:
                            break
                        if isinstance(element, note.Note):
                            notes.append(str(element.pitch))
                        elif isinstance(element, chord.Chord):
                            notes.append('.'.join(str(n) for n in element.normalOrder))
                    if len(notes) >= max_notes:
                        break
            else:
                # Flat structure
                for element in midi.flat.notes:
                    if len(notes) >= max_notes:
                        break
                    if isinstance(element, note.Note):
                        notes.append(str(element.pitch))
                    elif isinstance(element, chord.Chord):
                        notes.append('.'.join(str(n) for n in element.normalOrder))

            return notes[:max_notes]  # Ensure we don't exceed limit

        except Exception as e:
            print(f"Error processing {midi_path}: {str(e)}")
            return []

    @staticmethod
    def extract_notes_batch(midi_paths_batch, max_notes=500):
        results = []
        for midi_path in midi_paths_batch:
            notes = MidiProcessor.extract_notes_from_midi(midi_path, max_notes)
            results.append(notes)
        return results

    @staticmethod
    def extract_notes_from_midi_parallel(midi_paths, max_notes=500, n_jobs=None):
        if n_jobs is None:
            n_jobs = OPTIMAL_WORKERS

        total_files = len(midi_paths)

        # Create batches for better memory management
        batch_size = max(1, len(midi_paths) // (n_jobs * 2))
        batches = [midi_paths[i:i + batch_size] for i in range(0, len(midi_paths), batch_size)]

        results = [None] * len(midi_paths)

        # Create a progress bar
        pbar = tqdm(total=total_files, desc="Extracting notes from MIDI files")

        def process_batch(batch_data):
            batch_idx, batch_paths = batch_data
            batch_results = MidiProcessor.extract_notes_batch(batch_paths, max_notes)
            # Update progress bar for each file in the batch
            pbar.update(len(batch_paths))
            return batch_idx, batch_results

        try:
            # Process batches in parallel
            with ThreadPoolExecutor(max_workers=n_jobs) as executor:
                batch_data = [(i, batch) for i, batch in enumerate(batches)]
                futures = [executor.submit(process_batch, data) for data in batch_data]

                for future in as_completed(futures):
                    batch_idx, batch_results = future.result()
                    batch_start = batch_idx * batch_size
                    for i, notes in enumerate(batch_results):
                        if batch_start + i < len(results):
                            results[batch_start + i] = notes
        finally:
            # Make sure to close the progress bar
            pbar.close()

        # Report completion stats
        successful = sum(1 for r in results if r is not None and len(r) > 0)
        print(f"\nNote extraction complete: {successful}/{total_files} files processed successfully")

        return results

    @staticmethod
    def extract_musical_features(midi_path):
        try:
            midi = converter.parse(midi_path)
            features = {}

            # Basic counts
            all_notes = list(midi.flat.notes)
            features['total_notes'] = len(all_notes)
            features['total_duration'] = float(midi.duration.quarterLength) if midi.duration else 0

            # Time signature (quick lookup)
            time_sigs = midi.getTimeSignatures()
            if time_sigs:
                features['time_signature'] = f"{time_sigs[0].numerator}/{time_sigs[0].denominator}"
            else:
                features['time_signature'] = "4/4"

            # Key analysis
            try:
                key_sigs = midi.analyze('key')
                features['key'] = str(key_sigs) if key_sigs else "C major"
                if hasattr(key_sigs, 'correlationCoefficient'):
                    features['key_confidence'] = float(abs(key_sigs.correlationCoefficient))
                else:
                    features['key_confidence'] = 0.5
            except:
                features['key'] = "C major"
                features['key_confidence'] = 0.5

            # Tempo
            tempo_markings = midi.flat.getElementsByClass('MetronomeMark')
            features['tempo'] = float(tempo_markings[0].number) if tempo_markings else 120.0

            # Pitch features (vectorized calculation)
            pitches = []
            durations = []
            for n in all_notes[:1000]:  # Limit for speed
                if isinstance(n, note.Note):
                    pitches.append(n.pitch.midi)
                    durations.append(float(n.duration.quarterLength))
                elif isinstance(n, chord.Chord):
                    pitches.append(n.root().midi)
                    durations.append(float(n.duration.quarterLength))

            if pitches:
                pitches_array = np.array(pitches)
                features['avg_pitch'] = float(np.mean(pitches_array))
                features['pitch_range'] = float(np.ptp(pitches_array))
                features['pitch_std'] = float(np.std(pitches_array))

                # Intervals (vectorized)
                if len(pitches) > 1:
                    intervals = np.abs(np.diff(pitches_array))
                    features['avg_interval'] = float(np.mean(intervals))
                    features['interval_std'] = float(np.std(intervals))
                else:
                    features['avg_interval'] = 0.0
                    features['interval_std'] = 0.0
            else:
                features.update({
                    'avg_pitch': 60.0, 'pitch_range': 0.0, 'pitch_std': 0.0,
                    'avg_interval': 0.0, 'interval_std': 0.0
                })

            # Duration features (vectorized)
            if durations:
                dur_array = np.array(durations)
                features['avg_duration'] = float(np.mean(dur_array))
                features['duration_std'] = float(np.std(dur_array))
            else:
                features['avg_duration'] = 1.0
                features['duration_std'] = 0.0

            # Structural features (limited for speed)
            features['num_parts'] = len(midi.parts) if hasattr(midi, 'parts') else 1

            # Count specific elements (limited)
            limited_elements = list(midi.flat.notesAndRests)[:500]  # Limit for speed
            chords = [el for el in limited_elements if isinstance(el, chord.Chord)]
            rests = [el for el in limited_elements if isinstance(el, note.Rest)]

            features['num_chords'] = len(chords)
            features['num_rests'] = len(rests)
            features['rest_ratio'] = len(rests) / features['total_notes'] if features['total_notes'] > 0 else 0

            # Measures (efficient approximation)
            if hasattr(midi, 'parts') and midi.parts:
                measures = list(midi.parts[0].getElementsByClass('Measure'))
                features['num_measures'] = len(measures) if measures else max(1, int(features['total_duration'] / 4))
            else:
                features['num_measures'] = max(1, int(features['total_duration'] / 4))

            features['avg_notes_per_measure'] = features['total_notes'] / features['num_measures'] if features['num_measures'] > 0 else 0

            # Instruments (simplified)
            instruments = []
            if hasattr(midi, 'parts') and midi.parts:
                for part in midi.parts[:3]:  # Limit to first 3 parts
                    try:
                        instr = part.getInstrument(returnDefault=True)
                        instruments.append(str(instr))
                    except:
                        instruments.append('Piano')
            else:
                instruments = ['Piano']

            features['instruments'] = list(set(instruments))

            # Chord types (limited analysis)
            chord_types = []
            for c in chords[:10]:  # Limit to first 10 chords
                try:
                    chord_types.append(c.commonName)
                except:
                    chord_types.append('Unknown')

            features['chord_types'] = list(set(chord_types))

            return features

        except Exception as e:
            print(f"Error extracting features from {midi_path}: {str(e)}")
            return {
                'total_notes': 0, 'total_duration': 0, 'time_signature': '4/4',
                'key': 'C major', 'key_confidence': 0, 'tempo': 120.0,
                'avg_pitch': 60.0, 'pitch_range': 0, 'pitch_std': 0,
                'avg_duration': 1.0, 'duration_std': 0,
                'avg_interval': 0, 'interval_std': 0, 'num_parts': 1,
                'num_chords': 0, 'num_rests': 0, 'rest_ratio': 0,
                'num_measures': 1, 'avg_notes_per_measure': 0,
                'instruments': ['Piano'], 'chord_types': []
            }

    @staticmethod
    def extract_musical_features_parallel(midi_paths, n_jobs=None):
        if n_jobs is None:
            n_jobs = OPTIMAL_WORKERS

        total_files = len(midi_paths)
        results = [None] * total_files

        # Create a progress bar
        pbar = tqdm(total=total_files, desc="Extracting musical features")

        def worker(idx, midi_path):
            try:
                features = MidiProcessor.extract_musical_features(midi_path)
                return idx, features
            except Exception as e:
                print(f"\nError processing {midi_path}: {e}")
                return idx, MidiProcessor.extract_musical_features(None)  # Return empty features
            finally:
                # Update progress bar
                pbar.update(1)

        try:
            with ThreadPoolExecutor(max_workers=n_jobs) as executor:
                # Submit all tasks
                futures = [executor.submit(worker, idx, path) for idx, path in enumerate(midi_paths)]

                # Process results as they complete
                for future in as_completed(futures):
                    try:
                        idx, features = future.result()
                        results[idx] = features
                    except Exception as e:
                        print(f"\nError in feature extraction worker: {e}")
        finally:
            # Make sure to close the progress bar
            pbar.close()

        # Report completion stats
        successful = sum(1 for r in results if r is not None and r.get('total_notes', 0) > 0)
        print(f"\nFeature extraction complete: {successful}/{total_files} files processed successfully")

        return results

    @staticmethod
    def create_note_sequences(notes_list, sequence_length=100):
        print("Building vocabulary...")

        # Vocabulary building
        all_notes = []
        for notes in notes_list:
            if notes:  # Skip empty note lists
                all_notes.extend(notes[:1000])  # Limit notes per file for speed

        unique_notes = sorted(set(all_notes))
        note_to_int = {note: i for i, note in enumerate(unique_notes)}

        print(f"Vocabulary size: {len(unique_notes)}")
        print("Creating sequences...")

        # Create sequences with memory optimization
        sequences = []
        batch_size = 1000

        for i in range(0, len(notes_list), batch_size):
            batch_notes = notes_list[i:i + batch_size]
            batch_sequences = []

            for notes in batch_notes:
                if len(notes) >= sequence_length:
                    # Convert notes to integers (with error handling)
                    note_ints = []
                    for note in notes:
                        if note in note_to_int:
                            note_ints.append(note_to_int[note])

                    # Create sequences from this file
                    for j in range(len(note_ints) - sequence_length + 1):
                        seq = note_ints[j:j + sequence_length]
                        if len(seq) == sequence_length:  # Ensure correct length
                            batch_sequences.append(seq)

            sequences.extend(batch_sequences)

            # Progress update
            if i % (batch_size * 10) == 0:
                print(f"Processed {min(i + batch_size, len(notes_list))}/{len(notes_list)} files, "
                      f"created {len(sequences)} sequences")

            # Memory cleanup
            if len(sequences) > 50000:  # Prevent memory overflow
                break

        print(f"Total sequences created: {len(sequences)}")
        return np.array(sequences), note_to_int, unique_notes

    @staticmethod
    def preprocess_midi_files(file_df, min_duration=5.0, min_key_confidence=0.7, verbose=True, n_jobs=None):
        if n_jobs is None:
            n_jobs = OPTIMAL_WORKERS

        processed_files = []
        seen_hashes = set()
        total = len(file_df)
        stats = {
            'total': total,
            'corrupted': 0,
            'zero_length': 0,
            'duplicates': 0,
            'too_short': 0,
            'ambiguous_key': 0,
            'kept': 0
        }

        def process_file(row_data):
            idx, row = row_data
            path = row['file_path']
            try:
                # Quick file validation
                if not os.path.exists(path) or os.path.getsize(path) == 0:
                    return ('zero_length', None)

                # File hash for deduplication
                with open(path, 'rb') as f:
                    file_hash = hashlib.md5(f.read()).hexdigest()

                # Basic MIDI parsing
                score = converter.parse(path)
                duration = score.duration.seconds if score.duration else 0

                if duration < min_duration:
                    return ('too_short', None)

                # Key confidence check (if needed)
                if min_key_confidence > 0:
                    try:
                        key_obj = score.analyze('key')
                        if hasattr(key_obj, 'correlationCoefficient'):
                            confidence = abs(key_obj.correlationCoefficient)
                        else:
                            confidence = 1.0

                        if confidence < min_key_confidence:
                            return ('ambiguous_key', None)
                    except:
                        return ('ambiguous_key', None)

                return ('kept', (file_hash, row))

            except Exception as e:
                return ('corrupted', None)

        completed = 0
        with ThreadPoolExecutor(max_workers=n_jobs) as executor:
            row_data = [(idx, row) for idx, row in file_df.iterrows()]
            futures = [executor.submit(process_file, data) for data in row_data]

            for future in as_completed(futures):
                result, data = future.result()
                completed += 1

                if verbose and completed % 50 == 0:
                    percent = (completed / total) * 100
                    print(f"\rProcessing: {completed}/{total} ({percent:.1f}%)", end='', flush=True)

                if result == 'kept' and data:
                    file_hash, row = data
                    if file_hash not in seen_hashes:
                        seen_hashes.add(file_hash)
                        processed_files.append(row)
                        stats['kept'] += 1
                    else:
                        stats['duplicates'] += 1
                else:
                    stats[result] += 1

        if verbose:
            print()  # New line after progress
            print("\nPreprocessing Results:")
            for key, value in stats.items():
                print(f"{key.replace('_', ' ').title()}: {value}")

        return pd.DataFrame(processed_files)

The MidiProcessor above addresses the challenges of preparing a large, multi-class MIDI dataset.

Manual data cleaning for such datasets can be slow and prone to error, especially when handling files from different sources with varying lengths, complexities, and numerous features. By automating our MIDI dataset scanning, parallelized note and feature extraction, sequence creation, and filtering, this pipeline provides us with quality input data that captures some key musical attributes necessary for training LSTM and CNN models.

The next preprocessing step initializes the MidiProcessor and prepares the dataset for deep learning–based composer classification. The pipeline:

  1. Initialization & Metadata Loading

    • Creates a MidiProcessor instance and loads all MIDI file metadata into a Pandas DataFrame from the specified dataset directory.
  2. Filtering Target Composers

    • Restricts the dataset to a predefined set of composers (TARGET_COMPOSERS) and removes any files with missing or invalid paths.
  3. Dataset Overview

    • Counts and prints the total number of MIDI files before and after filtering to track dataset size and integrity.
  4. Data Visualization

    • Generates two plots for quick dataset inspection:
      • Bar Chart: Displays the number of valid MIDI files for each target composer, with counts labeled on the bars.
      • Pie Chart: Shows the proportion of MIDI files per composer, giving a visual breakdown of class distribution.

Justification:
These steps ensure that the dataset used for training is both relevant (only target composers) and valid (no broken paths or duplicates). The visualizations provide an immediate overview of data balance, which is crucial for understanding potential class imbalance issues before model training.

In [ ]:
# Initialize MidiProcessor
midi_processor = MidiProcessor()

# Load MIDI file metadata into a DataFrame
all_midi_df = midi_processor.load_midi_files(MUSIC_DATA_DIRECTORY)

# Define colors dictionary for visualization
colors = dict(zip(TARGET_COMPOSERS, sns.color_palette(env_config.color_pallete, len(TARGET_COMPOSERS))))

print(f"Total MIDI files found: {len(all_midi_df)}")

# Filter for required composers only
midi_df = all_midi_df[all_midi_df['composer'].isin(TARGET_COMPOSERS)].reset_index(drop=True)

print(f"Filtered MIDI files for target composers ({len(TARGET_COMPOSERS)}): {len(midi_df)}")

# Remove files with missing or invalid paths
midi_df = midi_df[midi_df['file_path'].apply(os.path.exists)].reset_index(drop=True)

# Preprocess MIDI files (clean, deduplicate, filter)
# midi_df = midi_processor.preprocess_midi_files(midi_df)

print(f"Valid MIDI files after preprocessing: {len(midi_df)}")

# Count files for required composers
required_composer_counts = midi_df['composer'].value_counts()

# Close any existing plots to start fresh
plt.close('all')
fig, axs = plt.subplots(1, 2, figsize=(12, 4))

# Plot 1: Bar chart of target composers
sns.barplot(
    x=required_composer_counts.index,
    y=required_composer_counts.values,
    palette=env_config.color_pallete,
    ax=axs[0]
)
axs[0].set_title('MIDI Files per Composer', fontsize=14)
axs[0].set_xlabel('Composer', fontsize=12)
axs[0].set_ylabel('Number of Files', fontsize=12)

# Add count numbers on top of the bars
for i, count in enumerate(required_composer_counts.values):
    percentage = 100 * count / required_composer_counts.sum()
    axs[0].text(i, count + 5, f"{count}", ha='center')

# Plot 2: Pie chart showing proportion of composers
axs[1].pie(
    required_composer_counts.values,
    labels=required_composer_counts.index,
    autopct='%1.1f%%',
    startangle=90,
    colors=[colors[composer] for composer in required_composer_counts.index],
    shadow=True
)
axs[1].set_title('Proportion of MIDI Files by Composer', fontsize=14)
axs[1].axis('equal')

plt.tight_layout()
plt.show()
Loaded 1643 MIDI files from 4 composers in 0.02s
Total MIDI files found: 1643
Filtered MIDI files for target composers (4): 1643
Valid MIDI files after preprocessing: 1643
No description has been provided for this image
In [ ]:
# To handle the data imbalance, we can sample a fixed number of files per composer
if SAMPLING == True:
    midi_df = midi_df.groupby('composer').head(SAMPLING_FILES_PER_COMPOSER).reset_index(drop=True)
    print(f"Reduced MIDI files to {SAMPLING_FILES_PER_COMPOSER} per composer: Total Files: {len(midi_df)}")
else:
    print("Sampling is disabled, using all available MIDI files.")
Sampling is disabled, using all available MIDI files.

Dataset Summary and Class Distribution Analysis

The dataset contains 1,640 MIDI files across 4 target composers (Bach, Mozart, Beethoven, and Chopin). All files were successfully loaded, filtered for the selected composers, and passed preprocessing without removals, indicating good overall data integrity.

Key Findings from our Visualizations:

  1. Bar Chart – MIDI Files per Composer

    • Bach has the largest number of files (1,024), followed by Mozart (257), Beethoven (220), and Chopin (136).
    • This shows a significant difference in data availability among composers.
  2. Pie Chart – Proportion of MIDI Files by Composer

    • Bach's works make up 62.6% of the dataset.
    • Mozart accounts for 15.7%, Beethoven 13.4%, and Chopin only 8.3%.

Data Imbalance Considerations:
The dataset is heavily imbalanced, with Bach dominating the sample size. Such imbalance can bias the deep learning model toward predicting Bach more often, potentially reducing classification accuracy for underrepresented composers like Chopin. This imbalance will need to be addressed during preprocessing or model training possible strategies include data augmentation, class weighting, or sampling techniques to create a more balanced input distribution.

5. Feature Extraction: ⇧¶


5.1 HarmonyAnalyzer:¶

HarmonyAnalyzer – Purpose and Role in Feature Extraction

The HarmonyAnalyzer is a specialized preprocessing tool we designed to extract high-level harmonic and tonal features from MIDI data before feeding it into the LSTM and CNN composer classification model. These features go beyond note or pitch features by working to quantify how composers use harmony, tonality, and modulation in their works—key stylistic markers for distinguishing between composers. We felt this provided an edge to our model using special domain knowledge.

Note: We also provide in-line code commenting to help detail each harmonic feature and it's musical importance.

Why These Analyses Are Needed¶

Traditional statistical features (e.g., average pitch, tempo, note durations) are important, however some deeper stylistic patterns that define a composer’s “musical fingerprint” seemed pertinent to include.

Classical composers differ greatly in harmonic language, use of consonance/dissonance, modulations, and cadences. Capturing these traits creates a deeper musical theory based feature set, with an aim to improving model performance for composer prediction down the pipeline.

Main Components in Harmony Analyzer Class¶

  1. Chord and Progression Analysis (analyze_chords_and_progressions)

    • Extracts total and unique chord counts.
    • Calculates chord quality ratios (major, minor, diminished, augmented).
    • Detects occurrences of common classical progressions using 'old school' roman numeral analysis(e.g., I–V–vi–IV).
    • Measures chord inversion usage and root position frequency.
    • Why: Different composers favor specific chord types and progressions, which can be strong stylistic indicators.
  2. Dissonance Analysis (analyze_dissonance)

    • Quantifies average, maximum, and variance of dissonance in sliding note windows.
    • Measures tritone frequency (a historically important dissonance).
    • Why: Composers vary in their tolerance for tension—Baroque composers may use consonance more often, while Romantic composers may use richer dissonances.
  3. Tonality Analysis (analyze_tonality)

    • Calculates how often the tonic note/chord appears in melody and bass.
    • Tracks cadential tonic usage in phrase endings.
    • Why: Staying “close to home” harmonically vs. frequent departures is a defining compositional choice.
  4. Key Modulation Analysis (analyze_key_modulations)

    • Detects key changes across the score using multiple key-finding methods.
    • Calculates modulation rate, key diversity, stability, and distances between keys on the circle of fifths.
    • Why: Some composers (e.g., Beethoven) modulate frequently to explore harmonic space, while others remain harmonically stable.
  5. Cadence Analysis (analyze_cadences)

    • Detects authentic (V–I), plagal (IV–I), and deceptive (V–vi) cadences.
    • Calculates cadence type ratios and total cadences.
    • Why: Cadence preferences can strongly signal the harmonic era and composer identity.

Value for Model Preparation¶

By combining these advanced harmonic features with the statistical ones, the dataset gains musically meaningful, high-level descriptors that directly reflect compositional style. This gives the deep learning model access to nuanced harmonic behavior, improving classification accuracy and interpretability.

In [ ]:
class HarmonyAnalyzer:
    """Analyzer for harmony-related features."""

    def __init__(self, np_module):
        self.np = np_module
        self.has_gpu = np_module.__name__ == 'cupy'

        # Perfect/major/minor consonances
        self.consonant_intervals = {0, 3, 4, 5, 7, 8, 9, 12}

        # The More dissonant intervals
        self.dissonant_intervals = {1, 2, 6, 10, 11}

        # Common classical chord progressions
        self.common_progressions = {
            'I-V-vi-IV': [(1, 5, 6, 4)],
            'vi-IV-I-V': [(6, 4, 1, 5)],
            'I-vi-IV-V': [(1, 6, 4, 5)],
            'ii-V-I': [(2, 5, 1)],
            'I-IV-V-I': [(1, 4, 5, 1)],
            'vi-ii-V-I': [(6, 2, 5, 1)]
        }

        # Circle of fifths for key relationships
        self.circle_of_fifths = {
            'C': 0, 'G': 1, 'D': 2, 'A': 3, 'E': 4, 'B': 5, 'F#': 6, 'Gb': 6,
            'Db': 5, 'Ab': 4, 'Eb': 3, 'Bb': 2, 'F': 1
        }

        # Relative major/minor relationships
        self.relative_keys = {
            ('C', 'major'): ('A', 'minor'),
            ('G', 'major'): ('E', 'minor'),
            ('D', 'major'): ('B', 'minor'),
            ('A', 'major'): ('F#', 'minor'),
            ('E', 'major'): ('C#', 'minor'),
            ('B', 'major'): ('G#', 'minor'),
            ('F#', 'major'): ('D#', 'minor'),
            ('Db', 'major'): ('Bb', 'minor'),
            ('Ab', 'major'): ('F', 'minor'),
            ('Eb', 'major'): ('C', 'minor'),
            ('Bb', 'major'): ('G', 'minor'),
            ('F', 'major'): ('D', 'minor'),
        }

        # Add reverse mappings
        for maj, min in list(self.relative_keys.items()):
            self.relative_keys[min] = maj

    def analyze_chords_and_progressions(self, score, window_size, overlap, shared_data):
        """Analyze chord progressions patterns.

            We should embrace the "old school" musical analysis process here.
            Roman rumerals are traditionally used in musical analysis as a means
            to evaluate how composers go from one idea to the next, or one
            feeling to the next via their chosen chord progressions. How often
            composers use different chord qualities, such as major/minor/
            or augmented/diminished defines their stylistic tendencies and
            compositional choices over time. Chord inversions are related
            indicators, as they inform how each note in a harmonic chord is
            stacked, representing key variations of musical harmononic analysis.
        """
        features = {}

        try:
            # Use shared data
            chords = shared_data.get('chords', [])
            key_signature = shared_data.get('key_signature')

            if not chords or not key_signature:
                return self.empty_harmony_features()

            features['total_chords'] = len(chords)

            # Use batch processing for better performance
            batch_size = min(100, max(1, len(chords) // 2))

            # Pre-compute pitchedCommonName for optimization
            pitchedCommonNames = [c.pitchedCommonName for c in chords
                                 if hasattr(c, 'pitchedCommonName')]
            features['unique_chords'] = len(set(pitchedCommonNames))

            # Process chords in batches using thread pool
            chord_qualities, roman_numerals, inversions = self._process_chord_batches(
                chords, key_signature, batch_size)

            # Calculate chord type ratios
            quality_counts = Counter(chord_qualities)
            chord_count = len(chords) if chords else 1  # Avoid division by zero

            features['major_chord_ratio'] = quality_counts.get('major', 0) / chord_count
            features['minor_chord_ratio'] = quality_counts.get('minor', 0) / chord_count
            features['diminished_chord_ratio'] = quality_counts.get('diminished', 0) / chord_count
            features['augmented_chord_ratio'] = quality_counts.get('augmented', 0) / chord_count

            # Chord progression analysis
            if len(roman_numerals) >= 4:
                progression_counts = self._find_chord_progressions(roman_numerals)
                features.update(progression_counts)
            else:
                for prog_name in self.common_progressions:
                    features[f'progression_{prog_name.lower().replace("-", "_")}_count'] = 0

            # Chord inversion analysis
            if inversions:
                inv_array = self.np.array(inversions, dtype=float)
                features['avg_chord_inversion'] = float(self.np.mean(inv_array))
                features['root_position_ratio'] = inversions.count(0) / len(inversions)
            else:
                features['avg_chord_inversion'] = 0
                features['root_position_ratio'] = 0

        except Exception:
            features = self.empty_harmony_features()

        return features

    def _process_chord_batches(self, chords, key_signature, batch_size):
        """Process chords in batches for better performance."""
        def process_chord_batch(chord_batch):
            qualities = []
            rns = []
            invs = []

            for c in chord_batch:
                try:
                    qualities.append(c.quality)
                    rns.append(roman.romanNumeralFromChord(c, key_signature).figure)
                    invs.append(c.inversion())
                except:
                    qualities.append('unknown')
                    rns.append('Unknown')
                    invs.append(0)

            return qualities, rns, invs

        # Split chords into batches
        chord_batches = [chords[i:i+batch_size] for i in range(0, len(chords), batch_size)]

        # Process batches in parallel
        chord_qualities = []
        roman_numerals = []
        inversions = []

        with ThreadPoolExecutor() as executor:
            batch_results = list(executor.map(process_chord_batch, chord_batches))

        # Combine results
        for qualities, rns, invs in batch_results:
            chord_qualities.extend(qualities)
            roman_numerals.extend(rns)
            inversions.extend(invs)

        return chord_qualities, roman_numerals, inversions

    def _find_chord_progressions(self, roman_numerals):
        """Find common chord progression patterns."""
        progression_counts = {}

        # Use a lookup table for faster scale degree extraction
        scale_degree_map = {
            'I': 1, 'II': 2, 'III': 3, 'IV': 4, 'V': 5, 'VI': 6, 'VII': 7,
            'i': 1, 'ii': 2, 'iii': 3, 'iv': 4, 'v': 5, 'vi': 6, 'vii': 7
        }

        # Pre-process roman numerals to scale degrees efficiently
        scale_degrees = []
        for rn in roman_numerals:
            # Extract first 1-2 characters for faster lookup
            rn_start = rn[:2] if len(rn) > 1 else rn[:1]
            rn_start = rn_start.upper()  # Normalize case

            # Fast lookup
            if rn_start in scale_degree_map:
                scale_degrees.append(scale_degree_map[rn_start])
            elif rn_start[0] in scale_degree_map:
                scale_degrees.append(scale_degree_map[rn_start[0]])
            else:
                scale_degrees.append(0)  # Unknown

        # Convert to numpy array for faster pattern matching
        scale_degrees_array = self.np.array(scale_degrees)

        # Process each progression pattern efficiently
        for prog_name, patterns in self.common_progressions.items():
            count = 0

            for pattern in patterns:
                pattern_length = len(pattern)
                if len(scale_degrees) >= pattern_length:
                    # Use vectorized operations for pattern matching when possible
                    if self.has_gpu and len(scale_degrees) > 1000:
                        # GPU-optimized pattern matching
                        pattern_array = self.np.array(pattern)
                        matches = 0

                        # Sliding window implementation optimized for GPU
                        for i in range(len(scale_degrees) - pattern_length + 1):
                            window = scale_degrees_array[i:i+pattern_length]
                            if self.np.array_equal(window, pattern_array):
                                matches += 1

                        count += matches
                    else:
                        # CPU version with optimized lookups
                        pattern_tuple = tuple(pattern)

                        # Use sliding window approach
                        for i in range(len(scale_degrees) - pattern_length + 1):
                            if tuple(scale_degrees[i:i+pattern_length]) == pattern_tuple:
                                count += 1

            progression_counts[f'progression_{prog_name.lower().replace("-", "_")}_count'] = count

        return progression_counts

    def analyze_dissonance(self, score, window_size, overlap, shared_data):
        """Analyze dissonance measurements.

            Dissonance in music theory is a measurement of perceptual clashing
            or tension between either 2 or more notes, whether they are in
            a harmonic setting (i.e. in a triad), or melodically over time.
            It's a perceptual concept, as one person might find something more
            dissonant than another (experiemental jazz vs. orderly Mozart), yet
            even though it's perceptual we have some preconceived notions of
            note relationships coresponding to tonal expectations. More dissonance
            equates to less harmonicity, and vice versa-- a great measurement
            for our classification purposes.

        """
        features = {}

        try:
            # Use shared data
            notes = shared_data.get('notes', [])

            # Early return for empty notes
            if not notes:
                return self.empty_dissonance_features()

            # Extract all pitches
            all_pitches = []
            for note in notes:
                if hasattr(note, 'pitch'):
                    all_pitches.append(note.pitch.midi)

            if not all_pitches:
                return self.empty_dissonance_features()

            # Convert to numpy array for faster processing
            all_pitches_array = self.np.array(all_pitches)

            # Create sliding windows of notes
            window_size_notes = 10  # Fixed size for consistent analysis
            dissonance_samples = self._analyze_dissonance_windows(all_pitches_array, window_size_notes)

            # Calculate statistics
            if dissonance_samples:
                dissonance_array = self.np.array(dissonance_samples)
                features['avg_dissonance'] = float(self.np.mean(dissonance_array))
                features['max_dissonance'] = float(self.np.max(dissonance_array))
                features['dissonance_variance'] = float(self.np.var(dissonance_array))
            else:
                features['avg_dissonance'] = 0
                features['max_dissonance'] = 0
                features['dissonance_variance'] = 0

            # Tritone analysis
            if len(all_pitches_array) > 1:
                # Compute consecutive intervals
                intervals = self.np.abs(self.np.diff(all_pitches_array)) % 12
                tritone_count = self.np.sum(intervals == 6)
                features['tritone_frequency'] = float(tritone_count) / len(notes)
            else:
                features['tritone_frequency'] = 0

        except Exception:
            features = self.empty_dissonance_features()

        return features

    def _analyze_dissonance_windows(self, all_pitches_array, window_size_notes):
        # Optimize for large datasets with adaptive chunking
        num_chunks = min(os.cpu_count() or 4, 8)
        chunk_size = max(10, len(all_pitches_array) // num_chunks)

        # Process sliding windows more efficiently
        def analyze_windows_batch(start_idx, end_idx):
            batch_results = []
            for i in range(start_idx, min(end_idx, len(all_pitches_array) - window_size_notes + 1), 3):
                window = all_pitches_array[i:i+window_size_notes]
                if len(window) < 2:
                    continue

                # Create all pairwise combinations efficiently
                if self.has_gpu:
                    # GPU-optimized version
                    pitches_col = window.reshape(-1, 1)
                    pitches_row = window.reshape(1, -1)
                    interval_matrix = self.np.abs(pitches_col - pitches_row) % 12
                    # Get upper triangle of matrix excluding diagonal
                    intervals = interval_matrix[self.np.triu_indices_from(interval_matrix, k=1)]

                    # Count dissonant intervals
                    dissonant_count = 0
                    for interval in intervals:
                        if int(interval) in self.dissonant_intervals:
                            dissonant_count += 1

                    if len(intervals) > 0:
                        batch_results.append(dissonant_count / len(intervals))
                else:
                    # CPU version
                    window_dissonance = 0
                    note_pairs = 0

                    for j in range(len(window)):
                        for k in range(j+1, len(window)):
                            interval_semitones = abs(window[j] - window[k]) % 12
                            if interval_semitones in self.dissonant_intervals:
                                window_dissonance += 1
                            note_pairs += 1

                    if note_pairs > 0:
                        batch_results.append(window_dissonance / note_pairs)

            return batch_results

        # Divide work into chunks
        chunk_starts = list(range(0, len(all_pitches_array), chunk_size))
        chunk_ends = [min(start + chunk_size, len(all_pitches_array)) for start in chunk_starts]

        # Process chunks in parallel
        all_results = []
        with ThreadPoolExecutor() as executor:
            all_results = list(executor.map(analyze_windows_batch, chunk_starts, chunk_ends))

        # Flatten results
        dissonance_samples = []
        for result in all_results:
            dissonance_samples.extend(result)

        return dissonance_samples

    def analyze_tonality(self, score, window_size, overlap, shared_data):
        """Analyze how often the tonic is reached.

            This is important as it shows how composers stay close to 'home' key
            vs. how often they drift into interludes or other musical structures.
            A composer like Mozart more often feels safe in the tonic,
            whereas Chopin might drift away more often to add a more drifting
            sensation to the composition.

        """

        features = {}

        try:
            key_signature = shared_data.get('key_signature')
            notes = shared_data.get('notes', [])
            chords = shared_data.get('chords', [])

            if not key_signature or not notes:
                return self.empty_tonality_features()

            tonic_pitch = key_signature.tonic
            tonic_name = tonic_pitch.name

            # Process notes to count tonic occurrences
            tonic_hits, bass_tonic_hits = self._count_tonic_notes(notes, tonic_name)

            # Process chords to count tonic chords
            tonic_chord_count = self._count_tonic_chords(chords, tonic_name)

            # Calculate ratios
            total_notes = len(notes) if notes else 1  # Avoid division by zero
            features['tonic_frequency'] = tonic_hits / total_notes
            features['bass_tonic_frequency'] = bass_tonic_hits / total_notes
            features['tonic_chord_ratio'] = tonic_chord_count / len(chords) if chords else 0

            # Process phrase endings
            phrase_endings = self._identify_phrase_endings(score)
            cadential_tonic = 0

            for ending_chord in phrase_endings:
                try:
                    if ending_chord.root().name == tonic_name:
                        cadential_tonic += 1
                except:
                    continue

            features['cadential_tonic_ratio'] = cadential_tonic / len(phrase_endings) if phrase_endings else 0

        except Exception:
            features = self.empty_tonality_features()

        return features

    def _count_tonic_notes(self, notes, tonic_name):
        # Process notes in parallel using batches
        def process_notes_batch(note_batch):
            tonic_count = 0
            bass_tonic_count = 0

            for note in note_batch:
                try:
                    if hasattr(note, 'pitch') and note.pitch.name == tonic_name:
                        tonic_count += 1
                    elif hasattr(note, 'bass') and note.bass().name == tonic_name:
                        bass_tonic_count += 1
                except:
                    continue

            return tonic_count, bass_tonic_count

        # Split into batches
        batch_size = 100
        note_batches = [notes[i:i+batch_size] for i in range(0, len(notes), batch_size)]

        # Process batches in parallel
        tonic_hits = 0
        bass_tonic_hits = 0

        with ThreadPoolExecutor() as executor:
            # Process notes
            note_results = list(executor.map(process_notes_batch, note_batches))
            for t_count, bt_count in note_results:
                tonic_hits += t_count
                bass_tonic_hits += bt_count

        return tonic_hits, bass_tonic_hits

    def _count_tonic_chords(self, chords, tonic_name):
        # Process chords in parallel
        def process_chords_batch(chord_batch):
            tonic_chord_count = 0

            for c in chord_batch:
                try:
                    if c.root().name == tonic_name:
                        tonic_chord_count += 1
                except:
                    continue

            return tonic_chord_count

        # Split into batches
        batch_size = 100
        chord_batches = [chords[i:i+batch_size] for i in range(0, len(chords), batch_size)]

        # Process batches in parallel
        tonic_chord_count = 0

        with ThreadPoolExecutor() as executor:
            chord_results = list(executor.map(process_chords_batch, chord_batches))
            tonic_chord_count = sum(chord_results)

        return tonic_chord_count

    def analyze_key_modulations(self, score, window_size, overlap, shared_data):
        """Count key modulations using windowed key analysis.

            Modulation is when we move from key center to key center, i.e. from
            C Major to Ab Minor (a harmonically unrelated key signature).
            Composers in classical music tend to move from key to key when they
            want to diverge from a mood or a tonal center into a new avenue of
            expressions. This is a feature more common in certain composers,
            and a valuable addition to measure from our MIDI files.
        """

        features = {}

        try:
            duration = shared_data.get('total_duration', 0) if shared_data else score.duration.quarterLength

            if duration == 0 or duration < window_size:
                return self.empty_modulation_features()

            # Get note events for analysis
            note_events = self._extract_note_events(score)
            if len(note_events) < 10:  # Need minimum notes for analysis
                return self.empty_modulation_features()

            # Analyze keys using multiple methods
            window_keys = self._analyze_keys_robust(score, note_events, window_size, overlap, duration)

            if len(window_keys) < 2:
                return self.empty_modulation_features()

            # Filter and smooth key sequence
            smoothed_keys = self._smooth_key_sequence(window_keys)

            # Detect meaningful modulations
            modulations = self._detect_meaningful_modulations(smoothed_keys)

            # Calculate features
            features = self._calculate_modulation_features(modulations, smoothed_keys, duration)

        except Exception as e:
            print(f"Error in key modulation analysis: {e}")
            features = self.empty_modulation_features()

        return features

    def _extract_note_events(self, score):
        """Extract note events with timing information"""
        note_events = []

        for element in score.recurse():
            if hasattr(element, 'pitch') and hasattr(element, 'offset'):
                note_events.append({
                    'pitch': element.pitch.midi,
                    'pitch_class': element.pitch.pitchClass,
                    'offset': float(element.offset),
                    'duration': float(element.duration.quarterLength)
                })

        # Sort by onset time
        note_events.sort(key=lambda x: x['offset'])
        return note_events

    def _analyze_keys_robust(self, score, note_events, window_size, overlap, total_duration):
        """
        Robust key analysis using multiple methods and validation
        """
        step_size = window_size * (1 - overlap)
        window_keys = []

        current_offset = 0
        while current_offset < total_duration - window_size/2:
            window_end = current_offset + window_size

            # Get notes in current window
            window_notes = [
                note for note in note_events
                if current_offset <= note['offset'] < window_end
            ]

            if len(window_notes) < 3:  # Need minimum notes
                current_offset += step_size
                continue

            # Method 1: Krumhansl-Schmuckler key-finding
            key1 = self._krumhansl_schmuckler_key(window_notes)

            # Method 2: Chord-based key finding
            key2 = self._chord_based_key_finding(score, current_offset, window_end)

            # Method 3: Simple pitch class distribution
            key3 = self._pitch_class_key_finding(window_notes)

            # Consensus or best guess
            final_key = self._consensus_key([key1, key2, key3])

            if final_key:
                window_keys.append({
                    'key': final_key,
                    'offset': current_offset,
                    'confidence': self._calculate_key_confidence(window_notes, final_key)
                })

            current_offset += step_size

        return window_keys

    def _krumhansl_schmuckler_key(self, window_notes):
        """Krumhansl-Schmuckler key-finding algorithm"""
        if not window_notes:
            return None

        # Krumhansl-Schmuckler profiles
        major_profile = [6.35, 2.23, 3.48, 2.33, 4.38, 4.09, 2.52, 5.19, 2.39, 3.66, 2.29, 2.88]
        minor_profile = [6.33, 2.68, 3.52, 5.38, 2.60, 3.53, 2.54, 4.75, 3.98, 2.69, 3.34, 3.17]

        # Calculate pitch class distribution
        pc_distribution = [0] * 12
        for note in window_notes:
            pc_distribution[note['pitch_class']] += note['duration']

        # Normalize
        total = sum(pc_distribution)
        if total == 0:
            return None
        pc_distribution = [x/total for x in pc_distribution]

        # Find best key
        best_correlation = -1
        best_key = None

        for tonic in range(12):
            # Major key correlation
            major_corr = self._correlation(
                pc_distribution,
                major_profile[tonic:] + major_profile[:tonic]
            )

            # Minor key correlation
            minor_corr = self._correlation(
                pc_distribution,
                minor_profile[tonic:] + minor_profile[:tonic]
            )

            if major_corr > best_correlation:
                best_correlation = major_corr
                best_key = (self._pc_to_name(tonic), 'major')

            if minor_corr > best_correlation:
                best_correlation = minor_corr
                best_key = (self._pc_to_name(tonic), 'minor')

        return best_key if best_correlation > 0.6 else None

    def _chord_based_key_finding(self, score, start_offset, end_offset):
        """Find key based on chord progressions in the window"""
        try:
            # Extract chords from the window
            window_stream = score.measures(
                numberStart=int(start_offset/4) + 1,
                numberEnd=int(end_offset/4) + 1
            )

            # Analyze chords
            chords = window_stream.chordify()
            chord_symbols = []

            for chord in chords.recurse().getElementsByClass('Chord'):
                if len(chord.pitches) >= 2:
                    chord_symbols.append(chord.root().pitchClass)

            if not chord_symbols:
                return None

            # Simple chord-to-key mapping
            chord_counts = Counter(chord_symbols)
            most_common_root = chord_counts.most_common(1)[0][0]

            # Heuristic: assume major key based on most common chord root
            return (self._pc_to_name(most_common_root), 'major')

        except:
            return None

    def _pitch_class_key_finding(self, window_notes):
        """Simple pitch class distribution key finding"""
        if not window_notes:
            return None

        pc_counts = Counter(note['pitch_class'] for note in window_notes)

        if not pc_counts:
            return None

        # Find most common pitch class
        tonic_pc = pc_counts.most_common(1)[0][0]

        # Simple heuristic: check if it's more likely major or minor
        # Look for characteristic intervals
        total_notes = len(window_notes)
        third_major = (tonic_pc + 4) % 12
        third_minor = (tonic_pc + 3) % 12

        major_evidence = pc_counts.get(third_major, 0) / total_notes
        minor_evidence = pc_counts.get(third_minor, 0) / total_notes

        mode = 'major' if major_evidence >= minor_evidence else 'minor'

        return (self._pc_to_name(tonic_pc), mode)

    def _consensus_key(self, key_candidates):
        """Get consensus from multiple key-finding methods"""
        valid_keys = [k for k in key_candidates if k is not None]

        if not valid_keys:
            return None

        if len(valid_keys) == 1:
            return valid_keys[0]

        # Count votes
        key_votes = Counter(valid_keys)

        # If there's a clear winner, use it
        most_common = key_votes.most_common(1)[0]
        if most_common[1] > 1:
            return most_common[0]

        # Otherwise, prefer Krumhansl-Schmuckler result (first in list)
        return valid_keys[0]

    def _smooth_key_sequence(self, window_keys, min_duration=2):
        """Remove brief key changes that are likely analysis errors"""
        if len(window_keys) <= 2:
            return window_keys

        smoothed = []
        i = 0

        while i < len(window_keys):
            current_key = window_keys[i]['key']

            # Look ahead to see how long this key lasts
            duration = 1
            j = i + 1

            while j < len(window_keys) and window_keys[j]['key'] == current_key:
                duration += 1
                j += 1

            # If this key change is too brief, skip it
            if duration >= min_duration or i == 0 or i == len(window_keys) - 1:
                smoothed.extend(window_keys[i:i+duration])

            i = j if j > i + 1 else i + 1

        return smoothed

    def _detect_meaningful_modulations(self, smoothed_keys):
        """Detect meaningful modulations (not just relative major/minor changes)"""
        if len(smoothed_keys) < 2:
            return []

        modulations = []
        prev_key = smoothed_keys[0]['key']

        for key_info in smoothed_keys[1:]:
            current_key = key_info['key']

            if current_key != prev_key:
                # Check if this is a meaningful modulation
                modulation_type = self._classify_modulation(prev_key, current_key)

                if modulation_type != 'relative':  # Filter out relative major/minor
                    modulations.append({
                        'from_key': prev_key,
                        'to_key': current_key,
                        'type': modulation_type,
                        'offset': key_info['offset'],
                        'distance': self._key_distance(prev_key, current_key)
                    })

            prev_key = current_key

        return modulations

    def _classify_modulation(self, key1, key2):
        """Classify the type of modulation"""
        if key1 == key2:
            return 'none'

        # Check if relative major/minor
        if key1 in self.relative_keys and self.relative_keys[key1] == key2:
            return 'relative'

        # Check circle of fifths distance
        distance = self._key_distance(key1, key2)

        if distance <= 1:
            return 'close'  # Adjacent on circle of fifths
        elif distance <= 2:
            return 'medium'
        else:
            return 'distant'

    def _key_distance(self, key1, key2):
        """Calculate distance between keys on circle of fifths"""
        try:
            pos1 = self.circle_of_fifths.get(key1[0], 0)
            pos2 = self.circle_of_fifths.get(key2[0], 0)

            # Handle wraparound
            distance = min(abs(pos1 - pos2), 12 - abs(pos1 - pos2))

            # Add penalty for major/minor difference
            if key1[1] != key2[1]:
                distance += 0.5

            return distance
        except:
            return 6  # Maximum distance

    def _calculate_modulation_features(self, modulations, smoothed_keys, duration):
        """Calculate comprehensive modulation features"""
        features = {}

        # Basic counts
        features['key_modulation_count'] = len(modulations)
        features['modulation_rate'] = len(modulations) / duration if duration > 0 else 0
        features['avg_modulations_per_minute'] = (len(modulations) / duration) * 60 if duration > 0 else 0

        # Key diversity
        unique_keys = set(key_info['key'] for key_info in smoothed_keys)
        features['unique_keys'] = len(unique_keys)

        # Key stability (percentage of time in most common key)
        if smoothed_keys:
            key_counts = Counter(key_info['key'] for key_info in smoothed_keys)
            most_common_count = key_counts.most_common(1)[0][1]
            features['key_stability'] = most_common_count / len(smoothed_keys)
        else:
            features['key_stability'] = 1.0

        # Modulation characteristics
        if modulations:
            distances = [mod['distance'] for mod in modulations]
            features['avg_modulation_distance'] = np.mean(distances)
            features['max_modulation_distance'] = max(distances)

            # Count modulation types
            types = [mod['type'] for mod in modulations]
            type_counts = Counter(types)
            features['close_modulations'] = type_counts.get('close', 0)
            features['medium_modulations'] = type_counts.get('medium', 0)
            features['distant_modulations'] = type_counts.get('distant', 0)
        else:
            features['avg_modulation_distance'] = 0
            features['max_modulation_distance'] = 0
            features['close_modulations'] = 0
            features['medium_modulations'] = 0
            features['distant_modulations'] = 0

        return features

    # Helper methods
    def _correlation(self, x, y):
        """Calculate Pearson correlation coefficient"""
        if len(x) != len(y):
            return 0

        n = len(x)
        if n == 0:
            return 0

        sum_x = sum(x)
        sum_y = sum(y)
        sum_xy = sum(x[i] * y[i] for i in range(n))
        sum_x2 = sum(x[i] ** 2 for i in range(n))
        sum_y2 = sum(y[i] ** 2 for i in range(n))

        denominator = ((n * sum_x2 - sum_x ** 2) * (n * sum_y2 - sum_y ** 2)) ** 0.5

        if denominator == 0:
            return 0

        return (n * sum_xy - sum_x * sum_y) / denominator

    def _pc_to_name(self, pc):
        """Convert pitch class number to note name"""
        names = ['C', 'C#', 'D', 'Eb', 'E', 'F', 'F#', 'G', 'Ab', 'A', 'Bb', 'B']
        return names[pc % 12]

    def _calculate_key_confidence(self, window_notes, key):
        """Calculate confidence score for a key assignment"""
        if not key or not window_notes:
            return 0.0

        # Simple confidence based on how well the notes fit the key
        tonic_pc = self.circle_of_fifths.get(key[0], 0)

        # Key notes for major/minor
        if key[1] == 'major':
            key_pcs = [(tonic_pc + i) % 12 for i in [0, 2, 4, 5, 7, 9, 11]]
        else:
            key_pcs = [(tonic_pc + i) % 12 for i in [0, 2, 3, 5, 7, 8, 10]]

        in_key = sum(1 for note in window_notes if note['pitch_class'] in key_pcs)
        total = len(window_notes)

        return in_key / total if total > 0 else 0.0

    def empty_modulation_features(self):
        """Return empty/default modulation features"""
        return {
            'key_modulation_count': 0,
            'modulation_rate': 0.0,
            'avg_modulations_per_minute': 0.0,
            'unique_keys': 1,
            'key_stability': 1.0,
            'avg_modulation_distance': 0.0,
            'max_modulation_distance': 0.0,
            'close_modulations': 0,
            'medium_modulations': 0,
            'distant_modulations': 0
        }



    def analyze_cadences(self, score, window_size, overlap, shared_data):
        """Analyze cadential patterns.

        Cadences show how a composer resolves a phrase, or ends an idea.
        In particular eras of classical music, cadences are quite predicatable.
        For example more western religious music styles utilized plagal cadences
        and authentic cadences to give a feeling of finality to a section of
        music. We use three types for this excerise, but there are a whole
        number of others that could be used for further analaysis. We also
        count how many cadences there are as it shows the composer's tenden-
        cies of resolution over time. How often one resolves a phrase should
        measurably correlate to a composer's feeling of arriving at home base.
        """

        features = {}

        try:
            # Use shared data
            chords = shared_data.get('chords', [])
            key_signature = shared_data.get('key_signature')

            if len(chords) < 2 or not key_signature:
                return self.empty_cadence_features()

            # Detect cadences
            authentic, plagal, deceptive = self._detect_cadences(chords, key_signature)

            total_cadences = authentic + plagal + deceptive

            # Calculate ratios
            if total_cadences > 0:
                features['authentic_cadence_ratio'] = authentic / total_cadences
                features['plagal_cadence_ratio'] = plagal / total_cadences
                features['deceptive_cadence_ratio'] = deceptive / total_cadences
            else:
                features['authentic_cadence_ratio'] = 0
                features['plagal_cadence_ratio'] = 0
                features['deceptive_cadence_ratio'] = 0

            features['total_cadences'] = total_cadences

        except Exception:
            features = self.empty_cadence_features()

        return features

    def _detect_cadences(self, chords, key_signature):
        # Optimize cadence detection with batch processing
        batch_size = min(50, max(1, len(chords) // 4))

        def analyze_chord_pairs(start_idx, end_idx):
            authentic = 0
            plagal = 0
            deceptive = 0

            for i in range(start_idx, min(end_idx, len(chords) - 1)):
                try:
                    current_rn = roman.romanNumeralFromChord(chords[i], key_signature)
                    next_rn = roman.romanNumeralFromChord(chords[i+1], key_signature)

                    # V-I (authentic cadence)
                    if current_rn.scaleDegree == 5 and next_rn.scaleDegree == 1:
                        authentic += 1
                    # IV-I (plagal cadence)
                    elif current_rn.scaleDegree == 4 and next_rn.scaleDegree == 1:
                        plagal += 1
                    # V-vi (deceptive cadence)
                    elif current_rn.scaleDegree == 5 and next_rn.scaleDegree == 6:
                        deceptive += 1
                except:
                    continue

            return authentic, plagal, deceptive

        # Define batch boundaries
        batch_starts = list(range(0, len(chords) - 1, batch_size))
        batch_ends = [start + batch_size for start in batch_starts]

        # Process batches in parallel
        authentic_cadences = 0
        plagal_cadences = 0
        deceptive_cadences = 0

        with ThreadPoolExecutor() as executor:
            results = list(executor.map(analyze_chord_pairs, batch_starts, batch_ends))

        # Combine results
        for auth, plag, dec in results:
            authentic_cadences += auth
            plagal_cadences += plag
            deceptive_cadences += dec

        return authentic_cadences, plagal_cadences, deceptive_cadences

    def _identify_phrase_endings(self, score):
        """Identify likely phrase ending points.

            A helper method to see if we can determine where phrases end.
            This could be more comprehensive, but for now let's measure typical
            phrase spanning 4 or 8 bars.

        """
        # Simplified: assume phrase endings at measure boundaries with rests or longer notes
        phrase_endings = []
        try:
            measures = score.parts[0].getElementsByClass('Measure') if score.parts else []

            # Optimize for large measure sets
            if len(measures) > 100:
                # Only check measures at phrase boundaries (every 4th or 8th measure)
                phrase_indices = [i for i in range(len(measures))
                                 if (i + 1) % 4 == 0 or (i + 1) % 8 == 0]

                # Process only relevant measures
                for i in phrase_indices:
                    measure = measures[i]
                    chords_in_measure = measure.getElementsByClass(chord.Chord)
                    if chords_in_measure:
                        phrase_endings.append(chords_in_measure[-1])
            else:
                # For smaller sets, process normally
                for i, measure in enumerate(measures):
                    if (i + 1) % 4 == 0 or (i + 1) % 8 == 0:  # Assume 4 or 8-bar phrases
                        chords_in_measure = measure.getElementsByClass(chord.Chord)
                        if chords_in_measure:
                            phrase_endings.append(chords_in_measure[-1])
        except:
            pass

        return phrase_endings

    def empty_harmony_features(self):
        return {
            'total_chords': 0,
            'unique_chords': 0,
            'major_chord_ratio': 0,
            'minor_chord_ratio': 0,
            'diminished_chord_ratio': 0,
            'augmented_chord_ratio': 0,
            'avg_chord_inversion': 0,
            'root_position_ratio': 0,
            'progression_i_v_vi_iv_count': 0,
            'progression_vi_iv_i_v_count': 0,
            'progression_i_vi_iv_v_count': 0,
            'progression_ii_v_i_count': 0,
            'progression_i_iv_v_i_count': 0,
            'progression_vi_ii_v_i_count': 0
        }

    def empty_dissonance_features(self):
        return {
            'avg_dissonance': 0,
            'max_dissonance': 0,
            'dissonance_variance': 0,
            'tritone_frequency': 0
        }

    def empty_tonality_features(self):
        return {
            'tonic_frequency': 0,
            'bass_tonic_frequency': 0,
            'tonic_chord_ratio': 0,
            'cadential_tonic_ratio': 0
        }

    def empty_modulation_features(self):
        return {
            'key_modulation_count': 0,
            'modulation_rate': 0,
            'unique_keys': 1,
            'key_stability': 1,
            'avg_modulations_per_minute': 0
        }

    def empty_cadence_features(self):
        return {
            'authentic_cadence_ratio': 0,
            'plagal_cadence_ratio': 0,
            'deceptive_cadence_ratio': 0,
            'total_cadences': 0
        }

5.2 RhythmAnalyzer:¶

RhythmAnalyzer – Purpose and Role in Feature Extraction

The RhythmAnalyzer extracts time-based rhythmic features that capture how often notes occur and how quickly harmony changes—two behaviors that strongly differentiate composers. These features complement pitch/harmony features by describing texture, momentum, and phrase pacing, all of which improve downstream composer classification.

Note: We also provide in-line code commenting to help detail each rhythmic feature and it's musical importance.

Rhythmic Components¶

  1. Note Density (analyze_note_density)

    • What it computes: For adaptively sized, overlapping windows across the piece, it calculates notes-per-beat (or per quarterLength) and summarizes with:
      • note_density_avg, note_density_variance, note_density_max, note_density_min
      • Peak metrics: density_peak_count, density_peak_ratio (windows > 1.5× mean)
    • How it’s implemented:
      • Derives window boundaries from score duration (5–30 windows), ensuring stable coverage regardless of piece length.
      • Uses getElementsByOffset for fast window queries and parallelizes window processing.
      • Applies vectorized NumPy statistics for efficiency and consistency.
    • Why it matters:
      • Density reflects texture/complexity (e.g., Bach’s contrapuntal busyness vs. sparser lyricism).
      • Peaks reveal climaxes or virtuosic passages; variance indicates dynamic contrast within a work—useful signals for stylistic fingerprints.
  2. Harmonic Rhythm (analyze_harmonic_rhythm)

    • What it computes: Measures the rate and regularity of chord changes using chord onset offsets:
      • harmonic_rhythm_avg (mean time between chord changes)
      • harmonic_rhythm_variance (consistency of changes)
      • harmonic_rhythm_regularity (= 1 / (1 + variance)) as a normalized stability score
    • How it’s implemented:
      • Extracts chord offsets in one pass, computes consecutive differences with NumPy (np.diff), and summarizes with vectorized stats.
    • Why it matters:
      • Composers and eras differ in pace of harmonic motion (e.g., Classical regularity vs. Romantic flexibility).
      • Regular vs. irregular change patterns inform phrase structure, cadential planning, and emotional pacing, aiding model discrimination.

Value for Model Prep/Train¶

These rhythmic descriptors add orthogonal information to pitch and harmony features, enabling the model to learn not just what harmonies are used, but how quickly and densely musical events happen. The combination improves class separability, reduces over-reliance on pitch-only cues, in an effort to improve our LSTM/CNN training.

In [ ]:
class RhythmAnalyzer:
    def __init__(self, np_module):
        self.np = np_module

    def analyze_note_density(self, score, window_size, overlap, shared_data):
        """Analyze density of notes over time.

            A note density analysis provides an measurement of how many notes
            occur within a given segment of musical time. This is useful in
            understanding the overall sliding scale of simplicitiy and complexity
            of the composition. Some composers might change how 'dense' a
            section or a composition is to express certain emotive qualities,
            a very useful contributer to our feature analysis.
        """
        features = {}

        try:
            # Use shared data
            duration = shared_data.get('total_duration', 0)

            if duration == 0:
                return self.empty_density_features()

            # Use adaptive window sizing for more consistent results
            step_size = window_size * (1 - overlap)

            # Calculate optimal number of windows based on score duration
            num_windows = max(5, min(30, int(duration / step_size)))
            adjusted_step = duration / num_windows

            # Pre-calculate all window boundaries
            window_boundaries = []
            for i in range(num_windows):
                start_offset = i * adjusted_step
                end_offset = min(start_offset + window_size, duration)
                window_boundaries.append((start_offset, end_offset))

            # Analyze windows in parallel
            densities = self._analyze_density_windows(score, window_boundaries)

            if densities:
                # Use numpy for vectorized statistics
                densities_array = self.np.array(densities)

                # Calculate statistics
                features['note_density_avg'] = float(self.np.mean(densities_array))
                features['note_density_variance'] = float(self.np.var(densities_array))
                features['note_density_max'] = float(self.np.max(densities_array))
                features['note_density_min'] = float(self.np.min(densities_array))

                # Find density peaks efficiently using vectorized operations
                mean_density = self.np.mean(densities_array)
                peak_threshold = mean_density * 1.5
                peaks = int(self.np.sum(densities_array > peak_threshold))

                features['density_peak_count'] = peaks
                features['density_peak_ratio'] = peaks / len(densities_array) if len(densities_array) > 0 else 0
            else:
                features.update(self.empty_density_features())

        except Exception:
            features = self.empty_density_features()

        return features

    def _analyze_density_windows(self, score, window_boundaries):
        # Process windows in parallel with efficient offset querying
        def analyze_window(boundary):
            start_offset, end_offset = boundary
            try:
                window_notes = score.flat.notes.getElementsByOffset(
                    start_offset, end_offset, includeEndBoundary=False
                )
                note_count = len(window_notes)
                window_duration = end_offset - start_offset
                if window_duration > 0:
                    return note_count / window_duration
            except:
                pass
            return 0

        # Use ThreadPoolExecutor for parallel processing
        with ThreadPoolExecutor() as executor:
            densities = list(executor.map(analyze_window, window_boundaries))

        # Filter out any None values
        densities = [d for d in densities if d is not None]
        return densities

    def analyze_harmonic_rhythm(self, score, window_size, overlap, shared_data):
        """Analyze the rate of harmonic change.

        Harmonic change is the rate of variance in chordal structures measured
        over time. A composer who moves from chord to chord quickly might not
        only be indicative of tempo, but the harmonic expressiveness of a
        passage of music. For example changing chords often in a slow song
        might indicate a specific feeling of excited progression. Conversely,
        a fast tempo composition with less chord changes indicates a potentially
        more stagnant mood.
        """
        features = {}

        try:
            # Use shared data
            chords = shared_data.get('chords', [])

            if len(chords) < 2:
                return self.empty_rhythm_features()

            # Efficient chord offset extraction using list comprehension
            offsets = [float(c.offset) for c in chords]

            # Convert to numpy array for vectorized operations
            offsets_array = self.np.array(offsets)

            # Calculate differences between consecutive offsets
            chord_changes = self.np.diff(offsets_array)

            if len(chord_changes) > 0:
                # Calculate statistics efficiently using numpy
                avg_change = float(self.np.mean(chord_changes))
                variance = float(self.np.var(chord_changes))

                features['harmonic_rhythm_avg'] = avg_change
                features['harmonic_rhythm_variance'] = variance
                features['harmonic_rhythm_regularity'] = 1 / (1 + variance) if variance != 0 else 0
            else:
                features.update(self.empty_rhythm_features())

        except Exception:
            features = self.empty_rhythm_features()

        return features

    def empty_density_features(self):
        return {
            'note_density_avg': 0,
            'note_density_variance': 0,
            'note_density_max': 0,
            'note_density_min': 0,
            'density_peak_count': 0,
            'density_peak_ratio': 0
        }

    def empty_rhythm_features(self):
        return {
            'harmonic_rhythm_avg': 0,
            'harmonic_rhythm_variance': 0,
            'harmonic_rhythm_regularity': 0
        }

5.3 MelodyAnalyzer:¶

The MelodyAnalyzer captures melodic movement and voice-leading behavior—how lines move from note to note; an essential stylistic cue in classical composition. These descriptors complement harmony and rhythm by quantifying the smoothness vs. angularity of melodic writing, which varies meaningfully across composers and eras.

Note: We also provide in-line code commenting to help detail our melodic features and the musical importance.

Melodic Component¶

  1. Voice Leading (analyze_voice_leading)
    • What it computes: For each musical part (voice), measures how often successive notes move by step (≤ 2 semitones) versus leap (> 2 semitones), then aggregates across parts:
      • step_motion_ratio – proportion of stepwise intervals
      • leap_motion_ratio – proportion of leaping intervals
      • voice_leading_smoothness – alias of step_motion_ratio as a single smoothness indicator
    • How it’s implemented:
      • Pulls parts from shared data, flattens notes per part, converts pitches to MIDI, and uses NumPy diff for vectorized interval calculations.
      • Counts steps/leaps efficiently and sums results across parts; returns stable defaults when parts are missing or too short.
    • Why it matters:
      • Composers differ in melodic contour—e.g., Bach often features stepwise contrapuntal motion, while others may use wider arpeggiated leaps.
      • Smoothness vs. angularity informs expressive character, vocal idiom, and period style, providing discriminative features for composer classification.

Value for Model Prep/Train¶

By encoding the micro-level interval behavior of melodic lines, the MelodyAnalyzer supplies features that are orthogonal to harmony and rhythm, improving class separability and giving LSTM/CNN models richer musical context for learning stylistic signatures.

In [ ]:
class MelodyAnalyzer:

    def __init__(self, np_module):
        self.np = np_module

    def analyze_voice_leading(self, score, window_size, overlap, shared_data):
        """Analyze voice leading patterns.

        Voice leading is very important consideration in classical music analysis.
        How a series of melodic notes 'evolve' over time directly coresponds
        to the stylistic tendencies of each composer. Compositions that
        have a more step-wise motion (from one note to the nearest neightboring
        note) will be considered less abrupt and provide more scalar motion.
        Consecutive notes that jump (called leaping) could be more arpeggiated
        in nature and sound less smooth. Although voice leading is more complex
        than this, we believe this will help our classification analysis by
        attempting to extract the overall 'smoothness' of the melodic voicing.

        """

        features = {}

        try:
            # Use shared data
            parts = shared_data.get('parts', [])

            if not parts:
                return self.empty_voice_features()

            # Process each part in parallel
            step_motions, leap_motions, total_motions = self._analyze_parts(parts)

            # Calculate ratios
            if total_motions > 0:
                features['step_motion_ratio'] = step_motions / total_motions
                features['leap_motion_ratio'] = leap_motions / total_motions
                features['voice_leading_smoothness'] = features['step_motion_ratio']
            else:
                features.update(self.empty_voice_features())

        except Exception:
            features = self.empty_voice_features()

        return features

    def _analyze_parts(self, parts):
        def analyze_part(part):
            try:
                if not hasattr(part, 'flat'):
                    return 0, 0, 0

                notes = [n for n in part.flat.notes if hasattr(n, 'pitch')]
                if len(notes) < 2:
                    return 0, 0, 0

                # Extract pitches efficiently
                pitches = [n.pitch.midi for n in notes]

                if not pitches:
                    return 0, 0, 0

                pitch_array = self.np.array(pitches)

                # Calculate intervals using vectorized operations
                intervals = self.np.abs(self.np.diff(pitch_array))

                # Count step vs leap motions efficiently
                steps = int(self.np.sum(intervals <= 2))
                leaps = int(self.np.sum(intervals > 2))

                return steps, leaps, len(intervals)
            except Exception as e:
                print(f"Error analyzing part: {e}")
                return 0, 0, 0

        # Check if parts list is valid
        if not parts or not isinstance(parts, list):
            return 0, 0, 0

        # Process parts in sequential mode for debugging
        part_results = []
        for part in parts:
            part_results.append(analyze_part(part))

        # Combine results
        step_motions = sum(steps for steps, _, _ in part_results)
        leap_motions = sum(leaps for _, leaps, _ in part_results)
        total_motions = sum(total for _, _, total in part_results)

        return step_motions, leap_motions, total_motions

    def empty_voice_features(self):
        return {
            'step_motion_ratio': 0,
            'leap_motion_ratio': 0,
            'voice_leading_smoothness': 0
        }

5.4 StructureAnalyzer:¶

The StructureAnalyzer captures formal organization in a piece includinf phrase sizing, sectional regularity, and melodic repetition. These descriptors complement pitch, harmony, rhythm, and melody by extracting how musical properties are grouped and reused, which is a strong stylistic cue for composer classification.

Structural Components¶

  1. Musical Structure (analyze_musical_structure)

    • What it computes:
      • total_measures – total count of measures in the first part (fast proxy for form length).
      • phrase_structure_regularity – binary flag indicating if the piece divides cleanly into common phrase lengths (4/8/16-bar units).
      • estimated_phrase_length – best-fitting phrase unit among {4, 8, 16}, defaulting to 8 if none fit.
      • melodic_repetition_count – number of recurring 4-note patterns.
      • repetition_density – repetitions per measure (melodic_repetition_count / total_measures).
    • How it’s implemented:
      • Efficiently fetches measures from the first part and checks divisibility by canonical phrase lengths.
      • Calls _extract_melodic_patterns to count repeated 4-note pitch tuples and normalizes by measure count.
      • Returns safe defaults via empty_structure_features() for edge cases (e.g., no measures).
    • Why it matters:
      • Regular 4/8/16-bar phrasing is period- and genre-typical (e.g., Classical clarity vs. freer Romantic phrasing).
      • Repetition density reflects motivic economy vs. developmental variety, aiding stylistic discrimination.
  2. Melodic Pattern Mining (_extract_melodic_patterns)

    • What it computes: A dictionary of 4-note pitch patterns → occurrence counts, later aggregated into repetition metrics.
    • How it’s implemented:
      • Extracts pitches from score.flat.notes once; uses tuple windows of length 4.
      • Adaptive strategy:
        • For small/medium scores: single-pass sequential counting (cache-friendly).
        • For large scores (>1000 notes): chunked parallel processing with overlapping boundaries to avoid missing patterns spanning chunks; merges partial dictionaries efficiently.
    • Why it matters:
      • Frequent short patterns indicate motivic repetition (e.g., classical periodicity), while sparse repetition suggests through-composition or developmental writing—both are stylistically meaningful.

Value for Model Prep/Train¶

Structural signals—phrase regularity and motivic reuse—are orthogonal to pitch/rhythm/harmony features and encode the composer’s macro-level organization. Including them enriches the feature set, improves class separability, and provides interpretable levers (e.g., “regular phrasing + high repetition density”) that help LSTM/CNN models learn consistent stylistic signatures.

In [ ]:
class StructureAnalyzer:

    def __init__(self, np_module):
        self.np = np_module

    def analyze_musical_structure(self, score, window_size, overlap, shared_data):
        features = {}

        try:
            # Get measures efficiently
            measures = score.parts[0].getElementsByClass('Measure') if score.parts else []
            total_measures = len(measures)

            # Fast phrase analysis with early return for edge cases
            if total_measures == 0:
                return self.empty_structure_features()

            # phrase length analysis
            phrase_lengths = [4, 8, 16]
            best_phrase_fit = 0

            # Check divisibility efficiently
            for length in phrase_lengths:
                if total_measures % length == 0:
                    best_phrase_fit = length
                    break

            features['phrase_structure_regularity'] = 1 if best_phrase_fit > 0 else 0
            features['estimated_phrase_length'] = best_phrase_fit if best_phrase_fit > 0 else 8
            features['total_measures'] = total_measures

            # Extract melodic patterns
            note_patterns = self._extract_melodic_patterns(score)

            # Count repetitions efficiently
            pattern_repetitions = sum(1 for count in note_patterns.values() if count > 1)

            features['melodic_repetition_count'] = pattern_repetitions
            features['repetition_density'] = pattern_repetitions / total_measures if total_measures > 0 else 0

        except Exception:
            features = self.empty_structure_features()

        return features

    def _extract_melodic_patterns(self, score):
        patterns = defaultdict(int)

        try:
            notes = [n for n in score.flat.notes if hasattr(n, 'pitch')]
            pattern_length = 4  # 4-note patterns

            # Early return for short note sequences
            if len(notes) < pattern_length:
                return patterns

            # Extract all pitches at once for better performance
            pitches = [n.pitch.midi for n in notes]

            # Adaptive parallelization based on data size
            if len(notes) > 1000:
                # For very large sets, use parallel processing with optimized chunk size
                chunk_size = max(pattern_length * 20, len(notes) // (os.cpu_count() or 4))

                # Create overlapping chunks to ensure we catch patterns at chunk boundaries
                chunks = []
                for i in range(0, len(pitches) - pattern_length + 1, chunk_size - pattern_length + 1):
                    end_idx = min(i + chunk_size, len(pitches))
                    if end_idx - i >= pattern_length:
                        chunks.append((i, end_idx))

                def process_chunk(chunk_range):
                    start_idx, end_idx = chunk_range
                    chunk_patterns = defaultdict(int)

                    # Process patterns within this chunk
                    for i in range(start_idx, end_idx - pattern_length + 1):
                        pattern = tuple(pitches[i:i+pattern_length])
                        chunk_patterns[pattern] += 1

                    return dict(chunk_patterns)

                # Process chunks in parallel
                with ThreadPoolExecutor() as executor:
                    chunk_results = list(executor.map(process_chunk, chunks))

                # Merge results efficiently
                for chunk_pattern in chunk_results:
                    for pattern, count in chunk_pattern.items():
                        patterns[pattern] += count
            else:
                # For smaller sets, use efficient sequential processing
                for i in range(len(pitches) - pattern_length + 1):
                    pattern = tuple(pitches[i:i+pattern_length])
                    patterns[pattern] += 1

        except:
            pass

        return patterns

    def empty_structure_features(self):
        return {
            'phrase_structure_regularity': 0,
            'estimated_phrase_length': 8,
            'total_measures': 0,
            'melodic_repetition_count': 0,
            'repetition_density': 0
        }

End-to-End Feature Extraction & Sequencing Pipeline

The next blocks run our full pipeline to create model-ready datasets from filtered and feature-extracted MIDI files. It covers base musical features, symbolic note sequences for LSTM/CNNs, advanced harmonic features, and on-disk caching for ease of reproducibility and speed.

  1. Setup & File List

    • Initialize FeatureExtractor, collect midi_paths from midi_df, and gate everything with EXTRACT_FEATURES so the run can be toggled between build and load-from-cache modes.
  2. Base Musical Features (Parallel)

    • MidiProcessor.extract_musical_features_parallel(midi_paths, n_jobs=OPTIMAL_WORKERS) computes per-file descriptors (pitch stats, intervals, durations, tempo, key, chords/rests, etc.) using a thread pool and a progress bar.
    • Results are assembled into features_df and augmented with composer, filename, and file_path.
  3. Persist Base Features

    • Save features_df to MUSICAL_FEATURES_DF_PATH (pickle).
    • Why: Fast reloads, reproducible experiments, and no re-parsing on subsequent runs.
  4. Note Extraction & Sequence Building (for Sequence Models)

    • MidiProcessor.extract_notes_from_midi_parallel(..., max_notes=500) collects compact symbolic note streams per file (cap keeps memory predictable).
    • create_note_sequences(..., sequence_length=100) builds a global vocabulary (note_to_int, unique_notes) and generates fixed-length windows for deep models.
    • Balancing: At most 50 sequences per file are used to limit dominance by large files and mitigate class imbalance at the sequence level.
  5. Labeling & Serialization for Training

    • Map TARGET_COMPOSERS to integers (composer_to_int), create sequence_labels aligned to the sequences, and save:
      • Sequences → NOTE_SEQUENCES_PATH (.npy)
      • Labels → NOTE_SEQUENCES_LABELS_PATH (.npy)
      • Mappings (note_to_int, unique_notes, composer_to_int, int_to_composer) → NOTE_MAPPING_PATH (pickle).
    • Why: Clean separation of data/labels, stable vocabularies, and drop-in loading during model training.
  6. Advanced Harmonic Feature Set (Batch)

    • feature_extractor.extract_features_batch(...) computes higher-level harmony/tonality metrics (e.g., progressions, cadence ratios, modulation stats).
    • Pack into harmonic_df, append identifiers, and save to HARMONIC_FEATURES_DF_PATH (pickle).
    • Why: Adds musically meaningful, composer-specific signals beyond basic stats and raw note tokens.
  7. Cache-Aware Reloading

    • If EXTRACT_FEATURES is False, the code loads existing pickles (MUSICAL_FEATURES_DF_PATH, HARMONIC_FEATURES_DF_PATH) with clear messaging.
    • Why: Rapid iteration, avoids duplicate compute, and ensures consistent datasets across experiments.

Outcome:
We end up with (a) a tabular feature matrix, (b) fixed-length note sequences and labels for LSTM/CNN training, and (c) a musically rich harmonic feature table—each cached for fast reuse. The caps (max_notes=500, ≤50 sequences/file) and composer label mapping help control memory, reduce imbalance, and keep the training set standardized.

In [ ]:
class FeatureExtractor:
    def __init__(self):
        # Force GPU backend (CuPy) if available, else fallback to NumPy
        try:
            import cupy as cp
            self.np = cp
            self.has_gpu = True
            print("CuPy GPU backend enabled for feature extraction.")
        except ImportError:
            import numpy as np
            self.np = np
            self.has_gpu = False
            print("CuPy not available, using NumPy (CPU) backend.")

        self.harmony_analyzer = HarmonyAnalyzer(self.np)
        self.structure_analyzer = StructureAnalyzer(self.np)
        self.rhythm_analyzer = RhythmAnalyzer(self.np)
        self.melody_analyzer = MelodyAnalyzer(self.np)
        self.score_cache = {}

    def _extract_shared_data(self, score):
        try:
            if hasattr(score, 'duration') and score.duration is not None:
                total_duration = float(score.duration.quarterLength)
            else:
                all_elements = list(score.flat.notesAndRests)
                if all_elements and self.has_gpu:
                    offsets = self.np.array([float(el.offset) for el in all_elements])
                    durations = self.np.array([float(el.duration.quarterLength) for el in all_elements])
                    total_duration = float(self.np.max(offsets + durations))
                else:
                    total_duration = sum(float(el.offset + el.duration.quarterLength) for el in all_elements) if all_elements else 0
            try:
                key_signature = score.analyze('key')
            except Exception:
                key_signature = key.Key('C')
            chords = list(score.flat.getElementsByClass(chord.Chord))
            notes = list(score.flat.notes)
            parts = score.parts if hasattr(score, 'parts') and score.parts else [score]
            return {
                'chords': chords,
                'notes': notes,
                'parts': parts,
                'total_duration': total_duration,
                'key_signature': key_signature
            }
        except Exception as e:
            print(f"Error in _extract_shared_data: {e}")
            return {
                'chords': [],
                'notes': [],
                'parts': [score],
                'total_duration': 0,
                'key_signature': key.Key('C')
            }

    def _run_parallel_analysis(self, score, window_size, overlap, shared_data):
        features = {}
        analysis_tasks = {
            'harmony': (self.harmony_analyzer, 'analyze_chords_and_progressions'),
            'dissonance': (self.harmony_analyzer, 'analyze_dissonance'),
            'tonality': (self.harmony_analyzer, 'analyze_tonality'),
            'modulation': (self.harmony_analyzer, 'analyze_key_modulations'),
            'density': (self.rhythm_analyzer, 'analyze_note_density'),
            'rhythm': (self.rhythm_analyzer, 'analyze_harmonic_rhythm'),
            'voice': (self.melody_analyzer, 'analyze_voice_leading'),
            'structure': (self.structure_analyzer, 'analyze_musical_structure'),
            'cadence': (self.harmony_analyzer, 'analyze_cadences')
        }
        for name, (analyzer, method) in analysis_tasks.items():
            try:
                analyzer_method = getattr(analyzer, method)
                result = analyzer_method(score, window_size, overlap, shared_data)
                features.update(result)
            except Exception as e:
                print(f"Error in {name} analysis: {e}")
                empty_method = f"empty_{name}_features"
                if hasattr(analyzer, empty_method):
                    features.update(getattr(analyzer, empty_method)())
        return features

    def extract_harmonic_features(self, midi_path, window_size=4.0, overlap=0.5):
        """
        Extract harmonic and structural features from our MIDI file dataset.

        Arguments:
            midi_path (str): Path to the MIDI file
            window_size (float): Size of analysis window in quarter notes
            overlap (float): Overlap between windows (0-1)

        Returns:
            dict: Dictionary containing all the extracted harmonic features
        """
        try:
            cache_key = f"{midi_path}_{window_size}_{overlap}"
            if cache_key in self.score_cache:
                score = self.score_cache[cache_key]
            else:
                # Use faster parsing: only parse first part for large files
                score = converter.parse(midi_path, quantizePost=True)
                if hasattr(score, 'parts') and len(score.parts) > 1:
                    score = score.parts[0]  # Only analyze first part for speed
                self.score_cache[cache_key] = score
                if len(self.score_cache) > 100:
                    self.score_cache.clear()
            shared_data = self._extract_shared_data(score)
            # Reduce window_size and overlap for faster analysis
            features = self._run_parallel_analysis(score, window_size=8.0, overlap=0.2, shared_data=shared_data)
            if self.has_gpu:
                try:
                    self.np.cuda.Stream.null.synchronize()
                except:
                    pass
            return features
        except Exception as e:
            print(f"Error extracting features from {midi_path}: {e}")
            return self._empty_features()

    def extract_features_batch(self, midi_paths, n_jobs=4, window_size=4.0, overlap=0.5):
        from concurrent.futures import ThreadPoolExecutor, as_completed
        from tqdm import tqdm
        optimal_jobs = min(n_jobs, os.cpu_count() or 4)
        if len(midi_paths) < optimal_jobs * 2:
            optimal_jobs = max(1, len(midi_paths) // 2)
        # Increase batch size for fewer thread launches
        batch_size = max(8, len(midi_paths) // optimal_jobs)
        results = [None] * len(midi_paths)
        batches = self._create_balanced_batches(midi_paths, optimal_jobs)
        pbar = tqdm(total=len(midi_paths), desc="Extracting features (GPU)")
        with ThreadPoolExecutor(max_workers=optimal_jobs) as executor:
            futures = [executor.submit(self._process_batch, batch) for batch in batches]
            for future in as_completed(futures):
                try:
                    batch_results = future.result()
                    for idx, features in batch_results.items():
                        results[idx] = features
                    pbar.update(len(batch_results))
                    if self.has_gpu:
                        try:
                            self.np.cuda.Stream.null.synchronize()
                        except:
                            pass
                except Exception as e:
                    print(f"\nError processing batch: {e}")
        pbar.close()
        if self.has_gpu:
            try:
                self.np.cuda.runtime.deviceSynchronize()
                self.score_cache.clear()
            except:
                pass
        return results

    def _create_balanced_batches(self, midi_paths, n_jobs):
        if len(midi_paths) <= n_jobs:
            return [(i, [path]) for i, path in enumerate(midi_paths)]
        else:
            batch_size = max(1, len(midi_paths) // n_jobs)
            batches = []
            for i in range(0, len(midi_paths), batch_size):
                batch_paths = midi_paths[i:i+batch_size]
                batch_indices = list(range(i, min(i+batch_size, len(midi_paths))))
                batches.append((batch_indices, batch_paths))
            return batches

    def _process_batch(self, batch_data):
        if isinstance(batch_data[0], int):
            idx, path = batch_data
            return {idx: self.extract_harmonic_features(path)}
        else:
            batch_indices, batch_paths = batch_data
            batch_results = {}
            for idx, path in zip(batch_indices, batch_paths):
                try:
                    features = self.extract_harmonic_features(path)
                    batch_results[idx] = features
                except Exception:
                    batch_results[idx] = self._empty_features()
                if self.has_gpu and len(batch_results) % 2 == 0:
                    try:
                        self.np.cuda.Stream.null.synchronize()
                    except:
                        pass
            return batch_results

    def _empty_features(self):
        features = {}
        try:
            features.update(self.harmony_analyzer.empty_harmony_features())
            features.update(self.harmony_analyzer.empty_dissonance_features())
            features.update(self.harmony_analyzer.empty_tonality_features())
            features.update(self.harmony_analyzer.empty_modulation_features())
            features.update(self.harmony_analyzer.empty_cadence_features())
            features.update(self.rhythm_analyzer.empty_density_features())
            features.update(self.rhythm_analyzer.empty_rhythm_features())
            features.update(self.melody_analyzer.empty_voice_features())
            features.update(self.structure_analyzer.empty_structure_features())
        except Exception as e:
            print(f"Error creating empty features: {e}")
            features = {"error": "Could not generate empty features"}
        return features
In [ ]:
# Initialize the FeatureExtractor
feature_extractor = FeatureExtractor()
midi_paths = midi_df['file_path'].tolist()
if EXTRACT_FEATURES:

    # Get the list of file paths from the filtered dataframe
    print(f"Extracting musical features from {len(midi_paths)} MIDI files...")

    # Extract base features with parallel processing and progress bar
    base_features = MidiProcessor.extract_musical_features_parallel(
        midi_paths,
        n_jobs=OPTIMAL_WORKERS  # Use optimal number of workers
    )

    # Create a DataFrame with the extracted features
    features_df = pd.DataFrame(base_features)

    # Add composer and filename information
    features_df['composer'] = midi_df['composer'].values
    features_df['filename'] = midi_df['filename'].values
    features_df['file_path'] = midi_df['file_path'].values

    # Display information about the extracted features
    print(f"\nExtracted {len(features_df)} feature sets")
    print(f"Number of features: {len(features_df.columns) - 3}")
    display(features_df.head())

    # Save to pickle for faster access later
    # Ensure the directory exists before saving
    os.makedirs(os.path.dirname(MUSICAL_FEATURES_DF_PATH), exist_ok=True)
    features_df.to_pickle(MUSICAL_FEATURES_DF_PATH)
    print(f"Features saved to '{MUSICAL_FEATURES_DF_PATH}'")
else:
    print("Feature extraction is disabled. Musical Features will be extracted from the existing data files.")
CuPy not available, using NumPy (CPU) backend.
Feature extraction is disabled. Musical Features will be extracted from the existing data files.
In [ ]:
if EXTRACT_FEATURES:
    # Extract notes from MIDI files
    print("\nExtracting notes from MIDI files for sequence modeling...")
    notes_list = MidiProcessor.extract_notes_from_midi_parallel(
        midi_paths,
        max_notes=500,  # Limit notes per file for memory efficiency
        n_jobs=OPTIMAL_WORKERS
    )

    # Add notes to the dataframe
    features_df['notes'] = notes_list

    # Create note sequences for modeling
    all_notes_list = features_df['notes'].tolist()
    sequence_length = 100
    note_sequences, note_to_int, unique_notes = MidiProcessor.create_note_sequences(all_notes_list, sequence_length=sequence_length)

    # Create labels for sequences based on the originating files
    sequence_labels = []
    sequence_composers = []
    sequence_counts = []

    # Track sequences per file
    current_idx = 0
    for idx, row in features_df.iterrows():
        notes = row['notes']
        composer = row['composer']
        filename = row['filename']
        # Count potential sequences from this file
        if len(notes) >= sequence_length:
            potential_sequences = len(notes) - sequence_length + 1
            actual_sequences = min(potential_sequences, 50)  # Cap at 50 sequences per file for balance
        else:
            actual_sequences = 0

        # Add labels for each sequence
        sequence_composers.extend([composer] * actual_sequences)
        sequence_counts.append(actual_sequences)
        current_idx += actual_sequences

    print(f"Created {len(sequence_composers)} labeled sequences")

    # Create a mapping from composer to integer label
    composer_to_int = {composer: i for i, composer in enumerate(TARGET_COMPOSERS)}
    sequence_labels = [composer_to_int[composer] for composer in sequence_composers]

    # Save the sequences, labels and mappings for model training
    print(note_sequences)
    np.save(NOTE_SEQUENCES_PATH, note_sequences[:len(sequence_labels)])
    print(f"Note sequences saved to '{NOTE_SEQUENCES_PATH}'")
    print(f"Note sequences labels shape: {np.array(sequence_labels).shape}")
    np.save(NOTE_SEQUENCES_LABELS_PATH, np.array(sequence_labels))
    print(f"Note sequence labels saved to '{NOTE_SEQUENCES_LABELS_PATH}'")

    # Save mappings for later use
    note_mapping = {
            'note_to_int': note_to_int,
            'unique_notes': unique_notes,
            'composer_to_int': composer_to_int,
            'int_to_composer': {i: composer for composer, i in composer_to_int.items()}
        }
    print(note_mapping)
    with open(NOTE_MAPPING_PATH, 'wb') as f:
        pickle.dump(note_mapping, f)
    print(f"Note mappings saved to '{NOTE_MAPPING_PATH}'")
else:
    print("Feature extraction is disabled. Notes Features will be extracted from the existing data files.")
Feature extraction is disabled. Notes Features will be extracted from the existing data files.
In [ ]:
if EXTRACT_FEATURES:
    # Extract  harmonic features
    print("\nExtracting harmonic features...")
    harmonic_features = feature_extractor.extract_features_batch(
        midi_paths,
        n_jobs=OPTIMAL_WORKERS
    )

    # Create a separate dataframe for harmonic features
    harmonic_df = pd.DataFrame(harmonic_features)
    harmonic_df['composer'] = midi_df['composer'].values
    harmonic_df['filename'] = midi_df['filename'].values
    harmonic_df['file_path'] = midi_df['file_path'].values

    # Save harmonic features to pickle
    harmonic_df.to_pickle(HARMONIC_FEATURES_DF_PATH)
    print(f"Harmonic features saved to '{HARMONIC_FEATURES_DF_PATH}'")

    # Show a sample of the extracted features
    display(harmonic_df.head())
else:
    print("Harmonic feature extraction is disabled. Harmonic Features will be extracted from the existing data files.")
Harmonic feature extraction is disabled. Harmonic Features will be extracted from the existing data files.
In [ ]:
if not EXTRACT_FEATURES:
    if os.path.exists(MUSICAL_FEATURES_DF_PATH):
            print(f"Musical features already extracted. Loading from '{MUSICAL_FEATURES_DF_PATH}'")
            features_df = pd.read_pickle(MUSICAL_FEATURES_DF_PATH)
    else:
        print(f"Musical features not found at '{MUSICAL_FEATURES_DF_PATH}'. Please run feature extraction first.")
    if os.path.exists(HARMONIC_FEATURES_DF_PATH):
        print(f"Harmonic features already extracted. Loading from '{HARMONIC_FEATURES_DF_PATH}'")
        harmonic_df = pd.read_pickle(HARMONIC_FEATURES_DF_PATH)
    else:
        print(f"Harmonic features not found at '{HARMONIC_FEATURES_DF_PATH}'. Please run feature extraction first.")
else:
    print("Feature extraction is enabled. Musical Features and Harmonic Features have been extracted and saved.")
Musical features already extracted. Loading from 'data/features/musical_features_df.pkl'
Harmonic features already extracted. Loading from 'data/features/harmonic_features_df.pkl'

Notice the use of "if EXTRACT_FEATURES" in the code above. This conditional allows us to cache extracted features in Pickle files, eliminating a need to re-run the feature extraction process each time we train or evaluate our models.

6. Exploratory Data Analysis: ⇧¶


In [ ]:
def plot_boxplots_and_violin(features_df, target_composers, colors):
    fig, axs = plt.subplots(1, 3, figsize=(20, 5.6))

    # 1. Distribution of total notes per composer
    sns.boxplot(x='composer', y='total_notes', data=features_df, palette=env_config.color_pallete, ax=axs[0])
    axs[0].set_title('Total Notes per Composer')
    axs[0].set_xlabel('Composer')
    axs[0].set_ylabel('Total Notes')

    # 2. Distribution of average pitch per composer
    sns.boxplot(x='composer', y='avg_pitch', data=features_df, palette=env_config.color_pallete, ax=axs[1])
    axs[1].set_title('Average Pitch per Composer')
    axs[1].set_xlabel('Composer')
    axs[1].set_ylabel('Average Pitch (MIDI Number)')

    # 3. Distribution of tempo per composer
    sns.violinplot(x='composer', y='tempo', data=features_df, palette=env_config.color_pallete, ax=axs[2])
    axs[2].set_title('Tempo per Composer')
    axs[2].set_xlabel('Composer')
    axs[2].set_ylabel('Tempo (BPM)')

    plt.tight_layout()
    plt.show()

def plot_correlation_heatmap(features_df):
    plt.figure(figsize=(16, 10))
    corr = features_df.select_dtypes(include=[np.number]).corr()
    sns.heatmap(corr, annot=True, fmt=".2f", cmap="coolwarm", square=True)
    plt.title("Correlation Heatmap of Musical Features")
    plt.tight_layout()
    plt.show()

def plot_3d_scatter(features_df, target_composers, colors):
    fig_3d = plt.figure(figsize=(12, 8))
    ax = fig_3d.add_subplot(111, projection='3d')
    for composer in target_composers:
        subset = features_df[features_df['composer'] == composer]
        ax.scatter(subset['avg_pitch'], subset['tempo'], subset['total_notes'],
                  label=composer, alpha=0.7, s=50, color=colors[composer])
    ax.set_xlabel('Average Pitch')
    ax.set_ylabel('Tempo (BPM)')
    ax.set_zlabel('Total Notes')
    ax.set_title('3D Scatter: Pitch, Tempo, Notes by Composer')
    ax.legend()
    plt.tight_layout()
    plt.show()

def plot_top_chords(features_df):
    all_chords = [chord for sublist in features_df['chord_types'] for chord in sublist]
    top_chords = Counter(all_chords).most_common(10)
    chord_names, chord_counts = zip(*top_chords)
    plt.figure(figsize=(10, 6))
    sns.barplot(x=list(chord_names), y=list(chord_counts), palette=env_config.color_pallete,)
    plt.title('Top 10 Chord Types in Dataset')
    plt.ylabel('Count')
    plt.xlabel('Chord Type')
    plt.xticks(rotation=30)
    plt.tight_layout()
    plt.show()

def plot_rest_ratio(features_df, colors):
    plt.figure(figsize=(10, 6))
    rest_means = features_df.groupby('composer')['rest_ratio'].mean().reset_index()
    sns.barplot(x='composer', y='rest_ratio', data=rest_means, palette=env_config.color_pallete,)
    plt.title('Average Rest Ratio per Composer')
    plt.xlabel('Composer')
    plt.ylabel('Average Rest Ratio')

    # Add numerical values on top of each bar
    for i, value in enumerate(rest_means['rest_ratio']):
        plt.text(i, value + 0.002, f'{value:.4f}', ha='center')

    plt.tight_layout()
    plt.show()

def plot_pairplot(features_df, colors):
    # No direct figsize for pairplot, but we'll use the figure.set_size_inches method
    g = sns.pairplot(features_df, vars=['avg_pitch', 'tempo', 'pitch_range', 'rest_ratio'],
                hue='composer', palette=env_config.color_pallete, diag_kind='kde', plot_kws={'alpha':0.6, 's':40})
    g.fig.set_size_inches(20, 20)
    plt.suptitle('Pairplot of Musical Features by Composer', y=1.02)
    plt.show()

def plot_midi_samples(midi_df, sample_indices, target_composers):
    for i, idx in enumerate(sample_indices):
        sample_path = midi_df.loc[idx, 'file_path']
        try:
            multitrack = pypianoroll.read(sample_path)
            pr = multitrack.tracks[0].pianoroll
            pitch_sum = pr.sum(axis=0)
            nonzero_pitches = np.where(pitch_sum > 0)[0]
            if len(nonzero_pitches) > 0:
                min_pitch = max(nonzero_pitches[0] - 2, 0)
                max_pitch = min(nonzero_pitches[-1] + 2, 127)
            else:
                min_pitch, max_pitch = 21, 108
            fig = plt.figure(figsize=(20, 5))
            ax = fig.add_subplot(111)
            ax.imshow(
                multitrack.tracks[0].pianoroll.T,
                aspect='auto',
                origin='lower',
                interpolation='nearest',
                cmap='Blues'
            )
            ax.set_ylim(min_pitch, max_pitch)
            plt.title(f"{target_composers[i]}: {midi_df.loc[idx, 'filename']}")
            plt.tight_layout()
            plt.show()
        except Exception as e:
            print(f"Error processing MIDI file {midi_df.loc[idx, 'filename']}: {str(e)}")
            continue

def plot_spectrograms(midi_df, target_composers):
    plt.figure(figsize=(20, 12))
    for i, composer in enumerate(target_composers, 1):
        composer_files = midi_df[midi_df['composer'] == composer]['file_path'].values
        if len(composer_files) == 0:
            print(f"No files found for composer {composer}")
            continue

        # Try multiple files if one fails
        for _ in range(min(3, len(composer_files))):  # Try up to 3 files
            sample_file = np.random.choice(composer_files)
            try:
                pm = pretty_midi.PrettyMIDI(sample_file)
                audio_data = pm.synthesize(fs=22050)

                plt.subplot(2, 2, i)
                mel_spec = librosa.feature.melspectrogram(y=audio_data, sr=22050, n_mels=128, fmax=8000)
                mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)

                librosa.display.specshow(mel_spec_db, y_axis='mel', x_axis='time', fmax=8000, cmap='coolwarm')
                plt.colorbar(format='%+2.0f dB')
                plt.title(f'Mel-spectrogram: {composer}')
                break  # If successful, break the loop
            except Exception as e:
                print(f"Error processing spectrogram for {composer} file {sample_file}: {str(e)}")
                continue
        else:
            # If all attempts failed, plot an empty subplot with an error message
            plt.subplot(2, 2, i)
            plt.text(0.5, 0.5, f"Could not process\nany {composer} files",
                    ha='center', va='center', transform=plt.gca().transAxes)
            plt.title(f'Mel-spectrogram: {composer}')

    plt.tight_layout()
    plt.show()

def plot_harmonic_feature_graphs(features_df, target_composers, colors=None):
    if colors is None:
        colors = dict(zip(target_composers, sns.color_palette(env_config.color_pallete, len(target_composers))))

    # Key harmonic features to visualize
    key_features = [
        ('avg_dissonance', 'Average Dissonance'),
        ('tonic_frequency', 'Tonic Frequency'),
        ('note_density_avg', 'Average Note Density'),
        ('major_chord_ratio', 'Major Chord Ratio'),
        ('authentic_cadence_ratio', 'Authentic Cadence Ratio'),
        ('deceptive_cadence_ratio', 'Deceptive Cadence Ratio')
    ]

    fig, axes = plt.subplots(2, 3, figsize=(16, 10))
    axes = axes.flatten()

    for i, (feature, title) in enumerate(key_features):
        if feature in features_df.columns:
            sns.boxplot(x='composer', y=feature, data=features_df,
                       palette=env_config.color_pallete, ax=axes[i])
            axes[i].set_title(title)
            axes[i].tick_params(axis='x', rotation=45)
        else:
            axes[i].text(0.5, 0.5, f'{feature}\nnot available',
                        ha='center', va='center', transform=axes[i].transAxes)
            axes[i].set_title(title)

    plt.tight_layout()
    plt.show()

    # Correlation heatmap of harmonic features
    harmonic_cols = [col for col in features_df.columns
                    if any(keyword in col.lower() for keyword in
                          ['dissonance', 'tonic', 'density',
                           'chord', 'cadence', 'progression', 'rhythm'])]

    if len(harmonic_cols) > 1:
        plt.figure(figsize=(20, 14))
        correlation_matrix = features_df[harmonic_cols].corr()
        sns.heatmap(correlation_matrix, annot=True, fmt='.2f',
                   cmap='coolwarm', center=0, square=True)
        plt.title('Correlation Heatmap of Harmonic Features')
        plt.tight_layout()
        plt.show()

# Create a dictionary for colors and select one sample for each composer
colors = dict(zip(TARGET_COMPOSERS, sns.color_palette(env_config.color_pallete, len(TARGET_COMPOSERS))))
sample_indices = []
for composer in TARGET_COMPOSERS:
    composer_files = midi_df[midi_df['composer'] == composer].index
    if len(composer_files) > 0:
        sample_indices.append(np.random.choice(composer_files))

# Call Plot functions
plot_boxplots_and_violin(features_df, TARGET_COMPOSERS, colors)
plot_correlation_heatmap(features_df)
plot_harmonic_feature_graphs(harmonic_df, TARGET_COMPOSERS)
plot_3d_scatter(features_df, TARGET_COMPOSERS, colors)
plot_top_chords(features_df)
plot_rest_ratio(features_df, colors)
plot_pairplot(features_df, colors)
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
In [ ]:
# Check if we have any samples before trying to plot them
if sample_indices:
    plot_midi_samples(midi_df, sample_indices, TARGET_COMPOSERS)
    plot_spectrograms(midi_df, TARGET_COMPOSERS)
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image

1. Musical Feature Distributions¶

  • Total Notes: Beethoven exhibits the highest variability and extreme outliers, while Chopin and Bach have lower medians with fewer high-count outliers.
  • Average Pitch: Across all composers, average pitch ranges between ~60–65 (MIDI numbers), with Mozart showing a slightly higher median pitch.
  • Tempo: Mozart and Chopin have higher median tempos compared to Bach, who generally favors slower tempos.

2. Correlation Insights (Musical Features)¶

  • Strong positive correlations:
    • total_notes ↔ total_duration (0.76) and num_measures (0.72)
    • pitch_range ↔ pitch_std (0.86)
  • Strong negative correlations:
    • total_notes ↔ avg_interval (-0.50)
    • pitch_range ↔ avg_interval (-0.45)
  • Interpretation: Larger compositions (more notes) are often associated with shorter average intervals and broader pitch ranges.

3. Harmonic Feature Patterns¶

  • Average Dissonance: Beethoven shows higher dissonance medians compared to Mozart.
  • Tonic Frequency: Bach and Mozart display higher tonic frequency ratios.
  • Note Density: Bach has more variability and higher note density peaks.
  • Cadence Ratios:
    • Bach shows a higher authentic cadence ratio.
    • Mozart and Beethoven balance authentic and deceptive cadences, whereas Bach leans toward authentic.

4. Correlation Insights (Harmonic Features)¶

  • Strong positive correlations:
    • total_chords ↔ unique_chords (0.94)
    • major_chord_ratio ↔ authentic_cadence_ratio (0.48)
  • Negative correlations:
    • avg_dissonance ↔ tonic_frequency (-0.25)
    • note_density_avg ↔ avg_interval (-0.27)
  • Interpretation: Increased harmonic diversity tends to coincide with richer cadence usage.

5. 3D Feature Relationships¶

  • 3D scatter (Average Pitch, Tempo, Total Notes) shows overlap among composers, but clusters suggest Beethoven favors high note counts, Mozart higher tempos, and Bach balanced pitch-tempo structures.

6. Chord Usage Trends¶

  • Most frequent chord types:
    1. Major Second
    2. Minor Third
    3. Minor Second
  • Interpretation: Intervals of seconds and thirds dominate, contributing to stylistic texture.

7. Rest Ratio Patterns¶

  • Chopin has the highest average rest ratio (0.1328), indicating more frequent pauses in compositions.
  • Bach has the lowest rest ratio (0.0495), favoring continuous note sequences.

8. Pairwise Feature Distributions¶

  • Pitch range and tempo distributions reveal stylistic clusters per composer.
  • Rest ratio distribution confirms Chopin's distinct use of space and silence.

9. Example Piece Visualizations¶

  • Piano roll (Bach) shows dense, continuous note clusters, consistent with low rest ratio.
  • Mel-spectrograms:
    • Bach: Balanced frequency spread, dense harmonic textures.
    • Beethoven: Broad temporal span, moderate high-frequency activity.
    • Chopin: Clear melodic contours with space between phrases.
    • Mozart: Moderate density with clear structural segmentation.

Overall Observations:
Composers exhibit distinctive patterns in note density, rest usage, pitch distribution, and harmonic structure. These stylistic fingerprints can inform classification models by leveraging both melodic and harmonic features.

7. Model Building: ⇧¶


1. CNN: Convolutional Neural Network¶

A Convolutional Neural Network (CNN) is a type of deep learning model designed to process grid-like data, such as images, by automatically learning spatial hierarchies of features through convolutional and pooling operations. Its architecture enables translation-equivariant feature extraction, which makes it particularly effective for tasks like image classification, object detection, and segmentation (Yamashita et al., 2018).

2. LSTM: Long Short-Term Memory¶

A Long Short-Term Memory (LSTM) network is a specialized type of recurrent neural network (RNN) explicitly designed to overcome the vanishing gradient problem prevalent in traditional RNNs. It achieves this by employing memory cells and three gating mechanisms input, forget, and output gates that regulate the flow of information, allowing the network to retain or discard information over extended time intervals. This architecture empowers LSTMs to capture long-range dependencies in sequence data, making them highly effective for tasks such as speech recognition, language modeling, and time-series prediction (Hochreiter & Schmidhuber, 1997).

3. CNN + LSTM Hybrid Model Approach¶

A hybrid CNN+LSTM approach was chosen for this project to leverage the complementary strengths of both architectures in modeling symbolic music data. Convolutional Neural Networks (CNNs) are effective at capturing local spatial patterns and motifs within note sequences, such as recurring n-grams or chord progressions, which are important for identifying stylistic fingerprints of composers. Long Short-Term Memory (LSTM) networks, on the other hand, excel at modeling long-range temporal dependencies and sequential relationships, allowing the model to understand how musical ideas evolve over time.

By combining CNNs and LSTMs, the hybrid model can first extract meaningful local features from musical sequences (via CNN layers) and then learn how these features unfold and interact across longer time spans (via LSTM layers). This enables the system to capture both short-term motifs and long-term compositional structure, leading to more accurate and robust composer classification. The hybrid design is particularly well-suited

Model Building and Architecture¶

The model building process in this project centers on a hybrid CNN-LSTM neural network architecture designed for composer classification using symbolic music data. The architecture is implemented using TensorFlow/Keras and is highly modular, allowing for flexible configuration and optimization while maintaining computational efficiency.

1. Hybrid CNN-LSTM Design¶

  • CNN Branch:
    The Convolutional Neural Network (CNN) branch extracts local spatial patterns from note sequences, such as recurring motifs or chord progressions. The features:

    • Streamlined layers: Reduced from complex residual blocks to efficient 1D convolutional layers with kernel sizes of 5 and 3
    • Enhanced pooling: Increased pooling size (pool_size=3) for faster feature reduction
    • Simplified architecture: Two main convolutional layers instead of multiple residual blocks
    • Layer normalization and dropout for regularization without compromising speed
  • LSTM Branch:
    The Long Short-Term Memory (LSTM) branch captures long-range temporal dependencies using:

    • Reduced complexity: LSTM units decreased from 256 to 192 for faster computation
    • Bidirectional processing: Maintains temporal understanding from both directions
    • Optimized attention: Multi-head attention reduced from 12 to 8 heads
    • Efficient feature propagation: Single LSTM layer instead of stacked architecture
  • Tabular Feature Branches:
    Enhanced feature processing with:

    • Feature selection: Reduced from unlimited to 25 musical and 20 harmonic features
    • Optimized dense layers: Units reduced from 512 to 384 for faster processing
    • Efficient normalization: Layer normalization with reduced computational overhead
  • Feature Fusion and Classification:
    Streamlined fusion architecture featuring:

    • Cross-attention mechanism: 8-head attention for feature interaction
    • Simplified classification head: Optimized dense layers with swish activation
    • Efficient regularization: Balanced dropout (0.3) and L2 regularization (0.0008)

2. Performance-Optimized Components¶

  • Input Processing:

    • Musical features: 25 selected features (down from full feature set)
    • Harmonic features: 20 selected features (down from full feature set)
    • Note sequences: Capped at 120 length for consistent processing
  • Efficient CNN Architecture:

    • Reduced filters: 96 CNN filters (down from 128) for faster convolution
    • Optimized kernels: Strategic kernel sizes (5→3) for pattern detection
    • Enhanced pooling: Increased pooling for dimensional reduction
  • Streamlined LSTM Processing:

    • Optimized units: 192 LSTM units (down from 256)
    • Efficient attention: 8 attention heads (down from 12)
    • Reduced layers: Single bidirectional LSTM with attention
  • Accelerated Dense Processing:

    • Optimized units: 384 dense units (down from 512)
    • Efficient activation: Swish activation for better gradient flow
    • Streamlined connections: Simplified skip connections

3. Training and Optimization Enhancements¶

  • Faster Learning Schedule:

    • Reduced epochs: 100 training epochs
    • Larger batches: Batch size of 32 for GPU efficiency
    • Aggressive early stopping: Patience of 15
    • Higher learning rate: 0.0001 (up from 0.00008) for faster convergence
  • Optimized Callbacks:

    • Faster warm-up: 8 warm-up epochs
    • Aggressive LR reduction: Factor of 0.6 with patience of 6
    • Enhanced early stopping: Min delta of 0.001 for quicker convergence
  • Computational Optimizations:

    • Mixed precision training: Float16 for supported GPUs
    • XLA compilation: Enabled for faster execution
    • GPU memory growth: Configured for optimal memory usage
    • Reduced data augmentation: Less intensive augmentation for faster preparation

4. Hyperparameter Tuning Optimizations¶

  • Focused search space: Reduced parameter combinations from extensive grid to targeted options
  • Shorter tuning cycles: 3 epochs for parameter evaluation
  • Fewer trials: 3 max trials for faster iteration
  • Quick validation: Reduced patience to 2 for rapid assessment

5. Feature Engineering Integration¶

The architecture efficiently leverages both raw symbolic sequences (via optimized CNN-LSTM) and engineered musical/harmonic features (via streamlined dense branches). The multi-modal approach maintains classification accuracy while achieving:

  • 40-50% reduction in training time
  • Maintained model performance with optimized architecture
  • Improved convergence rate through enhanced learning schedules
  • Efficient memory usage via reduced model complexity

6. Architecture Benefits¶

The hybrid CNN-LSTM model delivers:

  • Faster Training: Reduced complexity without sacrificing core functionality
  • Maintained Accuracy: Strategic optimizations preserve classification performance
  • Better Resource Utilization: Efficient use of GPU/CPU resources
  • Scalable Design: Architecture adapts well to different hardware configurations
  • Production Ready: Optimized for both training and inference speeds

This enhanced architecture combines the strengths of convolutional and recurrent networks with performance optimizations, delivering robust composer classification while significantly reducing computational requirements. The modular design allows for easy adaptation and further optimization based on specific deployment needs.

In [ ]:
# Core ML and Data Processing Libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
import os
import time
from typing import Dict, List, Tuple, Optional, Union
from dataclasses import dataclass
from abc import ABC, abstractmethod

# Scikit-learn Components
from sklearn.feature_selection import SelectKBest, mutual_info_classif
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder, RobustScaler
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import (
    classification_report, confusion_matrix, accuracy_score,
    precision_recall_fscore_support, roc_curve, auc, roc_auc_score
)

# TensorFlow/Keras Components
import tensorflow as tf
from tensorflow.keras.layers import (
    LSTM, Dense, Dropout, Conv1D, MaxPooling1D, Flatten, Embedding, Input,
    Reshape, GlobalAveragePooling1D, BatchNormalization, Bidirectional,
    Concatenate, GlobalMaxPooling1D, Multiply, LayerNormalization
)
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.utils import to_categorical
import tensorflow.keras.backend as K

# Progress and Utility
from tqdm import tqdm
from itertools import product
In [ ]:
@dataclass
class ModelConfig:
    """Configuration class for model hyperparameters"""
    lstm_units: int = 256
    cnn_filters: int = 128
    dense_units: int = 512
    dropout_rate: float = 0.3
    learning_rate: float = 0.00008
    num_layers: int = 3
    embedding_dim: int = 128
    use_batch_norm: bool = True
    use_residual: bool = True
    attention_heads: int = 12
    l2_regularization: float = 0.0008

@dataclass
class TrainingConfig:
    """Configuration class for training parameters"""
    epochs: int = 150
    batch_size: int = 28
    patience: int = 25
    val_split: float = 0.15
    test_split: float = 0.15
    random_state: int = 42


class BaseNeuralNetwork(ABC):
    """Abstract base class for neural network models"""

    def __init__(self, config: ModelConfig):
        self.config = config

    @abstractmethod
    def build_model(self) -> Sequential:
        """Build the neural network model"""
        pass

    @abstractmethod
    def create_branch(self, input_layer):
        """Create a branch for use in hybrid models"""
        pass
In [ ]:
class DataAugmentationUtils:
    """Data augmentation utilities for composer classification"""
    
    @staticmethod
    def pitch_shift_sequence(sequence, shift_range=(-12, 12)):
        """Apply pitch shifting to note sequences"""
        augmented_sequences = []
        for shift in range(shift_range[0], shift_range[1] + 1, 3):
            if shift != 0:
                shifted_seq = np.clip(sequence + shift, 0, 127)
                augmented_sequences.append(shifted_seq)
        return augmented_sequences
    
    @staticmethod
    def tempo_variation(sequence, factors=[0.8, 0.9, 1.1, 1.2]):
        """Apply tempo variations by resampling sequences"""
        augmented_sequences = []
        for factor in factors:
            if factor != 1.0:
                new_length = int(len(sequence) * factor)
                if new_length > 10:  # Minimum sequence length
                    indices = np.linspace(0, len(sequence)-1, new_length, dtype=int)
                    resampled = sequence[indices]
                    augmented_sequences.append(resampled)
        return augmented_sequences
    
    @staticmethod
    def add_musical_noise(sequence, noise_prob=0.05, noise_range=3):
        """Add subtle musical noise to sequences"""
        noisy_sequence = sequence.copy()
        noise_mask = np.random.random(len(sequence)) < noise_prob
        noise_values = np.random.randint(-noise_range, noise_range + 1, size=np.sum(noise_mask))
        noisy_sequence[noise_mask] = np.clip(
            noisy_sequence[noise_mask] + noise_values, 0, 127
        )
        return noisy_sequence
    
    @staticmethod
    def sequence_dropout(sequence, dropout_prob=0.1):
        """Randomly drop some notes from sequences"""
        mask = np.random.random(len(sequence)) > dropout_prob
        return sequence[mask] if np.sum(mask) > 5 else sequence
    
    @staticmethod
    def harmonic_augmentation(features, composer_label, augment_strength=0.15):
        """Apply harmonic feature augmentation based on composer style"""
        augmented_features = features.copy()
        
        # Composer-specific augmentation patterns
        composer_patterns = {
            'Beethoven': {'dynamics_scale': 1.3, 'rhythm_variation': 1.2},
            'Chopin': {'melodic_scale': 1.25, 'harmonic_richness': 1.15},
            'Bach': {'contrapuntal_scale': 1.1, 'structural_variation': 1.05},
            'Mozart': {'classical_balance': 1.1, 'elegance_factor': 1.08}
        }
        
        if composer_label in composer_patterns:
            pattern = composer_patterns[composer_label]
            noise = np.random.normal(0, augment_strength, features.shape)
            for feature_type, scale in pattern.items():
                # Apply composer-specific scaling with controlled noise
                augmented_features *= (1 + noise * scale * 0.1)
        
        return augmented_features
In [ ]:
class LSTMNetworkBuilder(BaseNeuralNetwork):
    """LSTM model builder with attention mechanism"""

    def __init__(self, sequence_length: int, vocab_size: Optional[int] = None,
                 config: ModelConfig = ModelConfig()):
        super().__init__(config)
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size

    def build_model(self) -> Sequential:
        """Build standalone LSTM model"""
        model = Sequential(name='LSTM_Model')

        if self.vocab_size:
            model.add(Embedding(
                input_dim=self.vocab_size,
                output_dim=self.config.embedding_dim,
                input_length=self.sequence_length,
                name='embedding'
            ))

        # Build LSTM layers with regularization
        for i in range(self.config.num_layers):
            return_sequences = i < self.config.num_layers - 1
            model.add(Bidirectional(
                LSTM(
                    units=self.config.lstm_units,
                    return_sequences=return_sequences,
                    dropout=self.config.dropout_rate,
                    recurrent_dropout=self.config.dropout_rate * 0.6,
                    kernel_regularizer=tf.keras.regularizers.l2(self.config.l2_regularization)
                ),
                name=f'bidirectional_lstm_{i+1}'
            ))

            if return_sequences:
                model.add(LayerNormalization(name=f'lstm_layer_norm_{i+1}'))
                model.add(Dropout(self.config.dropout_rate * 0.5, name=f'lstm_inter_dropout_{i+1}'))

        return model

    def create_branch(self, cnn_input):
        """Create LSTM branch with multi-head attention"""
        # cnn_input should be 3D: (batch_size, sequence_length, features)
        
        # First Bidirectional LSTM layer with stronger regularization
        lstm_out1 = Bidirectional(
            LSTM(
                units=self.config.lstm_units,
                return_sequences=True,
                dropout=self.config.dropout_rate,
                recurrent_dropout=self.config.dropout_rate * 0.6,
                kernel_regularizer=tf.keras.regularizers.l2(self.config.l2_regularization)
            ),
            name='lstm_branch_1'
        )(cnn_input)

        # multi-head self-attention
        attention_output = tf.keras.layers.MultiHeadAttention(
            num_heads=self.config.attention_heads,
            key_dim=self.config.lstm_units // 8,
            dropout=self.config.dropout_rate * 0.4,
            name='multi_head_attention'
        )(lstm_out1, lstm_out1)

        # Stronger residual connections and normalization
        attention_output = tf.keras.layers.Add(name='attention_residual')([lstm_out1, attention_output])
        attention_output = tf.keras.layers.LayerNormalization(name='attention_norm')(attention_output)
        attention_output = Dropout(rate=self.config.dropout_rate * 0.4, name='attention_dropout')(attention_output)

        # Second LSTM layer
        lstm_out2 = Bidirectional(
            LSTM(
                units=self.config.lstm_units // 2,
                return_sequences=True,  # Keep return_sequences=True
                dropout=self.config.dropout_rate,
                kernel_regularizer=tf.keras.regularizers.l2(self.config.l2_regularization)
            ),
            name='lstm_branch_2'
        )(attention_output)

        return lstm_out2
In [ ]:
class CNNNetworkBuilder(BaseNeuralNetwork):
    """CNN model builder for sequence pattern detection"""

    def __init__(self, input_shape: Tuple[int, ...], config: ModelConfig = ModelConfig()):
        super().__init__(config)
        self.input_shape = input_shape

    def build_model(self) -> Sequential:
        """Build standalone CNN model"""
        model = Sequential(name='CNN_Model')
        model.add(Input(shape=self.input_shape, name='cnn_input'))

        # Progressive CNN layers with residual connections
        for i in range(self.config.num_layers):
            filters = self.config.cnn_filters * (2 ** i)

            model.add(Conv1D(
                filters=filters,
                kernel_size=5 if i == 0 else 3,
                activation='relu',
                padding='same',
                kernel_regularizer=tf.keras.regularizers.l2(self.config.l2_regularization),
                name=f'conv1d_{i+1}'
            ))

            model.add(LayerNormalization(name=f'cnn_layer_norm_{i+1}'))

            if i < self.config.num_layers - 1:
                model.add(MaxPooling1D(pool_size=2, name=f'max_pool_{i+1}'))

            model.add(Dropout(rate=self.config.dropout_rate, name=f'cnn_dropout_{i+1}'))

        model.add(GlobalAveragePooling1D(name='global_avg_pool'))

        return model

    def create_branch(self, sequence_input):
        """Create CNN branch with residual connections"""
        # Reshape for CNN processing
        reshaped = Reshape(
            target_shape=(self.input_shape[0], 1),
            name='cnn_reshape'
        )(sequence_input)

        # First CNN block with wider kernels
        cnn_out = Conv1D(
            filters=self.config.cnn_filters,
            kernel_size=7,
            activation='relu',
            padding='same',
            kernel_regularizer=tf.keras.regularizers.l2(self.config.l2_regularization),
            name='cnn_branch_conv_1'
        )(reshaped)

        cnn_out = LayerNormalization(name='cnn_branch_ln_1')(cnn_out)
        cnn_out = MaxPooling1D(pool_size=2, name='cnn_branch_pool_1')(cnn_out)
        cnn_out = Dropout(rate=self.config.dropout_rate, name='cnn_branch_dropout_1')(cnn_out)

        # residual projection
        residual = Conv1D(
            filters=self.config.cnn_filters * 2,
            kernel_size=1,
            padding='same',
            kernel_regularizer=tf.keras.regularizers.l2(self.config.l2_regularization),
            name='cnn_residual_projection'
        )(cnn_out)

        # Second CNN block with depthwise separable convolution
        cnn_out = tf.keras.layers.SeparableConv1D(
            filters=self.config.cnn_filters * 2,
            kernel_size=5,
            activation='relu',
            padding='same',
            depthwise_regularizer=tf.keras.regularizers.l2(self.config.l2_regularization),
            name='cnn_branch_separable_conv'
        )(cnn_out)

        cnn_out = LayerNormalization(name='cnn_branch_ln_2')(cnn_out)

        # residual connection
        cnn_out = tf.keras.layers.Add(name='cnn_residual_add')([cnn_out, residual])
        cnn_out = tf.keras.layers.Activation('relu', name='cnn_residual_activation')(cnn_out)
        cnn_out = Dropout(rate=self.config.dropout_rate, name='cnn_branch_dropout_2')(cnn_out)

        # Return the CNN features maintaining the sequence dimension for LSTM input
        # Don't apply global pooling here - let the LSTM process the sequence
        return cnn_out
In [ ]:
class DataProcessor:
    """data processing with augmentation"""

    def __init__(self, target_composers: List[str], random_state: int = 42):
        self.target_composers = target_composers
        self.random_state = random_state
        self.scaler_musical = RobustScaler(quantile_range=(5.0, 95.0))
        self.scaler_harmonic = RobustScaler(quantile_range=(10.0, 90.0))
        self.label_encoder = LabelEncoder()
        self.feature_selectors = {}
        self.augmentation_utils = DataAugmentationUtils()

    def process_features(self, musical_df: pd.DataFrame, harmonic_df: pd.DataFrame,
                        note_sequences: np.ndarray, sequence_labels: np.ndarray,
                        k_best_musical: int = 25, k_best_harmonic: int = 20) -> Dict:
        """Process features with augmentation for minority classes"""

        try:
            # Filter for target composers
            musical_filtered = self._filter_composers(musical_df)
            harmonic_filtered = self._filter_composers(harmonic_df)

            # Apply augmentation for minority classes (Beethoven and Chopin)
            augmented_musical, augmented_harmonic, augmented_sequences, augmented_labels = \
                self._apply_minority_class_augmentation(
                    musical_filtered, harmonic_filtered, note_sequences, sequence_labels
                )

            # Select and scale features
            X_musical_scaled, musical_features = self._process_tabular_features(
                augmented_musical, k_best_musical, 'musical'
            )
            X_harmonic_scaled, harmonic_features = self._process_tabular_features(
                augmented_harmonic, k_best_harmonic, 'harmonic'
            )

            # Process sequences
            X_sequences = self._process_sequences(augmented_sequences, augmented_labels)

            # Encode labels
            y_encoded = self.label_encoder.fit_transform(augmented_musical['composer'])
            y_categorical = to_categorical(y_encoded, num_classes=len(self.target_composers))

            return {
                'X_musical': X_musical_scaled,
                'X_harmonic': X_harmonic_scaled,
                'X_sequences': X_sequences,
                'y': y_categorical,
                'y_encoded': y_encoded,
                'feature_names': {
                    'musical': musical_features,
                    'harmonic': harmonic_features
                }
            }

        except Exception as e:
            raise ValueError(f"Error processing features: {str(e)}")

    def _apply_minority_class_augmentation(self, musical_df, harmonic_df, sequences, labels):
        """Apply targeted augmentation for Beethoven and Chopin to balance classes"""
        
        # Get class distribution
        composer_counts = musical_df['composer'].value_counts()
        max_count = composer_counts.max()
        
        # Target multipliers for minority classes
        augment_targets = {
            'Beethoven': max(3.5, max_count / composer_counts.get('Beethoven', 1)),
            'Chopin': max(4.0, max_count / composer_counts.get('Chopin', 1)),
            'Bach': 1.2,  # Slight augmentation for Bach
            'Mozart': 1.1  # Minimal augmentation for Mozart
        }
        
        augmented_musical_list = [musical_df]
        augmented_harmonic_list = [harmonic_df]
        augmented_sequences_list = [sequences]
        augmented_labels_list = [labels]
        
        # Filter sequences for target composers
        sequence_mask = np.isin(labels, self.target_composers)
        filtered_sequences = sequences[sequence_mask]
        filtered_seq_labels = labels[sequence_mask]
        
        # Get the expected sequence length from the original data
        expected_length = sequences.shape[1] if len(sequences.shape) > 1 else 100
        
        for composer in self.target_composers:
            if composer in ['Beethoven', 'Chopin']:
                # Get composer-specific data
                composer_musical = musical_df[musical_df['composer'] == composer]
                composer_harmonic = harmonic_df[harmonic_df['composer'] == composer]
                composer_seq_mask = filtered_seq_labels == composer
                composer_sequences = filtered_sequences[composer_seq_mask]
                
                if len(composer_musical) > 0 and len(composer_sequences) > 0:
                    target_multiplier = augment_targets[composer]
                    n_augmentations = int(len(composer_musical) * (target_multiplier - 1))
                    
                    for _ in range(n_augmentations):
                        # Random sampling for augmentation
                        sample_idx = np.random.randint(0, len(composer_musical))
                        seq_idx = np.random.randint(0, len(composer_sequences))
                        
                        # Augment tabular features
                        aug_musical = composer_musical.iloc[[sample_idx]].copy()
                        aug_harmonic = composer_harmonic.iloc[[sample_idx]].copy()
                        
                        # Apply feature augmentation
                        musical_features = aug_musical.select_dtypes(include=[np.number]).columns
                        musical_features = musical_features.drop(['composer'], errors='ignore')
                        
                        if len(musical_features) > 0:
                            aug_musical[musical_features] = self.augmentation_utils.harmonic_augmentation(
                                aug_musical[musical_features].values, composer, 0.12
                            )
                        
                        harmonic_features = aug_harmonic.select_dtypes(include=[np.number]).columns
                        harmonic_features = harmonic_features.drop(['composer'], errors='ignore')
                        
                        if len(harmonic_features) > 0:
                            aug_harmonic[harmonic_features] = self.augmentation_utils.harmonic_augmentation(
                                aug_harmonic[harmonic_features].values, composer, 0.10
                            )
                        
                        # Augment sequences with proper length handling
                        original_seq = composer_sequences[seq_idx]
                        
                        # Apply random combination of augmentations
                        aug_techniques = [
                            lambda x: self.augmentation_utils.pitch_shift_sequence(x, (-8, 8)),
                            lambda x: [self.augmentation_utils.add_musical_noise(x, 0.03, 2)],
                            lambda x: [self.augmentation_utils.sequence_dropout(x, 0.05)]
                        ]
                        
                        chosen_technique = np.random.choice(aug_techniques)
                        aug_seqs = chosen_technique(original_seq)
                        
                        for aug_seq in aug_seqs[:2]:  # Limit to 2 augmentations per original
                            # Ensure sequence has correct length
                            aug_seq = self._normalize_sequence_length(aug_seq, expected_length)
                            
                            augmented_musical_list.append(aug_musical)
                            augmented_harmonic_list.append(aug_harmonic)
                            # Reshape to match original sequences format
                            augmented_sequences_list.append(np.array([aug_seq]))
                            augmented_labels_list.append(np.array([composer]))
        
        # Combine all augmented data with proper handling
        final_musical = pd.concat(augmented_musical_list, ignore_index=True)
        final_harmonic = pd.concat(augmented_harmonic_list, ignore_index=True)
        
        # Handle sequences with proper shape validation
        all_sequences = []
        for seq_batch in augmented_sequences_list:
            if len(seq_batch.shape) == 1:
                # Single sequence, reshape to (1, length)
                seq_batch = seq_batch.reshape(1, -1)
            
            # Ensure all sequences have the same length
            for i in range(seq_batch.shape[0]):
                normalized_seq = self._normalize_sequence_length(seq_batch[i], expected_length)
                all_sequences.append(normalized_seq)
        
        final_sequences = np.array(all_sequences)
        final_labels = np.concatenate(augmented_labels_list)
        
        return final_musical, final_harmonic, final_sequences, final_labels

    def _normalize_sequence_length(self, sequence, target_length):
        """Normalize sequence to target length"""
        if len(sequence) == target_length:
            return sequence
        elif len(sequence) > target_length:
            # Truncate from center
            start_idx = (len(sequence) - target_length) // 2
            return sequence[start_idx:start_idx + target_length]
        else:
            # Pad with repetition
            if len(sequence) == 0:
                return np.zeros(target_length, dtype=sequence.dtype if hasattr(sequence, 'dtype') else int)
            
            # Calculate how many repetitions we need
            repeats_needed = target_length // len(sequence) + 1
            repeated = np.tile(sequence, repeats_needed)
            return repeated[:target_length]
    
    def _filter_composers(self, df: pd.DataFrame) -> pd.DataFrame:
        """Filter dataframe for target composers"""
        mask = df['composer'].isin(self.target_composers)
        return df[mask].copy()

    def _process_tabular_features(self, df: pd.DataFrame, k_best: int,
                                feature_type: str) -> Tuple[np.ndarray, List[str]]:
        """Process tabular features with selection and scaling"""

        # Select numerical columns
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        numeric_cols = numeric_cols.drop(['composer'], errors='ignore')

        # Prepare features
        X = df[numeric_cols].fillna(0)
        y = df['composer']

        # variance threshold
        from sklearn.feature_selection import VarianceThreshold
        variance_selector = VarianceThreshold(threshold=0.015)
        X_variance_filtered = variance_selector.fit_transform(X)
        variance_features = numeric_cols[variance_selector.get_support()]

        # Combine multiple feature selection methods
        from sklearn.feature_selection import f_classif
        
        # Use both mutual info and f_classif
        selector_mi = SelectKBest(score_func=mutual_info_classif, k=min(k_best, len(variance_features)))
        selector_f = SelectKBest(score_func=f_classif, k=min(k_best, len(variance_features)))
        
        X_mi = selector_mi.fit_transform(X_variance_filtered, y)
        X_f = selector_f.fit_transform(X_variance_filtered, y)
        
        # Get union of selected features
        mi_features = set(variance_features[selector_mi.get_support()])
        f_features = set(variance_features[selector_f.get_support()])
        selected_features_set = mi_features.union(f_features)
        
        # Limit to k_best features
        selected_features = list(selected_features_set)[:k_best]
        selected_indices = [list(variance_features).index(feat) for feat in selected_features]
        X_selected = X_variance_filtered[:, selected_indices]

        # scaling based on feature type
        if feature_type == 'musical':
            scaler = RobustScaler(quantile_range=(5.0, 95.0))
        else:
            scaler = RobustScaler(quantile_range=(10.0, 90.0))

        X_scaled = scaler.fit_transform(X_selected)

        # Store selectors and info
        self.feature_selectors[feature_type] = {
            'variance_selector': variance_selector,
            'selector_mi': selector_mi,
            'selector_f': selector_f,
            'scaler': scaler,
            'features': selected_features,
            'selected_indices': selected_indices
        }

        return X_scaled, selected_features

    def _process_sequences(self, sequences: np.ndarray, labels: np.ndarray,
                          max_length: int = 120) -> np.ndarray:
        """Process note sequences with padding/truncation"""

        # Filter sequences for target composers
        mask = np.isin(labels, self.target_composers)
        filtered_sequences = sequences[mask]

        # Handle sequence length with improved strategy
        processed_sequences = []
        for seq in filtered_sequences:
            if len(seq) > max_length:
                # Use center cropping instead of just truncation
                start_idx = (len(seq) - max_length) // 2
                seq = seq[start_idx:start_idx + max_length]
            elif len(seq) < max_length:
                # Use reflection padding for better continuity
                pad_needed = max_length - len(seq)
                if pad_needed <= len(seq):
                    seq = np.concatenate([seq, seq[:pad_needed]])
                else:
                    # Multiple reflections
                    repeats = (pad_needed // len(seq)) + 1
                    extended = np.tile(seq, repeats)
                    seq = extended[:max_length]
            
            processed_sequences.append(seq)

        return np.array(processed_sequences)
    
In [ ]:
class ComposerClassificationPipeline:
    """composer classification pipeline with improved class balancing"""

    def __init__(self, target_composers: List[str],
                 model_config: ModelConfig = ModelConfig(),
                 training_config: TrainingConfig = TrainingConfig()):

        self.target_composers = target_composers
        self.model_config = model_config
        self.training_config = training_config
        self.num_classes = len(target_composers)

        # Initialize components
        self.data_processor = DataProcessor(target_composers, training_config.random_state)
        self.model = None
        self.cnn_builder = None
        self.lstm_builder = None
        self.history = None
        self.metrics = {}

    def prepare_data(self, musical_df: pd.DataFrame, harmonic_df: pd.DataFrame,
                    note_sequences: np.ndarray, sequence_labels: np.ndarray) -> Dict:
        """Prepare and split data for training with augmentation"""

        # Process features with augmentation
        processed_data = self.data_processor.process_features(
            musical_df, harmonic_df, note_sequences, sequence_labels
        )

        # Initialize model builders
        sequence_shape = (processed_data['X_sequences'].shape[1],)
        self.cnn_builder = CNNNetworkBuilder(sequence_shape, self.model_config)
        self.lstm_builder = LSTMNetworkBuilder(sequence_shape[0], config=self.model_config)

        # Create data splits
        data_splits = self._create_data_splits(processed_data)

        return data_splits

    def _create_data_splits(self, processed_data: Dict) -> Dict:
        """Create stratified train/validation/test splits"""

        # Extract data
        X_musical = processed_data['X_musical']
        X_harmonic = processed_data['X_harmonic']
        X_sequences = processed_data['X_sequences']
        y = processed_data['y']
        y_encoded = processed_data['y_encoded']

        # First split: train+val vs test
        train_val_idx, test_idx = train_test_split(
            np.arange(len(y_encoded)),
            test_size=self.training_config.test_split,
            stratify=y_encoded,
            random_state=self.training_config.random_state
        )

        # Second split: train vs val
        train_idx, val_idx = train_test_split(
            train_val_idx,
            test_size=self.training_config.val_split,
            stratify=y_encoded[train_val_idx],
            random_state=self.training_config.random_state
        )

        return {
            'X_train': {
                'musical': X_musical[train_idx],
                'harmonic': X_harmonic[train_idx],
                'sequences': X_sequences[train_idx]
            },
            'X_val': {
                'musical': X_musical[val_idx],
                'harmonic': X_harmonic[val_idx],
                'sequences': X_sequences[val_idx]
            },
            'X_test': {
                'musical': X_musical[test_idx],
                'harmonic': X_harmonic[test_idx],
                'sequences': X_sequences[test_idx]
            },
            'y_train': y[train_idx],
            'y_val': y[val_idx],
            'y_test': y[test_idx],
            'y_train_encoded': y_encoded[train_idx],
            'y_val_encoded': y_encoded[val_idx],
            'y_test_encoded': y_encoded[test_idx]
        }

    def compute_class_weights(self, y_train_encoded):
        """Compute class weights for improved minority class performance"""
        from sklearn.utils.class_weight import compute_class_weight

        classes = np.unique(y_train_encoded)
        class_weights = compute_class_weight(
            'balanced',
            classes=classes,
            y=y_train_encoded
        )
        
        # manual tuning for Beethoven and Chopin
        class_weight_dict = dict(zip(classes, class_weights))
        
        # More aggressive weighting for minority classes
        for i, composer in enumerate(self.target_composers):
            if composer == 'Beethoven':
                class_weight_dict[i] *= 2.8
            elif composer == 'Chopin':
                class_weight_dict[i] *= 3.2
            elif composer == 'Bach':
                class_weight_dict[i] *= 1.1
            # Mozart keeps original balanced weight
        
        return class_weight_dict

    def build_model(self) -> Model:
        """Build the hybrid CNN-LSTM model"""

        # Get input shapes from processed data
        musical_shape = None
        harmonic_shape = None

        # Check if we have feature selector info to get shapes
        if hasattr(self.data_processor, 'feature_selectors'):
            if 'musical' in self.data_processor.feature_selectors:
                musical_shape = (len(self.data_processor.feature_selectors['musical']['features']),)
            if 'harmonic' in self.data_processor.feature_selectors:
                harmonic_shape = (len(self.data_processor.feature_selectors['harmonic']['features']),)

        # Default shapes if not available
        if musical_shape is None:
            musical_shape = (25,)  # k_best_musical
        if harmonic_shape is None:
            harmonic_shape = (20,)  # k_best_harmonic

        # Input layers
        musical_input = Input(
            shape=musical_shape,
            name='musical_features'
        )
        harmonic_input = Input(
            shape=harmonic_shape,
            name='harmonic_features'
        )
        sequence_input = Input(
            shape=self.cnn_builder.input_shape,
            name='sequence_features'
        )

        # Feature processing branches
        musical_branch = self._build_feature_branch(
            musical_input, 'musical'
        )
        harmonic_branch = self._build_feature_branch(
            harmonic_input, 'harmonic'
        )

        # sequence processing branch - CNN then LSTM
        cnn_features = self.cnn_builder.create_branch(sequence_input)
        lstm_features = self.lstm_builder.create_branch(cnn_features)
        
        # Apply global pooling to LSTM output to get final sequence features
        sequence_features = GlobalAveragePooling1D(name='lstm_global_pool')(lstm_features)

        # feature fusion with cross-attention
        cross_attention = self._build_cross_attention_layer(
            [musical_branch, harmonic_branch, sequence_features]
        )

        # Classification head with improved architecture
        output = self._build_classification_head(cross_attention)

        # Create and compile model
        model = Model(
            inputs=[musical_input, harmonic_input, sequence_input],
            outputs=output,
            name='HybridComposerClassifier'
        )

        # model compilation
        optimizer = tf.keras.optimizers.AdamW(
            learning_rate=self.model_config.learning_rate,
            weight_decay=0.015,
            beta_1=0.9,
            beta_2=0.999,
            clipnorm=1.0
        )

        # loss with higher label smoothing for better generalization
        loss = tf.keras.losses.CategoricalCrossentropy(
            label_smoothing=0.25,
            from_logits=False
        )

        model.compile(
            optimizer=optimizer,
            loss=loss,
            metrics=['accuracy', 'precision', 'recall']
        )
        self.model = model
        return model
    
    def _build_cross_attention_layer(self, feature_branches):
        """Build cross-attention mechanism between different feature types"""
        # Concatenate features
        combined = Concatenate(name='feature_concatenation')(feature_branches)
        
        # Add dense layer for dimension alignment
        dense_aligned = Dense(
            units=self.model_config.dense_units,
            activation='relu',
            kernel_regularizer=tf.keras.regularizers.l2(self.model_config.l2_regularization),
            name='feature_alignment'
        )(combined)
        
        # Layer normalization
        normalized = LayerNormalization(name='cross_attention_norm')(dense_aligned)
        
        # Self-attention mechanism
        attention_layer = tf.keras.layers.MultiHeadAttention(
            num_heads=8,
            key_dim=self.model_config.dense_units // 8,
            dropout=self.model_config.dropout_rate * 0.3,
            name='cross_feature_attention'
        )
        
        # Reshape for attention (add sequence dimension)
        reshaped = tf.keras.layers.Reshape((1, self.model_config.dense_units))(normalized)
        attention_output = attention_layer(reshaped, reshaped)
        flattened = tf.keras.layers.Flatten()(attention_output)
        
        # Residual connection
        output = tf.keras.layers.Add(name='cross_attention_residual')([normalized, flattened])
        output = LayerNormalization(name='cross_attention_final_norm')(output)
        
        return output

    def _build_feature_branch(self, input_layer, branch_name: str):
        """Build feature processing branch"""
        # First dense layer with regularization
        x = Dense(
            units=self.model_config.dense_units,
            activation='swish',
            kernel_regularizer=tf.keras.regularizers.l2(self.model_config.l2_regularization),
            name=f'{branch_name}_dense_1'
        )(input_layer)
        x = LayerNormalization(name=f'{branch_name}_ln_1')(x)
        x = Dropout(rate=self.model_config.dropout_rate, name=f'{branch_name}_dropout_1')(x)

        # skip connection
        skip = Dense(
            units=self.model_config.dense_units // 2,
            activation='swish',
            kernel_regularizer=tf.keras.regularizers.l2(self.model_config.l2_regularization),
            name=f'{branch_name}_skip_projection'
        )(input_layer)

        # Second dense layer
        x = Dense(
            units=self.model_config.dense_units // 2,
            activation='swish',
            kernel_regularizer=tf.keras.regularizers.l2(self.model_config.l2_regularization),
            name=f'{branch_name}_dense_2'
        )(x)

        # residual connection
        x = tf.keras.layers.Add(name=f'{branch_name}_skip_add')([x, skip])
        x = LayerNormalization(name=f'{branch_name}_ln_2')(x)
        x = Dropout(rate=self.model_config.dropout_rate * 0.7, name=f'{branch_name}_dropout_2')(x)

        return x

    def _build_classification_head(self, combined_features):
        """Build final classification layers"""
        # First classification layer
        x = Dense(
            units=self.model_config.dense_units,
            activation='swish',
            kernel_regularizer=tf.keras.regularizers.l2(self.model_config.l2_regularization),
            name='classifier_dense_1'
        )(combined_features)
        x = LayerNormalization(name='classifier_ln_1')(x)
        x = Dropout(rate=self.model_config.dropout_rate, name='classifier_dropout_1')(x)

        # Second classification layer
        x = Dense(
            units=self.model_config.dense_units // 2,
            activation='swish',
            kernel_regularizer=tf.keras.regularizers.l2(self.model_config.l2_regularization),
            name='classifier_dense_2'
        )(x)
        x = LayerNormalization(name='classifier_ln_2')(x)
        x = Dropout(rate=self.model_config.dropout_rate, name='classifier_dropout_2')(x)

        # Final output layer
        output = Dense(
            units=self.num_classes,
            activation='softmax',
            name='composer_predictions'
        )(x)

        return output

    def train(self, data_splits: Dict) -> tf.keras.callbacks.History:
        """Train model with callbacks and class balancing"""

        from tensorflow.keras.callbacks import LearningRateScheduler, ReduceLROnPlateau

        # Compute class weights
        class_weights = self.compute_class_weights(data_splits['y_train_encoded'])

        def warm_up_cosine_decay(epoch, lr):
            """Warm-up followed by cosine decay learning rate schedule"""
            import math
            
            warm_up_epochs = 15
            max_epochs = self.training_config.epochs
            min_lr = 1e-7
            max_lr = self.model_config.learning_rate
            
            if epoch < warm_up_epochs:
                # Warm-up phase
                return max_lr * epoch / warm_up_epochs
            else:
                # Cosine decay phase
                decay_epoch = epoch - warm_up_epochs
                decay_epochs = max_epochs - warm_up_epochs
                return min_lr + 0.5 * (max_lr - min_lr) * (1 + math.cos(math.pi * decay_epoch / decay_epochs))

        # callbacks
        callbacks = [
            EarlyStopping(
                monitor='val_accuracy',
                patience=self.training_config.patience,
                restore_best_weights=True,
                verbose=0,
                min_delta=0.0008,
                mode='max'
            ),
            LearningRateScheduler(warm_up_cosine_decay, verbose=0),
            ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.7,
                patience=8,
                min_lr=1e-7,
                verbose=0
            ),
            ModelCheckpoint(
                filepath=MODEL_BEST_PATH,
                monitor='val_accuracy',
                save_best_only=True,
                verbose=0,
                mode='max'
            )
        ]

        # Prepare training data
        X_train = [
            data_splits['X_train']['musical'],
            data_splits['X_train']['harmonic'],
            data_splits['X_train']['sequences']
        ]
        X_val = [
            data_splits['X_val']['musical'],
            data_splits['X_val']['harmonic'],
            data_splits['X_val']['sequences']
        ]

        # Train with monitoring
        with tqdm(total=self.training_config.epochs, desc="Training Model") as pbar:
            class ProgressCallback(tf.keras.callbacks.Callback):
                def on_epoch_end(self, epoch, logs=None):
                    pbar.update(1)
                    pbar.set_postfix({
                        'loss': f"{logs.get('loss', 0):.4f}",
                        'acc': f"{logs.get('accuracy', 0):.4f}",
                        'val_acc': f"{logs.get('val_accuracy', 0):.4f}",
                        'lr': f"{logs.get('lr', 0):.7f}"
                    })

            callbacks.append(ProgressCallback())

            history = self.model.fit(
                X_train, data_splits['y_train'],
                validation_data=(X_val, data_splits['y_val']),
                epochs=self.training_config.epochs,
                batch_size=self.training_config.batch_size,
                callbacks=callbacks,
                class_weight=class_weights,
                verbose=0
            )

        self.history = history
        return history

    def evaluate(self, data_splits: Dict) -> Dict:
        """Comprehensive model evaluation"""

        X_test = [
            data_splits['X_test']['musical'],
            data_splits['X_test']['harmonic'],
            data_splits['X_test']['sequences']
        ]

        # Predictions
        y_pred_prob = self.model.predict(X_test, verbose=0)
        y_pred = np.argmax(y_pred_prob, axis=1)
        y_true = data_splits['y_test_encoded']

        # Calculate metrics
        metrics = {
            'accuracy': accuracy_score(y_true, y_pred),
            'precision': precision_recall_fscore_support(y_true, y_pred, average='weighted')[0],
            'recall': precision_recall_fscore_support(y_true, y_pred, average='weighted')[1],
            'f1_score': precision_recall_fscore_support(y_true, y_pred, average='weighted')[2],
            'confusion_matrix': confusion_matrix(y_true, y_pred),
            'classification_report': classification_report(
                y_true, y_pred, target_names=self.target_composers, output_dict=True
            )
        }

        # ROC curves for each class
        roc_data = {}
        for i, composer in enumerate(self.target_composers):
            y_true_binary = (y_true == i).astype(int)
            fpr, tpr, _ = roc_curve(y_true_binary, y_pred_prob[:, i])
            roc_data[composer] = {
                'fpr': fpr,
                'tpr': tpr,
                'auc': auc(fpr, tpr)
            }

        metrics['roc_data'] = roc_data
        self.metrics = metrics

        return metrics

    def tune_hyperparameters(self, X_train: Dict, y_train: np.ndarray,
                        X_val: Dict, y_val: np.ndarray,
                        param_grid: Dict, max_trials: int = 3) -> Tuple[Dict, float, List[Dict]]:
        """hyperparameter tuning with minimal trials and epochs"""

        print(f"Starting hyperparameter tuning with {max_trials} trials...")

        # Generate parameter combinations (limit to essentials)
        param_names = list(param_grid.keys())
        param_values = list(param_grid.values())

        # Drastically limit combinations
        all_combinations = list(product(*param_values))
        if len(all_combinations) > max_trials:
            np.random.seed(self.training_config.random_state)
            selected_indices = np.random.choice(
                len(all_combinations),
                size=max_trials,
                replace=False
            )
            combinations = [all_combinations[i] for i in selected_indices]
        else:
            combinations = all_combinations[:max_trials]

        results = []
        best_score = 0.0
        best_params = {}
        best_history = None

        # Prepare data for training with larger batches
        X_train_list = [X_train['musical'], X_train['harmonic'], X_train['sequences']]
        X_val_list = [X_val['musical'], X_val['harmonic'], X_val['sequences']]

        # Use subset of data for speed (20% of training data)
        subset_size = max(100, len(X_train_list[0]) // 5)
        subset_indices = np.random.choice(len(X_train_list[0]), size=subset_size, replace=False)
        
        X_train_subset = [X[subset_indices] for X in X_train_list]
        y_train_subset = y_train[subset_indices]

        print(f"Using data subset: {subset_size} samples for faster tuning")
        print(f"Testing {len(combinations)} parameter combinations...")

        for i, combination in enumerate(tqdm(combinations, desc="Hyperparameter Search")):
            try:
                params = dict(zip(param_names, combination))
                print(f"\nTrial {i+1}/{len(combinations)}: {params}")

                # Update model configuration
                temp_config = ModelConfig(**params)
                original_config = self.model_config
                self.model_config = temp_config

                # Rebuild with simplified architecture
                self.cnn_builder = CNNNetworkBuilder(
                    self.cnn_builder.input_shape,
                    temp_config
                )
                self.lstm_builder = LSTMNetworkBuilder(
                    self.lstm_builder.sequence_length,
                    config=temp_config
                )

                # Build simplified model
                model = self.build_model()

                # Compute class weights
                class_weights = self.compute_class_weights(
                    np.argmax(y_train_subset, axis=1)
                )

                # aggressive early stopping
                callbacks = [
                    EarlyStopping(
                        monitor='val_accuracy',
                        patience=2,  # Very low patience
                        restore_best_weights=True,
                        verbose=0,
                        min_delta=0.005
                    ),
                    ReduceLROnPlateau(
                        monitor='val_loss',
                        factor=0.3,
                        patience=1,
                        min_lr=1e-6,
                        verbose=0
                    )
                ]

                # Train with minimal epochs and large batch
                start_time = time.time()
                history = model.fit(
                    X_train_subset, y_train_subset,
                    validation_data=(X_val_list, y_val),
                    epochs=8,  # Minimal epochs
                    batch_size=128,  # Large batch size
                    callbacks=callbacks,
                    class_weight=class_weights,
                    verbose=0
                )
                training_time = time.time() - start_time

                # Get best validation accuracy
                val_accuracy = max(history.history['val_accuracy']) if history.history['val_accuracy'] else 0
                final_val_acc = history.history['val_accuracy'][-1] if history.history['val_accuracy'] else 0

                print(f"Trial {i+1} - Val Acc: {val_accuracy:.4f} - Time: {training_time:.1f}s")

                # Store results
                result = {
                    'params': params.copy(),
                    'val_accuracy': val_accuracy,
                    'final_val_accuracy': final_val_acc,
                    'epochs_trained': len(history.history['accuracy']) if history.history.get('accuracy') else 0,
                    'training_time': training_time,
                    'trial': i + 1,
                    'history': history.history
                }
                results.append(result)

                # Update best parameters
                if val_accuracy > best_score:
                    best_score = val_accuracy
                    best_params = params.copy()
                    best_history = history
                    print(f"New best: {val_accuracy:.4f}")

                # Cleanup
                del model
                K.clear_session()
                self.model_config = original_config

            except Exception as e:
                print(f"Trial {i+1} failed: {str(e)}")
                self.model_config = original_config
                continue

        # Sort results
        results.sort(key=lambda x: x['val_accuracy'], reverse=True)

        print(f"\nHyperparameter tuning completed!")
        print(f"Best validation accuracy: {best_score:.4f}")
        print(f"Best parameters: {best_params}")

        if best_history is not None:
            self.best_tuning_history = best_history

        return best_params, best_score, results

    def save_artifacts(self, modelfilepath, artifacts_filepath, history_filepath):
        """Save model and preprocessing artifacts"""

        # Save model
        self.model.save(f"{modelfilepath}.keras")

        # Save artifacts
        artifacts = {
            'data_processor': self.data_processor,
            'target_composers': self.target_composers,
            'model_config': self.model_config,
            'training_config': self.training_config,
            'metrics': self.metrics
        }

        with open(artifacts_filepath, 'wb') as f:
            pickle.dump(artifacts, f)

        # Save history
        if self.history:
            with open(history_filepath, 'wb') as f:
                pickle.dump(self.history.history, f)
In [ ]:
# Utility Functions
def create_model_config(**kwargs) -> ModelConfig:
    """Create model configuration with custom parameters"""
    return ModelConfig(**kwargs)


def create_training_config(**kwargs) -> TrainingConfig:
    """Create training configuration with custom parameters"""
    return TrainingConfig(**kwargs)


def plot_training_history(history: tf.keras.callbacks.History, figsize: Tuple[int, int] = (15, 5)):
    """Plot training history with visualization"""

    fig, axes = plt.subplots(1, 3, figsize=figsize)

    # Accuracy plot
    axes[0].plot(history.history['accuracy'], label='Training', linewidth=2)
    axes[0].plot(history.history['val_accuracy'], label='Validation', linewidth=2)
    axes[0].set_title('Model Accuracy', fontweight='bold')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Accuracy')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)

    # Loss plot
    axes[1].plot(history.history['loss'], label='Training', linewidth=2)
    axes[1].plot(history.history['val_loss'], label='Validation', linewidth=2)
    axes[1].set_title('Model Loss', fontweight='bold')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Loss')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)

    # metrics summary
    axes[2].axis('off')
    final_acc = history.history['val_accuracy'][-1]
    best_acc = max(history.history['val_accuracy'])
    final_loss = history.history['val_loss'][-1]
    best_loss = min(history.history['val_loss'])

    summary_text = f"""
    Final Validation Accuracy: {final_acc:.4f}
    Best Validation Accuracy: {best_acc:.4f}
    Final Validation Loss: {final_loss:.4f}
    Best Validation Loss: {best_loss:.4f}
    Total Epochs: {len(history.history['accuracy'])}
    """

    axes[2].text(0.1, 0.9, summary_text, transform=axes[2].transAxes,
                fontsize=12, verticalalignment='top',
                bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.8))
    axes[2].set_title('Training Summary', fontweight='bold')

    plt.tight_layout()
    plt.show()

def plot_feature_importances(data_processor, features_df, feature_type='musical', top_n=15):
    """
    Plot feature importances using RandomForest for the selected features.
    """
    # Get selected features and data
    selector_info = data_processor.feature_selectors[feature_type]
    selected_features = selector_info['features']
    X = features_df[selected_features].fillna(0)
    y = features_df['composer']

    # Fit RandomForest for importance
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(X, y)
    importances = rf.feature_importances_

    # Sort and plot
    indices = np.argsort(importances)[::-1][:top_n]
    plt.figure(figsize=(12, 6))
    sns.barplot(x=np.array(selected_features)[indices], y=importances[indices], palette=env_config.color_pallete,)
    plt.title(f"Top {top_n} {feature_type.capitalize()} Feature Importances")
    plt.ylabel("Importance")
    plt.xlabel("Feature")
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()
In [ ]:
# Utility Functions
def create_model_config(**kwargs) -> ModelConfig:
    """Create model configuration with custom parameters"""
    return ModelConfig(**kwargs)


def create_training_config(**kwargs) -> TrainingConfig:
    """Create training configuration with custom parameters"""
    return TrainingConfig(**kwargs)


def plot_training_history(history: tf.keras.callbacks.History, figsize: Tuple[int, int] = (15, 5)):
    """Plot training history with visualization"""

    fig, axes = plt.subplots(1, 3, figsize=figsize)

    # Accuracy plot
    axes[0].plot(history.history['accuracy'], label='Training', linewidth=2)
    axes[0].plot(history.history['val_accuracy'], label='Validation', linewidth=2)
    axes[0].set_title('Model Accuracy', fontweight='bold')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Accuracy')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)

    # Loss plot
    axes[1].plot(history.history['loss'], label='Training', linewidth=2)
    axes[1].plot(history.history['val_loss'], label='Validation', linewidth=2)
    axes[1].set_title('Model Loss', fontweight='bold')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Loss')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)

    # metrics summary
    axes[2].axis('off')
    final_acc = history.history['val_accuracy'][-1]
    best_acc = max(history.history['val_accuracy'])
    final_loss = history.history['val_loss'][-1]
    best_loss = min(history.history['val_loss'])

    summary_text = f"""
    Final Validation Accuracy: {final_acc:.4f}
    Best Validation Accuracy: {best_acc:.4f}
    Final Validation Loss: {final_loss:.4f}
    Best Validation Loss: {best_loss:.4f}
    Total Epochs: {len(history.history['accuracy'])}
    """

    axes[2].text(0.1, 0.9, summary_text, transform=axes[2].transAxes,
                fontsize=12, verticalalignment='top',
                bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.8))
    axes[2].set_title('Training Summary', fontweight='bold')

    plt.tight_layout()
    plt.show()

def plot_feature_importances(data_processor, features_df, feature_type='musical', top_n=15):
    """
    Plot feature importances using RandomForest for the selected features.
    """
    # Get selected features and data
    selector_info = data_processor.feature_selectors[feature_type]
    selected_features = selector_info['features']
    X = features_df[selected_features].fillna(0)
    y = features_df['composer']

    # Fit RandomForest for importance
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(X, y)
    importances = rf.feature_importances_

    # Sort and plot
    indices = np.argsort(importances)[::-1][:top_n]
    plt.figure(figsize=(12, 6))
    sns.barplot(x=np.array(selected_features)[indices], y=importances[indices], palette=env_config.color_pallete,)
    plt.title(f"Top {top_n} {feature_type.capitalize()} Feature Importances")
    plt.ylabel("Importance")
    plt.xlabel("Feature")
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()
In [ ]:
# Load and Initialize Pipeline
print("Initializing Composer Classification Pipeline...")

# Create configurations optimized for minority class performance
model_config = create_model_config(
    lstm_units=256,
    cnn_filters=128,
    dense_units=512,
    dropout_rate=0.3,
    learning_rate=0.00008,
    attention_heads=12,
    l2_regularization=0.0008
)

training_config = create_training_config(
    epochs=150,
    batch_size=28,
    patience=25,
    val_split=0.15,
    test_split=0.15
)

# Initialize pipeline
pipeline = ComposerClassificationPipeline(
    target_composers=TARGET_COMPOSERS,
    model_config=model_config,
    training_config=training_config
)
Initializing Composer Classification Pipeline...
In [ ]:
# Data Loading and Preparation
print("\nLoading processed feature data...")

try:
    # Load musical and harmonic features
    if EXTRACT_FEATURES:
        musical_data = features_df
        harmonic_data = harmonic_df
    else:
        musical_data = pd.read_pickle(MUSICAL_FEATURES_DF_PATH)
        harmonic_data = pd.read_pickle(HARMONIC_FEATURES_DF_PATH)

    # Load note sequences and labels
    note_sequences = np.load(NOTE_SEQUENCES_PATH)
    sequence_labels = np.load(NOTE_SEQUENCES_LABELS_PATH)

    # Load note mappings
    with open(NOTE_MAPPING_PATH, 'rb') as f:
        note_mappings = pickle.load(f)

    print(f"Musical features loaded: {musical_data.shape}")
    print(f"Harmonic features loaded: {harmonic_data.shape}")
    print(f"Note sequences loaded: {note_sequences.shape}")
    print(f"Sequence labels loaded: {sequence_labels.shape}")

    # Map sequence labels to composer names
    int_to_composer = note_mappings.get('int_to_composer', {})
    if int_to_composer:
        sequence_labels_mapped = [int_to_composer.get(label, f'Unknown_{label}')
                                 for label in sequence_labels]
    else:
        sequence_labels_mapped = sequence_labels

    print("Data loading completed successfully")

except FileNotFoundError as e:
    print(f"Error: Required data files not found. {e}")
    print("Please ensure feature extraction has been completed.")
    raise
except Exception as e:
    print(f"Error loading data: {e}")
    raise
Loading processed feature data...
Musical features loaded: (1630, 24)
Harmonic features loaded: (1630, 51)
Note sequences loaded: (68651, 100)
Sequence labels loaded: (68651,)
Data loading completed successfully
In [ ]:
# Data Preparation and Feature Selection with Augmentation
print("\nDATA PREPARATION AND FEATURE SELECTION")

try:
    # Prepare data splits with augmentation
    data_splits = pipeline.prepare_data(
        musical_df=musical_data,
        harmonic_df=harmonic_data,
        note_sequences=note_sequences,
        sequence_labels=np.array(sequence_labels_mapped)
    )

    # Plot for musical features
    plot_feature_importances(pipeline.data_processor, features_df, feature_type='musical', top_n=15)

    # Plot for harmonic features
    plot_feature_importances(pipeline.data_processor, harmonic_df, feature_type='harmonic', top_n=15)

    print("Data preparation completed successfully")
    print(f"Training samples: {len(data_splits['y_train'])}")
    print(f"Validation samples: {len(data_splits['y_val'])}")
    print(f"Test samples: {len(data_splits['y_test'])}")

    # Display class distribution after augmentation
    unique_labels, counts = np.unique(data_splits['y_train_encoded'], return_counts=True)
    class_distribution = dict(zip([TARGET_COMPOSERS[i] for i in unique_labels], counts))
    print(f"Training class distribution after augmentation: {class_distribution}")

    # Calculate class balance improvement
    max_count = max(counts)
    min_count = min(counts)
    balance_ratio = max_count / min_count
    print(f"Class balance ratio (max/min): {balance_ratio:.2f}")

    # Display selected features
    feature_names = pipeline.data_processor.feature_selectors
    if 'musical' in feature_names:
        print(f"Selected musical features ({len(feature_names['musical']['features'])}): {feature_names['musical']['features'][:5]}...")
    if 'harmonic' in feature_names:
        print(f"Selected harmonic features ({len(feature_names['harmonic']['features'])}): {feature_names['harmonic']['features'][:5]}...")

except Exception as e:
    print(f"Error in data preparation: {e}")
    raise
DATA PREPARATION AND FEATURE SELECTION
No description has been provided for this image
No description has been provided for this image
Data preparation completed successfully
Training samples: 2810
Validation samples: 496
Test samples: 584
Training class distribution after augmentation: {'Bach': np.int64(739), 'Beethoven': np.int64(938), 'Chopin': np.int64(947), 'Mozart': np.int64(186)}
Class balance ratio (max/min): 5.09
Selected musical features (15): ['total_duration', 'tempo', 'avg_interval', 'num_rests', 'duration_std']...
Selected harmonic features (20): ['harmonic_rhythm_variance', 'note_density_variance', 'cadential_tonic_ratio', 'harmonic_rhythm_regularity', 'minor_chord_ratio']...
In [ ]:
# Model Building
print("\nMODEL BUILDING")

try:
    # Build the hybrid model
    model = pipeline.build_model()

    print("Hybrid CNN-LSTM model built successfully")
    print(f"Total parameters: {model.count_params():,}")
    print("Model input shapes:")
    for i, input_layer in enumerate(model.inputs):
        print(f"- {input_layer.name}: {input_layer.shape}")
    print(f"Model output shape: {model.output.shape}")

    # Display model summary
    print("\nModel Architecture Summary:")
    model.summary(line_length=100)

except Exception as e:
    print(f"Error building model: {e}")
    raise
\MODEL BUILDING
Hybrid CNN-LSTM model built successfully
Total parameters: 4,701,700
Model input shapes:
- musical_features: (None, 15)
- harmonic_features: (None, 20)
- sequence_features: (None, 120)
Model output shape: (None, 4)

Model Architecture Summary:
Model: "HybridComposerClassifier"
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Layer (type)                ┃ Output Shape            ┃        Param # ┃ Connected to            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ sequence_features           │ (None, 120)             │              0 │ -                       │
│ (InputLayer)                │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cnn_reshape (Reshape)       │ (None, 120, 1)          │              0 │ sequence_features[0][0] │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cnn_branch_conv_1 (Conv1D)  │ (None, 120, 128)        │          1,024 │ cnn_reshape[0][0]       │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cnn_branch_ln_1             │ (None, 120, 128)        │            256 │ cnn_branch_conv_1[0][0] │
│ (LayerNormalization)        │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cnn_branch_pool_1           │ (None, 60, 128)         │              0 │ cnn_branch_ln_1[0][0]   │
│ (MaxPooling1D)              │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cnn_branch_dropout_1        │ (None, 60, 128)         │              0 │ cnn_branch_pool_1[0][0] │
│ (Dropout)                   │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cnn_branch_separable_conv   │ (None, 60, 256)         │         33,664 │ cnn_branch_dropout_1[0… │
│ (SeparableConv1D)           │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cnn_branch_ln_2             │ (None, 60, 256)         │            512 │ cnn_branch_separable_c… │
│ (LayerNormalization)        │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cnn_residual_projection     │ (None, 60, 256)         │         33,024 │ cnn_branch_dropout_1[0… │
│ (Conv1D)                    │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cnn_residual_add (Add)      │ (None, 60, 256)         │              0 │ cnn_branch_ln_2[0][0],  │
│                             │                         │                │ cnn_residual_projectio… │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cnn_residual_activation     │ (None, 60, 256)         │              0 │ cnn_residual_add[0][0]  │
│ (Activation)                │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ musical_features            │ (None, 15)              │              0 │ -                       │
│ (InputLayer)                │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ harmonic_features           │ (None, 20)              │              0 │ -                       │
│ (InputLayer)                │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cnn_branch_dropout_2        │ (None, 60, 256)         │              0 │ cnn_residual_activatio… │
│ (Dropout)                   │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ musical_dense_1 (Dense)     │ (None, 512)             │          8,192 │ musical_features[0][0]  │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ harmonic_dense_1 (Dense)    │ (None, 512)             │         10,752 │ harmonic_features[0][0] │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ lstm_branch_1               │ (None, 60, 512)         │      1,050,624 │ cnn_branch_dropout_2[0… │
│ (Bidirectional)             │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ musical_ln_1                │ (None, 512)             │          1,024 │ musical_dense_1[0][0]   │
│ (LayerNormalization)        │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ harmonic_ln_1               │ (None, 512)             │          1,024 │ harmonic_dense_1[0][0]  │
│ (LayerNormalization)        │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ multi_head_attention        │ (None, 60, 512)         │        788,096 │ lstm_branch_1[0][0],    │
│ (MultiHeadAttention)        │                         │                │ lstm_branch_1[0][0]     │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ musical_dropout_1 (Dropout) │ (None, 512)             │              0 │ musical_ln_1[0][0]      │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ harmonic_dropout_1          │ (None, 512)             │              0 │ harmonic_ln_1[0][0]     │
│ (Dropout)                   │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ attention_residual (Add)    │ (None, 60, 512)         │              0 │ lstm_branch_1[0][0],    │
│                             │                         │                │ multi_head_attention[0… │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ musical_dense_2 (Dense)     │ (None, 256)             │        131,328 │ musical_dropout_1[0][0] │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ musical_skip_projection     │ (None, 256)             │          4,096 │ musical_features[0][0]  │
│ (Dense)                     │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ harmonic_dense_2 (Dense)    │ (None, 256)             │        131,328 │ harmonic_dropout_1[0][… │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ harmonic_skip_projection    │ (None, 256)             │          5,376 │ harmonic_features[0][0] │
│ (Dense)                     │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ attention_norm              │ (None, 60, 512)         │          1,024 │ attention_residual[0][… │
│ (LayerNormalization)        │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ musical_skip_add (Add)      │ (None, 256)             │              0 │ musical_dense_2[0][0],  │
│                             │                         │                │ musical_skip_projectio… │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ harmonic_skip_add (Add)     │ (None, 256)             │              0 │ harmonic_dense_2[0][0], │
│                             │                         │                │ harmonic_skip_projecti… │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ attention_dropout (Dropout) │ (None, 60, 512)         │              0 │ attention_norm[0][0]    │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ musical_ln_2                │ (None, 256)             │            512 │ musical_skip_add[0][0]  │
│ (LayerNormalization)        │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ harmonic_ln_2               │ (None, 256)             │            512 │ harmonic_skip_add[0][0] │
│ (LayerNormalization)        │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ lstm_branch_2               │ (None, 60, 256)         │        656,384 │ attention_dropout[0][0] │
│ (Bidirectional)             │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ musical_dropout_2 (Dropout) │ (None, 256)             │              0 │ musical_ln_2[0][0]      │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ harmonic_dropout_2          │ (None, 256)             │              0 │ harmonic_ln_2[0][0]     │
│ (Dropout)                   │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ lstm_global_pool            │ (None, 256)             │              0 │ lstm_branch_2[0][0]     │
│ (GlobalAveragePooling1D)    │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ feature_concatenation       │ (None, 768)             │              0 │ musical_dropout_2[0][0… │
│ (Concatenate)               │                         │                │ harmonic_dropout_2[0][… │
│                             │                         │                │ lstm_global_pool[0][0]  │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ feature_alignment (Dense)   │ (None, 512)             │        393,728 │ feature_concatenation[… │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cross_attention_norm        │ (None, 512)             │          1,024 │ feature_alignment[0][0] │
│ (LayerNormalization)        │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ reshape (Reshape)           │ (None, 1, 512)          │              0 │ cross_attention_norm[0… │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cross_feature_attention     │ (None, 1, 512)          │      1,050,624 │ reshape[0][0],          │
│ (MultiHeadAttention)        │                         │                │ reshape[0][0]           │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ flatten (Flatten)           │ (None, 512)             │              0 │ cross_feature_attentio… │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cross_attention_residual    │ (None, 512)             │              0 │ cross_attention_norm[0… │
│ (Add)                       │                         │                │ flatten[0][0]           │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ cross_attention_final_norm  │ (None, 512)             │          1,024 │ cross_attention_residu… │
│ (LayerNormalization)        │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ classifier_dense_1 (Dense)  │ (None, 512)             │        262,656 │ cross_attention_final_… │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ classifier_ln_1             │ (None, 512)             │          1,024 │ classifier_dense_1[0][… │
│ (LayerNormalization)        │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ classifier_dropout_1        │ (None, 512)             │              0 │ classifier_ln_1[0][0]   │
│ (Dropout)                   │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ classifier_dense_2 (Dense)  │ (None, 256)             │        131,328 │ classifier_dropout_1[0… │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ classifier_ln_2             │ (None, 256)             │            512 │ classifier_dense_2[0][… │
│ (LayerNormalization)        │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ classifier_dropout_2        │ (None, 256)             │              0 │ classifier_ln_2[0][0]   │
│ (Dropout)                   │                         │                │                         │
├─────────────────────────────┼─────────────────────────┼────────────────┼─────────────────────────┤
│ composer_predictions        │ (None, 4)               │          1,028 │ classifier_dropout_2[0… │
│ (Dense)                     │                         │                │                         │
└─────────────────────────────┴─────────────────────────┴────────────────┴─────────────────────────┘
 Total params: 4,701,700 (17.94 MB)
 Trainable params: 4,701,700 (17.94 MB)
 Non-trainable params: 0 (0.00 B)
In [ ]:
print("\nMODEL ARCHITECTURE VISUALIZATION")

try:
    from tensorflow.keras.utils import plot_model

    # Create the model architecture diagram
    print("Generating model architecture diagram...")

    # Plot the model architecture with detailed information
    plot_model(
        pipeline.model,
        to_file='data/model/model_architecture.png',
        show_shapes=True,
        dpi=300
    )
    
except Exception as e:
    print(f"Error generating model architecture visualization: {e}")
MODEL ARCHITECTURE VISUALIZATION
Generating model architecture diagram...

8. Model Training: ⇧¶


In [ ]:
# Model Training
print("MODEL TRAINING")

try:
    # Display training configuration
    print("Training Configuration:")
    print(f"- Epochs: {training_config.epochs}")
    print(f"- Batch size: {training_config.batch_size}")
    print(f"- Learning rate: {model_config.learning_rate}")
    print(f"- Early stopping patience: {training_config.patience}")
    print(f"- Dropout rate: {model_config.dropout_rate}")
    print(f"- L2 regularization: {model_config.l2_regularization}")
    print(f"- Attention heads: {model_config.attention_heads}")

    # Check GPU availability
    if tf.config.list_physical_devices('GPU'):
        print("GPU acceleration available")
        # Configure GPU memory growth
        gpus = tf.config.experimental.list_physical_devices('GPU')
        if gpus:
            try:
                for gpu in gpus:
                    tf.config.experimental.set_memory_growth(gpu, True)
                print("GPU memory growth configured")
            except RuntimeError as e:
                print(f"GPU configuration warning: {e}")
    else:
        print("Using CPU for training")

    # Display class weights
    class_weights = pipeline.compute_class_weights(data_splits['y_train_encoded'])
    print(f"Class weights: {class_weights}")

    # Start training
    print("\nStarting model training...")
    start_time = time.time()

    history = pipeline.train(data_splits)

    training_time = time.time() - start_time

    print("\nModel training completed successfully!")
    print(f"Total training time: {training_time:.2f} seconds ({training_time/60:.2f} minutes)")

    # Model training summary
    final_train_acc = history.history['accuracy'][-1]
    final_val_acc = history.history['val_accuracy'][-1]
    best_val_acc = max(history.history['val_accuracy'])
    final_train_loss = history.history['loss'][-1]
    final_val_loss = history.history['val_loss'][-1]
    best_val_loss = min(history.history['val_loss'])

    print("\nModel Training Results Summary:")
    print(f"- Final training accuracy: {final_train_acc:.4f}")
    print(f"- Final validation accuracy: {final_val_acc:.4f}")
    print(f"- Best validation accuracy: {best_val_acc:.4f}")
    print(f"- Final training loss: {final_train_loss:.4f}")
    print(f"- Final validation loss: {final_val_loss:.4f}")
    print(f"- Best validation loss: {best_val_loss:.4f}")
    print(f"- Total epochs trained: {len(history.history['accuracy'])}")

    # Check for overfitting
    train_val_gap = final_train_acc - final_val_acc
    if train_val_gap > 0.1:
        print(f"Warning: Potential overfitting detected (train-val gap: {train_val_gap:.4f})")
    else:
        print(f"Good generalization achieved (train-val gap: {train_val_gap:.4f})")

    # Plot training history
    print("Generating training history plots...")
    plot_training_history(history, figsize=(18, 6))

except Exception as e:
    print(f"Error during training: {e}")
    raise
MODEL TRAINING
Training Configuration:
- Epochs: 150
- Batch size: 28
- Learning rate: 8e-05
- Early stopping patience: 25
- Dropout rate: 0.3
- L2 regularization: 0.0008
- Attention heads: 12
Using CPU for training
Class weights: {np.int64(0): np.float64(1.0456698240866036), np.int64(1): np.float64(2.0970149253731343), np.int64(2): np.float64(2.3738120380147834), np.int64(3): np.float64(3.7768817204301075)}

Starting model training...
Training Model:  49%|████▉     | 74/150 [29:30<30:18, 23.93s/it, loss=2.3773, acc=0.9940, val_acc=0.9657, lr=0.0000000]   
Model training completed successfully!
Total training time: 1770.66 seconds (29.51 minutes)

Model Training Results Summary:
- Final training accuracy: 0.9940
- Final validation accuracy: 0.9657
- Best validation accuracy: 0.9758
- Final training loss: 2.3773
- Final validation loss: 1.6724
- Best validation loss: 1.6724
- Total epochs trained: 74
Good generalization achieved (train-val gap: 0.0282)
Generating training history plots...
No description has been provided for this image

Model Training Interpretation¶

  • High Performance: The model achieved a final training accuracy of 99.40% and a final validation accuracy of 96.57%, with the best validation accuracy reaching 97.58%.

  • Loss Reduction: Training and validation losses decreased steadily, with the best validation loss at 1.6724.

  • Generalization: The small train–validation accuracy gap (~2.82%) indicates good generalization and minimal overfitting.

  • Training Efficiency: Early stopping triggered at 74 epochs out of 150, avoiding unnecessary training while maintaining strong performance.

  • Stability: Accuracy and loss curves are smooth, suggesting stable learning without significant oscillations.

  • Configuration Highlights:

    • Learning rate: 8e-05
    • Dropout: 0.3
    • L2 regularization: 0.0008
    • Attention heads: 12
    • Batch size: 28
    • Class weights applied to address class imbalance.

9. Model Evaluation: ⇧¶


In [ ]:
# Model Evaluation
print("\nMODEL EVALUATION")


try:
    # Evaluate model on test set
    print("Evaluating model on test set...")
    test_metrics = pipeline.evaluate(data_splits)

    print("\nTest Results:")
    print(f"- Test Accuracy: {test_metrics['accuracy']:.4f}")
    print(f"- Test Precision: {test_metrics['precision']:.4f}")
    print(f"- Test Recall: {test_metrics['recall']:.4f}")
    print(f"- Test F1-Score: {test_metrics['f1_score']:.4f}")

    # Per-class performance in table format
    print("\nPer-Class Performance Table:")

    # Create a list to store the performance data
    performance_data = []

    for composer in TARGET_COMPOSERS:
        if composer in test_metrics['classification_report']:
            metrics = test_metrics['classification_report'][composer]
            auc_score = test_metrics['roc_data'][composer]['auc'] if composer in test_metrics['roc_data'] else 'N/A'

            performance_data.append({
                'Composer': composer,
                'Precision': f"{metrics['precision']:.4f}",
                'Recall': f"{metrics['recall']:.4f}",
                'F1-Score': f"{metrics['f1-score']:.4f}",
                'AUC': f"{auc_score:.4f}" if auc_score != 'N/A' else 'N/A',
                'Support': metrics['support']
            })

    # Create DataFrame and display as table
    performance_df = pd.DataFrame(performance_data)
    print(performance_df.to_string(index=False))

    print("\nModel evaluation completed successfully!")

except Exception as e:
    print(f"Error during evaluation: {e}")
    raise
MODEL EVALUATION
Evaluating model on test set...

Test Results:
- Test Accuracy: 0.9675
- Test Precision: 0.9675
- Test Recall: 0.9675
- Test F1-Score: 0.9674

Per-Class Performance Table:
 Composer Precision Recall F1-Score    AUC  Support
     Bach    0.9675 0.9675   0.9675 0.9980    154.0
Beethoven    0.9598 0.9795   0.9695 0.9967    195.0
   Chopin    0.9897 0.9746   0.9821 0.9987    197.0
   Mozart    0.8919 0.8684   0.8800 0.9962     38.0

Model evaluation completed successfully!
In [ ]:
# Visualization of Results
print("\nRESULTS VISUALIZATION")

try:
    # Get viridis color palette
    viridis_colors = sns.color_palette(env_config.color_pallete, len(TARGET_COMPOSERS))
    
    # Plot confusion matrix
    print("Generating confusion matrix...")
    plt.figure(figsize=(10, 8))

    # Normalized confusion matrix
    cm = test_metrics['confusion_matrix']
    cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    plt.subplot(1, 2, 1)
    sns.heatmap(cm, annot=True, fmt='d', cmap='viridis',
                xticklabels=TARGET_COMPOSERS, yticklabels=TARGET_COMPOSERS)
    plt.title('Confusion Matrix (Counts)', fontweight='bold')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')

    plt.subplot(1, 2, 2)
    sns.heatmap(cm_normalized, annot=True, fmt='.3f', cmap='viridis',
                xticklabels=TARGET_COMPOSERS, yticklabels=TARGET_COMPOSERS)
    plt.title('Confusion Matrix (Normalized)', fontweight='bold')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')

    plt.tight_layout()
    plt.show()

    # Plot ROC curves
    print("Generating ROC curves...")
    plt.figure(figsize=(10, 8))

    # Use viridis colors for ROC curves
    for i, (composer, data) in enumerate(test_metrics['roc_data'].items()):
        color = viridis_colors[i % len(viridis_colors)]
        plt.plot(data['fpr'], data['tpr'], color=color, linewidth=2,
                label=f'{composer} (AUC = {data["auc"]:.3f})')

    plt.plot([0, 1], [0, 1], 'k--', linewidth=1, alpha=0.8)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate', fontweight='bold')
    plt.ylabel('True Positive Rate', fontweight='bold')
    plt.title('ROC Curves by Composer', fontsize=16, fontweight='bold')
    plt.legend(loc="lower right")
    plt.grid(True, alpha=0.3)
    plt.show()

    # Performance comparison chart
    print("Generating performance comparison...")
    fig, axes = plt.subplots(1, 2, figsize=(16, 6))

    # Per-class metrics using viridis colors
    composers = [c for c in TARGET_COMPOSERS if c in test_metrics['classification_report']]
    metrics_names = ['precision', 'recall', 'f1-score']
    x = np.arange(len(composers))
    width = 0.25

    # Use different viridis shades for each metric
    metric_colors = sns.color_palette(env_config.color_pallete, len(metrics_names))

    for i, metric in enumerate(metrics_names):
        values = [test_metrics['classification_report'][comp][metric] for comp in composers]
        bars = axes[0].bar(x + i*width, values, width, label=metric.title(), 
                          alpha=0.8, color=metric_colors[i])

        # Add value labels on bars
        for bar, value in zip(bars, values):
            height = bar.get_height()
            axes[0].text(bar.get_x() + bar.get_width()/2., height + 0.01,
                        f'{value:.3f}', ha='center', va='bottom', fontsize=9, fontweight='bold')

    axes[0].set_xlabel('Composer', fontweight='bold')
    axes[0].set_ylabel('Score', fontweight='bold')
    axes[0].set_title('Per-Class Performance Metrics', fontweight='bold')
    axes[0].set_xticks(x + width)
    axes[0].set_xticklabels(composers, rotation=45)
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)

    # AUC comparison using viridis colors
    auc_scores = [test_metrics['roc_data'][comp]['auc'] for comp in composers]
    bars = axes[1].bar(composers, auc_scores, color=viridis_colors[:len(composers)], alpha=0.7)
    axes[1].set_xlabel('Composer', fontweight='bold')
    axes[1].set_ylabel('AUC Score', fontweight='bold')
    axes[1].set_title('AUC Scores by Composer', fontweight='bold')
    axes[1].set_ylim([0, 1])
    axes[1].grid(True, alpha=0.3)

    # Add value labels on bars
    for bar, score in zip(bars, auc_scores):
        height = bar.get_height()
        axes[1].text(bar.get_x() + bar.get_width()/2., height + 0.01,
                    f'{score:.3f}', ha='center', va='bottom', fontweight='bold')

    plt.tight_layout()
    plt.show()

    print("Visualization completed successfully!")

except Exception as e:
    print(f"Error during visualization: {e}")
RESULTS VISUALIZATION
Generating confusion matrix...
No description has been provided for this image
Generating ROC curves...
No description has been provided for this image
Generating performance comparison...
No description has been provided for this image
Visualization completed successfully!
In [ ]:
try:

    # Display final summary
    print("\nPIPELINE EXECUTION SUMMARY")
    print(f"Model built with {pipeline.model.count_params():,} parameters")
    print(f"Training completed in {len(pipeline.history.history['accuracy'])} epochs")
    print(f"Best validation accuracy: {max(pipeline.history.history['val_accuracy']):.4f}")
    print(f"Test accuracy: {test_metrics['accuracy']:.4f}")
    print("Model artifacts saved for deployment")

    print("\nFinal Performance Metrics:")
    print(f"- Accuracy: {test_metrics['accuracy']:.4f}")
    print(f"- Precision: {test_metrics['precision']:.4f}")
    print(f"- Recall: {test_metrics['recall']:.4f}")
    print(f"- F1-Score: {test_metrics['f1_score']:.4f}")

except Exception as e:
    print(f"Error during model saving: {e}")
PIPELINE EXECUTION SUMMARY
Model built with 4,701,700 parameters
Training completed in 74 epochs
Best validation accuracy: 0.9758
Test accuracy: 0.9675
Model artifacts saved for deployment

Final Performance Metrics:
- Accuracy: 0.9675
- Precision: 0.9675
- Recall: 0.9675
- F1-Score: 0.9674

Model Training & Evaluation Interpretation¶

1. Training Performance¶

  • The model reached a final training accuracy of 99.40% and a final validation accuracy of 96.57%, with the best validation accuracy at 97.58%.
  • The train–validation accuracy gap of about 2.82% indicates minimal overfitting and strong generalization.
  • Final training loss was 2.3773, and final validation loss was 1.6724, both showing steady and stable decline throughout training.
  • Early stopping triggered at 74 epochs (out of 150) prevented unnecessary computation while retaining peak performance.
  • Key training parameters included a learning rate of 8e-05, batch size of 28, dropout rate of 0.3, L2 regularization of 0.0008, and 12 attention heads. Class weights were applied to address class imbalance.

2. Test Set Evaluation¶

  • On the unseen test set, the model achieved 96.75% accuracy, precision, recall, and F1-score, showing strong generalization from the training and validation phases.
  • Performance was stable across metrics, indicating consistent predictive capability.

3. Per-Class Insights¶

  • Bach predictions showed high and balanced precision and recall, with excellent discriminative ability (AUC 0.998).
  • Beethoven had slightly lower precision than recall, indicating a tendency to favor recall and minimize false negatives.
  • Chopin achieved the highest precision and F1-score, showing very confident and correct predictions with the highest AUC of 0.999.
  • Mozart had the lowest precision and recall, with most misclassifications occurring as Bach. This is likely influenced by fewer training and test samples for Mozart.

4. Confusion Matrix Observations¶

  • Predictions are strongly concentrated along the diagonal, confirming correct classification for most samples.
  • Misclassifications are minimal, with the most frequent being Mozart labeled as Bach.
  • Confusion between Bach, Beethoven, and Chopin is rare, highlighting clear stylistic separability for these composers.

5. ROC Curve Analysis¶

  • All composers achieved AUC scores above 0.996, indicating exceptional discriminative power.
  • Chopin (0.999) and Bach (0.998) showed the strongest class separation.
  • ROC curves are near the top-left corner, confirming low false positive rates and high true positive rates for all classes.

6. Strengths¶

  • High accuracy, precision, recall, and F1-score across training, validation, and test sets.
  • Minimal overfitting due to effective regularization and early stopping.
  • Excellent class separation with consistently high AUC scores.
  • Robust performance despite class imbalance.

7. Areas for Improvement¶

  • Mozart classification performance can be improved through:
    • Augmenting Mozart’s dataset to address its low sample size.
    • Feature engineering tailored to Mozart’s stylistic patterns.
    • Experimenting with focal loss or further class-weight tuning to penalize Mozart misclassification.

Based on these improvement areas, hyperparameter tuning will be performed to further optimize the model’s performance.

10. Model Optimization: ⇧¶


In [67]:
# Hyperparameter Tuning
print("\nHYPERPARAMETER TUNING")
print("Starting hyperparameter tuning with expanded search...")

# Expanded parameter grid for more granular search
param_grid = {
    'lstm_units': [128, 256],
    'cnn_filters': [64, 128], 
    'dense_units': [256, 512],
    'dropout_rate': [0.25],
    'learning_rate': [0.001]
}

print(f"Parameter combinations to test: {np.prod([len(v) for v in param_grid.values()])}")

# Use moderate epochs for tuning
max_trials = 4
tuning_epochs = 5
tuning_patience = 2

tuning_pipeline = ComposerClassificationPipeline(
    target_composers=TARGET_COMPOSERS,
    model_config=ModelConfig(),
    training_config=TrainingConfig(epochs=tuning_epochs, patience=tuning_patience)
)

# Prepare data for tuning
print("Preparing data for tuning...")
tuning_data_splits = tuning_pipeline.prepare_data(
    musical_df=musical_data,
    harmonic_df=harmonic_data,
    note_sequences=note_sequences,
    sequence_labels=np.array(sequence_labels_mapped)
)

# Track best per-composer validation accuracy
best_per_composer = {composer: {'val_accuracy': 0, 'params': None} for composer in TARGET_COMPOSERS}

# Perform hyperparameter search
best_params, best_score, all_results = tuning_pipeline.tune_hyperparameters(
    X_train=tuning_data_splits['X_train'],
    y_train=tuning_data_splits['y_train'],
    X_val=tuning_data_splits['X_val'],
    y_val=tuning_data_splits['y_val'],
    param_grid=param_grid,
    max_trials=max_trials
)

# Analyze per-composer validation accuracy
for result in all_results:
    if 'val_class_accuracies' in result:
        for composer, acc in result['val_class_accuracies'].items():
            if acc > best_per_composer[composer]['val_accuracy']:
                best_per_composer[composer]['val_accuracy'] = acc
                best_per_composer[composer]['params'] = result['params']

print("\nHyperparameter tuning completed!")
print(f"Best overall validation accuracy: {best_score:.4f}")
print(f"Best overall parameters: {best_params}")

# Display top 3 configurations
print("\nTop 3 Parameter Configurations:")
for i, result in enumerate(all_results[:3], 1):
    print(f"{i}. Val Acc: {result['val_accuracy']:.4f}")
    print(f"Parameters: {result['params']}")
    print()

# Display best parameters per composer
print("\nBest Validation Accuracy Per Composer:")
for composer in TARGET_COMPOSERS:
    acc = best_per_composer[composer]['val_accuracy']
    params = best_per_composer[composer]['params']
    print(f"{composer}: {acc:.4f} | Params: {params}")

# Train final model with best overall parameters
print("Training final model with best parameters...")
best_model_config = create_model_config(**best_params)
final_training_config = create_training_config(
    epochs=100,
    batch_size=28,
    patience=20,
    val_split=0.15,
    test_split=0.15
)
final_pipeline = ComposerClassificationPipeline(
    target_composers=TARGET_COMPOSERS,
    model_config=best_model_config,
    training_config=final_training_config
)

final_data_splits = final_pipeline.prepare_data(
    musical_df=musical_data,
    harmonic_df=harmonic_data,
    note_sequences=note_sequences,
    sequence_labels=np.array(sequence_labels_mapped)
)

final_model = final_pipeline.build_model()
final_history = final_pipeline.train(final_data_splits)
final_metrics = final_pipeline.evaluate(final_data_splits)

print("\nFinal model training completed!")
print(f"Test accuracy with best params: {final_metrics['accuracy']:.4f}")
print(f"Improvement over default: {final_metrics['accuracy'] - test_metrics['accuracy']:.4f}")
HYPERPARAMETER TUNING
Starting hyperparameter tuning with expanded search...
Parameter combinations to test: 8
Preparing data for tuning...
Starting hyperparameter tuning with 4 trials...
Using data subset: 563 samples for faster tuning
Testing 4 parameter combinations...
Hyperparameter Search:   0%|          | 0/4 [00:00<?, ?it/s]
Trial 1/4: {'lstm_units': 128, 'cnn_filters': 64, 'dense_units': 512, 'dropout_rate': 0.25, 'learning_rate': 0.001}
Trial 1 - Val Acc: 0.8471 - Time: 40.6s
New best: 0.8471
Hyperparameter Search:  25%|██▌       | 1/4 [00:44<02:14, 44.84s/it]
Trial 2/4: {'lstm_units': 256, 'cnn_filters': 64, 'dense_units': 512, 'dropout_rate': 0.25, 'learning_rate': 0.001}
Trial 2 - Val Acc: 0.8290 - Time: 48.8s
Hyperparameter Search:  50%|█████     | 2/4 [01:37<01:38, 49.29s/it]
Trial 3/4: {'lstm_units': 128, 'cnn_filters': 64, 'dense_units': 256, 'dropout_rate': 0.25, 'learning_rate': 0.001}
Trial 3 - Val Acc: 0.7847 - Time: 33.3s
Hyperparameter Search:  75%|███████▌  | 3/4 [02:14<00:43, 43.86s/it]
Trial 4/4: {'lstm_units': 256, 'cnn_filters': 128, 'dense_units': 512, 'dropout_rate': 0.25, 'learning_rate': 0.001}
Trial 4 - Val Acc: 0.8431 - Time: 62.8s
Hyperparameter Search: 100%|██████████| 4/4 [03:22<00:00, 50.56s/it]
Hyperparameter tuning completed!
Best validation accuracy: 0.8471
Best parameters: {'lstm_units': 128, 'cnn_filters': 64, 'dense_units': 512, 'dropout_rate': 0.25, 'learning_rate': 0.001}

Hyperparameter tuning completed!
Best overall validation accuracy: 0.8471
Best overall parameters: {'lstm_units': 128, 'cnn_filters': 64, 'dense_units': 512, 'dropout_rate': 0.25, 'learning_rate': 0.001}

Top 3 Parameter Configurations:
1. Val Acc: 0.8471
Parameters: {'lstm_units': 128, 'cnn_filters': 64, 'dense_units': 512, 'dropout_rate': 0.25, 'learning_rate': 0.001}

2. Val Acc: 0.8431
Parameters: {'lstm_units': 256, 'cnn_filters': 128, 'dense_units': 512, 'dropout_rate': 0.25, 'learning_rate': 0.001}

3. Val Acc: 0.8290
Parameters: {'lstm_units': 256, 'cnn_filters': 64, 'dense_units': 512, 'dropout_rate': 0.25, 'learning_rate': 0.001}


Best Validation Accuracy Per Composer:
Bach: 0.0000 | Params: None
Beethoven: 0.0000 | Params: None
Chopin: 0.0000 | Params: None
Mozart: 0.0000 | Params: None
Training final model with best parameters...
Training Model:  81%|████████  | 81/100 [32:30<07:37, 24.08s/it, loss=1.4974, acc=0.9968, val_acc=0.9556, lr=0.0000000] 
Final model training completed!
Test accuracy with best params: 0.9469
Improvement over default: -0.0205
In [70]:
def plot_model_comparison(test_metrics, final_metrics, viridis_colors):
    """Plot overall model performance comparison"""
    metrics_comparison = {
        'Default Model': [test_metrics['accuracy'], test_metrics['precision'],
                test_metrics['recall'], test_metrics['f1_score']],
        'Tuned Model': [final_metrics['accuracy'], final_metrics['precision'],
                final_metrics['recall'], final_metrics['f1_score']]
    }

    x = np.arange(4)
    width = 0.35
    metric_names = ['Accuracy', 'Precision', 'Recall', 'F1-Score']

    plt.bar(x - width/2, metrics_comparison['Default Model'], width,
            label='Default Model', alpha=0.8, color=viridis_colors[0])
    plt.bar(x + width/2, metrics_comparison['Tuned Model'], width,
            label='Tuned Model', alpha=0.8, color=viridis_colors[2])

    plt.xlabel('Metrics')
    plt.ylabel('Score')
    plt.title('Overall Model Performance Comparison')
    plt.xticks(x, metric_names)
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.ylim(0, 1)

    # Add value labels on bars
    for i, (default_val, tuned_val) in enumerate(zip(metrics_comparison['Default Model'], metrics_comparison['Tuned Model'])):
        plt.text(i - width/2, default_val + 0.01, f'{default_val:.3f}', ha='center', va='bottom', fontsize=9)
        plt.text(i + width/2, tuned_val + 0.01, f'{tuned_val:.3f}', ha='center', va='bottom', fontsize=9)


def plot_composer_performance(metrics, viridis_colors, title, subplot_position=None):
    """Plot F1-score performance by composer using viridis palette"""
    if subplot_position:
        plt.subplot(*subplot_position)
        
    try:
        composers = [c for c in TARGET_COMPOSERS if c in metrics['classification_report']]
        if composers:
            f1_scores = [metrics['classification_report'][comp]['f1-score'] for comp in composers]
            # Use viridis colors for composers - map each composer to a specific viridis color
            colors = [viridis_colors[i % len(viridis_colors)] for i in range(len(composers))]
            
            bars = plt.bar(range(len(composers)), f1_scores, alpha=0.8, color=colors)
            plt.xlabel('Composers')
            plt.ylabel('F1-Score')
            plt.title(title)
            plt.xticks(range(len(composers)), composers, rotation=45, ha='right')
            plt.grid(True, alpha=0.3)
            plt.ylim(0, 1)
            
            # Add value labels on bars
            for bar, score in zip(bars, f1_scores):
                plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01, 
                        f'{score:.3f}', ha='center', va='bottom', fontsize=8)
        else:
            plt.text(0.5, 0.5, 'Classification report not available', ha='center', va='center', transform=plt.gca().transAxes)
            plt.title(title)
    except Exception as e:
        plt.text(0.5, 0.5, f'Error: {str(e)}', ha='center', va='center', transform=plt.gca().transAxes)
        plt.title(title)

def plot_improvement_analysis(test_metrics, final_metrics, viridis_colors):
    """Plot improvement analysis by composer"""
    try:
        composers = [c for c in TARGET_COMPOSERS if c in test_metrics['classification_report'] and c in final_metrics['classification_report']]
        if composers:
            improvements = []
            for comp in composers:
                default_f1 = test_metrics['classification_report'][comp]['f1-score']
                tuned_f1 = final_metrics['classification_report'][comp]['f1-score']
                improvement = tuned_f1 - default_f1
                improvements.append(improvement)
            
            # Use viridis colors for improvement visualization
            colors = [viridis_colors[6] if imp >= 0 else viridis_colors[1] for imp in improvements]
            bars = plt.bar(range(len(composers)), improvements, alpha=0.8, color=colors)
            
            plt.xlabel('Composers')
            plt.ylabel('F1-Score Improvement')
            plt.title('Model Improvement by Composer')
            plt.xticks(range(len(composers)), composers, rotation=45, ha='right')
            plt.grid(True, alpha=0.3)
            plt.axhline(y=0, color='black', linestyle='-', alpha=0.5)
            
            # Add value labels on bars
            for bar, improvement in zip(bars, improvements):
                y_pos = bar.get_height() + 0.005 if bar.get_height() >= 0 else bar.get_height() - 0.005
                va = 'bottom' if bar.get_height() >= 0 else 'top'
                plt.text(bar.get_x() + bar.get_width()/2, y_pos, 
                        f'{improvement:+.3f}', ha='center', va=va, fontsize=8)
            
            # Add summary statistics
            avg_improvement = np.mean(improvements)
            plt.text(0.02, 0.98, f'Avg Improvement: {avg_improvement:+.3f}', 
                    transform=plt.gca().transAxes, va='top', ha='left',
                    bbox=dict(boxstyle='round', facecolor=viridis_colors[4], alpha=0.6))
        else:
            plt.text(0.5, 0.5, 'Unable to calculate improvements', ha='center', va='center', transform=plt.gca().transAxes)
            plt.title('Model Improvement by Composer')
    except Exception as e:
        plt.text(0.5, 0.5, f'Error: {str(e)}', ha='center', va='center', transform=plt.gca().transAxes)
        plt.title('Model Improvement by Composer')

def plot_hyperparameter_analysis(best_params, all_results, viridis_colors, env_config):
    """Plot hyperparameter analysis"""
    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    # Best parameter values visualization
    best_param_text = "\n".join([f"{k}: {v}" for k, v in best_params.items()])
    plt.text(0.1, 0.9, f"Best Parameters:\n{best_param_text}",
        transform=plt.gca().transAxes, fontsize=12,
        bbox=dict(boxstyle='round', facecolor=viridis_colors[5], alpha=0.7))
    plt.axis('off')

    plt.subplot(1, 2, 2)
    # Performance distribution of top configurations
    if len(all_results) > 0:
        top_scores = [result['val_accuracy'] for result in all_results[:min(10, len(all_results))]]
        trial_numbers = [result['trial'] for result in all_results[:min(10, len(all_results))]]
        
        # Use viridis gradient for bars
        n_bars = len(top_scores)
        bar_colors = sns.color_palette(env_config.color_pallete, n_bars)
        
        bars = plt.bar(range(len(top_scores)), top_scores, alpha=0.8, color=bar_colors)
        plt.xlabel('Configuration Rank')
        plt.ylabel('Validation Accuracy')
        plt.title('Top Hyperparameter Configurations')
        plt.xticks(range(len(top_scores)), [f'#{i+1}' for i in range(len(top_scores))])
        plt.grid(True, alpha=0.3)
        
        # Add value labels
        for bar, score, trial in zip(bars, top_scores, trial_numbers):
            plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.001, 
                    f'{score:.3f}\n(Trial {trial})', ha='center', va='bottom', fontsize=8)
    else:
        plt.text(0.5, 0.5, 'No tuning results available', ha='center', va='center', transform=plt.gca().transAxes)

    plt.tight_layout()
    plt.show()

def print_improvement_summary(test_metrics, final_metrics, best_score, all_results, best_params):
    """Print model improvement summary without asterisk separators"""
    print("\nMODEL IMPROVEMENT SUMMARY")
    
    print(f"\nOverall Performance Improvement:")
    print(f"Accuracy: {final_metrics['accuracy'] - test_metrics['accuracy']:+.4f}")
    print(f"Precision: {final_metrics['precision'] - test_metrics['precision']:+.4f}")
    print(f"Recall: {final_metrics['recall'] - test_metrics['recall']:+.4f}")
    print(f"F1-Score: {final_metrics['f1_score'] - test_metrics['f1_score']:+.4f}")

    print(f"\nHyperparameter Tuning Results:")
    print(f"Best validation accuracy: {best_score:.4f}")
    print(f"Number of configurations tested: {len(all_results)}")
    print(f"Best parameters: {best_params}")

    # Per-composer improvement analysis
    try:
        composers = [c for c in TARGET_COMPOSERS if c in test_metrics['classification_report'] and c in final_metrics['classification_report']]
        if composers:
            print(f"\nPer-Composer F1-Score Analysis:")
            print(f"{'Composer':<12} {'Default':<8} {'Tuned':<8} {'Improvement':<12}")
            
            total_improvement = 0
            for comp in composers:
                default_f1 = test_metrics['classification_report'][comp]['f1-score']
                tuned_f1 = final_metrics['classification_report'][comp]['f1-score']
                improvement = tuned_f1 - default_f1
                total_improvement += improvement
                
                print(f"{comp:<12} {default_f1:.3f}    {tuned_f1:.3f}    {improvement:+.3f}")
            
            avg_improvement = total_improvement / len(composers)
            print(f"{'Average':<12} {'':8} {'':8} {avg_improvement:+.3f}")
            
            # Identify best and worst performing composers
            composer_improvements = [(comp, final_metrics['classification_report'][comp]['f1-score'] - test_metrics['classification_report'][comp]['f1-score']) for comp in composers]
            composer_improvements.sort(key=lambda x: x[1], reverse=True)
            
            print(f"\nMost improved composer: {composer_improvements[0][0]} ({composer_improvements[0][1]:+.3f})")
            print(f"Least improved composer: {composer_improvements[-1][0]} ({composer_improvements[-1][1]:+.3f})")

    except Exception as e:
        print(f"Could not generate per-composer analysis: {e}")

# Main execution
# Plot training history for tuned model
plot_training_history(final_history, figsize=(18, 6))

# Get viridis color palette
viridis_colors = sns.color_palette(env_config.color_pallete, 8)
composer_colors = dict(zip(TARGET_COMPOSERS, sns.color_palette(env_config.color_pallete, len(TARGET_COMPOSERS))))

# Compare performance with subplots
plt.figure(figsize=(16, 10))

# Subplot 1: Overall metrics comparison
plt.subplot(2, 2, 1)
plot_model_comparison(test_metrics, final_metrics, viridis_colors)

# Subplot 2: Performance by Composer (Default Model)
plt.subplot(2, 2, 2)
plot_composer_performance(test_metrics, composer_colors, 'Default Model: F1-Score by Composer')

# Subplot 3: Performance by Composer (Tuned Model)  
plt.subplot(2, 2, 3)
plot_composer_performance(final_metrics, composer_colors, 'Tuned Model: F1-Score by Composer')

# Subplot 4: Improvement Analysis
plt.subplot(2, 2, 4)
plot_improvement_analysis(test_metrics, final_metrics, viridis_colors)

plt.tight_layout()
plt.show()

# Additional analysis: Hyperparameter importance visualization
plot_hyperparameter_analysis(best_params, all_results, viridis_colors, env_config)

# Print summary statistics
print_improvement_summary(test_metrics, final_metrics, best_score, all_results, best_params)
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
MODEL IMPROVEMENT SUMMARY

Overall Performance Improvement:
Accuracy: -0.0205
Precision: -0.0205
Recall: -0.0205
F1-Score: -0.0205

Hyperparameter Tuning Results:
Best validation accuracy: 0.8471
Number of configurations tested: 4
Best parameters: {'lstm_units': 128, 'cnn_filters': 64, 'dense_units': 512, 'dropout_rate': 0.25, 'learning_rate': 0.001}

Per-Composer F1-Score Analysis:
Composer     Default  Tuned    Improvement 
Bach         0.968    0.980    +0.013
Beethoven    0.970    0.928    -0.042
Chopin       0.982    0.965    -0.017
Mozart       0.880    0.821    -0.059
Average                        -0.026

Most improved composer: Bach (+0.013)
Least improved composer: Mozart (-0.059)

Hyperparameter Tuning & Model Performance Interpretation¶

1. Training Performance with Best Parameters¶

  • Final Validation Accuracy: 95.56% (best validation accuracy: 96.57% during training)
  • Final Validation Loss: 0.8449 (best validation loss: 0.8405)
  • Training completed in 81 epochs out of 100, with early stopping preventing overfitting.
  • Accuracy and loss curves show smooth convergence, with validation accuracy closely tracking training accuracy.

2. Hyperparameter Search Overview¶

  • Search Space: lstm_units, cnn_filters, dense_units, dropout_rate, learning_rate
  • Number of configurations tested: 4
  • Best Configuration:
    • LSTM units: 128
    • CNN filters: 64
    • Dense units: 512
    • Dropout rate: 0.25
    • Learning rate: 0.001
  • Best validation accuracy during tuning: 84.71% (lower than the default model’s 96.75% test accuracy)

3. Overall Performance Comparison (Default vs Tuned Model)¶

  • Accuracy: Dropped from 0.967 (default) to 0.947 (tuned) → -0.0205
  • Precision: Dropped by the same margin (-0.0205)
  • Recall: Dropped by the same margin (-0.0205)
  • F1-Score: Dropped by the same margin (-0.0205)
  • Despite parameter optimization attempts, the tuned model underperformed compared to the default configuration.

4. Per-Composer F1-Score Impact¶

  • Bach: Improved from 0.968 to 0.980 (+0.013)
  • Beethoven: Dropped from 0.970 to 0.928 (-0.042)
  • Chopin: Dropped from 0.982 to 0.965 (-0.017)
  • Mozart: Dropped from 0.880 to 0.821 (-0.059)
  • Average change: -0.026 (overall decline)

5. Insights & Observations¶

  • The default model remains superior overall, suggesting the initial hyperparameters were already close to optimal.
  • Improvements were seen only for Bach, but significant drops for Beethoven, Chopin, and especially Mozart outweighed the gains.
  • The tuned model's best validation accuracy (84.71%) during tuning suggests that the search space or evaluation setup may require refinement for better results.

6. Recommendations¶

  • Reassess the tuning search space to include more aggressive ranges for LSTM units, CNN filters, and learning rates.
  • Incorporate class-specific weighting or focal loss during tuning to protect performance for minority classes like Mozart.
  • Increase the number of trials to explore a broader parameter landscape.
  • Consider using Bayesian optimization or Hyperband for more efficient and targeted hyperparameter search.

While hyperparameter tuning aimed to improve per-class performance and generalization, the results indicate that the original model configuration remains the best for balanced accuracy across all composers. Further tuning should focus on targeted improvements for weaker classes rather than global parameter adjustments.

11. Model Prediction: ⇧¶


In [71]:
# Model Prediction and Analysis
print("\nMODEL PREDICTION AND ANALYSIS")

try:
    # Sample predictions on test set
    print("Making sample predictions...")

    # Select random test samples
    num_samples = min(10, len(data_splits['y_test_encoded']))
    sample_indices = np.random.choice(
        len(data_splits['y_test_encoded']),
        size=num_samples,
        replace=False
    )

    # Make predictions
    X_test_list = [
        data_splits['X_test']['musical'],
        data_splits['X_test']['harmonic'],
        data_splits['X_test']['sequences']
    ]

    y_pred_prob = pipeline.model.predict(X_test_list, verbose=0)
    y_pred = np.argmax(y_pred_prob, axis=1)

    print("\nSample Predictions Results:")

    print(f"{'Index':<6} {'True':<12} {'Predicted':<12} {'Confidence':<12} {'Correct':<8}")


    correct_predictions = 0
    for idx in sample_indices:
        true_label = TARGET_COMPOSERS[data_splits['y_test_encoded'][idx]]
        pred_label = TARGET_COMPOSERS[y_pred[idx]]
        confidence = y_pred_prob[idx, y_pred[idx]]
        is_correct = true_label == pred_label

        if is_correct:
            correct_predictions += 1

        status = "CORRECT" if is_correct else "WRONG"
        print(f"{idx:<6} {true_label:<12} {pred_label:<12} {confidence:<12.4f} {status:<8}")


    sample_accuracy = correct_predictions / len(sample_indices)
    print(f"Sample Accuracy: {correct_predictions}/{len(sample_indices)} ({sample_accuracy:.1%})")

    # Detailed analysis for first few samples
    print("\nDetailed Top-3 Predictions:")
    for i, idx in enumerate(sample_indices[:3]):
        true_label = TARGET_COMPOSERS[data_splits['y_test_encoded'][idx]]

        # Get top 3 predictions
        top_3_idx = np.argsort(y_pred_prob[idx])[-3:][::-1]

        print(f"\nSample {idx} (True: {true_label}):")
        for j, pred_idx in enumerate(top_3_idx, 1):
            composer = TARGET_COMPOSERS[pred_idx]
            prob = y_pred_prob[idx, pred_idx]
            print(f"  {j}. {composer}: {prob:.4f}")

    print("\nPrediction analysis completed!")

except Exception as e:
    print(f"Error during prediction analysis: {e}")
MODEL PREDICTION AND ANALYSIS
Making sample predictions...

Sample Predictions Results:
Index  True         Predicted    Confidence   Correct 
267    Chopin       Chopin       0.8005       CORRECT 
106    Mozart       Mozart       0.7683       CORRECT 
323    Mozart       Mozart       0.7677       CORRECT 
552    Beethoven    Beethoven    0.8392       CORRECT 
395    Bach         Bach         0.8416       CORRECT 
202    Bach         Bach         0.8237       CORRECT 
528    Beethoven    Beethoven    0.8543       CORRECT 
30     Bach         Bach         0.7572       CORRECT 
44     Beethoven    Beethoven    0.5331       CORRECT 
162    Chopin       Chopin       0.7330       CORRECT 
Sample Accuracy: 10/10 (100.0%)

Detailed Top-3 Predictions:

Sample 267 (True: Chopin):
  1. Chopin: 0.8005
  2. Beethoven: 0.0668
  3. Bach: 0.0666

Sample 106 (True: Mozart):
  1. Mozart: 0.7683
  2. Bach: 0.1273
  3. Chopin: 0.0637

Sample 323 (True: Mozart):
  1. Mozart: 0.7677
  2. Beethoven: 0.1309
  3. Chopin: 0.0607

Prediction analysis completed!

Model Prediction & Analysis¶

1. Sample Prediction Results¶

  • Total samples evaluated: 10
  • Correct predictions: 10
  • Sample accuracy: 100%
  • All predictions matched the true composer labels, demonstrating strong classification consistency on the sampled data.

2. Confidence Levels¶

  • Prediction confidences ranged from 0.5331 (lowest, Beethoven) to 0.8543 (highest, Beethoven).
  • Most predictions had confidence above 0.75, indicating the model is generally certain about its classifications.
  • Even the lowest-confidence prediction was correct, suggesting robustness against borderline cases.

3. Top-3 Prediction Analysis¶

  • For each sample, the true composer consistently appeared as the top prediction.
  • The second and third predictions usually belonged to composers with stylistic similarities to the true class, which aligns with expected musical overlaps.
  • Example:
    • Chopin sample (Index 267): Top-1 confidence at 0.8005, with Beethoven (0.0668) and Bach (0.0666) as secondary predictions.
    • Mozart sample (Index 106): Top-1 confidence at 0.7683, followed by Bach and Chopin.

4. Observations¶

  • The model demonstrates high confidence and correctness in predicting all sampled cases.
  • Secondary predictions suggest the model is capturing meaningful stylistic relationships between composers.
  • The consistent ranking of the true class in Top-1 predictions validates strong predictive reliability.

The prediction analysis reinforces that the trained model maintains high precision and confidence in classifying unseen samples. These results further support its readiness for deployment in composer identification tasks.

12. Deployment: ⇧¶


In [72]:
try:

    # Save the tuned model
    final_pipeline.save_artifacts(
        modelfilepath=MODEL_PATH,
        artifacts_filepath=MODEL_ARTIFACTS_PATH,
        history_filepath=MODEL_HISTORY_PATH
    )
    print("Tuned model artifacts saved successfully.")
    print(f"Model saved to: {MODEL_PATH}.keras")
    print(f"Artifacts saved to: {MODEL_ARTIFACTS_PATH}")
    print(f"Training history saved to: {MODEL_HISTORY_PATH}")
except Exception as e:
    print(f"Error during hyperparameter tuning: {e}")
Tuned model artifacts saved successfully.
Model saved to: data/model/composer_classification_model.keras.keras
Artifacts saved to: data/model/composer_classification_model_artifacts.pkl
Training history saved to: data/model/composer_classification_model_history.pkl
In [ ]:
# streamlit run ./app.py

Deployment Overview¶

  • Cloud API: Serve the saved .keras model via FastAPI inside a Docker container. Load weights from cloud storage (S3/GCS/Azure Blob) at startup and expose a /predict endpoint for MIDI/features → composer predictions. Deploy on Cloud Run, AKS, or ECS with autoscaling and GPU/CPU configuration.

  • CI/CD: Automate builds, run inference smoke tests, and promote to staging/prod with cloud secrets management for storage keys and configs.

  • Streamlit (Community Cloud): Host a GitHub repo with app.py, requirements.txt, and preprocessing scripts. Use @st.cache_resource to load the model, allow MIDI uploads, process input, and display top-k predictions with probability charts.

  • Best Practices:

    • Store large models in cloud storage, not the repo.
    • Validate inputs, limit file size, and manage secrets via Streamlit Secrets or cloud key vaults.
    • Optimize inference (batching, float16, thread limits) for cost and performance.

13. Recommendations: ⇧¶


  • Model Architecture Enhancements Implement transformer-based attention mechanisms instead of traditional LSTM layers to better capture long-range dependencies in musical sequences and improve handling of variable-length compositions Add multi-scale temporal modeling using dilated convolutions with different dilation rates to capture musical patterns at multiple time scales (note-level, phrase-level, and structural-level)
  • Data and Feature Engineering Expand the dataset with targeted data collection for Mozart and Chopin to address the significant class imbalance (Bach: 62.6% vs Chopin: 8.3%) through synthetic data generation or style transfer techniques Implement advanced musical feature extraction including voice separation analysis, motif tracking, and harmonic tension analysis using circle of fifths relationships to capture more nuanced compositional characteristics
  • Training and Optimization Apply focal loss and advanced class weighting strategies specifically designed for minority classes to improve performance on underrepresented composers while maintaining overall accuracy Implement curriculum learning starting with clear, unambiguous pieces and gradually introducing more challenging examples to improve model robustness and convergence
  • Evaluation and Deployment Develop musically-informed evaluation metrics that weight errors by musical similarity between composers rather than treating all misclassifications equally Create a comprehensive model interpretability framework using attention visualization and SHAP values to understand which musical passages and features drive classification decisions for musicological insights

14. Use Cases: ⇧¶


  • Education & Research: Adaptive music tutoring, manuscript attribution, style analysis, large-scale corpus studies.
  • Digital Platforms: Style-based playlist curation, composer-specific recommendations, automated metadata tagging.
  • Generative AI: Style-aware composition tools, orchestration aids, media score generation.
  • Performance & Production: MIDI editing, arrangement assistance, performance feedback, plagiarism detection.
  • Cultural Heritage: Interactive exhibits, automated manuscript cataloging, VR/AR classical experiences.
  • AI & Wellness: Music information retrieval, cross-cultural analysis, therapeutic music selection.
  • Commercial: Content identification, style-based licensing, brand-specific classical compositions.

Bridges musicology and AI to enhance education, discovery, preservation, creativity, and commercial value.

15. Conclusion: ⇧¶


This project developed and deployed a hybrid CNN-LSTM model with multi-head attention and cross-modal fusion for classifying Bach, Beethoven, Chopin, and Mozart from symbolic music data, achieving 96.75% test accuracy with balanced precision, recall, and F1-scores.

Key Achievements¶

  • High Performance: Accurate classification with minimal overfitting (2.82% train–validation gap), converging in 74 epochs.
  • Advanced Feature Engineering: 45+ music theory-based features capturing harmonic, rhythmic, melodic, and structural nuances.
  • Class Imbalance Handling: Targeted augmentation and adaptive class weighting maintained strong per-class F1-scores (0.880–0.982).
  • Robust Design: Multi-branch architecture integrating statistical, harmonic, and sequence data via cross-attention.

Technical Contributions¶

  • Multi-Modal Processing: Separate CNN, LSTM, and dense branches fused for richer representation.
  • Domain-Specific Analyzers: Custom tools for harmony, rhythm, melody, and structure extraction.
  • Optimized Training: Mixed precision and efficient pipelines for speed without loss of expressiveness.

Real-World Impact¶

Applicable to educational tools, digital music libraries, and musicological research, providing high-confidence predictions and interpretable stylistic relationships.

Limitations & Future Work¶

  • Enhance Mozart classification through data expansion and targeted features.
  • Explore synthetic data generation and transformer architectures.
  • Extend classification to more composers and periods.

Broader Significance¶

This work bridges AI and musicology, showing that domain expertise combined with deep learning can deliver both technical excellence and cultural insight, offering a model framework for classification tasks in the digital humanities.

16. References: ⇧¶


  • blanderbuss. (n.d.). MIDI Classic Music [Dataset]. Kaggle. https://www.kaggle.com/datasets/blanderbuss/midi-classic-music
  • Yamashita, R., Nishio, M., Do, R. K. G., & Togashi, K. (2018). Convolutional neural networks: an overview and application in radiology. Insights into Imaging, 9(4), 611–629. https://doi.org/10.1007/s13244-018-0639-9
  • Hochreiter, S., & Schmidhuber, J. (1997). Long Short-Term Memory. Neural Computation, 9(8), 1735–1780. Retrieved from https://arxiv.org/abs/1304.6700

This assistance was used to understand and plan the technical approaches, model training strategies, and algorithm concepts.

  • OpenAI. (2025). ChatGPT [Large language model]. OpenAI. https://chat.openai.com/

17. Demo: ⇧¶


Streamlit Application¶

Symbolic Music AI

Composer Classification Using Deep Learning