"""Flask web application exposing image-search endpoints under ``/bcir/``.

Queries can be issued by indexed id, uploaded file, base64 payload or remote
URL.  Results are returned either as an HTML page (``tohtml`` parameter) or
as a JSON-encoded string.
"""

import argparse
import base64
import io
import json
import os
import os.path
import urllib
import urllib.request
import uuid
from datetime import datetime
from random import randint
from re import split

import cv2
import numpy as np
import requests
import tornado.httpserver
import tornado.ioloop
import tornado.wsgi
from flask import (
    Flask,
    request,
    redirect,
    url_for,
    flash,
    render_template,
    send_from_directory,
    abort,
    Response,
    session,
)
from PIL import Image

import LFUtilities
import WebAppSettings as settings
from GEMSearcher import GEMSearcher
from Heic2Png import conv
from Searcher import Searcher

app = Flask(__name__)
# NOTE(review): the secret key and security id are hard-coded in source;
# consider loading them from configuration instead.
app.secret_key = "27eduCBA09"
security_id = 'FA6mMmeWZ8JjWKz5'

# Module-wide rescoring flag, toggled through set_rescorer().
rescorer = False
lf_impl = 'BEBLID'

# Base64 encoding of the 8 leading bytes used to recognise HEIC uploads.
HEIC_MAGIC_NUMBER = "AAAAGGZ0eXA="
# Decoded form of the magic number; comparisons must happen on raw bytes
# because the padded base64 text is not a prefix of a longer base64 string.
HEIC_MAGIC_BYTES = base64.b64decode(HEIC_MAGIC_NUMBER)

# File to which base64 queries and their results are appended.
REPORT_PATH = "/home/paolo/Dropbox/extension_logs/report.html"
# First argument handed to LFUtilities.resize for every stored query image.
QUERY_RESIZE_TARGET = 500
# Number of random ids shown on the landing page and the upper bound
# (inclusive) of the random index passed to searcher.get_id.
RANDOM_ID_COUNT = 15
RANDOM_ID_MAX = 600

# Set by start_from_terminal() once the settings have been loaded.
searcher = None


@app.route('/bcir/')
def api_root():
    """Render the landing page with a selection of random image ids."""
    print('index_with_randoms.html')
    random_ids = [
        searcher.get_id(randint(0, RANDOM_ID_MAX))
        for _ in range(RANDOM_ID_COUNT)
    ]
    return render_template('index_with_randoms.html', random_ids=random_ids)


def _store_query_image(raw_bytes):
    """Decode, resize and save a query image; return the saved file path.

    The image is written as PNG under ``settings.logs`` with a random name,
    and the corresponding ``settings.log_folder`` path is stored in the
    session under ``"query"``.  Aborts the request with HTTP 400 when the
    bytes are empty or cannot be decoded as an image.
    """
    if not raw_bytes:
        abort(400, description="Empty image payload")
    # np.frombuffer replaces the deprecated np.fromstring for binary data.
    data = np.frombuffer(raw_bytes, dtype=np.uint8)
    decoded = cv2.imdecode(data, cv2.IMREAD_COLOR)
    if decoded is None:
        # cv2.imdecode returns None when the buffer is not a readable image.
        abort(400, description="Unable to decode the supplied image")

    dest_file = uuid.uuid4().hex + ".png"
    session["query"] = settings.log_folder + '/' + dest_file
    dest_path = settings.logs + "/" + dest_file
    resized = LFUtilities.resize(QUERY_RESIZE_TARGET, decoded)
    cv2.imwrite(dest_path, resized)
    return dest_path


def url_to_file(url):
    """Download the image at *url* and store it as a query image.

    Returns the path of the stored PNG file.
    """
    # NOTE(review): *url* comes straight from the client, so this performs a
    # server-side fetch of an arbitrary address; consider restricting the
    # allowed schemes/hosts.
    req = urllib.request.Request(
        url,
        data=None,
        headers={
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'
        },
    )
    # The context manager guarantees the HTTP response is closed.
    with urllib.request.urlopen(req) as resp:
        payload = resp.read()
    return _store_query_image(payload)


