Skip to content

Commit

Permalink
matplotlib subplots side by side and fixed tests + readme
Browse files Browse the repository at this point in the history
  • Loading branch information
SunehB committed Nov 29, 2024
1 parent d6744b6 commit 83dcc71
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 181 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,24 @@ You can visualize the learned decision boundary of your model as such:

Which produces the following GIF:


<p align="center">
<img src="https://github.com/gallettilance/kviz/blob/master/examples/circle_relu_model.gif?raw=true"/>
</p>

To also view the learned decision boundary in the feature space (the output of the hidden layer), set the `view_feature_space` flag to `True`:
**(Please note: this is only supported for neural networks with a single hidden layer, and that layer must have exactly two units so it can be plotted in 2D.)**
```python
viz = Visualizer(model)
viz.fit(X, Y, snap_freq=20, duration=300, view_feature_space=True, batch_size=4, epochs=1000, verbose=0)
```

Which produces a single GIF showing the input space and the feature space side by side:
<p align="center">
<img src="https://github.com/gallettilance/kviz/blob/master/examples/feature_space.gif?raw=true"/>
</p>


We can try different activation functions, network architectures, etc. to see what works
best. For example, from looking at the GIF we can see that the neural net is trying to
learn a decision boundary that is a combination of two straight lines. Clearly this is
Expand Down
Binary file added examples/feature_space.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
222 changes: 77 additions & 145 deletions kviz/visualizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import numpy as np
from PIL import Image as im
import os


import matplotlib.pyplot as plt
from matplotlib.colors import rgb2hex, LinearSegmentedColormap
Expand Down Expand Up @@ -238,6 +240,49 @@ def _snap_decision_boundary(self, X, Y, filename):
return np.asarray(im.open(filename + '.png'))


def _snap_feature_space(self, X, Y, filename):
    """
    Generate a snapshot of the feature space produced by the first hidden layer,
    with the prediction of the following layer drawn over it as filled contours

    Parameters:
        X : ndarray
            input data to be transformed by the model's hidden layer
        Y : ndarray
            target classes corresponding to X, used to index COLORS for the scatter plot
        filename : str
            name (without the '.png' extension) of the file the snapshot is saved to

    Returns:
        np.ndarray
            image array read back from the saved PNG

    Raises:
        ValueError
            if the hidden layer does not produce exactly two features per sample
    """
    hidden_features = np.asarray(self._int_models[0].predict(X))

    # The plot (and the 2-column mesh fed to the next layer) only makes sense
    # for a two-unit hidden layer; fail early with a readable message.
    if hidden_features.ndim != 2 or hidden_features.shape[1] != 2:
        raise ValueError(
            "The hidden layer must have exactly two units to be plotted in feature space"
        )

    h = .02  # step size of the mesh
    x_min, x_max = hidden_features[:, 0].min() - .1, hidden_features[:, 0].max() + .1
    y_min, y_max = hidden_features[:, 1].min() - .1, hidden_features[:, 1].max() + .1
    xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                         np.arange(y_min, y_max, h))
    mesh_data = np.c_[xx.ravel(), yy.ravel()]

    fig, ax = plt.subplots(frameon=False)
    ax.scatter(hidden_features[:, 0], hidden_features[:, 1],
               color=COLORS[Y].tolist(), s=100, alpha=.9)

    # Evaluate the layer that consumes the hidden features on the mesh points.
    # NOTE(review): assumes self.model.layers[1] is that layer - confirm for
    # models whose layers[0] is an explicit input layer.
    hidden_layer_model = keras.Sequential([self.model.layers[1]])

    # Keep the first output column and lay it out on the mesh grid
    Z = np.asarray(hidden_layer_model.predict(mesh_data))[:, 0].reshape(xx.shape)
    ax.contourf(xx, yy, Z, alpha=.4, cmap=CMAP)

    ax.set_xlim(x_min, x_max)
    ax.set_ylim(y_min, y_max)

    fig.savefig(filename + '.png')
    # Close this specific figure rather than whichever one happens to be current
    plt.close(fig)

    with im.open(filename + '.png') as snapshot:
        return np.asarray(snapshot)


