API

AbstractGP

AbstractGP(kernel, seqs, num_tasks, default_task, solo_task, noise, tfs_noise, requires_grad_noise, shape_noise, derivatives, derivatives_coeffs, adaptive_nugget)

Bases: Module

Source code in fastgps/abstract_gp.py
def __init__(
        self,
        kernel,
        seqs,
        num_tasks,
        default_task,
        solo_task,
        noise,
        tfs_noise,
        requires_grad_noise,
        shape_noise,
        derivatives,
        derivatives_coeffs,
        adaptive_nugget,
    ):
    super().__init__()
    if not torch.get_default_dtype()==torch.float64:
        warnings.warn('''
            Using torch.float32 precision may significantly hurt FastGPs accuracy. 
            This is especially evident when computing posterior variance and covariance values. 
            If possible, please use
                torch.set_default_dtype(torch.float64)''')
    # copy kernel parameters 
    self.kernel = kernel
    assert self.kernel.torchify, "requires torchify=True for the kernel"
    self.device = self.kernel.device
    self.d = self.kernel.d
    # multi-task kernel abstraction
    assert isinstance(num_tasks,int) and num_tasks>0
    self.num_tasks = num_tasks
    self.default_task = default_task
    self.solo_task = solo_task
    self.task_range = torch.arange(num_tasks,device=self.device)
    if solo_task:
        self.kernel = qmcpy.KernelMultiTask(
            base_kernel = self.kernel,
            num_tasks = 1, 
            factor = 1.,
            diag =  0.,
            requires_grad_factor = False, 
            requires_grad_diag = False,
            tfs_diag = (qmcpy.util.transforms.tf_identity,qmcpy.util.transforms.tf_identity),
            rank_factor = 1)
    assert isinstance(self.kernel,qmcpy.KernelMultiTask)
    # seqs setup 
    assert isinstance(seqs,np.ndarray) and seqs.shape==(self.num_tasks,)
    assert all(seqs[i].d==self.d for i in range(self.num_tasks))
    self.seqs = seqs
    self.n = torch.zeros(self.num_tasks,dtype=int,device=self.device)
    self.m = -1*torch.ones(self.num_tasks,dtype=int,device=self.device)
    # derivatives setup 
    if derivatives is None: derivatives = [torch.zeros((1,self.d),dtype=torch.int64,device=self.device) for i in range(self.num_tasks)]
    if isinstance(derivatives,torch.Tensor): derivatives = [derivatives]
    assert isinstance(derivatives,list) and len(derivatives)==self.num_tasks
    derivatives = [deriv[None,:] if deriv.ndim==1 else deriv for deriv in derivatives]
    assert all((derivatives[i].ndim==2 and derivatives[i].size(1)==self.d) for i in range(self.num_tasks))
    if derivatives_coeffs is None: derivatives_coeffs = [torch.ones(len(derivatives[i]),device=self.device) for i in range(self.num_tasks)]
    assert isinstance(derivatives_coeffs,list) and len(derivatives_coeffs)==self.num_tasks
    assert all((derivatives_coeffs[i].ndim==1 and len(derivatives_coeffs[i])==len(derivatives[i])) for i in range(self.num_tasks))
    self.derivatives_cross = [[[None,None] for l1 in range(self.num_tasks)] for l0 in range(self.num_tasks)]
    self.derivatives_coeffs_cross = [[None for l1 in range(self.num_tasks)] for l0 in range(self.num_tasks)]
    for l0 in range(self.num_tasks):
        p0r = torch.arange(len(derivatives_coeffs[l0]),device=self.device)
        for l1 in range(l0+1):
            p1r = torch.arange(len(derivatives_coeffs[l1]),device=self.device)
            i0m,i1m = torch.meshgrid(p0r,p1r,indexing="ij")
            i0,i1 = i0m.flatten(),i1m.flatten()
            self.derivatives_cross[l0][l1][0] = derivatives[l0][i0]
            self.derivatives_cross[l0][l1][1] = derivatives[l1][i1]
            self.derivatives_coeffs_cross[l0][l1] = derivatives_coeffs[l0][i0]*derivatives_coeffs[l1][i1]
            if l0!=l1:
                self.derivatives_cross[l1][l0][0] = self.derivatives_cross[l0][l1][1]
                self.derivatives_cross[l1][l0][1] = self.derivatives_cross[l0][l1][0]
                self.derivatives_coeffs_cross[l1][l0] = self.derivatives_coeffs_cross[l0][l1]
    # noise
    self.raw_noise,self.tf_noise = self.kernel.parse_assign_param(
        pname = "noise",
        param = noise,
        shape_param = shape_noise,
        requires_grad_param = requires_grad_noise,
        tfs_param = tfs_noise,
        endsize_ops = [1],
        constraints = ["POSITIVE"])
    # storage and dynamic caches
    self._y = [torch.empty(0,device=self.device) for l in range(self.num_tasks)]
    self.xxb_seqs = np.array([_XXbSeq(self,self.seqs[i]) for i in range(self.num_tasks)],dtype=object)
    self.coeffs_cache = _CoeffsCache(self)
    self.inv_log_det_cache_dict = {}
    # derivative multitask setting checks 
    if any((derivatives[i]>0).any() or (derivatives_coeffs[i]!=1).any() for i in range(self.num_tasks)):
        self.kernel.raw_factor.requires_grad_(False)
        self.kernel.raw_diag.requires_grad_(False)
        assert (self.kernel.taskmat==1).all()
    self.adaptive_nugget = adaptive_nugget
    self.batch_param_names = ["noise"]

noise property

noise

Noise parameter.

coeffs property

coeffs

Coefficients \(\mathsf{K}^{-1} \boldsymbol{y}\).

x property

x

Current sampling locations: a torch.Tensor for single-task problems, or a list of torch.Tensors for multi-task problems.

y property

y

Current sampling values: a torch.Tensor for single-task problems, or a list of torch.Tensors for multi-task problems.

save_params

save_params(path)

Save the state dict to path

Args

path (str): the path.

Source code in fastgps/abstract_gp.py
def save_params(self, path):
    """ Save the state dict to path 

    Args:
        path (str): the path. 
    """
    torch.save(self.state_dict(),path)

load_params

load_params(path)

Load the state dict from path

Args

path (str): the path.

Source code in fastgps/abstract_gp.py
def load_params(self, path):
    """ Load the state dict from path 

    Args:
        path (str): the path. 
    """
    self.load_state_dict(torch.load(path,weights_only=True))
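
Example: saving and restoring fitted hyperparameters. A minimal sketch, assuming StandardGP is importable from the fastgps package and using an arbitrary file name "sgp_params.pt"; any identically constructed model can load the saved state dict.

import torch
import numpy as np
import qmcpy
from fastgps import StandardGP

torch.set_default_dtype(torch.float64)
d = 2
sgp = StandardGP(
    qmcpy.KernelSquaredExponential(d,torchify=True,device="cpu"),
    qmcpy.DigitalNetB2(dimension=d,seed=7))
x = sgp.get_x_next(2**6)
sgp.add_y_next(torch.sin(2*np.pi*x).sum(-1))
data = sgp.fit(iterations=10,verbose=0)
sgp.save_params("sgp_params.pt")      # write the state dict to disk
# rebuild an identically configured model and restore the hyperparameters
sgp2 = StandardGP(
    qmcpy.KernelSquaredExponential(d,torchify=True,device="cpu"),
    qmcpy.DigitalNetB2(dimension=d,seed=7))
sgp2.load_params("sgp_params.pt")     # sgp2 now carries the fitted parameters of sgp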

fit

fit(loss_metric='MLL', iterations=5000, lr=None, optimizer=None, stop_crit_improvement_threshold=0.05, stop_crit_wait_iterations=10, store_hists=False, verbose=5, verbose_indent=4, cv_weights=1)

Parameters:

Name Type Description Default
loss_metric str

either "MLL" (Marginal Log Likelihood) or "CV" (Cross Validation) or "GCV" (Generalized CV)

'MLL'
iterations int

number of optimization iterations

5000
lr float

learning rate for default optimizer

None
optimizer Optimizer

optimizer defaulted to torch.optim.Rprop(self.parameters(),lr=lr)

None
stop_crit_improvement_threshold float

stop fitting when the maximum number of iterations is reached or the best loss is not reduced by stop_crit_improvement_threshold for stop_crit_wait_iterations iterations

0.05
stop_crit_wait_iterations int

number of iterations to wait for improved loss before early stopping, see the argument description for stop_crit_improvement_threshold

10
store_hists Union[bool, int]

store parameter data every store_hists iterations.

False
verbose int

log every verbose iterations, set to 0 for silent mode

5
verbose_indent int

size of the indent to be applied when logging, helpful for logging multiple models

4
cv_weights Union[str, Tensor]

weights for cross validation

1

Returns:

Name Type Description
hist_data dict

iteration history data.

Source code in fastgps/abstract_gp.py
def fit(
        self,
        loss_metric:str = "MLL",
        iterations:int = 5000,
        lr:float = None,
        optimizer:torch.optim.Optimizer = None,
        stop_crit_improvement_threshold:float = 5e-2,
        stop_crit_wait_iterations:int = 10,
        store_hists:bool = False,
        verbose:int = 5,
        verbose_indent:int = 4,
        cv_weights:torch.Tensor = 1,
        ):
    """
    Args:
        loss_metric (str): one of "MLL" (Marginal Log Likelihood), "CV" (Cross Validation), or "GCV" (Generalized Cross Validation)
        iterations (int): number of optimization iterations
        lr (float): learning rate for default optimizer
        optimizer (torch.optim.Optimizer): optimizer defaulted to `torch.optim.Rprop(self.parameters(),lr=lr)`
        stop_crit_improvement_threshold (float): stop fitting when the maximum number of iterations is reached or the best loss is not reduced by `stop_crit_improvement_threshold` for `stop_crit_wait_iterations` iterations 
        stop_crit_wait_iterations (int): number of iterations to wait for improved loss before early stopping, see the argument description for `stop_crit_improvement_threshold`
        store_hists (Union[bool,int]): store parameter data every `store_hists` iterations.
        verbose (int): log every `verbose` iterations, set to `0` for silent mode
        verbose_indent (int): size of the indent to be applied when logging, helpful for logging multiple models
        cv_weights (Union[str,torch.Tensor]): weights for cross validation

    Returns:
        hist_data (dict): iteration history data.
    """
    assert isinstance(loss_metric,str) and loss_metric.upper() in ["MLL","GCV","CV"] 
    assert (self.n>0).any(), "cannot fit without data"
    assert isinstance(iterations,int) and iterations>=0
    if optimizer is None:
        optimizer = self.get_default_optimizer(lr)
    assert isinstance(optimizer,torch.optim.Optimizer)
    assert isinstance(store_hists,int), "require int store_hists" 
    assert (isinstance(verbose,int) or isinstance(verbose,bool)) and verbose>=0, "require verbose is a non-negative int"
    assert isinstance(verbose_indent,int) and verbose_indent>=0, "require verbose_indent is a non-negative int"
    assert np.isscalar(stop_crit_improvement_threshold) and 0<stop_crit_improvement_threshold, "require stop_crit_improvement_threshold is a positive float"
    assert (isinstance(stop_crit_wait_iterations,int) or stop_crit_wait_iterations==np.inf) and stop_crit_wait_iterations>0
    loss_metric = loss_metric.upper()
    logtol = np.log(1+stop_crit_improvement_threshold)
    if isinstance(cv_weights,str) and cv_weights.upper()=="L2R":
        _y = self.y 
        if _y.ndim==1:
            cv_weights = 1/torch.abs(_y) 
        else:
            cv_weights = 1/torch.linalg.norm(_y,2,dim=[i for i in range(_y.ndim-1)])
    if store_hists:
        hist_data = {}
        hist_data["iteration"] = []
        hist_data["loss"] = []
        hist_data["best_loss"] = []
        for pname in self.batch_param_names:
            hist_data[pname] = []
        for pname in self.kernel.batch_param_names:
            hist_data[pname] = []
        for pname in self.kernel.base_kernel.batch_param_names:
            hist_data[pname] = []
    else:
        hist_data = {}
    if verbose:
        _s = "%16s | %-10s | %-10s | %-10s | %-10s"%("iter of %.1e"%iterations,"best loss","loss","term1","term2")
        print(" "*verbose_indent+_s)
        print(" "*verbose_indent+"~"*len(_s))
    stop_crit_best_loss = torch.inf 
    stop_crit_save_loss = torch.inf 
    stop_crit_iterations_without_improvement_loss = 0
    os.environ["FASTGP_FORCE_RECOMPILE"] = "True"
    inv_log_det_cache = self.get_inv_log_det_cache()
    for i in range(iterations+1):
        if loss_metric=="GCV":
            numer,denom = inv_log_det_cache.get_gcv_numer_denom()
            term1 = numer 
            term2 = denom
            loss = (term1/term2).sum()
        elif loss_metric=="MLL":
            norm_term,logdet = inv_log_det_cache.get_norm_term_logdet_term()
            d_out = norm_term.numel()
            term1 = norm_term.sum()
            mll_const = d_out*self.n.sum()*np.log(2*np.pi)
            term2 = d_out/torch.tensor(logdet.shape).prod()*logdet.sum()
            loss = 1/2*(term1+term2+mll_const)
        elif loss_metric=="CV":
            coeffs = self.coeffs
            del os.environ["FASTGP_FORCE_RECOMPILE"]
            inv_diag = inv_log_det_cache.get_inv_diag()
            os.environ["FASTGP_FORCE_RECOMPILE"] = "True"
            term1 = term2 = torch.nan*torch.ones(1)
            squared_sums = ((coeffs/inv_diag)**2*cv_weights).sum(-1,keepdim=True)
            loss = squared_sums.sum().real
        else:
            assert False, "loss_metric parsing implementation error"
        if loss.item()<stop_crit_best_loss:
            stop_crit_best_loss = loss.item()
            best_params = OrderedDict([(pname,pval.clone()) for pname,pval in self.state_dict().items()])
        if (stop_crit_save_loss-loss.item())>logtol:
            stop_crit_iterations_without_improvement_loss = 0
            stop_crit_save_loss = stop_crit_best_loss
        else:
            stop_crit_iterations_without_improvement_loss += 1
        break_condition = i==iterations or stop_crit_iterations_without_improvement_loss==stop_crit_wait_iterations
        if store_hists and (break_condition or i%store_hists==0):
            hist_data["iteration"].append(i)
            hist_data["loss"].append(loss.item())
            hist_data["best_loss"].append(stop_crit_best_loss)
            for pname in self.batch_param_names:
                hist_data[pname].append(getattr(self,pname).data.detach().clone().cpu())
            for pname in self.kernel.batch_param_names:
                hist_data[pname].append(getattr(self.kernel,pname).data.detach().clone().cpu())
            for pname in self.kernel.base_kernel.batch_param_names:
                hist_data[pname].append(getattr(self.kernel.base_kernel,pname).data.detach().clone().cpu())
        if verbose and (i%verbose==0 or break_condition):
            _s = "%16.2e | %-10.2e | %-10.2e | %-10.2e | %-10.2e"%(i,stop_crit_best_loss,loss.item(),term1.item() if term1.numel()==1 else torch.nan,term2.item() if term2.numel()==1 else torch.nan)
            print(" "*verbose_indent+_s)
        if break_condition: break
        # with torch.autograd.set_detect_anomaly(True):
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    self.load_state_dict(best_params)
    del os.environ["FASTGP_FORCE_RECOMPILE"]
    if store_hists:
        hist_data["iteration"] = torch.tensor(hist_data["iteration"])
        hist_data["loss"] = torch.tensor(hist_data["loss"])
        hist_data["best_loss"] = torch.tensor(hist_data["best_loss"])
        for pname in self.batch_param_names:
            hist_data[pname] = torch.stack(hist_data[pname],dim=0)
        for pname in self.kernel.batch_param_names:
            hist_data[pname] = torch.stack(hist_data[pname],dim=0)
        for pname in self.kernel.base_kernel.batch_param_names:
            hist_data[pname] = torch.stack(hist_data[pname],dim=0)
    return hist_data
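
Example: tracking the optimization history. A minimal sketch, assuming a model sgp with data already added (as in the StandardGP examples below); store_hists=10 records the loss and parameter values every 10 iterations.

data = sgp.fit(loss_metric="MLL",iterations=200,store_hists=10,verbose=0)
print(data["iteration"].shape,data["loss"].shape)   # 1d tensors, one entry per stored iteration
print(data["best_loss"][-1])                        # best loss reached during fitting
# remaining keys hold stacked snapshots of the batch parameters, e.g. data["noise"]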

get_x_next

get_x_next(n, task=None)

Get the next sampling locations.

Parameters:

Name Type Description Default
n Union[int, Tensor]

maximum sample index per task

required
task Union[int, Tensor]

task index

None

Returns:

Name Type Description
x_next Union[Tensor, List]

next samples in the sequence

Source code in fastgps/abstract_gp.py
def get_x_next(self, n:Union[int,torch.Tensor], task:Union[int,torch.Tensor]=None):
    """
    Get the next sampling locations. 

    Args:
        n (Union[int,torch.Tensor]): maximum sample index per task
        task (Union[int,torch.Tensor]): task index

    Returns:
        x_next (Union[torch.Tensor,List]): next samples in the sequence
    """
    if isinstance(n,(int,np.int64)): n = torch.tensor([n],dtype=int,device=self.device) 
    if isinstance(n,list): n = torch.tensor(n,dtype=int,device=self.device)
    if task is None: task = self.default_task
    inttask = isinstance(task,int)
    if inttask: task = torch.tensor([task],dtype=int,device=self.device)
    if isinstance(task,list): task = torch.tensor(task,dtype=int,device=self.device)
    assert isinstance(n,torch.Tensor) and isinstance(task,torch.Tensor) and n.ndim==task.ndim==1 and len(n)==len(task)
    assert (n>=self.n[task]).all(), "maximum sequence index must be greater than the current number of samples"
    x_next = [self.xxb_seqs[l][self.n[l]:n[i]][0] for i,l in enumerate(task)]
    return x_next[0] if inttask else x_next

add_y_next

add_y_next(y_next, task=None)

Add samples to the GP.

Parameters:

Name Type Description Default
y_next Union[Tensor, List]

new function evaluations at next sampling locations

required
task Union[int, Tensor]

task index

None
Source code in fastgps/abstract_gp.py
def add_y_next(self, y_next:Union[torch.Tensor,List], task:Union[int,torch.Tensor]=None):
    """
    Add samples to the GP. 

    Args:
        y_next (Union[torch.Tensor,List]): new function evaluations at next sampling locations
        task (Union[int,torch.Tensor]): task index
    """
    if isinstance(y_next,torch.Tensor): y_next = [y_next]
    if task is None: task = self.default_task
    if isinstance(task,int): task = torch.tensor([task],dtype=int,device=self.device)
    if isinstance(task,list): task = torch.tensor(task,dtype=int,device=self.device)
    assert isinstance(y_next,list) and isinstance(task,torch.Tensor) and task.ndim==1 and len(y_next)==len(task)
    for i,l in enumerate(task):
        self._y[l] = torch.cat([self._y[l],y_next[i]],-1)
    self.n = torch.tensor([self._y[i].size(-1) for i in range(self.num_tasks)],dtype=int,device=self.device)
    self.m = torch.where(self.n==0,-1,torch.log2(self.n).round()).to(int) # round to avoid things like torch.log2(torch.tensor([2**3],dtype=torch.int64,device="cuda")).item() = 2.9999999999999996
    for key in list(self.inv_log_det_cache_dict.keys()):
        if (torch.tensor(key)<self.n.cpu()).any():
            del self.inv_log_det_cache_dict[key]
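
Example: growing the data set incrementally. A minimal sketch, assuming a freshly constructed single-task sgp as in the StandardGP examples below and a hypothetical objective f mapping a batch of points to one value per row; get_x_next returns only the points beyond the current sample size and add_y_next appends the matching evaluations.

def f(x):
    return torch.sin(2*np.pi*x).sum(-1)   # hypothetical objective
for n in [2**4,2**5,2**6]:
    x_next = sgp.get_x_next(n)            # new points only, indices self.n through n-1
    y_next = f(x_next)
    sgp.add_y_next(y_next)                # updates sample counts and invalidates stale caches
    data = sgp.fit(iterations=20,verbose=0)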

post_mean

post_mean(x, task=None, eval=True)

Posterior mean.

Parameters:

Name Type Description Default
x Tensor[N, d]

sampling locations

required
task Union[int, Tensor[T]]

task index

None
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True

Returns:

Name Type Description
pmean Tensor[..., T, N]

posterior mean

Source code in fastgps/abstract_gp.py
def post_mean(self, x:torch.Tensor, task:Union[int,torch.Tensor]=None, eval:bool=True):
    """
    Posterior mean. 

    Args:
        x (torch.Tensor[N,d]): sampling locations
        task (Union[int,torch.Tensor[T]]): task index
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`

    Returns:
        pmean (torch.Tensor[...,T,N]): posterior mean
    """
    coeffs = self.coeffs
    if eval:
        incoming_grad_enabled = torch.is_grad_enabled()
        torch.set_grad_enabled(False)
    assert x.ndim==2 and x.size(1)==self.d, "x must be a torch.Tensor with shape (-1,d)"
    if task is None: task = self.default_task
    inttask = isinstance(task,int)
    if inttask: task = torch.tensor([task],dtype=int,device=self.device)
    if isinstance(task,list): task = torch.tensor(task,dtype=int,device=self.device)
    assert task.ndim==1 and (task>=0).all() and (task<self.num_tasks).all()
    kmat = torch.cat([torch.cat([self.kernel(task[l0],l1,x[:,None,:],self.get_xb(l1)[None,:,:],*self.derivatives_cross[task[l0]][l1],self.derivatives_coeffs_cross[task[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task))],dim=-3)
    pmean = torch.einsum("...i,...i->...",kmat,coeffs[...,None,None,:])
    if eval:
        torch.set_grad_enabled(incoming_grad_enabled)
    return pmean[...,0,:] if inttask else pmean

post_var

post_var(x, task=None, n=None, eval=True)

Posterior variance.

Parameters:

Name Type Description Default
x Tensor[N, d]

sampling locations

required
task Union[int, Tensor[T]]

task indices

None
n Union[int, Tensor[num_tasks]]

number of points per task on which to condition the posterior variance; may exceed the current sample sizes.

None
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True

Returns:

Name Type Description
pvar Tensor[T, N]

posterior variance

Source code in fastgps/abstract_gp.py
def post_var(self, x:torch.Tensor, task:Union[int,torch.Tensor]=None, n:Union[int,torch.Tensor]=None, eval:bool=True):
    """
    Posterior variance.

    Args:
        x (torch.Tensor[N,d]): sampling locations
        task (Union[int,torch.Tensor[T]]): task indices
        n (Union[int,torch.Tensor[num_tasks]]): number of points per task on which to condition the posterior variance; may exceed the current sample sizes.
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`

    Returns:
        pvar (torch.Tensor[T,N]): posterior variance
    """
    if n is None: n = self.n
    if isinstance(n,int): n = torch.tensor([n],dtype=int,device=self.device)
    assert isinstance(n,torch.Tensor)
    assert x.ndim==2 and x.size(1)==self.d, "x must be a torch.Tensor with shape (-1,d)"
    inv_log_det_cache = self.get_inv_log_det_cache(n)
    if eval:
        incoming_grad_enabled = torch.is_grad_enabled()
        torch.set_grad_enabled(False)
    if task is None: task = self.default_task
    inttask = isinstance(task,int)
    if inttask: task = torch.tensor([task],dtype=int,device=self.device)
    if isinstance(task,list): task = torch.tensor(task,dtype=int,device=self.device)
    assert task.ndim==1 and (task>=0).all() and (task<self.num_tasks).all()
    kmat_new = torch.cat([self.kernel(task[l0],task[l0],x,x,*self.derivatives_cross[task[l0]][task[l0]],self.derivatives_coeffs_cross[task[l0]][task[l0]])[...,None,:] for l0 in range(len(task))],dim=-2)
    kmat = torch.cat([torch.cat([self.kernel(task[l0],l1,x[:,None,:],self.get_xb(l1,n=n[l1])[None,:,:],*self.derivatives_cross[task[l0]][l1],self.derivatives_coeffs_cross[task[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task))],dim=-3)
    kmat_perm = torch.permute(kmat,[-3,-2]+[i for i in range(kmat.ndim-3)]+[-1])
    t_perm = inv_log_det_cache.gram_matrix_solve(kmat_perm)
    t = torch.permute(t_perm,[2+i for i in range(t_perm.ndim-3)]+[0,1,-1])
    diag = kmat_new-(t*kmat).sum(-1)
    diag[diag<0] = 0 
    if eval:
        torch.set_grad_enabled(incoming_grad_enabled)
    return diag[...,0,:] if inttask else diag
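
Example: anticipating the variance at a larger sample size. Because the posterior variance does not depend on the observed function values, the n argument lets you query the variance the model will have once the first n points of each sequence are observed, before actually evaluating them. A minimal sketch, assuming a fitted sgp and test locations x as in the StandardGP examples below; those examples verify that the anticipated values match the variance obtained after the new points are added.

pvar_now = sgp.post_var(x)                # conditioned on the current sample sizes
pvar_future = sgp.post_var(x,n=2*sgp.n)   # anticipated variance after doubling each task's sample size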

post_cov

post_cov(x0, x1, task0=None, task1=None, n=None, eval=True)

Posterior covariance.

Parameters:

Name Type Description Default
x0 Tensor[N, d]

left sampling locations

required
x1 Tensor[M, d]

right sampling locations

required
task0 Union[int, Tensor[T1]]

left task index

None
task1 Union[int, Tensor[T2]]

right task index

None
n Union[int, Tensor[num_tasks]]

number of points per task on which to condition the posterior covariance; may exceed the current sample sizes.

None
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True

Returns:

Name Type Description
pcov Tensor[T1, T2, N, M]

posterior covariance matrix

Source code in fastgps/abstract_gp.py
def post_cov(self, x0:torch.Tensor, x1:torch.Tensor, task0:Union[int,torch.Tensor]=None, task1:Union[int,torch.Tensor]=None, n:Union[int,torch.Tensor]=None, eval:bool=True):
    """
    Posterior covariance. 

    Args:
        x0 (torch.Tensor[N,d]): left sampling locations
        x1 (torch.Tensor[M,d]): right sampling locations
        task0 (Union[int,torch.Tensor[T1]]): left task index
        task1 (Union[int,torch.Tensor[T2]]): right task index
        n (Union[int,torch.Tensor[num_tasks]]): number of points per task on which to condition the posterior covariance; may exceed the current sample sizes.
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`

    Returns:
        pcov (torch.Tensor[T1,T2,N,M]): posterior covariance matrix
    """
    if n is None: n = self.n
    if isinstance(n,int): n = torch.tensor([n],dtype=int,device=self.device)
    assert isinstance(n,torch.Tensor)
    assert x0.ndim==2 and x0.size(1)==self.d, "x0 must be a torch.Tensor with shape (-1,d)"
    assert x1.ndim==2 and x1.size(1)==self.d, "x1 must be a torch.Tensor with shape (-1,d)"
    inv_log_det_cache = self.get_inv_log_det_cache(n)
    if eval:
        incoming_grad_enabled = torch.is_grad_enabled()
        torch.set_grad_enabled(False)
    if task0 is None: task0 = self.default_task
    inttask0 = isinstance(task0,int)
    if inttask0: task0 = torch.tensor([task0],dtype=int,device=self.device)
    if isinstance(task0,list): task0 = torch.tensor(task0,dtype=int,device=self.device)
    assert task0.ndim==1 and (task0>=0).all() and (task0<self.num_tasks).all()
    if task1 is None: task1 = self.default_task
    inttask1 = isinstance(task1,int)
    if inttask1: task1 = torch.tensor([task1],dtype=int,device=self.device)
    if isinstance(task1,list): task1 = torch.tensor(task1,dtype=int,device=self.device)
    assert task1.ndim==1 and (task1>=0).all() and (task1<self.num_tasks).all()
    equal = torch.equal(x0,x1) and torch.equal(task0,task1)
    kmat_new = torch.cat([torch.cat([self.kernel(task0[l0],task1[l1],x0[:,None,:],x1[None,:,:],*self.derivatives_cross[task0[l0]][task1[l1]],self.derivatives_coeffs_cross[task0[l0]][task1[l1]])[...,None,None,:,:] for l1 in range(len(task1))],dim=-3) for l0 in range(len(task0))],dim=-4)
    kmat1 = torch.cat([torch.cat([self.kernel(task0[l0],l1,x0[:,None,:],self.get_xb(l1,n=n[l1])[None,:,:],*self.derivatives_cross[task0[l0]][l1],self.derivatives_coeffs_cross[task0[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task0))],dim=-3)
    kmat2 = kmat1 if equal else torch.cat([torch.cat([self.kernel(task1[l0],l1,x1[:,None,:],self.get_xb(l1,n=n[l1])[None,:,:],*self.derivatives_cross[task1[l0]][l1],self.derivatives_coeffs_cross[task1[l0]][l1]) for l1 in range(self.num_tasks)],dim=-1)[...,None,:,:] for l0 in range(len(task1))],dim=-3)
    kmat2_perm = torch.permute(kmat2,[-3,-2]+[i for i in range(kmat2.ndim-3)]+[-1])
    t_perm = inv_log_det_cache.gram_matrix_solve(kmat2_perm)
    t = torch.permute(t_perm,[2+i for i in range(t_perm.ndim-3)]+[0,1,-1])
    kmat = kmat_new-(kmat1[...,:,None,:,None,:]*t[...,None,:,None,:,:]).sum(-1)
    if equal:
        tmesh,nmesh = torch.meshgrid(torch.arange(kmat.size(0),device=self.device),torch.arange(x0.size(0),device=x0.device),indexing="ij")            
        tidx,nidx = tmesh.ravel(),nmesh.ravel()
        diag = kmat[...,tidx,tidx,nidx,nidx]
        diag[diag<0] = 0 
        kmat[...,tidx,tidx,nidx,nidx] = diag 
    if eval:
        torch.set_grad_enabled(incoming_grad_enabled)
    if inttask0 and inttask1:
        return kmat[...,0,0,:,:]
    elif inttask0 and not inttask1:
        return kmat[...,0,:,:,:]
    elif not inttask0 and inttask1:
        return kmat[...,:,0,:,:]
    else: # not inttask0 and not inttask1
        return kmat

post_error

post_error(x, task=None, n=None, confidence=0.99, eval=True)

Posterior error.

Parameters:

Name Type Description Default
x Tensor[N, d]

sampling locations

required
task Union[int, Tensor[T]]

task indices

None
n Union[int, Tensor[num_tasks]]

number of points per task on which to condition the posterior variance; may exceed the current sample sizes.

None
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True
confidence float

confidence level in \((0,1)\) for the credible interval

0.99

Returns:

Name Type Description
pvar Tensor[T, N]

posterior variance

quantile float64
scipy.stats.norm.ppf(1-(1-confidence)/2)
perror Tensor[T, N]

posterior error

Source code in fastgps/abstract_gp.py
def post_error(self, x:torch.Tensor, task:Union[int,torch.Tensor]=None, n:Union[int,torch.Tensor]=None, confidence:float=0.99, eval:bool=True):
    """
    Posterior error. 

    Args:
        x (torch.Tensor[N,d]): sampling locations
        task (Union[int,torch.Tensor[T]]): task indices
        n (Union[int,torch.Tensor[num_tasks]]): number of points per task on which to condition the posterior variance; may exceed the current sample sizes.
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`
        confidence (float): confidence level in $(0,1)$ for the credible interval

    Returns:
        pvar (torch.Tensor[T,N]): posterior variance
        quantile (np.float64):
            ```python
            scipy.stats.norm.ppf(1-(1-confidence)/2)
            ```
        perror (torch.Tensor[T,N]): posterior error
    """
    assert np.isscalar(confidence) and 0<confidence<1, "confidence must be between 0 and 1"
    q = scipy.stats.norm.ppf(1-(1-confidence)/2)
    pvar = self.post_var(x,task=task,n=n,eval=eval,)
    pstd = torch.sqrt(pvar)
    perror = q*pstd
    return pvar,q,perror

post_ci

post_ci(x, task=None, confidence=0.99, eval=True)

Posterior credible interval.

Parameters:

Name Type Description Default
x Tensor[N, d]

sampling locations

required
task Union[int, Tensor[T]]

task indices

None
confidence float

confidence level in \((0,1)\) for the credible interval

0.99
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True

Returns:

Name Type Description
pmean Tensor[..., T, N]

posterior mean

pvar Tensor[T, N]

posterior variance

quantile float64
scipy.stats.norm.ppf(1-(1-confidence)/2)
pci_low Tensor[..., T, N]

posterior credible interval lower bound

pci_high Tensor[..., T, N]

posterior credible interval upper bound

Source code in fastgps/abstract_gp.py
def post_ci(self, x:torch.Tensor, task:Union[int,torch.Tensor]=None, confidence:float=0.99, eval:bool=True):
    """
    Posterior credible interval.

    Args:
        x (torch.Tensor[N,d]): sampling locations
        task (Union[int,torch.Tensor[T]]): task indices
        confidence (float): confidence level in $(0,1)$ for the credible interval
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`

    Returns:
        pmean (torch.Tensor[...,T,N]): posterior mean
        pvar (torch.Tensor[T,N]): posterior variance 
        quantile (np.float64):
            ```python
            scipy.stats.norm.ppf(1-(1-confidence)/2)
            ```
        pci_low (torch.Tensor[...,T,N]): posterior credible interval lower bound
        pci_high (torch.Tensor[...,T,N]): posterior credible interval upper bound
    """
    assert np.isscalar(confidence) and 0<confidence<1, "confidence must be between 0 and 1"
    q = scipy.stats.norm.ppf(1-(1-confidence)/2)
    pmean = self.post_mean(x,task=task,eval=eval)
    pvar,q,perror = self.post_error(x,task=task,confidence=confidence)
    pci_low = pmean-perror 
    pci_high = pmean+perror
    return pmean,pvar,q,pci_low,pci_high
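
Example: checking the credible intervals. A minimal sketch, assuming a fitted sgp, test locations x, and hypothetical reference values y_true at those locations; the empirical coverage should be near the requested confidence level when the model is well calibrated.

pmean,pvar,q,pci_low,pci_high = sgp.post_ci(x,confidence=0.95)
covered = (pci_low<=y_true)&(y_true<=pci_high)
print(covered.to(torch.get_default_dtype()).mean())   # empirical coverage of the 95% intervals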

post_cubature_mean

post_cubature_mean(task=None, eval=True)

Posterior cubature mean.

Parameters:

Name Type Description Default
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True
task Union[int, Tensor[T]]

task indices

None

Returns:

Name Type Description
pcmean Tensor[..., T]

posterior cubature mean

Source code in fastgps/abstract_gp.py
def post_cubature_mean(self, task:Union[int,torch.Tensor]=None, eval:bool=True):
    """
    Posterior cubature mean. 

    Args:
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`
        task (Union[int,torch.Tensor[T]]): task indices

    Returns:
        pcmean (torch.Tensor[...,T]): posterior cubature mean
    """
    raise NotImplementedError()

post_cubature_var

post_cubature_var(task=None, n=None, eval=True)

Posterior cubature variance.

Parameters:

Name Type Description Default
task Union[int, Tensor[T]]

task indices

None
n Union[int, Tensor[num_tasks]]

number of points at which to evaluate the posterior cubature variance.

None
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True

Returns:

Name Type Description
pcvar Tensor[T]

posterior cubature variance

Source code in fastgps/abstract_gp.py
def post_cubature_var(self, task:Union[int,torch.Tensor]=None, n:Union[int,torch.Tensor]=None, eval:bool=True):
    """
    Posterior cubature variance. 

    Args:
        task (Union[int,torch.Tensor[T]]): task indices
        n (Union[int,torch.Tensor[num_tasks]]): number of points at which to evaluate the posterior cubature variance.
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`

    Returns:
        pcvar (torch.Tensor[T]): posterior cubature variance
    """
    raise NotImplementedError()

post_cubature_cov

post_cubature_cov(task0=None, task1=None, n=None, eval=True)

Posterior cubature covariance.

Parameters:

Name Type Description Default
task0 Union[int, Tensor[T1]]

task indices

None
task1 Union[int, Tensor[T2]]

task indices

None
n Union[int, Tensor[num_tasks]]

number of points at which to evaluate the posterior cubature covariance.

None
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True

Returns:

Name Type Description
pcvar Tensor[T1, T2]

posterior cubature covariance

Source code in fastgps/abstract_gp.py
def post_cubature_cov(self, task0:Union[int,torch.Tensor]=None, task1:Union[int,torch.Tensor]=None, n:Union[int,torch.Tensor]=None, eval:bool=True):
    """
    Posterior cubature covariance. 

    Args:
        task0 (Union[int,torch.Tensor[T1]]): task indices
        task1 (Union[int,torch.Tensor[T2]]): task indices
        n (Union[int,torch.Tensor[num_tasks]]): number of points at which to evaluate the posterior cubature covariance.
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`

    Returns:
        pcvar (torch.Tensor[T1,T2]): posterior cubature covariance
    """
    raise NotImplementedError()

post_cubature_error

post_cubature_error(task=None, n=None, confidence=0.99, eval=True)

Posterior cubature error.

Parameters:

Name Type Description Default
task Union[int, Tensor[T]]

task indices

None
n Union[int, Tensor[num_tasks]]

number of points at which to evaluate the posterior cubature variance.

None
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True
confidence float

confidence level in \((0,1)\) for the credible interval

0.99

Returns:

Name Type Description
pcvar Tensor[T]

posterior cubature variance

quantile float64
scipy.stats.norm.ppf(1-(1-confidence)/2)
pcerror Tensor[T]

posterior cubature error

Source code in fastgps/abstract_gp.py
def post_cubature_error(self, task:Union[int,torch.Tensor]=None, n:Union[int,torch.Tensor]=None, confidence:float=0.99, eval:bool=True):
    """
    Posterior cubature error. 

    Args:
        task (Union[int,torch.Tensor[T]]): task indices
        n (Union[int,torch.Tensor[num_tasks]]): number of points at which to evaluate the posterior cubature variance.
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`
        confidence (float): confidence level in $(0,1)$ for the credible interval

    Returns:
        pcvar (torch.Tensor[T]): posterior cubature variance
        quantile (np.float64):
            ```python
            scipy.stats.norm.ppf(1-(1-confidence)/2)
            ```
        pcerror (torch.Tensor[T]): posterior cubature error
    """
    assert np.isscalar(confidence) and 0<confidence<1, "confidence must be between 0 and 1"
    q = scipy.stats.norm.ppf(1-(1-confidence)/2)
    pcvar = self.post_cubature_var(task=task,n=n,eval=eval)
    pcstd = torch.sqrt(pcvar)
    pcerror = q*pcstd
    return pcvar,q,pcerror

post_cubature_ci

post_cubature_ci(task=None, confidence=0.99, eval=True)

Posterior cubature credible interval.

Parameters:

Name Type Description Default
task Union[int, Tensor[T]]

task indices

None
confidence float

confidence level in \((0,1)\) for the credible interval

0.99
eval bool

if True, disable gradients, otherwise use torch.is_grad_enabled()

True

Returns:

Name Type Description
pcmean Tensor[..., T]

posterior cubature mean

pcvar Tensor[T]

posterior cubature variance

quantile float64
scipy.stats.norm.ppf(1-(1-confidence)/2)
pcci_low Tensor[..., T]

posterior cubature credible interval lower bound

pcci_high Tensor[..., T]

posterior cubature credible interval upper bound

Source code in fastgps/abstract_gp.py
def post_cubature_ci(self, task:Union[int,torch.Tensor]=None, confidence:float=0.99, eval:bool=True):
    """
    Posterior cubature credible interval.

    Args:
        task (Union[int,torch.Tensor[T]]): task indices
        confidence (float): confidence level in $(0,1)$ for the credible interval
        eval (bool): if `True`, disable gradients, otherwise use `torch.is_grad_enabled()`

    Returns:
        pcmean (torch.Tensor[...,T]): posterior cubature mean
        pcvar (torch.Tensor[T]): posterior cubature variance
        quantile (np.float64):
            ```python
            scipy.stats.norm.ppf(1-(1-confidence)/2)
            ```
        pcci_low (torch.Tensor[...,T]): posterior cubature credible interval lower bound
        pcci_high (torch.Tensor[...,T]): posterior cubature credible interval upper bound
    """
    assert np.isscalar(confidence) and 0<confidence<1, "confidence must be between 0 and 1"
    q = scipy.stats.norm.ppf(1-(1-confidence)/2)
    pcmean = self.post_cubature_mean(task=task,eval=eval) 
    pcvar,q,pcerror = self.post_cubature_error(task=task,confidence=confidence,eval=eval)
    pcci_low = pcmean-pcerror
    pcci_high = pcmean+pcerror
    return pcmean,pcvar,q,pcci_low,pcci_high

StandardGP

StandardGP(kernel, seqs, noise=0.0001, tfs_noise=(qmcpy.util.transforms.tf_exp_eps_inv, qmcpy.util.transforms.tf_exp_eps), requires_grad_noise=False, shape_noise=torch.Size([1]), derivatives=None, derivatives_coeffs=None, adaptive_nugget=True, data=None)

Bases: AbstractGP

Standard Gaussian process regression

Examples:

>>> device = "cpu"
>>> if device!="mps":
...     torch.set_default_dtype(torch.float64)
>>> def f_ackley(x, a=20, b=0.2, c=2*np.pi, scaling=32.768):
...     # https://www.sfu.ca/~ssurjano/ackley.html
...     assert x.ndim==2
...     x = 2*scaling*x-scaling
...     t1 = a*torch.exp(-b*torch.sqrt(torch.mean(x**2,1)))
...     t2 = torch.exp(torch.mean(torch.cos(c*x),1))
...     t3 = a+np.exp(1)
...     y = -t1-t2+t3
...     return y
>>> n = 2**6
>>> d = 2
>>> sgp = StandardGP(
...     qmcpy.KernelSquaredExponential(d,torchify=True,device=device),
...     qmcpy.DigitalNetB2(dimension=d,seed=7))
>>> x_next = sgp.get_x_next(n)
>>> y_next = f_ackley(x_next)
>>> sgp.add_y_next(y_next)
>>> rng = torch.Generator().manual_seed(17)
>>> x = torch.rand((2**7,d),generator=rng).to(device)
>>> y = f_ackley(x)
>>> pmean = sgp.post_mean(x)
>>> pmean.shape
torch.Size([128])
>>> torch.linalg.norm(y-pmean)/torch.linalg.norm(y)
tensor(0.0817)
>>> torch.linalg.norm(sgp.post_mean(sgp.x)-sgp.y)/torch.linalg.norm(y)
tensor(0.0402)
>>> data = sgp.fit(verbose=0)
>>> list(data.keys())
[]
>>> torch.linalg.norm(y-sgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0577)
>>> z = torch.rand((2**8,d),generator=rng).to(device)
>>> pcov = sgp.post_cov(x,z)
>>> pcov.shape
torch.Size([128, 256])
>>> pcov = sgp.post_cov(x,x)
>>> pcov.shape
torch.Size([128, 128])
>>> (pcov.diagonal()>=0).all()
tensor(True)
>>> pvar = sgp.post_var(x)
>>> pvar.shape
torch.Size([128])
>>> torch.allclose(pcov.diagonal(),pvar)
True
>>> pmean,pstd,q,ci_low,ci_high = sgp.post_ci(x,confidence=0.99)
>>> ci_low.shape
torch.Size([128])
>>> ci_high.shape
torch.Size([128])
>>> sgp.post_cubature_mean()
tensor(20.3584)
>>> sgp.post_cubature_var()
tensor(0.0035)
>>> pcmean,pcvar,q,pcci_low,pcci_high = sgp.post_cubature_ci(confidence=0.99)
>>> pcci_low
tensor(20.2069)
>>> pcci_high
tensor(20.5098)
>>> pcov_future = sgp.post_cov(x,z,n=2*n)
>>> pvar_future = sgp.post_var(x,n=2*n)
>>> pcvar_future = sgp.post_cubature_var(n=2*n)
>>> x_next = sgp.get_x_next(2*n)
>>> y_next = f_ackley(x_next)
>>> sgp.add_y_next(y_next)
>>> torch.linalg.norm(y-sgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0848)
>>> torch.allclose(sgp.post_cov(x,z),pcov_future)
True
>>> torch.allclose(sgp.post_var(x),pvar_future)
True
>>> torch.allclose(sgp.post_cubature_var(),pcvar_future)
True
>>> data = sgp.fit(verbose=False)
>>> torch.linalg.norm(y-sgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0565)
>>> x_next = sgp.get_x_next(4*n)
>>> y_next = f_ackley(x_next)
>>> sgp.add_y_next(y_next)
>>> torch.linalg.norm(y-sgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.1515)
>>> data = sgp.fit(verbose=False)
>>> torch.linalg.norm(y-sgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0555)
>>> pcov_16n = sgp.post_cov(x,z,n=16*n)
>>> pvar_16n = sgp.post_var(x,n=16*n)
>>> pcvar_16n = sgp.post_cubature_var(n=16*n)
>>> x_next = sgp.get_x_next(16*n)
>>> y_next = f_ackley(x_next)
>>> sgp.add_y_next(y_next)
>>> torch.allclose(sgp.post_cov(x,z),pcov_16n)
True
>>> torch.allclose(sgp.post_var(x),pvar_16n)
True
>>> torch.allclose(sgp.post_cubature_var(),pcvar_16n)
True

Different loss metrics for fitting

>>> n = 2**6
>>> d = 3
>>> sgp = StandardGP(
...     qmcpy.KernelSquaredExponential(d,torchify=True,device=device),
...     qmcpy.DigitalNetB2(dimension=d,seed=7))
>>> x_next = sgp.get_x_next(n)
>>> y_next = torch.stack([torch.sin(x_next).sum(-1),torch.cos(x_next).sum(-1)],axis=0)
>>> sgp.add_y_next(y_next)
>>> data = sgp.fit(loss_metric="MLL",iterations=5,verbose=0)
>>> data = sgp.fit(loss_metric="CV",iterations=5,verbose=0,cv_weights=1/torch.arange(1,2*n+1,device=device).reshape((2,n)))
>>> data = sgp.fit(loss_metric="CV",iterations=5,verbose=0,cv_weights="L2R")
>>> data = sgp.fit(loss_metric="GCV",iterations=5,verbose=0)

Data Driven

>>> x = [
...     torch.rand((3,1),generator=rng).to(device),
...     torch.rand((12,1),generator=rng).to(device),
... ]
>>> y = [
...     torch.stack([torch.sin(2*np.pi*x[0][:,0]),torch.cos(2*np.pi*x[0][:,0]),torch.acos(x[0][:,0])],dim=0).to(device),
...     torch.stack([4*torch.sin(2*np.pi*x[1][:,0]),4*torch.cos(2*np.pi*x[1][:,0]),4*torch.acos(x[1][:,0])],dim=0).to(device),
... ]
>>> sgp = StandardGP(
...     qmcpy.KernelMultiTask(
...         qmcpy.KernelGaussian(d=1,torchify=True,device=device),
...         num_tasks = 2,
...     ),
...     seqs={"x":x,"y":y},
...     noise = 1e-3,
... )
>>> data = sgp.fit(verbose=0,iterations=10)
>>> xticks = torch.linspace(0,1,101,device=device) 
>>> pmean,pvar,q,pci_low,pci_high = sgp.post_ci(xticks[:,None])
>>> pcmean,pcvar,q,pcci_low,pcci_high =  sgp.post_cubature_ci()

Parameters:

Name Type Description Default
kernel AbstractKernel

Kernel object. Set to qmcpy.KernelMultiTask for a multi-task GP.

required
seqs Union[int, qmcpy.DiscreteDistribution, List]

list of sequence generators. If an int seed is passed in we use

[qmcpy.DigitalNetB2(d,seed=seed_i) for seed_i in np.random.SeedSequence(seed).spawn(num_tasks)]
See the qmcpy.DiscreteDistribution docs for more info.

required
noise float

positive noise variance i.e. nugget term

0.0001
tfs_noise Tuple[callable, callable]

the first argument transforms to the raw value to be optimized, the second applies the inverse transform

(tf_exp_eps_inv, tf_exp_eps)
requires_grad_noise bool

whether or not to optimize the noise parameter

False
shape_noise Size

shape of the noise parameter, defaults to torch.Size([1])

Size([1])
derivatives list

list of derivative orders e.g. to include a function and its gradient set

derivatives = [torch.zeros(d,dtype=int)]+[ej for ej in torch.eye(d,dtype=int)]

None
derivatives_coeffs list

list of derivative coefficients where if derivatives[k].shape==(p,d) then we should have derivatives_coeffs[k].shape==(p,)

None
adaptive_nugget bool

if True, use the adaptive nugget which modifies noises based on trace ratios.

True
data dict

dictionary of data with keys 'x' and 'y' where data['x'] and data['y'] are both torch.Tensors or lists of torch.Tensors with lengths equal to the number of tasks

None
Source code in fastgps/standard_gp.py
def __init__(self,
        kernel:qmcpy.kernel.abstract_kernel.AbstractKernel,
        seqs:Union[qmcpy.IIDStdUniform,int],
        noise:float = 1e-4,
        tfs_noise:Tuple[callable,callable] = (qmcpy.util.transforms.tf_exp_eps_inv,qmcpy.util.transforms.tf_exp_eps),
        requires_grad_noise:bool = False, 
        shape_noise:torch.Size = torch.Size([1]),
        derivatives:list = None,
        derivatives_coeffs:list = None,
        adaptive_nugget:bool = True,
        data:dict = None,
        ):
    """
    Args:
        kernel (qmcpy.AbstractKernel): Kernel object. Set to `qmcpy.KernelMultiTask` for a multi-task GP.
        seqs (Union[int,qmcpy.DiscreteDistribution,List]): list of sequence generators. If an int `seed` is passed in we use 
            ```python
            [qmcpy.DigitalNetB2(d,seed=seed_i) for seed_i in np.random.SeedSequence(seed).spawn(num_tasks)]
            ```
            See the <a href="https://qmcpy.readthedocs.io/en/latest/algorithms.html#discrete-distribution-class" target="_blank">`qmcpy.DiscreteDistribution` docs</a> for more info. 
        noise (float): positive noise variance i.e. nugget term
        tfs_noise (Tuple[callable,callable]): the first argument transforms to the raw value to be optimized, the second applies the inverse transform
        requires_grad_noise (bool): whether or not to optimize the noise parameter
        shape_noise (torch.Size): shape of the noise parameter, defaults to `torch.Size([1])`
        derivatives (list): list of derivative orders e.g. to include a function and its gradient set 
            ```python
            derivatives = [torch.zeros(d,dtype=int)]+[ej for ej in torch.eye(d,dtype=int)]
            ```
        derivatives_coeffs (list): list of derivative coefficients where if `derivatives[k].shape==(p,d)` then we should have `derivatives_coeffs[k].shape==(p,)`
        adaptive_nugget (bool): if True, use the adaptive nugget which modifies noises based on trace ratios.  
        data (dict): dictionary of data with keys 'x' and 'y' where data['x'] and data['y'] are both `torch.Tensor`s or lists of `torch.Tensor`s with lengths equal to the number of tasks
    """
    self._XBDTYPE = torch.get_default_dtype()
    self._FTOUTDTYPE = torch.get_default_dtype()
    if isinstance(kernel,qmcpy.KernelMultiTask):
        solo_task = False
        num_tasks = kernel.num_tasks
        default_task = torch.arange(num_tasks)
    else:
        solo_task = True
        default_task = 0 
        num_tasks = 1
    if isinstance(seqs,dict):
        data = seqs
        assert "x" in data and "y" in data, "dict seqs must have keys 'x' and 'y'"
        if isinstance(data["x"],torch.Tensor): data["x"] = [data["x"]]
        if isinstance(data["y"],torch.Tensor): data["y"] = [data["y"]]
        assert isinstance(data["x"],list) and len(data["x"])==num_tasks and all(isinstance(x_l,torch.Tensor) and x_l.ndim==2 and x_l.size(1)==kernel.d for x_l in data["x"]), "data['x'] should be a list of 2d tensors of length num_tasks with each number of columns equal to the dimension"
        assert isinstance(data["y"],list) and len(data["y"])==num_tasks and all(isinstance(y_l,torch.Tensor) and y_l.ndim>=1 for y_l in data["y"]), "data['y'] should be a list of tensors of length num_tasks"
        seqs = np.array([DummyDiscreteDistrib(data["x"][l].cpu().detach().numpy()) for l in range(num_tasks)],dtype=object)
    else:
        data = None
        if isinstance(seqs,int):
            global_seed = seqs
            seqs = np.array([qmcpy.DigitalNetB2(kernel.d,seed=seed,order="GRAY") for seed in np.random.SeedSequence(global_seed).spawn(num_tasks)],dtype=object)
        if isinstance(seqs,qmcpy.DiscreteDistribution):
            seqs = np.array([seqs],dtype=object)
        if isinstance(seqs,list):
            seqs = np.array(seqs,dtype=object)
    assert seqs.shape==(num_tasks,), "seqs should be a length num_tasks=%d list"%num_tasks
    assert all(seqs[i].replications==1 for i in range(num_tasks)), "each seq should have only 1 replication"
    super().__init__(
        kernel,
        seqs,
        num_tasks,
        default_task,
        solo_task,
        noise,
        tfs_noise,
        requires_grad_noise,
        shape_noise,
        derivatives,
        derivatives_coeffs,
        adaptive_nugget,
    )
    if data is not None:
        self.add_y_next(data["y"],task=torch.arange(self.num_tasks))
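
Example: letting StandardGP spawn its own sequences. A minimal sketch of the int-seed shortcut for seqs described above, here with a hypothetical two-task setup; passing seqs=7 spawns one qmcpy.DigitalNetB2 generator per task internally.

import torch
import numpy as np
import qmcpy
from fastgps import StandardGP

torch.set_default_dtype(torch.float64)
d = 3
kernel = qmcpy.KernelMultiTask(qmcpy.KernelSquaredExponential(d,torchify=True,device="cpu"),num_tasks=2)
sgp = StandardGP(kernel,seqs=7)                            # one DigitalNetB2 per task from seed 7
x0,x1 = sgp.get_x_next([2**5,2**5],task=torch.arange(2))   # separate points for each task
sgp.add_y_next([torch.sin(x0).sum(-1),torch.cos(x1).sum(-1)],task=torch.arange(2))
data = sgp.fit(iterations=10,verbose=0)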

AbstractFastGP

AbstractFastGP(alpha, ft, ift, omega, *args, **kwargs)

Bases: AbstractGP

Source code in fastgps/abstract_fast_gp.py
def __init__(self,
        alpha,
        ft,
        ift,
        omega,
        *args,
        **kwargs
    ):
    super().__init__(*args,**kwargs)
    # alpha
    assert (np.isscalar(alpha) and alpha%1==0) or (isinstance(alpha,torch.Tensor) and alpha.shape==(self.d,)), "alpha should be an int or a torch.Tensor of length d"
    if np.isscalar(alpha):
        alpha = int(alpha)*torch.ones(self.d,dtype=int,device=self.device)
    self.alpha = alpha
    # fast transforms 
    self.ft_unstable = ft
    self.ift_unstable = ift
    self.omega = omega
    # storage and dynamic caches
    self.k1parts_seq = np.array([[_K1PartsSeq(self,self.xxb_seqs[l0],self.xxb_seqs[l1],*self.derivatives_cross[l0][l1]) if l1>=l0 else None for l1 in range(self.num_tasks)] for l0 in range(self.num_tasks)],dtype=object)
    self.lam_caches = np.array([[_LamCaches(self,l0,l1,*self.derivatives_cross[l0][l1],self.derivatives_coeffs_cross[l0][l1]) if l1>=l0 else None for l1 in range(self.num_tasks)] for l0 in range(self.num_tasks)],dtype=object)
    self.ytilde_cache = np.array([_YtildeCache(self,i) for i in range(self.num_tasks)],dtype=object)

ft

ft(x)

One dimensional fast transform along the last dimension. For FastGPLattice this is the orthonormal Fast Fourier Transform (FFT). For FastGPDigitalNetB2 this is the orthonormal Fast Walsh Hadamard Transform (FWHT).

Parameters:

Name Type Description Default
x Tensor

inputs to be transformed along the last dimension. Require n = x.size(-1) is a power of 2.

required

Returns:

Name Type Description
y Tensor

transformed inputs with the same shape as x

Source code in fastgps/abstract_fast_gp.py
def ft(self, x):
    """
    One dimensional fast transform along the last dimension. 
        For `FastGPLattice` this is the orthonormal Fast Fourier Transform (FFT). 
        For `FastGPDigitalNetB2` this is the orthonormal Fast Walsh Hadamard Transform (FWHT). 

    Args: 
        x (torch.Tensor): inputs to be transformed along the last dimension. Require `n = x.size(-1)` is a power of 2. 

    Returns: 
        y (torch.Tensor): transformed inputs with the same shape as `x` 
    """
    xmean = x.mean(-1)
    y = self.ft_unstable(x-xmean[...,None])
    y[...,0] += xmean*np.sqrt(x.size(-1))
    return y
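
Example: why the mean is handled separately. A minimal sketch of the centering trick above, using the orthonormal FFT (the transform used by FastGPLattice) as a stand-in for ft_unstable; subtracting the mean before transforming and adding it back to the zeroth coefficient gives the same result as transforming directly, while avoiding cancellation when the inputs share a large common offset.

import numpy as np
import torch

def stable_ft(x, ft_unstable):
    xmean = x.mean(-1)
    y = ft_unstable(x-xmean[...,None])         # transform the centered signal
    y[...,0] += xmean*np.sqrt(x.size(-1))      # the zeroth coefficient carries the mean
    return y

ortho_fft = lambda v: torch.fft.fft(v,norm="ortho")
x = torch.rand(2**10,dtype=torch.float64)+1e6  # signal with a large common offset
print(torch.allclose(stable_ft(x,ortho_fft),ortho_fft(x)))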

ift

ift(x)

One dimensional inverse fast transform along the last dimension. For FastGPLattice this is the orthonormal Inverse Fast Fourier Transform (IFFT). For FastGPDigitalNetB2 this is the orthonormal Fast Walsh Hadamard Transform (FWHT).

Parameters:

Name Type Description Default
x Tensor

inputs to be transformed along the last dimension. Require n = x.size(-1) is a power of 2.

required

Returns:

Name Type Description
y Tensor

transformed inputs with the same shape as x

Source code in fastgps/abstract_fast_gp.py
def ift(self, x):
    """
    One dimensional inverse fast transform along the last dimension. 
        For `FastGPLattice` this is the orthonormal Inverse Fast Fourier Transform (IFFT). 
        For `FastGPDigitalNetB2` this is the orthonormal Fast Walsh Hadamard Transform (FWHT). 

    Args: 
        x (torch.Tensor): inputs to be transformed along the last dimension. Require `n = x.size(-1)` is a power of 2. 

    Returns: 
        y (torch.Tensor): transformed inputs with the same shape as `x` 
    """
    xmean = x.mean(-1)
    y = self.ift_unstable(x-xmean[...,None])
    y[...,0] += xmean*np.sqrt(x.size(-1))
    return y

FastGPLattice

FastGPLattice(kernel, seqs, alpha=2, noise=2 * qmcpy.util.transforms.EPS64, tfs_noise=(qmcpy.util.transforms.tf_exp_eps_inv, qmcpy.util.transforms.tf_exp_eps), requires_grad_noise=False, shape_noise=torch.Size([1]), derivatives=None, derivatives_coeffs=None, adaptive_nugget=False)

Bases: AbstractFastGP

Fast Gaussian process regression using lattice points and shift invariant kernels

Examples:

>>> device = "cpu"
>>> if device!="mps":
...     torch.set_default_dtype(torch.float64)
>>> def f_ackley(x, a=20, b=0.2, c=2*np.pi, scaling=32.768):
...     # https://www.sfu.ca/~ssurjano/ackley.html
...     assert x.ndim==2
...     x = 2*scaling*x-scaling
...     t1 = a*torch.exp(-b*torch.sqrt(torch.mean(x**2,1)))
...     t2 = torch.exp(torch.mean(torch.cos(c*x),1))
...     t3 = a+np.exp(1)
...     y = -t1-t2+t3
...     return y
>>> n = 2**10
>>> d = 2
>>> fgp = FastGPLattice(
...     qmcpy.KernelShiftInvar(d,torchify=True,device=device),
...     seqs = qmcpy.Lattice(dimension=d,seed=7))
>>> x_next = fgp.get_x_next(n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> rng = torch.Generator().manual_seed(17)
>>> x = torch.rand((2**7,d),generator=rng).to(device)
>>> y = f_ackley(x)
>>> pmean = fgp.post_mean(x)
>>> pmean.shape
torch.Size([128])
>>> torch.linalg.norm(y-pmean)/torch.linalg.norm(y)
tensor(0.0348)
>>> torch.allclose(fgp.post_mean(fgp.x),fgp.y,atol=1e-3)
True
>>> fgp.post_cubature_mean()
tensor(20.1842)
>>> fgp.post_cubature_var()
tensor(6.9917e-09)
>>> data = fgp.fit(verbose=0)
>>> list(data.keys())
[]
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0361)
>>> z = torch.rand((2**8,d),generator=rng).to(device)
>>> pcov = fgp.post_cov(x,z)
>>> pcov.shape
torch.Size([128, 256])
>>> pcov = fgp.post_cov(x,x)
>>> pcov.shape
torch.Size([128, 128])
>>> (pcov.diagonal()>=0).all()
tensor(True)
>>> pvar = fgp.post_var(x)
>>> pvar.shape
torch.Size([128])
>>> torch.allclose(pcov.diagonal(),pvar)
True
>>> pmean,pstd,q,ci_low,ci_high = fgp.post_ci(x,confidence=0.99)
>>> ci_low.shape
torch.Size([128])
>>> ci_high.shape
torch.Size([128])
>>> fgp.post_cubature_mean()
tensor(20.1842)
>>> fgp.post_cubature_var()
tensor(3.1129e-06)
>>> pcmean,pcvar,q,pcci_low,pcci_high = fgp.post_cubature_ci(confidence=0.99)
>>> pcci_low
tensor(20.1797)
>>> pcci_high
tensor(20.1888)
>>> pcov_future = fgp.post_cov(x,z,n=2*n)
>>> pvar_future = fgp.post_var(x,n=2*n)
>>> pcvar_future = fgp.post_cubature_var(n=2*n)
>>> x_next = fgp.get_x_next(2*n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0304)
>>> torch.allclose(fgp.post_cov(x,z),pcov_future)
True
>>> torch.allclose(fgp.post_var(x),pvar_future)
True
>>> torch.allclose(fgp.post_cubature_var(),pcvar_future)
True
>>> data = fgp.fit(verbose=False)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0274)
>>> x_next = fgp.get_x_next(4*n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0277)
>>> data = fgp.fit(verbose=False)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0276)
>>> pcov_16n = fgp.post_cov(x,z,n=16*n)
>>> pvar_16n = fgp.post_var(x,n=16*n)
>>> pcvar_16n = fgp.post_cubature_var(n=16*n)
>>> x_next = fgp.get_x_next(16*n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> torch.allclose(fgp.post_cov(x,z),pcov_16n)
True
>>> torch.allclose(fgp.post_var(x),pvar_16n)
True
>>> torch.allclose(fgp.post_cubature_var(),pcvar_16n)
True

Different loss metrics for fitting

>>> n = 2**6
>>> d = 3
>>> sgp = FastGPLattice(
...     qmcpy.KernelShiftInvar(d,torchify=True,device=device),
...     qmcpy.Lattice(dimension=d,seed=7))
>>> x_next = sgp.get_x_next(n)
>>> y_next = torch.stack([torch.sin(x_next).sum(-1),torch.cos(x_next).sum(-1)],axis=0)
>>> sgp.add_y_next(y_next)
>>> data = sgp.fit(loss_metric="MLL",iterations=5,verbose=0)
>>> data = sgp.fit(loss_metric="CV",iterations=5,verbose=0,cv_weights=1/torch.arange(1,2*n+1,device=device).reshape((2,n)))
>>> data = sgp.fit(loss_metric="CV",iterations=5,verbose=0,cv_weights="L2R")
>>> data = sgp.fit(loss_metric="GCV",iterations=5,verbose=0)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `kernel` | `KernelShiftInvar` | Kernel object. Set to `qmcpy.KernelMultiTask` for a multi-task GP. | required |
| `seqs` | `Union[int, Lattice, List]` | List of lattice sequence generators with `order="RADICAL INVERSE"` and `randomize` in `["FALSE","SHIFT"]`. If an int `seed` is passed in we use `[qmcpy.Lattice(d,seed=seed_i,randomize="SHIFT") for seed_i in np.random.SeedSequence(seed).spawn(num_tasks)]`. See the `qmcpy.Lattice` docs for more info. | required |
| `alpha` | `int` | Smoothness parameter. | `2` |
| `noise` | `float` | Positive noise variance, i.e. the nugget term. | `2 * EPS64` |
| `tfs_noise` | `Tuple[callable, callable]` | The first callable transforms the noise to the raw value that is optimized; the second applies the inverse transform. | `(tf_exp_eps_inv, tf_exp_eps)` |
| `requires_grad_noise` | `bool` | Whether or not to optimize the noise parameter. | `False` |
| `shape_noise` | `Size` | Shape of the noise parameter. | `Size([1])` |
| `derivatives` | `list` | List of derivative orders, e.g. to include a function and its gradient set `derivatives = [torch.zeros(d,dtype=int)]+[ej for ej in torch.eye(d,dtype=int)]` (see the sketch after this table). | `None` |
| `derivatives_coeffs` | `list` | List of derivative coefficients where if `derivatives[k].shape==(p,d)` then we should have `derivatives_coeffs[k].shape==(p,)`. | `None` |
| `adaptive_nugget` | `bool` | If `True`, use the adaptive nugget, which modifies the noise based on trace ratios. | `False` |
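
To make the `derivatives`/`derivatives_coeffs` pairing concrete, here is a small shape-only sketch grounded in the example above; it only demonstrates how the two lists line up and is not a complete constructor call:

```python
import torch

d = 3
# function value plus the d first partial derivatives, as in the derivatives example above
derivatives = [torch.zeros(d, dtype=int)] + [ej for ej in torch.eye(d, dtype=int)]
# each derivatives[k] has (or broadcasts to) shape (p, d) with p = 1 here,
# so the matching coefficient tensor derivatives_coeffs[k] has shape (p,) = (1,)
derivatives_coeffs = [torch.ones(1) for _ in derivatives]
assert len(derivatives) == len(derivatives_coeffs) == d + 1
```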
Source code in fastgps/fast_gp_lattice.py
def __init__(self,
        kernel:qmcpy.KernelShiftInvar,
        seqs:qmcpy.Lattice,
        alpha:int = 2,
        noise:float = 2*qmcpy.util.transforms.EPS64, 
        tfs_noise:Tuple[callable,callable] = (qmcpy.util.transforms.tf_exp_eps_inv,qmcpy.util.transforms.tf_exp_eps),
        requires_grad_noise:bool = False, 
        shape_noise:torch.Size = torch.Size([1]),
        derivatives:list = None,
        derivatives_coeffs:list = None,
        adaptive_nugget:bool = False,
        ):
    """
    Args:
        kernel (qmcpy.KernelShiftInvar): Kernel object. Set to `qmcpy.KernelMultiTask` for a multi-task GP.
        seqs (Union[int,qmcpy.Lattice,List]): list of lattice sequence generators
            with order="RADICAL INVERSE" and randomize in `["FALSE","SHIFT"]`. If an int `seed` is passed in we use 
            ```python
            [qmcpy.Lattice(d,seed=seed_i,randomize="SHIFT") for seed_i in np.random.SeedSequence(seed).spawn(num_tasks)]
            ```
            See the <a href="https://qmcpy.readthedocs.io/en/latest/algorithms.html#module-qmcpy.discrete_distribution.lattice.lattice" target="_blank">`qmcpy.Lattice` docs</a> for more info
        alpha (int): smoothness parameter
        noise (float): positive noise variance, i.e. the nugget term
        tfs_noise (Tuple[callable,callable]): the first callable transforms the noise to the raw value that is optimized, the second applies the inverse transform
        requires_grad_noise (bool): whether or not to optimize the noise parameter
        shape_noise (torch.Size): shape of the noise parameter, defaults to `torch.Size([1])`
        derivatives (list): list of derivative orders, e.g. to include a function and its gradient set 
            ```python
            derivatives = [torch.zeros(d,dtype=int)]+[ej for ej in torch.eye(d,dtype=int)]
            ```
        derivatives_coeffs (list): list of derivative coefficients where if `derivatives[k].shape==(p,d)` then we should have `derivatives_coeffs[k].shape==(p,)`
        adaptive_nugget (bool): if True, use the adaptive nugget, which modifies the noise based on trace ratios.
    """
    self._XBDTYPE = torch.get_default_dtype()
    self._FTOUTDTYPE = torch.complex64 if torch.get_default_dtype()==torch.float32 else torch.complex128
    if isinstance(kernel,qmcpy.KernelMultiTask):
        solo_task = False
        num_tasks = kernel.num_tasks
        default_task = torch.arange(num_tasks)
    else:
        solo_task = True
        default_task = 0 
        num_tasks = 1
    if isinstance(seqs,int):
        global_seed = seqs
        seqs = np.array([qmcpy.Lattice(kernel.d,seed=seed,randomize="SHIFT") for seed in np.random.SeedSequence(global_seed).spawn(num_tasks)],dtype=object)
    if isinstance(seqs,qmcpy.Lattice):
        seqs = np.array([seqs],dtype=object)
    if isinstance(seqs,list):
        seqs = np.array(seqs,dtype=object)
    assert seqs.shape==(num_tasks,), "seqs should be a length num_tasks=%d list"%num_tasks
    assert all(isinstance(seqs[i],qmcpy.Lattice) for i in range(num_tasks)), "each seq should be a qmcpy.Lattice instance"
    assert all(seqs[i].order=="RADICAL INVERSE" for i in range(num_tasks)), "each seq should be in 'RADICAL INVERSE' order"
    assert all(seqs[i].replications==1 for i in range(num_tasks)), "each seq should have only 1 replication"
    assert all(seqs[i].randomize in ['FALSE','SHIFT'] for i in range(num_tasks)), "each seq should have randomize in ['FALSE','SHIFT']"
    ft = qmcpy.fftbr_torch
    ift = qmcpy.ifftbr_torch
    omega = qmcpy.omega_fftbr_torch
    super().__init__(
        alpha,
        ft,
        ift,
        omega,
        kernel,
        seqs,
        num_tasks,
        default_task,
        solo_task,
        noise,
        tfs_noise,
        requires_grad_noise,
        shape_noise,
        derivatives,
        derivatives_coeffs,
        adaptive_nugget,
    )
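
For completeness, the `isinstance(seqs,int)` branch above also accepts a bare integer seed, from which the constructor spawns one shifted lattice per task. A minimal single-task sketch (the `fastgps` import path is assumed here):

```python
import torch
import qmcpy
from fastgps import FastGPLattice  # assumed import path for this package

torch.set_default_dtype(torch.float64)
# passing an int seed: the constructor spawns one qmcpy.Lattice per task with randomize="SHIFT"
fgp_seeded = FastGPLattice(qmcpy.KernelShiftInvar(2, torchify=True), seqs=7)
```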

FastGPDigitalNetB2

FastGPDigitalNetB2(kernel, seqs, alpha=2, noise=2 * qmcpy.util.transforms.EPS64, tfs_noise=(qmcpy.util.transforms.tf_exp_eps_inv, qmcpy.util.transforms.tf_exp_eps), requires_grad_noise=False, shape_noise=torch.Size([1]), derivatives=None, derivatives_coeffs=None, adaptive_nugget=False)

Bases: AbstractFastGP

Fast Gaussian process regression using digitally shifted digital nets paired with digitally shift invariant kernels

Examples:

>>> device = "cpu"
>>> if device!="mps":
...     torch.set_default_dtype(torch.float64)
>>> def f_ackley(x, a=20, b=0.2, c=2*np.pi, scaling=32.768):
...     # https://www.sfu.ca/~ssurjano/ackley.html
...     assert x.ndim==2
...     x = 2*scaling*x-scaling
...     t1 = a*torch.exp(-b*torch.sqrt(torch.mean(x**2,1)))
...     t2 = torch.exp(torch.mean(torch.cos(c*x),1))
...     t3 = a+np.exp(1)
...     y = -t1-t2+t3
...     return y
>>> n = 2**10
>>> d = 2
>>> fgp = FastGPDigitalNetB2(
...     qmcpy.KernelDigShiftInvar(d,torchify=True,device=device),
...     qmcpy.DigitalNetB2(dimension=d,seed=7))
>>> x_next = fgp.get_x_next(n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> rng = torch.Generator().manual_seed(17)
>>> x = torch.rand((2**7,d),generator=rng).to(device)
>>> y = f_ackley(x)
>>> pmean = fgp.post_mean(x)
>>> pmean.shape
torch.Size([128])
>>> torch.linalg.norm(y-pmean)/torch.linalg.norm(y)
tensor(0.0319)
>>> torch.allclose(fgp.post_mean(fgp.x),fgp.y)
True
>>> data = fgp.fit(verbose=0)
>>> list(data.keys())
[]
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0308)
>>> z = torch.rand((2**8,d),generator=rng).to(device)
>>> pcov = fgp.post_cov(x,z)
>>> pcov.shape
torch.Size([128, 256])
>>> pcov = fgp.post_cov(x,x)
>>> pcov.shape
torch.Size([128, 128])
>>> (pcov.diagonal()>=0).all()
tensor(True)
>>> pvar = fgp.post_var(x)
>>> pvar.shape
torch.Size([128])
>>> torch.allclose(pcov.diagonal(),pvar,atol=1e-5)
True
>>> pmean,pstd,q,ci_low,ci_high = fgp.post_ci(x,confidence=0.99)
>>> ci_low.shape
torch.Size([128])
>>> ci_high.shape
torch.Size([128])
>>> fgp.post_cubature_mean()
tensor(20.1841)
>>> fgp.post_cubature_var()
tensor(0.0002)
>>> pcmean,pcvar,q,pcci_low,pcci_high = fgp.post_cubature_ci(confidence=0.99)
>>> pcci_low
tensor(20.1494)
>>> pcci_high
tensor(20.2188)
>>> pcov_future = fgp.post_cov(x,z,n=2*n)
>>> pvar_future = fgp.post_var(x,n=2*n)
>>> pcvar_future = fgp.post_cubature_var(n=2*n)
>>> x_next = fgp.get_x_next(2*n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0245)
>>> torch.allclose(fgp.post_cov(x,z),pcov_future)
True
>>> torch.allclose(fgp.post_var(x),pvar_future)
True
>>> torch.allclose(fgp.post_cubature_var(),pcvar_future)
True
>>> data = fgp.fit(verbose=False)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0247)
>>> x_next = fgp.get_x_next(4*n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0149)
>>> data = fgp.fit(verbose=False)
>>> torch.linalg.norm(y-fgp.post_mean(x))/torch.linalg.norm(y)
tensor(0.0131)
>>> pcov_16n = fgp.post_cov(x,z,n=16*n)
>>> pvar_16n = fgp.post_var(x,n=16*n)
>>> pcvar_16n = fgp.post_cubature_var(n=16*n)
>>> x_next = fgp.get_x_next(16*n)
>>> y_next = f_ackley(x_next)
>>> fgp.add_y_next(y_next)
>>> torch.allclose(fgp.post_cov(x,z),pcov_16n)
True
>>> torch.allclose(fgp.post_var(x),pvar_16n)
True
>>> torch.allclose(fgp.post_cubature_var(),pcvar_16n)
True

Different loss metrics for fitting

>>> n = 2**6
>>> d = 3
>>> sgp = FastGPDigitalNetB2(
...     qmcpy.KernelDigShiftInvar(d,torchify=True,device=device),
...     qmcpy.DigitalNetB2(dimension=d,seed=7))
>>> x_next = sgp.get_x_next(n)
>>> y_next = torch.stack([torch.sin(x_next).sum(-1),torch.cos(x_next).sum(-1)],axis=0)
>>> sgp.add_y_next(y_next)
>>> data = sgp.fit(loss_metric="MLL",iterations=5,verbose=0)
>>> data = sgp.fit(loss_metric="CV",iterations=5,verbose=0,cv_weights=1/torch.arange(1,2*n+1,device=device).reshape((2,n)))
>>> data = sgp.fit(loss_metric="CV",iterations=5,verbose=0,cv_weights="L2R")
>>> data = sgp.fit(loss_metric="GCV",iterations=5,verbose=0)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `kernel` | `KernelDigShiftInvar` | Kernel object. Set to `qmcpy.KernelMultiTask` for a multi-task GP. | required |
| `seqs` | `Union[int, DigitalNetB2, List]` | List of digital sequence generators in base \(b=2\) with `order="RADICAL INVERSE"` and `randomize` in `["FALSE","DS"]`. If an int `seed` is passed in we use `[qmcpy.DigitalNetB2(d,seed=seed_i,randomize="DS") for seed_i in np.random.SeedSequence(seed).spawn(num_tasks)]`. See the `qmcpy.DigitalNetB2` docs for more info. If `num_tasks==1` then `randomize` may be in `["FALSE","DS","LMS","LMS DS"]`. | required |
| `alpha` | `int` | Smoothness parameter. | `2` |
| `noise` | `float` | Positive noise variance, i.e. the nugget term. | `2 * EPS64` |
| `tfs_noise` | `Tuple[callable, callable]` | The first callable transforms the noise to the raw value that is optimized; the second applies the inverse transform (illustrated after this table). | `(tf_exp_eps_inv, tf_exp_eps)` |
| `requires_grad_noise` | `bool` | Whether or not to optimize the noise parameter. | `False` |
| `shape_noise` | `Size` | Shape of the noise parameter. | `Size([1])` |
| `derivatives` | `list` | List of derivative orders, e.g. to include a function and its gradient set `derivatives = [torch.zeros(d,dtype=int)]+[ej for ej in torch.eye(d,dtype=int)]`. | `None` |
| `derivatives_coeffs` | `list` | List of derivative coefficients where if `derivatives[k].shape==(p,d)` then we should have `derivatives_coeffs[k].shape==(p,)`. | `None` |
| `adaptive_nugget` | `bool` | If `True`, use the adaptive nugget, which modifies the noise based on trace ratios. | `False` |
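
To clarify the `tfs_noise` convention shared by both constructors: it is a (to-raw, from-raw) pair, so the optimizer works on an unconstrained raw value while the model always sees a positive noise variance. A hypothetical log/exp pair sketches the contract; the library defaults `tf_exp_eps_inv`/`tf_exp_eps` play the analogous role with an epsilon floor:

```python
import torch

# hypothetical transform pair following the same (to-raw, from-raw) convention as tfs_noise
tf_to_raw = lambda noise: torch.log(noise)  # positive noise variance -> unconstrained raw value
tf_from_raw = lambda raw: torch.exp(raw)    # unconstrained raw value -> positive noise variance

noise = torch.tensor([1e-8], dtype=torch.float64)
raw = tf_to_raw(noise)
print(torch.allclose(tf_from_raw(raw), noise))  # True: the two callables invert each other
```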
Source code in fastgps/fast_gp_digital_net_b2.py
def __init__(self,
        kernel:qmcpy.KernelDigShiftInvar,
        seqs:Union[qmcpy.DigitalNetB2,int],
        alpha:int = 2,
        noise:float = 2*qmcpy.util.transforms.EPS64,
        tfs_noise:Tuple[callable,callable] = (qmcpy.util.transforms.tf_exp_eps_inv,qmcpy.util.transforms.tf_exp_eps),
        requires_grad_noise:bool = False, 
        shape_noise:torch.Size = torch.Size([1]),
        derivatives:list = None,
        derivatives_coeffs:list = None,
        adaptive_nugget:bool = False,
        ):
    """
    Args:
        kernel (qmcpy.KernelDigShiftInvar): Kernel object. Set to `qmcpy.KernelMultiTask` for a multi-task GP.
        seqs (Union[int,qmcpy.DigitalNetB2,List]): list of digital sequence generators in base $b=2$ 
            with order="RADICAL INVERSE" and randomize in `["FALSE","DS"]`. If an int `seed` is passed in we use 
            ```python
            [qmcpy.DigitalNetB2(d,seed=seed_i,randomize="DS") for seed_i in np.random.SeedSequence(seed).spawn(num_tasks)]
            ```
            See the <a href="https://qmcpy.readthedocs.io/en/latest/algorithms.html#module-qmcpy.discrete_distribution.digital_net_b2.digital_net_b2" target="_blank">`qmcpy.DigitalNetB2` docs</a> for more info. 
            If `num_tasks==1` then randomize may be in `["FALSE","DS","LMS","LMS DS"]`. 
        alpha (int): smoothness parameter
        noise (float): positive noise variance, i.e. the nugget term
        tfs_noise (Tuple[callable,callable]): the first callable transforms the noise to the raw value that is optimized, the second applies the inverse transform
        requires_grad_noise (bool): whether or not to optimize the noise parameter
        shape_noise (torch.Size): shape of the noise parameter, defaults to `torch.Size([1])`
        derivatives (list): list of derivative orders, e.g. to include a function and its gradient set 
            ```python
            derivatives = [torch.zeros(d,dtype=int)]+[ej for ej in torch.eye(d,dtype=int)]
            ```
        derivatives_coeffs (list): list of derivative coefficients where if `derivatives[k].shape==(p,d)` then we should have `derivatives_coeffs[k].shape==(p,)`
        adaptive_nugget (bool): if True, use the adaptive nugget, which modifies the noise based on trace ratios.
    """
    self._XBDTYPE = torch.int64
    self._FTOUTDTYPE = torch.get_default_dtype()
    if isinstance(kernel,qmcpy.KernelMultiTask):
        solo_task = False
        num_tasks = kernel.num_tasks
        default_task = torch.arange(num_tasks)
    else:
        solo_task = True
        default_task = 0 
        num_tasks = 1
    if isinstance(seqs,int):
        global_seed = seqs
        seqs = np.array([qmcpy.DigitalNetB2(kernel.d,seed=seed,randomize="DS") for seed in np.random.SeedSequence(global_seed).spawn(num_tasks)],dtype=object)
    if isinstance(seqs,qmcpy.DigitalNetB2):
        seqs = np.array([seqs],dtype=object)
    if isinstance(seqs,list):
        seqs = np.array(seqs,dtype=object)
    assert seqs.shape==(num_tasks,), "seqs should be a length num_tasks=%d list"%num_tasks
    assert all(isinstance(seqs[i],qmcpy.DigitalNetB2) for i in range(num_tasks)), "each seq should be a qmcpy.DigitalNetB2 instance"
    assert all(seqs[i].order=="RADICAL INVERSE" for i in range(num_tasks)), "each seq should be in 'RADICAL INVERSE' order"
    assert all(seqs[i].replications==1 for i in range(num_tasks)), "each seq should have only 1 replication"
    if num_tasks==1:
        assert seqs[0].randomize in ['FALSE','DS','LMS','LMS DS'], "seq should have randomize in ['FALSE','DS','LMS','LMS DS']"
    else:
        assert all(seqs[i].randomize in ['FALSE','DS'] for i in range(num_tasks)), "each seq should have randomize in ['FALSE','DS']"
    ts = torch.tensor([seqs[i].t for i in range(num_tasks)])
    assert (ts<64).all(), "each seq must have t<64"
    assert (ts==ts[0]).all(), "all seqs should have the same t"
    self.t = ts[0].item()
    if isinstance(kernel,qmcpy.KernelMultiTask):
        kernel.base_kernel.set_t(self.t)
    else:
        kernel.set_t(self.t)
    ift = ft = qmcpy.fwht_torch
    omega = qmcpy.omega_fwht_torch
    super().__init__(
        alpha,
        ft,
        ift,
        omega,
        kernel,
        seqs,
        num_tasks,
        default_task,
        solo_task,
        noise,
        tfs_noise,
        requires_grad_noise,
        shape_noise,
        derivatives,
        derivatives_coeffs,
        adaptive_nugget,
    )
    assert (1<=self.alpha).all() and (self.alpha<=4).all()
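
A note on the transform choice above: since `ift = ft = qmcpy.fwht_torch`, a single routine serves as both forward and inverse transform. The sketch below assumes `qmcpy.fwht_torch` is the orthonormal Walsh-Hadamard transform acting along the last dimension, in which case it is its own inverse (the normalized Hadamard matrix squares to the identity):

```python
import torch
import qmcpy

x = torch.arange(8, dtype=torch.get_default_dtype())  # length must be a power of 2
y = qmcpy.fwht_torch(x)            # FWHT (assumed orthonormal, acting along the last dimension)
x_back = qmcpy.fwht_torch(y)       # applying the same transform again undoes it
print(torch.allclose(x_back, x))   # expected: True if the transform is orthonormal
```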