Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
ShenJinglong committed May 10, 2021
2 parents 7ea7c56 + 7856a81 commit 582b92d
Show file tree
Hide file tree
Showing 7 changed files with 522 additions and 33 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
__pycache__/
.vscode/
.vscode/
output/
50 changes: 47 additions & 3 deletions Client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,36 @@
def compute_norm(data):
return sum([np.sum(item**2) for item in data])

"""
@brief: 极坐标转欧氏坐标
@param [polar_coordinate]: 要转换的极坐标 | 都是用普通列表表示的坐标
@return: 转换结果(欧氏坐标)
"""
def polar2euclid(polar_coordinate):
return [polar_coordinate[0] * np.math.cos(polar_coordinate[1]), polar_coordinate[0] * np.math.sin(polar_coordinate[1])]

"""
@brief: 欧氏坐标转极坐标
@param [polar_coordinate]: 要转换的欧氏坐标 | 都是用普通列表表示的坐标
@return: 转换结果(极坐标)
"""
def euclid2polar(euclid_coordinate):
return [np.math.sqrt(euclid_coordinate[0]**2 + euclid_coordinate[1]**2), np.math.atan2(euclid_coordinate[1], euclid_coordinate[0])]

class Client():
def __init__(self) -> None:
pass

def run(self, data, label):
def run(self, data, label, p_d):
self.__comm = CommClient('127.0.0.1', 12345)
self.__trainer = TR()
self.__hi = np.random.uniform()

self.__polar_position = p_d[0]
self.__polar_direction = p_d[1]
self.__euclid_position = polar2euclid(self.__polar_position)
self.__euclid_direction = polar2euclid(self.__polar_direction)

self.__hi = self.__polar_position[0]**(-params.PATHLOSS_FACTOR)
self.__transmit_power = params.CLIENT_TRANSMIT_POWER

for _ in range(params.ITERATION_NUM):
Expand All @@ -26,14 +48,36 @@ def run(self, data, label):
# 计算梯度的二范数
grad_norm = compute_norm(grad)
# 向服务器发送结果
self.__comm.send({'grad_norm': grad_norm, 'received_power': self.__hi * self.__transmit_power})
self.__comm.send({'grad_norm': grad_norm, 'received_power': self.__hi * self.__transmit_power, 'position': self.__euclid_position})
# 接收服务器的调度结果:1为调度,0为未调度
sche_sig = self.__comm.recv()
if sche_sig == 1:
# 被调度后更新模型,得到 local model
self.__trainer.train_with_grad(grad)
# 向服务器发送 local model
self.__comm.send(self.__trainer.get_weights())
self.__update_user()

def __update_user(self):
self.__move(1)
self.__hi = self.__polar_position[0]**(-params.PATHLOSS_FACTOR)

def __move(self, time_elapsed):
distance = self.__polar_direction[0] * time_elapsed
pose_d = polar2euclid([distance, self.__polar_direction[1]])
self.__euclid_position[0] += pose_d[0]
self.__euclid_position[1] += pose_d[1]

self.__polar_position = euclid2polar(self.__euclid_position)

if self.__polar_position[0] > 100:
normal_dir = polar2euclid([1, self.__polar_position[1]])
dot_product = self.__euclid_direction[0] * normal_dir[0] + self.__euclid_direction[1] * normal_dir[1]
polar_rho_vec = [dot_product, self.__polar_position[1]]
euclid_rho_vec = polar2euclid(polar_rho_vec)
euclid_side_vec = [self.__euclid_direction[0] - euclid_rho_vec[0], self.__euclid_direction[1] - euclid_rho_vec[1]]
self.__euclid_direction[0], self.__euclid_direction[1] = euclid_side_vec[0] - euclid_rho_vec[0], euclid_side_vec[1] - euclid_rho_vec[1]
self.__polar_direction = euclid2polar(self.__euclid_direction)

if __name__ == '__main__':
client = Client()
Expand Down
169 changes: 148 additions & 21 deletions Server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

from Client import Client
import numpy as np
import matplotlib.pyplot as plt

Expand Down Expand Up @@ -29,7 +30,8 @@ def random_select():
return sche_sig, a

