×

🏢 ERP:企业资源规划的“数字大脑”(附 Python 模拟实现)

万邦科技Lex 万邦科技Lex 发表于2026-05-21 15:14:05 浏览18 评论0

抢沙发发表评论

ERP 是企业资源规划系统的简称,但它的本质远不止一个软件——它是企业的“数字中枢神经系统”。让我用一个简单的比喻帮你理解:如果把企业比作人体,ERP 就是大脑+神经系统,负责协调所有器官(部门)的工作。

一、 ERP 的核心本质:从“信息孤岛”到“数据统一”

ERP 解决的三大核心痛点:

痛点
无 ERP 时
有 ERP 后
数据孤岛
销售不知道库存,财务不知道采购
所有数据实时同步
流程割裂
订单→生产→发货要跑5个部门
全流程自动化流转
决策盲目
靠 Excel 和“感觉”做决策
数据驱动的智能决策

ERP 的“三层蛋糕”结构:

应用层(用户看到)    销售、采购、生产、财务、HR
    ↓
业务层(逻辑核心)    订单处理、库存管理、生产排程
    ↓
数据层(基础底座)    客户、产品、供应商、BOM

二、 ERP 六大核心模块详解

ERP 通常包含几十个模块,但最核心的是这六个:

1. 销售与分销(SD)

作用:从报价到回款的完整销售流程管理
# sales_module.py
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class OrderStatus(Enum):
    QUOTATION = "quotation"      # 报价
    CONFIRMED = "confirmed"      # 已确认
    IN_PRODUCTION = "in_production"  # 生产中
    SHIPPED = "shipped"          # 已发货
    DELIVERED = "delivered"      # 已交付
    INVOICED = "invoiced"        # 已开票
    PAID = "paid"                # 已付款

@dataclass
class Customer:
    """客户主数据"""
    code: str
    name: str
    credit_limit: float  # 信用额度
    payment_terms: int   # 付款条件(天数)
    current_balance: float = 0.0

@dataclass
class SalesOrderItem:
    """销售订单行项目"""
    product_code: str
    quantity: int
    unit_price: float
    delivery_date: datetime
    
    @property
    def line_total(self):
        return self.quantity * self.unit_price

class SalesOrder:
    """销售订单(ERP核心对象)"""
    
    def __init__(self, order_number: str, customer: Customer):
        self.order_number = order_number
        self.customer = customer
        self.order_date = datetime.now()
        self.status = OrderStatus.QUOTATION
        self.items: List[SalesOrderItem] = []
        self.delivery_address: str = ""
        self.created_at = datetime.now()
        self.updated_at = datetime.now()
        
    def add_item(self, product_code: str, quantity: int, 
                 unit_price: float, delivery_date: datetime):
        """添加订单行"""
        # 检查库存可用性(调用库存模块)
        if not self.check_inventory(product_code, quantity):
            raise ValueError(f"产品 {product_code} 库存不足")
        
        item = SalesOrderItem(product_code, quantity, unit_price, delivery_date)
        self.items.append(item)
        self.updated_at = datetime.now()
        
        print(f"✅ 添加订单行: {product_code} x{quantity}")
        
    def confirm_order(self):
        """确认订单(触发后续流程)"""
        if self.status != OrderStatus.QUOTATION:
            raise ValueError("只有报价单才能确认")
        
        # 检查信用额度
        order_total = self.get_total_amount()
        if order_total + self.customer.current_balance > self.customer.credit_limit:
            raise ValueError(f"超出信用额度 {self.customer.credit_limit}")
        
        self.status = OrderStatus.CONFIRMED
        self.updated_at = datetime.now()
        
        # 触发后续流程
        self.trigger_downstream_processes()
        
        print(f"📋 订单 {self.order_number} 已确认,总额: ¥{order_total:,.2f}")
        
    def get_total_amount(self) -> float:
        """计算订单总额"""
        return sum(item.line_total for item in self.items)
    
    def check_inventory(self, product_code: str, quantity: int) -> bool:
        """检查库存(模拟调用库存模块)"""
        # 实际中这里会调用库存管理模块
        inventory = {
            "P001": 100,  # 产品编码 -> 可用库存
            "P002": 50,
            "P003": 200
        }
        return inventory.get(product_code, 0) >= quantity
    
    def trigger_downstream_processes(self):
        """触发下游流程"""
        # 1. 创建生产计划
        self.create_production_order()
        
        # 2. 预留库存
        self.reserve_inventory()
        
        # 3. 安排发货
        self.plan_delivery()
        
        print("🚀 已触发生产、库存、发货流程")
    
    def create_production_order(self):
        """创建生产订单(调用生产模块)"""
        for item in self.items:
            # 这里会调用生产模块的API
            print(f"🏭 为 {item.product_code} 创建生产计划")
    
    def reserve_inventory(self):
        """预留库存(调用库存模块)"""
        for item in self.items:
            # 这里会调用库存模块的API
            print(f"📦 预留库存: {item.product_code} x{item.quantity}")
    
    def plan_delivery(self):
        """安排发货(调用物流模块)"""
        earliest_date = min(item.delivery_date for item in self.items)
        print(f"🚚 安排发货,最早交付: {earliest_date.strftime('%Y-%m-%d')}")
    
    def to_dict(self):
        """转换为字典(用于API或存储)"""
        return {
            "order_number": self.order_number,
            "customer": self.customer.name,
            "status": self.status.value,
            "order_date": self.order_date.strftime("%Y-%m-%d"),
            "total_amount": self.get_total_amount(),
            "items": [
                {
                    "product": item.product_code,
                    "quantity": item.quantity,
                    "price": item.unit_price,
                    "line_total": item.line_total
                }
                for item in self.items
            ],
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat()
        }

