Skip to content

Latest commit

 

History

History
521 lines (405 loc) · 19.3 KB

DRA.0.4.md

File metadata and controls

521 lines (405 loc) · 19.3 KB

Deeplearning Algorithms tutorial

谷歌的人工智能位于全球前列,在图像识别、语音识别、无人驾驶等技术上都已经落地。而百度实质意义上扛起了国内的人工智能的大旗,覆盖无人驾驶、智能助手、图像识别等许多层面。苹果业已开始全面拥抱机器学习,新产品进军家庭智能音箱并打造工作站级别Mac。另外,腾讯的深度学习平台Mariana已支持了微信语音识别的语音输入法、语音开放平台、长按语音消息转文本等产品,在微信图像识别中开始应用。全球前十大科技公司全部发力人工智能理论研究和应用的实现,虽然入门艰难,但是一旦入门,高手也就在你的不远处! AI的开发离不开算法,那我们接下来就开始学习算法吧!

线性判别分析(Linear Discriminant Analysis)

线性判别分析(Linear Discriminant Analysis),是一种监督学习(supervised learning)算法。线性判别式分析也叫做Fisher线性判别(Fisher Linear Discriminant ,FLD),是模式识别的经典算法,它是在1996年由Belhumeur引入模式识别和人工智能领域的。其鉴别分析的基本思想是将高维的模式样本投影到最佳鉴别矢量空间,以达到抽取分类信息和压缩特征空间维数的效果,投影后保证模式样本在新的子空间有最大的类间距离和最小的类内距离,即模式在该空间中有最佳的可分离性。因此,它是一种有效的特征抽取方法。使用这种方法能够使投影后模式样本的类间散布矩阵最大,并且同时类内散布矩阵最小。

线性判别分析(Linear Discriminant Analysis)是对费舍尔的线性鉴别方法的归纳,这种方法使用统计学,模式识别和机器学习方法,试图找到两类物体或事件的特征的一个线性组合,以能够特征化或区分它们。所得的组合可用来作为一个线性分类器,或者,更常见的是,为后续的分类做降维处理。

线性判别分析(Linear Discriminant Analysis)与方差分析(ANOVA)和回归分析紧密相关,这两种分析方法也试图通过一些特征或测量值的线性组合来表示一个因变量。然而,方差分析使用类别自变量和连续因变量,而判别分析使用连续自变量和类别因变量(即类标签)。[3] 逻辑回归和概率单位回归(probit regression)比方差分析更类似于LDA,因为它们也是用连续自变量来解释类别因变量的。LDA的基本假设是自变量服从正态分布,当这一假设无法满足时,在实际应用中更倾向于使用上述的其他方法。

线性判别分析(Linear Discriminant Analysis)也与主成分分析(PCA)和因子分析紧密相关,它们都在寻找最能解释数据的变量线性组合。[4] LDA明确地尝试为数据类别之间的差异建立模型。另一方面,PCA不考虑类别之间的任何差异,因子分析则是根据差异而不是相似性来建立特征组合。判别分析与因子分析的不同还在于,它不是一种相互依存技术:即必须区分出自变量和因变量(也称为准则变量)。

当自变量的每一次观测值都是连续量时,LDA能有效地起作用。当处理类别自变量时,与LDA相对应的技术称为判别对应分析(Discriminant Correspondence Analysis)。

通常它能够保证投影后模式样本在新的空间中有最小的类内距离和最大的类间距离,即模式在该空间中有最佳的可分离性。

应用示例:

from __future__ import print_function

import os
import time
import pickle
import itertools
import numpy as np
import matplotlib.pyplot as plt

import theano
import lasagne