def get_uplink_delay(received_power):
I = np.random.random((params.RB_NUM,1))
# I = np.zeros((params.RB_NUM,1))
I = np.random.uniform(1e-4, 0.01, size=(params.RB_NUM,1))
SINR = received_power / (I + params.UPLINK_BANDWIDTH * params.NOISE_POWER_SPECTRAL_DENSITY)
rate = params.UPLINK_BANDWIDTH * np.log2(1 + SINR)
delay = params.MODEL_SIZE / rate
Expand Down Expand Up @@ -90,7 +92,7 @@ def get_RB_schedual(eng, msg, user_sche):
# print(np.max(r_delay_ma, axis=0))
# print(result[-1])
# print(np.argmax(r_delay_ma, axis=0))
return np.argmax(r_delay_ma, axis=0), np.max(r_delay_ma, axis=0), result[-1]
return np.argmax(r_delay_ma, axis=0), np.max(r_delay_ma, axis=0), result[-1][0]

def random_RB_schedual(msg, user_sche):
received_power = []
Expand All @@ -116,6 +118,91 @@ def random_RB_schedual(msg, user_sche):
user_delay.append(0)
return RB_sche, user_delay, max(user_delay)

def joint_user_RB_sche(eng, msg):
import matlab

kappa = 10 # 梯度在优化问题里所占的权重

received_power = []
for id in range(params.CLIENT_NUM):
received_power.append(msg[id]['received_power'])
uplink_delay = get_uplink_delay(np.array([received_power]))

# 构造不等式约束
part_1 = []
for i in range(params.CLIENT_NUM):
part_1.append(np.identity(params.RB_NUM))
part_1.append(np.zeros((params.RB_NUM, 1 + params.CLIENT_NUM)))
part_1 = np.concatenate(part_1, axis=1)
part_2 = []
for i in range(params.CLIENT_NUM):
ma = np.zeros((params.CLIENT_NUM, params.RB_NUM))
ma[i, :] = uplink_delay[:, i].transpose()
part_2.append(ma)
part_2.append(np.zeros((params.CLIENT_NUM, params.CLIENT_NUM)))
part_2.append(-np.ones((params.CLIENT_NUM, 1)))
part_2 = np.concatenate(part_2, axis=1)
A = matlab.double(np.concatenate([part_1, part_2], axis=0).tolist())
b = matlab.double(np.concatenate([np.ones((params.RB_NUM, 1)), np.zeros((params.CLIENT_NUM, 1))], axis=0).tolist())

# 构造等式约束
part_1 = []
for i in range(params.CLIENT_NUM):
ma = np.zeros((params.CLIENT_NUM, params.RB_NUM))
ma[i, :] = np.ones((1, params.RB_NUM))
part_1.append(ma)
part_1.append(-np.identity(params.CLIENT_NUM))
part_1.append(np.zeros((params.CLIENT_NUM, 1)))
part_1 = np.concatenate(part_1, axis=1)
part_2 = np.zeros((1, params.RB_NUM * params.CLIENT_NUM + params.CLIENT_NUM + 1))
part_2[0, params.RB_NUM * params.CLIENT_NUM:params.RB_NUM * params.CLIENT_NUM + params.CLIENT_NUM] = np.ones((1, params.CLIENT_NUM))
Aeq = matlab.double(np.concatenate([part_1, part_2], axis=0).tolist())
a = np.zeros((params.CLIENT_NUM + 1, 1))
a[-1,0] = params.SCHEDUAL_SIZE
beq = matlab.double(a.tolist())

# 限定上下界
lb = matlab.double(np.zeros((params.CLIENT_NUM * params.RB_NUM + params.CLIENT_NUM + 1,)).tolist())
ub = np.ones((params.CLIENT_NUM * params.RB_NUM + params.CLIENT_NUM + 1,))
ub[-1] = np.Inf
ub = matlab.double(ub.tolist())

# 目标函数
grad_norms = []
for id in range(params.CLIENT_NUM):
grad_norms.append(msg[id]['grad_norm'])
f = np.zeros((params.RB_NUM * params.CLIENT_NUM + params.CLIENT_NUM + 1, 1))
f[params.RB_NUM * params.CLIENT_NUM:params.RB_NUM * params.CLIENT_NUM + params.CLIENT_NUM] = -kappa * np.array([grad_norms]).transpose()
f[-1, 0] = 1
f = matlab.double(f.tolist())
intcon = matlab.double(list(range(1, params.CLIENT_NUM * params.RB_NUM + params.CLIENT_NUM + 1)))