def _stack_gifs(self, imgs1, imgs2, filename, duration):
"""
Takes two lists of images and stacks each image in one list on top
Expand Down Expand Up @@ -307,7 +352,7 @@ def _reset(self):
self._graph = self._graph_original_copy.copy()


def fit(self, X, Y, snap_freq=10, filename='decision_boundary', duration=1000, **kwargs):
def fit(self, X, Y, snap_freq=10, filename='decision_boundary', duration=1000, view_feature_space=False, **kwargs):
"""
Make GIF from snapshots of decision boundary at given snap_freq
Expand All @@ -317,11 +362,13 @@ def fit(self, X, Y, snap_freq=10, filename='decision_boundary', duration=1000, *
Y : ndarray
classes to be learned
snap_freq : int
number of epochs after which to take a snapshot
number of epochs after which to take a snapshot
filename : str
name of file to save as GIF
duration : int
duration in ms between images in GIF
view_feature_space : bool
flag to display the decision boundary in hidden feature space
**kwargs : other params
parameter inputs to model.fit
Expand All @@ -336,10 +383,36 @@ def fit(self, X, Y, snap_freq=10, filename='decision_boundary', duration=1000, *
else:
epochs = snap_freq

for _ in range(int(epochs / snap_freq)):
temp_dir = "snapshots"
os.makedirs(temp_dir, exist_ok=True)

for epoch in range(int(epochs / snap_freq)):
self.model.fit(X, Y, epochs=snap_freq, **kwargs)
self._int_models = self._get_int_models() # TODO: make this function more efficient
images.append(im.fromarray(self._snap_decision_boundary(X, Y, filename)))
if (view_feature_space):


if (len(self.model.layers) > 3):
raise ValueError("The model must have only one hidden layer for this visualization")

fig, axes = plt.subplots(1, 2, figsize=(12, 6))

axes[0].imshow(self._snap_decision_boundary(X, Y, filename))
axes[0].axis('off')
axes[0].set_title("Input Space")

axes[1].imshow(self._snap_feature_space(X, Y, filename))
axes[1].axis('off')
axes[1].set_title("Feature Space")

temp_filename = os.path.join(temp_dir, f"epoch_{epoch}.png")
fig.savefig(temp_filename, format='png')
plt.close(fig)

images.append(im.open(temp_filename))

else:
images.append(im.fromarray(self._snap_decision_boundary(X, Y, filename)))

self._convert_gif(images, filename, duration)
return self.model
Expand Down Expand Up @@ -417,147 +490,6 @@ def view_activations_for(self, X, filename='activations', duration=1000, x_color

self._stack_gifs(network_images, input_images, filename, duration)
return

def _snap_feature_space(self, X, Y, filename):
    """
    Generate a snapshot of the feature space produced by the first hidden layer,
    with the prediction of the following layer drawn over it as filled contours

    Parameters:
        X : ndarray
            input data to be transformed by the model's hidden layer
        Y : ndarray
            target classes corresponding to X, used to index COLORS for the scatter plot
        filename : str
            name (without the '.png' extension) of the file the snapshot is saved to

    Returns:
        np.ndarray
            image array read back from the saved PNG

    Raises:
        ValueError
            if the hidden layer does not produce exactly two features per sample
    """
    hidden_features = np.asarray(self._int_models[0].predict(X))

    # The plot (and the 2-column mesh fed to the next layer) only makes sense
    # for a two-unit hidden layer; fail early with a readable message.
    if hidden_features.ndim != 2 or hidden_features.shape[1] != 2:
        raise ValueError(
            "The hidden layer must have exactly two units to be plotted in feature space"
        )

    h = .02  # step size of the mesh
    x_min, x_max = hidden_features[:, 0].min() - .1, hidden_features[:, 0].max() + .1
    y_min, y_max = hidden_features[:, 1].min() - .1, hidden_features[:, 1].max() + .1
    xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                         np.arange(y_min, y_max, h))
    mesh_data = np.c_[xx.ravel(), yy.ravel()]

    fig, ax = plt.subplots(frameon=False)
    ax.scatter(hidden_features[:, 0], hidden_features[:, 1],
               color=COLORS[Y].tolist(), s=100, alpha=.9)

    # Evaluate the layer that consumes the hidden features on the mesh points.
    # NOTE(review): assumes self.model.layers[1] is that layer - confirm for
    # models whose layers[0] is an explicit input layer.
    hidden_layer_model = keras.Sequential([self.model.layers[1]])

    # Keep the first output column and lay it out on the mesh grid
    Z = np.asarray(hidden_layer_model.predict(mesh_data))[:, 0].reshape(xx.shape)
    ax.contourf(xx, yy, Z, alpha=.4, cmap=CMAP)

    ax.set_xlim(x_min, x_max)
    ax.set_ylim(y_min, y_max)

    fig.savefig(filename + '.png')
    # Close this specific figure rather than whichever one happens to be current
    plt.close(fig)

    with im.open(filename + '.png') as snapshot:
        return np.asarray(snapshot)

def fit_in_feature_space(self, X, Y, snap_freq=10, filename='feature_space', duration=1000, **kwargs):
    """
    Make GIF from snapshots of decision boundary in the feature space at given snap_freq

    The model is trained in rounds of snap_freq epochs; after each round one
    snapshot is taken. Epochs left over when epochs is not a multiple of
    snap_freq are not trained.

    Parameters:
        X : ndarray
            input data for training the Keras model
        Y : ndarray
            target labels corresponding to input data X
        snap_freq : int
            number of epochs after which to take a snapshot of the feature space
        filename : str
            name of file (without extension) to save as GIF
        duration : int
            duration in ms between images in GIF
        **kwargs : other params
            parameter inputs to model.fit; 'epochs' (default: snap_freq) is
            consumed here as the total number of epochs to train

    Returns:
        The model after learning

    Raises:
        ValueError
            if snap_freq is not positive, or epochs is smaller than snap_freq
            (no snapshot could be taken, so there would be nothing to save)
    """
    epochs = kwargs.pop('epochs', snap_freq)

    if snap_freq <= 0:
        raise ValueError("snap_freq must be a positive number of epochs")

    n_snapshots = int(epochs / snap_freq)
    if n_snapshots < 1:
        raise ValueError("epochs must be at least snap_freq so that a snapshot can be taken")

    images = []
    for _ in range(n_snapshots):
        self.model.fit(X, Y, epochs=snap_freq, **kwargs)
        # Refresh the intermediate models so the snapshot reflects the new weights
        self._int_models = self._get_int_models()
        snapshot = self._snap_feature_space(X, Y, filename)
        images.append(im.fromarray(snapshot).convert('RGB').convert('P'))

    images[0].save(
        filename + '.gif',
        optimize=False,
        save_all=True,
        append_images=images[1:],
        loop=0,
        duration=duration,
        disposal=2
    )
    return self.model


def combine_gifs(self, gif1_path, gif2_path, output_path, duration=200):
    """
    Display two GIFs side by side in a single GIF

    Frames are paired by index; the combined GIF ends when the shorter of the
    two inputs runs out of frames. The first GIF is placed on the left and the
    second immediately to its right, both aligned to the top edge.

    Parameters:
        gif1_path : str
            File path of the first GIF
        gif2_path : str
            File path of the second GIF
        output_path : str
            File path to save the combined GIF
        duration : int
            Duration in ms between frames in the combined GIF

    Returns:
        str
            File path of the saved and combined GIF
    """
    combined_frames = []

    # Open both inputs in a context manager so the file handles are released
    # even if reading a frame fails part-way through.
    with im.open(gif1_path) as gif1, im.open(gif2_path) as gif2:
        frame_index = 0
        while True:
            frame1 = gif1.copy()
            frame2 = gif2.copy()

            # Use the taller frame's height so neither input is cropped
            canvas_size = (frame1.width + frame2.width,
                           max(frame1.height, frame2.height))
            combined = im.new('RGB', canvas_size)
            combined.paste(frame1, (0, 0))
            combined.paste(frame2, (frame1.width, 0))
            combined_frames.append(combined)

            frame_index += 1
            try:
                gif1.seek(frame_index)
                gif2.seek(frame_index)
            except EOFError:
                break  # one of the GIFs has no more frames

    combined_frames[0].save(
        output_path,
        save_all=True,
        append_images=combined_frames[1:],
        duration=duration,
        loop=0
    )

    return output_path


def render(self, filename='graph'):
"""
Expand Down
46 changes: 10 additions & 36 deletions tests/test_visualizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from kviz.visualizer import Visualizer

import os

def test_render():
ACTIVATION = "sigmoid"
Expand Down Expand Up @@ -61,41 +60,16 @@ def custom_activation(x):
dg.view_activations_for(X, 'test_fit_activations', 100)

def test_feature_space():
t, _ = datasets.make_blobs(n_samples=200, centers=[[0, 0]], cluster_std=1, random_state=42)
X = np.array(list(filter(lambda x: x[0]**2 + x[1]**2 < 1.5 or x[0]**2 + x[1]**2 > 2.5, t)))
Y = np.array([1 if x[0]**2 + x[1]**2 >= 2 else 0 for x in X])



model = keras.models.Sequential()
model.add(keras.layers.Dense(2, input_dim=2, activation="tanh"))
model.add(keras.layers.Dense(1, activation="sigmoid"))
model.compile(loss="binary_crossentropy", optimizer=keras.optimizers.Adam(learning_rate=1e-1))


obj = Visualizer(model)
obj.fit(X,Y,10, 'decision_boundary',100,epochs =500,verbose =0, batch_size=50)
obj.fit_in_feature_space(X, Y, 1, 'feature_space', 30, epochs=100)

decision_boundary = './decision_boundary.gif'
if os.path.exists('decision_boundary.gif') and os.path.exists('feature_space.gif'):
obj.combine_gifs('decision_boundary.gif', 'feature_space.gif', 'side_by_side.gif')
else:
print("One or both GIFs are missing. Make sure they are created and in the correct directory.")


def main():
print("Running test_render...")
test_render()

print("Running test_view_activations_for...")
test_view_activations_for()

print("Running test_fit...")
test_fit()
model.add(layers.Dense(2, input_dim=2, activation='relu'))
model.add(layers.Dense(1, activation='sigmoid'))
model.compile(loss="binary_crossentropy")
print("no. of layers", len(model.layers))

print("Running test_feature_space...")
test_feature_space()
# Generate data that looks like 2 concentric circles
t, _ = datasets.make_blobs(n_samples=200, centers=[[0, 0]], cluster_std=1, random_state=1)
X = np.array(list(filter(lambda x: x[0]**2 + x[1]**2 < 1 or x[0]**2 + x[1]**2 > 1.5, t)))
Y = np.array([1 if x[0]**2 + x[1]**2 >= 1 else 0 for x in X])

if __name__ == "__main__":
main()
viz = Visualizer(model)
viz.fit(X, Y, snap_freq=20, filename='feature space', duration=300, view_feature_space=True, batch_size=4, epochs=1000, verbose=0)

0 comments on commit 83dcc71

Please sign in to comment.