
API

AbstractGP

AbstractGP(kernel, seqs, num_tasks, default_task, solo_task, noise, tfs_noise, requires_grad_noise, shape_noise, derivatives, derivatives_coeffs, adaptive_nugget, ptransform)

Bases: Module

Source code in fastgps/abstract_gp.py
def __init__(
        self,
        kernel,
        seqs,
        num_tasks,
        default_task,
        solo_task,
        noise,
        tfs_noise,
        requires_grad_noise,
        shape_noise,
        derivatives,
        derivatives_coeffs,
        adaptive_nugget,
        ptransform
    ):
    super().__init__()
    if not torch.get_default_dtype()==torch.float64:
        warnings.warn('''
            Using torch.float32 precision may significantly hurt FastGP's accuracy. 
            This is especially evident when computing posterior variance and covariance values. 
            If possible, please use
                torch.set_default_dtype(torch.float64)''')
    # copy kernel parameters 
    self.kernel = kernel
    assert self.kernel.torchify, "requires torchify=True for the kernel"
    self.device = self.kernel.device
    self.d = self.kernel.d
    # multi-task kernel abstraction
    assert isinstance(num_tasks,int) and num_tasks>0
    self.num_tasks = num_tasks
    self.default_task = default_task
    self.solo_task = solo_task
    self.task_range = torch.arange(num_tasks,device=self.device)
    if solo_task:
        self.kernel = qp.KernelMultiTask(
            base_kernel = self.kernel,
            num_tasks = 1, 
            factor = 1.,
            diag =  0.,
            requires_grad_factor = False, 
            requires_grad_diag = False,
            tfs_diag = (qp.util.transforms.tf_identity,qp.util.transforms.tf_identity),
            rank_factor = 1)
    assert isinstance(self.kernel,qp.KernelMultiTask)
    # seqs setup 
    assert isinstance(seqs,np.ndarray) and seqs.shape==(self.num_tasks,)
    assert all(seqs[i].d==self.d for i in range(self.num_tasks))
    self.seqs = seqs
    self.n = torch.zeros(self.num_tasks,dtype=int,device=self.device)
    self.n_cumsum = torch.zeros(self.num_tasks,dtype=int,device=self.device)
    self.m = -1*torch.ones(self.num_tasks,dtype=int,device=self.device)
    # derivatives setup 
    if derivatives is None: derivatives = [torch.zeros((1,self.d),dtype=torch.int64,device=self.device) for i in range(self.num_tasks)]
    if isinstance(derivatives,torch.Tensor): derivatives = [derivatives]
    assert isinstance(derivatives,list) and len(derivatives)==self.num_tasks
    derivatives = [deriv[None,:] if deriv.ndim==1 else deriv for deriv in derivatives]
    assert all((derivatives[i].ndim==2 and derivatives[i].size(1)==self.d) for i in range(self.num_tasks))
    if derivatives_coeffs is None: derivatives_coeffs = [torch.ones(len(derivatives[i]),device=self.device) for i in range(self.num_tasks)]
    assert isinstance(derivatives_coeffs,list) and len(derivatives_coeffs)==self.num_tasks
    assert all((derivatives_coeffs[i].ndim==1 and len(derivatives_coeffs[i])==len(derivatives[i])) for i in range(self.num_tasks))
    self.derivatives_cross = [[[None,None] for l1 in range(self.num_tasks)] for l0 in range(self.num_tasks)]
    self.derivatives_coeffs_cross = [[None for l1 in range(self.num_tasks)] for l0 in range(self.num_tasks)]
    self.derivatives_flag = any((derivatives_i!=0).any() for derivatives_i in derivatives) 
    if not self.derivatives_flag:
        assert all((derivatives_coeffs_i==1).all() for derivatives_coeffs_i in derivatives_coeffs) 
    for l0 in range(self.num_tasks):
        p0r = torch.arange(len(derivatives_coeffs[l0]),device=self.device)
        for l1 in range(l0+1):
            p1r = torch.arange(len(derivatives_coeffs[l1]),device=self.device)
            i0m,i1m = torch.meshgrid(p0r,p1r,indexing="ij")
            i0,i1 = i0m.flatten(),i1m.flatten()
            self.derivatives_cross[l0][l1][0] = derivatives[l0][i0]
            self.derivatives_cross[l0][l1][1] = derivatives[l1][i1]
            self.derivatives_coeffs_cross[l0][l1] = derivatives_coeffs[l0][i0]*derivatives_coeffs[l1][i1]
            if l0!=l1:
                self.derivatives_cross[l1][l0][0] = self.derivatives_cross[l0][l1][1]
                self.derivatives_cross[l1][l0][1] = self.derivatives_cross[l0][l1][0]
                self.derivatives_coeffs_cross[l1][l0] = self.derivatives_coeffs_cross[l0][l1]
    # noise
    self.raw_noise = self.kernel.parse_assign_param(
        pname = "noise",
        param = noise,
        shape_param = shape_noise,
        requires_grad_param = requires_grad_noise,
        tfs_param = tfs_noise,
        endsize_ops = [1],
        constraints = ["NON-NEGATIVE"])
    self.tfs_noise = tfs_noise
    self.prior_mean = torch.zeros(self.num_tasks,device=self.device)
    # storage and dynamic caches
    self._y = [torch.empty(0,device=self.device) for l in range(self.num_tasks)]
    self.xxb_seqs = np.array([_XXbSeq(self,self.seqs[i]) for i in range(self.num_tasks)],dtype=object)
    self.n_x = torch.zeros(self.num_tasks,dtype=int)
    self.inv_log_det_cache_dict = {}
    # derivative multitask setting checks 
    if any((derivatives[i]>0).any() or (derivatives_coeffs[i]!=1).any() for i in range(self.num_tasks)):
        self.kernel.raw_factor.requires_grad_(False)
        self.kernel.raw_diag.requires_grad_(False)
        assert (self.kernel.taskmat==1).all()
    self.adaptive_nugget = adaptive_nugget
    self.batch_param_names = ["noise"]
    self.stable = False # maybe change this in the future if we come across any more cancellation error issues
    self.ptransform = str(ptransform).upper()
    if self.ptransform=='TENT': self.ptransform = 'BAKER'
    assert self.ptransform in ['NONE','BAKER'], "invalid ptransform = %s"%self.ptransform

noise property

noise

Noise parameter.

coeffs property

coeffs

Coefficients \(\mathsf{K}^{-1} \boldsymbol{y}\).

x property

x

Current sampling locations. A torch.Tensor for single task problems. A list for multitask problems.

y property

y

Current sampling values. A torch.Tensor for single task problems. A list for multitask problems.
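
A minimal usage sketch of these properties, assuming sgp is a single-task StandardGP that already holds data (see the StandardGP examples below):

>>> noise = sgp.noise                 # positive noise parameter
>>> coeffs = sgp.coeffs               # solution of the linear system K^{-1} y
>>> x_data, y_data = sgp.x, sgp.y     # tensors for a single task, lists for multiple tasks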

save_params

save_params(path)

Save the state dict to path

Args

path (str): the path.

Source code in fastgps/abstract_gp.py
def save_params(self, path):
    """ Save the state dict to path 

    Args:
        path (str): the path. 
    """
    torch.save(self.state_dict(),path)

load_params

load_params(path)

Load the state dict from path

Args

path (str): the path.

Source code in fastgps/abstract_gp.py
def load_params(self, path):
    """ Load the state dict from path 

    Args:
        path (str): the path. 
    """
    self.load_state_dict(torch.load(path,weights_only=True))

fit

fit(loss_metric='MLL', iterations=5000, lr=None, optimizer=None, stop_crit_improvement_threshold=0.05, stop_crit_wait_iterations=10, store_hists=False, verbose=5, verbose_indent=4, cv_weights=1, update_prior_mean=True)

Parameters:

Name Type Description Default
loss_metric str

either "MLL" (Marginal Log Likelihood) or "CV" (Cross Validation) or "GCV" (Generalized CV)

'MLL'
iterations int

number of optimization iterations

5000
lr float

learning rate for default optimizer

None
optimizer Optimizer

optimizer defaulted to torch.optim.Rprop(self.parameters(),lr=lr)

None
stop_crit_improvement_threshold float

stop fitting when the maximum number of iterations is reached or the best loss is not reduced by stop_crit_improvement_threshold for stop_crit_wait_iterations iterations

0.05
stop_crit_wait_iterations int

number of iterations to wait for improved loss before early stopping, see the argument description for stop_crit_improvement_threshold

10
store_hists Union[bool, int]

store parameter data every store_hists iterations.

False
verbose int

log every verbose iterations, set to 0 for silent mode

5
verbose_indent int

size of the indent to be applied when logging, helpful for logging multiple models

4
cv_weights Union[str, Tensor]

weights for cross validation

1
update_prior_mean bool

if True, then update the prior mean to optimize the loss.

True

Returns:

Name Type Description
hist_data dict

iteration history data.
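
A hedged example of a typical call, assuming sgp already holds data; the keyword values below are illustrative only:

>>> hist_data = sgp.fit(loss_metric="MLL", iterations=1000, store_hists=10, verbose=0)
>>> losses = hist_data["loss"]        # losses at the stored iterations
>>> iters = hist_data["iteration"]    # matching iteration indices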

Source code in fastgps/abstract_gp.py
def fit(
        self,
        loss_metric:str = "MLL",
        iterations:int = 5000,
        lr:float = None,
        optimizer:torch.optim.Optimizer = None,
        stop_crit_improvement_threshold:float = 5e-2,
        stop_crit_wait_iterations:int = 10,
        store_hists:bool = False,
        verbose:int = 5,
        verbose_indent:int = 4,
        cv_weights:torch.Tensor = 1,
        update_prior_mean:bool = True,
        ):
    """
    Args:
        loss_metric (str): either "MLL" (Marginal Log Likelihood) or "CV" (Cross Validation) or "GCV" (Generalized CV)
        iterations (int): number of optimization iterations
        lr (float): learning rate for default optimizer
        optimizer (torch.optim.Optimizer): optimizer defaulted to `torch.optim.Rprop(self.parameters(),lr=lr)`
        stop_crit_improvement_threshold (float): stop fitting when the maximum number of iterations is reached or the best loss is not reduced by `stop_crit_improvement_threshold` for `stop_crit_wait_iterations` iterations 
        stop_crit_wait_iterations (int): number of iterations to wait for improved loss before early stopping, see the argument description for `stop_crit_improvement_threshold`
        store_hists (Union[bool,int]): store parameter data every `store_hists` iterations.
        verbose (int): log every `verbose` iterations, set to `0` for silent mode
        verbose_indent (int): size of the indent to be applied when logging, helpful for logging multiple models
        cv_weights (Union[str,torch.Tensor]): weights for cross validation
        update_prior_mean (bool): if `True`, then update the prior mean to optimize the loss.

    Returns:
        hist_data (dict): iteration history data.
    """
    assert isinstance(loss_metric,str) and loss_metric.upper() in ["MLL","GCV","CV"] 
    assert (self.n>0).any(), "cannot fit without data"
    assert isinstance(iterations,int) and iterations>=0
    if optimizer is None:
        optimizer = self.get_default_optimizer(lr)
    assert isinstance(optimizer,torch.optim.Optimizer)
    assert isinstance(store_hists,int), "require int store_hists" 
    assert (isinstance(verbose,int) or isinstance(verbose,bool)) and verbose>=0, "require verbose is a non-negative int"
    assert isinstance(verbose_indent,int) and verbose_indent>=0, "require verbose_indent is a non-negative int"
    assert np.isscalar(stop_crit_improvement_threshold) and 0<stop_crit_improvement_threshold, "require stop_crit_improvement_threshold is a positive float"
    assert (isinstance(stop_crit_wait_iterations,int) or stop_crit_wait_iterations==np.inf) and stop_crit_wait_iterations>0
    loss_metric = loss_metric.upper()
    logtol = np.log(1+stop_crit_improvement_threshold)
    if isinstance(cv_weights,str) and cv_weights.upper()=="L2R":
        _y = self.y 
        if _y.ndim==1:
            cv_weights = 1/torch.abs(_y) 
        else:
            cv_weights = 1/torch.linalg.norm(_y,2,dim=[i for i in range(_y.ndim-1)])
    if store_hists:
        hist_data = {}
        hist_data["iteration"] = []
        hist_data["loss"] = []
        hist_data["best_loss"] = []
        hist_data["prior_mean"] = []
        for pname in self.batch_param_names:
            hist_data[pname] = []
        for pname in self.kernel.batch_param_names:
            hist_data[pname] = []
        for pname in self.kernel.base_kernel.batch_param_names:
            hist_data[pname] = []
    else:
        hist_data = {}
    if verbose:
        _s = "%16s | %-10s | %-10s"%("iter of %.1e"%iterations,"best loss","loss")
        print(" "*verbose_indent+_s)
        print(" "*verbose_indent+"~"*len(_s))
    stop_crit_best_loss = torch.inf 
    stop_crit_save_loss = torch.inf 
    stop_crit_iterations_without_improvement_loss = 0
    update_prior_mean = update_prior_mean and (not self.derivatives_flag) and loss_metric!="CV"
    inv_log_det_cache = self.get_inv_log_det_cache()
    best_params = None
    try:
        pcstd = torch.sqrt(self.post_cubature_var(eval=False))
        can_compute_pcstd = True
    except Exception as e:
        if "parsed_single_integral_01d" not in str(e): raise
        can_compute_pcstd = False
    for i in range(iterations+1):
        os.environ["FASTGP_FORCE_RECOMPILE"] = "True"
        if loss_metric=="MLL":
            loss = inv_log_det_cache.mll_loss(self,update_prior_mean)
        elif loss_metric=="GCV":
            loss = inv_log_det_cache.gcv_loss(self,update_prior_mean)
        elif loss_metric=="CV":
            loss = inv_log_det_cache.cv_loss(self,cv_weights,update_prior_mean)
        else:
            assert False, "loss_metric parsing implementation error"
        del os.environ["FASTGP_FORCE_RECOMPILE"]
        pcstd = torch.sqrt(self.post_cubature_var()) if can_compute_pcstd else torch.inf*torch.ones(1,device=self.device)
        if loss.item()<stop_crit_best_loss and (pcstd>0).all():
            stop_crit_best_loss = loss.item()
            best_params = (self.prior_mean.clone().detach(),OrderedDict([(pname,pval.clone()) for pname,pval in self.state_dict().items()]))
        if (stop_crit_save_loss-loss.item())>logtol:
            stop_crit_iterations_without_improvement_loss = 0
            stop_crit_save_loss = stop_crit_best_loss
        else:
            stop_crit_iterations_without_improvement_loss += 1
        break_condition = i==iterations or stop_crit_iterations_without_improvement_loss==stop_crit_wait_iterations or (pcstd<=0).any()
        if store_hists and (break_condition or i%store_hists==0):
            hist_data["iteration"].append(i)
            hist_data["loss"].append(loss.item())
            hist_data["best_loss"].append(stop_crit_best_loss)
            hist_data["prior_mean"].append(self.prior_mean.clone())
            for pname in self.batch_param_names:
                hist_data[pname].append(getattr(self,pname).data.detach().clone().cpu())
            for pname in self.kernel.batch_param_names:
                hist_data[pname].append(getattr(self.kernel,pname).data.detach().clone().cpu())
            for pname in self.kernel.base_kernel.batch_param_names:
                hist_data[pname].append(getattr(self.kernel.base_kernel,pname).data.detach().clone().cpu())
        if verbose and (i%verbose==0 or break_condition):
            _s = "%16.2e | %-10.2e | %-10.2e"%(i,stop_crit_best_loss,loss.item())
            print(" "*verbose_indent+_s)
        if break_condition: break
        # with torch.autograd.set_detect_anomaly(True):
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    if best_params is not None:
        self.prior_mean = best_params[0]
        self.load_state_dict(best_params[1])
    if store_hists:
        hist_data["iteration"] = torch.tensor(hist_data["iteration"])
        hist_data["loss"] = torch.tensor(hist_data["loss"])
        hist_data["best_loss"] = torch.tensor(hist_data["best_loss"])
        hist_data["prior_mean"] = torch.stack(hist_data["prior_mean"],dim=0)
        for pname in self.batch_param_names:
            hist_data[pname] = torch.stack(hist_data[pname],dim=0)
        for pname in self.kernel.batch_param_names:
            hist_data[pname] = torch.stack(hist_data[pname],dim=0)
        for pname in self.kernel.base_kernel.batch_param_names:
            hist_data[pname] = torch.stack(hist_data[pname],dim=0)
    return hist_data

get_x_next

get_x_next(n, task=None)

Get the next sampling locations.

Parameters:

Name Type Description Default
n Union[int, Tensor]

maximum sample index per task

required
task Union[int, Tensor]

task index

None

Returns:

Name Type Description
x_next Union[Tensor, List]

next samples in the sequence
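
The intended sampling loop, sketched under the assumption that f is the user's function and sgp is a single-task GP (so x_next is a tensor rather than a list):

>>> x_next = sgp.get_x_next(2**6)     # first 2^6 points of the sequence
>>> sgp.add_y_next(f(x_next))
>>> x_next = sgp.get_x_next(2**7)     # only the new points at indices 2^6,...,2^7-1
>>> sgp.add_y_next(f(x_next))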

Source code in fastgps/abstract_gp.py
def get_x_next(self, n:Union[int,torch.Tensor], task:Union[int,torch.Tensor]=None):
    """
    Get the next sampling locations. 

    Args:
        n (Union[int,torch.Tensor]): maximum sample index per task
        task (Union[int,torch.Tensor]): task index

    Returns:
        x_next (Union[torch.Tensor,List]): next samples in the sequence
    """
    if isinstance(n,(int,np.int64)): n = torch.tensor([n],dtype=int,device=self.device) 
    if isinstance(n,list): n = torch.tensor(n,dtype=int,device=self.device)
    if task is None: task = self.default_task
    inttask = isinstance(task,int)
    if inttask: task = torch.tensor([task],dtype=int,device=self.device)
    if isinstance(task,list): task = torch.tensor(task,dtype=int,device=self.device)
    assert isinstance(n,torch.Tensor) and isinstance(task,torch.Tensor) and n.ndim==task.ndim==1 and len(n)==len(task)
    assert (n>=self.n[task]).all(), "maximum sequence index must be at least the current number of samples"
    x_next = [self.xxb_seqs[l].getitem_x(self,slice(self.n[l],n[i])) for i,l in enumerate(task)]
    return x_next[0] if inttask else x_next

add_y_next

add_y_next(y_next, task=None)

Add samples to the GP.

Parameters:

Name Type Description Default
y_next Union[Tensor, List]

new function evaluations at next sampling locations

required
task Union[int, Tensor]

task index

None
Source code in fastgps/abstract_gp.py
def add_y_next(self, y_next:Union[torch.Tensor,List], task:Union[int,torch.Tensor]=None):
    """
    Add samples to the GP. 

    Args:
        y_next (Union[torch.Tensor,List]): new function evaluations at next sampling locations
        task (Union[int,torch.Tensor]): task index
    """
    if isinstance(y_next,torch.Tensor): y_next = [y_next]
    if task is None: task = self.default_task
    if isinstance(task,int): task = torch.tensor([task],dtype=int,device=self.device)
    if isinstance(task,list): task = torch.tensor(task,dtype=int,device=self.device)
    assert isinstance(y_next,list) and isinstance(task,torch.Tensor) and task.ndim==1 and len(y_next)==len(task)
    for i,l in enumerate(task):
        self._y[l] = torch.cat([self._y[l],y_next[i]],-1)
    shape_batch = list(self._y[0].shape[:-1])
    if (self.n==0).all() and len(shape_batch)>0:
        self.prior_mean = torch.zeros(shape_batch+[self.num_tasks],device=self.device)
    self.n = torch.tensor([self._y[i].size(-1) for i in range(self.num_tasks)],dtype=int,device=self.device)
    self.m = torch.where(self.n==0,-1,torch.log2(self.n).round()).to(int) # round to avoid things like torch.log2(torch.tensor([2**3],dtype=torch.int64,device="cuda")).item() = 2.9999999999999996
    self.n_cumsum = torch.hstack([torch.zeros(1,dtype=self.n.dtype,device=self.n.device),self.n.cumsum(0)[:-1]])
    for key in list(self.inv_log_det_cache_dict.keys()):
        if (torch.tensor(key)<self.n.cpu()).any():
            del self.inv_log_det_cache_dict[key]

post_mean

post_mean(x, task=None, eval=True)

Posterior mean.

Parameters:

Name Type Description Default
x Tensor[N, d]

sampling locations

required
task Union[int, Tensor[T]]

task index

None
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True

Returns:

Name Type Description
pmean Tensor[..., T, N]

posterior mean
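
A short sketch, assuming sgp is a fitted single-task GP and x is an (N,d) tensor of query locations:

>>> pmean = sgp.post_mean(x)              # shape (N,) when task is an int (here the default task)
>>> pmean_t = sgp.post_mean(x, task=[0])  # shape (1,N) when task is a list or tensor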

Source code in fastgps/abstract_gp.py
def post_mean(self, x:torch.Tensor, task:Union[int,torch.Tensor]=None, eval:bool=True):
    """
    Posterior mean. 

    Args:
        x (torch.Tensor[N,d]): sampling locations
        task (Union[int,torch.Tensor[T]]): task index
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`

    Returns:
        pmean (torch.Tensor[...,T,N]): posterior mean
    """
    coeffs = self.coeffs
    if eval:
        incoming_grad_enabled = torch.is_grad_enabled()
        torch.set_grad_enabled(False)
    assert x.ndim==2 and x.size(1)==self.d, "x must be a torch.Tensor with shape (-1,d)"
    if task is None: task = self.default_task
    inttask = isinstance(task,int)
    if inttask: task = torch.tensor([task],dtype=int,device=self.device)
    if isinstance(task,list): task = torch.tensor(task,dtype=int,device=self.device)
    assert task.ndim==1 and (task>=0).all() and (task<self.num_tasks).all()
    if self.ptransform=="NONE":
        kmat = torch.cat([torch.cat([self.kernel(task[l0],l1,x[:,None,:],self.get_xb(l1)[None,:,:],*self.derivatives_cross[task[l0]][l1],self.derivatives_coeffs_cross[task[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task))],dim=-3)
    elif self.ptransform=="BAKER":
        kmat_left = torch.cat([torch.cat([self.kernel(task[l0],l1,x[:,None,:]/2,self.get_xb(l1)[None,:,:],*self.derivatives_cross[task[l0]][l1],self.derivatives_coeffs_cross[task[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task))],dim=-3)
        kmat_right = torch.cat([torch.cat([self.kernel(task[l0],l1,1-x[:,None,:]/2,self.get_xb(l1)[None,:,:],*self.derivatives_cross[task[l0]][l1],self.derivatives_coeffs_cross[task[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task))],dim=-3)
        kmat = 1/2*(kmat_left+kmat_right)
    else:
        raise Exception("invalid ptransform = %s"%self.ptransform)
    pmean = self.prior_mean[...,task,None]+torch.einsum("...i,...i->...",kmat,coeffs[...,None,None,:])
    if eval:
        torch.set_grad_enabled(incoming_grad_enabled)
    return pmean[...,0,:] if inttask else pmean

post_var

post_var(x, task=None, n=None, eval=True)

Posterior variance.

Parameters:

Name Type Description Default
x Tensor[N, d]

sampling locations

required
task Union[int, Tensor[T]]

task indices

None
n Union[int, Tensor[num_tasks]]

number of points at which to evaluate the posterior cubature variance.

None
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True

Returns:

Name Type Description
pvar Tensor[T, N]

posterior variance
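
Because the posterior variance does not depend on the observed function values, the n argument can be used to anticipate the variance at a larger future sample size, as in the StandardGP examples. A sketch, assuming sgp and x as above:

>>> pvar = sgp.post_var(x)                     # variance at the current sample size
>>> pvar_future = sgp.post_var(x, n=2*sgp.n)   # variance once twice as many points are observed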

Source code in fastgps/abstract_gp.py
def post_var(self, x:torch.Tensor, task:Union[int,torch.Tensor]=None, n:Union[int,torch.Tensor]=None, eval:bool=True):
    """
    Posterior variance.

    Args:
        x (torch.Tensor[N,d]): sampling locations
        task (Union[int,torch.Tensor[T]]): task indices
        n (Union[int,torch.Tensor[num_tasks]]): number of points at which to evaluate the posterior cubature variance.
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`

    Returns:
        pvar (torch.Tensor[T,N]): posterior variance
    """
    if n is None: n = self.n
    if isinstance(n,int): n = torch.tensor([n],dtype=int,device=self.device)
    assert isinstance(n,torch.Tensor)
    assert x.ndim==2 and x.size(1)==self.d, "x must be a torch.Tensor with shape (-1,d)"
    inv_log_det_cache = self.get_inv_log_det_cache(n)
    if eval:
        incoming_grad_enabled = torch.is_grad_enabled()
        torch.set_grad_enabled(False)
    if task is None: task = self.default_task
    inttask = isinstance(task,int)
    if inttask: task = torch.tensor([task],dtype=int,device=self.device)
    if isinstance(task,list): task = torch.tensor(task,dtype=int,device=self.device)
    assert task.ndim==1 and (task>=0).all() and (task<self.num_tasks).all()
    if self.ptransform=='NONE':
        kmat_new = torch.cat([self.kernel(task[l0],task[l0],x,x,*self.derivatives_cross[task[l0]][task[l0]],self.derivatives_coeffs_cross[task[l0]][task[l0]])[...,None,:] for l0 in range(len(task))],dim=-2)
        kmat = torch.cat([torch.cat([self.kernel(task[l0],l1,x[:,None,:],self.get_xb(l1,n[l1])[None,:,:],*self.derivatives_cross[task[l0]][l1],self.derivatives_coeffs_cross[task[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task))],dim=-3)
        kmat_perm = torch.permute(kmat,[-3,-2]+[i for i in range(kmat.ndim-3)]+[-1])
        t_perm = inv_log_det_cache.gram_matrix_solve(self,kmat_perm)
        t = torch.permute(t_perm,[2+i for i in range(t_perm.ndim-3)]+[0,1,-1])
        diag = kmat_new-(t*kmat).sum(-1)
    elif self.ptransform=='BAKER':
        kmat_new_1 = torch.cat([self.kernel(task[l0],task[l0],x/2,x/2,*self.derivatives_cross[task[l0]][task[l0]],self.derivatives_coeffs_cross[task[l0]][task[l0]])[...,None,:] for l0 in range(len(task))],dim=-2)
        kmat_1 = torch.cat([torch.cat([self.kernel(task[l0],l1,x[:,None,:]/2,self.get_xb(l1,n[l1])[None,:,:],*self.derivatives_cross[task[l0]][l1],self.derivatives_coeffs_cross[task[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task))],dim=-3)
        kmat_perm_1 = torch.permute(kmat_1,[-3,-2]+[i for i in range(kmat_1.ndim-3)]+[-1])
        t_perm_1 = inv_log_det_cache.gram_matrix_solve(self,kmat_perm_1)
        t_1 = torch.permute(t_perm_1,[2+i for i in range(t_perm_1.ndim-3)]+[0,1,-1])
        diag_1 = kmat_new_1-(t_1*kmat_1).sum(-1)
        kmat_new_2 = torch.cat([self.kernel(task[l0],task[l0],1-x/2,1-x/2,*self.derivatives_cross[task[l0]][task[l0]],self.derivatives_coeffs_cross[task[l0]][task[l0]])[...,None,:] for l0 in range(len(task))],dim=-2)
        kmat_2 = torch.cat([torch.cat([self.kernel(task[l0],l1,1-x[:,None,:]/2,self.get_xb(l1,n[l1])[None,:,:],*self.derivatives_cross[task[l0]][l1],self.derivatives_coeffs_cross[task[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task))],dim=-3)
        kmat_perm_2 = torch.permute(kmat_2,[-3,-2]+[i for i in range(kmat_2.ndim-3)]+[-1])
        t_perm_2 = inv_log_det_cache.gram_matrix_solve(self,kmat_perm_2)
        t_2 = torch.permute(t_perm_2,[2+i for i in range(t_perm_2.ndim-3)]+[0,1,-1])
        diag_2 = kmat_new_2-(t_2*kmat_2).sum(-1)
        kmat_new_3 = torch.cat([self.kernel(task[l0],task[l0],x/2,1-x/2,*self.derivatives_cross[task[l0]][task[l0]],self.derivatives_coeffs_cross[task[l0]][task[l0]])[...,None,:] for l0 in range(len(task))],dim=-2)
        kmat_3 = torch.cat([torch.cat([self.kernel(task[l0],l1,x[:,None,:]/2,self.get_xb(l1,n[l1])[None,:,:],*self.derivatives_cross[task[l0]][l1],self.derivatives_coeffs_cross[task[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task))],dim=-3)
        kmat_p3 = torch.cat([torch.cat([self.kernel(task[l0],l1,1-x[:,None,:]/2,self.get_xb(l1,n[l1])[None,:,:],*self.derivatives_cross[task[l0]][l1],self.derivatives_coeffs_cross[task[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task))],dim=-3)
        kmat_perm_3 = torch.permute(kmat_p3,[-3,-2]+[i for i in range(kmat_p3.ndim-3)]+[-1])
        t_perm_3 = inv_log_det_cache.gram_matrix_solve(self,kmat_perm_3)
        t_3 = torch.permute(t_perm_3,[2+i for i in range(t_perm_3.ndim-3)]+[0,1,-1])
        diag_3 = kmat_new_3-(t_3*kmat_3).sum(-1)
        diag = 1/4*(diag_1+diag_2+2*diag_3)
    else:
        raise Exception("invalid ptransform = %s"%self.ptransform)
    diag[diag<0] = 0 
    if eval:
        torch.set_grad_enabled(incoming_grad_enabled)
    return diag[...,0,:] if inttask else diag

post_cov

post_cov(x0, x1, task0=None, task1=None, n=None, eval=True)

Posterior covariance.

Parameters:

Name Type Description Default
x0 Tensor[N, d]

left sampling locations

required
x1 Tensor[M, d]

right sampling locations

required
task0 Union[int, Tensor[T1]]

left task index

None
task1 Union[int, Tensor[T2]]

right task index

None
n Union[int, Tensor[num_tasks]]

number of points at which to evaluate the posterior cubature variance.

None
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True

Returns:

Name Type Description
pcov Tensor[T1, T2, N, M]

posterior covariance matrix
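
A sketch, assuming sgp, an (N,d) tensor x, and an (M,d) tensor z as in the StandardGP examples:

>>> pcov = sgp.post_cov(x, z)     # shape (N,M) for a single task
>>> pcov_xx = sgp.post_cov(x, x)  # shape (N,N); its diagonal is clipped at 0 and equals post_var(x)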

Source code in fastgps/abstract_gp.py
def post_cov(self, x0:torch.Tensor, x1:torch.Tensor, task0:Union[int,torch.Tensor]=None, task1:Union[int,torch.Tensor]=None, n:Union[int,torch.Tensor]=None, eval:bool=True):
    """
    Posterior covariance. 

    Args:
        x0 (torch.Tensor[N,d]): left sampling locations
        x1 (torch.Tensor[M,d]): right sampling locations
        task0 (Union[int,torch.Tensor[T1]]): left task index
        task1 (Union[int,torch.Tensor[T2]]): right task index
        n (Union[int,torch.Tensor[num_tasks]]): number of points at which to evaluate the posterior cubature variance.
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`

    Returns:
        pcov (torch.Tensor[T1,T2,N,M]): posterior covariance matrix
    """
    if n is None: n = self.n
    if isinstance(n,int): n = torch.tensor([n],dtype=int,device=self.device)
    assert isinstance(n,torch.Tensor)
    assert x0.ndim==2 and x0.size(1)==self.d, "x0 must be a torch.Tensor with shape (-1,d)"
    assert x1.ndim==2 and x1.size(1)==self.d, "x1 must be a torch.Tensor with shape (-1,d)"
    inv_log_det_cache = self.get_inv_log_det_cache(n)
    if eval:
        incoming_grad_enabled = torch.is_grad_enabled()
        torch.set_grad_enabled(False)
    if task0 is None: task0 = self.default_task
    inttask0 = isinstance(task0,int)
    if inttask0: task0 = torch.tensor([task0],dtype=int,device=self.device)
    if isinstance(task0,list): task0 = torch.tensor(task0,dtype=int,device=self.device)
    assert task0.ndim==1 and (task0>=0).all() and (task0<self.num_tasks).all()
    if task1 is None: task1 = self.default_task
    inttask1 = isinstance(task1,int)
    if inttask1: task1 = torch.tensor([task1],dtype=int,device=self.device)
    if isinstance(task1,list): task1 = torch.tensor(task1,dtype=int,device=self.device)
    assert task1.ndim==1 and (task1>=0).all() and (task1<self.num_tasks).all()
    if self.ptransform=="NONE":
        equal = torch.equal(x0,x1) and torch.equal(task0,task1)
        kmat_new = torch.cat([torch.cat([self.kernel(task0[l0],task1[l1],x0[:,None,:],x1[None,:,:],*self.derivatives_cross[task0[l0]][task1[l1]],self.derivatives_coeffs_cross[task0[l0]][task1[l1]])[...,None,None,:,:] for l1 in range(len(task1))],dim=-3) for l0 in range(len(task0))],dim=-4)
        kmat1 = torch.cat([torch.cat([self.kernel(task0[l0],l1,x0[:,None,:],self.get_xb(l1,n[l1])[None,:,:],*self.derivatives_cross[task0[l0]][l1],self.derivatives_coeffs_cross[task0[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task0))],dim=-3)
        kmat2 = kmat1 if equal else torch.cat([torch.cat([self.kernel(task1[l0],l1,x1[:,None,:],self.get_xb(l1,n[l1])[None,:,:],*self.derivatives_cross[task1[l0]][l1],self.derivatives_coeffs_cross[task1[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task1))],dim=-3)
        kmat2_perm = torch.permute(kmat2,[-3,-2]+[i for i in range(kmat2.ndim-3)]+[-1])
        t_perm = inv_log_det_cache.gram_matrix_solve(self,kmat2_perm)
        t = torch.permute(t_perm,[2+i for i in range(t_perm.ndim-3)]+[0,1,-1])
        kmat = kmat_new-(kmat1[...,:,None,:,None,:]*t[...,None,:,None,:,:]).sum(-1)
    elif self.ptransform=="BAKER":
        kmat = 0
        for x0i,x1i in [[x0/2,x1/2],[x0/2,1-x1/2],[1-x0/2,x1/2],[1-x0/2,1-x1/2]]:
            equal = torch.equal(x0i,x1i) and torch.equal(task0,task1)
            kmat_new = torch.cat([torch.cat([self.kernel(task0[l0],task1[l1],x0i[:,None,:],x1i[None,:,:],*self.derivatives_cross[task0[l0]][task1[l1]],self.derivatives_coeffs_cross[task0[l0]][task1[l1]])[...,None,None,:,:] for l1 in range(len(task1))],dim=-3) for l0 in range(len(task0))],dim=-4)
            kmat1 = torch.cat([torch.cat([self.kernel(task0[l0],l1,x0i[:,None,:],self.get_xb(l1,n[l1])[None,:,:],*self.derivatives_cross[task0[l0]][l1],self.derivatives_coeffs_cross[task0[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task0))],dim=-3)
            kmat2 = kmat1 if equal else torch.cat([torch.cat([self.kernel(task1[l0],l1,x1i[:,None,:],self.get_xb(l1,n[l1])[None,:,:],*self.derivatives_cross[task1[l0]][l1],self.derivatives_coeffs_cross[task1[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task1))],dim=-3)
            kmat2_perm = torch.permute(kmat2,[-3,-2]+[i for i in range(kmat2.ndim-3)]+[-1])
            t_perm = inv_log_det_cache.gram_matrix_solve(self,kmat2_perm)
            t = torch.permute(t_perm,[2+i for i in range(t_perm.ndim-3)]+[0,1,-1])
            kmat += kmat_new-(kmat1[...,:,None,:,None,:]*t[...,None,:,None,:,:]).sum(-1)
        kmat = kmat/4
    else:
        raise Exception("invalid ptransform = %s"%self.ptransform)
    if equal:
        tmesh,nmesh = torch.meshgrid(torch.arange(kmat.size(0),device=self.device),torch.arange(x0.size(0),device=x0.device),indexing="ij")            
        tidx,nidx = tmesh.ravel(),nmesh.ravel()
        diag = kmat[...,tidx,tidx,nidx,nidx]
        diag[diag<0] = 0 
        kmat[...,tidx,tidx,nidx,nidx] = diag 
    if eval:
        torch.set_grad_enabled(incoming_grad_enabled)
    if inttask0 and inttask1:
        return kmat[...,0,0,:,:]
    elif inttask0 and not inttask1:
        return kmat[...,0,:,:,:]
    elif not inttask0 and inttask1:
        return kmat[...,:,0,:,:]
    else: # not inttask0 and not inttask1
        return kmat

post_error

post_error(x, task=None, n=None, confidence=0.99, eval=True)

Posterior error.

Parameters:

Name Type Description Default
x Tensor[N, d]

sampling locations

required
task Union[int, Tensor[T]]

task indices

None
n Union[int, Tensor[num_tasks]]

number of points at which to evaluate the posterior cubature variance.

None
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True
confidence float

confidence level in \((0,1)\) for the credible interval

0.99

Returns:

Name Type Description
cvar Tensor[T]

posterior variance

quantile float64
scipy.stats.norm.ppf(1-(1-confidence)/2)
perror Tensor[T]

posterior error
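
A sketch of unpacking the returned tuple, assuming sgp and x as above:

>>> pvar, q, perror = sgp.post_error(x, confidence=0.99)  # perror = q*sqrt(pvar)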

Source code in fastgps/abstract_gp.py
def post_error(self, x:torch.Tensor, task:Union[int,torch.Tensor]=None, n:Union[int,torch.Tensor]=None, confidence:float=0.99, eval:bool=True):
    """
    Posterior error. 

    Args:
        x (torch.Tensor[N,d]): sampling locations
        task (Union[int,torch.Tensor[T]]): task indices
        n (Union[int,torch.Tensor[num_tasks]]): number of points at which to evaluate the posterior cubature variance.
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`
        confidence (float): confidence level in $(0,1)$ for the credible interval

    Returns:
        cvar (torch.Tensor[T]): posterior variance
        quantile (np.float64):
            ```python
            scipy.stats.norm.ppf(1-(1-confidence)/2)
            ```
        perror (torch.Tensor[T]): posterior error
    """
    assert np.isscalar(confidence) and 0<confidence<1, "confidence must be between 0 and 1"
    q = scipy.stats.norm.ppf(1-(1-confidence)/2)
    pvar = self.post_var(x,task=task,n=n,eval=eval,)
    pstd = torch.sqrt(pvar)
    perror = q*pstd
    return pvar,q,perror

post_ci

post_ci(x, task=None, confidence=0.99, eval=True)

Posterior credible interval.

Parameters:

Name Type Description Default
x Tensor[N, d]

sampling locations

required
task Union[int, Tensor[T]]

task indices

None
confidence float

confidence level in \((0,1)\) for the credible interval

0.99
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True

Returns:

Name Type Description
pmean Tensor[..., T, N]

posterior mean

pvar Tensor[T, N]

posterior variance

quantile float64
scipy.stats.norm.ppf(1-(1-confidence)/2)
pci_low Tensor[..., T, N]

posterior credible interval lower bound

pci_high Tensor[..., T, N]

posterior credible interval upper bound
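
A sketch, assuming sgp and x as above:

>>> pmean, pvar, q, ci_low, ci_high = sgp.post_ci(x, confidence=0.99)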

Source code in fastgps/abstract_gp.py
def post_ci(self, x:torch.Tensor, task:Union[int,torch.Tensor]=None, confidence:float=0.99, eval:bool=True):
    """
    Posterior credible interval.

    Args:
        x (torch.Tensor[N,d]): sampling locations
        task (Union[int,torch.Tensor[T]]): task indices
        confidence (float): confidence level in $(0,1)$ for the credible interval
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`

    Returns:
        pmean (torch.Tensor[...,T,N]): posterior mean
        pvar (torch.Tensor[T,N]): posterior variance 
        quantile (np.float64):
            ```python
            scipy.stats.norm.ppf(1-(1-confidence)/2)
            ```
        pci_low (torch.Tensor[...,T,N]): posterior credible interval lower bound
        pci_high (torch.Tensor[...,T,N]): posterior credible interval upper bound
    """
    assert np.isscalar(confidence) and 0<confidence<1, "confidence must be between 0 and 1"
    q = scipy.stats.norm.ppf(1-(1-confidence)/2)
    pmean = self.post_mean(x,task=task,eval=eval)
    pvar,q,perror = self.post_error(x,task=task,confidence=confidence,eval=eval)
    pci_low = pmean-perror 
    pci_high = pmean+perror
    return pmean,pvar,q,pci_low,pci_high

post_cubature_mean

post_cubature_mean(task=None, eval=True)

Posterior cubature mean.

Parameters:

Name Type Description Default
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True
task Union[int, Tensor[T]]

task indices

None

Returns:

Name Type Description
pcmean Tensor[..., T]

posterior cubature mean

Source code in fastgps/abstract_gp.py
def post_cubature_mean(self, task:Union[int,torch.Tensor]=None, eval:bool=True):
    """
    Posterior cubature mean. 

    Args:
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`
        task (Union[int,torch.Tensor[T]]): task indices

    Returns:
        pcmean (torch.Tensor[...,T]): posterior cubature mean
    """
    raise NotImplementedError()

post_cubature_var

post_cubature_var(task=None, n=None, eval=True)

Posterior cubature variance.

Parameters:

Name Type Description Default
task Union[int, Tensor[T]]

task indices

None
n Union[int, Tensor[num_tasks]]

number of points at which to evaluate the posterior cubature variance.

None
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True

Returns:

Name Type Description
pcvar Tensor[T]

posterior cubature variance

Source code in fastgps/abstract_gp.py
def post_cubature_var(self, task:Union[int,torch.Tensor]=None, n:Union[int,torch.Tensor]=None, eval:bool=True):
    """
    Posterior cubature variance. 

    Args:
        task (Union[int,torch.Tensor[T]]): task indices
        n (Union[int,torch.Tensor[num_tasks]]): number of points at which to evaluate the posterior cubature variance.
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`

    Returns:
        pcvar (torch.Tensor[T]): posterior cubature variance
    """
    raise NotImplementedError()

post_cubature_cov

post_cubature_cov(task0=None, task1=None, n=None, eval=True)

Posterior cubature covariance.

Parameters:

Name Type Description Default
task0 Union[int, Tensor[T1]]

task indices

None
task1 Union[int, Tensor[T2]]

task indices

None
n Union[int, Tensor[num_tasks]]

number of points at which to evaluate the posterior cubature covariance.

None
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True

Returns:

Name Type Description
pcvar Tensor[T1, T2]

posterior cubature covariance

Source code in fastgps/abstract_gp.py
def post_cubature_cov(self, task0:Union[int,torch.Tensor]=None, task1:Union[int,torch.Tensor]=None, n:Union[int,torch.Tensor]=None, eval:bool=True):
    """
    Posterior cubature covariance. 

    Args:
        task0 (Union[int,torch.Tensor[T1]]): task indices
        task1 (Union[int,torch.Tensor[T2]]): task indices
        n (Union[int,torch.Tensor[num_tasks]]): number of points at which to evaluate the posterior cubature covariance.
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`

    Returns:
        pcvar (torch.Tensor[T1,T2]): posterior cubature covariance
    """
    raise NotImplementedError()

post_cubature_error

post_cubature_error(task=None, n=None, confidence=0.99, eval=True)

Posterior cubature error.

Parameters:

Name Type Description Default
task Union[int, Tensor[T]]

task indices

None
n Union[int, Tensor[num_tasks]]

number of points at which to evaluate the posterior cubature variance.

None
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True
confidence float

confidence level in \((0,1)\) for the credible interval

0.99

Returns:

Name Type Description
pcvar Tensor[T]

posterior cubature variance

quantile float64
scipy.stats.norm.ppf(1-(1-confidence)/2)
pcerror Tensor[T]

posterior cubature error

Source code in fastgps/abstract_gp.py
def post_cubature_error(self, task:Union[int,torch.Tensor]=None, n:Union[int,torch.Tensor]=None, confidence:float=0.99, eval:bool=True):
    """
    Posterior cubature error. 

    Args:
        task (Union[int,torch.Tensor[T]]): task indices
        n (Union[int,torch.Tensor[num_tasks]]): number of points at which to evaluate the posterior cubature variance.
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`
        confidence (float): confidence level in $(0,1)$ for the credible interval

    Returns:
        pcvar (torch.Tensor[T]): posterior cubature variance
        quantile (np.float64):
            ```python
            scipy.stats.norm.ppf(1-(1-confidence)/2)
            ```
        pcerror (torch.Tensor[T]): posterior cubature error
    """
    assert np.isscalar(confidence) and 0<confidence<1, "confidence must be between 0 and 1"
    q = scipy.stats.norm.ppf(1-(1-confidence)/2)
    pcvar = self.post_cubature_var(task=task,n=n,eval=eval)
    pcstd = torch.sqrt(pcvar)
    pcerror = q*pcstd
    return pcvar,q,pcerror

post_cubature_ci

post_cubature_ci(task=None, confidence=0.99, eval=True)

Posterior cubature credible interval.

Parameters:

Name Type Description Default
task Union[int, Tensor[T]]

task indices

None
confidence float

confidence level in \((0,1)\) for the credible interval

0.99
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True

Returns:

Name Type Description
pcmean Tensor[..., T]

posterior cubature mean

pcvar Tensor[T]

posterior cubature variance

quantile float64
scipy.stats.norm.ppf(1-(1-confidence)/2)
pcci_low Tensor[..., T]

posterior cubature credible interval lower bound

pcci_high Tensor[..., T]

posterior cubature credible interval upper bound
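
A sketch, assuming sgp is a fitted GP:

>>> pcmean, pcvar, q, pcci_low, pcci_high = sgp.post_cubature_ci(confidence=0.99)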

Source code in fastgps/abstract_gp.py
def post_cubature_ci(self, task:Union[int,torch.Tensor]=None, confidence:float=0.99, eval:bool=True):
    """
    Posterior cubature credible interval.

    Args:
        task (Union[int,torch.Tensor[T]]): task indices
        confidence (float): confidence level in $(0,1)$ for the credible interval
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`

    Returns:
        pcmean (torch.Tensor[...,T]): posterior cubature mean
        pcvar (torch.Tensor[T]): posterior cubature variance
        quantile (np.float64):
            ```python
            scipy.stats.norm.ppf(1-(1-confidence)/2)
            ```
        pcci_low (torch.Tensor[...,T]): posterior cubature credible interval lower bound
        pcci_high (torch.Tensor[...,T]): posterior cubature credible interval upper bound
    """
    assert np.isscalar(confidence) and 0<confidence<1, "confidence must be between 0 and 1"
    q = scipy.stats.norm.ppf(1-(1-confidence)/2)
    pcmean = self.post_cubature_mean(task=task,eval=eval) 
    pcvar,q,pcerror = self.post_cubature_error(task=task,confidence=confidence,eval=eval)
    pcci_low = pcmean-pcerror
    pcci_high = pcmean+pcerror
    return pcmean,pcvar,q,pcci_low,pcci_high

StandardGP

StandardGP(kernel, seqs, noise=0.0001, tfs_noise=(qp.util.transforms.tf_exp_eps_inv, qp.util.transforms.tf_exp_eps), requires_grad_noise=False, shape_noise=torch.Size([1]), derivatives=None, derivatives_coeffs=None, adaptive_nugget=True, data=None, ptransform=None)

Bases: AbstractGP

Standard Gaussian process regression

Examples:

>>> device = "cpu"
>>> if device!="mps":
...     torch.set_default_dtype(torch.float64)
>>> def f_ackley(x, a=20, b=0.2, c=2*np.pi, scaling=32.768):
...     # https://www.sfu.ca/~ssurjano/ackley.html
...     assert x.ndim==2
...     x = 2*scaling*x-scaling
...     t1 = a*torch.exp(-b*torch.sqrt(torch.mean(x**2,1)))
...     t2 = torch.exp(torch.mean(torch.cos(c*x),1))
...     t3 = a+np.exp(1)
...     y = -t1-t2+t3
...     return y
>>> n = 2**6
>>> d = 2
>>> sgp = StandardGP(
...     qp.KernelSquaredExponential(d,torchify=True,device=device),
...     qp.DigitalNetB2(dimension=d,seed=7))
>>> x_next = sgp.get_x_next(n)
>>> y_next = f_ackley(x_next)
>>> sgp.add_y_next(y_next)
>>> rng = torch.Generator().manual_seed(17)
>>> x = torch.rand((2**7,d),generator=rng).to(device)
>>> y = f_ackley(x)
>>> pmean = sgp.post_mean(x)
>>> pmean.shape
torch.Size([128])
>>> torch.linalg.norm(y-pmean)/torch.linalg.norm(y)
tensor(0.0817)
>>> torch.linalg.norm(sgp.post_mean(sgp.x)-sgp.y)/torch.linalg.norm(y)
tensor(0.0402)
>>> data = sgp.fit(verbose=0)
>>> list(data.keys())
[]
>>> torch.linalg.norm(y-sgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0472)
>>> z = torch.rand((2**8,d),generator=rng).to(device)
>>> pcov = sgp.post_cov(x,z)
>>> pcov.shape
torch.Size([128, 256])
>>> pcov = sgp.post_cov(x,x)
>>> pcov.shape
torch.Size([128, 128])
>>> (pcov.diagonal()>=0).all()
tensor(True)
>>> pvar = sgp.post_var(x)
>>> pvar.shape
torch.Size([128])
>>> torch.allclose(pcov.diagonal(),pvar)
True
>>> pmean,pstd,q,ci_low,ci_high = sgp.post_ci(x,confidence=0.99)
>>> ci_low.shape
torch.Size([128])
>>> ci_high.shape
torch.Size([128])
>>> sgp.post_cubature_mean()
tensor(20.3665)
>>> sgp.post_cubature_var()
tensor(0.0015)
>>> pcmean,pcvar,q,pcci_low,pcci_high = sgp.post_cubature_ci(confidence=0.99)
>>> pcci_low
tensor(20.2684)
>>> pcci_high
tensor(20.4647)
>>> pcov_future = sgp.post_cov(x,z,n=2*n)
>>> pvar_future = sgp.post_var(x,n=2*n)
>>> pcvar_future = sgp.post_cubature_var(n=2*n)
>>> x_next = sgp.get_x_next(2*n)
>>> y_next = f_ackley(x_next)
>>> sgp.add_y_next(y_next)
>>> torch.linalg.norm(y-sgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0589)
>>> torch.allclose(sgp.post_cov(x,z),pcov_future)
True
>>> torch.allclose(sgp.post_var(x),pvar_future)
True
>>> torch.allclose(sgp.post_cubature_var(),pcvar_future)
True
>>> data = sgp.fit(verbose=False)
>>> torch.linalg.norm(y-sgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0440)
>>> x_next = sgp.get_x_next(4*n)
>>> y_next = f_ackley(x_next)
>>> sgp.add_y_next(y_next)
>>> torch.linalg.norm(y-sgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0923)
>>> data = sgp.fit(verbose=False)
>>> torch.linalg.norm(y-sgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0412)
>>> pcov_16n = sgp.post_cov(x,z,n=16*n)
>>> pvar_16n = sgp.post_var(x,n=16*n)
>>> pcvar_16n = sgp.post_cubature_var(n=16*n)
>>> x_next = sgp.get_x_next(16*n)
>>> y_next = f_ackley(x_next)
>>> sgp.add_y_next(y_next)
>>> torch.allclose(sgp.post_cov(x,z),pcov_16n)
True
>>> torch.allclose(sgp.post_var(x),pvar_16n)
True
>>> torch.allclose(sgp.post_cubature_var(),pcvar_16n)
True

Different loss metrics for fitting

>>> n = 2**6
>>> d = 3
>>> sgp = StandardGP(
...     qp.KernelMatern52(d,torchify=True,device=device),
...     qp.DigitalNetB2(dimension=d,seed=7))
>>> x_next = sgp.get_x_next(n)
>>> y_next = torch.stack([torch.sin(x_next).sum(-1),torch.cos(x_next).sum(-1)],axis=0)
>>> sgp.add_y_next(y_next)
>>> data = sgp.fit(loss_metric="MLL",iterations=5,verbose=0)
>>> data = sgp.fit(loss_metric="CV",iterations=5,verbose=0,cv_weights=1/torch.arange(1,2*n+1,device=device).reshape((2,n)))
>>> data = sgp.fit(loss_metric="CV",iterations=5,verbose=0,cv_weights="L2R")
>>> data = sgp.fit(loss_metric="GCV",iterations=5,verbose=0)

Data Driven

>>> x = [
...     torch.rand((3,1),generator=rng).to(device),
...     torch.rand((12,1),generator=rng).to(device),
... ]
>>> y = [
...     torch.stack([torch.sin(2*np.pi*x[0][:,0]),torch.cos(2*np.pi*x[0][:,0]),torch.acos(x[0][:,0])],dim=0).to(device),
...     torch.stack([4*torch.sin(2*np.pi*x[1][:,0]),4*torch.cos(2*np.pi*x[1][:,0]),4*torch.acos(x[1][:,0])],dim=0).to(device),
... ]
>>> sgp = StandardGP(
...     qp.KernelMultiTask(
...         qp.KernelGaussian(d=1,torchify=True,device=device),
...         num_tasks = 2,
...     ),
...     seqs={"x":x,"y":y},
...     noise = 1e-3,
... )
>>> data = sgp.fit(verbose=0,iterations=10)
>>> xticks = torch.linspace(0,1,101,device=device) 
>>> pmean,pvar,q,pci_low,pci_high = sgp.post_ci(xticks[:,None])
>>> pcmean,pcvar,q,pcci_low,pcci_high =  sgp.post_cubature_ci()

Batch Inference

>>> d = 4
>>> n = 2**10
>>> dnb2 = qp.DigitalNetB2(d,seed=11) 
>>> kernel = qp.KernelGaussian(d,torchify=True,shape_scale=(2,1),shape_lengthscales=(3,2,d))
>>> fgp = StandardGP(kernel,dnb2) 
>>> x = fgp.get_x_next(n) 
>>> x.shape
torch.Size([1024, 4])
>>> y = (x**torch.arange(6).reshape((3,2))[:,:,None,None]).sum(-1)
>>> y.shape
torch.Size([3, 2, 1024])
>>> fgp.add_y_next(y) 
>>> data = fgp.fit(verbose=0)
>>> fgp.post_cubature_mean()
tensor([[4.0000, 2.0000],
        [1.3333, 1.0000],
        [0.8000, 0.6666]])
>>> pcv = fgp.post_cubature_var()
>>> pcv.shape
torch.Size([3, 2])
>>> (pcv<5e-6).all()
tensor(True)
>>> pcv4 = fgp.post_cubature_var(n=4*n)
>>> pcv4.shape
torch.Size([3, 2])

Parameters:

Name Type Description Default
kernel AbstractKernel

Kernel object. Set to qp.KernelMultiTask for a multi-task GP.

required
seqs Union[int, qp.DiscreteDistribution, List]

list of sequence generators. If an int seed is passed in we use

[qp.DigitalNetB2(d,seed=seed_i) for seed_i in np.random.SeedSequence(seed).spawn(num_tasks)]
See the qp.DiscreteDistribution docs for more info.

required
noise float

positive noise variance i.e. nugget term

0.0001
tfs_noise Tuple[callable, callable]

the first transforms the noise to the raw value to be optimized, the second applies the inverse transform

(tf_exp_eps_inv, tf_exp_eps)
requires_grad_noise bool

whether or not to optimize the noise parameter

False
shape_noise Size

shape of the noise parameter, defaults to torch.Size([1])

Size([1])
derivatives list

list of derivative orders e.g. to include a function and its gradient set

derivatives = [torch.zeros(d,dtype=int)]+[ej for ej in torch.eye(d,dtype=int)]

None
derivatives_coeffs list

list of derivative coefficients where if derivatives[k].shape==(p,d) then we should have derivatives_coeffs[k].shape==(p,)

None
adaptive_nugget bool

if True, use the adaptive nugget which modifies noises based on trace ratios.

True
data dict

dictionary of data with keys 'x' and 'y' where data['x'] and data['y'] are both torch.Tensors or lists of torch.Tensors with length equal to the number of tasks

None
ptransform str

periodization transform in [None, 'BAKER'] where 'BAKER' is also known as the tent transform.

None
Source code in fastgps/standard_gp.py
def __init__(self,
        kernel:qp.kernel.abstract_kernel.AbstractKernel,
        seqs:Union[qp.IIDStdUniform,int],
        noise:float = 1e-4,
        tfs_noise:Tuple[callable,callable] = (qp.util.transforms.tf_exp_eps_inv,qp.util.transforms.tf_exp_eps),
        requires_grad_noise:bool = False, 
        shape_noise:torch.Size = torch.Size([1]),
        derivatives:list = None,
        derivatives_coeffs:list = None,
        adaptive_nugget:bool = True,
        data:dict = None,
        ptransform:str = None,
        ):
    """
    Args:
        kernel (qp.AbstractKernel): Kernel object. Set to `qp.KernelMultiTask` for a multi-task GP.
        seqs (Union[int,qp.DiscreteDistribution,List]): list of sequence generators. If an int `seed` is passed in we use 
            ```python
            [qp.DigitalNetB2(d,seed=seed_i) for seed_i in np.random.SeedSequence(seed).spawn(num_tasks)]
            ```
            See the <a href="https://qp.readthedocs.io/en/latest/algorithms.html#discrete-distribution-class" target="_blank">`qp.DiscreteDistribution` docs</a> for more info. 
        noise (float): positive noise variance i.e. nugget term
        tfs_noise (Tuple[callable,callable]): the first transforms the noise to the raw value to be optimized, the second applies the inverse transform
        requires_grad_noise (bool): whether or not to optimize the noise parameter
        shape_noise (torch.Size): shape of the noise parameter, defaults to `torch.Size([1])`
        derivatives (list): list of derivative orders e.g. to include a function and its gradient set 
            ```python
            derivatives = [torch.zeros(d,dtype=int)]+[ej for ej in torch.eye(d,dtype=int)]
            ```
        derivatives_coeffs (list): list of derivative coefficients where if `derivatives[k].shape==(p,d)` then we should have `derivatives_coeffs[k].shape==(p,)`
        adaptive_nugget (bool): if True, use the adaptive nugget which modifies noises based on trace ratios.  
        data (dict): dictionary of data with keys 'x' and 'y' where data['x'] and data['y'] are both `torch.Tensor`s or lists of `torch.Tensor`s with length equal to the number of tasks
        ptransform (str): periodization transform in `[None, 'BAKER']` where `'BAKER'` is also known as the tent transform.
    """
    self._XBDTYPE = torch.get_default_dtype()
    self._FTOUTDTYPE = torch.get_default_dtype()
    if isinstance(kernel,qp.KernelMultiTask):
        solo_task = False
        num_tasks = kernel.num_tasks
        default_task = torch.arange(num_tasks)
    else:
        solo_task = True
        default_task = 0 
        num_tasks = 1
    if isinstance(seqs,dict):
        data = seqs
        assert "x" in data and "y" in data, "dict seqs must have keys 'x' and 'y'"
        if isinstance(data["x"],torch.Tensor): data["x"] = [data["x"]]
        if isinstance(data["y"],torch.Tensor): data["y"] = [data["y"]]
        assert isinstance(data["x"],list) and len(data["x"])==num_tasks and all(isinstance(x_l,torch.Tensor) and x_l.ndim==2 and x_l.size(1)==kernel.d for x_l in data["x"]), "data['x'] should be a list of 2d tensors of length num_tasks with each number of columns equal to the dimension"
        assert isinstance(data["y"],list) and len(data["y"])==num_tasks and all(isinstance(y_l,torch.Tensor) and y_l.ndim>=1 for y_l in data["y"]), "data['y'] should be a list of tensors of length num_tasks"
        seqs = np.array([DummyDiscreteDistrib(data["x"][l].cpu().detach().numpy()) for l in range(num_tasks)],dtype=object)
    else:
        data = None
        if isinstance(seqs,int):
            global_seed = seqs
            seqs = np.array([qp.DigitalNetB2(kernel.d,seed=seed,order="GRAY") for seed in np.random.SeedSequence(global_seed).spawn(num_tasks)],dtype=object)
        if isinstance(seqs,qp.DiscreteDistribution):
            seqs = np.array([seqs],dtype=object)
        if isinstance(seqs,list):
            seqs = np.array(seqs,dtype=object)
    assert seqs.shape==(num_tasks,), "seqs should be a length num_tasks=%d list"%num_tasks
    assert all(seqs[i].replications==1 for i in range(num_tasks)), "each seq should have only 1 replication"
    super().__init__(
        kernel,
        seqs,
        num_tasks,
        default_task,
        solo_task,
        noise,
        tfs_noise,
        requires_grad_noise,
        shape_noise,
        derivatives,
        derivatives_coeffs,
        adaptive_nugget,
        ptransform,
    )
    if data is not None:
        self.add_y_next(data["y"],task=torch.arange(self.num_tasks))

AbstractFastGP

AbstractFastGP(ft, ift, omega, *args, **kwargs)

Bases: AbstractGP

Source code in fastgps/abstract_fast_gp.py
def __init__(self,
        ft,
        ift,
        omega,
        *args,
        **kwargs
    ):
    super().__init__(*args,**kwargs)
    # fast transforms 
    self.ft_unstable = ft
    self.ift_unstable = ift
    self.omega = omega
    # storage and dynamic caches
    self.k1parts_seq = np.array([[_K1PartsSeq(self,l0,l1,*self.derivatives_cross[l0][l1]) if l1>=l0 else None for l1 in range(self.num_tasks)] for l0 in range(self.num_tasks)],dtype=object)
    self.lam_caches = np.array([[_LamCaches(self,l0,l1,*self.derivatives_cross[l0][l1],self.derivatives_coeffs_cross[l0][l1]) if l1>=l0 else None for l1 in range(self.num_tasks)] for l0 in range(self.num_tasks)],dtype=object)
    self.ytilde_cache = np.array([_YtildeCache(self,i) for i in range(self.num_tasks)],dtype=object)

ft

ft(x)

One dimensional fast transform along the last dimension. For FastGPLattice this is the orthonormal Fast Fourier Transform (FFT). For FastGPDigitalNetB2 this is the orthonormal Fast Walsh Hadamard Transform (FWHT).

Parameters:

Name Type Description Default
x Tensor

inputs to be transformed along the last dimension. Require n = x.size(-1) is a power of 2.

required

Returns:

Name Type Description
y Tensor

transformed inputs with the same shape as x
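
The wrapper subtracts the mean before calling the unstable transform and restores it in the zeroth coefficient, which reduces cancellation error for near-constant inputs. A standalone sketch (not library code) illustrating the identity with the orthonormal FFT, the transform used by FastGPLattice:

>>> import torch
>>> x = torch.randn(8, dtype=torch.float64)
>>> ft_unstable = lambda v: torch.fft.fft(v, norm="ortho")
>>> xmean = x.mean(-1)
>>> y = ft_unstable(x - xmean[..., None])
>>> y[..., 0] += xmean*x.size(-1)**0.5  # DC coefficient of the orthonormal FFT is mean*sqrt(n)
>>> bool(torch.allclose(y, ft_unstable(x)))
True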

Source code in fastgps/abstract_fast_gp.py
def ft(self, x):
    """
    One dimensional fast transform along the last dimension. 
        For `FastGPLattice` this is the orthonormal Fast Fourier Transform (FFT). 
        For `FastGPDigitalNetB2` this is the orthonormal Fast Walsh Hadamard Transform (FWHT). 

    Args: 
        x (torch.Tensor): inputs to be transformed along the last dimension. `n = x.size(-1)` must be a power of 2.

    Returns: 
        y (torch.Tensor): transformed inputs with the same shape as `x` 
    """
    xmean = x.mean(-1)
    y = self.ft_unstable(x-xmean[...,None])
    y[...,0] += xmean*np.sqrt(x.size(-1))
    return y
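The transform is applied to the centered signal and the mean is restored in the zeroth coefficient. A minimal sketch of the identity this relies on, using torch.fft.fft with norm="ortho" as a stand-in for the library's orthonormal transform (an assumption made for illustration only):

import numpy as np
import torch

torch.set_default_dtype(torch.float64)
x = torch.rand(2**3) + 100.0                       # signal with a large constant offset
xmean = x.mean(-1)
y_direct = torch.fft.fft(x, norm="ortho")          # orthonormal FFT applied directly
y_stable = torch.fft.fft(x - xmean, norm="ortho")  # transform the centered signal ...
y_stable[..., 0] += xmean * np.sqrt(x.size(-1))    # ... then restore the mean in the zeroth term
assert torch.allclose(y_direct, y_stable)                                  # equal by linearity
assert torch.allclose(y_direct[..., 0].real, xmean * np.sqrt(x.size(-1)))  # zeroth term is mean*sqrt(n)

Centering first avoids the cancellation that a large mean would otherwise cause in the accumulated sums, which matters most at reduced floating-point precision.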

ift

ift(x)

One-dimensional inverse fast transform along the last dimension. For FastGPLattice this is the orthonormal Inverse Fast Fourier Transform (IFFT). For FastGPDigitalNetB2 this is the orthonormal Fast Walsh Hadamard Transform (FWHT).

Parameters:

Name Type Description Default
x Tensor

inputs to be transformed along the last dimension. n = x.size(-1) must be a power of 2.

required

Returns:

Name Type Description
y Tensor

transformed inputs with the same shape as x

Source code in fastgps/abstract_fast_gp.py
def ift(self, x):
    """
    One-dimensional inverse fast transform along the last dimension. 
        For `FastGPLattice` this is the orthonormal Inverse Fast Fourier Transform (IFFT). 
        For `FastGPDigitalNetB2` this is the orthonormal Fast Walsh Hadamard Transform (FWHT). 

    Args: 
        x (torch.Tensor): inputs to be transformed along the last dimension. `n = x.size(-1)` must be a power of 2.

    Returns: 
        y (torch.Tensor): transformed inputs with the same shape as `x` 
    """
    xmean = x.mean(-1)
    y = self.ift_unstable(x-xmean[...,None])
    y[...,0] += xmean*np.sqrt(x.size(-1))
    return y
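Note that for FastGPDigitalNetB2 the forward and inverse transforms are the same function (the constructor below sets ift = ft = qp.fwht_torch), since the orthonormal Walsh-Hadamard transform is its own inverse. A toy natural-ordering FWHT, written only to illustrate that property and not meant to reproduce qp.fwht_torch:

import torch

def fwht_ortho(x):
    # toy orthonormal fast Walsh-Hadamard transform along the last dimension
    n = x.size(-1)
    assert n & (n - 1) == 0, "length must be a power of 2"
    y = x.clone()
    h = 1
    while h < n:
        y = y.reshape(*x.shape[:-1], n // (2 * h), 2, h)
        a, b = y[..., 0, :], y[..., 1, :]
        y = torch.stack((a + b, a - b), dim=-2).reshape(*x.shape[:-1], n)
        h *= 2
    return y / n**0.5          # H/sqrt(n) is symmetric and orthogonal, hence self-inverse

x = torch.randn(4, 2**4)
assert torch.allclose(fwht_ortho(fwht_ortho(x)), x)   # applying the transform twice recovers x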

FastGPLattice

FastGPLattice(kernel, seqs, noise=2 * qp.util.transforms.EPS64, tfs_noise=(qp.util.transforms.tf_exp_eps_inv, qp.util.transforms.tf_exp_eps), requires_grad_noise=False, shape_noise=torch.Size([1]), derivatives=None, derivatives_coeffs=None, adaptive_nugget=False, ptransform=None)

Bases: AbstractFastGP

Fast Gaussian process regression using lattice points and shift invariant kernels

Examples:

>>> device = "cpu"
>>> if device!="mps":
...     torch.set_default_dtype(torch.float64)
>>> def f_ackley(x, a=20, b=0.2, c=2*np.pi, scaling=32.768):
...     # https://www.sfu.ca/~ssurjano/ackley.html
...     assert x.ndim==2
...     x = 2*scaling*x-scaling
...     t1 = a*torch.exp(-b*torch.sqrt(torch.mean(x**2,1)))
...     t2 = torch.exp(torch.mean(torch.cos(c*x),1))
...     t3 = a+np.exp(1)
...     y = -t1-t2+t3
...     return y
>>> n = 2**10
>>> d = 2
>>> fgp = FastGPLattice(
...     qp.KernelShiftInvar(d,torchify=True,device=device),
...     seqs = qp.Lattice(dimension=d,seed=7))
>>> x_next = fgp.get_x_next(n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> rng = torch.Generator().manual_seed(17)
>>> x = torch.rand((2**7,d),generator=rng).to(device)
>>> y = f_ackley(x)
>>> pmean = fgp.post_mean(x)
>>> pmean.shape
torch.Size([128])
>>> torch.linalg.norm(y-pmean)/torch.linalg.norm(y)
tensor(0.0334)
>>> torch.allclose(fgp.post_mean(fgp.x),fgp.y,atol=1e-3)
True
>>> fgp.post_cubature_mean()
tensor(20.1842)
>>> fgp.post_cubature_var()
tensor(1.2005e-09)
>>> data = fgp.fit(verbose=0)
>>> list(data.keys())
[]
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0360)
>>> z = torch.rand((2**8,d),generator=rng).to(device)
>>> pcov = fgp.post_cov(x,z)
>>> pcov.shape
torch.Size([128, 256])
>>> pcov = fgp.post_cov(x,x)
>>> pcov.shape
torch.Size([128, 128])
>>> (pcov.diagonal()>=0).all()
tensor(True)
>>> pvar = fgp.post_var(x)
>>> pvar.shape
torch.Size([128])
>>> torch.allclose(pcov.diagonal(),pvar)
True
>>> pmean,pstd,q,ci_low,ci_high = fgp.post_ci(x,confidence=0.99)
>>> ci_low.shape
torch.Size([128])
>>> ci_high.shape
torch.Size([128])
>>> fgp.post_cubature_mean()
tensor(20.1842)
>>> fgp.post_cubature_var()
tensor(2.8903e-06)
>>> pcmean,pcvar,q,pcci_low,pcci_high = fgp.post_cubature_ci(confidence=0.99)
>>> pcci_low
tensor(20.1798)
>>> pcci_high
tensor(20.1886)
>>> pcov_future = fgp.post_cov(x,z,n=2*n)
>>> pvar_future = fgp.post_var(x,n=2*n)
>>> pcvar_future = fgp.post_cubature_var(n=2*n)
>>> x_next = fgp.get_x_next(2*n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0295)
>>> torch.allclose(fgp.post_cov(x,z),pcov_future)
True
>>> torch.allclose(fgp.post_var(x),pvar_future)
True
>>> torch.allclose(fgp.post_cubature_var(),pcvar_future)
True
>>> data = fgp.fit(verbose=False)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0274)
>>> x_next = fgp.get_x_next(4*n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0277)
>>> data = fgp.fit(verbose=False)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0276)
>>> pcov_16n = fgp.post_cov(x,z,n=16*n)
>>> pvar_16n = fgp.post_var(x,n=16*n)
>>> pcvar_16n = fgp.post_cubature_var(n=16*n)
>>> x_next = fgp.get_x_next(16*n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> torch.allclose(fgp.post_cov(x,z),pcov_16n)
True
>>> torch.allclose(fgp.post_var(x),pvar_16n)
True
>>> torch.allclose(fgp.post_cubature_var(),pcvar_16n)
True

Different loss metrics for fitting

>>> n = 2**6
>>> d = 3
>>> sgp = FastGPLattice(
...     qp.KernelShiftInvar(d,torchify=True,device=device),
...     qp.Lattice(dimension=d,seed=7))
>>> x_next = sgp.get_x_next(n)
>>> y_next = torch.stack([torch.sin(x_next).sum(-1),torch.cos(x_next).sum(-1)],axis=0)
>>> sgp.add_y_next(y_next)
>>> data = sgp.fit(loss_metric="MLL",iterations=5,verbose=0)
>>> data = sgp.fit(loss_metric="CV",iterations=5,verbose=0,cv_weights=1/torch.arange(1,2*n+1,device=device).reshape((2,n)))
>>> data = sgp.fit(loss_metric="CV",iterations=5,verbose=0,cv_weights="L2R")
>>> data = sgp.fit(loss_metric="GCV",iterations=5,verbose=0)

Batch Inference

>>> d = 4
>>> n = 2**10
>>> lattice = qp.Lattice(d,seed=7)
>>> kernel = qp.KernelSICombined(d,torchify=True,shape_alpha=(2,4,d),shape_scale=(2,1),shape_lengthscales=(3,2,d))
>>> fgp = FastGPLattice(kernel,lattice)
>>> x = fgp.get_x_next(n) 
>>> x.shape
torch.Size([1024, 4])
>>> y = (x**torch.arange(6).reshape((3,2))[:,:,None,None]).sum(-1)
>>> y.shape
torch.Size([3, 2, 1024])
>>> fgp.add_y_next(y) 
>>> data = fgp.fit(verbose=0)
>>> fgp.post_cubature_mean()
tensor([[4.0000, 2.0001],
        [1.3334, 1.0001],
        [0.8001, 0.6668]])
>>> fgp.post_cubature_var()
tensor([[0.0008, 0.0008],
        [0.0008, 0.0008],
        [0.0008, 0.0008]])
>>> fgp.post_cubature_var(n=4*n)
tensor([[0., 0.],
        [0., 0.],
        [0., 0.]])

Parameters:

Name Type Description Default
kernel (KernelShiftInvar, KernelShiftInvarCombined)

Kernel object. Set to qp.KernelMultiTask for a multi-task GP.

required
seqs [int, Lattice, List]

list of lattice sequence generators with order="RADICAL INVERSE" and randomize in ["FALSE","SHIFT"]. If an int seed is passed in we use

[qp.Lattice(d,seed=seed_i,randomize="SHIFT") for seed_i in np.random.SeedSequence(seed).spawn(num_tasks)]
See the qp.Lattice docs for more info

required
noise float

positive noise variance i.e. nugget term

2 * EPS64
tfs_noise Tuple[callable, callable]

the first callable transforms the noise to the raw value to be optimized, the second applies the inverse transform

(tf_exp_eps_inv, tf_exp_eps)
requires_grad_noise bool

whether or not to optimize the noise parameter

False
shape_noise Size

shape of the noise parameter, defaults to torch.Size([1])

Size([1])
derivatives list

list of derivative orders e.g. to include a function and its gradient set

derivatives = [torch.zeros(d,dtype=int)]+[ej for ej in torch.eye(d,dtype=int)]

None
derivatives_coeffs list

list of derivative coefficients; if derivatives[k].shape==(p,d) then derivatives_coeffs[k].shape==(p,) (a short example follows this parameter list)

None
adaptive_nugget bool

if True, use the adaptive nugget which modifies noises based on trace ratios.

False
ptransform str

periodization transform in [None, 'BAKER'] where 'BAKER' is also known as the tent transform.

None
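As a concrete, hypothetical illustration of how derivatives and derivatives_coeffs pair up (it assumes a two-task qp.KernelMultiTask and only first-order derivatives, matching the gradient example above): task 0 observes f itself and task 1 observes the directional derivative f_{x1} + f_{x2}.

import torch
d = 2
derivatives = [
    torch.zeros((1, d), dtype=torch.int64),             # task 0: f, derivative order (0,0)
    torch.tensor([[1, 0], [0, 1]], dtype=torch.int64),  # task 1: derivative orders (1,0) and (0,1)
]
derivatives_coeffs = [torch.ones(1), torch.ones(2)]     # task 1 observes 1*f_{x1} + 1*f_{x2}

Here derivatives[k].shape==(p,d) and derivatives_coeffs[k].shape==(p,) as required; these would be passed as FastGPLattice(kernel, seqs, derivatives=derivatives, derivatives_coeffs=derivatives_coeffs).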
Source code in fastgps/fast_gp_lattice.py
def __init__(self,
        kernel:Union[qp.KernelShiftInvar,qp.KernelShiftInvarCombined],
        seqs:qp.Lattice,
        noise:float = 2*qp.util.transforms.EPS64, 
        tfs_noise:Tuple[callable,callable] = (qp.util.transforms.tf_exp_eps_inv,qp.util.transforms.tf_exp_eps),
        requires_grad_noise:bool = False, 
        shape_noise:torch.Size = torch.Size([1]),
        derivatives:list = None,
        derivatives_coeffs:list = None,
        adaptive_nugget:bool = False,
        ptransform:str = None,
        ):
    """
    Args:
        kernel (qp.KernelShiftInvar,qp.KernelShiftInvarCombined): Kernel object. Set to `qp.KernelMultiTask` for a multi-task GP.
        seqs ([int,qp.Lattice,List]): list of lattice sequence generators
            with order="RADICAL INVERSE" and randomize in `["FALSE","SHIFT"]`. If an int `seed` is passed in we use 
            ```python
            [qp.Lattice(d,seed=seed_i,randomize="SHIFT") for seed_i in np.random.SeedSequence(seed).spawn(num_tasks)]
            ```
            See the <a href="https://qp.readthedocs.io/en/latest/algorithms.html#module-qp.discrete_distribution.lattice.lattice" target="_blank">`qp.Lattice` docs</a> for more info
        noise (float): positive noise variance i.e. nugget term
        tfs_noise (Tuple[callable,callable]): the first callable transforms the noise to the raw value to be optimized, the second applies the inverse transform
        requires_grad_noise (bool): whether or not to optimize the noise parameter
        shape_noise (torch.Size): shape of the noise parameter, defaults to `torch.Size([1])`
        derivatives (list): list of derivative orders e.g. to include a function and its gradient set 
            ```python
            derivatives = [torch.zeros(d,dtype=int)]+[ej for ej in torch.eye(d,dtype=int)]
            ```
        derivatives_coeffs (list): list of derivative coefficients where if `derivatives[k].shape==(p,d)` then we should have `derivatives_coeffs[k].shape==(p,)`
        adaptive_nugget (bool): if True, use the adaptive nugget which modifies noises based on trace ratios.  
        ptransform (str): periodization transform in `[None, 'BAKER']` where `'BAKER'` is also known as the tent transform.
    """
    self._XBDTYPE = torch.get_default_dtype()
    self._FTOUTDTYPE = torch.complex64 if torch.get_default_dtype()==torch.float32 else torch.complex128
    if isinstance(kernel,qp.KernelMultiTask):
        solo_task = False
        num_tasks = kernel.num_tasks
        default_task = torch.arange(num_tasks)
    else:
        solo_task = True
        default_task = 0 
        num_tasks = 1
    if isinstance(seqs,int):
        global_seed = seqs
        seqs = np.array([qp.Lattice(kernel.d,seed=seed,randomize="SHIFT") for seed in np.random.SeedSequence(global_seed).spawn(num_tasks)],dtype=object)
    if isinstance(seqs,qp.Lattice):
        seqs = np.array([seqs],dtype=object)
    if isinstance(seqs,list):
        seqs = np.array(seqs,dtype=object)
    assert seqs.shape==(num_tasks,), "seqs should be a length num_tasks=%d list"%num_tasks
    assert all(isinstance(seqs[i],qp.Lattice) for i in range(num_tasks)), "each seq should be a qp.Lattice instance"
    assert all(seqs[i].order=="RADICAL INVERSE" for i in range(num_tasks)), "each seq should be in 'RADICAL INVERSE' order"
    assert all(seqs[i].replications==1 for i in range(num_tasks)), "each seq should have only 1 replication"
    assert all(seqs[i].randomize in ['FALSE','SHIFT'] for i in range(num_tasks)), "each seq should have randomize in ['FALSE','SHIFT']"
    ft = qp.fftbr_torch
    ift = qp.ifftbr_torch
    omega = qp.omega_fftbr_torch
    super().__init__(
        ft,
        ift,
        omega,
        kernel,
        seqs,
        num_tasks,
        default_task,
        solo_task,
        noise,
        tfs_noise,
        requires_grad_noise,
        shape_noise,
        derivatives,
        derivatives_coeffs,
        adaptive_nugget,
        ptransform,
    )

FastGPDigitalNetB2

FastGPDigitalNetB2(kernel, seqs, noise=2 * qp.util.transforms.EPS64, tfs_noise=(qp.util.transforms.tf_exp_eps_inv, qp.util.transforms.tf_exp_eps), requires_grad_noise=False, shape_noise=torch.Size([1]), derivatives=None, derivatives_coeffs=None, adaptive_nugget=False, ptransform=None)

Bases: AbstractFastGP

Fast Gaussian process regression using digitally shifted digital nets paired with digital-shift-invariant kernels

Examples:

>>> device = "cpu"
>>> if device!="mps":
...     torch.set_default_dtype(torch.float64)
>>> def f_ackley(x, a=20, b=0.2, c=2*np.pi, scaling=32.768):
...     # https://www.sfu.ca/~ssurjano/ackley.html
...     assert x.ndim==2
...     x = 2*scaling*x-scaling
...     t1 = a*torch.exp(-b*torch.sqrt(torch.mean(x**2,1)))
...     t2 = torch.exp(torch.mean(torch.cos(c*x),1))
...     t3 = a+np.exp(1)
...     y = -t1-t2+t3
...     return y
>>> n = 2**10
>>> d = 2
>>> fgp = FastGPDigitalNetB2(
...     qp.KernelDigShiftInvar(d,torchify=True,device=device),
...     qp.DigitalNetB2(dimension=d,seed=7))
>>> x_next = fgp.get_x_next(n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> rng = torch.Generator().manual_seed(17)
>>> x = torch.rand((2**7,d),generator=rng).to(device)
>>> y = f_ackley(x)
>>> pmean = fgp.post_mean(x)
>>> pmean.shape
torch.Size([128])
>>> torch.linalg.norm(y-pmean)/torch.linalg.norm(y)
tensor(0.0308)
>>> torch.allclose(fgp.post_mean(fgp.x),fgp.y)
True
>>> data = fgp.fit(verbose=0)
>>> list(data.keys())
[]
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0328)
>>> z = torch.rand((2**8,d),generator=rng).to(device)
>>> pcov = fgp.post_cov(x,z)
>>> pcov.shape
torch.Size([128, 256])
>>> pcov = fgp.post_cov(x,x)
>>> pcov.shape
torch.Size([128, 128])
>>> (pcov.diagonal()>=0).all()
tensor(True)
>>> pvar = fgp.post_var(x)
>>> pvar.shape
torch.Size([128])
>>> torch.allclose(pcov.diagonal(),pvar,atol=1e-5)
True
>>> pmean,pstd,q,ci_low,ci_high = fgp.post_ci(x,confidence=0.99)
>>> ci_low.shape
torch.Size([128])
>>> ci_high.shape
torch.Size([128])
>>> fgp.post_cubature_mean()
tensor(20.1846)
>>> fgp.post_cubature_var()
tensor(0.0002)
>>> pcmean,pcvar,q,pcci_low,pcci_high = fgp.post_cubature_ci(confidence=0.99)
>>> pcci_low
tensor(20.1466)
>>> pcci_high
tensor(20.2227)
>>> pcov_future = fgp.post_cov(x,z,n=2*n)
>>> pvar_future = fgp.post_var(x,n=2*n)
>>> pcvar_future = fgp.post_cubature_var(n=2*n)
>>> x_next = fgp.get_x_next(2*n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0267)
>>> torch.allclose(fgp.post_cov(x,z),pcov_future)
True
>>> torch.allclose(fgp.post_var(x),pvar_future)
True
>>> torch.allclose(fgp.post_cubature_var(),pcvar_future)
True
>>> data = fgp.fit(verbose=False)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0254)
>>> x_next = fgp.get_x_next(4*n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0162)
>>> data = fgp.fit(verbose=False)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0132)
>>> pcov_16n = fgp.post_cov(x,z,n=16*n)
>>> pvar_16n = fgp.post_var(x,n=16*n)
>>> pcvar_16n = fgp.post_cubature_var(n=16*n)
>>> x_next = fgp.get_x_next(16*n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> torch.allclose(fgp.post_cov(x,z),pcov_16n)
True
>>> torch.allclose(fgp.post_var(x),pvar_16n)
True
>>> torch.allclose(fgp.post_cubature_var(),pcvar_16n)
True

Different loss metrics for fitting

>>> n = 2**6
>>> d = 3
>>> sgp = FastGPDigitalNetB2(
...     qp.KernelDigShiftInvar(d,torchify=True,device=device),
...     qp.DigitalNetB2(dimension=d,seed=7))
>>> x_next = sgp.get_x_next(n)
>>> y_next = torch.stack([torch.sin(x_next).sum(-1),torch.cos(x_next).sum(-1)],axis=0)
>>> sgp.add_y_next(y_next)
>>> data = sgp.fit(loss_metric="MLL",iterations=5,verbose=0)
>>> data = sgp.fit(loss_metric="CV",iterations=5,verbose=0,cv_weights=1/torch.arange(1,2*n+1,device=device).reshape((2,n)))
>>> data = sgp.fit(loss_metric="CV",iterations=5,verbose=0,cv_weights="L2R")
>>> data = sgp.fit(loss_metric="GCV",iterations=5,verbose=0)

Batch Inference

>>> d = 4
>>> n = 2**10
>>> dnb2 = qp.DigitalNetB2(d,seed=7) 
>>> kernel = qp.KernelDSICombined(d,torchify=True,shape_alpha=(2,4,d),shape_scale=(2,1),shape_lengthscales=(3,2,d))
>>> fgp = FastGPDigitalNetB2(kernel,dnb2) 
>>> x = fgp.get_x_next(n) 
>>> x.shape
torch.Size([1024, 4])
>>> y = (x**torch.arange(6).reshape((3,2))[:,:,None,None]).sum(-1)
>>> y.shape
torch.Size([3, 2, 1024])
>>> fgp.add_y_next(y) 
>>> data = fgp.fit(verbose=0)
>>> fgp.post_cubature_mean()
tensor([[4.0000, 2.0000],
        [1.3333, 1.0000],
        [0.8000, 0.6667]])
>>> fgp.post_cubature_var()
tensor([[2.2939e-16, 9.6623e-09],
        [1.6232e-08, 7.8090e-09],
        [3.7257e-08, 2.0389e-08]])
>>> fgp.post_cubature_var(n=4*n)
tensor([[2.9341e-18, 2.2246e-10],
        [2.8796e-10, 1.6640e-10],
        [5.9842e-10, 3.7897e-10]])

Parameters:

Name Type Description Default
kernel Union[KernelDigShiftInvar, KernelDigShiftInvarAdaptiveAlpha, KernelDigShiftInvarCombined]

Kernel object. Set to qp.KernelMultiTask for a multi-task GP.

required
seqs Union[int, DigitalNetB2, List]

list of digital sequence generators in base \(b=2\) with order="RADICAL INVERSE" and randomize in ["FALSE","DS"]. If an int seed is passed in we use

[qp.DigitalNetB2(d,seed=seed_i,randomize="DS") for seed_i in np.random.SeedSequence(seed).spawn(num_tasks)]
See the qp.DigitalNetB2 docs for more info. If num_tasks==1 then randomize may be in ["FALSE","DS","LMS","LMS DS"].

required
noise float

positive noise variance i.e. nugget term

2 * EPS64
tfs_noise Tuple[callable, callable]

the first callable transforms the noise to the raw value to be optimized, the second applies the inverse transform

(tf_exp_eps_inv, tf_exp_eps)
requires_grad_noise bool

whether or not to optimize the noise parameter

False
shape_noise Size

shape of the noise parameter, defaults to torch.Size([1])

Size([1])
derivatives list

list of derivative orders e.g. to include a function and its gradient set

derivatives = [torch.zeros(d,dtype=int)]+[ej for ej in torch.eye(d,dtype=int)]

None
derivatives_coeffs list

list of derivative coefficients where if derivatives[k].shape==(p,d) then we should have derivatives_coeffs[k].shape==(p,)

None
adaptive_nugget bool

if True, use the adaptive nugget which modifies noises based on trace ratios.

False
ptransform str

periodization transform in [None, 'BAKER'] where 'BAKER' is also known as the tent transform (see the short sketch after this parameter list).

None
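For reference, the 'BAKER' option is the tent transform phi(x) = 1 - |2x - 1| applied coordinate-wise; a minimal sketch, illustrative only and not the library's internal implementation:

import torch

def baker_tent(x):
    # tent / baker's periodization on [0,1]^d: phi(x) = 1 - |2x - 1| elementwise;
    # since phi(0) = phi(1) = 0, the composed integrand f(phi(x)) takes equal values
    # at both endpoints, so its periodic extension is continuous
    return 1 - torch.abs(2 * x - 1)

x = torch.rand((4, 3))
assert ((baker_tent(x) >= 0) & (baker_tent(x) <= 1)).all()   # the transform maps [0,1]^d onto itself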
Source code in fastgps/fast_gp_digital_net_b2.py
def __init__(self,
        kernel:Union[qp.KernelDigShiftInvar,qp.KernelDigShiftInvarAdaptiveAlpha,qp.KernelDigShiftInvarCombined],
        seqs:Union[qp.DigitalNetB2,int],
        noise:float = 2*qp.util.transforms.EPS64,
        tfs_noise:Tuple[callable,callable] = (qp.util.transforms.tf_exp_eps_inv,qp.util.transforms.tf_exp_eps),
        requires_grad_noise:bool = False, 
        shape_noise:torch.Size = torch.Size([1]),
        derivatives:list = None,
        derivatives_coeffs:list = None,
        adaptive_nugget:bool = False,
        ptransform:str = None,
        ):
    """
    Args:
        kernel (Union[qp.KernelDigShiftInvar,qp.KernelDigShiftInvarAdaptiveAlpha,qp.KernelDigShiftInvarCombined]): Kernel object. Set to `qp.KernelMultiTask` for a multi-task GP.
        seqs (Union[int,qp.DigitalNetB2,List]): list of digital sequence generators in base $b=2$ 
            with order="RADICAL INVERSE" and randomize in `["FALSE","DS"]`. If an int `seed` is passed in we use 
            ```python
            [qp.DigitalNetB2(d,seed=seed_i,randomize="DS") for seed_i in np.random.SeedSequence(seed).spawn(num_tasks)]
            ```
            See the <a href="https://qp.readthedocs.io/en/latest/algorithms.html#module-qp.discrete_distribution.digital_net_b2.digital_net_b2" target="_blank">`qp.DigitalNetB2` docs</a> for more info. 
            If `num_tasks==1` then randomize may be in `["FALSE","DS","LMS","LMS DS"]`. 
        noise (float): positive noise variance i.e. nugget term
        tfs_noise (Tuple[callable,callable]): the first callable transforms the noise to the raw value to be optimized, the second applies the inverse transform
        requires_grad_noise (bool): whether or not to optimize the noise parameter
        shape_noise (torch.Size): shape of the noise parameter, defaults to `torch.Size([1])`
        derivatives (list): list of derivative orders e.g. to include a function and its gradient set 
            ```python
            derivatives = [torch.zeros(d,dtype=int)]+[ej for ej in torch.eye(d,dtype=int)]
            ```
        derivatives_coeffs (list): list of derivative coefficients where if `derivatives[k].shape==(p,d)` then we should have `derivatives_coeffs[k].shape==(p,)`
        adaptive_nugget (bool): if True, use the adaptive nugget which modifies noises based on trace ratios.  
        ptransform (str): periodization transform in `[None, 'BAKER']` where `'BAKER'` is also known as the tent transform.
    """
    self._XBDTYPE = torch.int64
    self._FTOUTDTYPE = torch.get_default_dtype()
    if isinstance(kernel,qp.KernelMultiTask):
        solo_task = False
        num_tasks = kernel.num_tasks
        default_task = torch.arange(num_tasks)
    else:
        solo_task = True
        default_task = 0 
        num_tasks = 1
    if isinstance(seqs,int):
        global_seed = seqs
        seqs = np.array([qp.DigitalNetB2(kernel.d,seed=seed,randomize="DS") for seed in np.random.SeedSequence(global_seed).spawn(num_tasks)],dtype=object)
    if isinstance(seqs,qp.DigitalNetB2):
        seqs = np.array([seqs],dtype=object)
    if isinstance(seqs,list):
        seqs = np.array(seqs,dtype=object)
    assert seqs.shape==(num_tasks,), "seqs should be a length num_tasks=%d list"%num_tasks
    assert all(isinstance(seqs[i],qp.DigitalNetB2) for i in range(num_tasks)), "each seq should be a qp.DigitalNetB2 instance"
    assert all(seqs[i].order=="RADICAL INVERSE" for i in range(num_tasks)), "each seq should be in 'RADICAL INVERSE' order"
    assert all(seqs[i].replications==1 for i in range(num_tasks)), "each seq should have only 1 replication"
    if num_tasks==1:
        assert seqs[0].randomize in ['FALSE','DS','LMS','LMS DS'], "seq should have randomize in ['FALSE','DS','LMS','LMS DS']"
    else:
        assert all(seqs[i].randomize in ['FALSE','DS'] for i in range(num_tasks)), "each seq should have randomize in ['FALSE','DS']"
    ts = torch.tensor([seqs[i].t for i in range(num_tasks)])
    assert (ts<64).all(), "each seq must have t<64"
    assert (ts==ts[0]).all(), "all seqs should have the same t"
    self.t = ts[0].item()
    if isinstance(kernel,qp.KernelMultiTask):
        kernel.base_kernel.set_t(self.t)
    else:
        kernel.set_t(self.t)
    ift = ft = qp.fwht_torch
    omega = qp.omega_fwht_torch
    super().__init__(
        ft,
        ift,
        omega,
        kernel,
        seqs,
        num_tasks,
        default_task,
        solo_task,
        noise,
        tfs_noise,
        requires_grad_noise,
        shape_noise,
        derivatives,
        derivatives_coeffs,
        adaptive_nugget,
        ptransform,
    )