# 求解
result = eng.intlinprog(f, intcon, A, b, Aeq, beq, lb, ub)
result = np.array(result)
allo_ma = result[:params.CLIENT_NUM * params.RB_NUM].reshape((params.CLIENT_NUM, params.RB_NUM)).transpose()
r_delay_ma = allo_ma * uplink_delay
RB_sche = np.argmax(r_delay_ma, axis=0),
user_delay = np.max(r_delay_ma, axis=0),
iter_delay = result[-1][0]

sche_ma = result[params.CLIENT_NUM * params.RB_NUM:params.CLIENT_NUM * params.RB_NUM + params.CLIENT_NUM]
sche_sig = {}
a = []
for id in range(params.CLIENT_NUM):
if sche_ma[id][0] > 0.5:
sche_sig[id] = 1
a.append(id)
else:
sche_sig[id] = 0
return sche_sig, a, RB_sche, user_delay, iter_delay

def draw_circle(ax):
thetas = np.linspace(0, np.math.pi*2, 200)
x = 100 * np.cos(thetas)
y = 100 * np.sin(thetas)
ax.plot(x, y)

class Server():
def __init__(self) -> None:
pass
Expand All @@ -131,9 +218,16 @@ def run(self, data, label):
acc_recorder = []
iter_recorder = []
uplink_delay_recorder = []
schedualed_user_info = {}
uplink_delay_cost = []

# plt.figure(figsize=(8, 6), dpi=80)
f, (ax1, ax2, ax3) = plt.subplots(3, 1)
f, axes = plt.subplots(3, 2)
for ax in axes[:, 0]:
ax.remove()
axes = axes[:, 1]
gs = axes[0].get_gridspec()
axbig = f.add_subplot(gs[:, 0])
plt.ion()

for iter in range(params.ITERATION_NUM):
Expand All @@ -147,11 +241,13 @@ def run(self, data, label):
msg = self.__comm.recv_all()
print('gradient received ...')
# 基于梯度范数给出调度结果
sche_sig, a = select_based_on_weight(msg)
# sche_sig, a = select_based_on_weight(msg)
# sche_sig, a = random_select() # 随机调度
# 给出资源分配结果
RB_sche, user_delay, iter_delay = get_RB_schedual(eng, msg, sche_sig)
# RB_sche, user_delay, iter_delay = get_RB_schedual(eng, msg, sche_sig)
# RB_sche, user_delay, iter_delay = random_RB_schedual(msg, sche_sig) # 随机分配
sche_sig, a, RB_sche, user_delay, iter_delay = joint_user_RB_sche(eng, msg)
# print("schedualed user num : ", len(a))
# 下发调度结果
print('sending schedual command ...')
self.__comm.send(sche_sig)
Expand All @@ -167,28 +263,59 @@ def run(self, data, label):
acc_recorder.append(eval_result[1])
iter_recorder.append(iter)
uplink_delay_recorder.append(iter_delay)
user_position_recorder = []
for i in range(params.CLIENT_NUM):
user_position_recorder.append(msg[i]['position'])
if len(uplink_delay_cost) == 0:
uplink_delay_cost.append(iter_delay)
else:
uplink_delay_cost.append(iter_delay + uplink_delay_cost[-1])
schedualed_user_info[iter] = {sche_id: {'position': msg[sche_id]['position'], 'grad_norm': msg[sche_id]['grad_norm']} for sche_id in a}

