×

锦程物流item_search - 根据关键词获取列表接口对接全攻略:从入门到精通

万邦科技Lex 万邦科技Lex 发表于2026-02-01 11:31:36 浏览33 评论0

抢沙发发表评论

一、接口概览

1.1 接口简介

item_search接口是锦程物流开放平台提供的物流订单搜索接口,支持根据多种条件查询物流订单列表,实现订单的模糊搜索、状态筛选、时间范围查询等复杂检索功能。

1.2 核心功能

  • 关键词搜索:订单号、运单号、收发货人姓名/电话模糊匹配

  • 多条件筛选:按状态、时间、物流产品等维度筛选

  • 分页查询:支持大数据量的分页加载

  • 排序功能:支持按创建时间、更新时间等字段排序

  • 字段选择:可指定返回字段,优化网络传输

二、准备工作

2.1 环境准备

# Python环境pip install requests python-dotenv# Java环境(Maven)<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
    <version>4.10.0</version>
</dependency># PHP环境composer require guzzlehttp/guzzle

2.2 配置管理

# config.pyimport osfrom dotenv import load_dotenv

load_dotenv()class Config:    # API配置
    JC_APP_KEY = os.getenv('JC_APP_KEY', '')
    JC_APP_SECRET = os.getenv('JC_APP_SECRET', '')
    JC_API_ENDPOINT = os.getenv('JC_API_ENDPOINT', 
        'https://sandbox-api.jc56.com'  # 默认沙箱环境
    )    
    # 业务配置
    DEFAULT_PAGE_SIZE = 20
    MAX_PAGE_SIZE = 100
    REQUEST_TIMEOUT = 30

三、接口详解

3.1 接口地址

POST /v1/item/search

3.2 请求参数详解

公共参数

参数名
类型
必填
说明
示例
app_key
string
应用标识
jc_app_2024
timestamp
string
请求时间戳
2026-02-01 10:00:00
sign
string
请求签名
详见签名算法
sign_method
string
签名方法
md5
format
string
返回格式
json(默认)
version
string
API版本
1.0

业务参数

参数名
类型
必填
说明
示例
keyword
string
搜索关键词
JC20260201
order_no
string
精确订单号
JC202602010001
waybill_no
string
运单号
SF123456789
shipper_name
string
发货人姓名
张三
shipper_phone
string
发货人电话
13800138000
consignee_name
string
收货人姓名
李四
consignee_phone
string
收货人电话
13900139000
status
string
订单状态
CREATED, PICKED_UP, IN_TRANSIT, DELIVERED
start_time
string
开始时间
2026-01-01 00:00:00
end_time
string
结束时间
2026-02-01 23:59:59
product_code
string
产品编码
JC_EXPRESS
page_no
int
页码
1(默认)
page_size
int
每页条数
20(默认)
sort_field
string
排序字段
create_time, update_time
sort_order
string
排序方向
desc(默认), asc
fields
string
返回字段
order_no,status,create_time

3.3 签名算法

import hashlibfrom typing import Dict, Anydef generate_sign(params: Dict[str, Any], app_secret: str) -> str:    """
    生成API请求签名
    
    Args:
        params: 请求参数字典
        app_secret: 应用密钥
    
    Returns:
        32位大写MD5签名
    """
    # 1. 过滤空值和sign参数
    filtered_params = {
        k: v for k, v in params.items() 
        if v is not None and k != 'sign'
    }    
    # 2. 按键名ASCII升序排序
    sorted_keys = sorted(filtered_params.keys())    
    # 3. 拼接键值对
    sign_str = ''
    for key in sorted_keys:        # 处理数组和对象类型参数
        if isinstance(filtered_params[key], (list, dict)):
            value = str(filtered_params[key])        else:
            value = str(filtered_params[key])
        sign_str += f"{key}{value}"
    
    # 4. 拼接app_secret
    sign_str += app_secret    
    # 5. 计算MD5并转为大写
    md5_hash = hashlib.md5()
    md5_hash.update(sign_str.encode('utf-8'))    return md5_hash.hexdigest().upper()# 测试用例test_params = {    "app_key": "test_app",    "timestamp": "2026-02-01 10:00:00",    "keyword": "测试",    "page_no": 1,    "page_size": 20}
app_secret = "test_secret"signature = generate_sign(test_params, app_secret)print(f"生成的签名: {signature}")

四、完整代码实现

4.1 Python完整实现

