# Extension_image_recognition/WebAppFaultTolerant2.py
#
# 379 lines
# 12 KiB
# Python

from re import split
from flask import Flask, request, redirect, url_for, flash, render_template, send_from_directory, abort, Response, session
from random import randint
import cv2
import io
import numpy as np
import json
import urllib
import LFUtilities
from Searcher import Searcher
from GEMSearcher import GEMSearcher
import WebAppSettings as settings
import uuid
import requests
import os, os.path
from PIL import Image
import tornado.wsgi
import tornado.httpserver
import argparse
import base64
from Heic2Png import conv
from datetime import datetime
# Flask application object; the route decorators below register their handlers on it.
app = Flask(__name__)
# Key Flask uses to sign the session cookie (the handlers store "query" in the session).
# NOTE(review): hard-coded in source — consider loading it from configuration instead.
app.secret_key = "27eduCBA09"
# Value the "securityID" request parameter is meant to match; the comparisons
# against it are currently commented out in the handlers.
security_id = 'FA6mMmeWZ8JjWKz5'
# Module-wide rescoring flag: changed by set_rescorer() and passed to the searcher calls.
rescorer = False
# Name of the local-feature implementation passed to the searcher calls.
lf_impl = 'BEBLID'
# Base64 prefix shared by payloads whose first eight bytes are 00 00 00 18 "ftyp"
# (a 24-byte ISO-BMFF "ftyp" box, as written at the start of HEIC files).
# The previous value "AAAAGGZ0eXA=" was the base64 of exactly those eight bytes
# *including* the trailing "=" padding; padding only ever appears at the very
# end of a base64 string, so no real (longer) payload could start with it.
# The last character "B" additionally covers the top two bits of the first
# brand byte, which are 01 for any ASCII letter.
HEIC_MAGIC_NUMBER = "AAAAGGZ0eXB"
@app.route('/bcir/')
def api_root():
    """Render the landing page with 15 ids picked at random from the searcher."""
    template_name = 'index_with_randoms.html'
    print(template_name)
    # Every id is looked up from a random index in the inclusive range 0..600.
    random_ids = [searcher.get_id(randint(0, 600)) for _ in range(15)]
    return render_template(template_name, random_ids=random_ids)
def url_to_file(url):
    """Download the image at *url*, resize it and store it as a PNG query file.

    The location of the stored file relative to ``settings.log_folder`` is
    also recorded in ``session["query"]``.

    Args:
        url: Address of the image to fetch. Only ``http`` and ``https`` URLs
            are accepted.

    Returns:
        Path (under ``settings.logs``) of the PNG file that was written.

    Raises:
        ValueError: If the URL scheme is not http(s), or the downloaded bytes
            cannot be decoded as an image.
    """
    # Imported here because a bare ``import urllib`` does not guarantee that
    # the ``request``/``parse`` submodules are loaded.
    import urllib.parse
    import urllib.request

    # The URL is supplied by the client: refuse schemes such as file:// that
    # would make the server read local resources.
    if urllib.parse.urlsplit(url).scheme.lower() not in ("http", "https"):
        raise ValueError("Unsupported URL scheme: " + repr(url))

    dest_file = uuid.uuid4().hex + ".png"
    session["query"] = settings.log_folder + '/' + dest_file
    dest_path = settings.logs + "/" + dest_file

    req = urllib.request.Request(
        url,
        data=None,
        headers={
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'
        }
    )
    # The context manager closes the connection even if reading fails.
    with urllib.request.urlopen(req) as resp:
        payload = resp.read()

    image = np.frombuffer(payload, dtype=np.uint8)
    decoded = cv2.imdecode(image, cv2.IMREAD_COLOR)
    if decoded is None:
        # imdecode signals "not an image" by returning None instead of raising.
        raise ValueError("The content downloaded from the URL is not a decodable image")

    resized = LFUtilities.resize(500, decoded)
    cv2.imwrite(dest_path, resized)
    return dest_path
def post_to_file(image):
    """Resize an uploaded image and store it as a PNG query file.

    The location of the stored file relative to ``settings.log_folder`` is
    also recorded in ``session["query"]``.

    Args:
        image: Uploaded file object exposing ``save(stream)`` (the handler
            passes the entry taken from ``request.files``).

    Returns:
        Path (under ``settings.logs``) of the PNG file that was written.

    Raises:
        ValueError: If the uploaded bytes cannot be decoded as an image.
    """
    dest_file = uuid.uuid4().hex + ".png"
    session["query"] = settings.log_folder + '/' + dest_file
    dest_path = settings.logs + "/" + dest_file

    in_memory_file = io.BytesIO()
    image.save(in_memory_file)
    # np.fromstring is deprecated for binary input; frombuffer is its
    # documented replacement and avoids an extra copy.
    data = np.frombuffer(in_memory_file.getvalue(), dtype=np.uint8)
    decoded = cv2.imdecode(data, cv2.IMREAD_COLOR)
    if decoded is None:
        # imdecode signals "not an image" by returning None instead of raising.
        raise ValueError("The uploaded file is not a decodable image")

    resized = LFUtilities.resize(500, decoded)
    cv2.imwrite(dest_path, resized)
    return dest_path