ax1.clear()
ax1.set_xlim(0, params.ITERATION_NUM)
ax1.set_ylim(0, 1)
ax1.plot(iter_recorder, acc_recorder, label='acc')
ax1.legend()
ax2.clear()
ax2.set_xlim(0, params.ITERATION_NUM)
ax2.set_ylim(0, 3)
ax2.plot(iter_recorder, loss_recorder, label='loss')
ax2.legend()
ax3.clear()
ax3.set_xlim(0, params.ITERATION_NUM)
ax3.plot(iter_recorder, uplink_delay_recorder, label='uplink delay')
ax3.plot([0, params.ITERATION_NUM], [np.mean(uplink_delay_recorder)]*2, label='mean value')
ax3.text(int(params.ITERATION_NUM / 2), np.mean(uplink_delay_recorder) + 2, f'{np.mean(uplink_delay_recorder):.2f}')
ax3.legend()
axes[0].clear()
axes[0].set_xlim(0, params.ITERATION_NUM)
axes[0].set_ylim(0, 1)
axes[0].plot(iter_recorder, acc_recorder, label='acc')
axes[0].legend()
# axes[1].clear()
# axes[1].set_xlim(0, params.ITERATION_NUM)
# axes[1].set_ylim(0, 3)
# axes[1].plot(iter_recorder, loss_recorder, label='loss')
# axes[1].legend()
axes[1].clear()
axes[1].set_xlim(0, params.ITERATION_NUM)
axes[1].plot(iter_recorder, uplink_delay_cost, label='uplink delay cost')
axes[1].legend()
axes[2].clear()
axes[2].set_xlim(0, params.ITERATION_NUM)
axes[2].plot(iter_recorder, uplink_delay_recorder, label='uplink delay')
axes[2].plot([0, params.ITERATION_NUM], [np.mean(uplink_delay_recorder)]*2, label='mean value')
axes[2].text(int(params.ITERATION_NUM / 2), np.mean(uplink_delay_recorder) * 0.8, f'{np.mean(uplink_delay_recorder):.2f}')
axes[2].legend()
axbig.clear()
axbig.axis('equal')
draw_circle(axbig)
for i, p in enumerate(user_position_recorder):
axbig.scatter(p[0], p[1])
if i in a:
axbig.plot([0, p[0]], [0, p[1]])

plt.pause(0.0001)

plt.ioff()
plt.show()

# 保存数据
# np.savez(
# f'./output/random_with_sche_info_2.npz',
# iter_recorder = iter_recorder,
# acc_recorder = acc_recorder,
# loss_recorder = loss_recorder,
# uplink_delay_recorder = uplink_delay_recorder,
# uplink_delay_cost = uplink_delay_cost,
# schedualed_user_info = schedualed_user_info
# )

eng.exit()

if __name__ == '__main__':
Expand Down
5 changes: 4 additions & 1 deletion TFTrainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,7 @@ def evaluate(self, data, label):
return self.__model.evaluate(data, label)

def get_weights(self):
return self.__model.get_weights()
return self.__model.get_weights()

if __name__ == '__main__':
r = TFTrainer()
33 changes: 31 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,20 @@
from MNIST.MNISTReader import MNISTImageReader as ds_r
from MNIST.MNISTReader import MNISTLabelReader as lb_r

def classify_dataset(data, label):
d = [[], [], [], [], [], [], [], [], [], []]
print(d)
print(data.shape)
print(label.shape)

for i in range(60000):
d[label[i]].append(data[i, :, :, :])

for i in range(10):
d[i] = np.array(d[i])

return d

if __name__ == '__main__':
server = Server()
clients = []
Expand All @@ -34,15 +48,30 @@
edr.close()
elr.close()

np.random.seed(2)
server_p = mp.Process(target=server.run, args=(edata, elabel))
clients_p = []
local_data_size = int(np.floor(params.DATASET_SIZE_USED_TO_TRAIN / params.CLIENT_NUM))
for i, client in enumerate(clients):
clients_p.append(mp.Process(target=client.run, args=(
data[i*local_data_size:(i+1)*local_data_size, :, :, :],
label[i*local_data_size:(i+1)*local_data_size]
label[i*local_data_size:(i+1)*local_data_size],
[[np.random.uniform(0, 100), np.random.uniform(0, 2*np.math.pi)],
[np.random.uniform(3, 5), np.random.uniform(0, 2*np.math.pi)]]
)))


# result = classify_dataset(data, label)
# np.random.seed(4)
# server_p = mp.Process(target=server.run, args=(edata, elabel))
# clients_p = []
# for i, client in enumerate(clients):
# clients_p.append(mp.Process(target=client.run, args=(
# result[i],
# i * np.ones((len(result[i]),), dtype=np.uint8),
# [[np.random.uniform(0, 100), np.random.uniform(0, 2*np.math.pi)],
# [np.random.uniform(3, 5), np.random.uniform(0, 2*np.math.pi)]]
# )))

server_p.start()
for client_p in clients_p:
client_p.start()
Expand Down
Loading

0 comments on commit 582b92d

Please sign in to comment.