# -*- coding: utf-8 -*- import os from openai import OpenAI import markdown2 import pdfkit from django.conf import settings from django.http import JsonResponse, FileResponse from celery import shared_task import uuid from django.utils import timezone from rest_framework.views import APIView from rest_framework.permissions import IsAuthenticated from authentication.authentication import CustomTokenAuthentication from django.core.exceptions import ObjectDoesNotExist from .models import Papers from ec_user.models import SchoolInfo import json # ============== PYdoc =================== # from docx import Document # import pypandoc # pypandoc.download_pandoc() # ================ weasyprint =========== # 库用不了 # from weasyprint import HTML # HTML(string=html).write_pdf("output.pdf") class DpArguments(): subject = "数学" textbook_version = "人教版" start_grade = 8 start_semester = 1 start_chapter = 1 end_grade = 9 end_semester = 1 end_chapter = 1 student = { "grade": 8, "age": 10, "school": "民族小学" } def __getstate__(self): # 返回一个字典,包含所有需要序列化的属性 state = self.__dict__.copy() # 如果你不想序列化某个属性,可以从字典中删除 # 例如: del state['student'] return state def __setstate__(self, state): # 用字典中的数据恢复对象状态 self.__dict__.update(state) def generate_html_paper(md_result, file_path_name): # 写到md中 md_file = open(f'{file_path_name}.md', 'w', encoding='utf-8') md_file.write(md_result) md_file.close() # 从md中读出来再写到html中 md_read_file = open(f'{file_path_name}.md', 'r', encoding='utf-8') md_text = md_read_file.read() md_read_file.close() html = markdown2.markdown(md_text) html_write_file = open(f'{file_path_name}.html', 'w', encoding='utf-8') html_write_file.write( """ """+html) html_write_file.close() def convert_to_pdf(file_path_name): # =============== pdfkit ================ # 从html中读出来再写到pdf中 f = open(f'{file_path_name}.html', 'r', encoding='utf-8') content = f.read() f.close() config = pdfkit.configuration(wkhtmltopdf=settings.WKHTMLTOPDF_PATH) pdfkit.from_string(content, f'{file_path_name}.pdf', configuration=config) @shared_task def async_wraper(file_path_name, uuid_str, subject, textbook_version, start_grade, \ start_semester, start_chapter, end_grade, end_semester, end_chapter, student): # client = OpenAI( # api_key = settings.OPAI_API_KEY, # base_url = settings.OPAI_BASE_URL, # ) # completion = client.chat.completions.create( # model = "deepseek-r1-250120", # your model endpoint ID # messages = [ # {"role": "system", "content": "你是教学助手"}, # # {"role": "user", # # "content": f"请根据以下信息生成一份检测卷:学科为{subject},教材版本是{textbook_version},\ # # 检测范围从{start_grade}年级{start_semester}学期{start_chapter}章\ # # 到{end_grade}年级{end_semester}学期{end_chapter}章,学生年级为{student.get('grade')},\ # # 年龄为{student.get('age')},所在学校是{student.get('school')}。\ # # 根据答题所需时间一个半小时设置题量。题目有梯度。\ # # 试卷题目为{subject}{textbook_version}检测卷。只生成题目,不输出答案及说明等其他内容。" # # }, # {"role": "user", "content": f"""请严格按照以下要求生成一份{subject}检测卷: # 【试卷信息】 # 1. 标题:**{subject}{textbook_version}综合检测卷** # 2. 检测范围:{start_grade}年级{start_semester}学期第{start_chapter}章 至 # {end_grade}年级{end_semester}学期第{end_chapter}章 # 3. 适用对象:{student.get('school')} {student.get('grade')}年级学生(年龄{student.get('age')}) # 【命题要求】 # 1. 题型结构: # - 按难度梯度分为基础题(60%)、中档题(30%)、提高题(10%) # - 根据学科特点设计合理的题型(如数学可设置选择题/填空题/计算题/应用题,语文可设置阅读理解/文言文/写作等) # - 同一知识点不重复考查 # 2. 题量控制: # - 总题数控制在18-22题(以优秀学生完成时间约75分钟为标准) # - 选择题不超过5题,填空题不超过6题 # - 需包含至少1道综合应用题(理科)或材料分析题(文科) # 3. 内容要求: # - 严格按照指定章节范围命题 # - 题目表述清晰无歧义,数字单位标注完整 # - 禁止出现需图片辅助的题目 # - 数学类题目需给出必要公式位置(如"(提示:可用公式____)") # 【输出格式】 # 仅输出以下内容: # 1. 试卷标题(加粗居中) # 2. 考试说明(含总分值100分、考试时长、姓名班级填写处) # 3. 分模块的题目内容(标注题型、分值和题号) # 4. "——以下为题目区域——" 分隔线 # 禁止包含: # - 答案及解析 # - 评分标准 # - 额外说明文字 # - markdown格式及特殊符号"""} # ], # ) # generate_html_paper(completion.choices[0].message.content, file_path_name) import time time.sleep(30) generate_html_paper(""" # Test md""", file_path_name) try: obj = Papers.objects.get(uuid=uuid_str) # 只获取一个对象 obj.status = 'D' obj.updated_at = timezone.now() obj.save() except ObjectDoesNotExist: print("warning: record not found") class MyProtectedGeneratePaper(APIView): authentication_classes = [CustomTokenAuthentication] # 使用自定义的 Token 认证 permission_classes = [IsAuthenticated] # 需要用户认证才能访问 def post(self, request): user = request.user uuid_str = str(uuid.uuid4()) radom_path = user.username +'_'+ uuid_str file_path_name = os.path.join(settings.BASE_DIR, 'generate_paper', radom_path) # 处理 POST 请求,更新用户的联系信息 try: data = json.loads(request.body) subject = data.get("subject") textbook_version = data.get("textbook_version") except json.JSONDecodeError: return JsonResponse({"error": "Invalid JSON format"}, status=400) # auth_subjects = user.subjects.all() if not user.subjects.filter(name=subject).exists(): print("subject", subject) return JsonResponse({"error_code": "2001"}, status=403) if user.subject_usage_count <= 0: return JsonResponse({"error_code": "2002"}, status=403) dp_args = DpArguments() dp_args.subject = subject dp_args.textbook_version = textbook_version school_obj = SchoolInfo.objects.filter(user=user).first() if school_obj is None: return JsonResponse({"error_code": "2003"}, status=403) dp_args.student['age'] = user.age dp_args.student['school'] = school_obj.school_name dp_args.student['grade'] = school_obj.grade print(dp_args.student, dp_args.subject, dp_args.textbook_version, dp_args.start_grade, dp_args.start_semester,\ dp_args.start_chapter, dp_args.end_grade, dp_args.end_semester, dp_args.end_chapter) # 调用异步任务 obj = Papers.objects.create(uuid=uuid_str, user=user, status='P') obj.save() user.subject_usage_count = user.subject_usage_count - 1 user.save() async_wraper.delay(file_path_name, uuid_str, \ dp_args.subject, dp_args.textbook_version, dp_args.start_grade, dp_args.start_semester, dp_args.start_chapter, \ dp_args.end_grade, dp_args.end_semester, dp_args.end_chapter, dp_args.student) return JsonResponse({"status": "success", "uuid": uuid_str}) class MyProtectedGenerateCheck(APIView): authentication_classes = [CustomTokenAuthentication] # 使用自定义的 Token 认证 permission_classes = [IsAuthenticated] # 需要用户认证才能访问 def post(self, request): user = request.user # 处理 POST 请求,更新用户的联系信息 try: data = json.loads(request.body) uuid_str = data.get("uuid") except json.JSONDecodeError: return JsonResponse({"error": "Invalid JSON format"}, status=400) try: # print(uuid_str) obj = Papers.objects.get(uuid=uuid_str, user=user) # 只获取一个对象 if obj.status == 'D': return JsonResponse({"status": "done", "download_url": f'/diagnose/download/?uuid_str={uuid_str}'}) else: return JsonResponse({"status": "processing", "download_url": None}) except ObjectDoesNotExist: return JsonResponse({"error_code": "2004"}, status=404) class MyProtectedDownloadPaper(APIView): authentication_classes = [CustomTokenAuthentication] # 使用自定义的 Token 认证 permission_classes = [IsAuthenticated] # 需要用户认证才能访问 def get(self, request): uuid_str = request.query_params.get('uuid_str', None) if not uuid_str: return JsonResponse({"error": "Missing uuid_str in query parameters"}, status=400) user = request.user username = user.username file_path_name = os.path.join(settings.BASE_DIR, 'generate_paper', f'{username}_{uuid_str}') # 检查文件是否存在 print(file_path_name) if not os.path.exists(file_path_name + ".html"): return JsonResponse({"error_code": "2005"}, status=403) convert_to_pdf(file_path_name) try: # 直接将文件路径传递给 FileResponse response = FileResponse(open(file_path_name+".pdf", 'rb'), as_attachment=True) response['Content-Disposition'] = f'attachment; filename="{username}_{uuid_str}.pdf"' return response except Exception as e: return JsonResponse({"error": f"Failed to open file: {str(e)}"}, status=500)