"""
Implementation of the cure rate survival analysis, based on the work by
Dr. Nemanja Kosovalic and Prof. Sandip Barui, both formerly at the
University of South Alabama. Current implementation supports option
to use Hard EM algorithm and SCAR (selected completely at random),
among others, for estimating cure probabilites.
Primary Author: Nem Kosovalic (nem.kosovalic@aimpointdigital.com)
Secondary Author: Yash Puranik (yash.puranik@aimpointdigital.com)
Company: Aimpoint Digital, LP
"""
import numpy as np
from sklearn.linear_model import LogisticRegression # type: ignore
from sklearn.metrics import accuracy_score # type: ignore
from tqdm import tqdm # type: ignore
from sksurv.metrics import concordance_index_censored as cindex # type: ignore
from apd_crs._parameters import _CENSOR_LABEL, _NON_CURE_LABEL, _CURE_LABEL, _LO_INT, \
_HI_INT
from apd_crs._validate import _val_train_data, _val_test_data, _validate_kwargs # type: ignore
from apd_crs._estimate import _estimate_labels # type: ignore
from apd_crs._survival_func import _survival_fit_weights, _overall_survival
[docs]class SurvivalAnalysis: # pylint: disable=too-many-instance-attributes
"""
SurvivalAnalysis class
This class implements methods for survival analysis: i.e. time-to-event
analysis using a dataset for which outcomes of a fraction of the
dataset are unknown (censored), and for which some individuals never
experience the event. For example, a medical trial where time since diagnosis is
measured and some participants drop out during the trial period, some of which are
cured. Or a manufacturing dataset where time-to-failure since maintenace is measured,
and some equipment takes so long to fail that it "never sees" the event.
For every training point (a patient in a medical trial, or machine in a
manufacturing dataset), we have three potential labels:
Cured: The event never occurs.
Non-cured: The event occurs at some (possibly unobserved) time.
Censored: The event does not occur during the time period, while it may or may not occur later,
but is unobserved.
The methods in this class help estimate the labels (Cured/Non-cured) for
each of the points in the censored group. Using the estimated labels, a
classifier is built to predict whether a training point will belong to the
cured group or the non-cured group.
In addition, the methods in the class help predict the overall survival
probability of any training point upto time t, as well as the risk for a
training point at time t
Parameters
----------
estimator : {'hard_EM', 'clustering', 'fifty_fifty', 'all_cens_cured'}, default='hard_EM'
The method used for estimating labels for censored training data.
If 'hard_EM', the hard expectation maximization algorithm is utilized to
estimate labels for censored population. If 'clustering', a clustering
model is built with a single cluster for the non-censored rows, and two
clusters for the censored rows. By comparing the distance of the two
censored clusters to the noncensored cluster center, cure labels are
assigned to the censored rows. If 'fifty_fifty', each censored point is
assigned to the cured/non-cured group with equal probability. Finally,
'all-cens-cured' assumes that all censored population are cured.
classifier : A classifier object, default=None
A fully initialized classification model object that follows the
scikit-learn classification API. The model is used to classify
between cured and non-cured labels. If no classifier input is
provided, a logistic regression model with default parameters will
be utilized.
random_state : int, array_like, SeedSquence, BitGenerator or Generator or None, default=None
Random state for reproducible random number generation in the
algorithms.
Attributes
----------
estimator_ : str
Method used for estimating censored data
classifier_ : A classification object
A fully initialized classification object with scikit-learn
classification API
scale_ : float
Scale parameter for the fitted Weibull distribution
shape_ : float
Shape parameter for the fitted Weibull distribution
gamma_ : np.array
Gamma parameter for survival function
References
---------
Kosovalić, N., Barui, S. A Hard EM algorithm for prediction
of the cured fraction in survival data.
Comput Stat (2021). https://doi.org/10.1007/s00180-021-01140-0
"""
# Parameter attributes
_censor_label = _CENSOR_LABEL
_cure_label = _CURE_LABEL
_non_cure_label = _NON_CURE_LABEL
# Imported methods
_validate_train_data = _val_train_data
_validate_test_data = _val_test_data
_estimate_censored_labels = _estimate_labels
def __init__(self, estimator="hard_EM", classifier=None, random_state=None):
if estimator not in {"hard_EM", "clustering", "fifty_fifty", "all_cens_cured"}:
raise ValueError("estimator must be 'hard_EM', 'clustering', 'fifty_fifty' or "
f"'all_cens_cured', got {estimator}")
self.estimator_ = estimator
self.classifier_ = classifier
self.random_state_ = random_state
self.rnd_gen = np.random.default_rng(seed=random_state)
self.n_features = -1
self._is_fitted_ = False
self._is_scar_ = False
self.scale_ = None
self.shape_ = None
self.gamma_ = None
[docs] def get_censor_label(self):
"""
Return the value for the label to be utilized to pass 'censored' rows
Parameters
----------
None
Returns
-------
censor_label : int
Integer value to be used to denote censored training points
"""
return self._censor_label
[docs] def get_cure_label(self):
"""
Return the value for the label to be utilized to indicate 'cured' rows
Parameters
----------
None
Returns
-------
cure_label : int
Integer value to be used to denote cured training points
"""
return self._cure_label
[docs] def get_non_cure_label(self):
"""
Return the value for the label to be utilized to indicate 'non-cured'
rows
Parameters
----------
None
Returns
-------
non_cure_label : int
Integer value to be used to denote non-cured training points
"""
return self._non_cure_label
def _reset_pu_fit(self):
'''
Resets the classifier built for estimating labels for censored
individuals
'''
self.classifier_ = None
self._is_fitted_ = False
self._is_scar_ = False
[docs] def pu_fit(self, training_data, training_labels, **kwargs):
"""
Fits a model using censored and non censored inputs to estimate
cured/non-cured labels
Parameters
----------
training_data : {array-like} of shape (n_samples, n_features)
Training data
training_labels : {array-like} of shape (n_samples, 1)
Labels for the training data.
Value of _censor_label_ implies training point was censored,
and _non_censor_label_ implies non censored.
The _censor_label_ and _non_censor_label_ values can be obtained
with get_censor_label and get_non_censor_label methods
Optional kwargs include:
pu_reg_term : float, default=0.5
The strength of the quadratic regularization term (C*w^2) on the
non-intercept model covariate weights for PU learning
pu_initialize : {'censoring_rate', 'use_clustering', 'use_random'},
default='use_random'
Method to determine how initial guesses for the SLSQP minimization
are generated. The covariate weights are initialized at
random from a uniform distribution.
If the option 'censoring_rate' is selected, cure labels are
initialized assuming the probability of being cured is the
censoring rate. If 'use_clustering' is selected then a single
cluster is created from the noncensored rows, and two clusters
are created from the censored rows. The cluster closest to the
noncensored rows is assigned the label non_cured as an initial
guess. If "use_random" is selected, multiple guesses for the
unkown cure labels are generated randomly, multiple minimization
problems are solved. The output chosen is the one corresponding to
the lowest objective
pu_max_guesses : int, default=50
Maximum local searches to launch when initialize method is
use_random. Otherwise ignored
pu_max_processes : int, default=1
Maximum parallel local searches to launch when initialize method is
use_random. Otherwise ignored.
if -1, all available processors are utilized
pu_max_iter: int, default=1000
maximum number of iterations for SLSQP method.
pu_weight_lo : {array-like} of shape (n_features, 1), default=-0.5
Lower bounds on weights for sampling from uniform distribution for
initial value guesses
pu_weight_hi : {array-like} of shape (n_features, 1), default=0.5
Upper bounds on weights for sampling from uniform distribution for
initial value guesses
pu_kmeans_init : int
Number of Kmeans initializations to try when estimator is clustering
is_scar : {True, False}, default=False
True if SCAR (selected completely at random) assumption holds for
the dataset. In this situation, we find the probability of being NOT
censored given covariates. The latter is then divided by an appropriate
constant to get probability of being NOT cured.
See the paper https://cseweb.ucsd.edu/~elkan/posonly.pdf for more
details
Returns
--------
self
Fitted estimator.
"""
training_data_, training_labels_, _ = self._validate_train_data(training_data,
training_labels)
_, self.n_features = training_data_.shape
random_state = self.rnd_gen.integers(_LO_INT, _HI_INT)
# Initialize classifier
if self.classifier_ is None:
self.classifier_ = LogisticRegression(random_state=random_state)
self._is_scar_ = kwargs.get("is_scar", False)
# Estimate values for censored individuals based on estimation method
# chosen, if scar is false
if not self._is_scar_:
training_labels_estimated = self._estimate_censored_labels(training_data_,
np.copy(training_labels_),
**kwargs)
else:
training_labels_estimated = training_labels_
self.classifier_.fit(training_data_, training_labels_estimated)
self._is_fitted_ = True
return self
[docs] def predict_overall_survival(self, test_data, test_times, test_labels=None):
'''
Predict overall survival function for test data based on fitting
survival function.
Parameters
----------
test_data : {array-like} of shape (n_samples, n_features)
Test data
test_times : {array-like} of shape (n_samples, k)
Times at which survival of an individual is to be determined. k can be greater than 1
when the survival of an individual at multiple time points is to be determined
test_labels: {array-like} of shape (n_samples, 1), default=None
Test labels indicating censored/non censored status for test data.
Method will provide cure predictions for censored individuals. This
is only needed if model is fit with is_scar=True assumption
Returns
-------
predictions : {array-like} of shape (n_samples, k)
Overall survival function of any suspectible or non-susceptible
individual
'''
test_data_ = self._validate_test_data(test_data)
test_times_ = np.asarray(test_times)
n_test, n_times = test_times_.shape
if len(test_data_) != n_test:
raise ValueError("Size of times and test_data do not match")
probabilities = self.predict_cure_proba(test_data_, test_labels)
if self.scale_ is None or self.shape_ is None or self.gamma_ is None:
raise Exception("Fit survival function first by calling survival fit "
"or stochastic fit")
survival = [[_overall_survival(test_times_[i, j], probabilities[i, 0], test_data_[i, :],
self.scale_, self.shape_, self.gamma_)
for j in range(n_times)] for i in range(n_test)]
return np.array(survival, np.float_)
[docs] def predict_cure_labels(self, test_data, test_labels=None):
"""
Predict cured/non-cured labels for data
Parameters
----------
test_data : {array-like} of shape (n_samples, n_features)
Test data
test_labels: {array-like} of shape (n_samples, 1), default=None
Test labels indicating censored/non censored status for test data.
Method will provide cure predictions for censored individuals
Returns
-------
predictions : {array-like} of shape (n_samples, 1)
Predicted cured/non_cured labels
"""
if not self._is_fitted_:
raise Exception("This instance is not fitted yet. Call 'fit' first")
test_data_ = self._validate_test_data(test_data)
if self._is_scar_:
if test_labels is None:
raise ValueError("Model fit under SCAR Assumption. Must pass "
"censor labels in test data")
if len(test_labels) != len(test_data_):
raise ValueError("Size of test_labels and test_data don't match")
# Under scar assumption, labels are calculated a little differently
probabilities = self._predict_proba(test_data_, test_labels, self._is_scar_)
cure_labels = np.rint(probabilities[:, 1])
return cure_labels
return self.classifier_.predict(test_data_)
def _predict_proba(self, test_data, test_labels, is_scar):
'''
Helper function that calculates probabilities of being cured/not cured
under and not under SCAR assumption
'''
probabilities = self.classifier_.predict_proba(test_data)
if self.classifier_.classes_[0] != self._cure_label:
# Must flip probability columns for correct output
cured_prob = probabilities[:, 1]
probabilities[:, 1] = probabilities[:, 0]
probabilities[:, 0] = cured_prob
# Need to normalize under scar assumption
if is_scar:
noncens_probs = probabilities[:, 1][test_labels == 1]
constant = noncens_probs.mean()
probabilities[:, 1] /= constant
non_cure_prob = probabilities[:, 1]
non_cure_prob[non_cure_prob > 1.] = 1. # cut noncure probs off at 1
probabilities[:, 1] = non_cure_prob
probabilities[:, 0] = (1 - probabilities[:, 1])
return probabilities
[docs] def predict_cure_proba(self, test_data, test_labels=None):
"""
Generate probability estimates for each training point as to whether it
is cured/noncured
Parameters
----------
test_data : {array-like} of shape (n_samples, n_features)
Test data
test_labels : {array-like} of shape (n_samples, 1), default=None
Test labels indicating censored/non censored status for test data.
Method will provide cure predictions for censored individuals
Returns
-------
probabilities : {array-like} shape (n_samples, 2)
Predicted cured, non_cured probabilities
"""
if not self._is_fitted_:
raise Exception("This instance is not fitted yet. Call 'fit' first")
test_data_ = self._validate_test_data(test_data)
if self._is_scar_:
if test_labels is None:
raise ValueError("Model fit under SCAR Assumption. Must pass "
"censor labels in test data")
test_labels_ = np.asarray(test_labels)
if len(test_labels_) != len(test_data_):
raise ValueError("Size of test_labels and test_data don't match")
else:
test_labels_ = None
return self._predict_proba(test_data_, test_labels_, self._is_scar_)
[docs] def predict_danger(self, test_data, test_labels=None, weights=None):
"""
Generate a value for the danger, a measure of risk associated with an individual of facing
the event. The absolute value of the number is unimportant. When comparing danger for two
individuals, their relative values are important.
Parameters
----------
test_data : {array-like} of shape (n_samples, n_features)
Test data
test_labels : {array-like} of shape (n_samples, 1), default=None
Test labels indicating censored/non censored status for test data.
Method will provide cure predictions for censored individuals
weights : {array-like} of shape (2, 1), default=None
Normalizing weights for calculating danger. If None, weights of (0.5, 0.5) are used
Returns
-------
danger : {array-like} shape (n_samples, 1)
Danger associated with every individual in training set
"""
if not self._is_fitted_:
raise Exception("This instance is not fitted yet. Call 'fit' first")
test_data_ = self._validate_test_data(test_data)
if self._is_scar_:
if test_labels is None:
raise ValueError("Model fit under SCAR Assumption. Must pass "
"censor labels in test data")
test_labels_ = np.asarray(test_labels)
if len(test_labels_) != len(test_data_):
raise ValueError("Size of test_labels and test_data don't match")
else:
test_labels_ = None
if weights is not None:
weights_ = np.asarray(weights)
if len(weights_) != 2:
raise ValueError("Size of weights array is not 2")
else:
weights_ = (0.5, 0.5)
danger = (weights_[0]*np.dot(test_data_, self.gamma_) +
weights_[1]*np.log(self.predict_cure_proba(test_data_, test_labels_)[:, 1]))
return danger
[docs] def get_risk_factors(self):
'''
Returns the relative risk factors associated with every covariate in data set
Returns
-------
factors : {array-like} of shape (n_features, 1)
Relative weight of each factor in the dataset
'''
return self.gamma_
[docs] def get_covariate_importance(self):
'''
Returns the relative weights for logistic regression for each covariate in data set
Returns
-------
factors : {array-like} of shape (n_features, 1)
Relative weight of each factor in the dataset
'''
return self.classifier_.coef_[0]
[docs] def get_params(self, deep=True):
"""
Get parameters for the estimator
Parameters
----------
deep : bool, default=True
If True, will return the parameters for SurvivalAnalysis class and
contained subojects
Returns
-------
params : dict
Parameter names for class mapped to values
"""
param_dict = {"estimator_": self.estimator_, "random_state_": self.random_state_,
"n_features": self.n_features, "_is_fitted_": self._is_fitted_,
"scale_": self.scale_, "shape_": self.shape_, "gamma_": self.gamma_}
if deep:
if hasattr(self.classifier_, "get_params"):
classifier_params = self.classifier_.get_params().items()
for key, val in classifier_params:
param_dict["classifier__" + key] = val
return param_dict
[docs] def set_params(self, **params):
"""
Set the parameters of this SurvivalAnalysis object.
Parameters
----------
**params : dict
Estimator parameters
Returns
-------
self : SurvivalAnalysis object
"""
valid_params = self.get_params(deep=True)
classifier_params = {}
for key, value in params.items():
key, delim, sub_key = key.partition("__")
if key not in valid_params:
raise ValueError("Invalid parameter {0} for SurvivalAnalysis class".format(key))
if delim:
classifier_params[sub_key] = value
else:
setattr(self, key, value)
params[key] = value
self.classifier_.set_params(**classifier_params)
return self
[docs] def score_labels(self, test_data, test_labels, sample_weight=None):
"""
Returns the accuracy score on a given test_data provided labels
Parameters
----------
test_data : {array-like} of shape (n_samples, n_features)
test data
test_labels : {array-like} of shape (n_samples, 1)
Labels for the test data.
sample_weight : array_like of shape (n_samples,), default = None
Sample weights
Returns
-------
score : float
Mean accuracy
"""
test_data_ = np.asarray(test_data)
n_rows, _ = test_data.shape
if n_rows != len(test_labels):
raise ValueError(
"test_data and test_labels do not have the same number of rows"
)
predicted_labels = self.predict_cure_labels(test_data_)
return accuracy_score(
test_labels, predicted_labels, sample_weight=sample_weight
)
[docs] def cindex(self, test_times, test_labels, danger):
'''
Returns the concordance index on test/score data. I.e. the ratio
of concordant pairs to comparable ones.
Parameters
----------
test_times : {array-like} of shape (n_samples, 1)
Column of times of event/times of censoring.
test_labels : {array-like} of shape (n_samples, 1)
Column indicating whether censored or event was observed
danger : float
Measures the risk of an individual. The higher the riskier. Sometimes
taken as ln of proportionality factor (exponential term) from proportional
hazard function. In this setting, a risk also incorporating the (non) cure
probability is reasonable. E.g. ln(1-pi)+gamma*x, or something similar.
Returns
-------
c-index: float
Concordance index
'''
non_cure_label = self.get_non_cure_label()
# Convert to boolean for use with sksurv
test_labels_ = np.asarray(test_labels) == non_cure_label
return cindex(test_labels_, test_times, danger)[0]
def _fit_weights(self, training_data, training_labels, training_times, **kwargs):
'''
Fit the lifetime parameters and survival function and returns the fit
weights
'''
# We first need to fit a model to estimate labels
if not self._is_fitted_:
self.pu_fit(training_data, training_labels, **kwargs)
if self._is_scar_:
probabilities = self.predict_cure_proba(training_data, training_labels)[:, 0]
else:
probabilities = self.predict_cure_proba(training_data)[:, 0]
weights, errors, log_likelihood = \
_survival_fit_weights(training_data, training_labels, training_times,
probabilities, **kwargs)
return weights, errors, log_likelihood
[docs] def survival_fit(self, training_data, training_labels, training_times, **kwargs):
'''
Fits the lifetime parameters and survival function and returns a fitted
object
Parameters
----------
training_data : {array-like} of shape (n_samples, n_features)
Training data
training_labels : {array-like} of shape (n_samples, n_features)
Labels for training_data
training_times : {array-like} of shape (n_samples, 1)
Times for all training points (censoring time or event time)
Optional kwargs include:
surv_reg_term : float, default=0.5
The strength of the quadratic regularization term (C*w^2) on the
non-intercept model covariate weights for survival fit.
surv_max_iter: int, default=1000
maximum number of iterations for SLSQP method.
is_scar : {True, False}, default=False
True if SCAR (selected completely at random) assumption holds for
the dataset. In this situation, we find the probability of being NOT
censored given covariates. The latter is then divided by an appropriate
constant to get probability of being NOT cured.
See the paper https://cseweb.ucsd.edu/~elkan/posonly.pdf for more details
Returns
-------
self
Fitted estimator.
'''
training_data_, training_labels_, training_times_ = self._validate_train_data(
training_data, training_labels, training_times)
weights, _, _ = self._fit_weights(training_data_, training_labels_,
training_times_, **kwargs)
_, self.n_features = training_data_.shape
self.shape_ = weights[0]
self.scale_ = weights[1]
self.gamma_ = weights[2:]
return self
[docs] def stochastic_fit(self, training_data, training_labels, training_times, # pylint: disable=too-many-arguments, too-many-locals
**kwargs):
'''
Fits the lifetime parameters and survival function and returns a fitted
object. This is a meta algorithm heuristic for large datasets where
survival_fit does not scale well, due to the computational bottleneck
of censored individuals.
Step 1: split into smaller datasets
Step 2: for each dataset run survival fit
Step 3: as final output take average of parameters.
Parameters
----------
training_data : {array-like} of shape (n_samples, n_features)
Training data
training_labels : {array-like} of shape (n_samples, n_features)
Labels for training_data
training_times : {array-like} of shape (n_samples, 1)
Times for all training points (censoringtime or event time)
Optional kwargs include:
surv_reg_term : float, default=0.5
strength of regularization parameter
surv_max_iter: int, default=1000
maximum number of iterations for SLSQP method.
surv_batch_size : int, default=200
The maximum size for a batch for stochastic fit
is_scar : {True, False}, default=False
True if SCAR (selected completely at random) assumption holds for
the dataset. In this situation, we find the probability of being NOT
censored given covariates. The latter is then divided by an appropriate
constant to get probability of being NOT cured.
See the paper https://cseweb.ucsd.edu/~elkan/posonly.pdf for more
details
Returns
-------
self
Fitted estimator.
'''
training_data_, training_labels_, training_times_ = self._validate_train_data(
training_data, training_labels, training_times)
kwargs = _validate_kwargs(kwargs)
batch_size = kwargs.get("surv_batch_size")
_, self.n_features = training_data_.shape
is_scar = kwargs.get("is_scar", False)
if is_scar:
# Things are fast under SCAR assumption. Can stick to one split
batch_size = len(training_data_)
n_splits = 1
else:
n_cens = training_labels_[training_labels_ == _CENSOR_LABEL].shape[0]
n_rows = training_labels_.shape[0]
n_splits = int(np.ceil(n_cens/batch_size))
shuffled = self.rnd_gen.permutation(n_rows)
print(f"Splitting up data into {n_splits} pieces")
outputs = []
for i in tqdm(range(n_splits)):
# Must reset cure label estimator each time
self._reset_pu_fit()
indices = shuffled[batch_size*i:batch_size*(i+1)]
data_chunk = training_data_[indices]
label_chunk = training_labels_[indices]
time_chunk = training_times_[indices]
weights, _, _ = self._fit_weights(data_chunk, label_chunk, time_chunk,
**kwargs)
outputs.append(weights)
mean_weights = np.mean(outputs, axis=0)
self.shape_ = mean_weights[0]
self.scale_ = mean_weights[1]
self.gamma_ = mean_weights[2:]
return self