# 使用示例
if __name__ == "__main__":
    print("🎯 ERP 销售模块演示")
    print("-" * 40)
    
    # 创建客户
    customer = Customer("C001", "阿里巴巴集团", credit_limit=1000000, payment_terms=30)
    
    # 创建销售订单
    order = SalesOrder("SO20240520001", customer)
    
    # 添加产品
    order.add_item("P001", 10, 2999.00, datetime.now() + timedelta(days=7))
    order.add_item("P002", 5, 599.00, datetime.now() + timedelta(days=10))
    
    # 确认订单
    try:
        order.confirm_order()
    except ValueError as e:
        print(f"❌ 确认失败: {e}")
    
    # 查看订单详情
    print("\n📄 订单详情:")
    import json
    print(json.dumps(order.to_dict(), indent=2, ensure_ascii=False))

2. 物料管理(MM)

作用:采购、库存、供应商管理
# material_module.py
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Dict, List
import heapq
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class InventoryItem:
    """库存物料"""
    
    def __init__(self, material_code: str, description: str, 
                 unit: str, unit_cost: float):
        self.material_code = material_code
        self.description = description
        self.unit = unit
        self.unit_cost = unit_cost
        self.batches = []  # 批次库存
        
    def add_batch(self, quantity: int, purchase_date: datetime, 
                  expiry_date: datetime = None):
        """添加库存批次(先进先出)"""
        batch = {
            'quantity': quantity,
            'purchase_date': purchase_date,
            'expiry_date': expiry_date,
            'remaining': quantity
        }
        self.batches.append(batch)
        # 按采购日期排序(FIFO)
        self.batches.sort(key=lambda x: x['purchase_date'])
        
    def consume(self, quantity: int) -> float:
        """消耗库存(先进先出)返回成本"""
        if self.get_available_qty() < quantity:
            raise ValueError(f"库存不足: 需要 {quantity}, 可用 {self.get_available_qty()}")
        
        total_cost = 0.0
        remaining = quantity
        
        for batch in self.batches:
            if remaining <= 0:
                break
                
            if batch['remaining'] > 0:
                consume_qty = min(batch['remaining'], remaining)
                batch['remaining'] -= consume_qty
                remaining -= consume_qty
                total_cost += consume_qty * self.unit_cost
                
        return total_cost
    
    def get_available_qty(self) -> int:
        """获取可用数量"""
        return sum(batch['remaining'] for batch in self.batches)
    
    def get_value(self) -> float:
        """获取库存价值"""
        return self.get_available_qty() * self.unit_cost

class MRPPlanner:
    """物料需求计划(MRP)计算器"""
    
    def __init__(self):
        self.bom = {}  # 物料清单 {成品: [(物料, 数量)]}
        
    def add_bom(self, finished_product: str, components: List[tuple]):
        """添加物料清单"""
        self.bom[finished_product] = components
    
    def calculate_requirements(self, product: str, quantity: int, 
                              lead_time: int = 0) -> Dict:
        """
        计算物料需求
        
        Args:
            product: 要生产的产品
            quantity: 需求数量
            lead_time: 提前期(天)
            
        Returns:
            各物料的需求量
        """
        requirements = {}
        self._calculate_recursive(product, quantity, requirements)
        
        # 计算需求日期
        requirement_date = datetime.now() + timedelta(days=lead_time)
        
        return {
            'product': product,
            'quantity': quantity,
            'requirement_date': requirement_date.strftime('%Y-%m-%d'),
            'material_requirements': requirements
        }
    
    def _calculate_recursive(self, product: str, quantity: int, 
                           requirements: Dict, level: int = 0):
        """递归计算物料需求"""
        indent = "  " * level
        
        if product not in self.bom:
            # 原材料
            requirements[product] = requirements.get(product, 0) + quantity
            print(f"{indent}📦 {product}: {quantity}")
            return
        
        # 半成品,需要展开
        print(f"{indent}🔧 {product} (需要 {quantity}):")
        for component, comp_qty in self.bom[product]:
            total_needed = comp_qty * quantity
            self._calculate_recursive(component, total_needed, requirements, level + 1)

class PurchaseRequisition:
    """采购申请"""
    
    def __init__(self, pr_number: str):
        self.pr_number = pr_number
        self.items = []  # [(物料, 数量, 需求日期)]
        self.status = "draft"
        self.created_at = datetime.now()
        
    def add_item(self, material: str, quantity: int, required_date: datetime):
        self.items.append((material, quantity, required_date))
        
    def approve(self):
        self.status = "approved"
        print(f"✅ 采购申请 {self.pr_number} 已批准")
        return self.generate_po()
    
    def generate_po(self) -> str:
        """生成采购订单"""
        po_number = f"PO{datetime.now().strftime('%Y%m%d%H%M%S')}"
        print(f"📄 生成采购订单 {po_number}")
        return po_number

# 使用示例
if __name__ == "__main__":
    print("\n📦 ERP 物料管理模块演示")
    print("=" * 40)
    
    # 1. 库存管理
    print("\n1. 库存管理(FIFO):")
    item = InventoryItem("RAW001", "铝合金板", "KG", 50.0)
    
    # 添加不同批次
    item.add_batch(100, datetime(2024, 5, 1))
    item.add_batch(200, datetime(2024, 5, 10))
    item.add_batch(150, datetime(2024, 5, 20))
    
    print(f"初始库存: {item.get_available_qty()} KG")
    
    # 消耗 120 KG
    cost = item.consume(120)
    print(f"消耗 120KG 后:")
    print(f"  可用库存: {item.get_available_qty()} KG")
    print(f"  消耗成本: ¥{cost:,.2f}")
    print(f"  库存价值: ¥{item.get_value():,.2f}")
    
    # 2. MRP 计算
    print("\n2. MRP 物料需求计划:")
    planner = MRPPlanner()
    
    # 定义BOM(物料清单)
    planner.add_bom("成品A", [("半成品B", 2), ("包装材料", 1)])
    planner.add_bom("半成品B", [("原材料X", 3), ("原材料Y", 2)])
    
    # 计算生产100个成品A的需求
    requirements = planner.calculate_requirements("成品A", 100, lead_time=7)
    print(f"\n需求汇总:")
    for material, qty in requirements['material_requirements'].items():
        print(f"  {material}: {qty}")
    
    # 3. 采购申请
    print("\n3. 采购流程:")
    pr = PurchaseRequisition("PR20240520001")
    pr.add_item("原材料X", 600, datetime.now() + timedelta(days=5))
    pr.add_item("原材料Y", 400, datetime.now() + timedelta(days=5))
    
    po_number = pr.approve()
    print(f"生成的采购订单: {po_number}")

