import base64
import os
from typing import Optional

import markdown2
from celery import shared_task
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.utils import timezone
from openai import OpenAI
from PIL import Image

from .models import OpenAIDiagnose


def get_image_type(image_path: str) -> Optional[str]:
    """Return the image format reported by Pillow, e.g. 'PNG' or 'JPEG'."""
    with Image.open(image_path) as img:
        return img.format


def encode_image(image_path: str) -> str:
    """Return the raw bytes of the file at *image_path* as a Base64 string."""
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')


def generate_html_paper(md_result: str, file_path_name: str) -> None:
    """Persist a Markdown report and its HTML rendering side by side.

    Writes ``<file_path_name>.md`` containing *md_result* and
    ``<file_path_name>.html`` containing the markdown2 rendering of it.
    Both files are written as UTF-8 and overwrite any existing file.

    Args:
        md_result: Markdown text to store and render.
        file_path_name: Target path without extension.
    """
    # Context managers guarantee the handles are closed even if a write or
    # the Markdown conversion raises.
    with open(f'{file_path_name}.md', 'w', encoding='utf-8') as md_file:
        md_file.write(md_result)

    # Render straight from the in-memory text; re-reading the file that was
    # just written would only add an extra disk round-trip.
    html = markdown2.markdown(md_result)

    # NOTE(review): the prefix literal is a single space in this view of the
    # file - confirm whether an HTML header was intended here.
    with open(f'{file_path_name}.html', 'w', encoding='utf-8') as html_file:
        html_file.write(""" """ + html)


@shared_task
def openai_diagnoser_asyna_wraper(uuid_str: str, username: str, diagnose_type: str):
    """Send an uploaded image to the vision model and store the resulting report.

    The image is read from ``BASE_DIR/upload_file/<username>/<uuid_str>``,
    sent to the chat-completions endpoint as a Base64 data URL together with
    the grading prompt, and the model's reply is written next to the image as
    ``.md`` and ``.html`` files. Finally the ``OpenAIDiagnose`` row whose
    ``uuid`` equals *uuid_str* gets ``status = 'D'`` and a fresh
    ``updated_at``; if no such row exists a warning is printed.

    Args:
        uuid_str: File name of the uploaded image and ``uuid`` of the record.
        username: Sub-directory of ``upload_file`` holding the image.
        diagnose_type: Currently unused by this task.
    """
    client = OpenAI(
        api_key=settings.OPAI_IMG_API_KEY,
        base_url=settings.OPAI_IMG_BASE_URL,
    )

    # Image that will be passed to the model.
    image_path = os.path.join(settings.BASE_DIR, 'upload_file', username, uuid_str)
    base64_image = encode_image(image_path)
    image_type = get_image_type(image_path).lower()

    response = client.chat.completions.create(
        model="doubao-1-5-vision-pro-32k-250115",
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": (
                            "图中是几个题目,我已经用笔作答了,先帮我判题,然后通过我的作答情况给出本学科的学情分析。 "
                            "从基础知识、学科素养、综合能力等方面给出学员指导意见"
                        ),
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            # Base64 payloads must carry the data-URL prefix
                            # data:image/<format>;base64,<payload>, e.g.
                            # image/png, image/jpeg or image/webp.
                            "url": f"data:image/{image_type};base64,{base64_image}"
                        },
                    },
                ],
            }
        ],
    )

    generate_html_paper(response.choices[0].message.content, image_path)

    # Keep the try body limited to the lookup that can actually raise
    # ObjectDoesNotExist; the update runs only when the row was found.
    try:
        obj = OpenAIDiagnose.objects.get(uuid=uuid_str)
    except ObjectDoesNotExist:
        print("warning: record not found")
    else:
        obj.status = 'D'
        obj.updated_at = timezone.now()
        obj.save()