def post_to_file(image):
    """Store an uploaded file object as a query image.

    *image* must provide a ``save(stream)`` method (it is the object taken
    from ``request.files``).  Returns the path of the stored PNG file.
    """
    in_memory_file = io.BytesIO()
    image.save(in_memory_file)
    return _store_query_image(in_memory_file.getvalue())


def set_rescorer(rescore_param):
    """Update the global rescoring flag from a request parameter.

    Only the exact strings ``'true'`` and ``'false'`` change the flag; any
    other value (including ``None``) leaves it untouched.
    """
    global rescorer
    if rescore_param == 'true':
        rescorer = True
    elif rescore_param == 'false':
        rescorer = False


def base64_to_file(image_base64):
    """Decode a base64 image, save it under ``settings.logs`` and return its path.

    Payloads starting with the HEIC magic bytes are saved with a ``.heic``
    extension and then passed through ``conv``, whose return value becomes
    the resulting path; everything else is saved as ``.jpg``.
    """
    byte_content = base64.b64decode(image_base64)
    # Detect HEIC on the decoded bytes: the padded base64 magic string can
    # never be a textual prefix of a longer base64 payload.
    is_heic = byte_content.startswith(HEIC_MAGIC_BYTES)

    ext = ".heic" if is_heic else ".jpg"
    dest_path = settings.logs + "/" + uuid.uuid4().hex + ext
    with open(dest_path, "wb") as image_file:
        image_file.write(byte_content)

    if is_heic:
        dest_path = conv(dest_path)
    return dest_path


def _append_report(result_text):
    """Append a timestamped result entry to the report file (best effort)."""
    # NOTE(review): the report file has an .html extension but the separators
    # visible in the reviewed source are plain newlines - confirm whether
    # HTML markup was intended here.
    entry = "".join(
        [str(datetime.now()), "\n", result_text, "\n", "\n\n\n", "\n"]
    )
    try:
        with open(REPORT_PATH, "a+") as report:
            report.write(entry)
    except OSError:
        # A failure to write the report must not fail the search request.
        app.logger.exception("Unable to append to report file %s", REPORT_PATH)


def get_res(results, query_url=None, img_base64=None):
    """Build the response for a search.

    When *query_url* is not ``None`` the HTML results page is rendered.
    Otherwise a JSON document is assembled as text from ``results[0]``
    (class name) and ``results[1]`` (confidence), or an "Unknown" document
    when *results* is ``None``, and that text is returned JSON-encoded.
    When *img_base64* is given, the result is also appended to the report
    file.
    """
    if query_url is not None:
        return render_template('search.html', results=results, query_url=query_url)

    if results is None:
        results = '{"classname":"Unknown"}'
    else:
        results = '{"classname":"' + results[0] + '", "conf":' + str(results[1]) + '}'
    print(results)

    # NOTE(review): the payload is already JSON text, so json.dumps wraps it
    # in a JSON string (double encoding).  Kept as-is because existing
    # clients may rely on this format.
    json_res = json.dumps(results)
    print(json_res)

    if img_base64 is not None:
        _append_report(results)
    return json_res


def get_resDict(results, query_url=None):
    """Build the response for a search from a list of ``(classname, conf)`` pairs.

    When *query_url* is not ``None`` the HTML results page is rendered.
    Otherwise the first pair of *results* is serialised to JSON, or an
    "Unknown" document when *results* is ``None`` or empty.
    """
    if query_url is not None:
        return render_template('search.html', results=results, query_url=query_url)

    if not results:
        payload = {"classname": "Unknown"}
    else:
        best = results[0]
        # Use the actual values; float() keeps the confidence serialisable
        # even when it is not a builtin number type.
        payload = {"classname": best[0], "conf": float(best[1])}
    print(payload)
    json_res = json.dumps(payload)
    print(json_res)
    return json_res


@app.route('/bcir/searchById')
def search_by_id():
    """Search using the ``id`` query parameter."""
    image_id = request.args.get('id')
    set_rescorer(request.args.get("rescorer"))
    results = searcher.search_by_id(image_id, settings.k, rescorer, lf_impl)
    query_url = None
    if request.args.get("tohtml") is not None:
        query_url = image_id + ".jpg"
    return get_res(results, query_url)