3. 生产计划(PP)

作用:生产订单、排程、报工
# production_module.py
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Dict
import json
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
@dataclass
class WorkCenter:
    """工作中心(生产线)"""
    code: str
    name: str
    capacity_hours: float  # 日产能(小时)
    setup_time: float      # 换线时间(小时)
    hourly_cost: float     # 小时成本
    
    def __post_init__(self):
        self.schedule = []  # 排程计划
        self.available_hours = self.capacity_hours  # 剩余可用时间

class ProductionOrder:
    """生产订单"""
    
    def __init__(self, order_number: str, material: str, quantity: int):
        self.order_number = order_number
        self.material = material
        self.quantity = quantity
        self.status = "created"
        self.created_date = datetime.now()
        self.start_date = None
        self.end_date = None
        self.routing = []  # 工艺路线
        self.actual_output = 0
        self.scrap_qty = 0
        
    def add_routing_step(self, work_center: WorkCenter, 
                        operation_time: float, sequence: int):
        """添加工序"""
        self.routing.append({
            'sequence': sequence,
            'work_center': work_center.code,
            'operation': f"OP{sequence:02d}",
            'std_time': operation_time,  # 标准工时(小时/件)
            'total_time': operation_time * self.quantity,
            'status': 'pending'
        })
        # 按工序排序
        self.routing.sort(key=lambda x: x['sequence'])
    
    def calculate_lead_time(self) -> float:
        """计算总生产时间"""
        return sum(step['total_time'] for step in self.routing)
    
    def schedule_production(self, start_date: datetime):
        """安排生产计划"""
        self.start_date = start_date
        current_date = start_date
        
        for step in self.routing:
            # 计算工序所需天数
            days_needed = step['total_time'] / 8  # 按8小时工作制
            step['planned_start'] = current_date
            step['planned_end'] = current_date + timedelta(days=days_needed)
            
            # 更新当前日期(考虑工序间等待时间)
            current_date = step['planned_end'] + timedelta(days=0.5)
        
        self.end_date = self.routing[-1]['planned_end']
        self.status = "scheduled"
        
    def report_progress(self, operation_seq: int, 
                       good_qty: int, scrap_qty: int = 0):
        """报工"""
        for step in self.routing:
            if step['sequence'] == operation_seq:
                step['status'] = 'completed'
                step['actual_good'] = good_qty
                step['actual_scrap'] = scrap_qty
                break
        
        self.actual_output += good_qty
        self.scrap_qty += scrap_qty
        
        # 检查是否全部完成
        if all(step['status'] == 'completed' for step in self.routing):
            self.status = "completed"
            self.end_date = datetime.now()
            
    def get_efficiency(self) -> float:
        """计算生产效率"""
        if self.actual_output == 0:
            return 0.0
        planned_output = self.quantity - self.scrap_qty
        return self.actual_output / planned_output
    
    def to_gantt_data(self) -> List[Dict]:
        """生成甘特图数据"""
        gantt_data = []
        for step in self.routing:
            gantt_data.append({
                'Task': f"{self.order_number}-{step['operation']}",
                'Start': step['planned_start'].strftime('%Y-%m-%d'),
                'End': step['planned_end'].strftime('%Y-%m-%d'),
                'Resource': step['work_center']
            })
        return gantt_data

class ProductionScheduler:
    """生产排程器"""
    
    def __init__(self):
        self.work_centers: Dict[str, WorkCenter] = {}
        self.orders: List[ProductionOrder] = []
        
    def add_work_center(self, wc: WorkCenter):
        self.work_centers[wc.code] = wc
    
    def add_order(self, order: ProductionOrder):
        self.orders.append(order)
        
    def schedule_all(self):
        """安排所有订单的生产计划"""
        # 按优先级排序(这里简化按创建时间)
        self.orders.sort(key=lambda x: x.created_date)
        
        current_date = datetime.now()
        
        for order in self.orders:
            if order.status == "created":
                # 检查物料可用性
                if self.check_material_availability(order):
                    order.schedule_production(current_date)
                    
                    # 更新下一个订单的开始时间
                    if order.end_date:
                        current_date = order.end_date + timedelta(days=1)
                    
                    print(f"📅 安排订单 {order.order_number}: "
                          f"{order.start_date.strftime('%m-%d')} 到 "
                          f"{order.end_date.strftime('%m-%d')}")
                else:
                    print(f"⚠️ 订单 {order.order_number} 物料不足,无法安排")
    
    def check_material_availability(self, order: ProductionOrder) -> bool:
        """检查物料可用性(简化)"""
        # 实际中会调用库存模块
        return True

# 使用示例
if __name__ == "__main__":
    print("\n🏭 ERP 生产计划模块演示")
    print("=" * 40)
    
    # 创建工作中心
    wc1 = WorkCenter("WC001", "冲压车间", 16.0, 1.0, 200.0)
    wc2 = WorkCenter("WC002", "组装线", 20.0, 0.5, 150.0)
    wc3 = WorkCenter("WC003", "喷漆车间", 12.0, 2.0, 180.0)
    
    # 创建生产订单
    order1 = ProductionOrder("PO001", "成品A", 100)
    order1.add_routing_step(wc1, 0.5, 1)  # 冲压 0.5小时/件
    order1.add_routing_step(wc2, 0.8, 2)  # 组装 0.8小时/件
    order1.add_routing_step(wc3, 0.3, 3)  # 喷漆 0.3小时/件
    
    # 计算生产时间
    total_time = order1.calculate_lead_time()
    print(f"订单 {order1.order_number} 总工时: {total_time:.1f} 小时")
    print(f"标准生产周期: {total_time/8:.1f} 天 (按8小时/天)")
    
    # 排程
    scheduler = ProductionScheduler()
    scheduler.add_work_center(wc1)
    scheduler.add_work_center(wc2)
    scheduler.add_work_center(wc3)
    scheduler.add_order(order1)
    scheduler.schedule_all()
    
    # 模拟报工
    print("\n📊 生产进度跟踪:")
    order1.report_progress(1, 100, 5)  # 工序1完成,100良品,5废品
    order1.report_progress(2, 95, 3)   # 工序2完成
    order1.report_progress(3, 92, 1)   # 工序3完成
    
    print(f"生产完成! 状态: {order1.status}")
    print(f"良品数量: {order1.actual_output}")
    print(f"废品数量: {order1.scrap_qty}")
    print(f"生产效率: {order1.get_efficiency():.1%}")
    
    # 生成甘特图数据
    gantt_data = order1.to_gantt_data()
    print(f"\n📅 甘特图数据:")
    print(json.dumps(gantt_data, indent=2, ensure_ascii=False))