# init color printer
class BColors:
    """
    ANSI escape sequences for colored command line output
    """
    # reset sequence, appended after every colored string
    ENDC = '\033[0m'

    # text attributes
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # foreground colors
    FAIL = '\033[91m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    OKBLUE = '\033[94m'
    HEADER = '\033[95m'

    def __init__(self):
        """ The class only carries constants; there is no state to set up """

    def print_colored(self, string, color):
        """ Return string prefixed with the color code and followed by the reset code """
        opened = color + string
        return opened + BColors.ENDC

col = BColors()


def threaded_generator(generator, num_cached=10):
    """
    Threaded generator

    Consumes `generator` in a background daemon thread and hands its items
    to the caller through a bounded queue, so that up to `num_cached`
    items are produced ahead of consumption.

    :param generator: iterable to consume in the background thread
    :param num_cached: maximum number of items buffered in the queue
    :return: generator yielding the items of `generator` in order
    :raises Exception: an exception raised by `generator` is re-raised in
        the consumer after all items produced before it have been yielded
    """
    # the module is called `queue` on Python 3 and `Queue` on Python 2
    try:
        import queue as queue_module
    except ImportError:
        import Queue as queue_module
    import threading

    buffer = queue_module.Queue(maxsize=num_cached)
    end_marker = object()
    failure = []

    # define producer
    def producer():
        try:
            for item in generator:
                buffer.put(item)
        except Exception as exc:
            # remember the error so the consumer can re-raise it
            failure.append(exc)
        finally:
            # always signal the end; otherwise a failing producer would
            # leave the consumer blocked in buffer.get() forever
            buffer.put(end_marker)

    # start producer
    thread = threading.Thread(target=producer)
    thread.daemon = True
    thread.start()

    # run as consumer
    item = buffer.get()
    while item is not end_marker:
        yield item
        buffer.task_done()
        item = buffer.get()

    # surface producer errors instead of silently truncating the stream
    if failure:
        raise failure[0]


def generator_from_iterator(iterator):
    """
    Wrap an iterable in a generator that yields its elements in order
    """
    source = iter(iterator)
    while True:
        try:
            element = next(source)
        except StopIteration:
            # underlying iterable is exhausted
            return
        yield element


def threaded_generator_from_iterator(iterator, num_cached=10):
    """
    Turn an iterator into a generator and run it through threaded_generator,
    buffering up to num_cached items
    """
    return threaded_generator(generator_from_iterator(iterator), num_cached)


def accuracy_score(t, p):
    """
    Compute accuracy

    :param t: target labels (array-like)
    :param p: predicted labels (array-like), compared element-wise with `t`
    :return: number of matching entries divided by len(p), as a float
    :raises ZeroDivisionError: if `p` is empty
    """
    # coerce to arrays: for plain Python lists `p == t` is a single bool,
    # which would make the sum 0 or 1 regardless of the number of matches
    targets = np.asarray(t)
    predictions = np.asarray(p)
    return float(np.sum(predictions == targets)) / len(predictions)


class LDA(object):
    """
    LDA Class

    Linear discriminant analysis solved as the generalized eigenproblem
    Sb v = lambda Sw v, where Sw is the average of the per-class covariance
    matrices (plus `r` on the diagonal) and Sb = St - Sw with St the
    covariance of all samples.
    """

    def __init__(self, r=1e-3, n_components=None, verbose=False, show=False):
        """
        Constructor

        :param r: value added to the diagonal of the within-class scatter
        :param n_components: number of projection directions kept by
            transform(); if None it is set to (number of classes - 1) in fit()
        :param verbose: print the leading eigenvalues after fitting
        :param show: plot the normalized leading eigenvalues after fitting
        """
        self.r = r
        self.n_components = n_components

        self.scalings_ = None
        self.coef_ = None
        self.intercept_ = None
        self.means = None

        self.verbose = verbose
        self.show = show

    def fit(self, X, y, X_te=None):
        """
        Compute lda on hidden layer

        :param X: samples, one row per sample
        :param y: labels; rows with a negative label are treated as
            unlabeled and only contribute to the total scatter
        :param X_te: optional additional samples that are included in the
            total scatter estimate
        :return: eigenvalues of the generalized eigenproblem in ascending
            order (as returned by scipy.linalg.eigh)
        """
        # public location of eigh; `scipy.linalg.decomp` is a deprecated
        # private namespace
        from scipy.linalg import eigh

        X = np.asarray(X)
        y = np.asarray(y)

        # split into semi- and supervised- data
        X_all = X
        labeled = y >= 0
        X = X[labeled]
        y = y[labeled]

        # get class labels
        classes = np.unique(y)

        # set number of components
        if self.n_components is None:
            self.n_components = len(classes) - 1

        # compute means
        class_samples = [X[y == group, :] for group in classes]
        self.means = np.asarray([Xg.mean(0) for Xg in class_samples])

        # compute covs (np.cov centers the data itself; atleast_2d keeps
        # the single-feature case a 1x1 matrix instead of a 0-d array)
        covs = [np.atleast_2d(np.cov(Xg.T)) for Xg in class_samples]

        # within scatter
        Sw = np.average(covs, axis=0)

        # total scatter: pool the raw samples before estimating the
        # covariance so that X and X_te are centered by one common mean
        if X_te is not None:
            X_all = np.concatenate((X_all, X_te))
        St = np.atleast_2d(np.cov(X_all.T))

        # between scatter
        Sb = St - Sw

        # cope for numerical instability
        Sw += np.identity(Sw.shape[0]) * self.r

        # compute eigen decomposition
        evals, evecs = eigh(Sb, Sw)

        # sort eigen vectors according to eigen values
        evecs = evecs[:, np.argsort(evals)[::-1]]

        # normalize eigen vectors (unit Euclidean norm per column)
        evecs /= np.linalg.norm(evecs, axis=0)

        # compute lda data
        self.scalings_ = evecs
        self.coef_ = np.dot(self.means, evecs).dot(evecs.T)
        self.intercept_ = (-0.5 * np.diag(np.dot(self.means, self.coef_.T)))

        # evals is ascending, so the largest eigenvalues are at the end;
        # needed by both the verbose and the show branch
        if self.verbose or self.show:
            top_k_evals = evals[-self.n_components:]

        if self.verbose:
            print("LDA-Eigenvalues (Train):", np.array_str(top_k_evals, precision=2, suppress_small=True))
            print("Ratio min(eigval)/max(eigval): %.3f, Mean(eigvals): %.3f" % (top_k_evals.min() / top_k_evals.max(), top_k_evals.mean()))

        if self.show:
            plt.figure("Eigenvalues")
            ax = plt.subplot(111)
            # normalize a copy: top_k_evals is a view into the returned evals
            ratios = top_k_evals / np.sum(top_k_evals)
            plt.plot(range(len(ratios)), ratios, 'bo-')
            plt.grid(True)
            plt.xlabel('Eigenvalue', fontsize=20)
            plt.ylabel('Explained Discriminative Variance', fontsize=20)
            plt.ylim([0.0, 1.05 * np.max(ratios)])

            ax.tick_params(axis='x', labelsize=18)
            ax.tick_params(axis='y', labelsize=18)

        return evals

    def transform(self, X):
        """
        transform data

        Projects X onto the eigenvectors and keeps the first n_components
        columns (those with the largest eigenvalues).
        """
        X_new = np.dot(X, self.scalings_)
        return X_new[:, :self.n_components]

    def predict_proba(self, X):
        """
        estimate probability

        Applies a logistic sigmoid to the linear class scores
        X . coef_^T + intercept_ and normalizes every row to sum to one.
        """
        prob = -(np.dot(X, self.coef_.T) + self.intercept_)
        # in-place sigmoid: 1 / (1 + exp(-score))
        np.exp(prob, prob)
        prob += 1
        np.reciprocal(prob, prob)
        prob /= prob.sum(axis=1).reshape((prob.shape[0], -1))
        return prob

    def predict_log_proba(self, X):
        """ estimate log probability """
        return np.log(self.predict_proba(X))


def create_iter_functions(l_out, l_in, y_tensor_type, objective, learning_rate, l_2, compute_updates):
    """
    Compile the theano functions used to iterate one epoch.

    Returns a dict with the keys 'train', 'valid', 'test' and
    'compute_output'; 'test' refers to the same compiled function as 'valid'.
    """
    get_output = lasagne.layers.get_output
    net_input = l_in.input_var

    # symbolic variable holding the targets
    targets = y_tensor_type('y')

    # training cost, built from the non-deterministic network output
    tr_output = get_output(l_out, deterministic=False)
    tr_cost = objective(tr_output, targets)

    # validation cost, built from the deterministic network output
    va_output = get_output(l_out, deterministic=True)
    va_cost = objective(va_output, targets)

    # trainable parameters of the whole net
    all_params = lasagne.layers.get_all_params(l_out, trainable=True)

    # optional l2 weight decay, added to the training cost only
    if l_2 is not None:
        penalty = lasagne.regularization.apply_penalty(all_params, lasagne.regularization.l2)
        tr_cost += l_2 * penalty

    # parameter updates derived from the gradients of the training cost
    all_grads = lasagne.updates.get_or_compute_grads(tr_cost, all_params)
    updates = compute_updates(all_grads, all_params, learning_rate)

    # training step: returns [cost] and applies the updates
    iter_train = theano.function([net_input, targets], [tr_cost], updates=updates)

    # validation step: returns [cost, network output]
    iter_valid = theano.function([net_input, targets], [va_cost, va_output])

    # plain forward pass
    compute_output = theano.function([net_input], va_output)

    return {
        'train': iter_train,
        'valid': iter_valid,
        'test': iter_valid,
        'compute_output': compute_output,
    }


def _collect_net_output(iter_funcs, batch_iter, X, y, with_loss=False):
    """ Run the net over all batches; return (stacked outputs, labels, losses). """
    outputs, losses = [], []
    # the empty int32 seed keeps the dtype promotion of the label array
    labels = [np.zeros(0, dtype=np.int32)]

    generator = threaded_generator_from_iterator(batch_iter(X, y))
    for X_b, y_b in generator:
        if with_loss:
            losses.append(iter_funcs['valid'](X_b, y_b)[0])
        labels.append(y_b)
        outputs.append(iter_funcs['compute_output'](X_b))

    if not outputs:
        raise ValueError("batch iterator did not yield any batches")

    # stack once at the end instead of re-allocating after every batch
    return np.vstack(outputs), np.concatenate(labels), losses


def train(iter_funcs, dataset, train_batch_iter, valid_batch_iter, r):
    """
    Train the model with `dataset` with mini-batch training.
    Each mini-batch has `batch_size` recordings.

    This is an endless generator: every iteration trains one epoch, fits an
    LDA on the network output of the train set and yields a dict with the
    keys 'number', 'train_loss', 'train_acc', 'valid_loss', 'valid_acc',
    'overfitting' and 'eigenvalues'. The caller decides when to stop.

    :param iter_funcs: dict with the callables 'train', 'valid' and
        'compute_output'
    :param dataset: dict with the entries 'X_train', 'y_train', 'X_valid'
        and 'y_valid'
    :param train_batch_iter: callable (X, y) -> iterator over (X_b, y_b);
        its attribute `n_batches` is read for the progress bar
    :param valid_batch_iter: callable (X, y) -> iterator over (X_b, y_b)
    :param r: regularization value passed on to LDA
    :raises ValueError: if a batch iterator yields no batches when the
        network outputs are collected
    """
    import sys
    import time

    for epoch in itertools.count(1):

        # iterate train batches
        batch_train_losses = []
        iterator = train_batch_iter(dataset['X_train'], dataset['y_train'])
        generator = threaded_generator_from_iterator(iterator)

        start = time.time()
        for i_batch, (X_b, y_b) in enumerate(generator):
            batch_res = iter_funcs['train'](X_b, y_b)
            batch_train_losses.append(batch_res[0])
            train_time = time.time() - start

            # report loss during training; i_batch is 0-based, so count the
            # batch that was just processed to let the bar reach 100%
            perc = min(100.0, 100.0 * (i_batch + 1) / train_batch_iter.n_batches)
            dec = int(perc // 4)
            progbar = "|" + dec * "#" + (25-dec) * "-" + "|"
            vals = (perc, progbar, train_time, np.mean(batch_train_losses))
            loss_str = " (%d%%) %s time: %.2fs, loss: %.5f" % vals
            print(col.print_colored(loss_str, col.WARNING), end="\r")
            sys.stdout.flush()

        print("\x1b[K", end="\r")
        avg_train_loss = np.mean(batch_train_losses)

        # lda evaluation (accuracy based)

        # iterate validation batches
        net_output_va, y_va, batch_valid_losses = _collect_net_output(
            iter_funcs, valid_batch_iter, dataset['X_valid'], dataset['y_valid'], with_loss=True)
        avg_valid_loss = np.mean(batch_valid_losses)

        # compute train set net output
        net_output_tr, y_tr, _ = _collect_net_output(
            iter_funcs, train_batch_iter, dataset['X_train'], dataset['y_train'])

        # fit lda on net output
        print("")
        dlda = LDA(r=r, n_components=None, verbose=True)
        evals = dlda.fit(net_output_tr, y_tr)

        # predict on train set (labeled samples only)
        labeled = y_tr >= 0
        proba = dlda.predict_proba(net_output_tr[labeled])
        y_tr_pr = np.argmax(proba, axis=1)
        tr_acc = 100 * accuracy_score(y_tr[labeled], y_tr_pr)

        # predict on validation set
        proba = dlda.predict_proba(net_output_va)
        y_va_pr = np.argmax(proba, axis=1)
        va_acc = 100 * accuracy_score(y_va, y_va_pr)

        # estimate overfitting; the ratio is undefined for zero train accuracy
        overfit = va_acc / tr_acc if tr_acc > 0 else float('nan')

        # collect results
        yield {
            'number': epoch,
            'train_loss': avg_train_loss,
            'train_acc': tr_acc,
            'valid_loss': avg_valid_loss,
            'valid_acc': va_acc,
            'overfitting': overfit,
            'eigenvalues': evals
        }


def _dump_pickle(path, obj):
    """ Pickle obj to path; pickle writes bytes, so the file is opened in binary mode. """
    with open(path, 'wb') as fp:
        pickle.dump(obj, fp)


def fit(l_out, l_in, data, objective, y_tensor_type,
        train_batch_iter, valid_batch_iter,
        r=1e-3, num_epochs=100, patience=20,
        learn_rate=0.01, update_learning_rate=None,
        l_2=None, compute_updates=None,
        exp_name='ff', out_path=None, dump_file=None):
    """
    Train model

    Runs the epoch generator `train` with early stopping on the validation
    accuracy, restores the best parameters and evaluates on the test set.

    :param l_out: output layer of the network
    :param l_in: input layer of the network
    :param data: dict with 'X_train', 'y_train', 'X_valid', 'y_valid',
        'X_test' and 'y_test'
    :param objective: callable (network output, targets) -> cost
    :param y_tensor_type: constructor of the symbolic target variable
    :param train_batch_iter: batch iterator factory for the train set
    :param valid_batch_iter: batch iterator factory for validation/test sets
    :param r: regularization value passed on to LDA
    :param num_epochs: maximum number of epochs
    :param patience: training stops once the improvement counter exceeds
        this value
    :param learn_rate: initial learning rate
    :param update_learning_rate: callable (learn_rate, epoch) -> learn_rate;
        defaults to a constant learning rate
    :param l_2: weight of the l2 penalty, or None
    :param compute_updates: callable (grads, params, learning_rate) -> updates
    :param exp_name: experiment name, only used for printing
    :param out_path: directory for 'results.pkl'; nothing is logged if None
    :param dump_file: file the best parameters are pickled to, or None
    :return: tuple (l_out, best validation accuracy)
    """

    # log model evolution (only if an output directory was given)
    log_file = None if out_path is None else os.path.join(out_path, 'results.pkl')

    print("\n")
    print(col.print_colored("Running Test Case: " + exp_name, BColors.UNDERLINE))

    # adaptive learning rate
    learning_rate = theano.shared(np.float32(learn_rate))
    if update_learning_rate is None:
        def update_learning_rate(lr, e):
            return lr
    # cast to the float32 dtype the shared variable was created with
    learning_rate.set_value(np.float32(update_learning_rate(learn_rate, 0)))

    # initialize evaluation output
    pred_tr_err, pred_val_err, overfitting = [], [], []
    tr_accs, va_accs = [], []
    eigenvalues = []

    # results container; created up front so it also exists when training
    # stops within the first epoch. The lists are filled in place.
    exp_res = dict()
    exp_res['pred_tr_err'] = pred_tr_err
    exp_res['tr_accs'] = tr_accs
    exp_res['pred_val_err'] = pred_val_err
    exp_res['va_accs'] = va_accs
    exp_res['overfitting'] = overfitting
    exp_res['eigenvalues'] = eigenvalues

    print("Building model and compiling functions...")
    iter_funcs = create_iter_functions(l_out, l_in, y_tensor_type, objective, learning_rate=learning_rate,
                                       l_2=l_2, compute_updates=compute_updates)

    print("Starting training...")
    now = time.time()

    # initialize early stopping
    last_improvement = 0
    best_model = lasagne.layers.get_all_param_values(l_out)
    prev_acc_tr, prev_acc_va = 0.0, 0.0

    try:

        # iterate training epochs
        for epoch in train(iter_funcs, data, train_batch_iter, valid_batch_iter, r):

            print("Epoch {} of {} took {:.3f}s".format(
                epoch['number'], num_epochs, time.time() - now))
            now = time.time()

            # update learning rate
            learn_rate = update_learning_rate(learn_rate, epoch['number'])
            learning_rate.set_value(np.float32(learn_rate))

            # --- collect train output ---

            tr_loss, va_loss = epoch['train_loss'], epoch['valid_loss']
            train_acc, valid_acc = epoch['train_acc'], epoch['valid_acc']
            overfit = epoch['overfitting']

            # None cannot be ordered against a float on Python 3
            train_improved = train_acc is not None and train_acc >= prev_acc_tr
            valid_improved = valid_acc is not None and valid_acc >= prev_acc_va

            # prepare early stopping
            if valid_improved:
                last_improvement = 0
                best_model = lasagne.layers.get_all_param_values(l_out)

                # dump net parameters during training
                if dump_file is not None:
                    _dump_pickle(dump_file, best_model)

            # increase improvement counter
            last_improvement += 1

            # plot train output
            if train_acc is None:
                txt_tr = 'costs_tr %.5f' % tr_loss
            else:
                txt_tr = 'costs_tr %.5f (%.3f), ' % (tr_loss, train_acc)
            if train_improved:
                txt_tr = col.print_colored(txt_tr, BColors.OKGREEN)
                prev_acc_tr = train_acc

            if valid_acc is None:
                txt_val = ''
            else:
                txt_val = 'costs_val %.5f (%.3f), tr/val %.3f' % (va_loss, valid_acc, overfit)
            if valid_improved:
                txt_val = col.print_colored(txt_val, BColors.OKGREEN)
                prev_acc_va = valid_acc

            print('  lr: %.5f' % learn_rate)
            print('  ' + txt_tr + txt_val)

            # collect model evolution data
            tr_accs.append(train_acc)
            va_accs.append(valid_acc)
            pred_tr_err.append(tr_loss)
            pred_val_err.append(va_loss)
            overfitting.append(overfit)
            eigenvalues.append(epoch['eigenvalues'])

            # --- early stopping: preserve best model ---
            if last_improvement > patience:
                print(col.print_colored("Early Stopping!", BColors.WARNING))
                status = "Epoch: %d, Best Validation Accuracy: %.3f" % (epoch['number'], prev_acc_va)
                print(col.print_colored(status, BColors.WARNING))
                break

            # maximum number of epochs reached
            if epoch['number'] >= num_epochs:
                break

            # shuffle train data
            if not hasattr(data['X_train'], 'reset_batch_generator'):
                rand_idx = np.random.permutation(data['X_train'].shape[0])
                data['X_train'] = data['X_train'][rand_idx]
                data['y_train'] = data['y_train'][rand_idx]

            # save results
            if log_file is not None:
                _dump_pickle(log_file, exp_res)

    except KeyboardInterrupt:
        # deliberate: a manual interrupt ends training but still runs the
        # test evaluation below with the best model found so far
        pass

    # set net to best weights
    lasagne.layers.set_all_param_values(l_out, best_model)

    # evaluate on test set
    # NOTE(review): with the functions built by create_iter_functions the
    # second output of iter_funcs['test'] is the network output, not an
    # accuracy, so 'avg_acc_te' is the mean of the raw outputs -- confirm
    # whether an LDA based test accuracy was intended here.
    test_losses, test_acc = [], []
    iterator = valid_batch_iter(data['X_test'], data['y_test'])
    for X_b, y_b in iterator:
        loss_te = iter_funcs['test'](X_b, y_b)
        test_losses.append(loss_te[0])
        if len(loss_te) > 1:
            test_acc.append(loss_te[1])

    # compute evaluation measures
    avg_loss_te = np.mean(test_losses)
    avg_acc_te = np.mean(test_acc) if test_acc else float('nan')

    print("--------------------------------------------")
    print('Loss on Test-Set: %.5f' % avg_loss_te)
    print("--------------------------------------------\n")

    if log_file is not None:

        # add test results and save results
        exp_res['avg_loss_te'] = avg_loss_te
        exp_res['avg_acc_te'] = avg_acc_te

        _dump_pickle(log_file, exp_res)

    return l_out, prev_acc_va