@app.route('/bcir/searchByImg', methods=['POST'])
def search_by_img():
    """Search using an image uploaded in the ``image`` form file field."""
    if 'image' not in request.files:
        # NOTE(review): 401 is kept for backward compatibility with existing
        # clients; 400 would describe this error more accurately.
        return Response(json.dumps("Error, unable to get the image"), status=401, mimetype='application/json')
    # The "securityID" form field is accepted but not verified.
    img_file = post_to_file(request.files['image'])
    set_rescorer(request.form.get("rescorer"))
    results = searcher.search_by_img(img_file, settings.k, rescorer, lf_impl)
    query_url = None
    if request.form.get("tohtml") is not None:
        query_url = ""
    return get_res(results, query_url)


@app.route('/bcir/searchByImgB64', methods=['POST'])
def search_by_img_base64():
    """Search using a base64-encoded image sent in the ``image`` form field."""
    print("query by base64 received")
    # The "securityID" form field is accepted but not verified.

    # Photo orientation sent by the client; falls back to 1 when the field
    # is missing or not an integer.  Currently parsed but not used further.
    try:
        orientation = int(request.form.get("orientation"))
    except (TypeError, ValueError):
        orientation = 1

    image = request.form.get('image')
    if not image:
        flash('No img sent')
        return redirect(request.url)

    img_file = base64_to_file(image)
    set_rescorer(request.form.get("rescorer"))
    results = searcher.search_by_img(img_file, settings.k, rescorer, lf_impl)
    query_url = None
    if request.form.get("tohtml") is not None:
        query_url = ""
    return get_res(results, query_url, img_base64=image)


@app.route('/bcir/searchByURL')
def search_by_url():
    """Search using the image found at the ``url`` query parameter."""
    # The "securityID" query parameter is accepted but not verified.
    url = request.args.get('url')
    if not url:
        return Response(json.dumps("Error, missing url parameter"), status=400, mimetype='application/json')
    img_file = url_to_file(url)
    set_rescorer(request.args.get("rescorer"))
    results = searcher.search_by_img(img_file, settings.k, rescorer, lf_impl)
    query_url = None
    if request.args.get("tohtml") is not None:
        query_url = url
    return get_res(results, query_url)


@app.route('/bcir/setRescorer')
def set_bot_rescorer():
    """Set the global rescoring flag from the ``rescorer`` query parameter."""
    set_rescorer(request.args.get("rescorer"))
    return "Rescorer set to " + str(rescorer)


# The <path:...> converter is required so that "folder/name" reaches the view
# as a single argument; a bare '/bcir/' rule would never supply `filename`.
@app.route('/bcir/<path:filename>')
def download_file(filename):
    """Serve ``<folder>/<name>`` from below ``settings.working_folder``.

    Only the first two path components are used.  Responds with 404 when the
    path has fewer than two components or the folder component is empty or
    a relative-directory marker.
    """
    print(filename)
    values = filename.split('/')
    if len(values) < 2:
        abort(404)
    folder, name = values[0], values[1]
    # Refuse folder components that would escape the working folder.
    if folder in ('', '.', '..'):
        abort(404)
    print(folder)
    print(name)
    return send_from_directory(settings.working_folder + '/' + folder, name, as_attachment=False)


def start_tornado(app, port=8190):
    """Serve *app* through Tornado's WSGI container on *port* (blocking)."""
    http_server = tornado.httpserver.HTTPServer(tornado.wsgi.WSGIContainer(app))
    http_server.listen(port)
    app.logger.info("Tornado server starting on port {}".format(port))
    tornado.ioloop.IOLoop.instance().start()


def start_from_terminal(app):
    """Parse the command line, load the settings and run the Flask server.

    The positional ``conf`` argument is the configuration file path.  The
    optional ``--debug`` flag enables Flask debug mode, which is off by
    default because the server listens on all interfaces and the interactive
    debugger allows arbitrary code execution.
    """
    global searcher

    parser = argparse.ArgumentParser(description='Reading configuration file')
    parser.add_argument('conf', type=str, help='Configuration file path')
    parser.add_argument('--debug', action='store_true',
                        help='Run Flask in debug mode (do not use on a reachable host)')
    args = parser.parse_args()

    settings.load_setting(args.conf)
    searcher = Searcher()
    app.run(debug=args.debug, host='0.0.0.0', port=settings.port)


if __name__ == '__main__':
    start_from_terminal(app)