import requestsimport jsonimport hashlibimport timefrom typing import Dict, Any, List, Optionalfrom datetime import datetimefrom urllib.parse import urljoinclass JCLogisticsSearchAPI:    """锦程物流搜索API客户端"""
    
    def __init__(self, app_key: str, app_secret: str, sandbox: bool = True):        """
        初始化API客户端
        # 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
        Args:
            app_key: 应用密钥
            app_secret: 应用密钥
            sandbox: 是否使用沙箱环境
        """
        self.app_key = app_key        self.app_secret = app_secret        self.base_url = "https://sandbox-api.jc56.com" if sandbox else "https://api.jc56.com"
        self.session = requests.Session()        self.session.headers.update({            "User-Agent": "JCLogistics-Search-Client/1.0",            "Accept": "application/json",            "Content-Type": "application/json; charset=utf-8"
        })    
    def _generate_timestamp(self) -> str:        """生成标准格式时间戳"""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")    
    def _generate_sign(self, params: Dict[str, Any]) -> str:        """生成请求签名"""
        # 过滤空值和sign参数
        filtered_params = {
            k: v for k, v in params.items() 
            if v is not None and k != 'sign'
        }        
        # 排序并拼接
        sorted_keys = sorted(filtered_params.keys())
        sign_str = ''
        for key in sorted_keys:            if isinstance(filtered_params[key], (list, dict)):
                value = json.dumps(filtered_params[key], separators=(',', ':'))            else:
                value = str(filtered_params[key])
            sign_str += f"{key}{value}"
        
        # 添加密钥并计算MD5
        sign_str += self.app_secret        return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()    
    def search_orders(
        self,
        keyword: Optional[str] = None,
        order_no: Optional[str] = None,
        status_list: Optional[List[str]] = None,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        page_no: int = 1,
        page_size: int = 20,
        sort_field: str = "create_time",
        sort_order: str = "desc",
        fields: Optional[List[str]] = None,
        **extra_params    ) -> Dict[str, Any]:        """
        搜索物流订单
        
        Args:
            keyword: 搜索关键词(支持模糊匹配)
            order_no: 精确订单号
            status_list: 状态筛选列表
            start_time: 开始时间(格式:YYYY-MM-DD HH:MM:SS)
            end_time: 结束时间(格式:YYYY-MM-DD HH:MM:SS)
            page_no: 页码(从1开始)
            page_size: 每页数量(1-100)
            sort_field: 排序字段
            sort_order: 排序方向(desc降序/asc升序)
            fields: 指定返回字段列表
            **extra_params: 其他查询参数
        
        Returns:
            API响应数据
        """
        # 构建请求参数
        params = {            "app_key": self.app_key,            "timestamp": self._generate_timestamp(),            "sign_method": "md5",            "format": "json",            "version": "1.0",            "page_no": max(1, page_no),            "page_size": min(max(1, page_size), 100),            "sort_field": sort_field,            "sort_order": sort_order,
        }        
        # 添加可选参数
        if keyword:
            params["keyword"] = keyword        if order_no:
            params["order_no"] = order_no        if status_list:
            params["status"] = ",".join(status_list)        if start_time:
            params["start_time"] = start_time        if end_time:
            params["end_time"] = end_time        if fields:
            params["fields"] = ",".join(fields)        
        # 添加额外参数
        params.update(extra_params)        
        # 生成签名
        params["sign"] = self._generate_sign(params)        
        # 发送请求
        url = urljoin(self.base_url, "/v1/item/search")        
        try:
            response = self.session.post(
                url,
                json=params,
                timeout=30
            )
            response.raise_for_status()
            
            result = response.json()            
            # 验证返回签名(如果返回中有sign字段)
            if "sign" in result:                # 注意:这里需要根据实际API返回的签名验证规则实现
                pass
                
            return result            
        except requests.exceptions.Timeout:            return {                "success": False,                "code": "TIMEOUT",                "message": "请求超时",                "data": None
            }        except requests.exceptions.RequestException as e:            return {                "success": False,                "code": "REQUEST_ERROR",                "message": f"请求失败: {str(e)}",                "data": None
            }        except json.JSONDecodeError:            return {                "success": False,                "code": "INVALID_RESPONSE",                "message": "响应解析失败",                "data": None
            }    
    def search_all_orders(
        self,
        **search_params    ) -> List[Dict[str, Any]]:        """
        获取所有符合条件的订单(自动处理分页)
        
        Args:
            **search_params: 搜索参数
        
        Returns:
            订单列表
        """
        all_orders = []
        page_no = 1
        
        while True:            # 获取当前页数据
            result = self.search_orders(
                page_no=page_no,
                page_size=100,  # 使用最大页数减少请求次数
                **search_params
            )            
            if not result.get("success"):                print(f"第{page_no}页查询失败: {result.get('message')}")                break
            
            data = result.get("data", {})
            orders = data.get("list", [])
            pagination = data.get("pagination", {})            
            # 添加当前页数据
            all_orders.extend(orders)            
            # 检查是否还有下一页
            current_page = pagination.get("page_no", page_no)
            total_pages = pagination.get("total_pages", 0)
            has_next = pagination.get("has_next", False)            
            print(f"已获取第{current_page}页,共{len(orders)}条,总计{len(all_orders)}条")            
            if not has_next or current_page >= total_pages:                break
            
            page_no += 1
            
            # 避免请求过于频繁
            time.sleep(0.5)        
        return all_orders# 使用示例def demo_search():    """搜索接口使用演示"""
    
    # 初始化客户端
    client = JCLogisticsSearchAPI(
        app_key="your_app_key",
        app_secret="your_app_secret",
        sandbox=True
    )    
    print("=== 示例1:关键词搜索 ===")
    result1 = client.search_orders(
        keyword="北京",
        page_no=1,
        page_size=10
    )    print(json.dumps(result1, ensure_ascii=False, indent=2))    
    print("\n=== 示例2:状态筛选 ===")
    result2 = client.search_orders(
        status_list=["DELIVERED", "IN_TRANSIT"],
        start_time="2026-01-01 00:00:00",
        end_time="2026-02-01 23:59:59",
        sort_field="create_time",
        sort_order="desc"
    )    
    print("\n=== 示例3:获取所有待发货订单 ===")
    all_pending_orders = client.search_all_orders(
        status_list=["CREATED"],
        start_time="2026-01-01 00:00:00"
    )    print(f"共找到 {len(all_pending_orders)} 个待发货订单")if __name__ == "__main__":
    demo_search()

4.2 Java实现

