Extension_image_recognition/WebAppTest.py

291 lines
9.1 KiB
Python
Executable File

from re import split
from flask import Flask, request, redirect, url_for, flash, render_template, send_from_directory, abort, Response
from random import randint
import cv2
import io
import numpy as np
import json
import urllib
from Searcher import Searcher
from GEMSearcher import GEMSearcher
import WebAppSettings as settings
import uuid
import requests
import os, os.path
from PIL import Image
import tornado.wsgi
import tornado.httpserver
import argparse
import base64
# Flask application object; the @app.route decorators in this file register
# their handlers on it.
app = Flask(__name__)
# Shared secret meant to be compared with the "securityID" request field.
# NOTE(review): no handler in this file currently enforces that comparison, and
# a credential hard-coded in source should presumably live in configuration.
security_id = 'FA6mMmeWZ8JjWKz5'
@app.route('/bcirtest/')
def api_root():
    """Render the landing page with a random sample of image ids.

    Draws 15 random integers in the inclusive range 0..600, maps each one
    through ``searcher.get_id`` and hands the resulting list to the
    ``index_with_randoms.html`` template as ``random_ids``.
    """
    print('index_with_randoms.html')
    # NOTE(review): the upper bound 600 is hard-coded; presumably it should
    # follow the number of indexed images -- confirm against Searcher.
    sampled_ids = [searcher.get_id(randint(0, 600)) for _ in range(15)]
    return render_template('index_with_randoms.html', random_ids=sampled_ids)
def url_to_file(url):
    """Download the image at *url* and store it as a PNG in the log folder.

    The response body is decoded with OpenCV and re-encoded to
    ``<settings.logs>/<random hex>.png``.

    Args:
        url: Address of the image to fetch.

    Returns:
        The path of the file that was written.

    Raises:
        urllib.error.URLError: If the download fails.
        ValueError: If the response is empty or cannot be decoded as an image.
    """
    # Imported explicitly: a bare ``import urllib`` does not guarantee that the
    # ``urllib.request`` submodule has been loaded.
    import urllib.request

    dest_path = settings.logs + "/" + uuid.uuid4().hex + ".png"
    req = urllib.request.Request(
        url,
        data=None,
        headers={
            # Browser-like User-Agent sent with the download request.
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'
        }
    )
    # NOTE(security): the handler in this file passes a client-supplied URL
    # here, so the server will fetch arbitrary addresses (SSRF risk); restrict
    # schemes/hosts if this is exposed to untrusted clients.
    with urllib.request.urlopen(req) as resp:  # ensure the connection is closed
        payload = resp.read()
    if not payload:
        raise ValueError("Empty response while downloading image: " + str(url))
    image = np.asarray(bytearray(payload), dtype="uint8")
    decoded = cv2.imdecode(image, cv2.IMREAD_COLOR)
    if decoded is None:
        # cv2.imdecode signals an undecodable buffer by returning None.
        raise ValueError("Downloaded content is not a decodable image: " + str(url))
    cv2.imwrite(dest_path, decoded)
    return dest_path
def post_to_file(image):
    """Persist an uploaded image under a fresh random name in the log folder.

    Args:
        image: Object exposing ``save(path)``; the handlers in this file pass
            the entry taken from ``request.files``.

    Returns:
        The path ``<settings.logs>/<random hex>.png`` that was written.
    """
    target = "/".join((settings.logs, uuid.uuid4().hex + ".png"))
    image.save(target)
    return target
def base64_to_file(image_base64):
    """Decode a base64 payload and write it to a new file in the log folder.

    Args:
        image_base64: Base64-encoded file content (str or bytes).

    Returns:
        The path ``<settings.logs>/<random hex>.png`` that was written.

    Raises:
        binascii.Error: If the payload is not valid base64 (a ``ValueError``
            subclass).
    """
    # Decode before touching the filesystem so that an invalid payload does
    # not leave an empty file behind.
    byte_content = base64.b64decode(image_base64)
    dest_path = settings.logs + "/" + uuid.uuid4().hex + ".png"
    with open(dest_path, "wb") as image_file:
        image_file.write(byte_content)
    return dest_path
def get_res(results, query_url=None):
    """Turn search results into the HTTP response body.

    Args:
        results: ``None`` or a sequence whose first item is indexable as
            ``(classname, confidence)``.
        query_url: When not ``None``, the ``search.html`` template is rendered
            with ``results`` and ``query_url`` instead of returning JSON.

    Returns:
        Rendered HTML when ``query_url`` is given. Otherwise a JSON *string
        literal* whose content is itself a JSON object, either
        ``{"classname":"Unknown"}`` or
        ``{"classname":"<name>", "conf":<value>}``. The double encoding is
        kept on purpose so existing clients continue to work.
    """
    if query_url is not None:
        return render_template('search.html', results=results, query_url=query_url)

    if results is None or len(results) == 0:
        # No match (or an empty result list): report the unknown class
        # instead of failing on results[0].
        payload = '{"classname":"Unknown"}'
    else:
        classname, conf = results[0][0], results[0][1]
        # json.dumps escapes quotes/backslashes/control characters in the name
        # so the inner document stays valid JSON; ensure_ascii=False keeps the
        # output byte-identical to plain concatenation for ordinary names.
        payload = (
            '{"classname":' + json.dumps(str(classname), ensure_ascii=False)
            + ', "conf":' + str(conf) + '}'
        )
    print(payload)
    json_res = json.dumps(payload)
    print(json_res)
    return json_res