4. 财务会计(FI)

作用:总账、应收应付、固定资产
# finance_module.py
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum
import json

class AccountType(Enum):
    ASSET = "asset"           # 资产
    LIABILITY = "liability"   # 负债
    EQUITY = "equity"         # 所有者权益
    REVENUE = "revenue"       # 收入
    EXPENSE = "expense"       # 费用

class TransactionType(Enum):
    DEBIT = "debit"    # 借方
    CREDIT = "credit"  # 贷方

@dataclass
class Account:
    """会计科目"""
    code: str
    name: str
    type: AccountType
    parent_code: str = None
    
    def __post_init__(self):
        self.balance = 0.0
        self.transactions: List[Transaction] = []

@dataclass
class Transaction:
    """会计凭证分录"""
    account_code: str
    amount: float
    type: TransactionType
    date: datetime
    description: str
    reference: str  # 参考凭证号

class GeneralLedger:
    """总账系统"""
    
    def __init__(self):
        self.accounts: Dict[str, Account] = {}
        self.transactions: List[Transaction] = []
        self.period_start: datetime = None
        self.period_end: datetime = None
        
        # 初始化常用科目
        self.init_chart_of_accounts()
    
    def init_chart_of_accounts(self):
        """初始化会计科目表"""
        # 资产类
        self.add_account(Account("1001", "现金", AccountType.ASSET))
        self.add_account(Account("1122", "应收账款", AccountType.ASSET))
        self.add_account(Account("1405", "库存商品", AccountType.ASSET))
        
        # 负债类
        self.add_account(Account("2202", "应付账款", AccountType.LIABILITY))
        
        # 权益类
        self.add_account(Account("3001", "实收资本", AccountType.EQUITY))
        
        # 损益类
        self.add_account(Account("6001", "主营业务收入", AccountType.REVENUE))
        self.add_account(Account("6401", "主营业务成本", AccountType.EXPENSE))
        self.add_account(Account("6602", "管理费用", AccountType.EXPENSE))
    
    def add_account(self, account: Account):
        self.accounts[account.code] = account
    
    def make_journal_entry(self, entries: List[Transaction], 
                          description: str = ""):
        """制作日记账凭证(借贷必相等)"""
        # 验证借贷平衡
        total_debit = sum(e.amount for e in entries if e.type == TransactionType.DEBIT)
        total_credit = sum(e.amount for e in entries if e.type == TransactionType.CREDIT)
        
        if abs(total_debit - total_credit) > 0.01:  # 允许微小误差
            raise ValueError(f"借贷不平衡: 借{total_debit:.2f} ≠ 贷{total_credit:.2f}")
        
        # 生成凭证号
        voucher_no = f"V{datetime.now().strftime('%Y%m%d%H%M%S')}"
        
        # 记录交易
        for entry in entries:
            entry.reference = voucher_no
            self.transactions.append(entry)
            
            # 更新账户余额
            account = self.accounts[entry.account_code]
            if entry.type == TransactionType.DEBIT:
                if account.type in [AccountType.ASSET, AccountType.EXPENSE]:
                    account.balance += entry.amount
                else:
                    account.balance -= entry.amount
            else:  # CREDIT
                if account.type in [AccountType.ASSET, AccountType.EXPENSE]:
                    account.balance -= entry.amount
                else:
                    account.balance += entry.amount
            
            account.transactions.append(entry)
        
        print(f"📒 凭证 {voucher_no}: {description}")
        return voucher_no
    
    def post_sales_revenue(self, order_no: str, amount: float, 
                          customer: str, cost: float = 0):
        """过账销售收入"""
        # 1. 确认收入
        entries = [
            Transaction("1122", amount, TransactionType.DEBIT, 
                       datetime.now(), f"销售给{customer}", order_no),
            Transaction("6001", amount, TransactionType.CREDIT,
                       datetime.now(), f"销售收入", order_no)
        ]
        self.make_journal_entry(entries, f"确认销售收入 {order_no}")
        
        # 2. 结转成本(如果有)
        if cost > 0:
            entries = [
                Transaction("6401", cost, TransactionType.DEBIT,
                          datetime.now(), f"销售成本", order_no),
                Transaction("1405", cost, TransactionType.CREDIT,
                          datetime.now(), f"减少库存", order_no)
            ]
            self.make_journal_entry(entries, f"结转销售成本 {order_no}")
    
    def post_purchase(self, po_no: str, amount: float, 
                     supplier: str, is_inventory: bool = True):
        """过账采购"""
        debit_account = "1405" if is_inventory else "6602"  # 库存或费用
        
        entries = [
            Transaction(debit_account, amount, TransactionType.DEBIT,
                       datetime.now(), f"采购自{supplier}", po_no),
            Transaction("2202", amount, TransactionType.CREDIT,
                       datetime.now(), f"应付账款", po_no)
        ]
        self.make_journal_entry(entries, f"采购入库 {po_no}")
    
    def get_trial_balance(self) -> Dict:
        """生成试算平衡表"""
        trial_balance = []
        
        for account in self.accounts.values():
            trial_balance.append({
                'account_code': account.code,
                'account_name': account.name,
                'account_type': account.type.value,
                'debit': account.balance if account.balance > 0 else 0,
                'credit': -account.balance if account.balance < 0 else 0
            })
        
        total_debit = sum(item['debit'] for item in trial_balance)
        total_credit = sum(item['credit'] for item in trial_balance)
        
        return {
            'report_date': datetime.now().strftime('%Y-%m-%d'),
            'trial_balance': trial_balance,
            'total_debit': total_debit,
            'total_credit': total_credit,
            'is_balanced': abs(total_debit - total_credit) < 0.01
        }
    
    def generate_income_statement(self, start_date: datetime, 
                                end_date: datetime) -> Dict:
        """生成利润表"""
        revenue = 0.0
        expense = 0.0
        
        for account in self.accounts.values():
            if account.type == AccountType.REVENUE:
                revenue += account.balance
            elif account.type == AccountType.EXPENSE:
                expense += account.balance
        
        net_income = revenue - expense
        
        return {
            'period': f"{start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}",
            'revenue': revenue,
            'expense': expense,
            'net_income': net_income,
            'gross_margin': (revenue - expense) / revenue if revenue > 0 else 0
        }
    
    def generate_balance_sheet(self) -> Dict:
        """生成资产负债表"""
        assets = 0.0
        liabilities = 0.0
        equity = 0.0
        
        for account in self.accounts.values():
            if account.type == AccountType.ASSET:
                assets += account.balance
            elif account.type == AccountType.LIABILITY:
                liabilities += account.balance
            elif account.type == AccountType.EQUITY:
                equity += account.balance
        
        # 检查平衡: 资产 = 负债 + 所有者权益
        is_balanced = abs(assets - (liabilities + equity)) < 0.01
        
        return {
            'report_date': datetime.now().strftime('%Y-%m-%d'),
            'assets': assets,
            'liabilities': liabilities,
            'equity': equity,
            'total_liabilities_equity': liabilities + equity,
            'is_balanced': is_balanced
        }

