Extension_image_recognition/WebApp.py

277 lines
8.3 KiB
Python
Executable File

from re import split
from flask import Flask, request, redirect, url_for, flash, render_template, send_from_directory, abort, Response, session
from random import randint
import cv2
import io
import numpy as np
import json
import urllib
import LFUtilities
from Searcher import Searcher
from GEMSearcher import GEMSearcher
import WebAppSettings as settings
import uuid
import requests
import os, os.path
from PIL import Image
import tornado.wsgi
import tornado.httpserver
import argparse
import base64
from Heic2Png import conv
from datetime import datetime
app = Flask(__name__)
app.secret_key = "27eduCBA09"
security_id = 'FA6mMmeWZ8JjWKz5'
rescorer = False
lf_impl = 'BEBLID'
HEIC_MAGIC_NUMBER = "AAAAGGZ0eXA="
@app.route('/bcir/')
def api_root():
print('index_with_randoms.html')
random_ids = []
for i in range(0, 15):
random_ids.append(searcher.get_id(randint(0, 600)))
return render_template('index_with_randoms.html', random_ids=random_ids)
def url_to_file(url):
dest_file = uuid.uuid4().hex + ".png"
session["query"] = settings.log_folder + '/' + dest_file
dest_path = settings.logs + "/" + dest_file
req = urllib.request.Request(
url,
data=None,
headers={
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'
}
)
resp = urllib.request.urlopen(req)
image = np.asarray(bytearray(resp.read()), dtype="uint8")
decoded = cv2.imdecode(image, cv2.IMREAD_COLOR)
resized = LFUtilities.resize(500, decoded)
cv2.imwrite(dest_path, resized)
#im = Image.fromarray(image)
#im.save(dest_path)
return dest_path
def post_to_file(image):
dest_file = uuid.uuid4().hex + ".png"
session["query"] = settings.log_folder + '/' + dest_file
dest_path = settings.logs + "/" + dest_file
in_memory_file = io.BytesIO()
image.save(in_memory_file)
data = np.fromstring(in_memory_file.getvalue(), dtype=np.uint8)
decoded = cv2.imdecode(data, cv2.IMREAD_COLOR)
resized = LFUtilities.resize(500, decoded)
cv2.imwrite(dest_path, resized)
return dest_path
def set_rescorer(rescore_param):
global rescorer
if rescore_param is not None:
if rescore_param == 'true':
rescorer = True
elif rescore_param == 'false':
rescorer = False
def base64_to_file(image_base64):
ext = ".jpg"
if (image_base64.startswith(HEIC_MAGIC_NUMBER)):
ext = ".heic"
dest_file = uuid.uuid4().hex + ext
dest_path = settings.logs + "/" + dest_file
with open(dest_path, "wb") as image_file:
byte_content = base64.b64decode(image_base64)
image_file.write(byte_content)
if (image_base64.startswith(HEIC_MAGIC_NUMBER)):
dest_path = conv(dest_path)
return dest_path
def get_res(results, query_url=None, img_base64=None):
if query_url is not None:
return render_template('search.html', results=results, query_url=query_url)
elif results is None:
results = '{"classname":"Unknown"}'
else:
results = '{"classname":"' + results[0] + '", "conf":' + str(results[1]) + '}'
print(results)
json_res = json.dumps(results)
print(json_res)
if img_base64 != None:
html_txt = str(datetime.now())
html_txt += '</br>'
html_txt += results
html_txt += '</br>'
html_txt += '<img height="256" src="data:image/png;base64,' + img_base64 + '">'
html_txt += '</br></br></br>'
html_txt += '\n'
with open("/home/paolo/Dropbox/extension_logs/report.html", "a+") as myfile:
myfile.write(html_txt)
return json_res
def get_resDict(results, query_url=None):
if query_url is not None:
return render_template('search.html', results=results, query_url=query_url)
elif results is None:
results = {"classname":"Unknown"}
else:
results = {"classname":"' + results[0][0] + '", "conf":' + str(results[0][1]) + '}
print(results)
json_res = json.dumps(results)
print(json_res)
return json_res
@app.route('/bcir/searchById')
def search_by_id():
id = request.args.get('id')
set_rescorer(request.args.get("rescorer"))
results = searcher.search_by_id(id, settings.k, rescorer, lf_impl)
query_url = None
if request.args.get("tohtml") is not None:
query_url = id + ".jpg"
return get_res(results, query_url)
@app.route('/bcir/searchByImg', methods=['POST'])
def search_by_img():
if 'image' not in request.files:
return Response(json.dumps("Error, unable to get the image"), status=401, mimetype='application/json')
sec_id = request.form.get("securityID")
#if sec_id != security_id:
# return Response(json.dumps("Error 401, not authorized"), status=401, mimetype='application/json')
file = request.files['image']
img_file = post_to_file(file)
set_rescorer(request.form.get("rescorer"))
results = searcher.search_by_img(img_file, settings.k, rescorer, lf_impl)
query_url = None
if request.form.get("tohtml") is not None:
query_url = ""
return get_res(results, query_url)
@app.route('/bcir/searchByImgB64', methods=['POST'])
def search_by_img_base64():
print("query by base64 received")
#print(request)
#print(request.form)
sec_id = request.form.get("securityID")
#to fix a problem with photo orientation
try:
orientation = request.form.get("orientation")
orientation = int(orientation)
except:
orientation = 1
#if sec_id != security_id:
# return Response(json.dumps("Error 401, not authorized"), status=401, mimetype='application/json')
image = request.form.get('image')
if image:
img_file = base64_to_file(image)
else:
flash('No img sent')
return redirect(request.url)
set_rescorer(request.form.get("rescorer"))
results = searcher.search_by_img(img_file, settings.k, rescorer, lf_impl)
query_url = None
if request.form.get("tohtml") is not None:
query_url = ""
return get_res(results, query_url, img_base64=image)
@app.route('/bcir/searchByURL')
def search_by_url():
sec_id = request.args.get("securityID")
#if sec_id != security_id:
# return Response(json.dumps("Error 401, not authorized"), status=401, mimetype='application/json')
url = request.args.get('url')
img_file = url_to_file(url)
set_rescorer(request.args.get("rescorer"))
results = searcher.search_by_img(img_file, settings.k, rescorer, lf_impl)
query_url = None
if request.args.get("tohtml") is not None:
query_url = url
return get_res(results, query_url)
@app.route('/bcir/setRescorer')
def set_bot_rescorer():
set_rescorer(request.args.get("rescorer"))
return "Rescorer set to " + str(rescorer)
@app.route('/bcir/<path:filename>')
def download_file(filename):
print(filename)
values = filename.split('/')
folder = values[0]
name = values[1]
print(folder)
print(name)
return send_from_directory(settings.working_folder + '/' + folder, name, as_attachment=False)
"""
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Reading configuration file')
parser.add_argument('conf', type=str, help='Configuration file path')
args = parser.parse_args()
settings.load_setting(args.conf)
global searcher
searcher = BeniCulturaliSearcher()
#app.run(host='0.0.0.0', port=8090, ssl_context='adhoc')
app.run(host='0.0.0.0', port=settings.port)
# app.run(host='0.0.0.0', port=settings.port)
"""
def start_tornado(app, port=8190):
http_server = tornado.httpserver.HTTPServer(tornado.wsgi.WSGIContainer(app))
http_server.listen(port)
app.logger.info("Tornado server starting on port {}".format(port))
tornado.ioloop.IOLoop.instance().start()
def start_from_terminal(app):
parser = argparse.ArgumentParser(description='Reading configuration file')
parser.add_argument('conf', type=str, help='Configuration file path')
args = parser.parse_args()
settings.load_setting(args.conf)
global searcher
searcher = Searcher()
#if args.debug:
# app.run(debug=True, host='0.0.0.0', port=settings.port)
# else:
#start_tornado(app, settings.port)
app.run(debug=True, host='0.0.0.0', port=settings.port)
if __name__ == '__main__':
start_from_terminal(app)