def set_rescorer(rescore_param):
    """Update the module-level ``rescorer`` flag from a request parameter.

    Only the exact strings ``'true'`` and ``'false'`` change the flag; any
    other value (including ``None``) leaves it as it was.
    """
    global rescorer
    if rescore_param == 'true':
        rescorer = True
        return
    if rescore_param == 'false':
        rescorer = False
def base64_to_file(image_base64):
    """Decode a base64 image payload and store it under ``settings.logs``.

    HEIC payloads are written with a ``.heic`` extension and then passed to
    ``conv`` (from ``Heic2Png``), whose return value becomes the result; every
    other payload is written with a ``.jpg`` extension.

    Args:
        image_base64: Base64 text of the image file.

    Returns:
        Path of the stored file, or whatever ``conv`` returns for HEIC input.

    Raises:
        binascii.Error: If the text is not valid base64.
    """
    # Decode before opening the destination so that a malformed payload does
    # not leave an empty file behind.
    byte_content = base64.b64decode(image_base64)

    # Besides the original text-prefix test, look at the decoded header: an
    # ISO-BMFF file has "ftyp" at offset 4 followed by its major brand. This
    # also recognises files whose "ftyp" box is not exactly 24 bytes long.
    heic_brands = (b"heic", b"heix", b"hevc", b"hevx", b"mif1", b"msf1")
    is_heic = image_base64.startswith(HEIC_MAGIC_NUMBER) or (
        byte_content[4:8] == b"ftyp" and byte_content[8:12] in heic_brands
    )

    ext = ".heic" if is_heic else ".jpg"
    dest_path = settings.logs + "/" + uuid.uuid4().hex + ext
    with open(dest_path, "wb") as image_file:
        image_file.write(byte_content)

    if is_heic:
        dest_path = conv(dest_path)
    return dest_path
def get_res(results, query_url=None, img_base64=None):
    """Turn a search outcome into the body of the HTTP response.

    Args:
        results: ``None`` when nothing was recognised, otherwise a sequence
            whose first item is the class name and whose second item is the
            confidence.
        query_url: When not ``None``, the ``search.html`` template is rendered
            with ``results`` and this value instead of producing JSON.
        img_base64: Optional base64 text of the query image. When given, an
            entry with timestamp, result and image is appended to the HTML
            report file.

    Returns:
        The rendered template, or a JSON string literal whose content is
        itself a JSON object with ``classname`` and, when available, ``conf``.
    """
    if query_url is not None:
        return render_template('search.html', results=results, query_url=query_url)

    if results is None:
        payload = '{"classname":"Unknown"}'
    else:
        # json.dumps escapes quotes and backslashes in the class name; plain
        # concatenation produced invalid JSON for such names. The confidence
        # is still formatted with str() exactly as before.
        payload = (
            '{"classname":' + json.dumps(results[0], ensure_ascii=False)
            + ', "conf":' + str(results[1]) + '}'
        )
    print(payload)
    # The object text is encoded once more, as in the original response
    # format, so clients keep receiving a JSON string wrapping the object.
    json_res = json.dumps(payload)
    print(json_res)

    if img_base64 is not None:
        import html

        # Both values originate from the request/recognition and end up in an
        # HTML file, so they are escaped. Valid base64 and ordinary class
        # names contain no characters affected by the escaping.
        html_txt = "".join((
            str(datetime.now()),
            '</br>',
            html.escape(payload, quote=False),
            '</br>',
            '<img height="256" src="data:image/png;base64,',
            html.escape(img_base64, quote=True),
            '">',
            '</br></br></br>',
            '\n',
        ))
        with open("/home/paolo/Dropbox/extension_logs/report.html", "a+") as myfile:
            myfile.write(html_txt)
    return json_res
def get_resDict(results, query_url=None):
    """Turn a list of search results into the body of the HTTP response.

    Args:
        results: ``None`` when nothing was recognised, otherwise a sequence
            whose first element is a ``(classname, confidence)`` pair.
        query_url: When not ``None``, the ``search.html`` template is rendered
            with ``results`` and this value instead of producing JSON.

    Returns:
        The rendered template, or the JSON text of an object with
        ``classname`` and, when available, ``conf``.
    """
    if query_url is not None:
        return render_template('search.html', results=results, query_url=query_url)

    if results is None:
        payload = {"classname": "Unknown"}
    else:
        best = results[0]
        # The previous dict literal had misplaced quotes, so both values were
        # constant pieces of source text rather than the actual result.
        # float() keeps the confidence serialisable in case the searcher
        # returns a NumPy scalar.
        payload = {"classname": best[0], "conf": float(best[1])}
    print(payload)
    json_res = json.dumps(payload)
    print(json_res)
    return json_res
