Python sklearn.base module: clone() example source code
We extracted the following 50 code examples from open-source Python projects to show how sklearn.base.clone() is used in practice.
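As a baseline before the project examples, here is a minimal sketch (not taken from any of the projects below) of what clone() does: it constructs a new, unfitted estimator with the same hyperparameters, leaving the original object untouched.

from sklearn.base import clone
from sklearn.linear_model import LogisticRegression

base = LogisticRegression(C=0.5)
copy = clone(base)  # a new, unfitted estimator built from base's parameters
assert copy is not base
assert copy.get_params() == base.get_params()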
def process_batch(self, work_batch):
fit_params = self.fit_params if self.fit_params is not None else {}
LOG.debug("Node %d received %d work items", comm_rank, len(work_batch))
results = []
for fold_id, train_index, test_index, parameters in work_batch:
        ret = _fit_and_score(clone(self.estimator),
                             self._data_X, self._data_y,
                             self.scorer, train_index, test_index,
                             self.verbose, parameters, fit_params,
                             return_n_test_samples=True,
                             return_times=True)
result = parameters.copy()
result['score'] = ret[0]
result['n_samples_test'] = ret[1]
result['scoring_time'] = ret[2]
result['fold'] = fold_id
results.append(result)
LOG.debug("Node %d is done with fold %d", fold_id)
return results
def _do_fit(n_jobs, verbose, pre_dispatch, base_estimator,
            X, y, scorer, parameter_iterable,
            error_score, cv, **kwargs):
    groups = kwargs.pop('groups')
    fit_params = kwargs.pop('fit_params', None)
    # test_score, n_samples, parameters
    out = Parallel(n_jobs=n_jobs, verbose=verbose, pre_dispatch=pre_dispatch)(
        delayed(_fit_and_score)(
            clone(base_estimator), X, y, scorer,
            train, test, verbose, parameters,
            fit_params=fit_params,
            return_train_score=False,
            return_n_test_samples=True,
            return_times=False,
            return_parameters=True,
            error_score=error_score)
        for parameters in parameter_iterable
        for train, test in cv.split(X, y, groups))
    # test_score, _, parameters
    return [(mod[0], mod[1], None, mod[2]) for mod in out]
def _fit(x, y, clf, cv, mf, grp, center, n_jobs):
    """Sub-function for fitting."""
    # Check the input sizes:
    x, y = checkXY(x, y, mf, grp, center)
    rep, nfeat = len(cv), len(x)
    # Trick: construct a list of tuples containing the index of
    # (repetitions, features) & loop on it. Optimal for parallel computing:
    claIdx, listRep, listFeat = list2index(rep, nfeat)
    # Run the classification:
    cvs = Parallel(n_jobs=n_jobs)(delayed(_cvscore)(
        x[k[1]], y, clone(clf), cv[k[0]]) for k in claIdx)
    da, y_true, y_pred = zip(*cvs)
    # Reconstruct elements:
    da = np.array(groupInList(da, listFeat))
    y_true = groupInList(y_true, listFeat)
    y_pred = groupInList(y_pred, listFeat)
    return da, y_true, y_pred
def random_search(clf, param_distribution, n_iter_search, X_train, y_train):
    '''
    Random search for hyperparameter optimization, without nested resampling.
    @return: best_estimator, best_score
    '''
param_list = ParameterSampler(param_distribution, n_iter = n_iter_search)
best_score = 0.0
opt_clf = None
for params in param_list:
clf.set_params(**params)
clf.fit(X_train, y_train)
clf_accuracy = accuracy_score(y_train, clf.predict(X_train))
if clf_accuracy > best_score:
best_score = clf_accuracy
opt_clf = clone(clf)
opt_clf.fit(X_train, y_train)
return opt_clf, best_score
def _fit_binary(estimator, X, y, classes=None, sample_weight=None):
    """Fit a single binary estimator."""
    unique_y = np.unique(y)
    if len(unique_y) == 1:
        if classes is not None:
            if y[0] == -1:
                c = 0
            else:
                c = y[0]
            warnings.warn("Label %s is present in all training examples." %
                          str(classes[c]))
        estimator = _ConstantPredictor().fit(X, unique_y)
    else:
        estimator = clone(estimator)
        estimator.fit(X, y, sample_weight=sample_weight)
    return estimator
def fit(self, X_link, y_link, X_prop, y_prop):
self.initialize_labels(y_prop, y_link)
y_link = self.link_encoder_.transform(y_link)
y_prop = self.prop_encoder_.transform(y_prop)
self.link_clf_ = SAGAClassifier(loss='smooth_hinge', penalty='l1',
tol=1e-4, max_iter=500,
random_state=0, verbose=0)
self.prop_clf_ = clone(self.link_clf_)
alpha_link = self.alpha_link * (1 - self.l1_ratio)
beta_link = self.alpha_link * self.l1_ratio
sw = compute_sample_weight('balanced', y_link)
self.link_clf_.set_params(alpha=alpha_link, beta=beta_link)
    self.link_clf_.fit(X_link, y_link, sample_weight=sw)
alpha_prop = self.alpha_prop * (1 - self.l1_ratio)
beta_prop = self.alpha_prop * self.l1_ratio
self.prop_clf_.set_params(alpha=alpha_prop, beta=beta_prop)
self.prop_clf_.fit(X_prop, y_prop)
return self
def _clone_and_score_clusterer(clf, X, n_clusters):
    """Clones and scores a clusterer instance.
    Args:
        clf: Clusterer instance that implements ``fit``, ``fit_predict``, and
            ``score`` methods, and an ``n_clusters`` hyperparameter.
            e.g. :class:`sklearn.cluster.KMeans` instance
        X (array-like, shape (n_samples, n_features)):
            Data to cluster, where n_samples is the number of samples and
            n_features is the number of features.
        n_clusters (int): Number of clusters
    Returns:
        score: Score of the clusters.
        time: Number of seconds it took to fit the clusterer.
    """
    start = time.time()
    clf = clone(clf)
    setattr(clf, 'n_clusters', n_clusters)
    return clf.fit(X).score(X), time.time() - start
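For context, a hedged usage sketch (the KMeans import, the data matrix X, and the range of candidate cluster counts are illustrative assumptions, not from the source project): the helper above is typically called once per candidate n_clusters, e.g. to draw an elbow curve.

from sklearn.cluster import KMeans

# Illustrative only: X is assumed to be an (n_samples, n_features) array.
results = [_clone_and_score_clusterer(KMeans(random_state=0), X, n)
           for n in range(2, 11)]
scores = [score for score, _ in results]  # one score per candidate n_clusters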
def _fit_binary(estimator, X, y, sample_weight, classes=None):
    """Fit a single binary estimator."""
    unique_y = np.unique(y)
    if len(unique_y) == 1:
        if classes is not None:
            if y[0] == -1:
                c = 0
            else:
                c = y[0]
            warnings.warn("Label %s is present in all training examples." %
                          str(classes[c]))
        estimator = _ConstantPredictor().fit(X, unique_y)
    else:
        estimator = clone(estimator)
        estimator.fit(X, y, sample_weight=sample_weight)
    return estimator
def fit(self, X, y):
self.base_models_ = [list() for x in self.base_models]
self.Meta_model_ = clone(self.Meta_model)
kfold = KFold(n_splits=self.n_folds, shuffle=True, random_state=15)
    # Train cloned base models, then create the out-of-fold predictions needed to train the cloned meta-model.
out_of_fold_predictions = np.zeros((X.shape[0], len(self.base_models)))
for i, model in enumerate(self.base_models):
for train_index, holdout_index in kfold.split(X, y):
instance = clone(model)
self.base_models_[i].append(instance)
instance.fit(X[train_index], y[train_index])
y_pred = instance.predict(X[holdout_index])
out_of_fold_predictions[holdout_index, i] = y_pred
    # Now train the cloned meta-model using the out-of-fold predictions as new features.
self.Meta_model_.fit(out_of_fold_predictions, y)
return self
# Do the predictions of all base models on the test data and use the averaged
# predictions as meta-features for the final prediction, which is done by the meta-model.
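A hedged sketch of the predict step those comments describe (the method body is an assumption reconstructed from the fit above, not part of the extracted snippet): each base-model family's per-fold predictions are averaged into one meta-feature column, and the fitted meta-model predicts from those columns.

def predict(self, X):
    # Average the fold models within each family: one meta-feature column per family.
    meta_features = np.column_stack([
        np.column_stack([model.predict(X) for model in single_model]).mean(axis=1)
        for single_model in self.base_models_])
    return self.Meta_model_.predict(meta_features)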
def fit(self, X, y=None):
    self._scalers = {}  # assumption: reset the per-batch scaler map on each fit
    self._colmask = [True] * X.shape[1]
    self._colnames = X.columns.ravel().tolist()
    # Identify batches
    groups = X[[self.by]].values.ravel().tolist()
    self._colmask[X.columns.get_loc(self.by)] = False
    # Convert groups to IDs
glist = list(set(groups))
self._groups = np.array([glist.index(group)
for group in groups])
for gid, batch in enumerate(list(set(groups))):
scaler = clone(self._base_scaler)
mask = self._groups == gid
if not np.any(mask):
continue
        self._scalers[batch] = scaler.fit(
            X.loc[mask, self._colmask], y)
return self
def test_weighted_decision_path_train():
"""
Test the implementation of weighted_decision_path when all test points
are in train points.
"""
    # Test that when all samples are in the training data all weights
    # should be concentrated at the leaf.
X_train, _, y_train, _ = load_scaled_boston()
y_train = np.round(y_train)
for est in estimators:
clone_est = clone(est)
clone_est.fit(X_train, np.round(y_train))
check_weighted_decision_path_train(clone_est, X_train)
        clone_est.partial_fit(X_train, y_train)
def test_apply():
    X_train, X_test, y_train, y_test = load_scaled_boston()
y_train = np.round(y_train)
for est in estimators:
est_clone = clone(est)
est_clone.fit(X_train, y_train)
train_leaves = est_clone.tree_.children_left[est_clone.apply(X_train)]
test_leaves = est_clone.tree_.children_left[est_clone.apply(X_test)]
assert_true(np.all(train_leaves == -1))
assert_true(np.all(test_leaves == -1))
est_clone.partial_fit(X_train, y_train)
train_leaves = est_clone.tree_.children_left[est_clone.apply(X_train)]
test_leaves = est_clone.tree_.children_left[est_clone.apply(X_test)]
assert_true(np.all(train_leaves == -1))
assert_true(np.all(test_leaves == -1))
def _fit_one_bootstrap(self, i):
m = clone(self.model)
m._ensemble = True
X, y = self.X_, self.y_
n = X.shape[0]
n_samples = math.ceil(0.8 * n)
    # Get bootstrap set
X_bs, y_bs = resample(X, replace=True,
n_samples=n_samples, random_state=self.bs_seed+i)
m.fit(X_bs, y_bs)
if self.model.shadow_features:
return m.interval_, m._omegas, m._biase, m._shadowintervals
else:
return m.interval_, m._biase
def __init__(self, name,classifier=None, number_gen=20,
verbose=0, repeat=1, parallel=False,
make_logbook=False, random_state=None,
cv_metric_fuction=make_scorer(matthews_corrcoef),
features_metric_function=None):
self._name = name
self.estimator = SVC(kernel='linear', max_iter=10000) if classifier is None else clone(classifier)
self.number_gen = number_gen
self.verbose = verbose
self.repeat = repeat
self.parallel=parallel
self.make_logbook = make_logbook
self.random_state = random_state
self.cv_metric_function= cv_metric_fuction
self.features_metric_function= features_metric_function
self._random_object = check_random_state(self.random_state)
random.seed(self.random_state)
def __init__(self, name, classifier=None, number_gen=20,
             verbose=0, repeat=1, parallel=False,
             make_logbook=False, random_state=None,
             cv_metric_fuction=make_scorer(matthews_corrcoef),
             features_metric_function=None):
    self._name = name
    self.estimator = SVC(kernel='linear', max_iter=10000) if classifier is None else clone(classifier)
self.number_gen = number_gen
self.verbose = verbose
self.repeat = repeat
self.parallel=parallel
self.make_logbook = make_logbook
self.random_state = random_state
self.cv_metric_function= cv_metric_fuction
self.features_metric_function= features_metric_function
self._random_object = check_random_state(self.random_state)
random.seed(self.random_state)
def test_estimator_cloning(ds_under_test):
from sklearn.base import clone
class Generic(Step):
a = 10
b = 12
func = None
lst = []
def transform(self, dset):
params = self.get_params()
dset = self.func(dset=dset, **params)
return dset
def step_1(dset, **kw):
return kw['a'] * dset.mean(dim=('x', 'y')) ** kw['b']
g_estimator = Generic(func=step_1, lst=[[1], 2, 3])
g_estimator_clone = clone(g_estimator)
assert g_estimator.a == g_estimator_clone.a
assert g_estimator.b == g_estimator_clone.b
assert g_estimator.func == g_estimator_clone.func
def fit(self, X, y=None, **fit_params):
if not isinstance(X, pd.DataFrame):
raise ValueError('X is not a pandas.DataFrame')
self.models_ = {}
columns = self._get_fit_columns(X)
for key in X[self.by].unique():
        # Copy the model
        model = clone(self.base_model)
        # Select the rows that will be fitted
        mask = (X[self.by] == key).tolist()
        rows = X.index[mask]
        # Fit the model
        model.fit(X.loc[rows, columns], y[mask], **fit_params)
        # Save the model
self.models_[key] = model
return self
def _fit_best_model(self, X, y):
    """Fit a copy of the estimator, with the best parameters found, to the
    provided data.
    Parameters
    ----------
    X : array-like, shape = [n_samples, n_features]
        Input data, where n_samples is the number of samples and
        n_features is the number of features.
    y : array-like, shape = [n_samples] or [n_samples, n_output]
        Target relative to X for classification or regression.
    Returns
    -------
    self
    """
self.best_estimator_ = clone(self.estimator)
self.best_estimator_.set_params(**self.best_params_)
self.best_estimator_.fit(X, y)
return self
def fit_transform(self, X, y):
    """
    Fit and transform a series of independent estimators to the dataset.
    Parameters
    ----------
    X : array, shape (n_samples, n_features, n_estimators)
        The training input samples. For each data slice, a clone estimator
        is fitted independently.
    y : array, shape (n_samples,)
        The target values.
    Returns
    -------
    y_pred : array, shape (n_samples, n_estimators)
        Predicted values for each estimator.
    """
return self.fit(X, y).transform(X)
def fit(self, X, y):
    """Fit a series of independent estimators to the dataset.
    Parameters
    ----------
    X : array, shape (n_samples, n_features, n_estimators)
        The training input samples. For each data slice, a clone estimator
        is fitted independently.
    y : array, shape (n_samples,)
        The target values.
    Returns
    -------
    self : object
        Return self.
    """
    self._check_Xy(X, y)
    self.estimators_ = list()
    # For fitting, the parallelization is across estimators.
parallel, p_func, n_jobs = parallel_func(_sl_fit, self.n_jobs)
estimators = parallel(
p_func(self.base_estimator, split, y)
for split in np.array_split(X, n_jobs, axis=-1))
self.estimators_ = np.concatenate(estimators, 0)
return self
def net_pickleable(self, net_fit):
"""NeuralNet instance that removes callbacks that are not
pickleable.
"""
    # callback fixture not pickleable, remove it
callbacks = net_fit.callbacks
net_fit.callbacks = []
callbacks_ = net_fit.callbacks_
    # remove mock callback
net_fit.callbacks_ = [(n, cb) for n, cb in net_fit.callbacks_
if not isinstance(cb, Mock)]
net_clone = clone(net_fit)
net_fit.callbacks = callbacks
net_fit.callbacks_ = callbacks_
return net_clone
def test_changing_model_reinitializes_optimizer(self, net, data):
    # The idea is that we change the model using `set_params` to
    # add parameters. Since the optimizer depends on the model
    # parameters, it needs to be reinitialized.
X, y = data
net.set_params(module__nonlin=F.relu)
net.fit(X, y)
net.set_params(module__nonlin=nn.PReLU())
assert isinstance(net.module_.nonlin, nn.PReLU)
d1 = net.module_.nonlin.weight.data.clone().cpu().numpy()
    # make sure that we do not initialize again by checking that
    # the network is already initialized and by using partial_fit.
assert net.initialized_
net.partial_fit(X, y)
d2 = net.module_.nonlin.weight.data.clone().cpu().numpy()
    # all newly introduced parameters should have been trained (changed)
    # by the optimizer after 10 epochs.
assert (abs(d2 - d1) > 1e-05).all()
def _check_behavior_2d(clf):
    # 1d case
    X = np.array([[0], [0], [0]])  # ignored
    y = np.array([1, 1, 1])
    est = clone(clf)
    est.fit(X, y)
    y_pred = est.predict(X)
    assert_equal(y.shape, y_pred.shape)
    # 2d case
    y = np.array([[1, 0],
                  [2, 0],
                  [1, 3]])
    est = clone(clf)
    est.fit(X, y)
    y_pred = est.predict(X)
    assert_equal(y.shape, y_pred.shape)
def test_clone():
    # Tests that clone creates a correct deep copy.
    # We create an estimator, make a copy of its original state
    # (which, in this case, is the current state of the estimator),
    # and check that the obtained copy is a correct deep copy.
from sklearn.feature_selection import SelectFpr, f_classif
selector = SelectFpr(f_classif, alpha=0.1)
new_selector = clone(selector)
assert_true(selector is not new_selector)
assert_equal(selector.get_params(), new_selector.get_params())
selector = SelectFpr(f_classif, alpha=np.zeros((10, 2)))
new_selector = clone(selector)
assert_true(selector is not new_selector)
def test_classifier_results():
"""tests if classifier results match target"""
alpha = .1
n_features = 20
n_samples = 10
tol = .01
max_iter = 200
rng = np.random.RandomState(0)
X = rng.normal(size=(n_samples, n_features))
w = rng.normal(size=n_features)
y = np.dot(X, w)
y = np.sign(y)
clf1 = LogisticRegression(solver='sag', C=1. / alpha / n_samples,
max_iter=max_iter, tol=tol, random_state=77)
clf2 = clone(clf1)
clf1.fit(X, y)
clf2.fit(sp.csr_matrix(X), y)
pred1 = clf1.predict(X)
pred2 = clf2.predict(X)
    assert_almost_equal(pred1, y, decimal=12)
    assert_almost_equal(pred2, y, decimal=12)
def test_sparse_input():
    # Test that sparse matrices are accepted as input
from scipy.sparse import csc_matrix
A = np.abs(random_state.randn(10, 10))
A[:, 2 * np.arange(5)] = 0
A_sparse = csc_matrix(A)
for solver in ('pg', 'cd'):
est1 = NMF(solver=solver, n_components=5, init='random',
random_state=0, tol=1e-2)
est2 = clone(est1)
W1 = est1.fit_transform(A)
W2 = est2.fit_transform(A_sparse)
H1 = est1.components_
H2 = est2.components_
assert_array_almost_equal(W1, W2)
assert_array_almost_equal(H1, H2)
def test_sparse_svc_clone_with_callable_kernel():
    # Test that the "dense_fit" is called even though we use sparse input,
    # meaning that everything works fine.
a = svm.SVC(C=1, kernel=lambda x, y: x * y.T, probability=True,
random_state=0)
b = base.clone(a)
b.fit(X_sp, Y)
pred = b.predict(X_sp)
b.predict_proba(X_sp)
    dense_svm = svm.SVC(C=1, kernel=lambda x, y: np.dot(x, y.T),
probability=True, random_state=0)
pred_dense = dense_svm.fit(X, Y).predict(X)
assert_array_equal(pred_dense, pred)
    # b.decision_function(X_sp)  # XXX : should be supported
def _fit_binary(estimator, X, L):
"""Fit a single binary estimator."""
estimator = clone(estimator)
return estimator.fit(X, L)
def _clone_h2o_obj(estimator, ignore=False, **kwargs):
    # do initial clone
    est = clone(estimator)
    # set kwargs:
    if kwargs:
        for k, v in six.iteritems(kwargs):
            setattr(est, k, v)
    # check on h2o estimator
    if isinstance(estimator, H2OPipeline):
        # the last step from the original estimator
        e = estimator.steps[-1][1]
        if isinstance(e, H2OEstimator):
            last_step = est.steps[-1][1]
            # so it's the last step
            for k, v in six.iteritems(e._parms):
                k, v = _kv_str(k, v)
                # if (k not in PARM_IGNORE) and (v is not None):
                #     e._parms[k] = v
                last_step._parms[k] = v
        # otherwise it's a BaseH2OFunctionWrapper
    return est
def _new_base_estimator(est, clonable_kwargs):
    """When the grid searches are pickled, the estimator
    has to be dropped out. When we load it back in, we have
    to reinstate a new one; since the fit is predicated on
    being able to clone a base estimator, we've got to have
    an estimator to clone and fit.
    Parameters
    ----------
    est : str
        The type of model to build
    Returns
    -------
    estimator : H2OEstimator
        The cloned base estimator
    """
    est_map = {
        'dl': H2ODeepLearningEstimator,
        'gbm': H2OGradientBoostingEstimator,
        'glm': H2OGeneralizedLinearEstimator,
        # 'glrm': H2OGeneralizedLowRankEstimator,
        # 'km': H2OKMeansEstimator,
        'nb': H2ONaiveBayesEstimator,
        'rf': H2ORandomForestEstimator
    }
    estimator = est_map[est]()  # initialize the new one
    for k, v in six.iteritems(clonable_kwargs):
        k, v = _kv_str(k, v)
        estimator._parms[k] = v
    return estimator
def fit(self, X, y):
    '''
    Fit the internal StackingTransformer and the combiner on the training data.
    :param X: DataFrame of training features.
    :param y: Series of targets, whose index aligns with X.
    :return: self.
    '''
    self.n_classes_ = len(set(y))
    transformer = StackingTransformer(stages=self.stages, type='classification', n_folds=self.n_folds,
                                      return_array=self.return_array, verbose=self.verbose, **self.kwds)
    combiner = clone(self.combiner)
    if isinstance(combiner, StackingClassifier):
        params = {'n_folds': self.n_folds, 'return_array': self.return_array, 'verbose': self.verbose}
    else:
        params = {}
    for k in self.kwds:
        if k.startswith('combiner__'):
            params[k.replace('combiner__', '')] = self.kwds[k]
    combiner.set_params(**params)
    if self.verbose:
        print('Fitting the StackingTransformer...')
    transformer.fit(X, y)
    X = transformer.transform(X, train=True)
    if self.verbose:
        print('StackingTransformer fitted.\n')
        print('Fitting the combiner...')
    combiner.fit(X, y)
    if self.verbose:
        print('Combiner fitted.\n')
    self.transformer_ = transformer
    self.combiner_ = combiner
    return self
def fit(self, X, y):
    '''
    Fit the internal StackingTransformer and the combiner (regression variant).
    '''
    transformer = StackingTransformer(stages=self.stages, type='regression', n_folds=self.n_folds,
                                      return_array=self.return_array, verbose=self.verbose, **self.kwds)
    combiner = clone(self.combiner)
    if isinstance(combiner, StackingRegressor):
        params = {'n_folds': self.n_folds, 'return_array': self.return_array, 'verbose': self.verbose}
    else:
        params = {}
    for k in self.kwds:
        if k.startswith('combiner__'):
            params[k.replace('combiner__', '')] = self.kwds[k]
    combiner.set_params(**params)
    if self.verbose:
        print('Fitting the StackingTransformer...')
    transformer.fit(X, y)
    X = transformer.transform(X, train=True)
    if self.verbose:
        print('StackingTransformer fitted.\n')
        print('Fitting the combiner...')
    combiner.fit(X, y)
    if self.verbose:
        print('Combiner fitted.\n')
    self.transformer_ = transformer
    self.combiner_ = combiner
    return self
def fit(self, X, y):
self.selector_=clone(self.selector)
self.selector_.fit(X,y)
self.feature_selected=self.selector_.get_support(indices=True).tolist()
if isinstance(X,pd.DataFrame):
self.feature_selected=X.columns[self.feature_selected].tolist()
return self
def _ms_fit(indexed_params, estimator, n_features, graph, prng):
    # unpack params
    index, (alpha, grid_point) = indexed_params
    # draw a new fixed graph for alpha
    cov, prec, adj = graph.create(n_features, alpha)
    # model selection (once per n_samples grid point)
    n_samples = int(grid_point * n_features)
    X = _sample_mvn(n_samples, cov, prng)
    ms_estimator = clone(estimator)
    ms_estimator.fit(X)
    return index, ((cov, prec, adj), ms_estimator.lam_, n_samples)
def _mc_fit(indexed_params, estimator, metrics, prng):
    # unpack params
    index, (nn, (cov, prec, adj), lam, n_samples) = indexed_params
    # compute mc trial
    X = _sample_mvn(n_samples, cov, prng)
    mc_estimator = clone(estimator)
    mc_estimator.set_params(lam=lam)
    mc_estimator.fit(X)
    results = {k: f(prec, mc_estimator.precision_) for k, f in metrics.items()}
    return index, results
def search_test_params(base_clf, cv_params, X, y, train, test, scoring):
    parameter_iterable = ParameterGrid(cv_params)
    grid_scores = Parallel(n_jobs=-1)(
        delayed(_fit_and_score)(clone(base_clf), X, y, scoring,
                                train, test, 0, parameters,
                                None, return_parameters=True)
        for parameters in parameter_iterable)
    # grid_scores = [_fit_and_score(clone(base_clf), X, y, scoring, train, test, parameters, None, return_parameters=True) for parameters in parameter_iterable]
grid_scores = sorted(grid_scores, key=lambda x: x[0], reverse=True)
scores, parameters = grid_scores[0]
return scores, parameters
def _fit(self, X, y):
labels = list(set(y))
labels.sort()
if len(labels) == 1:
if self.verbose:
print('Leaf', labels)
return labels
try:
counts = [y.count(label) for label in labels]
except AttributeError:
unique, allcounts = np.unique(y, return_counts=True)
counts = [allcounts[np.searchsorted(unique, label)] for label in labels]
total = len(y)
div = [abs(0.5 - (sum(counts[:i + 1]) / total)) for i in range(0, len(counts))]
split_point = div.index(min(div))
split = labels[split_point]
left_labels = labels[:split_point + 1]
right_labels = labels[split_point + 1:]
if self.verbose:
print('Training:', labels, counts, div, left_labels, right_labels)
bin_y = [label in left_labels for label in y]
node_estimator = clone(self.base_estimator)
node_estimator.fit(X, bin_y)
left_indexes = [i for i, label in enumerate(y) if label in left_labels]
left_X = X[left_indexes]
left_y = [label for label in y if label in left_labels]
right_indexes = [i for i, label in enumerate(y) if label in right_labels]
right_X = X[right_indexes]
right_y = [label for label in y if label in right_labels]
if self.verbose:
print('Left/right train size:', len(left_y), len(right_y))
return node_estimator, self._fit(left_X, left_y), self._fit(right_X, right_y)
def fit(self, X, y):
self.models = []
from sklearn.base import clone
from sklearn.metrics import f1_score
self.planes = []
extraction = []
    for i in range(self.n_features):
        D = X.shape[1] // 2
        # copy it for feature extraction purposes
self.linear.fit(X, y)
self.models.append(clone(self.linear))
self.models[-1].coef_ = self.linear.coef_
lhs = self.linear.coef_[0,:D]
rhs = self.linear.coef_[0,D:]
if lhs.dot(lhs) > rhs.dot(rhs):
hyperplane = lhs
else:
hyperplane = rhs
feats, X = self._subproj(hyperplane, X)
self.planes.append(hyperplane)
hyperplane = hyperplane / np.sqrt(hyperplane.dot(hyperplane))
extraction.append(feats)
self.coef_ = np.array(self.planes)
Xe = np.concatenate(extraction).T
self.final.fit(Xe, y)
return self
def predict(self, X, thres=0.5, return_proba=True):
    """
    Predict class for X.
    The predicted class of an input sample is a vote by the trees in
    the forest, weighted by their probability estimates. That is,
    the predicted class is the one with the highest mean probability
    estimate across the trees.
    """
    if self._model == 'svc_lin':
        from sklearn.base import clone
        from sklearn.calibration import CalibratedClassifierCV
        clf = CalibratedClassifierCV(clone(self._estimator).set_params(
            **self._estimator.get_params()))
        train_y = self._Xtrain[[self._rate_column]].values.ravel().tolist()
        self._estimator = clf.fit(self._Xtrain, train_y)
proba = np.array(self._estimator.predict_proba(X))
if proba.shape[1] > 2:
pred = (proba > thres).astype(int)
else:
pred = (proba[:, 1] > thres).astype(int)
if return_proba:
return proba, pred
return pred
def transform(self, X, y=None):
    if self.by in X.columns.ravel().tolist():
        groups = X[[self.by]].values.ravel().tolist()
    else:
        groups = ['Unknown'] * X.shape[0]
    glist = list(set(groups))
    groups = np.array([glist.index(group) for group in groups])
    new_x = X.copy()
    for gid, batch in enumerate(glist):
        if batch in self._scalers:
            mask = groups == gid
            if not np.any(mask):
                continue
            scaler = self._scalers[batch]
            new_x.loc[mask, self._colmask] = scaler.transform(
                X.loc[mask, self._colmask])
        else:
            colmask = self._colmask
            if self.by in self._colnames and len(colmask) == len(self._colnames):
                del colmask[self._colnames.index(self.by)]
            scaler = clone(self._base_scaler)
            new_x.loc[:, colmask] = scaler.fit_transform(
                X.loc[:, colmask])
    return new_x
def cross_val_score(estimator, X, y=None, groups=None, scoring=None, cv=None,
                    n_jobs=1, verbose=0, fit_params=None,
                    pre_dispatch='2*n_jobs'):
    """
    Evaluate a score by cross-validation
    """
    if not isinstance(scoring, (list, tuple)):
        scoring = [scoring]
    X, y, groups = indexable(X, y, groups)
    cv = check_cv(cv, y, classifier=is_classifier(estimator))
    splits = list(cv.split(X, y, groups))
    scorer = [check_scoring(estimator, scoring=s) for s in scoring]
    # We clone the estimator to make sure that all the folds are
    # independent, and that it is pickle-able.
    parallel = Parallel(n_jobs=n_jobs, verbose=verbose,
                        pre_dispatch=pre_dispatch)
    scores = parallel(delayed(_fit_and_score)(clone(estimator), X, y, scorer,
                                              train, test, verbose, None,
                                              fit_params)
                      for train, test in splits)
    group_order = []
    if hasattr(cv, 'groups'):
        group_order = [np.array(cv.groups)[test].tolist()[0] for _, test in splits]
    return np.squeeze(np.array(scores)), group_order
def permutation_test_score(estimator, X, y, groups=None, cv=None,
                           n_permutations=100, n_jobs=1, random_state=0,
                           verbose=0, scoring=None):
    """
    Evaluate the significance of a cross-validated score with permutations,
    as in test 1 of [Ojala2010]_.
    A modification of the original sklearn permutation test score function
    to evaluate the p-value outside this function, so that the score can be
    reused from outside.
    .. [Ojala2010] Ojala and Garriga. Permutation Tests for Studying Classifier
       Performance. The Journal of Machine Learning Research (2010)
       vol. 11
    """
    X, y, groups = indexable(X, y, groups)
    cv = check_cv(cv, y, classifier=is_classifier(estimator))
    scorer = check_scoring(estimator, scoring=scoring)
    random_state = check_random_state(random_state)
    # We clone the estimator to make sure that all the folds are
    # independent, and that it is pickle-able.
    permutation_scores = Parallel(n_jobs=n_jobs, verbose=verbose)(
        delayed(_permutation_test_score)(
            clone(estimator), X, _shuffle(y, groups, random_state),
            groups, cv, scorer)
        for _ in range(n_permutations))
    permutation_scores = np.array(permutation_scores)
    return permutation_scores
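Since the docstring notes that the p-value is computed outside this function, here is a one-line sketch of that step following the [Ojala2010] convention (the score variable is assumed to be the cross-validated score on the unpermuted labels, obtained separately):

pvalue = (np.sum(permutation_scores >= score) + 1.0) / (n_permutations + 1)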
def test_pure_set():
    X = [[-2, -1], [-1, -1], [-1, -2], [1, 1], [1, 2], [2, 1]]
    y = [1, 1, 1, 1, 1, 1]
for est in estimators:
est.fit(X, y)
assert_array_almost_equal(est.predict(X), y)
new_est = clone(est)
new_est.partial_fit(X, y)
assert_array_almost_equal(new_est.predict(X), y)
def test_numerical_stability():
    X = np.array([
        [152.08097839, 140.40744019, 129.75102234, 159.90493774],
        [142.50700378, 135.81935120, 117.82884979, 162.75781250],
        [127.28772736, 140.40744019, 129.75102234, 159.90493774],
        [132.37025452, 143.71923828, 138.35694885, 157.84558105],
        [103.10237122, 143.71928406, 138.35696411, 157.84559631],
        [127.71276855, 143.71923828, 138.35694885, 157.84558105],
        [120.91514587, 140.40744019, 129.75102234, 159.90493774]])
    y = np.array(
        [1., 0.70209277, 0.53896582, 0., 0.90914464, 0.48026916, 0.49622521])
with np.errstate(all="raise"):
for est in estimators:
new_est = clone(est)
if isinstance(est, ClassifierMixin):
y_curr = np.round(y)
else:
y_curr = y
new_est.fit(X, y_curr)
new_est.fit(X, -y_curr)
new_est.fit(-X, y_curr)
new_est.fit(-X, -y_curr)
new_est.partial_fit(X, y_curr)
new_est.partial_fit(-X, y_curr)
def test_parallel_train():
for curr_est in ensembles:
est = clone(curr_est)
y_pred = ([est.set_params(n_jobs=n_jobs).fit(X, y).predict(X)
for n_jobs in [1, 4, 8]])
for pred1, pred2 in zip(y_pred, y_pred[1:]):
assert_array_equal(pred1, pred2)
        y_pred = ([est.set_params(n_jobs=n_jobs).partial_fit(X, y).predict(X)
                   for n_jobs in [1, 4, 8]])
        for pred1, pred2 in zip(y_pred, y_pred[1:]):
            assert_array_equal(pred1, pred2)