forked from moreo/QuaPy
109 lines
4.2 KiB
Python
109 lines
4.2 KiB
Python
import random
|
|
import subprocess
|
|
import tempfile
|
|
from os import remove, makedirs
|
|
from os.path import join, exists
|
|
from subprocess import PIPE, STDOUT
|
|
import shutil
|
|
|
|
import numpy as np
|
|
from sklearn.base import BaseEstimator, ClassifierMixin
|
|
from sklearn.datasets import dump_svmlight_file
|
|
|
|
|
|
class SVMperf(BaseEstimator, ClassifierMixin):
    """Estimator that delegates training and scoring to the SVM-perf executables.

    ``fit`` writes the training data in svmlight format and runs
    ``svm_perf_learn``; ``decision_function`` writes the test data and runs
    ``svm_perf_classify``, then reads the scores the tool wrote to disk. Both
    executables are looked up inside ``svmperf_base``.

    All intermediate files live in a per-fit working directory named
    ``.svmperf-<random code>`` created relative to the current working
    directory.
    """

    # losses with their respective codes in svm_perf implementation
    valid_losses = {
        '01': 0,
        'f1': 1,
        'kld': 12,
        'nkld': 13,
        'q': 22,
        'qacc': 23,
        'qf1': 24,
        'qgm': 25,
        'mae': 26,
        'mrae': 27,
    }

    def __init__(self, svmperf_base, C=0.01, verbose=False, loss='01'):
        """Store the configuration.

        :param svmperf_base: directory expected to contain ``svm_perf_learn``
            and ``svm_perf_classify``; must exist.
        :param C: value passed to the learner through its ``-c`` option.
        :param verbose: if True, print the commands run and their output.
        :param loss: key of ``valid_losses``; validated when ``fit`` is called.
        :raises AssertionError: if ``svmperf_base`` does not exist.
        """
        assert exists(svmperf_base), f'path {svmperf_base} does not seem to point to a valid path'
        self.svmperf_base = svmperf_base
        self.C = C
        self.verbose = verbose
        self.loss = loss

    def set_params(self, **parameters):
        """Set the ``C`` parameter (the only one supported) and return ``self``.

        Returning the estimator follows the scikit-learn ``set_params``
        contract, so callers may chain or reassign the result.

        :raises AssertionError: if anything other than exactly ``C`` is given.
        """
        assert list(parameters.keys()) == ['C'], 'currently, only the C parameter is supported'
        self.C = parameters['C']
        return self

    @staticmethod
    def _random_code():
        """Return five random integers in [0, 1000000] joined by '-'.

        A dedicated ``random.Random()`` instance is used instead of the
        module-level generator, so the global random state is neither read
        nor advanced. The code is used to build file names that parallel
        runs are unlikely to share.
        """
        local_random = random.Random()
        return '-'.join(str(local_random.randint(0, 1000000)) for _ in range(5))

    def fit(self, X, y):
        """Train a model by running ``svm_perf_learn`` on ``(X, y)``.

        :param X: feature matrix accepted by ``dump_svmlight_file``.
        :param y: labels, one per row of ``X``.
        :return: ``self``.
        :raises AssertionError: if ``self.loss`` is not a key of ``valid_losses``.

        If the learner does not produce a model file, its captured output is
        printed; the failure then surfaces in ``decision_function``, which
        asserts that the model file exists.
        """
        assert self.loss in SVMperf.valid_losses, \
            f'unsupported loss {self.loss}, valid ones are {list(SVMperf.valid_losses.keys())}'

        self.svmperf_learn = join(self.svmperf_base, 'svm_perf_learn')
        self.svmperf_classify = join(self.svmperf_base, 'svm_perf_classify')
        self.loss_cmd = '-w 3 -l ' + str(self.valid_losses[self.loss])
        self.c_cmd = '-c ' + str(self.C)

        self.classes_ = sorted(np.unique(y))
        self.n_classes_ = len(self.classes_)

        random_code = self._random_code()

        # tempfile.TemporaryDirectory was abandoned because the directory was
        # removed once fit terminated under multiprocessing; a regular
        # directory is used instead.
        self.tmpdir = '.svmperf-' + random_code
        makedirs(self.tmpdir, exist_ok=True)

        self.model = join(self.tmpdir, 'model-' + random_code)
        traindat = join(self.tmpdir, f'train-{random_code}.dat')

        dump_svmlight_file(X, y, traindat, zero_based=False)

        # Build the argument list directly: joining into one string and
        # re-splitting on whitespace would break paths that contain spaces.
        cmd = [
            self.svmperf_learn,
            *self.c_cmd.split(),
            *self.loss_cmd.split(),
            traindat,
            self.model,
        ]
        if self.verbose:
            print('[Running]', ' '.join(cmd))

        try:
            p = subprocess.run(cmd, stdout=PIPE, stderr=STDOUT)
        finally:
            # the training file is only needed while the learner runs
            remove(traindat)

        # stderr is redirected into stdout, so p.stderr is None and the whole
        # tool output is in p.stdout.
        if self.verbose or not exists(self.model):
            print(p.stdout.decode('utf-8'))

        return self

    def predict(self, X):
        """Return 1 where the decision score is strictly positive, else 0."""
        confidence_scores = self.decision_function(X)
        predictions = (confidence_scores > 0) * 1
        return predictions

    def decision_function(self, X, y=None):
        """Score ``X`` by running ``svm_perf_classify`` with the fitted model.

        :param X: feature matrix accepted by ``dump_svmlight_file``.
        :param y: optional labels written alongside ``X`` in the test file;
            zeros are used when omitted.
        :return: the contents of the predictions file as loaded by
            ``np.loadtxt``.
        :raises AssertionError: if called before ``fit`` or if the model file
            is missing.
        """
        assert hasattr(self, 'tmpdir'), 'predict called before fit'
        assert self.tmpdir is not None, 'model directory corrupted'
        assert exists(self.model), 'model not found'
        if y is None:
            y = np.zeros(X.shape[0])

        # in order to allow for parallel runs of predict, a random code is assigned
        random_code = self._random_code()
        predictions_path = join(self.tmpdir, 'predictions' + random_code + '.dat')
        testdat = join(self.tmpdir, 'test' + random_code + '.dat')

        try:
            dump_svmlight_file(X, y, testdat, zero_based=False)

            # argument list rather than a re-split string, so that paths
            # containing spaces survive
            cmd = [self.svmperf_classify, testdat, self.model, predictions_path]
            if self.verbose:
                print('[Running]', ' '.join(cmd))
            p = subprocess.run(cmd, stdout=PIPE, stderr=STDOUT)

            if self.verbose:
                print(p.stdout.decode('utf-8'))

            scores = np.loadtxt(predictions_path)
        finally:
            # remove the per-call files even when the tool or the load fails
            for path in (testdat, predictions_path):
                if exists(path):
                    remove(path)

        return scores

    def __del__(self):
        # NOTE(review): removal of the working directory is disabled here, so
        # '.svmperf-*' directories stay on disk after the object is collected.
        # Presumably this was done on purpose; confirm before re-enabling:
        #     shutil.rmtree(self.tmpdir, ignore_errors=True)
        if hasattr(self, 'tmpdir'):
            pass
|
|
|