@app.route('/bcirtest/searchById')
def search_by_id():
    """Search using an already indexed image, identified by its id.

    Query parameters:
        id: Identifier of the query image (required).
        tohtml: If present, the result is rendered as HTML with
            ``<id>.jpg`` as the query image; otherwise JSON is returned.

    Returns:
        The body produced by ``get_res``, or a JSON error with status 400
        when ``id`` is missing.
    """
    image_id = request.args.get('id')
    if not image_id:
        # Without an id the query image name below cannot be built.
        return Response(json.dumps("Error, missing id parameter"), status=400, mimetype='application/json')

    # NOTE(review): the "rescorer" and "lf_impl" query parameters are not
    # forwarded; searcher.search_by_id is called with the id and k only.
    results = searcher.search_by_id(image_id, settings.k)

    query_url = None
    if request.args.get("tohtml") is not None:
        query_url = image_id + ".jpg"
    return get_res(results, query_url)
@app.route('/bcirtest/searchByImg', methods=['POST'])
def search_by_img():
    """Search with an image uploaded as multipart form data.

    Form fields:
        image: The uploaded file (required).
        rescorer: ``'true'`` enables the rescorer flag passed to the searcher.
        lf_impl: ``'beblid'``, ``'latch'`` or ``'orb'``; any other value
            (or none) selects ``'ORB500'``.
        tohtml: If present, the result is rendered as HTML (with an empty
            query URL); otherwise JSON is returned.

    Returns:
        The body produced by ``get_res``, or a JSON error with status 400
        when no image was uploaded.
    """
    if 'image' not in request.files:
        # A missing upload is a malformed request, not an authorization failure.
        return Response(json.dumps("Error, unable to get the image"), status=400, mimetype='application/json')

    # NOTE(review): the "securityID" form field is not verified here; the
    # comparison against security_id is disabled.
    img_file = post_to_file(request.files['image'])

    rescorer = request.form.get("rescorer") == 'true'
    lf_impl = {
        'beblid': 'BEBLID',
        'latch': 'LATCH',
        'orb': 'ORB',
    }.get(request.form.get("lf_impl"), 'ORB500')

    results = searcher.search_by_img(img_file, settings.k, rescorer, lf_impl)

    query_url = "" if request.form.get("tohtml") is not None else None
    return get_res(results, query_url)
@app.route('/bcirtest/searchByImgB64', methods=['POST'])
def search_by_img_base64():
    """Search with an image sent as a base64-encoded form field.

    Form fields:
        image: Base64-encoded image content (required).
        rescorer: ``'true'`` enables the rescorer flag passed to the searcher.
        lf_impl: ``'beblid'``, ``'latch'`` or ``'orb'``; any other value
            (or none) selects ``'ORB500'``.
        tohtml: If present, the result is rendered as HTML (with an empty
            query URL); otherwise JSON is returned.

    Returns:
        The body produced by ``get_res``, or a JSON error with status 400
        when the image is missing or is not valid base64.
    """
    # NOTE(review): the "securityID" form field is not verified here; the
    # comparison against security_id is disabled.
    image = request.form.get('image')
    if not image:
        # Answer with a JSON error: flash() needs a session secret key and a
        # redirect to this POST-only URL could not be followed with GET.
        return Response(json.dumps("Error, unable to get the image"), status=400, mimetype='application/json')
    try:
        img_file = base64_to_file(image)
    except ValueError:
        # binascii.Error (invalid base64) is a ValueError subclass.
        return Response(json.dumps("Error, invalid base64 image"), status=400, mimetype='application/json')

    rescorer = request.form.get("rescorer") == 'true'
    lf_impl = {
        'beblid': 'BEBLID',
        'latch': 'LATCH',
        'orb': 'ORB',
    }.get(request.form.get("lf_impl"), 'ORB500')

    # Forward the parsed options, as the multipart and URL endpoints do.
    results = searcher.search_by_img(img_file, settings.k, rescorer, lf_impl)

    query_url = "" if request.form.get("tohtml") is not None else None
    return get_res(results, query_url)