def rotate_img(img_path, degree):
    """Write a rotated JPEG copy of an image next to the original.

    Args:
        img_path: Path of the image to rotate.
        degree: Rotation angle in degrees, passed to ``Image.rotate`` with
            ``expand=True`` so the canvas grows to fit the rotated picture.

    Returns:
        Path of the rotated copy: ``<img_path>_<degree>.jpg``.
    """
    dest = img_path + "_" + str(degree) + ".jpg"
    # The context manager releases the source file handle once the rotated
    # copy has been produced.
    with Image.open(img_path) as source:
        rotated = source.rotate(degree, expand=True)
    # JPEG cannot store an alpha channel or a palette; convert such images so
    # that save() does not fail.
    if rotated.mode in ("RGBA", "LA", "P"):
        rotated = rotated.convert("RGB")
    rotated.save(dest)
    return dest
@app.route('/bcir/searchById')
def search_by_id():
    """Search using the id given in the ``id`` query parameter.

    The optional ``rescorer`` parameter updates the module-wide flag first.
    When ``tohtml`` is present the result is rendered as HTML with
    ``<id>.jpg`` as query image, otherwise it is returned as JSON.
    """
    args = request.args
    item_id = args.get('id')
    set_rescorer(args.get("rescorer"))
    results = searcher.search_by_id(item_id, settings.k, rescorer, lf_impl)
    query_url = item_id + ".jpg" if args.get("tohtml") is not None else None
    return get_res(results, query_url)
@app.route('/bcir/searchByImg', methods=['POST'])
def search_by_img():
    """Search with an uploaded image, also trying it rotated by 90 and 270 degrees.

    The upload is expected in the ``image`` file field. The three searches
    (original, 90, 270) are compared on the second element of their result and
    the highest one is returned; a later candidate only wins when it is
    strictly greater.
    """
    if 'image' not in request.files:
        return Response(json.dumps("Error, unable to get the image"), status=401, mimetype='application/json')

    # The securityID form field is read, but the check against it is disabled.
    sec_id = request.form.get("securityID")

    img_file = post_to_file(request.files['image'])
    set_rescorer(request.form.get("rescorer"))

    # Run every search first, in the order original -> 90 -> 270.
    candidates = [searcher.search_by_img(img_file, settings.k, rescorer, lf_impl)]
    for degree in (90, 270):
        rotated_path = rotate_img(img_file, degree)
        candidates.append(searcher.search_by_img(rotated_path, settings.k, rescorer, lf_impl))

    # Then keep the candidate with the highest score, skipping empty results.
    max_res = candidates[0]
    for candidate in candidates[1:]:
        if candidate is None:
            continue
        if max_res is None or candidate[1] > max_res[1]:
            max_res = candidate

    query_url = "" if request.form.get("tohtml") is not None else None
    return get_res(max_res, query_url)
@app.route('/bcir/searchByImgB64', methods=['POST'])
def search_by_img_base64():
    """Search with an image sent as base64 text in the ``image`` form field.

    Form fields:
        image: Base64 text of the query image (required).
        orientation: Optional integer; 3, 6 and 8 rotate the image by 180,
            270 and 90 degrees respectively before searching. Anything else,
            or a missing/non-numeric value, leaves the image untouched.
        rescorer: Optional ``'true'``/``'false'`` to update the rescoring flag.
        tohtml: When present, the result is rendered as HTML instead of JSON.

    Returns:
        The response produced by ``get_res``, or a JSON error with status 400
        when no image was sent.
    """
    print("query by base64 received")

    image = request.form.get('image')
    if not image:
        # A redirect back to this POST-only URL could not succeed, so answer
        # with an explicit JSON error instead.
        return Response(json.dumps("Error, unable to get the image"), status=400, mimetype='application/json')

    # Fall back to "no rotation" when the field is absent or not an integer.
    try:
        orientation = int(request.form.get("orientation"))
    except (TypeError, ValueError):
        orientation = 1

    # Apply the requested rescoring flag before searching so that it affects
    # this very query (it used to be applied only after the search).
    set_rescorer(request.form.get("rescorer"))

    img_file = base64_to_file(image)

    # Orientation value -> degrees passed to rotate_img.
    rotation = {3: 180, 6: 270, 8: 90}.get(orientation)
    oriented_img = img_file if rotation is None else rotate_img(img_file, rotation)

    results = searcher.search_by_img(oriented_img, settings.k, rescorer, lf_impl)

    query_url = "" if request.form.get("tohtml") is not None else None
    return get_res(results, query_url, img_base64=image)