import com.fasterxml.jackson.annotation.JsonInclude;import com.fasterxml.jackson.databind.ObjectMapper;import okhttp3.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import javax.crypto.Mac;import javax.crypto.spec.SecretKeySpec;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.security.MessageDigest;import java.time.LocalDateTime;import java.time.format.DateTimeFormatter;import java.util.*;import java.util.concurrent.TimeUnit;public class JCLogisticsSearchClient {    private static final Logger logger = LoggerFactory.getLogger(JCLogisticsSearchClient.class);    
    private final String appKey;    private final String appSecret;    private final String baseUrl;    private final OkHttpClient httpClient;    private final ObjectMapper objectMapper;    
    public JCLogisticsSearchClient(String appKey, String appSecret, boolean sandbox) {        this.appKey = appKey;        this.appSecret = appSecret;        this.baseUrl = sandbox ? "https://sandbox-api.jc56.com" : "https://api.jc56.com";        
        this.httpClient = new OkHttpClient.Builder()
                .connectTimeout(30, TimeUnit.SECONDS)
                .readTimeout(30, TimeUnit.SECONDS)
                .writeTimeout(30, TimeUnit.SECONDS)
                .addInterceptor(new LoggingInterceptor())
                .build();        
        this.objectMapper = new ObjectMapper();        this.objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
    }    
    // 搜索参数构建器
    public static class SearchParams {        private String keyword;        private String orderNo;        private List<String> statusList;        private String startTime;        private String endTime;        private Integer pageNo = 1;        private Integer pageSize = 20;        private String sortField = "create_time";        private String sortOrder = "desc";        private List<String> fields;        
        // 省略getter/setter和builder方法
    }    
    public SearchResult searchOrders(SearchParams params) throws IOException {        // 构建请求参数
        Map<String, Object> requestParams = new HashMap<>();
        requestParams.put("app_key", appKey);
        requestParams.put("timestamp", LocalDateTime.now()
                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        requestParams.put("sign_method", "md5");
        requestParams.put("format", "json");
        requestParams.put("version", "1.0");        
        // 添加业务参数
        if (params.getKeyword() != null) {
            requestParams.put("keyword", params.getKeyword());
        }        if (params.getStatusList() != null && !params.getStatusList().isEmpty()) {
            requestParams.put("status", String.join(",", params.getStatusList()));
        }        // ... 其他参数
        
        // 生成签名
        String sign = generateSign(requestParams);
        requestParams.put("sign", sign);        
        // 构建请求
        String jsonBody = objectMapper.writeValueAsString(requestParams);        Request request = new Request.Builder()
                .url(baseUrl + "/v1/item/search")
                .post(RequestBody.create(jsonBody, MediaType.parse("application/json")))
                .addHeader("User-Agent", "JCLogistics-Java-Client/1.0")
                .build(); 
         # 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex       
        // 发送请求
        try (Response response = httpClient.newCall(request).execute()) {            if (!response.isSuccessful()) {                throw new IOException("Unexpected code " + response);
            }            
            String responseBody = response.body().string();            return objectMapper.readValue(responseBody, SearchResult.class);
        }
    }    
    // 省略其他辅助方法...
    
    static class SearchResult {        private Boolean success;        private String code;        private String message;        private SearchData data;        
        // 省略getter/setter
    }    
    static class SearchData {        private List<OrderInfo> list;        private Pagination pagination;        
        // 省略getter/setter
    }    
    static class Pagination {        private Integer pageNo;        private Integer pageSize;        private Integer totalCount;        private Integer totalPages;        private Boolean hasNext;        
        // 省略getter/setter
    }
}

4.3 PHP实现(Laravel示例)

<?phpnamespace App\Services\Logistics;use GuzzleHttp\Client;use GuzzleHttp\Exception\RequestException;use Illuminate\Support\Facades\Cache;use Illuminate\Support\Facades\Log;class JCLogisticsSearchService{    private $appKey;    private $appSecret;    private $baseUrl;    private $client;    private $timeout = 30;    
    public function __construct($sandbox = true)    {        $this->appKey = config('services.jc_logistics.app_key');        $this->appSecret = config('services.jc_logistics.app_secret');        $this->baseUrl = $sandbox 
            ? 'https://sandbox-api.jc56.com'
            : 'https://api.jc56.com';            
        $this->client = new Client([            'base_uri' => $this->baseUrl,            'timeout' => $this->timeout,            'headers' => [                'User-Agent' => 'JCLogistics-Laravel-Client/1.0',                'Accept' => 'application/json',
            ]
        ]);
    }    
    /**
     * 搜索订单
     */
    public function search(array $params = []): array
    {        try {            // 基础参数
            $requestParams = [                'app_key' => $this->appKey,                'timestamp' => date('Y-m-d H:i:s'),                'sign_method' => 'md5',                'format' => 'json',                'version' => '1.0',
            ];            
            // 合并业务参数
            $requestParams = array_merge($requestParams, $params);            
            // 生成签名
            $requestParams['sign'] = $this->generateSign($requestParams);            
            // 发送请求
            $response = $this->client->post('/v1/item/search', [                'json' => $requestParams,                'http_errors' => false
            ]);            
            $statusCode = $response->getStatusCode();            $body = $response->getBody()->getContents();            $result = json_decode($body, true);            
            if ($statusCode !== 200) {                Log::error('锦程物流搜索接口请求失败', [                    'status' => $statusCode,                    'response' => $result
                ]);                
                return [                    'success' => false,                    'code' => 'HTTP_ERROR',                    'message' => '接口请求失败',                    'data' => null
                ];
            }            
            return $result;
            
        } catch (RequestException $e) {            Log::error('锦程物流搜索接口异常', [                'error' => $e->getMessage()
            ]);            
            return [                'success' => false,                'code' => 'REQUEST_EXCEPTION',                'message' => $e->getMessage(),                'data' => null
            ];
        }
    }    
    /**
     * 带缓存搜索
     */
    public function searchWithCache(array $params, int $ttl = 300): array
    {        $cacheKey = 'jc_search:' . md5(serialize($params));        
        return Cache::remember($cacheKey, $ttl, function () use ($params) {            return $this->search($params);
        });
    }    
    /**
     * 生成签名
     */
    private function generateSign(array $params): string
    {        // 移除sign参数
        unset($params['sign']);        
        // 过滤空值并按键名排序
        $params = array_filter($params, function ($value) {            return $value !== null && $value !== '';
        });        ksort($params);        
        // 拼接字符串
        $signStr = '';        foreach ($params as $key => $value) {            if (is_array($value) || is_object($value)) {                $value = json_encode($value, JSON_UNESCAPED_UNICODE);
            }            $signStr .= $key . $value;
        }        
        // 添加密钥并计算MD5
        $signStr .= $this->appSecret;        return strtoupper(md5($signStr));
    }    
    /**
     * 批量搜索所有订单
     */
    public function searchAll(array $params = []): array
    {        $allOrders = [];        $pageNo = 1;        
        do {            $params['page_no'] = $pageNo;            $params['page_size'] = 100;            
            $result = $this->search($params);            
            if (!$result['success']) {                break;
            }            
            $orders = $result['data']['list'] ?? [];            $pagination = $result['data']['pagination'] ?? [];            
            $allOrders = array_merge($allOrders, $orders);            
            $hasNext = $pagination['has_next'] ?? false;            $totalPages = $pagination['total_pages'] ?? 0;            
            if (!$hasNext || $pageNo >= $totalPages) {                break;
            }            
            $pageNo++;            usleep(500000); // 延迟0.5秒
            
        } while (true);        
        return $allOrders;
    }
}

五、返回结果处理

5.1 成功响应格式

{
  "success": true,
  "code": "10000",
  "message": "成功",
  "data": {
    "list": [
      {
        "order_no": "JC202602010001",
        "waybill_no": "SF123456789",
        "status": "DELIVERED",
        "status_desc": "已签收",
        "shipper_name": "张三",
        "shipper_phone": "13800138000",
        "consignee_name": "李四",
        "consignee_phone": "13900139000",
        "create_time": "2026-02-01 09:00:00",
        "update_time": "2026-02-01 16:30:00",
        "product_name": "锦程快递",
        "weight": 2.5,
        "volume": 0.02,
        "amount": 25.50
      },
      {
        "order_no": "JC202602010002",
        "waybill_no": "SF123456790",
        "status": "IN_TRANSIT",
        "status_desc": "运输中",
        "shipper_name": "王五",
        "shipper_phone": "13800138001",
        "consignee_name": "赵六",
        "consignee_phone": "13900139001",
        "create_time": "2026-02-01 10:00:00",
        "update_time": "2026-02-01 14:20:00",
        "product_name": "锦程快运",
        "weight": 15.8,
        "volume": 0.15,
        "amount": 120.00
      }
    ],
    "pagination": {
      "page_no": 1,
      "page_size": 20,
      "total_count": 125,
      "total_pages": 7,
      "has_next": true,
      "has_previous": false
    },
    "summary": {
      "total_amount": 2845.50,
      "total_weight": 158.3,
      "order_count": 125,
      "status_distribution": {
        "CREATED": 12,
        "PICKED_UP": 28,
        "IN_TRANSIT": 45,
        "DELIVERED": 40
      }
    }
  }}

5.2 响应字段说明

订单信息字段

字段名
类型
说明
order_no
string
订单号(唯一标识)
waybill_no
string
运单号
status
string
状态编码
status_desc
string
状态描述
shipper_*
string
发货人信息
consignee_*
string
收货人信息
create_time
string
创建时间
update_time
string
更新时间
product_name
string
产品名称
weight
float
重量(kg)
volume
float
体积(m³)
amount
float
金额(元)

分页信息字段

字段名
类型
说明
page_no
int
当前页码
page_size
int
每页数量
total_count
int
总记录数
total_pages
int
总页数
has_next
bool
是否有下一页
has_previous
bool
是否有上一页

5.3 错误响应处理

class JCLogisticsError(Exception):    """锦程物流API异常基类"""
    passclass AuthenticationError(JCLogisticsError):    """认证错误"""
    passclass ValidationError(JCLogisticsError):    """参数验证错误"""
    passclass RateLimitError(JCLogisticsError):    """频率限制错误"""
    passclass APIError(JCLogisticsError):    """API错误"""
    def __init__(self, code, message, response=None):        self.code = code        self.message = message        self.response = response        super().__init__(f"[{code}] {message}")def handle_api_response(response: Dict[str, Any]) -> Dict[str, Any]:    """
    统一处理API响应
    
    Args:
        response: API响应数据
    
    Returns:
        处理后的数据
    
    Raises:
        各种业务异常
    """
    if not response.get("success"):
        code = response.get("code")
        message = response.get("message", "未知错误")        
        # 根据错误码抛出特定异常
        if code in ["20001", "20002", "20003"]:            raise AuthenticationError(message)        elif code in ["20004", "20005"]:            raise ValidationError(message)        elif code == "20006":            raise RateLimitError(message)        else:            raise APIError(code, message, response)    
    return response.get("data", {})# 使用示例try:
    result = client.search_orders(keyword="测试")
    data = handle_api_response(result)
    orders = data.get("list", [])    
except AuthenticationError as e:    print(f"认证失败: {e}")    # 重新获取token或提醒用户except RateLimitError as e:    print(f"请求频率超限: {e}")    # 等待后重试
    time.sleep(60)except APIError as e:    print(f"API错误: {e}")    # 记录日志并通知管理员
    logger.error(f"锦程物流API错误: {e.code} - {e.message}")except Exception as e:    print(f"未知错误: {e}")

六、高级功能实现

6.1 智能搜索建议

class IntelligentSearchService:    """智能搜索服务"""
    
    def __init__(self, api_client):        self.client = api_client        self.search_history = []  # 搜索历史缓存
    
    def smart_search(self, query: str) -> Dict[str, Any]:        """
        智能搜索:自动识别搜索类型
        
        Args:
            query: 搜索查询字符串
        
        Returns:
            搜索结果
        """
        # 识别查询类型
        search_type = self._detect_search_type(query)        
        # 构建搜索参数
        params = self._build_search_params(query, search_type)        
        # 执行搜索
        result = self.client.search_orders(**params)        
        # 记录搜索历史
        self._record_search_history(query, search_type, result)        
        return result    
    def _detect_search_type(self, query: str) -> str:        """
        自动识别搜索类型
        """
        # 检查是否为订单号(特定格式)
        if re.match(r'^JC\d{12}$', query):            return 'order_no'
        
        # 检查是否为运单号
        if re.match(r'^[A-Z]{2}\d{9}$', query):            return 'waybill_no'
        
        # 检查是否为手机号
        if re.match(r'^1[3-9]\d{9}$', query):            return 'phone'
        
        # 检查是否为日期
        if re.match(r'^\d{4}-\d{2}-\d{2}$', query):            return 'date'
        
        # 默认为关键词搜索
        return 'keyword'
    
    def _build_search_params(self, query: str, search_type: str) -> Dict[str, Any]:        """
        根据搜索类型构建参数
        """
        params = {}        
        if search_type == 'order_no':
            params['order_no'] = query        elif search_type == 'waybill_no':
            params['waybill_no'] = query        elif search_type == 'phone':            # 尝试同时搜索发货人和收货人电话
            params['shipper_phone'] = query
            params['consignee_phone'] = query        elif search_type == 'date':
            params['start_time'] = f"{query} 00:00:00"
            params['end_time'] = f"{query} 23:59:59"
        else:
            params['keyword'] = query        
        return params

6.2 实时搜索提示

// 前端实现示例(Vue.js)<template>  <div class="logistics-search">
    <div class="search-box">
      <input
        v-model="searchQuery"
        @input="onSearchInput"
        @keyup.enter="doSearch"
        placeholder="搜索订单号、运单号、手机号..."
      />
      <button @click="doSearch">搜索</button>
    </div>
    
    <!-- 搜索建议 -->
    <div v-if="showSuggestions" class="suggestions">
      <div
        v-for="suggestion in suggestions"
        :key="suggestion.value"
        @click="selectSuggestion(suggestion)"
        class="suggestion-item"
      >
        <div class="suggestion-type">{{ suggestion.type }}</div>
        <div class="suggestion-value">{{ suggestion.value }}</div>
      </div>
    </div>
    
    <!-- 搜索结果 -->
    <div v-if="searchResults" class="results">
      <div class="result-summary">
        找到 {{ searchResults.pagination.total_count }} 条记录      </div>
      <table class="result-table">
        <!-- 表格内容 -->
      </table>
      <div class="pagination">
        <!-- 分页组件 -->
      </div>
    </div>
  </div></template><script>export default {  data() {    return {      searchQuery: '',      suggestions: [],      showSuggestions: false,      searchResults: null,      searchHistory: JSON.parse(localStorage.getItem('jc_search_history')) || []
    }
  },  methods: {    async onSearchInput() {      if (this.searchQuery.length < 2) {        this.suggestions = [];        this.showSuggestions = false;        return;
      }      
      // 防抖处理
      clearTimeout(this.suggestTimer);      this.suggestTimer = setTimeout(async () => {        try {          const response = await this.$http.post('/api/jc-logistics/suggest', {            query: this.searchQuery
          });          this.suggestions = response.data;          this.showSuggestions = this.suggestions.length > 0;
        } catch (error) {          console.error('获取搜索建议失败:', error);
        }
      }, 300);
    },    
    async doSearch() {      try {        const response = await this.$http.post('/api/jc-logistics/search', {          query: this.searchQuery,          page: 1,          size: 20
        });        
        this.searchResults = response.data;        this.showSuggestions = false;        
        // 保存搜索历史
        this.saveSearchHistory(this.searchQuery);
        
      } catch (error) {        this.$message.error('搜索失败: ' + error.message);
      }
    },    
    saveSearchHistory(query) {      // 去重并限制历史记录数量
      const index = this.searchHistory.findIndex(item => item === query);      if (index !== -1) {        this.searchHistory.splice(index, 1);
      }      
      this.searchHistory.unshift(query);      this.searchHistory = this.searchHistory.slice(0, 10);      
      localStorage.setItem('jc_search_history', 
        JSON.stringify(this.searchHistory));
    }
  }
}</script>

6.3 搜索性能优化

import redisfrom functools import lru_cachefrom datetime import datetime, timedeltaclass OptimizedSearchService:    """优化搜索服务"""
    
    def __init__(self, api_client, redis_client):        self.api_client = api_client        self.redis = redis_client        self.cache_prefix = "jc_search:"
        @lru_cache(maxsize=1000)
    def search_with_cache(self, cache_key: str, **params):        """
        使用内存缓存和Redis缓存的搜索
        """
        # 先尝试从Redis获取
        cached = self.redis.get(cache_key)        if cached:            return json.loads(cached)        
        # 调用API
        result = self.api_client.search_orders(**params)        
        # 缓存到Redis(根据数据量设置不同过期时间)
        if result.get("success"):
            data = result.get("data", {})
            total_count = data.get("pagination", {}).get("total_count", 0)            
            # 根据数据量设置缓存时间
            if total_count == 0:
                ttl = 300  # 5分钟
            elif total_count <= 100:
                ttl = 1800  # 30分钟
            else:
                ttl = 3600  # 1小时
            
            self.redis.setex(
                cache_key,
                ttl,
                json.dumps(result)
            )        
        return result    
    def smart_search_optimized(self, **params) -> Dict[str, Any]:        """
        智能优化搜索
        """
        # 生成缓存键
        cache_key = self._generate_cache_key(params)        
        # 检查是否可以使用缓存
        if self._can_use_cache(params):            return self.search_with_cache(cache_key, **params)        
        # 实时搜索
        return self.api_client.search_orders(**params)    
    def _generate_cache_key(self, params: Dict[str, Any]) -> str:        """生成缓存键"""
        # 移除分页参数,因为它们不影响数据本身
        cache_params = params.copy()
        cache_params.pop('page_no', None)
        cache_params.pop('page_size', None)        
        # 生成唯一键
        param_str = json.dumps(cache_params, sort_keys=True)        return f"{self.cache_prefix}{hashlib.md5(param_str.encode()).hexdigest()}"
    
    def _can_use_cache(self, params: Dict[str, Any]) -> bool:        """判断是否可以使用缓存"""
        # 实时性要求高的查询不使用缓存
        if params.get('real_time', False):            return False
        
        # 特定状态不使用缓存
        real_time_statuses = ['PICKED_UP', 'IN_TRANSIT', 'DELIVERING']        if params.get('status') in real_time_statuses:            return False
        
        return True
    
    def batch_search(self, search_queries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:        """
        批量搜索优化
        """
        from concurrent.futures import ThreadPoolExecutor, as_completed
        
        results = []        
        with ThreadPoolExecutor(max_workers=5) as executor:            # 提交所有搜索任务
            future_to_query = {
                executor.submit(self.smart_search_optimized, **query): query 
                for query in search_queries
            }            
            # 收集结果
            for future in as_completed(future_to_query):                try:
                    result = future.result(timeout=10)
                    results.append(result)                except Exception as e:
                    query = future_to_query[future]                    print(f"查询失败 {query}: {e}")
                    results.append({                        "success": False,                        "error": str(e)
                    })        
        return results

七、实战应用场景

7.1 物流管理系统集成

class LogisticsManagementSystem:    """物流管理系统集成示例"""
    
    def __init__(self, jc_client):        self.jc_client = jc_client        self.order_cache = {}    
    def search_orders_by_criteria(self, criteria: Dict[str, Any]) -> List[Dict[str, Any]]:        """
        根据综合条件搜索订单
        
        Args:
            criteria: 搜索条件字典,包含:
                - date_range: 日期范围
                - status: 状态
                - customer_info: 客户信息
                - product_type: 产品类型
                - amount_range: 金额范围
        
        Returns:
            订单列表
        """
        # 构建API搜索参数
        search_params = {}        
        # 处理日期范围
        if 'date_range' in criteria:
            start_date, end_date = criteria['date_range']
            search_params['start_time'] = f"{start_date} 00:00:00"
            search_params['end_time'] = f"{end_date} 23:59:59"
        
        # 处理状态
        if 'status' in criteria:
            search_params['status'] = criteria['status']        
        # 处理客户信息
        if 'customer_info' in criteria:
            customer = criteria['customer_info']            # 尝试多种搜索方式
            if 'phone' in customer:
                search_params['shipper_phone'] = customer['phone']
                search_params['consignee_phone'] = customer['phone']            elif 'name' in customer:
                search_params['keyword'] = customer['name']        
        # 执行搜索
        result = self.jc_client.search_orders(**search_params)        
        if not result.get('success'):            raise Exception(f"搜索失败: {result.get('message')}")
        
        orders = result.get('data', {}).get('list', [])        
        # 应用额外的过滤条件
        filtered_orders = self._apply_filters(orders, criteria)        
        return filtered_orders    
    def get_dashboard_stats(self, start_date: str, end_date: str) -> Dict[str, Any]:        """
        获取仪表板统计信息
        """
        # 搜索所有订单
        all_orders = self.jc_client.search_all_orders(
            start_time=f"{start_date} 00:00:00",
            end_time=f"{end_date} 23:59:59"
        )        
        # 计算统计信息
        stats = {            'total_orders': len(all_orders),            'total_amount': sum(order.get('amount', 0) for order in all_orders),            'total_weight': sum(order.get('weight', 0) for order in all_orders),            'status_distribution': {},            'daily_trend': self._calculate_daily_trend(all_orders)
        }        
        # 统计状态分布
        for order in all_orders:
            status = order.get('status', 'UNKNOWN')
            stats['status_distribution'][status] = \
                stats['status_distribution'].get(status, 0) + 1
        
        return stats    
    def export_orders_to_excel(self, search_params: Dict[str, Any], 
                              filename: str) -> str:        """
        导出订单到Excel
        """
        import pandas as pd        
        # 获取所有订单
        all_orders = self.jc_client.search_all_orders(**search_params)        
        # 转换为DataFrame
        df = pd.DataFrame(all_orders)        
        # 数据清洗和转换
        if not df.empty:            # 选择需要的列
            columns = ['order_no', 'waybill_no', 'status_desc', 
                      'shipper_name', 'shipper_phone',                      'consignee_name', 'consignee_phone',                      'create_time', 'amount', 'weight']
            
            df = df[columns]            
            # 重命名列
            column_mapping = {                'order_no': '订单号',                'waybill_no': '运单号',                'status_desc': '状态',                'shipper_name': '发货人',                'shipper_phone': '发货人电话',                'consignee_name': '收货人',                'consignee_phone': '收货人电话',                'create_time': '创建时间',                'amount': '金额',                'weight': '重量'
            }
            df = df.rename(columns=column_mapping)        
        # 导出到Excel
        df.to_excel(filename, index=False)        
        return filename

7.2 订单状态监控系统

class OrderMonitor:    """订单状态监控系统"""
    
    def __init__(self, jc_client, alert_service):        self.jc_client = jc_client        self.alert_service = alert_service        self.monitored_statuses = {            'DELAYED': 24 * 3600,  # 24小时未更新
            'EXCEPTION': 0,        # 异常状态立即通知
        }    
    def monitor_abnormal_orders(self):        """
        监控异常订单
        """
        # 搜索特定状态的订单
        abnormal_orders = []        
        for status in ['EXCEPTION', 'PROBLEM']:
            result = self.jc_client.search_orders(
                status=status,
                page_size=100
            )            
            if result.get('success'):
                orders = result.get('data', {}).get('list', [])
                abnormal_orders.extend(orders)        
        # 处理异常订单
        for order in abnormal_orders:            self._handle_abnormal_order(order)    
    def monitor_delayed_orders(self):        """
        监控延迟订单
        """
        # 搜索24小时未更新的运输中订单
        import datetime
        
        cutoff_time = (datetime.datetime.now() - 
                      datetime.timedelta(hours=24)).strftime("%Y-%m-%d %H:%M:%S")
        
        result = self.jc_client.search_orders(
            status='IN_TRANSIT',
            end_time=cutoff_time,
            sort_field='update_time',
            sort_order='asc'
        )        
        if result.get('success'):
            delayed_orders = result.get('data', {}).get('list', [])            
            for order in delayed_orders:                self._handle_delayed_order(order)    
    def _handle_abnormal_order(self, order: Dict[str, Any]):        """处理异常订单"""
        order_no = order.get('order_no')
        status_desc = order.get('status_desc', '未知状态')        
        # 发送告警
        alert_message = f"""
        订单异常告警!
        订单号:{order_no}
        状态:{status_desc}
        发货人:{order.get('shipper_name')}
        收货人:{order.get('consignee_name')}
        更新时间:{order.get('update_time')}
        """
        
        self.alert_service.send_alert(
            title="物流订单异常",
            message=alert_message,
            level="HIGH",
            recipients=["logistics_manager@company.com"]
        )        
        # 记录到数据库
        self._log_abnormal_order(order)    
    def _handle_delayed_order(self, order: Dict[str, Any]):        """处理延迟订单"""
        order_no = order.get('order_no')
        last_update = order.get('update_time')        
        # 发送通知
        notification = f"""
        订单运输延迟提醒
        订单号:{order_no}
        最后更新:{last_update}
        已超过24小时未更新状态
        请及时跟进处理
        """
        
        self.alert_service.send_alert(
            title="订单运输延迟",
            message=notification,
            level="MEDIUM",
            recipients=["customer_service@company.com"]
        )

八、故障排查与优化

8.1 常见问题解决

问题1:签名验证失败

症状:返回"签名错误"或"签名验证失败"
# 解决方案:调试签名生成过程def debug_signature(params, app_secret):    """调试签名生成"""
    print("=== 签名调试信息 ===")    print(f"原始参数: {params}")    
    # 1. 过滤并排序参数
    filtered = {k: v for k, v in params.items() if v is not None and k != 'sign'}
    sorted_keys = sorted(filtered.keys())    
    print(f"排序后的键: {sorted_keys}")    
    # 2. 拼接字符串
    sign_str = ''
    for key in sorted_keys:
        value = str(filtered[key])
        sign_str += f"{key}{value}"
        print(f"  {key}: {value}")
    
    sign_str += app_secret    print(f"拼接结果: {sign_str}")    
    # 3. 计算签名
    import hashlib
    signature = hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()    print(f"计算签名: {signature}")    
    return signature

问题2:分页数据不准确

症状:分页数据重复或缺失
# 解决方案:使用稳定的排序字段def stable_pagination_search(self, **params):    """
    稳定的分页搜索
    """
    # 确保有排序字段
    if 'sort_field' not in params:
        params['sort_field'] = 'order_no'  # 使用唯一字段排序
    
    if 'sort_order' not in params:
        params['sort_order'] = 'asc'
    
    # 处理游标分页
    last_order_no = None
    all_orders = []    
    while True:        if last_order_no:            # 使用游标进行分页
            params['cursor'] = last_order_no
        
        result = self.search_orders(**params)        
        if not result.get('success'):            break
        
        orders = result.get('data', {}).get('list', [])        if not orders:            break
        
        all_orders.extend(orders)
        last_order_no = orders[-1].get('order_no')        
        # 检查是否还有更多数据
        pagination = result.get('data', {}).get('pagination', {})        if not pagination.get('has_next', False):            break
        
        # 避免频繁请求
        time.sleep(0.1)    
    return all_orders

问题3:API限流处理

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exceptionclass RateLimitedClient:    """带限流处理的客户端"""
    
    def __init__(self, api_client):        self.api_client = api_client        self.rate_limiter = RateLimiter(max_calls=100, period=60)  # 60秒100次
    
    def is_rate_limit_error(self, result):        """判断是否为限流错误"""
        return result.get('code') == '20005'  # 频率限制错误码
        @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception(lambda x: isinstance(x, dict) and 
                                x.get('code') == '20005')    )
    def search_with_retry(self, **params):        """带重试的搜索"""
        with self.rate_limiter:
            result = self.api_client.search_orders(**params)            
            if self.is_rate_limit_error(result):                raise result  # 触发重试
            
            return result

8.2 性能优化建议

  1. 合理使用缓存

# 多级缓存策略class MultiLevelCache:    def __init__(self):        self.memory_cache = {}  # 内存缓存
        self.redis_cache = redis.Redis()  # Redis缓存
        self.local_ttl = 60  # 内存缓存60秒
        self.redis_ttl = 300  # Redis缓存5分钟
    
    def get(self, key):        # 1. 检查内存缓存
        if key in self.memory_cache:
            item = self.memory_cache[key]            if time.time() < item['expire']:                return item['data']        
        # 2. 检查Redis缓存
        cached = self.redis_cache.get(key)        if cached:
            data = json.loads(cached)            # 更新内存缓存
            self.memory_cache[key] = {                'data': data,                'expire': time.time() + self.local_ttl
            }            return data        
        return None
  1. 连接池管理

import requestsfrom requests.adapters import HTTPAdapterfrom urllib3.util.retry import Retrydef create_session_with_pool():    """创建带连接池的会话"""
    session = requests.Session()    
    # 配置重试策略
    retry_strategy = Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    
    adapter = HTTPAdapter(
        max_retries=retry_strategy,
        pool_connections=10,  # 连接池大小
        pool_maxsize=100,
        pool_block=False
    )
    
    session.mount("https://", adapter)
    session.mount("http://", adapter)    
    return session
  1. 批量请求优化

from concurrent.futures import ThreadPoolExecutor, as_completedimport asyncioasync def batch_search_async(search_queries):    """异步批量搜索"""
    async with aiohttp.ClientSession() as session:
        tasks = []        for query in search_queries:
            task = self._async_search(session, query)
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)        return results

九、监控与告警

9.1 监控指标

class APIMonitor:    """API监控"""
    
    def __init__(self):        self.metrics = {            'total_requests': 0,            'successful_requests': 0,            'failed_requests': 0,            'average_response_time': 0,            'error_codes': {},            'last_error_time': None
        }    
    def record_request(self, success, response_time, error_code=None):        """记录请求指标"""
        self.metrics['total_requests'] += 1
        
        if success:            self.metrics['successful_requests'] += 1
        else:            self.metrics['failed_requests'] += 1
            if error_code:                self.metrics['error_codes'][error_code] = \                    self.metrics['error_codes'].get(error_code, 0) + 1
            self.metrics['last_error_time'] = time.time()        
        # 更新平均响应时间(移动平均)
        alpha = 0.1  # 平滑因子
        old_avg = self.metrics['average_response_time']        self.metrics['average_response_time'] = \
            alpha * response_time + (1 - alpha) * old_avg    
    def get_health_status(self):        """获取健康状态"""
        total = self.metrics['total_requests']        if total == 0:            return "UNKNOWN"
        
        success_rate = self.metrics['successful_requests'] / total        
        if success_rate > 0.95:            return "HEALTHY"
        elif success_rate > 0.8:            return "DEGRADED"
        else:            return "UNHEALTHY"

9.2 告警配置

# alert_rules.yamlalert_rules:
  - name: "api_error_rate_high"
    condition: "error_rate > 0.1"
    duration: "5m"
    severity: "warning"
    message: "API错误率超过10%"
    
  - name: "api_response_time_slow"
    condition: "response_time_p95 > 5000"
    duration: "10m"
    severity: "warning"
    message: "API响应时间P95超过5秒"
    
  - name: "api_unavailable"
    condition: "success_rate == 0"
    duration: "2m"
    severity: "critical"
    message: "API完全不可用"

十、最佳实践总结

10.1 安全实践

  1. 密钥管理:使用环境变量或密钥管理服务,不要硬编码在代码中

  2. 访问控制:为不同环境使用不同的API密钥

  3. 请求验证:验证所有输入参数,防止注入攻击

  4. HTTPS强制:确保所有请求都使用HTTPS

10.2 性能实践

  1. 合理分页:根据实际需求设置合适的page_size

  2. 缓存策略:对不常变的数据使用缓存

  3. 连接复用:使用连接池减少连接建立开销

  4. 异步处理:批量操作使用异步方式提高吞吐量

10.3 代码质量

  1. 错误处理:完善的异常处理和重试机制

  2. 日志记录:详细记录请求和响应信息

  3. 单元测试:编写测试用例覆盖主要功能

  4. 代码审查:定期进行代码审查和安全检查

10.4 运维实践

  1. 监控告警:实时监控API调用情况

  2. 容量规划:根据业务量预估API调用频率

  3. 版本管理:记录API版本变更,制定升级计划

  4. 文档维护:保持接口文档的及时更新


附录:快速开始模板

# quick_start.pyfrom jc_logistics import JCLogisticsSearchAPI# 1. 初始化客户端client = JCLogisticsSearchAPI(
    app_key="your_app_key",
    app_secret="your_app_secret",
    sandbox=True  # 测试环境)# 2. 简单搜索result = client.search_orders(keyword="测试订单")print(f"找到 {len(result['data']['list'])} 条记录")# 3. 条件搜索result = client.search_orders(
    status_list=["DELIVERED"],
    start_time="2026-01-01 00:00:00",
    end_time="2026-02-01 23:59:59",
    page_size=50)# 4. 获取所有数据all_orders = client.search_all_orders(
    keyword="重要客户",
    status_list=["IN_TRANSIT", "DELIVERED"]
)print(f"总共找到 {len(all_orders)} 条订单")
通过本攻略,您应该能够全面掌握锦程物流item_search接口的对接和使用。建议根据实际业务需求选择合适的实现方案,并遵循最佳实践确保系统的稳定性和可维护性。


群贤毕至

访客