# 使用示例
if __name__ == "__main__":
    print("\n💰 ERP 财务会计模块演示")
    print("=" * 40)
    
    ledger = GeneralLedger()
    
    # 模拟一个月业务
    print("1. 初始投资:")
    entries = [
        Transaction("1001", 1000000, TransactionType.DEBIT, 
                   datetime.now(), "收到投资款", "INV001"),
        Transaction("3001", 1000000, TransactionType.CREDIT,
                   datetime.now(), "实收资本", "INV001")
    ]
    ledger.make_journal_entry(entries, "公司成立收到投资")
    
    print("\n2. 采购原材料:")
    ledger.post_purchase("PO001", 200000, "供应商A", is_inventory=True)
    
    print("\n3. 支付管理费用:")
    entries = [
        Transaction("6602", 50000, TransactionType.DEBIT,
                   datetime.now(), "支付工资", "PAY001"),
        Transaction("1001", 50000, TransactionType.CREDIT,
                   datetime.now(), "银行支付", "PAY001")
    ]
    ledger.make_journal_entry(entries, "支付员工工资")
    
    print("\n4. 销售产品:")
    ledger.post_sales_revenue("SO001", 350000, "客户A", cost=120000)
    
    # 生成财务报表
    print("\n" + "=" * 40)
    print("📈 财务报表")
    print("=" * 40)
    
    # 试算平衡表
    trial_balance = ledger.get_trial_balance()
    print("\n试算平衡表:")
    print(f"  总借方: ¥{trial_balance['total_debit']:,.2f}")
    print(f"  总贷方: ¥{trial_balance['total_credit']:,.2f}")
    print(f"  是否平衡: {'✅' if trial_balance['is_balanced'] else '❌'}")
    
    # 利润表
    income_stmt = ledger.generate_income_statement(
        datetime.now() - timedelta(days=30),
        datetime.now()
    )
    print("\n利润表:")
    print(f"  期间: {income_stmt['period']}")
    print(f"  收入: ¥{income_stmt['revenue']:,.2f}")
    print(f"  费用: ¥{income_stmt['expense']:,.2f}")
    print(f"  净利润: ¥{income_stmt['net_income']:,.2f}")
    print(f"  毛利率: {income_stmt['gross_margin']:.1%}")
    
    # 资产负债表
    balance_sheet = ledger.generate_balance_sheet()
    print("\n资产负债表:")
    print(f"  资产: ¥{balance_sheet['assets']:,.2f}")
    print(f"  负债: ¥{balance_sheet['liabilities']:,.2f}")
    print(f"  所有者权益: ¥{balance_sheet['equity']:,.2f}")
    print(f"  负债+权益: ¥{balance_sheet['total_liabilities_equity']:,.2f}")
    print(f"  会计恒等式: {'✅' if balance_sheet['is_balanced'] else '❌'}")

5. 控制(CO)

作用:成本中心、利润中心、内部订单
# controlling_module.py
from datetime import datetime
from dataclasses import dataclass
from typing import Dict, List
import json

@dataclass
class CostCenter:
    """成本中心"""
    code: str
    name: str
    manager: str
    department: str
    cost_element: str  # 成本要素
    
    def __post_init__(self):
        self.planned_cost = 0.0
        self.actual_cost = 0.0
        self.variance = 0.0

@dataclass
class ProfitCenter:
    """利润中心"""
    code: str
    name: str
    responsible: str
    
    def __post_init__(self):
        self.revenue = 0.0
        self.cost = 0.0
        self.profit = 0.0

