MEUL with a non-parametric homeostasis
In this notebook, we study how homeostasis (cooperation) may be an essential ingredient for this algorithm, which works on a winner-take-all basis (competition). This extension has been published as Perrinet, Neural Computation (2010) (see https://laurentperrinet.github.io/publication/perrinet-10-shl ). Compared to other posts, such as this previous post, we improve the code so that it does not depend on any parameter (namely the C parameter of the rescaling function). For that, we use a non-parametric approach based on cumulative histograms.
This is joint work with Victor Boutin and Angelo Francisioni. See also the other posts on unsupervised learning.
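In a nutshell, the idea is to replace the parametric rescaling function by the empirical cumulative distribution of the coefficients, estimated directly from the data. Here is a minimal sketch of that principle in plain numpy (the function name and toy data are ours, independent of the shl_scripts implementation used below):
import numpy as np

def empirical_cdf_rescaling(code):
    # map each positive coefficient to its normalized rank (empirical CDF value)
    z = np.zeros_like(code)
    pos = code > 0
    flat = np.sort(code.ravel())
    z[pos] = np.searchsorted(flat, code[pos]) / flat.size
    return z

rng = np.random.default_rng(42)
demo = rng.exponential(size=(3, 8)) - .5   # toy coefficients, some negative
print(empirical_cdf_rescaling(demo))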
import matplotlib.pyplot as plt
import matplotlib
%matplotlib inline
import numpy as np
np.set_printoptions(formatter = dict( float = lambda x: "%.3g" % x ), precision=3, suppress=True, threshold=np.inf)
from shl_scripts.shl_experiments import SHL
import time
%load_ext autoreload
%autoreload 2
DEBUG = True
DEBUG = False
if not DEBUG:
    tag = '2017-12-01_Testing_COMPs'
    DEBUG_DOWNSCALE = 1
else:
    tag = '2017-12-01_Testing_COMPs-DEBUG'
    DEBUG_DOWNSCALE = 10
seed = 42
nb_quant = 512
C = 0.
do_sym = False
verbose = False
i_sample = 42
from shl_scripts.shl_experiments import SHL
matname = tag + '_nohomeo'
shl = SHL(DEBUG_DOWNSCALE=DEBUG_DOWNSCALE,
          datapath='/tmp/database', seed=seed,
          eta=0.05, verbose=2, record_each=50, n_iter=1000,
          eta_homeo=0., alpha_homeo=1.,
          do_sym=do_sym, nb_quant=nb_quant)
data = shl.get_data(matname=matname)
test_size = shl.batch_size # data.shape[0]//2
data_training = data[:test_size, :]
data_test = data[test_size:,:]
if DEBUG:
    test_size = data.shape[0]//20
    data_training = data[:(data.shape[0]-test_size), :].copy()
    data_test = data[:test_size, :].copy()
dico_partial_learning = shl.learn_dico(data=data_training, matname=matname)
We start off with a short learning phase with no homeostasis, such that we end up with an unbalanced dictionary:
fig, ax = shl.show_dico(dico_partial_learning, data=data, title=matname)
fig.show()
fig, ax = shl.time_plot(dico_partial_learning, variable='prob_active');
fig.show()
Classical MP
n_samples, n_pixels = data_test.shape
n_dictionary, n_pixels = dico_partial_learning.dictionary.shape
norm_each_filter = np.sqrt(np.sum(dico_partial_learning.dictionary**2, axis=1))
dico_partial_learning.dictionary /= norm_each_filter[:,np.newaxis]
sparse_code_mp = shl.code(data_test, dico_partial_learning, matname=matname)
from shl_scripts.shl_tools import plot_proba_histogram
fig, ax = plot_proba_histogram(sparse_code_mp)
COMP: learning rescaling on sparse coefficients - basic principle
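The basic principle is to replace each positive sparse coefficient by its normalized rank among all coefficients of the batch, that is, by the value of the empirical cumulative distribution function at that coefficient: q[i, k] = #{coefficients strictly smaller than sparse_code_mp[i, k]} / N, where N is the total number of coefficients. This is computed explicitly (on one sample) below: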
n_samples, n_dictionary = sparse_code_mp.shape
N = n_samples * n_dictionary
q = np.zeros_like(sparse_code_mp)
for i in range(i_sample, i_sample+1): # range(n_samples):
    for k in range(n_dictionary):
        if sparse_code_mp[i, k] > 0:
            q[i, k] = np.sum(sparse_code_mp[i, k]>sparse_code_mp.ravel())/N
            if q[i, k]==0:
                print(k, i, sparse_code_mp[i, k])
            if i < 22 and k < 20:
                print(i, k, 'Raw value=', sparse_code_mp[i, k],
                      ' is transformed into=', q[i, k])
def rescaling(code, do_sym=do_sym):
    if do_sym:
        code = np.abs(code)
    else:
        code *= code>0
    n_samples, n_dictionary = code.shape
    N = n_samples * n_dictionary
    q = np.zeros_like(code)
    for i in range(n_samples):
        for k in range(n_dictionary):
            if code[i, k] > 0:
                q[i, k] = np.sum(code[i, k]>code.ravel())/N
    return q
Testing on one sample:
%%time
sparse_code_mp_ = rescaling(sparse_code_mp[i_sample:(i_sample+1), :], do_sym=do_sym)
def rescaling(code, do_sym=do_sym):
    if do_sym: code = np.abs(code)
    n_samples, n_dictionary = code.shape
    N = n_samples * n_dictionary
    # vectorized rank computation, equivalent to the loop above
    q = np.sum(code[:, :, np.newaxis] > code.ravel()[np.newaxis, np.newaxis, :], axis=-1) / N
    return q
%%time
sparse_code_mp_ = rescaling(sparse_code_mp[i_sample:(i_sample+1), :], do_sym=do_sym)
def get_P_cum(code, nb_quant=512, do_rescale=True, do_sym=do_sym):
    if do_rescale:
        p_c = rescaling(code, do_sym=do_sym)
    else:
        # use the code as is if it is already rescaled
        p_c = code
    n_samples, nb_filter = code.shape
    code_bins = np.linspace(0., 1, nb_quant, endpoint=True)
    P_cum = np.zeros((nb_filter, nb_quant))
    for i in range(nb_filter):
        p, bins = np.histogram(p_c[:, i], bins=code_bins, density=True)
        p /= p.sum()
        P_cum[i, :] = np.hstack((0, np.cumsum(p)))
    return P_cum
%%time
P_cum = get_P_cum(sparse_code_mp[i_sample:(i_sample+1), :], nb_quant=nb_quant, do_sym=do_sym)
COMP: learning modulations on linear coefficients using ranks
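Note that during the pursuit, the modulation is applied to the (residual) linear correlation coefficients rather than to the final sparse coefficients (see the comp function below), so we now learn the rescaling on the linear (rectified) coefficients. Let's first compare the two distributions.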
Sparse coefficients are distributed according to:
print('Min=', np.min(sparse_code_mp), 'Max=', np.max(sparse_code_mp), 'Shape=', sparse_code_mp.shape)
fig, ax = plt.subplots(1, 1, figsize=(8, 5))
ax.plot(np.sort(sparse_code_mp.ravel()))
ax.set_xlabel('rank'); ax.set_ylabel('coefficient');
and linear (rectified) coefficients to:
corr = (data_test @ dico_partial_learning.dictionary.T)
corr *= corr>0
print('Min=', np.min(corr), 'Max=', np.max(corr), 'Shape=', corr.shape)
fig, ax = plt.subplots(1, 1, figsize=(8, 5))
ax.plot(np.sort(corr.ravel()))
ax.set_xlabel('rank'); ax.set_ylabel('coefficient');
The rescaling function aims at achieving better numerical stability, such that the quantization won't affect the learning. But it fundamentally does not change the learning algorithm. The latter choice (linear rectified coefficients) seems to be more robust and is similar to the prior choice of an exponential function in a past implementation.
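For reference, a minimal sketch of such a parametric rescaling, assuming the saturating exponential form z = 1 - exp(-a / C) used in past implementations (this is the scalar C parameter that the non-parametric approach removes); the function name is ours:
def rescaling_parametric(code, C=5., do_sym=False):
    # parametric alternative: saturating exponential with scale C
    if do_sym: code = np.abs(code)
    return (1. - np.exp(-code / C)) * (code > 0.)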
def get_rescaling(code, nb_quant=nb_quant, do_sym=do_sym, verbose=False):
    # if do_sym:
    #     code = np.abs(code)
    # else:
    #     code *= code>0
    sorted_coeffs = np.sort(code.ravel())
    indices = [int(q*(sorted_coeffs.size-1)) for q in np.linspace(0, 1, nb_quant, endpoint=True)]
    C = sorted_coeffs[indices]
    if verbose:
        print('At indices (ranks) ', indices)
        print('the coefficients are ', C)
    return C
C = get_rescaling(corr, nb_quant=nb_quant, do_sym=do_sym, verbose=True)
def rescaling(code, C, do_sym=do_sym):
    # if do_sym:
    #     code = np.abs(code)
    # else:
    #     code *= code>0
    n_samples, n_dictionary = code.shape
    N = n_samples * n_dictionary
    q = np.zeros_like(code)
    code_bins = np.linspace(0., 1, C.size, endpoint=True)
    for i in range(n_samples):
        for k in range(n_dictionary):
            if code[i, k] > 0:
                q[i, k] = np.interp(code[i, k], C, code_bins)
    return q
rescaled_code_ = rescaling(corr[i_sample:(i_sample+1), :], C=C, do_sym=do_sym)
%%time
rescaled_code_ = rescaling(corr[i_sample:(i_sample+1), :], C=C, do_sym=do_sym)
This can be vectorized:
def rescaling(code, C, do_sym=do_sym):
    # if do_sym:
    #     code = np.abs(code)
    # else:
    #     code *= code>0
    code_bins = np.linspace(0., 1, C.size, endpoint=True)
    return np.interp(code, C, code_bins) * (code > 0.)
print('on ', corr.shape[0], ' samples')
rescaled_code_ = rescaling(corr, C=C, do_sym=do_sym)
%%time
sparse_code_mp_ = rescaling(sparse_code_mp, C=C, do_sym=do_sym)
We also notice that about half of the coefficients are negative (and thus rescaled to zero), so interpolating them is wasted work. This could be improved as:
def rescaling(code, C, do_sym=do_sym):
    # if do_sym:
    #     code = np.abs(code)
    # else:
    #     code *= code>0
    p_c = np.zeros_like(code)
    ind_nz = code>0.
    code_bins = np.linspace(0., 1, C.size, endpoint=True)
    p_c[ind_nz] = np.interp(code[ind_nz], C, code_bins)
    return p_c # * (code > 0.)
rescaled_code__ = rescaling(corr, C=C, do_sym=do_sym)
assert(np.array_equal(rescaled_code_, rescaled_code__))
%%time
sparse_code_mp_ = rescaling(sparse_code_mp, C=C, do_sym=do_sym)
Finally, one can get the P_cum look-up tables:
def get_P_cum(code, nb_quant, C=C, do_sym=do_sym):
    p_c = rescaling(code, C=C, do_sym=do_sym)
    n_samples, nb_filter = code.shape
    code_bins = np.linspace(0., 1, nb_quant, endpoint=True)
    P_cum = np.zeros((nb_filter, nb_quant))
    for i in range(nb_filter):
        p, bins = np.histogram(p_c[:, i], bins=code_bins, density=True)
        p /= p.sum()
        P_cum[i, :] = np.hstack((0., np.cumsum(p)))
    return P_cum
Computing histograms is rather fast:
%%time
P_cum = get_P_cum(sparse_code_mp, nb_quant=nb_quant, C=C)
C = get_rescaling(corr, nb_quant=nb_quant, do_sym=do_sym, verbose=True)
C_vec_init = C.copy()
fig=plt.figure(figsize=(13, 8))
ax = plt.subplot(111)
x = np.linspace(0, 30, 100)
ax.plot(x, np.ones_like(x), '--')
ax.plot(x, rescaling(x, C=C, do_sym=do_sym))
ax.set_xlabel('raw coefficient')
ax.set_ylabel('rescaled coefficient')
ax.set_xscale('log');
P_cum = get_P_cum(rescaling(sparse_code_mp, C=C, do_sym=do_sym), nb_quant=nb_quant, C=C, do_sym=do_sym)
from shl_scripts.shl_tools import plot_P_cum
fig, ax = plot_P_cum(P_cum, verbose=False, alpha=.05)
ax.plot([0, 1], [0, 1], 'r--');
ax.set_ylim(0.85, 1.01);ax.set_xlim(0., 1.);
Finally, we notice that the C vector has the same length as each P_cum vector, so we will append it to this array to ease message passing.
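In practice, this simply amounts to stacking C as an additional (last) row of the P_cum array and slicing it back out when needed, for instance (the variable names here are ours):
P_cum_full = np.vstack((P_cum, C))   # shape (n_dictionary + 1, nb_quant)
C_row, P_cum_rows = P_cum_full[-1, :], P_cum_full[:-1, :]
assert np.array_equal(C_row, C)
assert np.array_equal(P_cum_rows, P_cum)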
COMP: optimizing the quantile function
These quantile and rescaling functions are implemented in shl_scripts:
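The quantile function looks up, for each atom, the value of its modulation function at the rescaled coefficient; the stick vector offsets the flattened indices so that each atom reads from its own row of P_cum. A rough sketch of what such a look-up may look like (a hypothetical re-implementation, not the actual shl_scripts code):
def quantile_sketch(P_cum, z, stick):
    # for each atom k, read off P_cum[k, j], where j indexes the quantized value of z[k] in [0, 1]
    nb_quant = P_cum.shape[1]
    idx = np.clip((z * nb_quant).astype(int), 0, nb_quant - 1)
    return P_cum.ravel()[stick + idx]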
from shl_scripts.shl_encode import quantile, rescaling
corr = (data_test @ dico_partial_learning.dictionary.T)
#corr *= corr>0
print('correlation=', corr[i_sample, :])
print('transformed correlation=', rescaling(corr, C=C, do_sym=do_sym)[i_sample, :])
C = get_rescaling(corr, nb_quant=nb_quant, do_sym=do_sym, verbose=True)
fig=plt.figure(figsize=(13, 8))
ax = plt.subplot(111)
x = np.linspace(0, 30, 100)
ax.plot(x, np.ones_like(x), '--')
ax.plot(x, rescaling(x, C=C_vec_init, do_sym=do_sym))
ax.plot(x, rescaling(x, C=.5, do_sym=do_sym))
ax.set_xlabel('raw coefficient')
ax.set_ylabel('rescaled coefficient')
ax.set_xscale('log')
ax.axis('tight');
A sanity check with extremal values:
stick = np.arange(shl.n_dictionary)*nb_quant
print('Value for ones = ', rescaling(np.inf*np.ones(shl.n_dictionary), C=C))
print('Value for zeros = ', rescaling(np.zeros(shl.n_dictionary), C=C))
print('Value for ones = ', quantile(P_cum, rescaling(np.inf*np.ones(shl.n_dictionary), C=C), stick))
print('Value for zeros = ', quantile(P_cum, rescaling(np.zeros(shl.n_dictionary), C=C), stick))
A P_cum array that should not change anything in the way we choose filters:
P_cum_zeroeffect = np.linspace(0, 1, nb_quant, endpoint=True)[np.newaxis, :] * np.ones((n_dictionary, 1))
def plot_scatter_MpVsComp(sparse_vector, my_sparse_code, title='MP', xlabel='MP', ylabel='COMP'):
    fig = plt.figure(figsize=(16, 16))
    ax = fig.add_subplot(111)
    a_min = np.min((sparse_vector.min(), my_sparse_code.min()))
    a_max = np.max((sparse_vector.max(), my_sparse_code.max()))
    ax.plot(np.array([a_min, a_max]), np.array([a_min, a_max]), 'k--', lw=2)
    print(sparse_vector.shape, my_sparse_code.shape)
    ax.scatter(sparse_vector.ravel(), my_sparse_code.ravel(), alpha=0.01)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.set_xlabel(xlabel)
    #ax.set_xlim(0)
    #ax.set_ylim(0)
    ax.axis('equal')
    return fig, ax
p_c = rescaling(corr, C=C, do_sym=do_sym)
fig, ax = plot_scatter_MpVsComp(p_c, quantile(P_cum_zeroeffect, p_c, stick),
title='transformation with flat pcum', xlabel='input', ylabel='quantile')
The relative mean squared error is low:
print('Relative difference = ', np.sum( (p_c - quantile(P_cum_zeroeffect, p_c, stick))**2) / np.sum(p_c**2))
COMP: using modulations
Let's use this new rescaling function:
l0_sparseness = shl.l0_sparseness
def comp(data, dico, P_cum, C=C, do_sym=do_sym, verbose=0):
    if verbose!=0: t0 = time.time()
    n_samples, n_dictionary = data.shape[0], dico.shape[0]
    sparse_code = np.zeros((n_samples, n_dictionary))
    corr = (data @ dico.T)
    Xcorr = (dico @ dico.T)
    if P_cum is not None:
        nb_quant = P_cum.shape[1]
        stick = np.arange(n_dictionary)*nb_quant
    for i_sample in range(n_samples):
        c = corr[i_sample, :].copy()
        if verbose!=0: ind_list=list()
        for i_l0 in range(int(l0_sparseness)):
            if P_cum is None:
                q_i = rescaling(c, C=C, do_sym=do_sym)
            else:
                q_i = quantile(P_cum, rescaling(c, C=C, do_sym=do_sym), stick, do_fast=True)
            ind = np.argmax(q_i)
            if verbose!=0: ind_list.append(ind)
            c_ind = c[ind] / Xcorr[ind, ind]
            sparse_code[i_sample, ind] += c_ind
            c -= c_ind * Xcorr[ind, :]
        if verbose!=0 and i_sample in range(2):
            q_i = quantile(P_cum, rescaling(c, C=C, do_sym=do_sym), stick)
            print(ind_list, [q_i[i] for i in ind_list], np.median(q_i), q_i.max(), [c[i] for i in ind_list], c.min(), c.max())
    if verbose!=0:
        duration = time.time()-t0
        print('coding duration : {0}s'.format(duration))
    return sparse_code
corr = (data_test @ dico_partial_learning.dictionary.T)
C = get_rescaling(corr, nb_quant=nb_quant, verbose=True)
sparse_code = comp(data_test, dico_partial_learning.dictionary, P_cum_zeroeffect, C=C, do_sym=do_sym, verbose=1)
%%timeit
corr = (data_test @ dico_partial_learning.dictionary.T)
C = get_rescaling(corr, nb_quant=nb_quant, verbose=False)
sparse_code = comp(data_test, dico_partial_learning.dictionary, P_cum_zeroeffect, C=C, do_sym=do_sym, verbose=0)
testing that COMP with a fixed P_cum is equivalent to MP
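If the modulation functions are flat (that is, the identity mapping), COMP should select exactly the same atoms as plain MP, and the two codes should coincide; we verify this below.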
print(dico_partial_learning.P_cum)
from shl_scripts.shl_learn import get_P_cum
from shl_scripts.shl_encode import get_rescaling, rescaling
corr = (data_test @ dico_partial_learning.dictionary.T)
C = get_rescaling(corr, nb_quant=nb_quant, verbose=True)
p_c = rescaling(corr, C=C, do_sym=do_sym)
from shl_scripts.shl_encode import get_rescaling, mp
l0_sparseness = 50
sparse_code_mp = mp(data_test, dico_partial_learning.dictionary, P_cum=None, C=5., do_sym=do_sym, l0_sparseness=l0_sparseness, verbose=1)
sparse_code_comp = mp(data_test, dico_partial_learning.dictionary, P_cum=P_cum_zeroeffect, C=5., do_sym=do_sym, l0_sparseness=l0_sparseness, verbose=1)
#print('Difference = ', np.sum((sparse_code_mp - sparse_code_comp)**2))
#print('Relative difference = ', np.mean((sparse_code_mp - sparse_code_comp)**2))
#print('Variance = ', np.mean((sparse_code_mp)**2))
print('Relative difference = ', np.sum((sparse_code_mp - sparse_code_comp)**2)/np.sum((sparse_code_mp)**2))
fig, ax = plot_proba_histogram(sparse_code_mp)
fig.show()
fig, ax = plot_proba_histogram(sparse_code_comp)
n_samples, nb_filter = sparse_code_mp.shape
from shl_scripts.shl_encode import get_rescaling, mp
corr = (data_test @ dico_partial_learning.dictionary.T)
C_vec = get_rescaling(corr, nb_quant=nb_quant, verbose=True)
P_cum = np.vstack((P_cum_zeroeffect, C_vec))
print('Classical MP')
sparse_code_mp = mp(data_test, dico_partial_learning.dictionary, P_cum=None, C=np.inf, do_sym=do_sym, l0_sparseness=l0_sparseness, verbose=1)
print('Homeostatic MP')
sparse_code_comp = mp(data_test, dico_partial_learning.dictionary, P_cum=P_cum, C=0., do_sym=do_sym, l0_sparseness=l0_sparseness, verbose=1)
print('Relative difference = ', np.sum((sparse_code_mp - sparse_code_comp)**2)/np.sum((sparse_code_mp)**2))
fig, ax = plot_proba_histogram(sparse_code_mp)
fig.show()
fig, ax = plot_proba_histogram(sparse_code_comp)
fig, ax = plot_scatter_MpVsComp(sparse_code_mp, sparse_code_comp)
gradient descent
First with one fixed rescaling function, and varying P_cum functions.
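At each step, the sparse code is computed with the current modulation functions and each row of P_cum is moved towards the cumulative histogram of the resulting coefficients with a small learning rate, i.e. P_cum ← (1 - eta_homeo) * P_cum + eta_homeo * P_cum_new, where P_cum_new is the cumulative histogram returned by get_P_cum on the code obtained with the current modulation functions.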
from shl_scripts.shl_learn import get_P_cum
from shl_scripts.shl_encode import get_rescaling, mp
corr = (data_test @ dico_partial_learning.dictionary.T)
C_vec = get_rescaling(corr, nb_quant=nb_quant, verbose=True)
P_cum = np.linspace(0, 1, nb_quant, endpoint=True)[np.newaxis, :] * np.ones((nb_filter, 1))
print('Shape of modulation function', P_cum.shape)
P_cum = np.vstack((P_cum, C_vec))
print('Shape of modulation function', P_cum.shape)
eta_homeo = .01
for i in range(400//DEBUG_DOWNSCALE):
    sparse_code = mp(data_test, dico_partial_learning.dictionary, P_cum=P_cum, C=0., do_sym=do_sym, verbose=0)
    P_cum_ = get_P_cum(sparse_code, nb_quant=nb_quant, C=C_vec)
    P_cum[:-1, :] = (1-eta_homeo) * P_cum[:-1, :] + eta_homeo * P_cum_
    if i % (100//DEBUG_DOWNSCALE) == 0:
        print('Learning step', i)
        fig, ax = plot_proba_histogram(sparse_code)
        plt.show()
Now by learning the rescaling function along with the P_cum functions.
from shl_scripts.shl_learn import update_P_cum
from shl_scripts.shl_encode import get_rescaling, mp
corr = (data_test @ dico_partial_learning.dictionary.T)
C_vec= C_vec_init = get_rescaling(corr, nb_quant=nb_quant, verbose=True)
P_cum = np.linspace(0, 1, nb_quant, endpoint=True)[np.newaxis, :] * np.ones((nb_filter, 1))
print('Shape of modulation function', P_cum.shape)
P_cum = np.vstack((P_cum, C_vec))
print('Shape of modulation function', P_cum.shape)
eta_homeo = .01
for i in range(400//DEBUG_DOWNSCALE):
    sparse_code = mp(data_test, dico_partial_learning.dictionary, P_cum=P_cum, C=0., do_sym=do_sym, verbose=0)
    corr = (data_test @ dico_partial_learning.dictionary.T)
    C_vec = get_rescaling(corr, nb_quant=nb_quant, do_sym=do_sym, verbose=verbose)
    P_cum[-1, :] = (1 - eta_homeo) * P_cum[-1, :] + eta_homeo * C_vec
    P_cum[:-1, :] = update_P_cum(P_cum=P_cum[:-1, :],
                                 code=sparse_code, eta_homeo=eta_homeo,
                                 C=P_cum[-1, :], nb_quant=nb_quant, do_sym=do_sym, verbose=verbose)
    if i % (100//DEBUG_DOWNSCALE) == 0:
        print('Learning step', i)
        fig, ax = plot_proba_histogram(sparse_code)
        plt.show()
Note how the rescaling function evolved during learning:
fig=plt.figure(figsize=(13, 8))
ax = plt.subplot(111)
x = np.linspace(0, 30, 100)
ax.plot(x, np.ones_like(x), '--')
ax.plot(x, rescaling(x, C=C_vec_init, do_sym=do_sym), label='init')
ax.plot(x, rescaling(x, C=.5, do_sym=do_sym), label='C=0.5')
ax.plot(x, rescaling(x, C=C_vec, do_sym=do_sym), label='auto')
ax.set_xlabel('raw coefficient')
ax.set_ylabel('rescaled coefficient')
ax.set_xscale('log')
ax.axis('tight')
ax.legend(loc='best');
from shl_scripts.shl_tools import plot_P_cum
fig, ax = plot_P_cum(P_cum[:-1, :], verbose=False);
fig, ax = plot_P_cum(P_cum[:-1, :], verbose=False)
ax.set_ylim(0.8, 1.01);ax.set_xlim(0., 1.);
As in the previous post, one observes that the distribution gets progressively more uniform, which was our goal. This implementation also makes the Matching Pursuit algorithm run faster, such that we can integrate it into the SHL package. This was done in this commit.
finally...
Let's try the whole learning process with the adaptive rescaling function:
from shl_scripts.shl_experiments import SHL
matname = tag + '_autohomeo'
shl = SHL(DEBUG_DOWNSCALE=DEBUG_DOWNSCALE,
          datapath='/tmp/database',
          data_cache='/tmp/data_cache',
          verbose=2,
          homeo_method='HAP', homeo_params=dict(alpha_homeo=0., C=0.),
          do_sym=do_sym, nb_quant=nb_quant)
data = shl.get_data(matname=matname)
test_size = data.shape[0]//2
data_training = data[:test_size, :]
data_test = data[test_size:,:]
if DEBUG:
    test_size = data.shape[0]//20
    data_training = data[:(data.shape[0]-test_size), :].copy()
    data_test = data[:test_size, :].copy()
dico_partial_learning = shl.learn_dico(data=data_training, matname=matname)
This time, the learning includes the adaptive, non-parametric homeostasis; let's check the resulting dictionary:
fig, ax = shl.show_dico(dico_partial_learning, data=data, title=matname)
fig.show()
fig, ax = shl.time_plot(dico_partial_learning, variable='prob_active');
fig.show()