diagnose-backend/diagnose/dp_views.py

155 lines
6.0 KiB
Python

# -*- coding: utf-8 -*-
import json
import logging
import os
import uuid

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.http import JsonResponse, FileResponse
from rest_framework.permissions import IsAuthenticated
from rest_framework.views import APIView

from authentication.authentication import CustomTokenAuthentication
from ec_user.models import SchoolInfo

from .models import Papers
from .openai_gen import paper_gen_async_wraper, convert_to_pdf
# ============== PYdoc ===================
# from docx import Document
# import pypandoc
# pypandoc.download_pandoc()
# ================ weasyprint ===========
# This library cannot be used.
# from weasyprint import HTML
# HTML(string=html).write_pdf("output.pdf")
class DpArguments():
    """Parameter bundle describing one paper-generation request.

    The class attributes are the default values.  ``student`` is a mutable
    dict, so ``__init__`` gives each instance its own copy; without that,
    item assignment such as ``args.student['age'] = ...`` would modify the
    single dict stored on the class and be visible to every other instance.
    """

    # Default subject and textbook edition.
    subject = "数学"
    textbook_version = "人教版"
    # Default start / end of the covered range (grade, semester, chapter).
    start_grade = 8
    start_semester = 1
    start_chapter = 1
    end_grade = 9
    end_semester = 1
    end_chapter = 1
    # Default student profile; copied per instance in __init__.
    student = {
        "grade": 8,
        "age": 10,
        "school": "民族小学"
    }

    def __init__(self):
        # Shadow the shared class-level dict with a private copy.
        self.student = dict(self.student)

    def __getstate__(self):
        """Return a shallow copy of the instance ``__dict__`` for pickling.

        Only attributes set on the instance are included; class-level
        defaults that were never overridden are not part of the state.
        """
        state = self.__dict__.copy()
        # To exclude an attribute from serialization, delete it here,
        # e.g. ``del state['student']``.
        return state

    def __setstate__(self, state):
        """Restore instance attributes from a dict made by ``__getstate__``."""
        self.__dict__.update(state)
class MyProtectedGeneratePaper(APIView):
    """Authenticated endpoint that queues an asynchronous paper generation."""

    authentication_classes = [CustomTokenAuthentication]  # custom token authentication
    permission_classes = [IsAuthenticated]  # only authenticated users may call this

    def post(self, request):
        """Validate the request, consume one usage and enqueue the task.

        The body must be a JSON object; ``subject`` and ``textbook_version``
        are read from it.

        Returns:
            JsonResponse: ``{"status": "success", "uuid": <str>}`` once the
            task has been queued.  Error responses:

            * 400 - body is not valid JSON or is not a JSON object.
            * 403, ``error_code`` 2001 - ``subject`` is not among
              ``user.subjects``.
            * 403, ``error_code`` 2002 - ``user.subject_usage_count`` is
              not positive.
            * 403, ``error_code`` 2003 - no ``SchoolInfo`` row for the user.
        """
        logger = logging.getLogger(__name__)
        user = request.user

        try:
            data = json.loads(request.body)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return JsonResponse({"error": "Invalid JSON format"}, status=400)
        # Valid JSON may still be a list/string/number/null, none of which
        # has ``.get``; treat those as malformed input instead of crashing.
        if not isinstance(data, dict):
            return JsonResponse({"error": "Invalid JSON format"}, status=400)
        subject = data.get("subject")
        textbook_version = data.get("textbook_version")

        if not user.subjects.filter(name=subject).exists():
            logger.debug("subject %r is not enabled for this user", subject)
            return JsonResponse({"error_code": "2001"}, status=403)
        if user.subject_usage_count <= 0:
            return JsonResponse({"error_code": "2002"}, status=403)
        school_obj = SchoolInfo.objects.filter(user=user).first()
        if school_obj is None:
            return JsonResponse({"error_code": "2003"}, status=403)

        dp_args = DpArguments()
        dp_args.subject = subject
        dp_args.textbook_version = textbook_version
        # Assign a fresh dict instead of writing into ``dp_args.student``:
        # item assignment there would mutate the dict defined on the
        # DpArguments class and leak this user's data into other requests.
        dp_args.student = {
            "grade": school_obj.grade,
            "age": user.age,
            "school": school_obj.school_name,
        }
        logger.debug(
            "queueing paper: subject=%s textbook=%s range=(%s, %s, %s)-(%s, %s, %s)",
            dp_args.subject, dp_args.textbook_version,
            dp_args.start_grade, dp_args.start_semester, dp_args.start_chapter,
            dp_args.end_grade, dp_args.end_semester, dp_args.end_chapter,
        )

        uuid_str = str(uuid.uuid4())
        # Output path stem (no extension): <BASE_DIR>/generate_paper/<username>_<uuid>
        file_path_name = os.path.join(
            settings.BASE_DIR, 'generate_paper', user.username + '_' + uuid_str
        )

        # ``objects.create`` already persists the row; no extra ``save()``.
        Papers.objects.create(uuid=uuid_str, user=user, status='P')
        # NOTE(review): read-modify-write on the counter; concurrent requests
        # from the same user could both pass the check above - confirm
        # whether an atomic update is needed.
        user.subject_usage_count = user.subject_usage_count - 1
        user.save()

        # Hand the work to the asynchronous task.
        paper_gen_async_wraper.delay(
            file_path_name, uuid_str,
            dp_args.subject, dp_args.textbook_version,
            dp_args.start_grade, dp_args.start_semester, dp_args.start_chapter,
            dp_args.end_grade, dp_args.end_semester, dp_args.end_chapter,
            dp_args.student,
        )
        return JsonResponse({"status": "success", "uuid": uuid_str})