class InternalOrder:
    """内部订单(用于项目成本归集)"""
    
    def __init__(self, order_no: str, description: str, 
                 cost_center: str, budget: float):
        self.order_no = order_no
        self.description = description
        self.cost_center = cost_center
        self.budget = budget
        self.actual_cost = 0.0
        self.status = "open"
        self.start_date = datetime.now()
        self.end_date = None
        
    def post_cost(self, amount: float, description: str):
        """过账成本"""
        if self.actual_cost + amount > self.budget:
            print(f"⚠️ 警告: 内部订单 {self.order_no} 将超出预算")
        
        self.actual_cost += amount
        print(f"📊 内部订单 {self.order_no}: 增加成本 ¥{amount:,.2f} ({description})")
    
    def calculate_variance(self) -> float:
        """计算预算差异"""
        return self.actual_cost - self.budget
    
    def close_order(self):
        """关闭内部订单"""
        self.status = "closed"
        self.end_date = datetime.now()
        variance = self.calculate_variance()
        
        print(f"🔒 关闭内部订单 {self.order_no}")
        print(f"  预算: ¥{self.budget:,.2f}")
        print(f"  实际: ¥{self.actual_cost:,.2f}")
        print(f"  差异: ¥{variance:,.2f} ({'超支' if variance > 0 else '节约'})")
        
        return variance

class CostController:
    """成本控制器"""
    
    def __init__(self):
        self.cost_centers: Dict[str, CostCenter] = {}
        self.profit_centers: Dict[str, ProfitCenter] = {}
        self.internal_orders: Dict[str, InternalOrder] = {}
        
    def add_cost_center(self, cc: CostCenter):
        self.cost_centers[cc.code] = cc
    
    def add_profit_center(self, pc: ProfitCenter):
        self.profit_centers[pc.code] = pc
    
    def create_internal_order(self, order: InternalOrder):
        self.internal_orders[order.order_no] = order
    
    def allocate_cost(self, from_cc: str, to_cc: str, amount: float, 
                     description: str):
        """成本分配"""
        if from_cc in self.cost_centers and to_cc in self.cost_centers:
            self.cost_centers[from_cc].actual_cost -= amount
            self.cost_centers[to_cc].actual_cost += amount
            print(f"🔄 成本分配: {from_cc} → {to_cc} ¥{amount:,.2f}")
    
    def calculate_center_performance(self):
        """计算中心绩效"""
        print("\n" + "=" * 40)
        print("📊 成本中心绩效报告")
        print("=" * 40)
        
        for cc in self.cost_centers.values():
            cc.variance = cc.actual_cost - cc.planned_cost
            variance_pct = (cc.variance / cc.planned_cost * 100) if cc.planned_cost > 0 else 0
            
            status = "✅" if cc.variance <= 0 else "⚠️"
            print(f"{cc.code} {cc.name}:")
            print(f"  预算: ¥{cc.planned_cost:,.2f}")
            print(f"  实际: ¥{cc.actual_cost:,.2f}")
            print(f"  差异: {status} ¥{cc.variance:,.2f} ({variance_pct:+.1f}%)")
        
        print("\n" + "=" * 40)
        print("📈 利润中心绩效报告")
        print("=" * 40)
        
        for pc in self.profit_centers.values():
            pc.profit = pc.revenue - pc.cost
            margin = (pc.profit / pc.revenue * 100) if pc.revenue > 0 else 0
            
            print(f"{pc.code} {pc.name}:")
            print(f"  收入: ¥{pc.revenue:,.2f}")
            print(f"  成本: ¥{pc.cost:,.2f}")
            print(f"  利润: ¥{pc.profit:,.2f} (利润率: {margin:.1f}%)")
    
    def generate_cost_report(self, report_type: str = "monthly"):
        """生成成本报告"""
        report = {
            "report_type": report_type,
            "generated_at": datetime.now().isoformat(),
            "cost_centers": [],
            "profit_centers": [],
            "internal_orders": []
        }
        
        for cc in self.cost_centers.values():
            report["cost_centers"].append({
                "code": cc.code,
                "name": cc.name,
                "planned": cc.planned_cost,
                "actual": cc.actual_cost,
                "variance": cc.variance
            })
        
        for pc in self.profit_centers.values():
            report["profit_centers"].append({
                "code": pc.code,
                "name": pc.name,
                "revenue": pc.revenue,
                "cost": pc.cost,
                "profit": pc.profit
            })
        
        for io in self.internal_orders.values():
            report["internal_orders"].append({
                "order_no": io.order_no,
                "description": io.description,
                "budget": io.budget,
                "actual": io.actual_cost,
                "variance": io.calculate_variance(),
                "status": io.status
            })
        
        return report

# 使用示例
if __name__ == "__main__":
    print("\n📈 ERP 控制模块演示")
    print("=" * 40)
    
    controller = CostController()
    
    # 创建成本中心
    cc1 = CostCenter("CC001", "生产一部", "张三", "生产部", "500101")
    cc2 = CostCenter("CC002", "研发中心", "李四", "研发部", "500102")
    cc3 = CostCenter("CC003", "销售部", "王五", "销售部", "500103")
    
    cc1.planned_cost = 500000
    cc2.planned_cost = 300000
    cc3.planned_cost = 200000
    
    controller.add_cost_center(cc1)
    controller.add_cost_center(cc2)
    controller.add_cost_center(cc3)
    
    # 创建利润中心
    pc1 = ProfitCenter("PC001", "华东大区", "赵六")
    pc2 = ProfitCenter("PC002", "华南大区", "钱七")
    
    pc1.revenue = 1500000
    pc1.cost = 900000
    pc2.revenue = 1200000
    pc2.cost = 750000
    
    controller.add_profit_center(pc1)
    controller.add_profit_center(pc2)
    
    # 创建内部订单(项目)
    project1 = InternalOrder("IO202405001", "新产品研发项目", "CC002", 200000)
    project2 = InternalOrder("IO202405002", "营销活动项目", "CC003", 80000)
    
    controller.create_internal_order(project1)
    controller.create_internal_order(project2)
    
    # 模拟成本发生
    print("\n💰 模拟业务发生:")
    cc1.actual_cost = 520000  # 生产一部实际成本
    cc2.actual_cost = 280000  # 研发中心实际成本
    cc3.actual_cost = 210000  # 销售部实际成本
    
    project1.post_cost(150000, "研发人员工资")
    project1.post_cost(40000, "研发材料")
    project1.post_cost(30000, "测试费用")
    
    project2.post_cost(50000, "广告费")
    project2.post_cost(25000, "活动费用")
    
    # 成本分配
    controller.allocate_cost("CC002", "CC001", 50000, "技术支持费用")
    
    # 计算绩效
    controller.calculate_center_performance()
    
    # 关闭内部订单
    print("\n🔒 关闭内部订单:")
    project1.close_order()
    project2.close_order()
    
    # 生成报告
    report = controller.generate_cost_report()
    print(f"\n📄 成本控制报告已生成,包含:")
    print(f"  - 成本中心: {len(report['cost_centers'])} 个")
    print(f"  - 利润中心: {len(report['profit_centers'])} 个")
    print(f"  - 内部订单: {len(report['internal_orders'])} 个")

