电商运营部门进行经营情况分析,一般会包括哪些数据?
电商运营部门进行经营情况分析通常会关注以下几类关键数据指标:
- 总销售额(GMV) - 平台总交易额
- 订单量 - 日/周/月订单总数
- 平均客单价 - 客户平均订单金额
- 销售转化率 - 访客到下单的转化比例
- 爆款与滞销商品分析 - 销售排名、库存周转率
- 退货/退款率 - 按品类、商品、供应商等维度分析
- 用户增长数据 - 新客户获取量、获客成本(CAC)
- 用户留存分析 - 7/30/90天留存率
- 复购率 - 重复购买行为分析
- 用户生命周期价值(LTV) - 客户长期贡献价值
- 用户分层 - RFM模型(近度、频度、金额)分析
- 用户画像数据 - 年龄、性别、地域分布等
- 流量指标 - PV(页面浏览量)、UV(独立访客数)
- 渠道分析 - 各来源渠道流量与转化率
- 搜索关键词表现 - 热搜词、转化率
- 广告投放效果 - 点击率(CTR)、转化成本(CPC)、投资回报率(ROI)
- 自然流量vs付费流量 - 占比与效果对比
- 类目销售结构 - 各品类销售占比、增长率
- 产品表现 - 浏览量、加购率、转化率
- 产品评价 - 评分、评论数量、好评率
- 价格敏感度 - 价格变动对销量影响
- 促销活动效果 - 活动期间销售提升、ROI
- 优惠券使用情况 - 领取率、使用率、转化贡献
- 会员营销数据 - 会员复购率、会员贡献占比
- 内容营销效果 - 内容曝光、互动、转化数据
- 毛利率 - 整体及分品类毛利水平
- 营销费用率 - 营销支出占销售比例
- 履约成本 - 物流、客服、包装等成本
- 退货成本 - 退货处理、物流损耗
- 库存周转率 - 库存效率指标
- 断货率 - 缺货情况分析
- 库存结构 - 库龄分析、滞销品监控
- 供应商表现 - 供应商交付及质量评价
- 客服工单量 - 问题分类统计
- 客户满意度 - NPS评分、服务评价
- 问题解决时效 - 平均响应和解决时间
- 常见问题分析 - 热点问题识别与改进
通过对以上数据的综合分析,电商运营部门可以全面把握经营情况,及时调整策略,优化用户体验,提升经营效率和盈利能力。在大型电商平台,这些数据通常通过BI系统进行多维度交叉分析,形成可视化仪表盘,支持运营决策。
我想mock一个数据库,基于这个数据库做一个Demo,Demo内容包括分析销售情况,销售的增长或者减少的原因分析。请你帮忙设计数据库Schema
为支持电商销售情况及原因分析的Demo,我设计了以下数据库Schema,包含核心业务实体和关联关系,以便进行多维度分析。
CREATE TABLE users (
user_id INT PRIMARY KEY,
username VARCHAR(50),
email VARCHAR(100),
phone VARCHAR(20),
gender VARCHAR(10),
age INT,
city VARCHAR(50),
province VARCHAR(50),
registration_date DATE,
user_level VARCHAR(20),
is_active BOOLEAN,
user_source VARCHAR(50)
);
CREATE TABLE product_categories (
category_id INT PRIMARY KEY,
category_name VARCHAR(100),
parent_category_id INT NULL,
category_level INT,
is_active BOOLEAN,
FOREIGN KEY (parent_category_id) REFERENCES product_categories(category_id)
);
CREATE TABLE products (
product_id INT PRIMARY KEY,
product_name VARCHAR(200),
category_id INT,
brand VARCHAR(100),
supplier VARCHAR(100),
original_price DECIMAL(10, 2),
current_price DECIMAL(10, 2),
cost DECIMAL(10, 2),
stock_quantity INT,
create_time DATE,
is_active BOOLEAN,
FOREIGN KEY (category_id) REFERENCES product_categories(category_id)
);
CREATE TABLE orders (
order_id INT PRIMARY KEY,
user_id INT,
order_date TIMESTAMP,
total_amount DECIMAL(10, 2),
discount_amount DECIMAL(10, 2),
payment_method VARCHAR(50),
payment_status VARCHAR(20),
shipping_address VARCHAR(200),
order_status VARCHAR(20),
order_source VARCHAR(50),
device_type VARCHAR(20),
FOREIGN KEY (user_id) REFERENCES users(user_id)
);
CREATE TABLE order_items (
order_item_id INT PRIMARY KEY,
order_id INT,
product_id INT,
quantity INT,
unit_price DECIMAL(10, 2),
discount DECIMAL(10, 2),
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
);
CREATE TABLE marketing_campaigns (
campaign_id INT PRIMARY KEY,
campaign_name VARCHAR(100),
campaign_type VARCHAR(50),
start_date DATE,
end_date DATE,
budget DECIMAL(10, 2),
target_audience VARCHAR(100),
discount_type VARCHAR(50),
discount_value DECIMAL(10, 2),
is_active BOOLEAN
);
CREATE TABLE order_campaign_map (
id INT PRIMARY KEY,
order_id INT,
campaign_id INT,
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (campaign_id) REFERENCES marketing_campaigns(campaign_id)
);
CREATE TABLE visit_logs (
log_id INT PRIMARY KEY,
user_id INT NULL,
session_id VARCHAR(100),
page_url VARCHAR(200),
referrer_url VARCHAR(200),
visit_time TIMESTAMP,
device_type VARCHAR(50),
ip_address VARCHAR(50),
stay_duration INT,
FOREIGN KEY (user_id) REFERENCES users(user_id)
);
CREATE TABLE inventory_records (
record_id INT PRIMARY KEY,
product_id INT,
change_date TIMESTAMP,
quantity_change INT,
reason VARCHAR(100),
FOREIGN KEY (product_id) REFERENCES products(product_id)
);
CREATE TABLE price_changes (
change_id INT PRIMARY KEY,
product_id INT,
change_date TIMESTAMP,
old_price DECIMAL(10, 2),
new_price DECIMAL(10, 2),
reason VARCHAR(100),
FOREIGN KEY (product_id) REFERENCES products(product_id)
);
CREATE TABLE user_behaviors (
behavior_id INT PRIMARY KEY,
user_id INT,
product_id INT,
behavior_type VARCHAR(50), -- 点击、加购、收藏等
behavior_time TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(user_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
);
CREATE TABLE traffic_sources (
source_id INT PRIMARY KEY,
source_name VARCHAR(100),
source_type VARCHAR(50), -- 搜索引擎、社交媒体、直接访问等
campaign_id INT NULL,
FOREIGN KEY (campaign_id) REFERENCES marketing_campaigns(campaign_id)
);
CREATE TABLE returns (
return_id INT PRIMARY KEY,
order_id INT,
order_item_id INT NULL,
return_date TIMESTAMP,
return_reason VARCHAR(200),
return_status VARCHAR(50),
refund_amount DECIMAL(10, 2),
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (order_item_id) REFERENCES order_items(order_item_id)
);
CREATE TABLE reviews (
review_id INT PRIMARY KEY,
order_id INT,
product_id INT,
user_id INT,
rating INT,
comment TEXT,
review_date TIMESTAMP,
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (product_id) REFERENCES products(product_id),
FOREIGN KEY (user_id) REFERENCES users(user_id)
);
基于此数据库Schema,你可以展示以下销售分析场景:
- 按日/周/月/季度/年的销售额及订单量趋势
- 不同地区销售分布及增长情况
- 各品类销售占比变化
- 营销活动影响:关联营销活动表分析不同活动对销售的拉动效果
- 价格敏感度:通过价格变动表分析价格调整与销量的关系
- 库存影响:分析库存波动对销售的影响
- 季节性因素:发现销售的周期性规律
- 用户行为转化:分析用户浏览-加购-下单的转化漏斗
- 渠道归因:各流量渠道贡献的销售额及转化率
- 设备归因:不同终端设备的销售贡献及转化效率
- 用户分层:新老客户结构及贡献变化
销售增长原因分析查询示例:
-- 促销活动对销售的影响分析
SELECT
c.campaign_name,
COUNT(DISTINCT o.order_id) AS order_count,
SUM(o.total_amount) AS total_sales,
AVG(o.total_amount) AS avg_order_value
FROM orders o
JOIN order_campaign_map ocm ON o.order_id = ocm.order_id
JOIN marketing_campaigns c ON ocm.campaign_id = c.campaign_id
WHERE o.order_date BETWEEN '2023-01-01' AND '2023-12-31'
GROUP BY c.campaign_name
ORDER BY total_sales DESC;
-- 价格变动对销售的影响分析
WITH price_change_periods AS (
SELECT
p.product_id,
p.product_name,
pc.change_date,
LEAD(pc.change_date) OVER (PARTITION BY p.product_id ORDER BY pc.change_date) AS next_change_date,
pc.old_price,
pc.new_price,
(pc.new_price - pc.old_price) / pc.old_price * 100 AS price_change_pct
FROM price_changes pc
JOIN products p ON pc.product_id = p.product_id
)
SELECT
pcp.product_id,
pcp.product_name,
pcp.change_date,
pcp.price_change_pct,
-- 价格变动前15天的销量
(SELECT COALESCE(SUM(oi.quantity), 0)
FROM order_items oi
JOIN orders o ON oi.order_id = o.order_id
WHERE oi.product_id = pcp.product_id
AND o.order_date BETWEEN pcp.change_date - INTERVAL '15 day' AND pcp.change_date) AS sales_before,
-- 价格变动后15天的销量
(SELECT COALESCE(SUM(oi.quantity), 0)
FROM order_items oi
JOIN orders o ON oi.order_id = o.order_id
WHERE oi.product_id = pcp.product_id
AND o.order_date BETWEEN pcp.change_date AND pcp.change_date + INTERVAL '15 day') AS sales_after
FROM price_change_periods pcp
ORDER BY pcp.price_change_pct DESC;
这个数据库Schema设计提供了全面的电商数据分析基础,可以支持多维度的销售情况分析和原因探究,适合用于电商运营分析Demo展示。
请你生成一个Python脚本,向AWS MySQL写入生成这些表,并mock合理的数据,我后面基于mock的数据做分析demo。设计几个变量,RDS endpoint, user, password,我后面会手动添加
- 假设电商主要销售泳衣,同时也在向成衣,饰品,帽子和包等品类拓展
- 销售价格在5-50美元之间
- 主要销售国家是美国(约70%),其次是加拿大、英国、法国、澳大利亚
- 50%收入来自独立站,40%来自Amazon商城,其它商城约10%
- 流量来自直接访问、搜索、和社交媒体(Facebook等)
- 流量趋势:1月份开始,销售数据较12月明显增长。一直持续增长到7月,从8月开始下降
import pymysql
import random
from faker import Faker
from datetime import datetime, timedelta
import time
# 数据库连接配置 - 需要手动添加
RDS_ENDPOINT = "your-rds-endpoint.amazonaws.com"
RDS_USER = "your_username"
RDS_PASSWORD = "your_password"
RDS_DB_NAME = "swimwear_ecommerce"
# 初始化Faker生成器
fake = Faker()
def create_database():
# 连接到MySQL服务器(不指定数据库)
conn = pymysql.connect(
host=RDS_ENDPOINT,
user=RDS_USER,
password=RDS_PASSWORD
)
cursor = conn.cursor()
try:
# 创建数据库
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {RDS_DB_NAME}")
print(f"数据库 {RDS_DB_NAME} 创建成功或已存在")
except Exception as e:
print(f"创建数据库时出错: {e}")
finally:
cursor.close()
conn.close()
def connect_to_db():
# 连接到指定数据库
return pymysql.connect(
host=RDS_ENDPOINT,
user=RDS_USER,
password=RDS_PASSWORD,
db=RDS_DB_NAME
)
def create_tables(conn):
cursor = conn.cursor()
# 创建用户表
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id INT PRIMARY KEY,
username VARCHAR(50),
email VARCHAR(100),
phone VARCHAR(20),
gender VARCHAR(10),
age INT,
city VARCHAR(50),
country VARCHAR(50),
registration_date DATE,
user_level VARCHAR(20),
is_active BOOLEAN,
user_source VARCHAR(50)
)
""")
# 创建商品分类表
cursor.execute("""
CREATE TABLE IF NOT EXISTS product_categories (
category_id INT PRIMARY KEY,
category_name VARCHAR(100),
parent_category_id INT NULL,
category_level INT,
is_active BOOLEAN
)
""")
# 创建商品表
cursor.execute("""
CREATE TABLE IF NOT EXISTS products (
product_id INT PRIMARY KEY,
product_name VARCHAR(200),
category_id INT,
brand VARCHAR(100),
supplier VARCHAR(100),
original_price DECIMAL(10, 2),
current_price DECIMAL(10, 2),
cost DECIMAL(10, 2),
stock_quantity INT,
create_time DATE,
is_active BOOLEAN,
FOREIGN KEY (category_id) REFERENCES product_categories(category_id)
)
""")
# 创建订单表
cursor.execute("""
CREATE TABLE IF NOT EXISTS orders (
order_id INT PRIMARY KEY,
user_id INT,
order_date TIMESTAMP,
total_amount DECIMAL(10, 2),
discount_amount DECIMAL(10, 2),
payment_method VARCHAR(50),
payment_status VARCHAR(20),
shipping_address VARCHAR(200),
order_status VARCHAR(20),
order_source VARCHAR(50),
device_type VARCHAR(20),
FOREIGN KEY (user_id) REFERENCES users(user_id)
)
""")
# 创建订单详情表
cursor.execute("""
CREATE TABLE IF NOT EXISTS order_items (
order_item_id INT PRIMARY KEY,
order_id INT,
product_id INT,
quantity INT,
unit_price DECIMAL(10, 2),
discount DECIMAL(10, 2),
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
)
""")
# 创建营销活动表
cursor.execute("""
CREATE TABLE IF NOT EXISTS marketing_campaigns (
campaign_id INT PRIMARY KEY,
campaign_name VARCHAR(100),
campaign_type VARCHAR(50),
start_date DATE,
end_date DATE,
budget DECIMAL(10, 2),
target_audience VARCHAR(100),
discount_type VARCHAR(50),
discount_value DECIMAL(10, 2),
is_active BOOLEAN
)
""")
# 创建订单营销关联表
cursor.execute("""
CREATE TABLE IF NOT EXISTS order_campaign_map (
id INT PRIMARY KEY,
order_id INT,
campaign_id INT,
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (campaign_id) REFERENCES marketing_campaigns(campaign_id)
)
""")
# 创建访问日志表
cursor.execute("""
CREATE TABLE IF NOT EXISTS visit_logs (
log_id INT PRIMARY KEY,
user_id INT NULL,
session_id VARCHAR(100),
page_url VARCHAR(200),
referrer_url VARCHAR(200),
visit_time TIMESTAMP,
device_type VARCHAR(50),
ip_address VARCHAR(50),
stay_duration INT,
FOREIGN KEY (user_id) REFERENCES users(user_id)
)
""")
# 创建库存记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS inventory_records (
record_id INT PRIMARY KEY,
product_id INT,
change_date TIMESTAMP,
quantity_change INT,
reason VARCHAR(100),
FOREIGN KEY (product_id) REFERENCES products(product_id)
)
""")
# 创建价格变动表
cursor.execute("""
CREATE TABLE IF NOT EXISTS price_changes (
change_id INT PRIMARY KEY,
product_id INT,
change_date TIMESTAMP,
old_price DECIMAL(10, 2),
new_price DECIMAL(10, 2),
reason VARCHAR(100),
FOREIGN KEY (product_id) REFERENCES products(product_id)
)
""")
# 创建用户行为表
cursor.execute("""
CREATE TABLE IF NOT EXISTS user_behaviors (
behavior_id INT PRIMARY KEY,
user_id INT,
product_id INT,
behavior_type VARCHAR(50),
behavior_time TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(user_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
)
""")
# 创建渠道来源表
cursor.execute("""
CREATE TABLE IF NOT EXISTS traffic_sources (
source_id INT PRIMARY KEY,
source_name VARCHAR(100),
source_type VARCHAR(50),
campaign_id INT NULL,
FOREIGN KEY (campaign_id) REFERENCES marketing_campaigns(campaign_id)
)
""")
# 创建退款/退货表
cursor.execute("""
CREATE TABLE IF NOT EXISTS returns (
return_id INT PRIMARY KEY,
order_id INT,
order_item_id INT NULL,
return_date TIMESTAMP,
return_reason VARCHAR(200),
return_status VARCHAR(50),
refund_amount DECIMAL(10, 2),
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (order_item_id) REFERENCES order_items(order_item_id)
)
""")
# 创建评价表
cursor.execute("""
CREATE TABLE IF NOT EXISTS reviews (
review_id INT PRIMARY KEY,
order_id INT,
product_id INT,
user_id INT,
rating INT,
comment TEXT,
review_date TIMESTAMP,
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (product_id) REFERENCES products(product_id),
FOREIGN KEY (user_id) REFERENCES users(user_id)
)
""")
conn.commit()
print("所有表格创建成功")
def generate_product_categories(conn):
cursor = conn.cursor()
# 清除已有数据
cursor.execute("DELETE FROM product_categories")
categories = [
(1, "泳装", None, 1, True),
(11, "女士泳衣", 1, 2, True),
(12, "男士泳裤", 1, 2, True),
(13, "儿童泳衣", 1, 2, True),
(14, "比基尼", 1, 2, True),
(15, "沙滩配件", 1, 2, True),
(2, "成衣", None, 1, True),
(21, "女士上装", 2, 2, True),
(22, "女士下装", 2, 2, True),
(23, "男士上装", 2, 2, True),
(24, "男士下装", 2, 2, True),
(3, "配饰", None, 1, True),
(31, "项链", 3, 2, True),
(32, "手链", 3, 2, True),
(33, "耳环", 3, 2, True),
(4, "帽子", None, 1, True),
(41, "棒球帽", 4, 2, True),
(42, "太阳帽", 4, 2, True),
(43, "沙滩帽", 4, 2, True),
(5, "包袋", None, 1, True),
(51, "手提包", 5, 2, True),
(52, "沙滩包", 5, 2, True),
(53, "背包", 5, 2, True)
]
for category in categories:
cursor.execute("""
INSERT INTO product_categories (category_id, category_name, parent_category_id, category_level, is_active)
VALUES (%s, %s, %s, %s, %s)
""", category)
conn.commit()
print(f"已插入 {len(categories)} 个产品类别")
def generate_products(conn):
cursor = conn.cursor()
# 清除已有数据
cursor.execute("DELETE FROM products")
brands = ["OceanBreeze", "SunnyDays", "WaveRider", "BeachLife", "TropicalVibes", "CoastalChic"]
suppliers = ["SeaFashion Inc.", "BeachGear Supply", "Aquatic Styles", "Summer Collections Ltd", "Coastal Manufacturers"]
# 产品名称模板
product_templates = {
11: ["女式连体泳衣", "流行女士泳衣", "修身女士泳衣", "性感露背泳衣"],
12: ["男士沙滩短裤", "男士游泳短裤", "快干泳裤", "时尚男士泳裤"],
13: ["儿童可爱泳衣", "防晒儿童泳装", "卡通儿童泳衣"],
14: ["时尚比基尼套装", "性感分体泳衣", "热带风情比基尼"],
15: ["防水沙滩包", "防晒沙滩巾", "沙滩拖鞋"],
21: ["女士T恤", "女士衬衫", "女士背心"],
22: ["女士短裤", "女士牛仔裤", "女士裙装"],
23: ["男士T恤", "男士衬衫", "男士背心"],
24: ["男士短裤", "男士牛仔裤", "男士休闲裤"],
31: ["波西米亚项链", "简约风项链", "贝壳项链"],
32: ["手工串珠手链", "银质手链", "编织手链"],
33: ["贝壳耳环", "波西米亚耳环", "简约耳钉"],
41: ["标志棒球帽", "复古棒球帽"],
42: ["宽边太阳帽", "折叠太阳帽"],
43: ["草编沙滩帽", "防晒沙滩帽"],
51: ["手提购物包", "编织手提包"],
52: ["防水沙滩包", "网眼沙滩包"],
53: ["轻便背包", "防水背包"]
}
products = []
product_id = 1
# 生成产品数据,泳装类别占70%的产品
for category_id in product_templates:
# 确定每个类别产品数量
if category_id in [11, 12, 13, 14, 15]: # 泳装类别
count = random.randint(20, 30)
elif category_id in [21, 22, 23, 24]: # 成衣类别
count = random.randint(8, 15)
else: # 其他类别
count = random.randint(5, 10)
for _ in range(count):
template = random.choice(product_templates[category_id])
color = random.choice(["红色", "蓝色", "黑色", "白色", "粉色", "绿色", "黄色", "紫色"])
size = random.choice(["S", "M", "L", "XL"]) if category_id not in [31, 32, 33] else ""
product_name = f"{template} {color}" + (f" {size}" if size else "")
brand = random.choice(brands)
supplier = random.choice(suppliers)
# 价格设置在5-50美元之间,成本为售价的60%左右
if category_id in [11, 12, 13, 14]: # 泳装主要产品
current_price = round(random.uniform(20, 50), 2)
elif category_id in [31, 32, 33]: # 配饰
current_price = round(random.uniform(5, 15), 2)
else: # 其他产品
current_price = round(random.uniform(10, 40), 2)
original_price = round(current_price * random.uniform(1, 1.2), 2)
cost = round(current_price * random.uniform(0.55, 0.65), 2)
stock = random.randint(50, 500)
create_date = (datetime.now() - timedelta(days=random.randint(180, 720))).strftime('%Y-%m-%d')
products.append((
product_id,
product_name,
category_id,
brand,
supplier,
original_price,
current_price,
cost,
stock,
create_date,
True
))
product_id += 1
for product in products:
cursor.execute("""
INSERT INTO products (product_id, product_name, category_id, brand, supplier, original_price,
current_price, cost, stock_quantity, create_time, is_active)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", product)
conn.commit()
print(f"已插入 {len(products)} 个产品")
return product_id - 1 # 返回最大产品ID用于后续引用
def generate_users(conn, count=3000):
cursor = conn.cursor()
# 清除已有数据
cursor.execute("DELETE FROM users")
users = []
# 国家分布:美国70%,其他国家共30%
countries = {
"United States": 0.7,
"Canada": 0.1,
"United Kingdom": 0.1,
"France": 0.05,
"Australia": 0.05
}
# 用户来源
sources = ["Organic Search", "Direct", "Social Media", "Referral", "Email Campaign"]
for user_id in range(1, count+1):
# 选择国家,根据权重
country = random.choices(list(countries.keys()), weights=list(countries.values()))[0]
# 创建注册日期 - 分布在过去两年时间
reg_days_ago = random.randint(1, 730)
reg_date = (datetime.now() - timedelta(days=reg_days_ago)).strftime('%Y-%m-%d')
# 用户等级
days_since_reg = (datetime.now() - datetime.strptime(reg_date, '%Y-%m-%d')).days
purchases = max(0, int(days_since_reg / 60) + random.randint(-2, 2)) # 大致每60天购买一次
if purchases >= 10:
user_level = "VIP"
elif purchases >= 5:
user_level = "Gold"
elif purchases >= 2:
user_level = "Silver"
else:
user_level = "Bronze"
user = (
user_id,
fake.user_name(),
fake.email(),
fake.phone_number(),
random.choice(["Male", "Female"]),
random.randint(18, 65),
fake.city(),
country,
reg_date,
user_level,
random.random() > 0.1, # 90%是活跃用户
random.choice(sources)
)
users.append(user)
for user in users:
cursor.execute("""
INSERT INTO users (user_id, username, email, phone, gender, age, city, country, registration_date,
user_level, is_active, user_source)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", user)
conn.commit()
print(f"已插入 {count} 个用户")
def generate_marketing_campaigns(conn):
cursor = conn.cursor()
# 清除已有数据
cursor.execute("DELETE FROM marketing_campaigns")
campaigns = [
(1, "新年特惠", "Discount", "2023-01-01", "2023-01-15", 5000.00, "All Users", "Percentage", 20.00, False),
(2, "春季泳装上新", "New Arrival", "2023-03-01", "2023-03-31", 8000.00, "Female Users", "Percentage", 10.00, False),
(3, "夏日狂欢", "Holiday", "2023-06-01", "2023-06-30", 10000.00, "All Users", "Percentage", 15.00, False),
(4, "返校季促销", "Back to School", "2023-08-15", "2023-09-15", 6000.00, "Students", "Fixed", 5.00, False),
(5, "黑色星期五", "Holiday", "2023-11-24", "2023-11-28", 15000.00, "All Users", "Percentage", 30.00, False),
(6, "圣诞特惠", "Holiday", "2023-12-15", "2023-12-25", 12000.00, "All Users", "Percentage", 25.00, True),
(7, "会员专享", "Membership", "2023-01-01", "2023-12-31", 5000.00, "VIP & Gold Members", "Percentage", 10.00, True),
(8, "首单立减", "New Customer", "2023-01-01", "2023-12-31", 3000.00, "New Users", "Fixed", 5.00, True)
]
for campaign in campaigns:
cursor.execute("""
INSERT INTO marketing_campaigns (campaign_id, campaign_name, campaign_type, start_date, end_date,
budget, target_audience, discount_type, discount_value, is_active)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", campaign)
conn.commit()
print(f"已插入 {len(campaigns)} 个营销活动")
def generate_traffic_sources(conn):
cursor = conn.cursor()
# 清除已有数据
cursor.execute("DELETE FROM traffic_sources")
sources = [
(1, "Direct", "Direct", None),
(2, "Google Organic", "Search", None),
(3, "Bing Organic", "Search", None),
(4, "Facebook", "Social", None),
(5, "Instagram", "Social", None),
(6, "Twitter", "Social", None),
(7, "Pinterest", "Social", None),
(8, "Email Newsletter", "Email", None),
(9, "Affiliate", "Referral", None),
(10, "New Year Campaign", "Paid", 1),
(11, "Spring Collection", "Paid", 2),
(12, "Summer Sale", "Paid", 3),
(13, "Back to School", "Paid", 4),
(14, "Black Friday", "Paid", 5),
(15, "Christmas Sale", "Paid", 6)
]
for source in sources:
cursor.execute("""
INSERT INTO traffic_sources (source_id, source_name, source_type, campaign_id)
VALUES (%s, %s, %s, %s)
""", source)
conn.commit()
print(f"已插入 {len(sources)} 个流量来源")
def get_month_multiplier(month):
"""
根据月份返回销售量倍数,实现季节性趋势:
1月开始逐步增长,7月达到高峰,之后开始下降
"""
# 季节性趋势系数
seasonal_trend = {
1: 0.8, # 1月,开始上升
2: 0.9,
3: 1.0,
4: 1.2,
5: 1.4,
6: 1.6,
7: 1.8, # 7月高峰
8: 1.6,
9: 1.4,
10: 1.2,
11: 1.0,
12: 1.1 # 假日季节略有提升
}
return seasonal_trend.get(month, 1.0)
def generate_orders_and_items(conn, max_product_id):
cursor = conn.cursor()
# 清除已有数据
cursor.execute("DELETE FROM order_items")
cursor.execute("DELETE FROM orders")
cursor.execute("DELETE FROM order_campaign_map")
# 订单来源
order_sources = {
"Own Website": 0.50,
"Amazon": 0.40,
"Other Marketplaces": 0.10
}
# 支付方式
payment_methods = ["Credit Card", "PayPal", "Apple Pay", "Google Pay", "Amazon Pay"]
# 设备类型
device_types = ["Desktop", "Mobile", "Tablet"]
# 订单状态
order_statuses = ["Completed", "Shipped", "Processing", "Cancelled"]
status_weights = [0.85, 0.08, 0.05, 0.02]
# 获取所有有效活动
cursor.execute("SELECT campaign_id, start_date, end_date, discount_type, discount_value FROM marketing_campaigns")
campaigns = {row[0]: {"start": row[1], "end": row[2], "type": row[3], "value": row[4]} for row in cursor.fetchall()}
# 生成2023年的订单
start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 12, 31)
order_id = 1
order_item_id = 1
map_id = 1
# 订单数量将根据月份有所不同
base_daily_orders = 50 # 基础日均订单量
current_date = start_date
while current_date <= end_date:
# 根据月份调整订单数量
month_multiplier = get_month_multiplier(current_date.month)
daily_orders = int(base_daily_orders * month_multiplier)
# 添加一些随机波动
daily_orders = int(daily_orders * random.uniform(0.8, 1.2))
# 周末订单更多
if current_date.weekday() >= 5: # 5,6是周六日
daily_orders = int(daily_orders * 1.3)
# 特殊节日
if (current_date.month == 11 and current_date.day >= 24 and current_date.day <= 28) or \
(current_date.month == 12 and current_date.day >= 15 and current_date.day <= 25):
daily_orders = int(daily_orders * 1.5) # 黑五和圣诞季节
# 生成当天订单
for _ in range(daily_orders):
user_id = random.randint(1, 3000)
order_hour = random.randint(0, 23)
order_minute = random.randint(0, 59)
order_second = random.randint(0, 59)
order_datetime = datetime(current_date.year, current_date.month, current_date.day,
order_hour, order_minute, order_second)
# 选择订单来源
order_source = random.choices(list(order_sources.keys()), weights=list(order_sources.values()))[0]
# 随机选择支付和设备
payment_method = random.choice(payment_methods)
device_type = random.choice(device_types)
# 随机选择订单状态
order_status = random.choices(order_statuses, weights=status_weights)[0]
# 订单金额初始化
total_amount = 0.0
discount_amount = 0.0
# 查看是否有活动
applicable_campaigns = []
for camp_id, camp_data in campaigns.items():
if camp_data["start"] <= order_datetime.date() <= camp_data["end"]:
applicable_campaigns.append(camp_id)
# 创建订单
cursor.execute("""
INSERT INTO orders (order_id, user_id, order_date, total_amount, discount_amount,
payment_method, payment_status, shipping_address, order_status, order_source, device_type)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
order_id,
user_id,
order_datetime,
0, # 临时设为0,后面更新
0, # 临时设为0,后面更新
payment_method,
"Paid" if order_status != "Cancelled" else "Refunded",
fake.address().replace('\n', ', '),
order_status,
order_source,
device_type
))
# 决定订单项数量
items_count = random.choices([1, 2, 3, 4], weights=[0.6, 0.25, 0.1, 0.05])[0]
# 创建订单项
selected_products = random.sample(range(1, max_product_id+1), items_count)
for product_id in selected_products:
# 获取产品信息
cursor.execute("SELECT current_price FROM products WHERE product_id = %s", (product_id,))
product = cursor.fetchone()
if product:
unit_price = product[0]
quantity = random.randint(1, 3)
item_discount = 0
# 计算折扣
if applicable_campaigns and random.random() < 0.7: # 70%的订单应用活动
campaign_id = random.choice(applicable_campaigns)
camp_data = campaigns[campaign_id]
# 记录订单与活动的关联
cursor.execute("""
INSERT INTO order_campaign_map (id, order_id, campaign_id)
VALUES (%s, %s, %s)
""", (map_id, order_id, campaign_id))
map_id += 1
# 计算折扣金额
if camp_data["type"] == "Percentage":
item_discount = round((unit_price * quantity) * (camp_data["value"] / 100), 2)
else: # Fixed
item_discount = min(camp_data["value"], unit_price * quantity)
discount_amount += item_discount
# 添加订单项
cursor.execute("""
INSERT INTO order_items (order_item_id, order_id, product_id, quantity, unit_price, discount)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
order_item_id,
order_id,
product_id,
quantity,
unit_price,
item_discount
))
total_amount += unit_price * quantity
order_item_id += 1
# 更新订单总金额
cursor.execute("""
UPDATE orders SET total_amount = %s, discount_amount = %s WHERE order_id = %s
""", (total_amount, discount_amount, order_id))
order_id += 1
# 进入下一天
current_date += timedelta(days=1)
# 每处理50天提交一次,减少内存压力
if current_date.day % 50 == 0:
conn.commit()
conn.commit()
print(f"已插入 {order_id-1} 个订单和 {order_item_id-1} 个订单项")
def generate_sample_visits_behaviors(conn, max_product_id):
cursor = conn.cursor()
# 清除已有数据
cursor.execute("DELETE FROM visit_logs")
cursor.execute("DELETE FROM user_behaviors")
# 生成用户访问记录和行为 (为简化示例,只生成2023年每月的部分样本数据)
log_id = 1
behavior_id = 1
# 页面列表
pages = [
"/",
"/products",
"/category/swimwear",
"/category/clothing",
"/category/accessories",
"/category/hats",
"/category/bags",
"/product-details/",
"/cart",
"/checkout",
"/account"
]
# 来源URL列表
referrers = [
"https://www.google.com/",
"https://www.facebook.com/",
"https://www.instagram.com/",
"https://www.pinterest.com/",
"https://www.amazon.com/",
"", # 直接访问
]
# 设备类型
devices = ["Desktop", "Mobile", "Tablet"]
device_weights = [0.35, 0.55, 0.1]
# 行为类型
behaviors = ["view", "add_to_cart", "add_to_wishlist", "purchase"]
behavior_weights = [0.7, 0.15, 0.1, 0.05]
for month in range(1, 13):
# 根据月份的季节性调整访问量
month_multiplier = get_month_multiplier(month)
visit_count = int(5000 * month_multiplier) # 每月样本访问量
for _ in range(visit_count):
# 随机日期时间
day = random.randint(1, 28) # 避免月末日期问题
hour = random.randint(0, 23)
minute = random.randint(0, 59)
second = random.randint(0, 59)
visit_time = datetime(2023, month, day, hour, minute, second)
# 是否登录用户
is_logged_in = random.random() < 0.4 # 40%是已登录用户
user_id = random.randint(1, 3000) if is_logged_in else None
# 生成会话ID
session_id = fake.uuid4()
# 随机选择页面、来源和设备
base_page = random.choice(pages)
if "product-details" in base_page:
page = base_page + str(random.randint(1, max_product_id))
else:
page = base_page
referrer = random.choice(referrers)
device = random.choices(devices, weights=device_weights)[0]
stay_duration = random.randint(10, 300) # 停留10-300秒
# 记录访问
cursor.execute("""
INSERT INTO visit_logs (log_id, user_id, session_id, page_url, referrer_url,
visit_time, device_type, ip_address, stay_duration)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
log_id,
user_id,
session_id,
page,
referrer,
visit_time,
device,
fake.ipv4(),
stay_duration
))
log_id += 1
# 记录用户行为 (仅对产品详情页)
if "product-details" in page and random.random() < 0.8: # 80%的产品页面会有行为记录
product_id = int(page.split('/')[-1])
behavior_type = random.choices(behaviors, weights=behavior_weights)[0]
if user_id: # 记录已登录用户行为
cursor.execute("""
INSERT INTO user_behaviors (behavior_id, user_id, product_id, behavior_type, behavior_time)
VALUES (%s, %s, %s, %s, %s)
""", (
behavior_id,
user_id,
product_id,
behavior_type,
visit_time
))
behavior_id += 1
conn.commit()
print(f"已插入 {month}月 的访问和行为样本数据")
print(f"总计插入了 {log_id-1} 条访问记录和 {behavior_id-1} 条用户行为")
def generate_sample_reviews_returns(conn):
cursor = conn.cursor()
# 清除已有数据
cursor.execute("DELETE FROM reviews")
cursor.execute("DELETE FROM returns")
# 获取完成的订单ID
cursor.execute("SELECT order_id, user_id, order_date FROM orders WHERE order_status = 'Completed' LIMIT 2000")
completed_orders = cursor.fetchall()
review_id = 1
return_id = 1
# 生成评论数据 - 60%的完成订单有评价
for order in completed_orders:
if random.random() < 0.6:
order_id = order[0]
user_id = order[1]
order_date = order[2]
# 评价通常在收到商品后1-10天发表
days_after = random.randint(1, 10)
review_date = order_date + timedelta(days=days_after)
# 获取订单中的商品
cursor.execute("SELECT product_id FROM order_items WHERE order_id = %s", (order_id,))
products = cursor.fetchall()
for product in products:
product_id = product[0]
# 90%的产品获得4-5星,5%获得3星,5%获得1-2星
rating_rand = random.random()
if rating_rand < 0.9:
rating = random.randint(4, 5)
elif rating_rand < 0.95:
rating = 3
else:
rating = random.randint(1, 2)
# 根据星级生成评论
if rating >= 4:
comments = [
"非常满意的购物体验,质量很好。",
"商品和描述一致,非常喜欢。",
"面料舒适,款式时尚,很满意。",
"物超所值,会再次购买。",
"物流很快,包装完好,产品质量也不错。"
]
elif rating == 3:
comments = [
"商品质量一般,但价格合理。",
"基本符合预期,但有小瑕疵。",
"款式不错,但尺寸稍有偏差。",
"总体来说还可以,但物流有点慢。"
]
else:
comments = [
"商品与描述不符,很失望。",
"质量太差,不值这个价格。",
"尺寸完全不对,需要退货。",
"收到时已经损坏,客服处理很慢。"
]
comment = random.choice(comments)
cursor.execute("""
INSERT INTO reviews (review_id, order_id, product_id, user_id, rating, comment, review_date)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""", (
review_id,
order_id,
product_id,
user_id,
rating,
comment,
review_date
))
review_id += 1
# 生成退货数据 - 约3%的订单有退货
for order in completed_orders:
if random.random() < 0.03:
order_id = order[0]
order_date = order[2]
# 退货通常在收到商品后1-7天发起
days_after = random.randint(1, 7)
return_date = order_date + timedelta(days=days_after)
# 获取订单信息
cursor.execute("SELECT total_amount FROM orders WHERE order_id = %s", (order_id,))
total_amount = cursor.fetchone()[0]
# 获取订单中的商品
cursor.execute("SELECT order_item_id, product_id FROM order_items WHERE order_id = %s", (order_id,))
items = cursor.fetchall()
# 全部退货或部分退货
is_full_return = random.random() < 0.7 # 70%是全部退货
if is_full_return:
# 全部退货
return_reason = random.choice([
"尺寸不合适",
"样式不喜欢",
"质量问题",
"收到错误商品",
"商品损坏"
])
cursor.execute("""
INSERT INTO returns (return_id, order_id, order_item_id, return_date,
return_reason, return_status, refund_amount)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""", (
return_id,
order_id,
None, # 全部退货时不指定具体item
return_date,
return_reason,
"Completed",
total_amount
))
return_id += 1
else:
# 部分退货,随机选择一个商品
item = random.choice(items)
order_item_id = item[0]
product_id = item[1]
# 获取该商品价格
cursor.execute("SELECT unit_price, quantity FROM order_items WHERE order_item_id = %s", (order_item_id,))
item_data = cursor.fetchone()
unit_price = item_data[0]
quantity = item_data[1]
item_total = unit_price * quantity
return_reason = random.choice([
"尺寸不合适",
"样式不喜欢",
"质量问题",
"收到错误商品",
"商品损坏"
])
cursor.execute("""
INSERT INTO returns (return_id, order_id, order_item_id, return_date,
return_reason, return_status, refund_amount)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""", (
return_id,
order_id,
order_item_id,
return_date,
return_reason,
"Completed",
item_total
))
return_id += 1
conn.commit()
print(f"已插入 {review_id-1} 条评论和 {return_id-1} 条退货记录")
def generate_price_inventory_changes(conn, max_product_id):
cursor = conn.cursor()
# 清除已有数据
cursor.execute("DELETE FROM price_changes")
cursor.execute("DELETE FROM inventory_records")
change_id = 1
record_id = 1
# 为热门产品生成价格变动历史
popular_products = random.sample(range(1, max_product_id+1), int(max_product_id * 0.3)) # 30%的产品有价格变动
for product_id in popular_products:
# 获取当前价格
cursor.execute("SELECT current_price FROM products WHERE product_id = %s", (product_id,))
current_price = cursor.fetchone()[0]
# 决定价格变动次数 (1-4次)
change_count = random.randint(1, 4)
# 生成价格变动历史
for i in range(change_count):
# 变动发生的月份,偏向于促销季
month = random.choices(range(1, 13), weights=[1, 1, 1.2, 1, 1.2, 1.5, 1.5, 1.2, 1, 1, 1.5, 2])[0]
day = random.randint(1, 28)
change_date = datetime(2023, month, day, random.randint(0, 23), random.randint(0, 59), random.randint(0, 59))
# 先涨后降或先降后涨的模式
if i % 2 == 0: # 涨价
old_price = current_price
new_price = round(current_price * random.uniform(1.05, 1.2), 2) # 涨5%-20%
reason = random.choice(["季节性调整", "成本上涨", "新版本上市"])
else: # 降价
old_price = current_price
new_price = round(current_price * random.uniform(0.75, 0.95), 2) # 降5%-25%
reason = random.choice(["促销活动", "清理库存", "应对竞争"])
cursor.execute("""
INSERT INTO price_changes (change_id, product_id, change_date, old_price, new_price, reason)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
change_id,
product_id,
change_date,
old_price,
new_price,
reason
))
current_price = new_price
change_id += 1
# 生成库存记录
for product_id in range(1, max_product_id+1):
# 库存调整次数
changes = random.randint(2, 6)
for _ in range(changes):
# 随机日期
month = random.randint(1, 12)
day = random.randint(1, 28)
change_date = datetime(2023, month, day, random.randint(9, 17), random.randint(0, 59), random.randint(0, 59))
# 入库或出库
is_inbound = random.random() < 0.7 # 70%是入库
if is_inbound:
quantity_change = random.randint(50, 200) # 入库数量50-200
reason = random.choice(["常规补货", "季节性备货", "新品入库"])
else:
quantity_change = -random.randint(10, 50) # 出库数量10-50
reason = random.choice(["库存调整", "质量问题退货", "仓库转移"])
cursor.execute("""
INSERT INTO inventory_records (record_id, product_id, change_date, quantity_change, reason)
VALUES (%s, %s, %s, %s, %s)
""", (
record_id,
product_id,
change_date,
quantity_change,
reason
))
record_id += 1
conn.commit()
print(f"已插入 {change_id-1} 条价格变动记录和 {record_id-1} 条库存记录")
def main():
print("开始执行电商数据模拟...")
try:
# 创建数据库
create_database()
# 连接到数据库
conn = connect_to_db()
print("成功连接到数据库")
# 创建表结构
create_tables(conn)
# 生成模拟数据
generate_product_categories(conn)
max_product_id = generate_products(conn)
generate_users(conn, 3000)
generate_marketing_campaigns(conn)
generate_traffic_sources(conn)
generate_orders_and_items(conn, max_product_id)
generate_sample_visits_behaviors(conn, max_product_id)
generate_sample_reviews_returns(conn)
generate_price_inventory_changes(conn, max_product_id)
print("数据模拟完成!")
except Exception as e:
print(f"发生错误: {e}")
finally:
if 'conn' in locals():
conn.close()
print("数据库连接已关闭")
if __name__ == "__main__":
main()
-
请先确保您的AWS RDS MySQL实例已创建并可访问
-
安装所需Python依赖:
pip install pymysql faker
-
修改脚本中的数据库连接信息:
RDS_ENDPOINT = "your-rds-endpoint.amazonaws.com" RDS_USER = "your_username" RDS_PASSWORD = "your_password"
-
运行脚本生成模拟数据:
python generate_ecommerce_data.py
这个脚本生成的数据遵循您指定的业务特点:
-
产品类别分布:
- 主要销售泳衣产品(约70%的产品和销售量)
- 扩展品类包括成衣、饰品、帽子和包
-
价格范围:
- 所有商品价格在5-50美元之间
- 主要泳装产品价格在20-50美元范围
- 配饰价格较低,在5-15美元
-
国家分布:
- 美国客户占比约70%
- 其他市场包括加拿大、英国、法国和澳大利亚
-
销售渠道:
- 独立站贡献50%收入
- Amazon平台贡献40%收入
- 其他商城贡献10%收入
-
流量来源:
- 包括直接访问、搜索引擎和社交媒体(如Facebook)
- 添加了各种营销活动相关流量
-
季节性趋势:
- 1月销售开始增长
- 7月达到全年峰值
- 8月开始下降
- 特殊销售期(如黑色星期五和圣诞节)有额外波动
该数据可用于进行全面的销售趋势分析、营销效果评估、客户行为分析等,为您提供强大的演示基础。
这个demo有几个细节需要确认:
- 如果以Chrome插件的形式来触发数据查询和报告生成,通过什么方式能够安全的查询MySQL的数据?通过什么方式来调用LLM?是否都通过API Gateway + Lambda来实现?
- 这些查询出来的数据建议暂时存放在哪里,确保安全性?
通过Chrome插件触发数据查询和报告生成时,确保安全性的最佳架构如下:
Chrome插件 → API Gateway → Lambda → RDS MySQL
↓
Lambda → LLM API → Lambda → API Gateway → Chrome插件
安全考虑:
- 不直接连接数据库:Chrome插件绝不应直接连接RDS,这会暴露数据库凭证
- API密钥认证:API Gateway应实现API密钥或更强的认证机制(如Cognito)
- 最小权限原则:Lambda函数使用的IAM角色应只具备所需的最小数据库访问权限
- 参数验证:所有API请求参数需在Lambda中严格验证,防止SQL注入
-
Chrome插件发起请求:
// Chrome插件代码示例 async function fetchSalesReport(parameters) { const response = await fetch('https://your-api-gateway.amazonaws.com/prod/sales-report', { method: 'POST', headers: { 'Content-Type': 'application/json', 'x-api-key': 'YOUR_API_KEY' // API Gateway密钥 }, body: JSON.stringify(parameters) }); return await response.json(); }
-
Lambda函数处理查询:
import pymysql import os import json def lambda_handler(event, context): # 参数验证 validate_parameters(event) # 连接数据库 - 使用环境变量存储凭证 conn = pymysql.connect( host=os.environ['RDS_ENDPOINT'], user=os.environ['RDS_USER'], password=os.environ['RDS_PASSWORD'], db=os.environ['RDS_DB_NAME'] ) try: # 执行查询 cursor = conn.cursor() # 使用参数化查询防止SQL注入 cursor.execute("SELECT * FROM sales WHERE date BETWEEN %s AND %s", (event['start_date'], event['end_date'])) results = cursor.fetchall() # 处理结果 processed_data = process_results(results) return { 'statusCode': 200, 'body': json.dumps(processed_data) } finally: conn.close()
-
通过单独的Lambda函数:
- 接收查询结果数据
- 调用LLM API (如OpenAI API或Amazon Bedrock)
- 返回生成的分析报告
-
代码示例:
import json import boto3 import os import requests def lambda_handler(event, context): # 获取查询结果 sales_data = json.loads(event['body']) # 准备LLM提示 prompt = f""" 分析以下电商销售数据并生成见解报告: {json.dumps(sales_data, indent=2)} 请提供: 1. 销售趋势分析 2. 增长或下降的可能原因 3. 关键产品类别表现 4. 建议的行动方案 """ # 选择1: 调用OpenAI API if os.environ.get('LLM_PROVIDER') == 'OPENAI': response = requests.post( 'https://api.openai.com/v1/chat/completions', headers={ 'Authorization': f"Bearer {os.environ['OPENAI_API_KEY']}", 'Content-Type': 'application/json' }, json={ 'model': 'gpt-4', 'messages': [{'role': 'user', 'content': prompt}], 'temperature': 0.7 } ) analysis = response.json()['choices'][0]['message']['content'] # 选择2: 调用Amazon Bedrock elif os.environ.get('LLM_PROVIDER') == 'BEDROCK': bedrock = boto3.client('bedrock-runtime') response = bedrock.invoke_model( modelId='anthropic.claude-v2', body=json.dumps({ 'prompt': f"\n\nHuman: {prompt}\n\nAssistant:", 'max_tokens_to_sample': 4000 }) ) analysis = json.loads(response['body'].read())['completion'] return { 'statusCode': 200, 'body': json.dumps({'analysis': analysis}) }
根据数据敏感性和使用场景,有以下几个选项:
Lambda → S3(加密存储) → 预签名URL → Chrome插件
实现方式:
def lambda_handler(event, context):
# 查询数据库获取结果
query_results = query_database()
# 生成唯一文件名
file_name = f"report-{uuid.uuid4()}.json"
# 将结果存储到S3
s3 = boto3.client('s3')
s3.put_object(
Bucket='your-secure-bucket',
Key=file_name,
Body=json.dumps(query_results),
ContentType='application/json',
ServerSideEncryption='AES256' # 服务端加密
)
# 生成预签名URL (短期有效,如15分钟)
presigned_url = s3.generate_presigned_url(
'get_object',
Params={'Bucket': 'your-secure-bucket', 'Key': file_name},
ExpiresIn=900 # 15分钟
)
return {
'statusCode': 200,
'body': json.dumps({'url': presigned_url})
}
安全特点:
- 数据使用AES-256加密存储
- 预签名URL短期有效
- 可设置生命周期策略自动删除数据
- 可启用S3访问日志审计
适用于结构化数据且需要快速访问的场景:
def lambda_handler(event, context):
# 查询数据库获取结果
query_results = query_database()
# 生成唯一ID
report_id = str(uuid.uuid4())
# 存储到DynamoDB
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('TemporaryReports')
# 添加TTL字段,24小时后自动删除
expiration_time = int(time.time()) + 86400
table.put_item(
Item={
'reportId': report_id,
'data': query_results,
'createdAt': datetime.now().isoformat(),
'ttl': expiration_time
}
)
return {
'statusCode': 200,
'body': json.dumps({'reportId': report_id})
}
如果数据量小且不需要持久化,可以直接通过API响应返回:
def lambda_handler(event, context):
# 查询数据库获取结果
query_results = query_database()
# 直接返回结果
return {
'statusCode': 200,
'body': json.dumps(query_results),
'headers': {
'Content-Type': 'application/json',
'Cache-Control': 'no-store, max-age=0'
}
}
-
数据库访问:
- 使用Lambda函数访问RDS,而非直接从客户端
- 使用参数化查询防止SQL注入
- 数据库凭证存储在AWS Secrets Manager或环境变量中
-
API安全:
- 使用API Gateway的API密钥或AWS Cognito进行认证
- 启用API Gateway请求验证
- 实施请求限流防止滥用
-
数据传输:
- 使用HTTPS加密传输
- 敏感数据使用AWS KMS加密
- 使用预签名URL限制访问时间
-
LLM调用:
- API密钥存储在AWS Secrets Manager
- 实施输入验证和输出过滤
- 考虑使用AWS Bedrock以获得更好的AWS集成和安全性
这种架构确保了数据查询和LLM调用的安全性,同时为Chrome插件提供了简单的接口。临时数据存储方案根据您的具体需求和数据敏感性可以灵活选择。
方案:Chrome插件采用API Gateway + Lambda + RDS,使用Lambda函数调用LLM API (Boto3),查询的数据临时存放在DynamoDB临时表
请你输出一份设计方案,包括需求描述,实现方案(含RDS MySQL表结构),业务流程。以markdown格式输出
开发一款Chrome插件形式的"电商销售分析助手",帮助电商运营人员快速分析销售数据,识别增长或下降趋势并提供原因分析。该工具将通过AI赋能,提供专业的销售洞察和建议。
- 销售数据查询:支持按时间范围、产品类别、销售渠道等维度查询销售数据
- 趋势分析:展示销售额、订单量、客单价等关键指标的趋势图表
- 原因分析:利用LLM分析销售增长或下降的可能原因
- 多维度比较:支持同比/环比分析,不同产品类别、渠道的对比
- 数据可视化:直观展示分析结果和关键指标
- 安全性:确保数据传输和存储的安全性
- 电商运营经理:需要全面了解销售状况,做出战略决策
- 产品经理:关注具体产品类别表现,优化产品组合
- 营销人员:评估营销活动效果,优化营销策略
Chrome插件 → API Gateway → Lambda函数 → RDS MySQL
↓ ↓
返回结果 ← DynamoDB临时表 ← 分析结果
↑ ↑
API Gateway ← Lambda函数 ← LLM API (Amazon Bedrock)
- 前端:Chrome Extension (HTML, CSS, JavaScript, Chart.js)
- 后端:
- AWS API Gateway:API管理和安全
- AWS Lambda:无服务器计算
- Amazon RDS MySQL:数据存储
- Amazon DynamoDB:临时数据存储
- Amazon Bedrock:LLM服务调用
- AWS KMS:加密管理
已在前面设计的电商数据库Schema,包含以下核心表:
-- 用户表
CREATE TABLE users (
user_id INT PRIMARY KEY,
username VARCHAR(50),
email VARCHAR(100),
phone VARCHAR(20),
gender VARCHAR(10),
age INT,
city VARCHAR(50),
country VARCHAR(50),
registration_date DATE,
user_level VARCHAR(20),
is_active BOOLEAN,
user_source VARCHAR(50)
);
-- 商品分类表
CREATE TABLE product_categories (
category_id INT PRIMARY KEY,
category_name VARCHAR(100),
parent_category_id INT NULL,
category_level INT,
is_active BOOLEAN
);
-- 商品表
CREATE TABLE products (
product_id INT PRIMARY KEY,
product_name VARCHAR(200),
category_id INT,
brand VARCHAR(100),
supplier VARCHAR(100),
original_price DECIMAL(10, 2),
current_price DECIMAL(10, 2),
cost DECIMAL(10, 2),
stock_quantity INT,
create_time DATE,
is_active BOOLEAN,
FOREIGN KEY (category_id) REFERENCES product_categories(category_id)
);
-- 订单表
CREATE TABLE orders (
order_id INT PRIMARY KEY,
user_id INT,
order_date TIMESTAMP,
total_amount DECIMAL(10, 2),
discount_amount DECIMAL(10, 2),
payment_method VARCHAR(50),
payment_status VARCHAR(20),
shipping_address VARCHAR(200),
order_status VARCHAR(20),
order_source VARCHAR(50),
device_type VARCHAR(20),
FOREIGN KEY (user_id) REFERENCES users(user_id)
);
-- 订单详情表
CREATE TABLE order_items (
order_item_id INT PRIMARY KEY,
order_id INT,
product_id INT,
quantity INT,
unit_price DECIMAL(10, 2),
discount DECIMAL(10, 2),
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
);
-- 营销活动表
CREATE TABLE marketing_campaigns (
campaign_id INT PRIMARY KEY,
campaign_name VARCHAR(100),
campaign_type VARCHAR(50),
start_date DATE,
end_date DATE,
budget DECIMAL(10, 2),
target_audience VARCHAR(100),
discount_type VARCHAR(50),
discount_value DECIMAL(10, 2),
is_active BOOLEAN
);
-- 订单营销关联表
CREATE TABLE order_campaign_map (
id INT PRIMARY KEY,
order_id INT,
campaign_id INT,
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (campaign_id) REFERENCES marketing_campaigns(campaign_id)
);
表名: SalesAnalysisTemp
主键:
- reportId (分区键): 唯一报告ID
属性:
- queryParams: 查询参数JSON
- rawData: 查询结果数据JSON
- analysisResults: LLM分析结果JSON
- createdAt: 创建时间戳
- ttl: 过期时间戳(24小时后)
- status: 处理状态(PROCESSING, COMPLETED, ERROR)
POST /api/sales/query
请求体:
{
"timeRange": {
"start": "2023-01-01",
"end": "2023-07-31"
},
"dimensions": ["category", "channel"],
"metrics": ["sales", "orders", "aov"],
"filters": {
"categories": ["泳装", "配饰"],
"channels": ["Own Website", "Amazon"]
},
"compareWith": "previous_period" // 可选: previous_period, previous_year
}
响应:
{
"reportId": "550e8400-e29b-41d4-a716-446655440000",
"status": "PROCESSING"
}
GET /api/sales/analysis/{reportId}
响应:
{
"status": "COMPLETED",
"queryParams": {...},
"summary": {
"totalSales": 124500.50,
"totalOrders": 2450,
"averageOrderValue": 50.82,
"growthRate": 15.4
},
"trends": [...],
"analysis": {
"insights": "销售额较上期增长15.4%,主要原因有三点...",
"factors": [
{
"factor": "季节性需求增长",
"impact": "高",
"description": "..."
},
...
],
"recommendations": [...]
},
"expiresAt": "2023-08-16T14:30:00Z"
}
-
用户发起查询
- 用户在Chrome插件界面设置查询参数(时间范围、产品类别等)
- 点击"分析"按钮发起请求
-
数据查询处理
- API Gateway接收请求并转发至Lambda函数
- Lambda函数验证参数并构建SQL查询
- 执行RDS MySQL查询获取原始数据
- 在DynamoDB创建临时记录,状态为"PROCESSING"
- 返回reportId给前端
-
数据分析处理
- 第一个Lambda触发第二个Lambda函数
- 第二个Lambda函数准备数据并调用Amazon Bedrock API
- LLM生成分析结果
- 分析结果存入DynamoDB,更新状态为"COMPLETED"
-
结果展示
- 前端轮询获取分析状态
- 当状态为"COMPLETED"时,获取完整分析结果
- 使用Chart.js渲染可视化图表
- 展示LLM生成的分析洞察和建议
-
趋势识别
- 计算关键指标的时间序列变化
- 识别显著的增长或下降点
- 计算同比/环比变化率
-
多维度分析
- 按产品类别分析销售表现
- 按销售渠道分析转化率
- 按用户群体分析购买行为
-
原因归因分析
- 分析营销活动与销售增长的关联
- 评估价格变动对销售量的影响
- 考虑季节性因素和市场趋势
- 分析竞品活动和外部事件影响
-
生成洞察和建议
- 提炼关键发现和洞察
- 提供具体的行动建议
- 预测未来趋势
import json
import boto3
import pymysql
import os
import uuid
import time
from datetime import datetime, timedelta
# 数据库连接配置
RDS_ENDPOINT = os.environ['RDS_ENDPOINT']
RDS_USER = os.environ['RDS_USER']
RDS_PASSWORD = os.environ['RDS_PASSWORD']
RDS_DB_NAME = os.environ['RDS_DB_NAME']
# DynamoDB配置
DYNAMODB_TABLE = 'SalesAnalysisTemp'
def lambda_handler(event, context):
# 解析请求参数
body = json.loads(event['body'])
time_range = body.get('timeRange', {})
dimensions = body.get('dimensions', [])
metrics = body.get('metrics', [])
filters = body.get('filters', {})
compare_with = body.get('compareWith', None)
# 参数验证
if not time_range or 'start' not in time_range or 'end' not in time_range:
return {
'statusCode': 400,
'body': json.dumps({'error': 'Invalid time range parameters'})
}
# 生成唯一报告ID
report_id = str(uuid.uuid4())
try:
# 连接到RDS MySQL
conn = pymysql.connect(
host=RDS_ENDPOINT,
user=RDS_USER,
password=RDS_PASSWORD,
db=RDS_DB_NAME
)
# 构建并执行查询
query_results = execute_sales_query(conn, time_range, dimensions, metrics, filters)
# 如果需要比较数据,获取比较期间的数据
comparison_results = None
if compare_with:
comparison_time_range = calculate_comparison_period(time_range, compare_with)
comparison_results = execute_sales_query(conn, comparison_time_range, dimensions, metrics, filters)
conn.close()
# 准备存储数据
query_data = {
'primary': query_results,
'comparison': comparison_results
}
# 存储到DynamoDB
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE)
# 设置24小时后过期
ttl = int(time.time()) + 86400
# 保存初始记录
table.put_item(
Item={
'reportId': report_id,
'queryParams': body,
'rawData': query_data,
'createdAt': datetime.now().isoformat(),
'ttl': ttl,
'status': 'PROCESSING'
}
)
# 异步触发分析Lambda
lambda_client = boto3.client('lambda')
lambda_client.invoke(
FunctionName=os.environ['ANALYSIS_LAMBDA_ARN'],
InvocationType='Event',
Payload=json.dumps({'reportId': report_id})
)
return {
'statusCode': 200,
'body': json.dumps({
'reportId': report_id,
'status': 'PROCESSING'
})
}
except Exception as e:
print(f"Error: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def execute_sales_query(conn, time_range, dimensions, metrics, filters):
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 构建SQL查询
select_clauses = []
group_by_clauses = []
where_clauses = [f"o.order_date BETWEEN '{time_range['start']}' AND '{time_range['end']}'"]
# 添加维度
for dim in dimensions:
if dim == 'category':
select_clauses.append("pc.category_name")
group_by_clauses.append("pc.category_name")
elif dim == 'channel':
select_clauses.append("o.order_source")
group_by_clauses.append("o.order_source")
elif dim == 'date':
select_clauses.append("DATE(o.order_date) as order_day")
group_by_clauses.append("order_day")
# 添加指标
if 'sales' in metrics or len(metrics) == 0:
select_clauses.append("SUM(o.total_amount) as total_sales")
if 'orders' in metrics or len(metrics) == 0:
select_clauses.append("COUNT(DISTINCT o.order_id) as order_count")
if 'aov' in metrics or len(metrics) == 0:
select_clauses.append("SUM(o.total_amount) / COUNT(DISTINCT o.order_id) as average_order_value")
# 添加过滤条件
if 'categories' in filters and filters['categories']:
categories = "', '".join(filters['categories'])
where_clauses.append(f"pc.category_name IN ('{categories}')")
if 'channels' in filters and filters['channels']:
channels = "', '".join(filters['channels'])
where_clauses.append(f"o.order_source IN ('{channels}')")
# 构建完整SQL
sql = f"""
SELECT {', '.join(select_clauses)}
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
JOIN product_categories pc ON p.category_id = pc.category_id
WHERE {' AND '.join(where_clauses)}
"""
if group_by_clauses:
sql += f" GROUP BY {', '.join(group_by_clauses)}"
# 执行查询
cursor.execute(sql)
results = cursor.fetchall()
cursor.close()
return results
def calculate_comparison_period(time_range, compare_with):
start_date = datetime.strptime(time_range['start'], '%Y-%m-%d')
end_date = datetime.strptime(time_range['end'], '%Y-%m-%d')
period_days = (end_date - start_date).days
if compare_with == 'previous_period':
new_end = start_date - timedelta(days=1)
new_start = new_end - timedelta(days=period_days)
elif compare_with == 'previous_year':
new_start = start_date.replace(year=start_date.year-1)
new_end = end_date.replace(year=end_date.year-1)
else:
return time_range
return {
'start': new_start.strftime('%Y-%m-%d'),
'end': new_end.strftime('%Y-%m-%d')
}
import json
import boto3
import os
from datetime import datetime
# DynamoDB配置
DYNAMODB_TABLE = 'SalesAnalysisTemp'
def lambda_handler(event, context):
report_id = event['reportId']
try:
# 从DynamoDB获取数据
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE)
response = table.get_item(
Key={'reportId': report_id}
)
if 'Item' not in response:
raise Exception(f"Report {report_id} not found")
report_data = response['Item']
raw_data = report_data['rawData']
query_params = report_data['queryParams']
# 准备数据摘要
summary = prepare_data_summary(raw_data)
# 准备LLM提示
prompt = generate_analysis_prompt(raw_data, query_params, summary)
# 调用Amazon Bedrock
bedrock = boto3.client('bedrock-runtime')
llm_response = bedrock.invoke_model(
modelId='anthropic.claude-v2',
body=json.dumps({
'prompt': f"\n\nHuman: {prompt}\n\nAssistant:",
'max_tokens_to_sample': 4000,
'temperature': 0.7
})
)
analysis_text = json.loads(llm_response['body'].read())['completion']
# 解析LLM响应
analysis_results = parse_llm_response(analysis_text)
# 更新DynamoDB
table.update_item(
Key={'reportId': report_id},
UpdateExpression="set analysisResults = :a, summary = :s, status = :st, updatedAt = :u",
ExpressionAttributeValues={
':a': analysis_results,
':s': summary,
':st': 'COMPLETED',
':u': datetime.now().isoformat()
}
)
return {
'statusCode': 200,
'body': json.dumps({'status': 'Analysis completed'})
}
except Exception as e:
print(f"Error: {str(e)}")
# 更新错误状态
if 'report_id' in locals():
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE)
table.update_item(
Key={'reportId': report_id},
UpdateExpression="set status = :st, errorMessage = :err, updatedAt = :u",
ExpressionAttributeValues={
':st': 'ERROR',
':err': str(e),
':u': datetime.now().isoformat()
}
)
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def prepare_data_summary(raw_data):
primary_data = raw_data['primary']
comparison_data = raw_data.get('comparison')
# 计算总销售额
total_sales = sum(item.get('total_sales', 0) for item in primary_data)
# 计算总订单数
total_orders = sum(item.get('order_count', 0) for item in primary_data)
# 计算平均订单价值
aov = total_sales / total_orders if total_orders > 0 else 0
# 计算增长率
growth_rate = None
if comparison_data:
comparison_sales = sum(item.get('total_sales', 0) for item in comparison_data)
if comparison_sales > 0:
growth_rate = ((total_sales - comparison_sales) / comparison_sales) * 100
return {
'totalSales': total_sales,
'totalOrders': total_orders,
'averageOrderValue': aov,
'growthRate': growth_rate
}
def generate_analysis_prompt(raw_data, query_params, summary):
# 格式化日期范围
start_date = query_params['timeRange']['start']
end_date = query_params['timeRange']['end']
# 构建提示
prompt = f"""
作为电商数据分析专家,请分析以下销售数据并提供专业见解。
## 分析时间范围
{start_date} 至 {end_date}
## 销售数据摘要
- 总销售额: ${summary['totalSales']:.2f}
- 总订单数: {summary['totalOrders']}
- 平均订单价值: ${summary['averageOrderValue']:.2f}
"""
if summary['growthRate'] is not None:
prompt += f"- 销售增长率: {summary['growthRate']:.2f}%\n"
# 添加详细数据
prompt += "\n## 详细销售数据\n"
prompt += json.dumps(raw_data['primary'], indent=2)
if raw_data.get('comparison'):
prompt += "\n\n## 对比期间数据\n"
prompt += json.dumps(raw_data['comparison'], indent=2)
# 分析要求
prompt += """
## 分析要求
请提供以下分析:
1. 销售趋势分析:描述主要销售趋势,包括增长或下降的时间点和幅度。
2. 原因分析:分析销售增长或下降的可能原因,考虑以下因素:
- 季节性因素
- 产品类别表现
- 销售渠道效果
- 可能的市场变化
- 价格策略影响
3. 关键发现:提炼3-5个最重要的数据发现。
4. 行动建议:基于分析提供3-5条具体可行的建议。
请以JSON格式返回分析结果,包含以下字段:
{
"trendAnalysis": "销售趋势的详细分析...",
"causalFactors": [
{"factor": "因素1", "impact": "高/中/低", "description": "详细描述..."},
...
],
"keyInsights": ["洞察1", "洞察2", ...],
"recommendations": ["建议1", "建议2", ...]
}
"""
return prompt
def parse_llm_response(response_text):
# 尝试从响应中提取JSON
try:
# 查找JSON开始和结束位置
start_idx = response_text.find('{')
end_idx = response_text.rfind('}') + 1
if start_idx >= 0 and end_idx > start_idx:
json_str = response_text[start_idx:end_idx]
return json.loads(json_str)
else:
# 如果没有找到JSON格式,进行简单结构化
return {
"trendAnalysis": response_text,
"causalFactors": [],
"keyInsights": [],
"recommendations": []
}
except json.JSONDecodeError:
# JSON解析失败,返回原始文本
return {
"rawAnalysis": response_text
}
import json
import boto3
import os
# DynamoDB配置
DYNAMODB_TABLE = 'SalesAnalysisTemp'
def lambda_handler(event, context):
# 从路径参数获取报告ID
report_id = event['pathParameters']['reportId']
try:
# 从DynamoDB获取数据
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE)
response = table.get_item(
Key={'reportId': report_id}
)
if 'Item' not in response:
return {
'statusCode': 404,
'body': json.dumps({'error': 'Report not found'})
}
report_data = response['Item']
# 根据状态返回不同内容
status = report_data.get('status', 'UNKNOWN')
if status == 'PROCESSING':
return {
'statusCode': 200,
'body': json.dumps({
'reportId': report_id,
'status': status,
'message': 'Analysis in progress'
})
}
elif status == 'ERROR':
return {
'statusCode': 200,
'body': json.dumps({
'reportId': report_id,
'status': status,
'error': report_data.get('errorMessage', 'Unknown error')
})
}
elif status == 'COMPLETED':
# 返回完整结果
return {
'statusCode': 200,
'body': json.dumps({
'reportId': report_id,
'status': status,
'queryParams': report_data.get('queryParams', {}),
'summary': report_data.get('summary', {}),
'analysis': report_data.get('analysisResults', {}),
'createdAt': report_data.get('createdAt'),
'expiresAt': report_data.get('ttl')
})
}
else:
return {
'statusCode': 500,
'body': json.dumps({'error': 'Invalid report status'})
}
except Exception as e:
print(f"Error: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
- 传输加密:所有API通信使用HTTPS
- 存储加密:RDS和DynamoDB数据加密存储
- 临时存储:分析结果在DynamoDB中设置24小时TTL自动删除
- 最小权限:Lambda函数使用IAM角色,仅授予必要权限
- API密钥认证:Chrome插件请求需提供API密钥
- 请求限流:API Gateway配置请求限流,防止滥用
- 输入验证:所有API输入参数严格验证,防止注入攻击
- 敏感信息处理:数据库凭证存储在AWS Secrets Manager
- 错误处理:统一错误处理,不暴露敏感信息
- 日志审计:启用CloudWatch日志,记录所有操作
- 创建RDS MySQL实例并导入初始数据
- 创建DynamoDB表
- 部署Lambda函数
- 配置API Gateway
- 设置IAM角色和权限
- 部署Chrome插件
- Lambda监控:CloudWatch监控函数执行情况
- API监控:API Gateway监控请求量和错误率
- 数据库监控:RDS性能指标监控
- 错误告警:设置CloudWatch告警,异常时通知
- 预测分析:添加销售预测功能
- 异常检测:自动识别销售异常并告警
- 竞品分析:集成市场数据,提供竞争情报
- 个性化报告:支持自定义报告模板
- 实时分析:集成Kinesis实现实时数据分析
- 多语言支持:支持多语言分析报告
- 高级可视化:集成更丰富的可视化组件
- 移动端支持:开发配套移动应用
此设计方案提供了电商销售分析助手的完整实现框架,从数据库设计到API接口,从Lambda函数实现到安全考虑,全面覆盖了系统各个方面。该方案采用无服务器架构,具有高可扩展性、低维护成本的特点,同时通过LLM技术提供智能化的销售分析,帮助电商运营人员快速获取销售洞察。
请完整梳理整个工作流程,参考如下格式(注意下面的内容与当前架构无关,仅仅展示格式):
- 用户通过API Gateway发送代码审查请求。
- 代码审查提交/code review(POST)接口,客户提交pull request ID、repo_url、access_token、project、scan_scope、branch等参数。
- 遍历GitHub(本原型以GitHub为例)检查该pull request ID是否包含白名单文件(需处理的文件后缀:.go/.py/.java等)。如果没有,直接返回"No file need review"。如果有,继续执行(4)和(5)
- 在DDB中存储代码审查任务状态
- 触发Lambda进行异步处理,根据文件将任务拆分到SQS中,设置白名单(需处理的文件后缀:.go/.py/.java等,在Lambda环境变量中设置)
- 获取pull request ID diff文件等信息
- 将代码审查消息放入SQS
- 更新DDB中待处理文件数量
-
用户在Chrome插件界面设置查询条件(时间范围、产品类别、销售渠道等)并点击"分析"按钮。
-
Chrome插件通过API Gateway向
/api/sales/query
端点发送POST请求,包含以下参数:- timeRange(时间范围)
- dimensions(分析维度,如category、channel)
- metrics(分析指标,如sales、orders、aov)
- filters(过滤条件)
- compareWith(比较周期,如previous_period)
-
API Gateway接收请求并触发数据查询Lambda函数。
-
数据查询Lambda函数验证请求参数的有效性,生成唯一reportId。
-
Lambda函数连接到RDS MySQL数据库,根据查询参数构建SQL查询语句。
-
Lambda执行SQL查询,获取销售数据。如果设置了比较周期,还会执行额外的对比查询。
-
Lambda在DynamoDB的SalesAnalysisTemp表中创建记录:
- reportId作为主键
- 设置status为"PROCESSING"
- 存储查询参数和原始数据
- 设置24小时后的TTL过期时间
-
查询Lambda异步触发分析Lambda,传递reportId。
-
查询Lambda向客户端返回reportId和处理状态(PROCESSING)。
-
分析Lambda函数被触发,接收reportId参数。
-
分析Lambda从DynamoDB获取与reportId关联的查询结果数据。
-
Lambda准备数据摘要(总销售额、订单数、平均订单价值、增长率等)。
-
Lambda构建分析提示(prompt),包含:
- 时间范围信息
- 销售数据摘要
- 详细销售数据
- 对比期间数据(如有)
- 分析要求(趋势、原因、洞察、建议)
-
Lambda调用Amazon Bedrock API(使用Claude模型)进行数据分析,传递构建的prompt。
-
Lambda接收并解析LLM返回的分析结果,提取结构化内容:
- 趋势分析
- 因果因素
- 关键洞察
- 行动建议
-
Lambda更新DynamoDB中的记录:
- 设置status为"COMPLETED"
- 存储分析结果和数据摘要
- 添加更新时间戳
-
Chrome插件通过API Gateway定期轮询
/api/sales/analysis/{reportId}
端点,检查分析状态。 -
结果查询Lambda接收请求,从DynamoDB获取对应reportId的记录。
-
根据记录状态返回不同内容:
- 如果状态为"PROCESSING",返回进行中状态
- 如果状态为"ERROR",返回错误信息
- 如果状态为"COMPLETED",返回完整的分析结果和摘要
-
当Chrome插件收到"COMPLETED"状态和完整结果后,停止轮询。
-
Chrome插件处理返回的数据,使用Chart.js渲染销售趋势图表和关键指标。
-
Chrome插件展示LLM生成的分析洞察和建议,包括:
- 销售趋势分析
- 增长或下降的原因分析
- 关键产品类别表现
- 具体的行动建议
-
用户与分析结果交互,可进一步调整查询参数重新分析。
-
存储在DynamoDB中的临时数据在创建后24小时自动过期并删除(通过TTL机制)。
-
如果用户执行新的查询,将创建新的reportId和关联数据。
这个工作流程确保了从用户请求到结果展示的完整处理链,同时通过异步处理和轮询机制,提供了良好的用户体验和系统性能。
根据你的设计,一共有几个Lambda?
根据设计方案,系统共包含3个Lambda函数,每个函数负责不同的处理阶段:
触发方式:通过API Gateway的/api/sales/query
端点触发
主要职责:
- 接收并验证查询参数
- 连接RDS MySQL执行销售数据查询
- 将查询结果存储到DynamoDB临时表
- 异步触发分析Lambda
- 返回reportId给客户端
触发方式:由数据查询Lambda异步触发
主要职责:
- 从DynamoDB获取原始查询数据
- 准备数据摘要和分析提示(prompt)
- 调用Amazon Bedrock执行LLM分析
- 解析和结构化LLM返回的分析结果
- 更新DynamoDB中的记录状态和分析结果
触发方式:通过API Gateway的/api/sales/analysis/{reportId}
端点触发
主要职责:
- 接收客户端轮询请求
- 从DynamoDB获取指定reportId的记录
- 根据处理状态返回相应结果
- 当状态为"COMPLETED"时返回完整的分析结果
- 当状态为"PROCESSING"或"ERROR"时返回相应状态信息
这三个Lambda函数通过事件触发和DynamoDB共享状态的方式协同工作,形成了完整的异步处理链,实现了高效的数据查询、分析和结果展示流程。
我能用EC2代替API Gateway吗?API Gateway可能会有连接超时的问题
是的,您完全可以使用EC2代替API Gateway来处理Chrome插件的请求。这种替换能解决API Gateway的连接超时限制问题(默认为29秒),同时提供更多的灵活性。
Chrome插件 → EC2 Web服务器 → Lambda函数 → RDS MySQL
↓ ↓
返回结果 ← DynamoDB临时表 ← 分析结果
- 无超时限制 - 解决API Gateway 29秒超时限制
- 完全控制 - 自定义请求处理、中间件、错误处理
- 长连接支持 - 可使用WebSocket等保持连接,减少轮询需求
- 定制化处理 - 可在请求处理层添加缓存、合并请求等优化
- 无服务器架构 - 无需管理服务器
- 自动扩展 - 无需配置扩展规则
- 内置安全功能 - 如限流、WAF集成
- 简化部署 - 更少的配置项
如果决定使用EC2,推荐采用以下配置:
-
Web服务器选择:
- Node.js + Express:轻量级、异步处理能力强
- Python + Flask/FastAPI:如果团队Python经验更丰富
-
EC2配置:
实例类型: t3.small (开始阶段足够) AMI: Amazon Linux 2023 自动扩展组: 最小1台,最大3台 负载均衡器: Application Load Balancer
-
API路由设计:
// Node.js Express示例 const express = require('express'); const { Lambda } = require('@aws-sdk/client-lambda'); const app = express(); app.use(express.json()); // 启动查询分析 app.post('/api/sales/query', async (req, res) => { try { const lambda = new Lambda(); const response = await lambda.invoke({ FunctionName: 'sales-query-lambda', Payload: JSON.stringify({body: JSON.stringify(req.body)}), InvocationType: 'RequestResponse' }); const result = JSON.parse(Buffer.from(response.Payload).toString()); res.json(JSON.parse(result.body)); } catch (error) { console.error('Error invoking Lambda:', error); res.status(500).json({error: 'Failed to process request'}); } }); // 获取分析结果 app.get('/api/sales/analysis/:reportId', async (req, res) => { // 类似实现,调用结果获取Lambda }); app.listen(3000, () => console.log('Server running on port 3000'));
-
安全配置:
- 配置HTTPS(使用ACM证书)
- 实现API密钥认证
- 设置安全组只允许HTTP/HTTPS入站流量
- 定期更新安全补丁
-
高可用性:
- 在多个可用区部署EC2实例
- 使用自动恢复设置
-
安全性:
- 使用IAM角色而非访问密钥
- 实现请求验证和消毒
- 考虑使用AWS WAF保护EC2
-
监控:
- 配置CloudWatch警报监控实例健康
- 设置请求日志记录
- 监控错误率和响应时间
-
成本优化:
- 使用竞价实例降低成本
- 设置自动扩展以适应负载波动
考虑以下因素做出最终决策:
- 团队经验 - 是否有管理EC2的经验?
- 需求稳定性 - API需求是否会频繁变化?
- 预期负载 - 预期的请求量和处理复杂度?
- 长期维护 - 谁将负责服务器维护?
- 预算约束 - EC2+ALB成本vs API Gateway成本分析
对于您的电商分析Demo,如果主要关注点是避免超时问题,且团队能够管理EC2实例,使用EC2是一个合理的选择,尤其是在开发和测试阶段更能提供灵活性。
采用EC2后,还需要使用Lambda来调用Bedrock模型吗,是否直接通过EC2即可?
是的,采用EC2后完全可以直接调用Bedrock模型,无需通过Lambda中间层。这将进一步简化架构,降低延迟和复杂性。
Chrome插件 → EC2 Web服务器 → Amazon Bedrock
↓ ↓
RDS MySQL DynamoDB
- 架构简化 - 减少服务组件,降低复杂度
- 降低延迟 - 移除Lambda中间层,减少请求往返
- 统一处理 - 在同一服务内处理数据查询和AI分析
- 无Lambda限制 - 避开Lambda的15分钟执行限制
- 简化错误处理 - 集中在EC2处理错误和重试
- 简化监控 - 统一日志和监控体系
- 资源管理 - 长时间运行的AI请求可能占用EC2资源
- 扩展性考虑 - 需要适当设计EC2自动扩展策略
- 故障隔离 - 需要额外设计故障隔离机制
使用Node.js + Express实现直接调用Bedrock的示例:
// 安装依赖: npm install express aws-sdk mysql2
const express = require('express');
const { BedrockRuntimeClient, InvokeModelCommand } = require('@aws-sdk/client-bedrock-runtime');
const mysql = require('mysql2/promise');
const { DynamoDBClient, PutItemCommand, GetItemCommand, UpdateItemCommand } = require('@aws-sdk/client-dynamodb');
const { v4: uuidv4 } = require('uuid');
const app = express();
app.use(express.json());
// 数据库配置
const dbConfig = {
host: process.env.RDS_ENDPOINT,
user: process.env.RDS_USER,
password: process.env.RDS_PASSWORD,
database: process.env.RDS_DB_NAME
};
const dynamoDB = new DynamoDBClient();
const bedrock = new BedrockRuntimeClient();
const DYNAMODB_TABLE = 'SalesAnalysisTemp';
// 启动销售分析
app.post('/api/sales/query', async (req, res) => {
try {
const { timeRange, dimensions, metrics, filters, compareWith } = req.body;
// 参数验证
if (!timeRange || !timeRange.start || !timeRange.end) {
return res.status(400).json({ error: 'Invalid time range parameters' });
}
// 生成报告ID
const reportId = uuidv4();
// 异步处理,立即返回reportId
res.status(202).json({
reportId,
status: 'PROCESSING',
message: 'Analysis started'
});
// 异步处理分析任务
processAnalysisTask(reportId, req.body).catch(error => {
console.error(`Error processing analysis for ${reportId}:`, error);
updateAnalysisStatus(reportId, 'ERROR', { errorMessage: error.message });
});
} catch (error) {
console.error('Error handling request:', error);
res.status(500).json({ error: 'Internal server error' });
}
});
// 获取分析结果
app.get('/api/sales/analysis/:reportId', async (req, res) => {
try {
const { reportId } = req.params;
// 从DynamoDB获取分析状态
const result = await dynamoDB.send(new GetItemCommand({
TableName: DYNAMODB_TABLE,
Key: { reportId: { S: reportId } }
}));
if (!result.Item) {
return res.status(404).json({ error: 'Report not found' });
}
// 解析DynamoDB响应
const item = unmarshallDbItem(result.Item);
// 根据状态返回不同响应
res.json({
reportId,
status: item.status,
...(item.status === 'COMPLETED' && {
summary: item.summary,
analysis: item.analysisResults,
queryParams: item.queryParams,
createdAt: item.createdAt,
expiresAt: item.ttl
}),
...(item.status === 'ERROR' && { error: item.errorMessage })
});
} catch (error) {
console.error('Error retrieving analysis:', error);
res.status(500).json({ error: 'Failed to retrieve analysis results' });
}
});
// 异步处理分析任务
async function processAnalysisTask(reportId, queryParams) {
try {
// 初始化分析状态
await initializeAnalysisStatus(reportId, queryParams);
// 连接数据库并执行查询
const connection = await mysql.createConnection(dbConfig);
const queryResults = await executeSalesQuery(connection, queryParams);
// 如需比较数据,获取比较期间数据
let comparisonResults = null;
if (queryParams.compareWith) {
const comparisonTimeRange = calculateComparisonPeriod(queryParams.timeRange, queryParams.compareWith);
const modifiedParams = { ...queryParams, timeRange: comparisonTimeRange };
comparisonResults = await executeSalesQuery(connection, modifiedParams);
}
connection.end();
// 准备分析数据
const rawData = {
primary: queryResults,
comparison: comparisonResults
};
// 计算数据摘要
const summary = prepareDataSummary(rawData);
// 更新DynamoDB中的原始数据和摘要
await updateRawDataAndSummary(reportId, rawData, summary);
// 生成分析提示并调用Bedrock
const prompt = generateAnalysisPrompt(rawData, queryParams, summary);
const analysisText = await callBedrock(prompt);
// 解析分析结果
const analysisResults = parseAnalysisResponse(analysisText);
// 更新分析结果到DynamoDB
await updateAnalysisResults(reportId, analysisResults);
} catch (error) {
console.error(`Analysis process failed for ${reportId}:`, error);
// 更新错误状态
await updateAnalysisStatus(reportId, 'ERROR', { errorMessage: error.message });
throw error;
}
}
// 调用Bedrock模型
async function callBedrock(prompt) {
const payload = JSON.stringify({
prompt: `\n\nHuman: ${prompt}\n\nAssistant:`,
max_tokens_to_sample: 4000,
temperature: 0.7
});
const command = new InvokeModelCommand({
modelId: 'anthropic.claude-v2',
body: payload
});
try {
const response = await bedrock.send(command);
const responseBody = JSON.parse(Buffer.from(response.body).toString());
return responseBody.completion;
} catch (error) {
console.error('Bedrock API call failed:', error);
throw new Error(`Failed to invoke Bedrock: ${error.message}`);
}
}
// 初始化分析状态
async function initializeAnalysisStatus(reportId, queryParams) {
const ttl = Math.floor(Date.now() / 1000) + 86400; // 24小时后过期
await dynamoDB.send(new PutItemCommand({
TableName: DYNAMODB_TABLE,
Item: {
reportId: { S: reportId },
queryParams: { S: JSON.stringify(queryParams) },
status: { S: 'PROCESSING' },
createdAt: { S: new Date().toISOString() },
ttl: { N: ttl.toString() }
}
}));
}
// 其他必要辅助函数...
// executeSalesQuery, prepareDataSummary, updateRawDataAndSummary,
// updateAnalysisResults, calculateComparisonPeriod, parseAnalysisResponse 等
// 实现逻辑与之前设计方案的Lambda函数类似
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Server running on port ${PORT}`);
});
-
异步处理设计
- 使用异步处理避免长时间运行的请求阻塞服务器
- 状态更新和任务处理解耦
-
资源管理
- 设置适当的并发连接数限制
- 考虑使用工作队列(如Bull)管理分析任务
- 实现请求超时和取消机制
-
错误处理
- 实现全面的错误捕获和恢复机制
- 适当重试Bedrock API调用
- 记录详细错误日志便于调试
-
扩展设计
- 将分析处理逻辑与主HTTP服务器分离
- 考虑使用Node.js的cluster模块提高并发处理能力
-
IAM配置
- EC2实例需要配置IAM角色,授予以下权限:
- Bedrock模型调用权限
- DynamoDB表读写权限
- RDS MySQL连接权限
- EC2实例需要配置IAM角色,授予以下权限:
-
监控与日志
- 配置CloudWatch详细日志
- 设置关键指标监控(API响应时间、错误率等)
- 监控Bedrock API调用限额和使用情况
直接从EC2调用Bedrock是完全可行的方案,而且相比通过Lambda的方案,它具有架构更简单、延迟更低的优势。对于电商分析demo这种应用场景,EC2直接调用Bedrock足够满足需求,同时提供更好的灵活性和控制力。
为了处理长时间运行的分析任务,关键是实现良好的异步处理模式,避免阻塞Web服务器的主线程,同时确保分析任务的状态可追踪和错误可恢复。
修改Chrome插件的设计,改成一个Web应用。使用EC2直接调用 Amazon Bedrock。请你更新完整的设计文件
开发一款基于Web的"电商销售分析平台",帮助电商运营人员快速分析销售数据,识别增长或下降趋势并提供原因分析。该平台通过AI赋能,提供专业的销售洞察和建议,直观展现销售状况和发展方向。
- 销售数据查询:支持按时间范围、产品类别、销售渠道等维度查询销售数据
- 趋势分析:展示销售额、订单量、客单价等关键指标的趋势图表
- 原因分析:利用LLM分析销售增长或下降的可能原因
- 多维度比较:支持同比/环比分析,不同产品类别、渠道的对比
- 数据可视化:直观展示分析结果和关键指标
- 分析结果导出:支持将分析结果导出为PDF或CSV格式
- 安全访问控制:基于角色的访问控制,保护敏感数据
- 电商运营经理:需要全面了解销售状况,做出战略决策
- 产品经理:关注具体产品类别表现,优化产品组合
- 营销人员:评估营销活动效果,优化营销策略
- 系统管理员:管理用户权限,监控系统运行
用户浏览器 → EC2 Web应用 → RDS MySQL (销售数据)
↑ ↓
└───────── DynamoDB (临时分析结果) ← Amazon Bedrock LLM模型
- 前端:React.js + Redux + Chart.js + Material UI
- 后端:
- Web服务器:Node.js + Express.js
- 数据库:Amazon RDS MySQL (核心数据), Amazon DynamoDB (临时存储)
- AI服务:Amazon Bedrock (Claude模型)
- 服务器:Amazon EC2 + Auto Scaling + Load Balancer
- 缓存:Redis (可选,用于高频查询缓存)
- DevOps:
- 容器化:Docker
- CI/CD:AWS CodePipeline + CodeBuild
- 监控:CloudWatch + Prometheus
- 日志:CloudWatch Logs
核心电商数据表结构保持不变,包括:
-- 用户表
CREATE TABLE users (
user_id INT PRIMARY KEY,
username VARCHAR(50),
email VARCHAR(100),
phone VARCHAR(20),
gender VARCHAR(10),
age INT,
city VARCHAR(50),
country VARCHAR(50),
registration_date DATE,
user_level VARCHAR(20),
is_active BOOLEAN,
user_source VARCHAR(50)
);
-- 商品分类表
CREATE TABLE product_categories (
category_id INT PRIMARY KEY,
category_name VARCHAR(100),
parent_category_id INT NULL,
category_level INT,
is_active BOOLEAN
);
-- 商品表
CREATE TABLE products (
product_id INT PRIMARY KEY,
product_name VARCHAR(200),
category_id INT,
brand VARCHAR(100),
supplier VARCHAR(100),
original_price DECIMAL(10, 2),
current_price DECIMAL(10, 2),
cost DECIMAL(10, 2),
stock_quantity INT,
create_time DATE,
is_active BOOLEAN,
FOREIGN KEY (category_id) REFERENCES product_categories(category_id)
);
-- 订单表
CREATE TABLE orders (
order_id INT PRIMARY KEY,
user_id INT,
order_date TIMESTAMP,
total_amount DECIMAL(10, 2),
discount_amount DECIMAL(10, 2),
payment_method VARCHAR(50),
payment_status VARCHAR(20),
shipping_address VARCHAR(200),
order_status VARCHAR(20),
order_source VARCHAR(50),
device_type VARCHAR(20),
FOREIGN KEY (user_id) REFERENCES users(user_id)
);
-- 订单详情表
CREATE TABLE order_items (
order_item_id INT PRIMARY KEY,
order_id INT,
product_id INT,
quantity INT,
unit_price DECIMAL(10, 2),
discount DECIMAL(10, 2),
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
);
-- 营销活动表
CREATE TABLE marketing_campaigns (
campaign_id INT PRIMARY KEY,
campaign_name VARCHAR(100),
campaign_type VARCHAR(50),
start_date DATE,
end_date DATE,
budget DECIMAL(10, 2),
target_audience VARCHAR(100),
discount_type VARCHAR(50),
discount_value DECIMAL(10, 2),
is_active BOOLEAN
);
-- 订单营销关联表
CREATE TABLE order_campaign_map (
id INT PRIMARY KEY,
order_id INT,
campaign_id INT,
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (campaign_id) REFERENCES marketing_campaigns(campaign_id)
);
-- 平台用户表
CREATE TABLE platform_users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) NOT NULL UNIQUE,
email VARCHAR(100) NOT NULL UNIQUE,
password_hash VARCHAR(255) NOT NULL,
full_name VARCHAR(100),
role ENUM('admin', 'analyst', 'viewer') NOT NULL DEFAULT 'viewer',
last_login TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT TRUE
);
-- 用户访问令牌表
CREATE TABLE user_sessions (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
token VARCHAR(255) NOT NULL,
expires_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES platform_users(id)
);
表名: SalesAnalysisResults
主键:
- reportId (分区键): 唯一报告ID
属性:
- userId: 用户ID
- queryParams: 查询参数JSON
- rawData: 查询结果数据JSON
- analyzedData: 处理后的分析数据JSON
- analysisResults: LLM分析结果JSON
- createdAt: 创建时间戳
- ttl: 过期时间戳(3天后)
- status: 处理状态(PROCESSING, COMPLETED, ERROR)
-
登录/注册页面
- 用户认证
- 忘记密码功能
-
仪表板页面
- 关键业绩指标(KPI)概览
- 最近分析结果摘要
- 销售趋势简报
-
分析配置页面
- 查询参数设置(时间范围、产品类别等)
- 分析维度选择
- 比较期间设置
-
分析结果页面
- 交互式图表展示
- AI生成的分析洞察
- 详细数据表格
- 导出选项
-
历史报告页面
- 过去生成的分析报告列表
- 报告筛选和搜索功能
-
用户管理页面 (管理员专用)
- 用户账户管理
- 权限设置
- 状态管理:Redux用于全局状态管理
- 路由:React Router用于页面导航
- 图表:Chart.js和D3.js用于可视化
- UI框架:Material UI提供一致的用户界面体验
- 表格:React Table处理大数据集展示
- 表单:Formik + Yup处理表单验证
- HTTP客户端:Axios处理API请求
-
Express.js应用结构
- 路由层:处理API路由和请求验证
- 服务层:实现业务逻辑
- 数据层:处理数据库访问
- 中间件:认证、日志、错误处理
-
主要服务组件
- AuthService:用户认证和授权
- QueryService:销售数据查询和处理
- AnalysisService:数据分析和AI集成
- ReportService:报告生成和管理
-
并发处理
- 使用Worker Threads处理CPU密集型任务
- 任务队列系统处理长时间运行的分析任务
- PM2进程管理确保高可用性
- 认证API
POST /api/auth/login
POST /api/auth/register
POST /api/auth/logout
GET /api/auth/profile
- 销售分析API
POST /api/analysis/create
请求体:
{
"timeRange": {"start": "2023-01-01", "end": "2023-07-31"},
"dimensions": ["category", "channel"],
"metrics": ["sales", "orders", "aov"],
"filters": {
"categories": ["泳装", "配饰"],
"channels": ["Own Website", "Amazon"]
},
"compareWith": "previous_period"
}
GET /api/analysis/:reportId
响应:
{
"reportId": "550e8400-e29b-41d4-a716-446655440000",
"status": "COMPLETED",
"summary": {...},
"trends": [...],
"analysis": {...}
}
GET /api/analysis/history
DELETE /api/analysis/:reportId
- 数据访问API
GET /api/data/categories
GET /api/data/products
GET /api/data/channels
GET /api/data/metrics
- 提供配置分析所需的元数据
- 管理API
GET /api/admin/users
POST /api/admin/users
PUT /api/admin/users/:userId
DELETE /api/admin/users/:userId
- 数据准备:预处理查询结果,计算关键指标
- 提示工程:构建结构化提示,包含上下文和分析需求
- 模型调用:通过AWS SDK调用Bedrock API
- 结果解析:解析和结构化AI生成的内容
- 结果存储:将分析结果保存到DynamoDB
function generatePrompt(data, params) {
return `
作为电商数据分析专家,请分析以下销售数据并提供专业见解。
## 分析时间范围
${params.timeRange.start} 至 ${params.timeRange.end}
## 销售数据摘要
- 总销售额: $${data.summary.totalSales.toFixed(2)}
- 总订单数: ${data.summary.totalOrders}
- 平均订单价值: $${data.summary.averageOrderValue.toFixed(2)}
${data.summary.growthRate !== null ? `- 销售增长率: ${data.summary.growthRate.toFixed(2)}%` : ''}
## 详细销售数据
${JSON.stringify(data.primaryData, null, 2)}
${data.comparisonData ? `## 对比期间数据\n${JSON.stringify(data.comparisonData, null, 2)}` : ''}
## 分析要求
请提供以下分析:
1. 销售趋势分析:描述主要销售趋势,包括增长或下降的时间点和幅度。
2. 原因分析:分析销售增长或下降的可能原因。
3. 关键发现:提炼3-5个最重要的数据发现。
4. 行动建议:基于分析提供3-5条具体可行的建议。
请以JSON格式返回分析结果,包含以下字段:
{
"trendAnalysis": "销售趋势的详细分析...",
"causalFactors": [
{"factor": "因素1", "impact": "高/中/低", "description": "详细描述..."},
...
],
"keyInsights": ["洞察1", "洞察2", ...],
"recommendations": ["建议1", "建议2", ...]
}
`;
}
- 用户访问Web应用登录页面
- 输入用户名和密码提交登录
- 服务器验证凭据,生成JWT令牌
- 返回令牌给前端,前端存储在localStorage
- 后续请求在Authorization头部携带令牌
-
分析配置阶段
- 用户在分析配置页面设置查询条件
- 选择分析维度、指标和筛选条件
- 选择可选的比较期间
- 提交分析请求
-
数据查询处理阶段
- EC2服务器接收请求并验证
- 生成唯一的reportId
- 异步启动分析任务
- 返回reportId和状态给前端
- 前端开始轮询分析状态
-
数据分析阶段
- EC2服务器连接MySQL查询相关销售数据
- 处理和聚合查询结果
- 计算关键指标和趋势
- 准备AI分析所需的结构化数据
- 调用Amazon Bedrock API进行深度分析
- 接收和解析AI生成的分析结果
- 将结果存储到DynamoDB
-
结果展示阶段
- 前端轮询或WebSocket接收分析完成通知
- 获取完整分析结果
- 使用Chart.js渲染交互式图表
- 展示AI生成的洞察和建议
- 提供数据下钻和交互功能
-
后续操作
- 用户可保存或导出报告
- 可调整参数重新分析
- 可与团队分享分析结果
-
临时数据
- 分析结果在DynamoDB中保存3天
- 通过TTL属性自动过期并删除
- 用户可手动删除不需要的分析结果
-
历史报告
- 用户可将重要分析保存为永久报告
- 永久报告存储在RDS中的reports表
- 可随时通过历史报告页面访问
// app.js - 主应用文件
const express = require('express');
const cors = require('cors');
const path = require('path');
const morgan = require('morgan');
const helmet = require('helmet');
const authRoutes = require('./routes/auth');
const analysisRoutes = require('./routes/analysis');
const dataRoutes = require('./routes/data');
const adminRoutes = require('./routes/admin');
const { authenticateJWT } = require('./middleware/auth');
const errorHandler = require('./middleware/errorHandler');
// 初始化Express应用
const app = express();
// 中间件
app.use(helmet()); // 安全头部
app.use(cors()); // 跨域支持
app.use(morgan('combined')); // 请求日志
app.use(express.json()); // JSON请求体解析
// 静态文件服务 - React应用
app.use(express.static(path.join(__dirname, 'client/build')));
// API路由
app.use('/api/auth', authRoutes);
app.use('/api/analysis', authenticateJWT, analysisRoutes);
app.use('/api/data', authenticateJWT, dataRoutes);
app.use('/api/admin', authenticateJWT, adminRoutes);
// 所有其他请求返回React应用
app.get('*', (req, res) => {
res.sendFile(path.join(__dirname, 'client/build/index.html'));
});
// 错误处理中间件
app.use(errorHandler);
// 启动服务器
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Server running on port ${PORT}`);
});
module.exports = app;
// services/analysisService.js
const mysql = require('mysql2/promise');
const { DynamoDBClient, PutItemCommand, GetItemCommand, UpdateItemCommand } = require('@aws-sdk/client-dynamodb');
const { BedrockRuntimeClient, InvokeModelCommand } = require('@aws-sdk/client-bedrock-runtime');
const { v4: uuidv4 } = require('uuid');
const config = require('../config');
// 初始化客户端
const dynamoDB = new DynamoDBClient({ region: config.AWS_REGION });
const bedrock = new BedrockRuntimeClient({ region: config.AWS_REGION });
// 数据库连接池
const pool = mysql.createPool({
host: config.DB_HOST,
user: config.DB_USER,
password: config.DB_PASSWORD,
database: config.DB_NAME,
waitForConnections: true,
connectionLimit: 10
});
class AnalysisService {
// 创建新的分析任务
async createAnalysis(userId, queryParams) {
const reportId = uuidv4();
const now = new Date();
const ttl = Math.floor(now.getTime() / 1000) + (3 * 24 * 60 * 60); // 3天后过期
try {
// 初始化分析记录
await dynamoDB.send(new PutItemCommand({
TableName: 'SalesAnalysisResults',
Item: {
reportId: { S: reportId },
userId: { S: userId.toString() },
queryParams: { S: JSON.stringify(queryParams) },
status: { S: 'PROCESSING' },
createdAt: { S: now.toISOString() },
ttl: { N: ttl.toString() }
}
}));
// 异步执行分析任务
this.processAnalysis(reportId, queryParams, userId).catch(error => {
console.error(`Analysis error for ${reportId}:`, error);
this.updateStatus(reportId, 'ERROR', { errorMessage: error.message });
});
return { reportId, status: 'PROCESSING' };
} catch (error) {
console.error('Failed to create analysis task:', error);
throw new Error('Failed to create analysis task');
}
}
// 获取分析结果
async getAnalysis(reportId) {
try {
const result = await dynamoDB.send(new GetItemCommand({
TableName: 'SalesAnalysisResults',
Key: { reportId: { S: reportId } }
}));
if (!result.Item) {
throw new Error('Analysis report not found');
}
// 将DynamoDB格式转换为JSON对象
const item = this.unmarshallItem(result.Item);
return item;
} catch (error) {
console.error(`Failed to retrieve analysis ${reportId}:`, error);
throw error;
}
}
// 处理分析任务
async processAnalysis(reportId, queryParams, userId) {
try {
// 从MySQL查询销售数据
const queryResults = await this.executeSalesQuery(queryParams);
// 如果需要比较数据,获取比较期间的数据
let comparisonResults = null;
if (queryParams.compareWith) {
const comparisonTimeRange = this.calculateComparisonPeriod(
queryParams.timeRange,
queryParams.compareWith
);
const comparisonParams = { ...queryParams, timeRange: comparisonTimeRange };
comparisonResults = await this.executeSalesQuery(comparisonParams);
}
// 准备数据
const rawData = {
primaryData: queryResults,
comparisonData: comparisonResults
};
// 计算数据摘要
const summary = this.prepareDataSummary(rawData);
// 更新原始数据和摘要
await this.updateRawDataAndSummary(reportId, rawData, summary);
// 生成分析提示
const prompt = this.generateAnalysisPrompt(rawData, queryParams, summary);
// 调用Bedrock获取分析
const analysisText = await this.callBedrockModel(prompt);
// 解析分析结果
const analysisResults = this.parseAnalysisResponse(analysisText);
// 更新分析结果
await this.updateAnalysisResults(reportId, analysisResults);
return { reportId, status: 'COMPLETED' };
} catch (error) {
console.error(`Analysis processing failed for ${reportId}:`, error);
await this.updateStatus(reportId, 'ERROR', { errorMessage: error.message });
throw error;
}
}
// 调用Bedrock模型
async callBedrockModel(prompt) {
try {
const payload = JSON.stringify({
prompt: `\n\nHuman: ${prompt}\n\nAssistant:`,
max_tokens_to_sample: 4000,
temperature: 0.7
});
const command = new InvokeModelCommand({
modelId: 'anthropic.claude-v2',
body: payload
});
const response = await bedrock.send(command);
const responseBody = JSON.parse(Buffer.from(response.body).toString());
return responseBody.completion;
} catch (error) {
console.error('Bedrock API call failed:', error);
throw new Error(`Failed to invoke Bedrock: ${error.message}`);
}
}
// 执行销售查询
async executeSalesQuery(queryParams) {
// 实现SQL查询逻辑,类似之前设计的Lambda函数中的查询功能
// 查询实现略...
}
// 更新分析状态
async updateStatus(reportId, status, additionalData = {}) {
// 实现DynamoDB更新逻辑
// 实现略...
}
// 其他辅助方法: prepareDataSummary, generateAnalysisPrompt,
// parseAnalysisResponse, calculateComparisonPeriod, updateRawDataAndSummary, etc.
// 方法实现略...
}
module.exports = new AnalysisService();
// services/authService.js
const bcrypt = require('bcryptjs');
const jwt = require('jsonwebtoken');
const mysql = require('mysql2/promise');
const config = require('../config');
// 数据库连接池
const pool = mysql.createPool({
host: config.DB_HOST,
user: config.DB_USER,
password: config.DB_PASSWORD,
database: config.DB_NAME,
waitForConnections: true,
connectionLimit: 10
});
class AuthService {
async login(username, password) {
try {
const connection = await pool.getConnection();
try {
// 查找用户
const [users] = await connection.query(
'SELECT id, username, password_hash, role FROM platform_users WHERE username = ? AND is_active = true',
[username]
);
if (users.length === 0) {
throw new Error('Invalid username or password');
}
const user = users[0];
// 验证密码
const isPasswordValid = await bcrypt.compare(password, user.password_hash);
if (!isPasswordValid) {
throw new Error('Invalid username or password');
}
// 生成JWT令牌
const token = jwt.sign(
{
id: user.id,
username: user.username,
role: user.role
},
config.JWT_SECRET,
{ expiresIn: '24h' }
);
// 更新最后登录时间
await connection.query(
'UPDATE platform_users SET last_login = NOW() WHERE id = ?',
[user.id]
);
return {
token,
user: {
id: user.id,
username: user.username,
role: user.role
}
};
} finally {
connection.release();
}
} catch (error) {
console.error('Login error:', error);
throw error;
}
}
async register(userData) {
// 实现用户注册逻辑
// 实现略...
}
async getUserProfile(userId) {
// 获取用户资料
// 实现略...
}
}
module.exports = new AuthService();
src/
├── components/
│ ├── common/
│ │ ├── Header.js
│ │ ├── Sidebar.js
│ │ ├── Footer.js
│ │ ├── Loading.js
│ │ └── ErrorBoundary.js
│ ├── auth/
│ │ ├── LoginForm.js
│ │ └── RegisterForm.js
│ ├── dashboard/
│ │ ├── Dashboard.js
│ │ ├── KpiCard.js
│ │ └── RecentAnalyses.js
│ ├── analysis/
│ │ ├── AnalysisForm.js
│ │ ├── AnalysisResult.js
│ │ ├── charts/
│ │ │ ├── SalesTrendChart.js
│ │ │ ├── CategoryComparisonChart.js
│ │ │ └── ChannelDistributionChart.js
│ │ └── insights/
│ │ ├── TrendAnalysis.js
│ │ ├── CausalFactors.js
│ │ └── Recommendations.js
│ └── admin/
│ └── UserManagement.js
├── redux/
│ ├── store.js
│ ├── slices/
│ │ ├── authSlice.js
│ │ ├── analysisSlice.js
│ │ └── uiSlice.js
│ └── thunks/
│ ├── authThunks.js
│ └── analysisThunks.js
├── services/
│ ├── api.js
│ ├── authService.js
│ └── analysisService.js
├── utils/
│ ├── dateUtils.js
│ ├── formatters.js
│ └── validators.js
├── hooks/
│ ├── useAuth.js
│ └── useAnalysis.js
├── pages/
│ ├── LoginPage.js
│ ├── DashboardPage.js
│ ├── AnalysisPage.js
│ ├── ResultsPage.js
│ ├── HistoryPage.js
│ └── AdminPage.js
├── App.js
└── index.js
// redux/slices/analysisSlice.js
import { createSlice } from '@reduxjs/toolkit';
const initialState = {
currentAnalysis: null,
analysisHistory: [],
isLoading: false,
error: null,
status: 'idle', // 'idle' | 'loading' | 'succeeded' | 'failed'
};
const analysisSlice = createSlice({
name: 'analysis',
initialState,
reducers: {
startAnalysis(state) {
state.isLoading = true;
state.status = 'loading';
state.error = null;
},
analysisSuccess(state, action) {
state.isLoading = false;
state.status = 'succeeded';
state.currentAnalysis = action.payload;
},
analysisFailure(state, action) {
state.isLoading = false;
state.status = 'failed';
state.error = action.payload;
},
setAnalysisHistory(state, action) {
state.analysisHistory = action.payload;
},
resetAnalysis(state) {
state.currentAnalysis = null;
state.status = 'idle';
state.error = null;
}
},
});
export const {
startAnalysis,
analysisSuccess,
analysisFailure,
setAnalysisHistory,
resetAnalysis,
} = analysisSlice.actions;
export default analysisSlice.reducer;
// services/analysisService.js
import api from './api';
export const analysisService = {
createAnalysis: async (queryParams) => {
const response = await api.post('/api/analysis/create', queryParams);
return response.data;
},
getAnalysis: async (reportId) => {
const response = await api.get(`/api/analysis/${reportId}`);
return response.data;
},
getHistory: async () => {
const response = await api.get('/api/analysis/history');
return response.data;
},
deleteAnalysis: async (reportId) => {
const response = await api.delete(`/api/analysis/${reportId}`);
return response.data;
}
};
推荐配置:
- 实例类型: t3.medium (初始阶段)
- AMI: Amazon Linux 2023
- 自动扩展组: 最小1台,最大3台
- 负载均衡器: Application Load Balancer
-
准备阶段
- 创建VPC, 子网, 安全组
- 设置RDS MySQL实例
- 创建DynamoDB表
-
应用部署
- 创建EC2启动模板
- 设置EC2用户数据脚本安装依赖
- 配置自动扩展组和负载均衡器
- 部署应用代码到EC2实例
-
持续集成/持续部署
- 配置AWS CodePipeline
- 设置源代码仓库连接
- 配置自动化构建和部署
#!/bin/bash
# EC2用户数据脚本
# 更新系统
yum update -y
# 安装Node.js
curl -sL https://rpm.nodesource.com/setup_18.x | bash -
yum install -y nodejs
# 安装Git
yum install -y git
# 安装PM2进程管理器
npm install pm2 -g
# 创建应用目录
mkdir -p /var/www/sales-analysis
# 克隆应用代码
git clone https://github.com/your-repo/sales-analysis-app.git /var/www/sales-analysis
# 切换到应用目录
cd /var/www/sales-analysis
# 安装依赖
npm install --production
# 设置环境变量
cat > .env << EOL
PORT=3000
DB_HOST=${DB_HOST}
DB_USER=${DB_USER}
DB_PASSWORD=${DB_PASSWORD}
DB_NAME=${DB_NAME}
JWT_SECRET=${JWT_SECRET}
AWS_REGION=${AWS_REGION}
EOL
# 启动应用
pm2 start app.js --name "sales-analysis" --watch
pm2 startup
pm2 save
# 设置Nginx反向代理
yum install -y nginx
cat > /etc/nginx/conf.d/sales-analysis.conf << EOL
server {
listen 80;
server_name _;
location / {
proxy_pass http://127.0.0.1:3000;
proxy_http_version 1.1;
proxy_set_header Upgrade \$http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host \$host;
proxy_cache_bypass \$http_upgrade;
}
}
EOL
# 启动Nginx
systemctl enable nginx
systemctl start nginx
- 传输加密:使用HTTPS加密所有通信
- 存储加密:RDS和DynamoDB数据加密存储
- 临时数据:分析结果3天后自动删除
- 密码安全:使用bcrypt哈希存储密码
- JWT认证:所有API请求需要有效的JWT令牌
- 基于角色的访问控制:admin、analyst和viewer角色
- API权限管理:根据用户角色限制API访问
- 输入验证:验证所有输入参数
- 防XSS:使用helmet中间件设置安全头部
- 防CSRF:实现防跨站请求伪造保护
- 安全依赖:定期更新依赖解决安全漏洞
- EC2安全组:限制入站流量
- IAM角色:使用最小权限原则
- VPC设计:数据库放置在私有子网
- AWS WAF:防止常见Web攻击
- 应用监控:PM2和CloudWatch监控应用性能
- 基础设施监控:EC2、RDS和DynamoDB指标监控
- 日志管理:集中日志存储和分析
- 告警设置:关键指标异常告警
- 定期备份:RDS数据库定期备份
- 更新计划:定期更新依赖和操作系统
- 扩容计划:根据使用情况调整资源配置
- 性能优化:定期审查性能瓶颈
本设计提供了一个完整的电商销售分析平台,通过Web应用形式呈现,使用EC2直接处理数据查询和调用Amazon Bedrock进行AI分析。系统架构简洁高效,确保了数据安全和用户体验。
- 多语言支持:添加多语言界面和分析报告
- 高级分析:集成预测分析和异常检测功能
- 数据集成:增加与Google Analytics、Facebook等平台的数据集成
- 自定义仪表板:允许用户自定义分析仪表板
- 移动应用:开发配套移动应用提高访问便利性
此设计方案提供了电商销售分析平台的完整实现框架,从前端页面到后端服务,从数据库设计到Bedrock集成,全面覆盖了系统各个方面。方案基于EC2直接处理和调用AI服务的架构,简化了系统复杂度,提高了性能和可控性,同时确保了良好的扩展性和安全性。