@app.route('/bcirtest/searchByURL')
def search_by_url():
    """Search with an image that the server downloads from a URL.

    Query parameters:
        url: Address of the query image (required).
        rescorer: ``'true'`` enables the rescorer flag passed to the searcher.
        lf_impl: ``'beblid'``, ``'latch'`` or ``'orb'``; any other value
            (or none) selects ``'ORB500'``.
        tohtml: If present, the result is rendered as HTML with ``url`` as the
            query image; otherwise JSON is returned.

    Returns:
        The body produced by ``get_res``, or a JSON error with status 400
        when ``url`` is missing.
    """
    # NOTE(review): the "securityID" parameter is not verified here; the
    # comparison against security_id is disabled.
    url = request.args.get('url')
    if not url:
        # Fail early instead of crashing inside the download helper.
        return Response(json.dumps("Error, missing url parameter"), status=400, mimetype='application/json')

    img_file = url_to_file(url)

    rescorer = request.args.get("rescorer") == 'true'
    lf_impl = {
        'beblid': 'BEBLID',
        'latch': 'LATCH',
        'orb': 'ORB',
    }.get(request.args.get("lf_impl"), 'ORB500')

    results = searcher.search_by_img(img_file, settings.k, rescorer, lf_impl)

    query_url = url if request.args.get("tohtml") is not None else None
    return get_res(results, query_url)
@app.route('/bcirtest/addImg', methods=['POST'])
def add_img():
    """Add an uploaded image to the index.

    The image id is the uploaded file name without its extension.

    Form fields:
        image: The uploaded file (required).

    Returns:
        The JSON string ``"done"`` on success, or a JSON error with status 400
        when no image was uploaded. Any failure while storing or indexing the
        image is logged and answered with HTTP 500.
    """
    if 'image' not in request.files:
        # Answer with a JSON error: flash() needs a session secret key and a
        # redirect to this POST-only URL could not be followed with GET.
        return Response(json.dumps("Error, unable to get the image"), status=400, mimetype='application/json')
    try:
        upload = request.files['image']
        image_id, _ = os.path.splitext(upload.filename)
        img_file = post_to_file(upload)
        searcher.add(img_file, image_id)
    except Exception:
        # Keep the traceback in the log; the client only sees a generic 500.
        app.logger.exception("Unable to add image to the index")
        abort(500)
    return json.dumps("done")
@app.route('/bcirtest/rmImg')
def remove_img():
    """Remove an image from the index.

    Query parameters:
        id: Identifier passed to ``searcher.remove``.

    Returns:
        The JSON string ``"done"`` on success. Any failure raised by the
        searcher is logged and answered with HTTP 500.
    """
    image_id = request.args.get('id')
    try:
        searcher.remove(image_id)
    except Exception:
        # Keep the traceback in the log; the client only sees a generic 500.
        app.logger.exception("Unable to remove image %r from the index", image_id)
        abort(500)
    return json.dumps("done")
@app.route('/bcirtest/<path:filename>')
def download_file(filename):
    """Serve a file from the configured image folder.

    Args:
        filename: Path below ``/bcirtest/`` taken from the URL; it may contain
            slashes.

    Returns:
        The response built by ``send_from_directory`` for ``filename`` inside
        ``settings.img_folder``, sent inline (not as an attachment).
    """
    # Debug trace: the requested path, its segments and the base folder.
    print(filename)
    print(filename.split('/'))
    image_root = settings.img_folder
    print(image_root)
    return send_from_directory(image_root, filename, as_attachment=False)
"""
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Reading configuration file')
parser.add_argument('conf', type=str, help='Configuration file path')
args = parser.parse_args()
settings.load_setting(args.conf)
global searcher
searcher = BeniCulturaliSearcher()
#app.run(host='0.0.0.0', port=8090, ssl_context='adhoc')
app.run(host='0.0.0.0', port=settings.port)
# app.run(host='0.0.0.0', port=settings.port)
"""
def start_tornado(app, port=8190):
    """Serve the WSGI application with Tornado and block in its event loop.

    Args:
        app: The Flask (WSGI) application to wrap.
        port: TCP port to listen on.
    """
    # Imported explicitly: the file only imports tornado.wsgi and
    # tornado.httpserver, so tornado.ioloop was reachable only as a side effect.
    from tornado.ioloop import IOLoop

    http_server = tornado.httpserver.HTTPServer(tornado.wsgi.WSGIContainer(app))
    http_server.listen(port)
    app.logger.info("Tornado server starting on port {}".format(port))
    # IOLoop.current() replaces the deprecated IOLoop.instance() alias.
    IOLoop.current().start()
def start_from_terminal(app):
    """Load the configuration named on the command line and run the app.

    Parses one positional argument (``conf``, the configuration file path),
    loads it through ``settings.load_setting``, creates the module-level
    ``searcher`` used by the request handlers and starts Flask's built-in
    server.

    Args:
        app: The Flask application to run.
    """
    global searcher

    arg_parser = argparse.ArgumentParser(description='Reading configuration file')
    arg_parser.add_argument('conf', type=str, help='Configuration file path')
    settings.load_setting(arg_parser.parse_args().conf)

    searcher = Searcher()

    # NOTE(review): the server runs with debug=True on all interfaces and on a
    # fixed port (8290) rather than settings.port; start_tornado(app,
    # settings.port) is the alternative that is currently not used. Confirm
    # this is intended outside of local testing.
    app.run(debug=True, host='0.0.0.0', port=8290)
# Script entry point: only start the server when the file is executed directly.
if "__main__" == __name__:
    start_from_terminal(app)