6. 人力资源(HR)

作用:组织、人事、薪酬、考勤
# hr_module.py
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum
import json
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class EmployeeStatus(Enum):
    ACTIVE = "active"
    PROBATION = "probation"
    LEAVE = "leave"
    RESIGNED = "resigned"
    RETIRED = "retired"

class AttendanceStatus(Enum):
    PRESENT = "present"
    ABSENT = "absent"
    LATE = "late"
    LEAVE = "leave"
    OVERTIME = "overtime"

@dataclass
class Employee:
    """员工主数据"""
    employee_id: str
    name: str
    department: str
    position: str
    hire_date: datetime
    base_salary: float
    
    def __post_init__(self):
        self.status = EmployeeStatus.ACTIVE
        self.email = f"{self.name.lower().replace(' ', '.')}@company.com"
        self.manager_id: str = None
        self.attendance_records: List[Dict] = []
        self.leave_balance = 0  # 剩余年假
        self.salary_history: List[Dict] = []
        
    def get_service_years(self) -> float:
        """计算司龄"""
        days = (datetime.now() - self.hire_date).days
        return days / 365.25
    
    def apply_leave(self, start_date: datetime, end_date: datetime, 
                   leave_type: str, reason: str) -> bool:
        """申请休假"""
        if self.leave_balance <= 0 and leave_type == "annual":
            print(f"❌ {self.name} 年假余额不足")
            return False
        
        leave_days = (end_date - start_date).days + 1
        
        if leave_type == "annual":
            if leave_days > self.leave_balance:
                print(f"❌ 申请 {leave_days} 天,但只有 {self.leave_balance} 天年假")
                return False
            self.leave_balance -= leave_days
        
        print(f"✅ {self.name} 申请 {leave_type} 假 {leave_days} 天")
        return True
    
    def record_attendance(self, date: datetime, status: AttendanceStatus, 
                         hours: float = 8.0, notes: str = ""):
        """记录考勤"""
        record = {
            "date": date.strftime("%Y-%m-%d"),
            "status": status.value,
            "check_in": "09:00",
            "check_out": "18:00",
            "hours": hours,
            "notes": notes
        }
        self.attendance_records.append(record)
        
        status_icon = {
            AttendanceStatus.PRESENT: "✅",
            AttendanceStatus.ABSENT: "❌",
            AttendanceStatus.LATE: "⏰",
            AttendanceStatus.OVERTIME: "💪"
        }.get(status, "📝")
        
        print(f"{status_icon} {self.name} {date.strftime('%m-%d')}: {status.value}")

class PayrollCalculator:
    """薪资计算器"""
    
    def __init__(self):
        self.tax_rates = [
            (36000, 0.03, 0),
            (144000, 0.10, 2520),
            (300000, 0.20, 16920),
            (420000, 0.25, 31920),
            (660000, 0.30, 52920),
            (960000, 0.35, 85920),
            (float('inf'), 0.45, 181920)
        ]
        
        self.social_insurance_rate = 0.105  # 社保比例
        self.housing_fund_rate = 0.12      # 公积金比例
    
    def calculate_tax(self, annual_income: float) -> float:
        """计算个人所得税"""
        for threshold, rate, deduction in self.tax_rates:
            if annual_income <= threshold:
                return annual_income * rate - deduction
        return 0
    
    def calculate_salary(self, employee: Employee, 
                        attendance_days: int = 22,
                        overtime_hours: float = 0,
                        bonuses: float = 0) -> Dict:
        """计算薪资"""
        # 基本工资
        daily_rate = employee.base_salary / 22
        basic_salary = daily_rate * attendance_days
        
        # 加班费
        overtime_rate = daily_rate / 8 * 1.5
        overtime_pay = overtime_hours * overtime_rate
        
        # 应发工资
        gross_salary = basic_salary + overtime_pay + bonuses
        
        # 扣除项
        social_insurance = gross_salary * self.social_insurance_rate
        housing_fund = gross_salary * self.housing_fund_rate
        
        # 个人所得税(简化计算)
        annual_income = gross_salary * 12
        income_tax = self.calculate_tax(annual_income) / 12
        
        # 实发工资
        net_salary = gross_salary - social_insurance - housing_fund - income_tax
        
        payslip = {
            "employee_id": employee.employee_id,
            "employee_name": employee.name,
            "pay_month": datetime.now().strftime("%Y-%m"),
            "basic_salary": round(basic_salary, 2),
            "overtime_pay": round(overtime_pay, 2),
            "bonuses": round(bonuses, 2),
            "gross_salary": round(gross_salary, 2),
            "deductions": {
                "social_insurance": round(social_insurance, 2),
                "housing_fund": round(housing_fund, 2),
                "income_tax": round(income_tax, 2)
            },
            "total_deductions": round(social_insurance + housing_fund + income_tax, 2),
            "net_salary": round(net_salary, 2)
        }
        
        # 记录薪资历史
        employee.salary_history.append(payslip)
        
        return payslip