@app.route('/bcir/searchByURL')
def search_by_url():
    """Search with an image that the server downloads from the ``url`` parameter.

    Query parameters:
        url: Address of the query image (required).
        orientation: Optional integer; 3, 6 and 8 rotate the image by 180,
            90 and 270 degrees respectively before searching. Anything else,
            or a missing/non-numeric value, leaves the image untouched.
        rescorer: Optional ``'true'``/``'false'`` to update the rescoring flag.
        tohtml: When present, the result is rendered as HTML with ``url`` as
            the query image instead of JSON.

    Returns:
        The response produced by ``get_res``, or a JSON error with status 400
        when no URL was given.
    """
    url = request.args.get('url')
    if not url:
        return Response(json.dumps("Error, unable to get the image"), status=400, mimetype='application/json')

    # This is a GET route, so the flag is read from the query string only and
    # applied before the search.
    set_rescorer(request.args.get("rescorer"))

    img_file = url_to_file(url)

    # Fall back to "no rotation" when the parameter is absent or not an integer.
    try:
        orientation = int(request.args.get("orientation"))
    except (TypeError, ValueError):
        orientation = 1

    # Orientation value -> degrees passed to rotate_img.
    # NOTE(review): search_by_img_base64 maps 6 -> 270 and 8 -> 90, i.e. the
    # opposite of this table. The mapping is kept as it was; confirm which of
    # the two is intended.
    rotation = {3: 180, 6: 90, 8: 270}.get(orientation)
    oriented_img = img_file if rotation is None else rotate_img(img_file, rotation)

    results = searcher.search_by_img(oriented_img, settings.k, rescorer, lf_impl)

    query_url = url if request.args.get("tohtml") is not None else None
    return get_res(results, query_url)
@app.route('/bcir/setRescorer')
def set_bot_rescorer():
    """Update the rescoring flag from the ``rescorer`` query parameter and report its value."""
    requested = request.args.get("rescorer")
    set_rescorer(requested)
    return f"Rescorer set to {rescorer}"
@app.route('/bcir/<path:filename>')
def download_file(filename):
    """Serve ``<folder>/<name>`` from below ``settings.working_folder``.

    Only the first two segments of the requested path are used. Requests with
    fewer than two segments, or whose path would leave the working folder,
    are answered with 404.
    """
    print(filename)
    values = filename.split('/')
    if len(values) < 2:
        # Previously this raised IndexError and surfaced as a server error.
        abort(404)
    folder, name = values[0], values[1]
    print(folder)
    print(name)
    # Pass the folder as part of the relative path rather than of the base
    # directory, so send_from_directory's containment check also covers it
    # (a ".." folder segment can no longer escape the working folder).
    return send_from_directory(settings.working_folder, folder + '/' + name, as_attachment=False)
"""
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Reading configuration file')
parser.add_argument('conf', type=str, help='Configuration file path')
args = parser.parse_args()
settings.load_setting(args.conf)
global searcher
searcher = BeniCulturaliSearcher()
#app.run(host='0.0.0.0', port=8090, ssl_context='adhoc')
app.run(host='0.0.0.0', port=settings.port)
# app.run(host='0.0.0.0', port=settings.port)
"""
def start_tornado(app, port=8190):
    """Serve the WSGI application with Tornado and block until the loop stops.

    Args:
        app: The Flask (WSGI) application to serve.
        port: TCP port to listen on.
    """
    # Import the submodules explicitly: tornado.ioloop was previously used
    # without being imported and only worked because another tornado module
    # happened to load it.
    import tornado.httpserver
    import tornado.ioloop
    import tornado.wsgi

    http_server = tornado.httpserver.HTTPServer(tornado.wsgi.WSGIContainer(app))
    http_server.listen(port)
    app.logger.info("Tornado server starting on port {}".format(port))
    # IOLoop.instance() is a deprecated alias of IOLoop.current().
    tornado.ioloop.IOLoop.current().start()
def start_from_terminal(app):
    """Load the configuration named on the command line and run the Flask server.

    The single positional argument ``conf`` is the configuration file path.
    After loading it, the module-level ``searcher`` is created and the
    application is served on all interfaces at ``settings.port``.
    """
    global searcher

    cli = argparse.ArgumentParser(description='Reading configuration file')
    cli.add_argument('conf', type=str, help='Configuration file path')
    conf_path = cli.parse_args().conf

    settings.load_setting(conf_path)
    searcher = Searcher()

    # Flask's built-in server is used here; start_tornado() is not called.
    app.run(debug=False, host='0.0.0.0', port=settings.port)
# Script entry point: start the web application when this file is run directly.
if __name__ == '__main__':
    start_from_terminal(app)