1. 申请百度OCR的 api_key 和 secret_key

https://console.bce.baidu.com/ai-engine/old/#/ai/ocr/app/list


目前一个月免费调用1000次,个人使用应该足够


image.png

image.png

2.申请deepseek apikey


https://platform.deepseek.com/api_keys


充个10元,可以用很久很久。


3.程序代码如下:

(1)程序主逻辑
certificate_processor.py

import os
import pytesseract
from PIL import Image
import pandas as pd
import re
import cv2
import numpy as np
import requests
import base64
import json
import time
import logging

# Module-level logger used by CertificateProcessor.default_log; the root
# logger is configured so INFO records are shown when no other logging
# configuration is already in place.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

class CertificateProcessor:
    """OCR certificate images, extract award details and archive them.

    For every image found in ``input_dir`` the processor:

    1. recognises the text (Baidu OCR when credentials validate, local
       Tesseract otherwise),
    2. asks the DeepSeek chat API to turn that text into the fields listed in
       the active configuration,
    3. renames the image after those fields, and
    4. collects one row per image, finally written to an Excel file.

    ``CONFIGS`` holds the prompt, renaming pattern and Excel columns for the
    two supported certificate types (``"student"`` and ``"teacher"``).
    """

    CONFIGS = {
        "student": {
            "prompt": "你是一个证书信息提取专家。请从文本中提取以下信息,并以json格式返回:获奖时间,比赛名称,获奖者姓名,奖项,指导教师,组织机构。不要markdown格式,获奖时间若有则按2010.01这样的格式返回,如果某项信息不存在,返回空字符串",
            "renaming_pattern": "{date}{name}同学在{event}荣获{award}{ext}",
            "excel_fields": ["获奖时间", "比赛名称", "获奖者姓名", "奖项", "指导教师", "组织机构"]
        },
        "teacher": {
            "prompt": "你是一个证书信息提取专家。请从文本中提取以下信息,并以json格式返回:活动时间,活动名称,获奖者姓名,奖项,组织机构。不要markdown格式,若活动名称中有时间则提取活动内容中的时间,没有的话按落款时间并且按2010.01这样的格式返回,特别注意获得好评也算奖项,如果某项信息不存在,返回空字符串",
            "renaming_pattern": "{date}{name}在{event}荣获{award}{ext}",
            "excel_fields": ["活动时间", "活动名称", "获奖者姓名", "奖项", "组织机构"]
        }
    }

    # Upper bound (seconds) for every HTTP call so a stalled connection cannot
    # hang the whole batch.
    REQUEST_TIMEOUT = 60
    BAIDU_TOKEN_URL = "https://aip.baidubce.com/oauth/2.0/token"
    BAIDU_OCR_URL = "https://aip.baidubce.com/rest/2.0/ocr/v1/accurate_basic"
    # Default Windows install location of Tesseract; only applied when present.
    DEFAULT_TESSERACT_CMD = r'C:\Program Files\Tesseract-OCR\tesseract.exe'
    # NOTE(review): the original ``lang`` argument was lost from the source;
    # Simplified Chinese + English is assumed here - confirm.
    TESSERACT_LANG = 'chi_sim+eng'
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp')
    # Characters that are not allowed in file names on Windows (a superset of
    # what POSIX forbids), plus control characters such as newlines.
    _INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\x00-\x1f]')

    def __init__(self, input_dir, cert_type="student", log_callback=None,
                 baidu_api_key=None, baidu_secret_key=None, deepseek_api_key=None):
        """Create a processor for the images in ``input_dir``.

        Args:
            input_dir: Directory that contains the certificate images.
            cert_type: Key into ``CONFIGS`` (``"student"`` or ``"teacher"``).
            log_callback: Callable taking one message string; defaults to
                ``default_log``.
            baidu_api_key: Baidu OCR API key (optional).
            baidu_secret_key: Baidu OCR secret key (optional).
            deepseek_api_key: DeepSeek API key used by ``deepseek_extract``.

        Raises:
            KeyError: If ``cert_type`` is not a key of ``CONFIGS``.
        """
        self.input_dir = input_dir
        self.cert_type = cert_type
        self.config = self.CONFIGS[cert_type]
        self.log_callback = log_callback if log_callback else self.default_log
        self.results = []

        # Baidu OCR is preferred by default; the author notes that local
        # pytesseract recognition gives poor results.
        self.baidu_ocr_enabled = True
        self.deepseek_api_enabled = True
        # Only override pytesseract's command when the Windows default install
        # exists, so a Tesseract found on PATH keeps working elsewhere.
        if os.path.isfile(self.DEFAULT_TESSERACT_CMD):
            pytesseract.pytesseract.tesseract_cmd = self.DEFAULT_TESSERACT_CMD

        # API credentials and endpoints
        self.baidu_api_key = baidu_api_key
        self.baidu_secret_key = baidu_secret_key
        self.deepseek_api_key = deepseek_api_key
        self.deepseek_api_url = "https://api.deepseek.com/chat/completions"
        # Access token obtained during validation, reused for every image.
        self._baidu_access_token = None

        # Validate credentials if provided, otherwise fall back to local OCR
        if self.baidu_ocr_enabled and self.baidu_api_key and self.baidu_secret_key:
            self.validate_baidu_credentials()
        else:
            self.baidu_ocr_enabled = False
            self.log_callback("警告: 百度OCR凭据缺失,将使用本地OCR")

    def default_log(self, message):
        """Log ``message`` at INFO level when no callback was supplied."""
        logger.info(message)

    def _request_baidu_token(self):
        """Request a Baidu OAuth token.

        Returns:
            ``(token, None)`` on success or ``(None, error_message)`` when the
            response reports an error or carries no token.

        Raises:
            Exception: Whatever ``requests`` raises for network or decoding
                failures; callers handle it.
        """
        response = requests.get(
            self.BAIDU_TOKEN_URL,
            # Passing the credentials as params lets requests URL-encode them.
            params={
                "grant_type": "client_credentials",
                "client_id": self.baidu_api_key,
                "client_secret": self.baidu_secret_key,
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        token_data = response.json()
        if "error" in token_data:
            return None, token_data.get("error_description", "未知错误")
        access_token = token_data.get("access_token")
        if not access_token:
            return None, "响应中缺少access_token"
        return access_token, None

    def validate_baidu_credentials(self):
        """Check the Baidu credentials by requesting an access token.

        On success the token is cached for later OCR calls; on any failure the
        reason is logged and ``baidu_ocr_enabled`` is set to ``False``.
        """
        try:
            access_token, error_msg = self._request_baidu_token()
        except Exception as e:
            self.log_callback(f"百度OCR认证请求失败: {str(e)}")
            self.baidu_ocr_enabled = False
            return

        if access_token is None:
            self.log_callback(f"百度OCR认证失败: {error_msg}")
            self.baidu_ocr_enabled = False
            return

        self._baidu_access_token = access_token

    def preprocess_image(self, image):
        """Binarise and sharpen a certificate image for local OCR.

        Args:
            image: A PIL image in any mode.

        Returns:
            A new single-channel PIL image after histogram equalisation,
            adaptive thresholding, median denoising and sharpening.
        """
        # Normalise to 3-channel RGB first: COLOR_RGB2GRAY rejects RGBA,
        # palette and grayscale inputs.
        img = np.array(image.convert("RGB"))
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        equalized = cv2.equalizeHist(gray)
        thresh = cv2.adaptiveThreshold(
            equalized, 255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY, 11, 2
        )
        denoised = cv2.medianBlur(thresh, 3)
        kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
        sharpened = cv2.filter2D(denoised, -1, kernel)
        return Image.fromarray(sharpened)

    def baidu_ocr(self, image_path):
        """Recognise text in ``image_path`` with the Baidu OCR service.

        Returns:
            ``(text, "百度OCR")`` on success, ``("", "百度OCR失败")`` on any
            failure (the reason is logged, never raised).
        """
        self.log_callback(f"[百度OCR] 开始处理: {os.path.basename(image_path)}")

        try:
            # Reuse the token obtained during validation; fetch one only if
            # none is cached yet.
            access_token = self._baidu_access_token
            if not access_token:
                access_token, _ = self._request_baidu_token()
                if not access_token:
                    self.log_callback("百度OCR访问令牌获取失败")
                    return "", "百度OCR失败"
                self._baidu_access_token = access_token

            # Read and encode image
            with open(image_path, "rb") as f:
                img_data = base64.b64encode(f.read()).decode('utf-8')

            # Send OCR request: token in the query string, image in the form body
            headers = {"Content-Type": "application/x-www-form-urlencoded"}
            form = {
                "image": img_data,
                "language_type": "CHN_ENG",
                "detect_direction": "true"
            }
            response = requests.post(
                self.BAIDU_OCR_URL,
                params={"access_token": access_token},
                headers=headers,
                data=form,
                timeout=self.REQUEST_TIMEOUT,
            )
            result = response.json()

            if "error_code" in result:
                error_msg = result.get("error_msg", "未知错误")
                self.log_callback(f"百度OCR识别错误: {error_msg}")
                return "", "百度OCR失败"

            if "words_result" not in result:
                self.log_callback("百度OCR识别失败: 无结果")
                return "", "百度OCR失败"

            text = "\n".join(item["words"] for item in result["words_result"])
            self.log_callback(f"[百度OCR] 识别成功, 字符数: {len(text)}")
            return text, "百度OCR"
        except Exception as e:
            self.log_callback(f"百度OCR请求失败: {str(e)}")
            return "", "百度OCR失败"

    def local_ocr(self, image_path):
        """Recognise text in ``image_path`` with local Tesseract.

        Returns:
            ``(text, "本地OCR")`` on success, ``("", "本地OCR失败")`` on any
            failure (the reason is logged, never raised).
        """
        try:
            # The context manager closes the file handle, which also keeps
            # the image renamable afterwards on Windows.
            with Image.open(image_path) as orig_img:
                processed_img = self.preprocess_image(orig_img)
            text = pytesseract.image_to_string(
                processed_img,
                lang=self.TESSERACT_LANG,
                config='--psm 6 --oem 1'
            ).strip()
            self.log_callback(f"[本地OCR] 识别完成, 字符数: {len(text)}")
            return text, "本地OCR"
        except Exception as e:
            self.log_callback(f"本地OCR处理失败: {str(e)}")
            return "", "本地OCR失败"

    def hybrid_ocr(self, image_path):
        """Try Baidu OCR first and fall back to local OCR.

        Returns:
            ``(text, source_label)`` from whichever engine produced the result.
        """
        if self.baidu_ocr_enabled:
            baidu_text, source = self.baidu_ocr(image_path)
            if baidu_text.strip():
                return baidu_text, source
        return self.local_ocr(image_path)

    @staticmethod
    def _parse_model_json(raw_string):
        """Return the JSON object embedded in a model reply.

        The reply may wrap the object in extra text or markdown fences, so the
        span from the first ``{`` to the last ``}`` is decoded.

        Raises:
            ValueError: If no such span exists or it is not valid JSON.
        """
        start_index = raw_string.find('{')
        end_index = raw_string.rfind('}')
        if start_index == -1 or end_index < start_index:
            raise ValueError("模型响应中未找到JSON对象")
        return json.loads(raw_string[start_index:end_index + 1])

    def deepseek_extract(self, text):
        """Extract the configured fields from OCR text via the DeepSeek API.

        Args:
            text: Text recognised on the certificate.

        Returns:
            The dict decoded from the model reply, or a dict mapping every
            configured Excel field to ``''`` when ``text`` is blank or the
            request/decoding fails (failures are logged, never raised).
        """
        empty_result = {field: '' for field in self.config["excel_fields"]}

        # Nothing to extract from: skip the paid API call.
        if not text or not text.strip():
            self.log_callback("OCR文本为空,跳过信息提取")
            return empty_result

        headers = {
            "Authorization": f"Bearer {self.deepseek_api_key}",
            "Content-Type": "application/json"
        }

        messages = [
            {"role": "system", "content": self.config["prompt"]},
            {"role": "user", "content": text}
        ]

        payload = {
            "model": "deepseek-chat",
            "messages": messages,
            "temperature": 0.1,
            "max_tokens": 200
        }

        try:
            response = requests.post(
                self.deepseek_api_url,
                headers=headers,
                json=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            result = response.json()
            raw_string = result['choices'][0]['message']['content']
            return self._parse_model_json(raw_string)
        except Exception as e:
            self.log_callback(f"DeepSeek API错误: {str(e)}")
            return empty_result

    @classmethod
    def _sanitize_component(cls, value):
        """Turn one extracted value into text that is safe inside a file name.

        ``None`` becomes ``''``; other values are converted with ``str``,
        characters that are invalid in file names are replaced by ``_`` and
        surrounding whitespace is removed.
        """
        if value is None:
            return ''
        return cls._INVALID_FILENAME_CHARS.sub('_', str(value)).strip()

    @staticmethod
    def _unique_name(directory, stem, ext):
        """Return ``stem + ext``, or ``stem_N + ext`` for the first free N.

        A numeric suffix is added only when the plain name already exists in
        ``directory``.
        """
        candidate = f"{stem}{ext}"
        counter = 1
        while os.path.exists(os.path.join(directory, candidate)):
            candidate = f"{stem}_{counter}{ext}"
            counter += 1
        return candidate

    def _rename_certificate(self, image_path, filename, parsed_info):
        """Rename one image after its extracted fields.

        Returns:
            The file name the image has afterwards: the new name on success,
            or ``filename`` when a required field is missing, the file already
            has its target name, or the rename fails.
        """
        is_student = self.cert_type == 'student'
        parts = {
            'date': self._sanitize_component(
                parsed_info.get('获奖时间' if is_student else '活动时间', '')),
            'name': self._sanitize_component(parsed_info.get('获奖者姓名', '')),
            'event': self._sanitize_component(
                parsed_info.get('比赛名称' if is_student else '活动名称', '')),
            'award': self._sanitize_component(parsed_info.get('奖项', '')),
        }
        ext = os.path.splitext(filename)[1]

        if not (all(parts.values()) and ext):
            self.log_callback("关键信息缺失,未重命名文件")
            return filename

        # Format without the extension so a conflict suffix can go before it.
        stem = self.config["renaming_pattern"].format(ext='', **parts)
        if stem + ext == filename:
            # Already processed on an earlier run; renaming onto itself would
            # only produce a needless "_1" copy of the name.
            self.log_callback("文件名已符合命名规则,无需重命名")
            return filename

        new_name = self._unique_name(self.input_dir, stem, ext)
        try:
            os.rename(image_path, os.path.join(self.input_dir, new_name))
        except OSError as e:
            # One bad name must not abort the rest of the batch.
            self.log_callback(f"重命名失败: {str(e)}")
            return filename

        self.log_callback(f"文件已重命名为: {new_name}")
        return new_name

    def process_certificates(self, output_excel):
        """Process every image in the input directory and write the results.

        Args:
            output_excel: Path of the Excel file to create.

        Returns:
            The ``pandas.DataFrame`` written to ``output_excel``; it contains
            one row per processed image (rows accumulate in ``self.results``).

        Raises:
            OSError: If the input directory cannot be listed.
            Exception: Whatever ``DataFrame.to_excel`` raises when the file
                cannot be written.
        """
        # Sorted for a deterministic processing order.
        image_files = sorted(
            f for f in os.listdir(self.input_dir)
            if f.lower().endswith(self.IMAGE_EXTENSIONS)
        )

        total_files = len(image_files)
        self.log_callback(f"开始处理 {total_files} 个证书文件...")

        for i, filename in enumerate(image_files):
            image_path = os.path.join(self.input_dir, filename)
            self.log_callback(f"\n=== 正在处理: {filename} ({i+1}/{total_files}) ===")

            start_time = time.time()
            text_content, ocr_source = self.hybrid_ocr(image_path)
            ocr_time = time.time() - start_time

            # Extract structured info
            parsed_info = self.deepseek_extract(text_content)

            # '图片路径' records where the image was found (before renaming).
            result = {
                '图片路径': image_path,
                '图片名称': filename,
                'OCR来源': ocr_source,
                'OCR耗时(秒)': f"{ocr_time:.2f}"
            }
            for field in self.config["excel_fields"]:
                result[field] = parsed_info.get(field, '')

            result['新文件名'] = self._rename_certificate(
                image_path, filename, parsed_info)

            self.results.append(result)
            self.log_callback(f"=== 处理完成 ({ocr_source}), 耗时: {ocr_time:.2f}秒 ===")

        # Save to Excel
        df = pd.DataFrame(self.results)
        df.to_excel(output_excel, index=False)
        self.log_callback(f"\n处理完成! 结果已保存到 {output_excel}")
        return df

(2)gui界面

certificate_gui.py

import tkinter as tk
from tkinter import ttk, filedialog, scrolledtext
import threading
import os
from certificate_processor import CertificateProcessor
import queue

class CertificateApp:
    """Tkinter front end for ``CertificateProcessor``.

    The window collects the image directory, API keys and certificate type,
    runs the processing in a background thread and shows its log messages and
    progress. The worker never touches widgets: it only puts messages on
    ``message_queue``, which the Tk thread drains every 100 ms.
    """

    # Queue message that asks the Tk thread to re-enable the input controls.
    ENABLE_CONTROLS = "UI:ENABLE_CONTROLS"

    class _StopRequested(BaseException):
        """Raised inside the worker to abort a run after Stop was pressed.

        Derives from ``BaseException`` so that ``except Exception`` handlers
        in the code being run do not swallow it.
        """

    def __init__(self):
        """Build the main window and start polling the message queue."""
        self.window = tk.Tk()
        self.window.title("证书处理系统")
        self.window.geometry("800x600")
        self.window.resizable(True, True)

        # Run state; created up front so every method can rely on it.
        self.message_queue = queue.Queue()
        self.processing = False
        self.processor = None
        self.thread = None
        self._job = None

        # Configure styles
        self.style = ttk.Style()
        self.style.configure("TFrame", padding=5)
        self.style.configure("TButton", padding=5)
        self.style.configure("TLabel", padding=5)

        # Create frames
        self.create_file_selection_frame()
        self.create_api_settings_frame()
        self.create_type_selection_frame()
        self.create_control_frame()
        self.create_log_frame()
        self.create_progress_frame()

        # Start periodic queue check
        self.window.after(100, self.process_queue)

    def create_file_selection_frame(self):
        """Create the directory entry and its browse button."""
        self.file_frame = ttk.LabelFrame(self.window, text="图片目录选择")
        self.file_frame.pack(fill="x", padx=10, pady=5)

        # Directory entry
        self.dir_label = ttk.Label(self.file_frame, text="图片目录:")
        self.dir_label.grid(row=0, column=0, padx=5, pady=5, sticky="w")

        self.dir_entry = ttk.Entry(self.file_frame, width=60)
        self.dir_entry.grid(row=0, column=1, padx=5, pady=5, sticky="we")

        # Browse button
        self.browse_btn = ttk.Button(
            self.file_frame,
            text="浏览...",
            command=self.browse_directory
        )
        self.browse_btn.grid(row=0, column=2, padx=5, pady=5)

        # Let the entry column absorb extra width
        self.file_frame.columnconfigure(1, weight=1)

    def create_api_settings_frame(self):
        """Create the masked entries for the three API keys."""
        self.api_frame = ttk.LabelFrame(self.window, text="API密钥设置")
        self.api_frame.pack(fill="x", padx=10, pady=5)

        # Baidu API Key
        self.baidu_key_label = ttk.Label(self.api_frame, text="百度API Key:")
        self.baidu_key_label.grid(row=0, column=0, padx=5, pady=5, sticky="w")

        self.baidu_key_entry = ttk.Entry(self.api_frame, width=50, show="*")
        self.baidu_key_entry.grid(row=0, column=1, padx=5, pady=5, sticky="we")

        # Baidu Secret Key
        self.baidu_secret_label = ttk.Label(self.api_frame, text="百度Secret Key:")
        self.baidu_secret_label.grid(row=1, column=0, padx=5, pady=5, sticky="w")

        self.baidu_secret_entry = ttk.Entry(self.api_frame, width=50, show="*")
        self.baidu_secret_entry.grid(row=1, column=1, padx=5, pady=5, sticky="we")

        # DeepSeek API Key
        self.deepseek_label = ttk.Label(self.api_frame, text="DeepSeek API Key:")
        self.deepseek_label.grid(row=2, column=0, padx=5, pady=5, sticky="w")

        self.deepseek_entry = ttk.Entry(self.api_frame, width=50, show="*")
        self.deepseek_entry.grid(row=2, column=1, padx=5, pady=5, sticky="we")

        # Let the entry column absorb extra width
        self.api_frame.columnconfigure(1, weight=1)

    def create_type_selection_frame(self):
        """Create the student/teacher radio buttons (default: student)."""
        self.type_frame = ttk.LabelFrame(self.window, text="证书类型")
        self.type_frame.pack(fill="x", padx=10, pady=5)

        self.type_var = tk.StringVar(value="student")

        self.student_rb = ttk.Radiobutton(
            self.type_frame,
            text="学生证书",
            variable=self.type_var,
            value="student"
        )
        self.student_rb.pack(side="left", padx=10, pady=5)

        self.teacher_rb = ttk.Radiobutton(
            self.type_frame,
            text="教师证书",
            variable=self.type_var,
            value="teacher"
        )
        self.teacher_rb.pack(side="left", padx=10, pady=5)

    def create_control_frame(self):
        """Create the start, stop and open-folder buttons."""
        self.ctrl_frame = ttk.Frame(self.window)
        self.ctrl_frame.pack(fill="x", padx=10, pady=5)

        self.start_btn = ttk.Button(
            self.ctrl_frame,
            text="开始处理",
            command=self.start_processing
        )
        self.start_btn.pack(side="left", padx=5, pady=5)

        self.stop_btn = ttk.Button(
            self.ctrl_frame,
            text="停止",
            state="disabled",
            command=self.stop_processing
        )
        self.stop_btn.pack(side="left", padx=5, pady=5)

        self.open_folder_btn = ttk.Button(
            self.ctrl_frame,
            text="打开结果文件夹",
            command=self.open_output_folder
        )
        self.open_folder_btn.pack(side="right", padx=5, pady=5)

    def create_log_frame(self):
        """Create the scrolling log area and its colour tags."""
        self.log_frame = ttk.LabelFrame(self.window, text="处理日志")
        self.log_frame.pack(fill="both", expand=True, padx=10, pady=5)

        # Text widget with scrollbar
        self.log_text = scrolledtext.ScrolledText(
            self.log_frame,
            wrap="word",
            state="normal"
        )
        self.log_text.pack(fill="both", expand=True, padx=5, pady=5)

        # One tag per log level
        self.log_text.tag_config("info", foreground="black")
        self.log_text.tag_config("success", foreground="green")
        self.log_text.tag_config("warning", foreground="orange")
        self.log_text.tag_config("error", foreground="red")

    def create_progress_frame(self):
        """Create the progress bar and the status label."""
        self.progress_frame = ttk.Frame(self.window)
        self.progress_frame.pack(fill="x", padx=10, pady=5)

        self.progress = ttk.Progressbar(
            self.progress_frame,
            orient="horizontal",
            mode="determinate",
            length=400
        )
        self.progress.pack(fill="x", padx=5, pady=5)

        self.status_label = ttk.Label(
            self.progress_frame,
            text="就绪"
        )
        self.status_label.pack(pady=5)

    def browse_directory(self):
        """Ask for a directory and put the choice into the directory entry."""
        directory = filedialog.askdirectory()
        if directory:
            self.dir_entry.delete(0, tk.END)
            self.dir_entry.insert(0, directory)

    def _set_controls_state(self, state):
        """Set every input control to ``state``; the stop button gets the opposite."""
        for widget in (
            self.start_btn, self.browse_btn,
            self.student_rb, self.teacher_rb,
            self.baidu_key_entry, self.baidu_secret_entry, self.deepseek_entry,
        ):
            widget.config(state=state)
        self.stop_btn.config(state="disabled" if state == "normal" else "normal")

    def start_processing(self):
        """Validate the inputs and start processing in a background thread."""
        directory = self.dir_entry.get().strip()
        if not directory or not os.path.isdir(directory):
            self.log("错误: 请选择有效的图片目录", "error")
            return

        # A stopped run may still be finishing its current step; never run two
        # workers over the same files.
        if self.thread is not None and self.thread.is_alive():
            self.log("上一次处理尚未结束,请稍候再试", "warning")
            return

        # Read everything from the widgets here, on the Tk thread.
        baidu_api_key = self.baidu_key_entry.get().strip()
        baidu_secret_key = self.baidu_secret_entry.get().strip()
        deepseek_api_key = self.deepseek_entry.get().strip()
        cert_type = self.type_var.get()

        if not deepseek_api_key:
            self.log("错误: DeepSeek API凭据缺失,处理将失败", "error")
            return

        # Clear the log. log() leaves the widget disabled, and a disabled Text
        # widget ignores delete(), so it must be re-enabled first.
        self.log_text.config(state="normal")
        self.log_text.delete("1.0", tk.END)
        self.log(f"开始处理{cert_type}证书...", "info")

        if baidu_api_key and baidu_secret_key:
            self.log("百度OCR凭据已提供", "info")
        else:
            self.log("警告: 百度OCR凭据缺失,将使用本地OCR", "warning")
        self.log("DeepSeek API凭据已提供", "info")

        # Disable UI controls during processing
        self._set_controls_state("disabled")

        # Reset progress
        self.progress["value"] = 0
        self.status_label.config(text="处理中...")

        # Parameters for the worker, which builds the processor itself because
        # constructing it may perform a blocking network request.
        self._job = {
            "directory": directory,
            "cert_type": cert_type,
            "baidu_api_key": baidu_api_key,
            "baidu_secret_key": baidu_secret_key,
            "deepseek_api_key": deepseek_api_key,
        }

        self.processing = True
        self.thread = threading.Thread(
            target=self.run_processing,
            daemon=True
        )
        self.thread.start()

    def _processor_log(self, message, tag="info"):
        """Log callback handed to the processor.

        Forwards ``message`` to the queue, or raises ``_StopRequested`` once
        Stop has been pressed, which aborts the run at its next log call.
        """
        if not self.processing:
            raise self._StopRequested()
        self.queue_message(message, tag)

    def run_processing(self):
        """Worker-thread body: build the processor and process the directory.

        Every outcome is reported through the message queue, and the controls
        are re-enabled only once the worker has really finished.
        """
        job = self._job
        try:
            output_file = os.path.join(job["directory"], "证书统计表.xlsx")
            self.processor = CertificateProcessor(
                job["directory"],
                job["cert_type"],
                self._processor_log,
                baidu_api_key=job["baidu_api_key"],
                baidu_secret_key=job["baidu_secret_key"],
                deepseek_api_key=job["deepseek_api_key"]
            )
            self.processor.process_certificates(output_file)
            self.queue_message("处理完成!", "success")
        except self._StopRequested:
            # The run was aborted part-way, so process_certificates never
            # reached the point where it writes the Excel file.
            self.queue_message("处理已停止", "warning")
        except Exception as e:
            self.queue_message(f"处理错误: {str(e)}", "error")
        finally:
            self.processing = False
            self.queue_message(self.ENABLE_CONTROLS)

    def stop_processing(self):
        """Ask the running worker to stop.

        The worker aborts at its next log call; the controls are re-enabled
        when it reports back, not here.
        """
        if not self.processing:
            return
        self.processing = False
        self.stop_btn.config(state="disabled")
        self.status_label.config(text="正在停止...")
        self.log("已请求停止,将在当前步骤结束后中止", "warning")

    def open_output_folder(self):
        """Open the selected directory in the platform's file manager."""
        directory = self.dir_entry.get().strip()
        if not directory or not os.path.isdir(directory):
            return

        # os.startfile exists only on Windows.
        if hasattr(os, "startfile"):
            os.startfile(directory)
            return

        import subprocess
        import sys

        opener = "open" if sys.platform == "darwin" else "xdg-open"
        try:
            subprocess.Popen([opener, directory])
        except OSError as e:
            self.log(f"无法打开文件夹: {str(e)}", "error")

    def queue_message(self, message, tag="info"):
        """Put ``(message, tag)`` on the queue; safe to call from any thread."""
        self.message_queue.put((message, tag))

    @staticmethod
    def _parse_progress(message):
        """Return the percentage encoded in a per-file progress message.

        The expected shape is ``=== 正在处理: <file> (<current>/<total>) ===``.
        The counters are taken from the last parenthesised group, so file
        names that contain parentheses are handled.

        Returns:
            The percentage (capped at 100) as a float, or ``None`` when
            ``message`` is not a progress message or cannot be parsed.
        """
        if "正在处理" not in message or "==" not in message:
            return None
        _, opener, tail = message.rpartition("(")
        counts, closer, _ = tail.partition(")")
        if not opener or not closer:
            return None
        current_text, slash, total_text = counts.partition("/")
        if not slash:
            return None
        try:
            current = int(current_text)
            total = int(total_text)
        except ValueError:
            return None
        if total <= 0:
            return None
        # Multiply before dividing to avoid results such as 30.000000000000004.
        return min(100.0, current * 100 / total)

    def process_queue(self):
        """Drain the message queue on the Tk thread and reschedule itself."""
        try:
            while True:
                try:
                    message, tag = self.message_queue.get_nowait()
                except queue.Empty:
                    break

                if message == self.ENABLE_CONTROLS:
                    self._set_controls_state("normal")
                    self.status_label.config(text="就绪")
                    continue

                # Update log
                self.log(message, tag)

                # Update progress if the message carries it
                percent = self._parse_progress(message)
                if percent is not None:
                    self.progress["value"] = percent
        finally:
            # Schedule next queue check
            self.window.after(100, self.process_queue)

    def log(self, message, tag="info"):
        """Append ``message`` to the log display; call only on the Tk thread."""
        self.log_text.config(state="normal")
        self.log_text.insert(tk.END, message + "\n", tag)
        self.log_text.see(tk.END)
        # Keep the log read-only for the user.
        self.log_text.config(state="disabled")

    def run(self):
        """Enter the Tk main loop; returns when the window is closed."""
        self.window.mainloop()

if __name__ == "__main__":
    # Script entry point: build the window and block in the Tk main loop
    # until it is closed.
    CertificateApp().run()





0 评论 最近

没有评论!