class MyProtectedGenerateCheck(APIView):
    """Authenticated endpoint that reports the status of a queued paper."""

    authentication_classes = [CustomTokenAuthentication]  # custom token authentication
    permission_classes = [IsAuthenticated]  # only authenticated users may call this

    def post(self, request):
        """Look up the caller's paper by ``uuid`` and report its status.

        The body must be a JSON object; ``uuid`` is read from it.

        Returns:
            JsonResponse: ``{"status": "done", "download_url": <url>}`` when
            the paper's status is ``'D'``, otherwise
            ``{"status": "processing", "download_url": None}``.  Error
            responses:

            * 400 - body is not valid JSON or is not a JSON object.
            * 404, ``error_code`` 2004 - no ``Papers`` row with this uuid
              belongs to the requesting user.
        """
        try:
            data = json.loads(request.body)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return JsonResponse({"error": "Invalid JSON format"}, status=400)
        # Valid JSON that is not an object has no ``.get``; reject it rather
        # than letting an AttributeError become a 500.
        if not isinstance(data, dict):
            return JsonResponse({"error": "Invalid JSON format"}, status=400)
        uuid_str = data.get("uuid")

        # Filtering on ``user`` as well keeps users from probing each
        # other's papers.
        try:
            obj = Papers.objects.get(uuid=uuid_str, user=request.user)
        except ObjectDoesNotExist:
            return JsonResponse({"error_code": "2004"}, status=404)

        if obj.status == 'D':
            return JsonResponse({"status": "done", "download_url": f'/diagnose/download/?uuid_str={uuid_str}'})
        return JsonResponse({"status": "processing", "download_url": None})
class MyProtectedDownloadPaper(APIView):
    """Authenticated endpoint that serves a generated paper as a PDF."""

    authentication_classes = [CustomTokenAuthentication]  # custom token authentication
    permission_classes = [IsAuthenticated]  # only authenticated users may call this

    def get(self, request):
        """Convert the caller's generated HTML paper to PDF and return it.

        Reads ``uuid_str`` from the query string and looks for
        ``<BASE_DIR>/generate_paper/<username>_<uuid_str>.html``.

        Returns:
            FileResponse: the PDF as an attachment named
            ``<username>_<uuid_str>.pdf``.  Error responses:

            * 400 - ``uuid_str`` is missing or is not a canonical UUID.
            * 403, ``error_code`` 2005 - the HTML file does not exist.
            * 500 - the PDF could not be opened after conversion.
        """
        logger = logging.getLogger(__name__)

        uuid_str = request.query_params.get('uuid_str', None)
        if not uuid_str:
            return JsonResponse({"error": "Missing uuid_str in query parameters"}, status=400)

        # ``uuid_str`` is untrusted and ends up in a filesystem path and in
        # a response header, so accept nothing but a canonical UUID.
        # ``uuid.UUID`` alone is lenient (braces, ``urn:uuid:`` prefix,
        # missing hyphens), hence the comparison with the canonical form.
        try:
            canonical = str(uuid.UUID(uuid_str))
        except ValueError:
            canonical = None
        if canonical != uuid_str.lower():
            return JsonResponse({"error": "Invalid uuid_str in query parameters"}, status=400)

        username = request.user.username
        base_name = f'{username}_{uuid_str}'
        file_path_name = os.path.join(settings.BASE_DIR, 'generate_paper', base_name)
        logger.debug("download requested for %s", file_path_name)

        # Make sure the generated HTML exists before converting it.
        if not os.path.exists(file_path_name + ".html"):
            return JsonResponse({"error_code": "2005"}, status=403)
        convert_to_pdf(file_path_name)

        try:
            pdf_file = open(file_path_name + ".pdf", 'rb')
        except OSError:
            # Keep the OS error (which may contain absolute server paths)
            # in the server log instead of echoing it to the client.
            logger.exception("failed to open generated PDF %s.pdf", file_path_name)
            return JsonResponse({"error": "Failed to open file"}, status=500)

        # FileResponse closes the file when the response is finished and
        # builds the Content-Disposition header from ``filename``.
        return FileResponse(pdf_file, as_attachment=True, filename=f'{base_name}.pdf')