class OrganizationChart:
    """组织架构图"""
    
    def __init__(self):
        self.employees: Dict[str, Employee] = {}
        self.departments: Dict[str, List[str]] = {}  # 部门 -> [员工ID]
        
    def add_employee(self, employee: Employee, department: str):
        self.employees[employee.employee_id] = employee
        
        if department not in self.departments:
            self.departments[department] = []
        self.departments[department].append(employee.employee_id)
        
    def set_manager(self, employee_id: str, manager_id: str):
        """设置汇报关系"""
        if employee_id in self.employees and manager_id in self.employees:
            self.employees[employee_id].manager_id = manager_id
    
    def get_subordinates(self, manager_id: str) -> List[Employee]:
        """获取下属"""
        return [
            emp for emp in self.employees.values() 
            if emp.manager_id == manager_id
        ]
    
    def get_department_staff(self, department: str) -> List[Employee]:
        """获取部门员工"""
        staff_ids = self.departments.get(department, [])
        return [self.employees[eid] for eid in staff_ids]
    
    def generate_org_chart(self) -> Dict:
        """生成组织架构"""
        org_chart = {}
        
        for dept, emp_ids in self.departments.items():
            managers = []
            for emp_id in emp_ids:
                emp = self.employees[emp_id]
                if emp.manager_id is None:  # 部门负责人
                    subordinates = self.get_subordinates(emp_id)
                    manager_node = {
                        "name": emp.name,
                        "position": emp.position,
                        "subordinates": [
                            {"name": sub.name, "position": sub.position}
                            for sub in subordinates
                        ]
                    }
                    managers.append(manager_node)
            
            org_chart[dept] = managers
        
        return org_chart

# 使用示例
if __name__ == "__main__":
    print("\n👥 ERP 人力资源模块演示")
    print("=" * 40)
    
    org = OrganizationChart()
    
    # 创建员工
    ceo = Employee("E001", "张三", "总裁办", "CEO", datetime(2020, 1, 1), 100000)
    cto = Employee("E002", "李四", "技术部", "CTO", datetime(2021, 3, 1), 80000)
    dev1 = Employee("E003", "王五", "技术部", "高级工程师", datetime(2022, 6, 1), 40000)
    dev2 = Employee("E004", "赵六", "技术部", "工程师", datetime(2023, 8, 1), 30000)
    hr_manager = Employee("E005", "钱七", "人力资源部", "HR经理", datetime(2021, 5, 1), 50000)
    
    # 添加到组织
    org.add_employee(ceo, "总裁办")
    org.add_employee(cto, "技术部")
    org.add_employee(dev1, "技术部")
    org.add_employee(dev2, "技术部")
    org.add_employee(hr_manager, "人力资源部")
    
    # 设置汇报关系
    org.set_manager(cto.employee_id, ceo.employee_id)
    org.set_manager(dev1.employee_id, cto.employee_id)
    org.set_manager(dev2.employee_id, cto.employee_id)
    
    # 考勤管理
    print("📅 考勤记录:")
    today = datetime.now()
    dev1.record_attendance(today, AttendanceStatus.PRESENT)
    dev2.record_attendance(today, AttendanceStatus.LATE, notes="堵车")
    
    # 休假管理
    print("\n🏖️ 休假管理:")
    dev1.leave_balance = 10
    dev1.apply_leave(
        datetime.now() + timedelta(days=7),
        datetime.now() + timedelta(days=9),
        "annual",
        "年度休假"
    )
    
    # 薪资计算
    print("\n💰 薪资计算:")
    calculator = PayrollCalculator()
    payslip = calculator.calculate_salary(
        dev1, 
        attendance_days=22,
        overtime_hours=20,
        bonuses=5000
    )
    
    print(f"员工: {payslip['employee_name']}")
    print(f"应发工资: ¥{payslip['gross_salary']:,.2f}")
    print(f"扣除合计: ¥{payslip['total_deductions']:,.2f}")
    print(f"实发工资: ¥{payslip['net_salary']:,.2f}")
    
    # 组织架构
    print("\n🏢 组织架构:")
    org_chart = org.generate_org_chart()
    for dept, managers in org_chart.items():
        print(f"\n{dept}:")
        for manager in managers:
            print(f"  ├─ {manager['name']} ({manager['position']})")
            for sub in manager['subordinates']:
                print(f"  │  ├─ {sub['name']} ({sub['position']})")
    
    # 司龄统计
    print("\n📊 员工司龄:")
    for emp in org.employees.values():
        years = emp.get_service_years()
        print(f"  {emp.name}: {years:.1f} 年")

三、 ERP 的现代演进:从本地部署到云原生

时代
技术架构
特点
代表产品
ERP 1.0 (1990s)
单机版
部门级,无集成
用友U8、金蝶K3
ERP 2.0 (2000s)
C/S 架构
企业级,模块化
SAP R/3、Oracle EBS
ERP 3.0 (2010s)
B/S 架构
集团化,国际化
SAP S/4HANA
ERP 4.0 (2020s+)
云原生
AI驱动,微服务
金蝶云星空、用友YonSuite
现代 ERP 关键技术栈
# 现代云ERP技术架构示意
modern_erp_stack = {
    "前端": ["React/Vue", "移动优先", "PWA"],
    "网关": ["Kong/Apigee", "GraphQL", "gRPC"],
    "微服务": ["Spring Cloud", "Kubernetes", "Docker"],
    "数据层": ["分布式数据库", "数据湖", "实时数仓"],
    "AI能力": ["预测分析", "RPA", "智能推荐"]
}

四、 ERP 实施成功率的关键因素

根据 Gartner 研究,ERP 项目失败率高达 70%。成功的关键在于:
  1. 业务流程重组(BPR):先优化流程,再上系统


  2. 数据治理:主数据(客户、产品、供应商)质量是生命线


  3. 分阶段实施:先财务、进销存,再生产、HR


  4. 用户培训:系统最终是给人用的


  5. 持续优化:上线只是开始,不是结束



💡 总结

ERP 不是银弹,而是企业管理的放大镜
  • 管理好的企业,ERP 让其效率提升 30-50%


  • 管理差的企业,ERP 会让问题暴露无遗


记住:选择 ERP 不是选择软件,而是选择管理理念。最好的 ERP 是那个最匹配你业务模式和管理水平的系统,而不是最贵或功能最多的。
互动话题:你的公司正在使用或考虑哪个 ERP 系统?实施过程中遇到了哪些挑战?评论区聊聊你的经验!


群贤毕至

访客