pasp.program

  1import enum, types
  2
  3import clingo
  4from clingo.symbol import Function
  5
  6def unique_fact(i: int = None) -> str:
  7  """
  8  Creates a new unique fact for probabilistic rules. To do this, we update a counter `unique_fact.i`
  9  in a way equivalent to C's `static` variables.
 10  """
 11  if i is None:
 12    unique_fact.i += 1
 13    return f"__unique_id_{unique_fact.i}"
 14  return f"__unique_id_{i}"
 15unique_fact.i = 0
 16
 17def unique_pgrule_id(gen: bool = True):
 18  if gen:
 19    unique_pgrule_id.i += 1
 20    return unique_pgrule_id.i
 21  return unique_pgrule_id.i
 22unique_pgrule_id.i = -1
 23
 24class ProbFact:
 25  """
 26  A Probabilistic Fact (PF) is a (Logic Program) fact which is "chosen" with some probability.
 27  """
 28
 29  def __init__(self, p: str, f: str, learnable: bool = False):
 30    "Constructs a PF out of a probability `p` and fact `f`."
 31    self.p = float(p)
 32    self.f = f
 33    # Construct a clingo.symbol.Function from this fact.
 34    self.cl_f = clingo.parse_term(f)
 35    self.learnable = learnable
 36
 37  def __str__(self) -> str: return f"{round(self.p, ndigits = 3)}{'?' if self.learnable else ''}::{self.f}"
 38  def __repr__(self) -> str: return self.__str__()
 39
 40class ProbRule:
 41  """
 42  A Probabilistic Rule (PR) is a (Logic Program) rule that (when propositional) may be chosen with
 43  some probability `p`. A non-propositional PR must be grounded first.
 44  """
 45
 46  def __init__(self, p: str, f: str, is_prop: bool = True, unify: str = None, ufact: str = None,
 47               learnable: bool = False, sharing: bool = False):
 48    self.p = p
 49    self.f = f
 50    self.is_prop = is_prop
 51    self.learnable = learnable and (not is_prop)
 52    self.unify = unify
 53    self.sharing = sharing # sharing parameter i.e. parameter tying.
 54    self.prop_pf = ProbFact(p, unique_fact() if ufact is None else ufact,
 55                            learnable = learnable and (sharing or is_prop))
 56    self.prop_f = f"{f}, {self.prop_pf.f}."
 57    self.pf_ids = None
 58
 59  def __str__(self) -> str:
 60    return f"{self.prop_pf.p if self.is_prop else self.p}" \
 61           f"{('*' if self.sharing else '') + ('?' if self.learnable else '')}" \
 62           f"::{self.f}"
 63  def __repr__(self) -> str: return self.__str__()
 64
 65class CredalFact:
 66  """
 67  A Credal Fact (CF) consists of a fact `f` attached to a probability interval `[l, u]`, where `l ∈
 68  [0, 1]` is the lowest probability `f` may attain and `u ≥ l ∈ [0, 1]` is the highest.
 69  """
 70
 71  def __init__(self, l: float, u: float, f: str):
 72    self.l, self.u = float(l), float(u)
 73    self.f = f
 74    self.cl_f = clingo.parse_term(f)
 75
 76  def __getitem__(self, i: bool) -> float: return self.l if False else self.u
 77  def __str__(self) -> str: return f"[{self.l}, {self.u}]::{self.f}"
 78  def __repr__(self) -> str: return self.__str__()
 79
 80def _str_query_assignment(f: Function, t: bool) -> str:
 81  """
 82  String formats a query tuple `(f, t)`, where `f` is an atom and `t` is whether it should appear
 83  in the program or not.
 84  """
 85  return str(f) if t == Query.TERM_POS else ("not " + str(f) if t == Query.TERM_NEG else "undef " + str(f))
 86
 87class AnnotatedDisjunction:
 88  def __init__(self, P: list[float], F: list[str], learnable: bool = False):
 89    self.P = P
 90    self.F = F
 91    self.cl_F = [clingo.parse_term(f) for f in F]
 92    self.learnable = learnable
 93
 94  def __getitem__(self, i: int) -> tuple[float, str]:
 95    return self.P[i], self.F[i]
 96  def __str__(self) -> str:
 97    return "; ".join([f"{(p if (p := round(self.P[i], ndigits = 3)) > 0 else '*')}{'?' if self.learnable else ''}::{self.F[i]}" for i in range(len(self.P))])
 98  def __repr__(self) -> str: return self.__str__()
 99
100try:
101  import torch
102except ModuleNotFoundError:
103  print("PyTorch not found! PyTorch must be installed for neural rules and neural ADs to be used "
104        "in programs.")
105
class Data:
  """
  A named data source holding a test set and, optionally, a train set.
  """

  def __init__(self, name: str, arg: str, test, train = None):
    """
    Stores `name`, `arg` and both sets. A set given as a `pandas.DataFrame` is converted to a
    tensor; any other value is stored as given. When a train set is present, it must agree with
    the test set on every dimension but the first.
    """
    self.name, self.arg = name, arg
    import pandas
    is_frame = lambda X: isinstance(X, pandas.DataFrame)
    self.test = torch.tensor(test.to_numpy()) if is_frame(test) else test
    self.train = torch.tensor(train.to_numpy()) if is_frame(train) else train

    if self.train is None: return
    assert self.test.shape[1:] == self.train.shape[1:], \
      "Train and test sets must have same shape (excluding the first dimension)!"

  def __str__(self):
    "Formats as `name(arg) ~ test(shape)`, followed by `, train(shape)` when a train set exists."
    rep = f"{self.name}({self.arg}) ~ test({self.test.shape})"
    if self.train is not None: rep += f", train({self.train.shape})"
    return rep
  def __repr__(self): return str(self)
124
class Neural:
  """
  Base class of the neural components (neural rules and neural annotated disjunctions). It wraps a
  network, the data sources feeding it and the optimizer that updates it.

  Subclasses must override `out_shape`.
  """

  def __init__(self, net, data: list, learnable: bool, rep: str, nvals: int, opt_params: dict,
               outcomes: list, heads: list, bodies: list, signs: list, name: str):
    """
    Wraps network `net`, fed by the data sources in `data`.

    `data` is a sequence of objects exposing `test` and `train` sets (it is iterated and indexed
    below). `opt_params` overrides the default optimizer arguments (`lr = 1.`, `maximize = True`);
    its optional `"optim"` entry names the `torch.optim` class to use (`"SGD"` when absent).
    `outcomes` is only used for its length (1 when None).
    """
    self.net = net
    self.learnable = learnable
    self.rep = rep
    self.data = data
    self.outcomes = 1 if outcomes is None else len(outcomes)
    self.nvals = nvals

    self.H = heads
    self.B = bodies
    self.S = signs
    self.name = name

    # Update default opt_params with given params.
    _opt_params = {"lr": 1., "maximize": True}
    _opt_params.update(opt_params)
    optimizer = _opt_params.pop("optim", "SGD")
    self.opt = getattr(torch.optim, optimizer)(net.parameters(), **_opt_params)

    # Test sets of all data sources, concatenated along the first dimension.
    self.test = torch.cat(tuple(d.test for d in data))
    # Last output of the network on train data (set by `forward` when learnable).
    self.out = None
    # Buffer receiving the concatenated train slices (allocated by `prepare_train`).
    self.view = None
    # Derivatives of the logic program to be passed to backwards.
    self.dw = None
    # Initialize dw so we can use inference without learning.
    if self.data[0].train is not None: self.prepare_train(0)
    # User specified step function.
    self.step = None

  def __str__(self): return self.rep
  def __repr__(self): return self.__str__()

  def set_train(self): self.net.train()
  def set_eval(self): self.net.eval()

  def prepare_train(self, batch: int):
    "Prepares the output tensor. Should be called *before* learning."
    dims = self.data[0].train.shape[1:]
    if self.view is None:
      self.view = torch.empty(batch*len(self.data), *dims)
      if self.learnable: self.dw = torch.zeros(self.out_shape(batch))
    else:
      T = self.view
      # Number of rows the storage behind `view` can currently hold; grow only when needed.
      # NOTE(review): the capacity is compared against `batch`, although `view` is resized to
      # `batch*len(self.data)` rows -- confirm this is intended with several data sources.
      if T.untyped_storage().size()//(T.element_size()*dims.numel()) < batch:
        self.view.resize_(batch*len(self.data), *dims)
        if self.learnable: self.dw.resize_(self.out_shape(batch))

  def out_shape(self, batch: int) -> tuple:
    "The output tensor shape."
    raise NotImplementedError("Neural components must override this method accordingly!")

  def pr(self):
    "Retrieves the probabilities of the neural rule from the test set."
    with torch.inference_mode():
      return self.net(self.test).cpu().numpy()

  def forward(self, start: int = 0, end: int = None):
    "Retrieves the probabilities of the neural rule from the train set."
    # Slices `start:end` of every train set are concatenated straight into the `view` buffer.
    torch.cat(tuple(data.train[start:end] for data in self.data), out=self.view)
    if self.learnable:
      # Keep the output around: `backward` backpropagates through it.
      self.out = self.net(self.view)
      return self.out.data.cpu().numpy()
    with torch.inference_mode():
      return self.net(self.view).data.cpu().numpy()

  def backward(self):
    """ Performs backpropagation and runs the optimizer step.
    The derivative of the program is read from `self.dw` (truncated to the length of the last
    output); the user-specified step callback, if any, is called afterwards.
    """
    self.out.backward(self.dw[:len(self.out)])
    self.opt.step()
    self.opt.zero_grad()
    if self.step is not None: self.step()

  def set_step_callback(self, f):
    "Binds `f` to this component as a method; `backward` calls it after every optimizer step."
    self.step = types.MethodType(f, self)

  def ntest(self): return self.data[0].test.shape[0]
  def ntrain(self): return self.data[0].train.shape[0] if self.learnable else 0
205
class NeuralRule(Neural):
  """
  A neural rule: a `Neural` component constructed with `nvals = 1`.
  """

  def __init__(self, heads: list, bodies: list, signs: list, name: str, net, rep: str, data: list,
               learnable: bool, params: dict, outcomes: list):
    # Heads and bodies must be numpy.uint64 values representing _rep, not Symbols.
    super().__init__(net = net, data = data, learnable = learnable, rep = rep, nvals = 1,
                     opt_params = params, outcomes = outcomes, heads = heads, bodies = bodies,
                     signs = signs, name = name)

    # Run the network once on the test set while parsing, so that a badly shaped output is caught
    # here instead of during inference or learning.
    probs = self.pr()
    assert probs.ndim == 2, "Networks embedded onto neural rules must output a single probability!"

  def out_shape(self, batch: int) -> tuple:
    "The output tensor shape: one row per instance of every data source, one column per outcome."
    rows = batch*len(self.data)
    return (rows, self.outcomes)
219
class NeuralAD(Neural):
  """
  A neural annotated disjunction: a `Neural` component with one value per entry of `vals`.
  """

  def __init__(self, heads: list, bodies: list, signs: list, name: str, vals: list, net, rep: str,
               data: list, learnable: bool, params: dict, outcomes: list, heads_str: list):
    super().__init__(net = net, data = data, learnable = learnable, rep = rep, nvals = len(vals),
                     opt_params = params, outcomes = outcomes, heads = heads, bodies = bodies,
                     signs = signs, name = name)
    self.vals = vals
    self.heads_str = heads_str

    # Run the network once on the test set while parsing, so that a badly shaped output is caught
    # here instead of during inference or learning.
    probs = self.pr()
    assert probs.ndim == 2, "Networks embedded onto neural rules must output a 1D probability tensor!"

  def out_shape(self, batch: int):
    "The output tensor shape: one row per instance, data source and outcome; one column per value."
    rows = batch*len(self.data)*self.outcomes
    return (rows, self.nvals)
235
class Semantics(enum.IntEnum):
  """
  Semantics under which a program and its queries are handled.

  `Query.parse_term` and `Query.parse_rep` only tell `STABLE` apart from the rest: for every other
  value they additionally parse a `_`-prefixed copy of the term.
  """
  # NOTE(review): what each member stands for is inferred from its name only -- confirm.
  STABLE = 0     # presumably the stable model semantics
  PARTIAL = 1    # presumably the partial stable model semantics
  LSTABLE = 2    # presumably the L-stable semantics
  SMPROBLOG = 3  # presumably the SMProbLog semantics
241
class Query:
  """
  A query is a meta-command within a PLP that signals the solver to compute and output a
  probabilistic query. Queries follow a modified PASOCS [1] syntax: the query (not necessarily in
  this order)

  ```
  #query(q1; ...; qk; not p1; ...; not pm | e1; ...; en; not v1; ...; not vt)
  ```

  of ground atoms `q1, ..., qk`, `p1, ..., pm`, `e1, ..., en`, `v1, ..., vt` asks for the
  probability

  ```
  P({q1, ..., qk} = true, {p1, ..., pm} = false | {e1, ..., en} = true, {v1, ..., vt} = false).
  ```

  See concrete examples in the `/examples` folder.

  [1] - PASOCS: A Parallel Approximate Solver for Probabilistic Logic Programs under the Credal
  Semantics. Tuckey et al, 2021. URL: https://arxiv.org/abs/2105.10908.
  """

  # Tags of a term within a query: negated, positive or undefined.
  TERM_NEG = 0
  TERM_POS = 1
  TERM_UND = 2

  def __init__(self, Q: iter = [], E: iter = [], semantics: Semantics = Semantics.STABLE):
    """
    Builds a query out of the query (`Q`) and evidence (`E`) assignments, each given as text and
    parsed with `Query.parse_term`.

    The type hint `iter` is used to mean that `Q` and `E` are iterables.
    """
    self.Q = [Query.parse_term(q, semantics) for q in Q]
    self.E = [Query.parse_term(e, semantics) for e in E]

  @staticmethod
  def parse_term(u: str, s: Semantics):
    """
    Parses the text `u` of an assignment into a triple `(term, tag, extra)`: the clingo term
    without its `not `/`undef ` prefix, the matching `TERM_*` tag and, unless `s` is
    `Semantics.STABLE` (in which case `extra` is None), the term parsed from the `_`-prefixed text.
    """
    atom, tag = u, Query.TERM_POS
    for prefix, prefix_tag in (("not ", Query.TERM_NEG), ("undef ", Query.TERM_UND)):
      if u.startswith(prefix):
        atom, tag = u[len(prefix):], prefix_tag
        break
    term = clingo.parse_term(atom)
    extra = None if s == Semantics.STABLE else clingo.parse_term(f"_{atom}")
    return term, tag, extra

  @staticmethod
  def parse_rep(u: int, s: bool, sem: Semantics):
    """
    Same triple as `parse_term`, but built from `clingo.Symbol(u)` and an already known tag `s`.
    """
    term = clingo.Symbol(u)
    extra = None if sem == Semantics.STABLE else clingo.parse_term(f"_{str(term)}")
    return term, s, extra

  def __str__(self) -> str:
    "Formats the query as `ℙ(Q)`, or `ℙ(Q | E)` when there is evidence."
    shown = "ℙ(" + ', '.join(_str_query_assignment(q, t) for q, t, _ in self.Q)
    if len(self.E) == 0: return shown + ")"
    return shown + " | " + ', '.join(_str_query_assignment(e, t) for e, t, _ in self.E) + ")"
  def __repr__(self) -> str: return str(self)
295
class VarQuery:
  """
  A query whose terms are kept as text and turned into ground `Query` objects by `to_ground`.
  """

  def __init__(self, ground_id: int, Q: iter, E: iter = [], semantics: Semantics = Semantics.STABLE):
    """
    Builds a variable query out of query (`Q`) and evidence (`E`) assignments given as text.

    `Q`/`E` hold the terms without their `not `/`undef ` prefix and `Q_s`/`E_s` the matching
    `Query.TERM_*` tags. `gr_rule` is the rule, tagged with `ground_id`, that is built from those
    terms.
    """
    parsed_Q = [VarQuery.parse_term(q) for q in Q]
    parsed_E = [VarQuery.parse_term(e) for e in E]
    self.Q, self.Q_s = [t for t, _ in parsed_Q], [s for _, s in parsed_Q]
    self.E, self.E_s = [t for t, _ in parsed_E], [s for _, s in parsed_E]
    self.sem = semantics
    qr = ', '.join(self.Q)
    # Evidence, when present, is appended to the query terms after a separating comma.
    ev = (', ' + ', '.join(self.E)) if len(self.E) else ''
    self.gr_rule = f"__gquery(@grquery({ground_id}, {qr}{ev})) :- {qr}{ev}."
    self.ground_queries = None

  @staticmethod
  def parse_term(u: str) -> tuple:
    """
    Splits the text `u` of an assignment into `(term, tag)`: the text without its `not `/`undef `
    prefix and the matching `Query.TERM_*` tag.

    Declared static so that it also works when called through an instance.
    """
    if u.startswith("not "): return u[4:], Query.TERM_NEG
    elif u.startswith("undef "): return u[6:], Query.TERM_UND
    return u, Query.TERM_POS

  def to_ground(self, reps: tuple, P):
    """
    Builds ground queries from `reps` and appends them to `P.Q`.

    `reps` is read in consecutive groups of `len(Q) + len(E)` values: in every group, the first
    `len(Q)` values become the query terms and the remaining ones the evidence terms, each through
    `Query.parse_rep` with the tag stored for that position.
    """
    n, m = len(self.Q), len(self.E)
    stride = n+m
    queries = []
    for i in range(len(reps)//stride):
      base = i*stride
      query = Query()
      query.Q = [Query.parse_rep(reps[base+j], self.Q_s[j], self.sem) for j in range(n)]
      query.E = [Query.parse_rep(reps[base+n+j], self.E_s[j], self.sem) for j in range(m)]
      queries.append(query)
    P.Q.extend(queries)

  def __str__(self) -> str:
    "Formats the query as `ℙ(Q)`, or `ℙ(Q | E)` when there is evidence."
    qs = f"ℙ({', '.join(self.Q)}"
    if len(self.E) != 0: return qs + f" | {', '.join(self.E)})"
    return qs + ")"
  def __repr__(self) -> str: return self.__str__()
327
class Program:
  """
  A Probabilistic Logic Program (PLP) usually configures a triple `<P,PF,CF>`, where `P` is a logic
  program, `PF` are probabilistic facts and `CF` are credal facts. We extend a PLP into a tuple
  `<P,PF,CF,Q>`, where `Q` are the queries to be asked from `P`, `PF` and `CF`.

  We accept ProbLog's syntactic sugar for probabilistic rules,

  ```
  p::h(X) :- b1(X), b2(X), ..., bn(X).
  ```

  meaning that if `b1(X), b2(X), ..., bn(X)` is true, `h(X)` is added with probability `p`. This is
  equivalent to

  ```
  p::a.
  h(X) :- b1(X), b2(X), ..., bn(X), a.
  ```

  where `a` is a unique probabilistic fact added with probability `p`.
  """

  def __init__(self, P: str, PF: list[ProbFact], PR: list[ProbRule], Q: list[Query], \
               VQ: list[VarQuery], CF: list[CredalFact], AD: list[AnnotatedDisjunction], \
               NR: list[NeuralRule], NA: list[NeuralAD], semantics: Semantics = Semantics.STABLE, \
               stable_p = None, directives: dict = None):
    """
    Constructs a PLP out of a logic program `P`, probabilistic facts `PF`, probabilistic rules
    `PR`, queries `Q`, variable queries `VQ`, credal facts `CF`, annotated disjunctions `AD`,
    neural rules `NR` and neural annotated disjunctions `NA`.

    `directives` is indexed by the keys `"learn"`, `"inference"` and `"psemantics"` in `__call__`.
    """
    self.P = P
    self.PF = PF
    self.PR = PR
    self.Q = Q
    self.VQ = VQ
    self.CF = CF
    self.AD = AD
    self.NR = NR
    self.NA = NA

    # Number of instances in data.
    self.m_test = 0
    self.m_train = 0
    if (len(NR) > 0) or (len(NA) > 0):
      # The test size is taken from the first neural component (rules first)...
      self.m_test  = NR[0].ntest() if len(NR) > 0 else NA[0].ntest()
      # ...and the train size from the first learnable one.
      for nr in (NR + NA):
        if nr.learnable:
          self.m_train = nr.ntrain()
          break

    self.gr_P = ""
    self.is_ground = False

    self.semantics = semantics
    self.stable = stable_p

    self.directives = directives

  def train(self):
    "Calls `set_train` on every learnable neural rule and neural annotated disjunction."
    for N in self.NR:
      if N.learnable: N.set_train()
    for N in self.NA:
      if N.learnable: N.set_train()

  def eval(self):
    "Calls `set_eval` on every learnable neural rule and neural annotated disjunction."
    for N in self.NR:
      if N.learnable: N.set_eval()
    for N in self.NA:
      if N.learnable: N.set_eval()

  @staticmethod
  def str_if_contains(s: str, L):
    "Formats `L` under title `s`, or returns an empty string when `L` is empty."
    return f"\n{s}:\n{L}," if len(L) > 0 else ""

  def __str__(self) -> str:
    # Empty sections are left out; the logic program and the queries are always shown.
    return f"<Logic Program:\n{self.P}," + \
           self.str_if_contains("Probabilistic Facts", self.PF) + \
           self.str_if_contains("Credal Facts", self.CF) + \
           self.str_if_contains("Annotated Disjunctions", self.AD) + \
           self.str_if_contains("Probabilistic Rules", self.PR) + \
           self.str_if_contains("Neural Rules", self.NR) + \
           self.str_if_contains("Neural Annotated Disjunctions", self.NA) + \
           self.str_if_contains("Variable Queries", self.VQ) + \
           f"\nQueries:\n{self.Q}>"
  def __repr__(self) -> str: return self.__str__()

  def __call__(self, **kwargs):
    """
    Runs the directives of the program: learning first, when a `"learn"` directive exists, then
    inference, when the program has queries. Returns the result of the inference function (None
    when there are no queries).

    `kwargs` overrides the default inference arguments (`quiet = False`, `status = True`).
    """
    if self.directives is not None:
      if "learn" in self.directives:
        # The directive holds a function producing the data and the keyword arguments of `learn`.
        f, A = self.directives["learn"]
        D = f()
        from .wlearn import learn
        if isinstance(D, tuple): learn(self, *D, **A)
        else: learn(self, D, **A)
    if len(self.Q) + len(self.VQ) > 0:
      # NOTE(review): `self.directives` is not checked against None in this branch, unlike above.
      from exact import exact
      from approx import aseo
      A = {"quiet": False, "status": True}
      A.update(kwargs)
      # TODO: implement additional semantics for ASEO and remove the exact exception below.
      if ("psemantics" in self.directives) and (self.directives["inference"][0] == "exact"):
        A.update(self.directives["psemantics"])
      # The inference function is looked up by name among the local variables, hence the local
      # imports of `exact` and `aseo` above; renaming locals here changes what can be resolved.
      f = vars()[self.directives["inference"][0]]
      return f(self, *self.directives["inference"][1], **A)
def unique_fact(i: int = None) -> str:
 7def unique_fact(i: int = None) -> str:
 8  """
 9  Creates a new unique fact for probabilistic rules. To do this, we update a counter `unique_fact.i`
10  in a way equivalent to C's `static` variables.
11  """
12  if i is None:
13    unique_fact.i += 1
14    return f"__unique_id_{unique_fact.i}"
15  return f"__unique_id_{i}"

Creates a new unique fact for probabilistic rules. To do this, we update a counter unique_fact.i in a way equivalent to C's static variables.

def unique_pgrule_id(gen: bool = True):
18def unique_pgrule_id(gen: bool = True):
19  if gen:
20    unique_pgrule_id.i += 1
21    return unique_pgrule_id.i
22  return unique_pgrule_id.i
class ProbFact:
25class ProbFact:
26  """
27  A Probabilistic Fact (PF) is a (Logic Program) fact which is "chosen" with some probability.
28  """
29
30  def __init__(self, p: str, f: str, learnable: bool = False):
31    "Constructs a PF out of a probability `p` and fact `f`."
32    self.p = float(p)
33    self.f = f
34    # Construct a clingo.symbol.Function from this fact.
35    self.cl_f = clingo.parse_term(f)
36    self.learnable = learnable
37
38  def __str__(self) -> str: return f"{round(self.p, ndigits = 3)}{'?' if self.learnable else ''}::{self.f}"
39  def __repr__(self) -> str: return self.__str__()

A Probabilistic Fact (PF) is a (Logic Program) fact which is "chosen" with some probability.

ProbFact(p: str, f: str, learnable: bool = False)
30  def __init__(self, p: str, f: str, learnable: bool = False):
31    "Constructs a PF out of a probability `p` and fact `f`."
32    self.p = float(p)
33    self.f = f
34    # Construct a clingo.symbol.Function from this fact.
35    self.cl_f = clingo.parse_term(f)
36    self.learnable = learnable

Constructs a PF out of a probability p and fact f.

p
f
cl_f
learnable
class ProbRule:
41class ProbRule:
42  """
43  A Probabilistic Rule (PR) is a (Logic Program) rule that (when propositional) may be chosen with
44  some probability `p`. A non-propositional PR must be grounded first.
45  """
46
47  def __init__(self, p: str, f: str, is_prop: bool = True, unify: str = None, ufact: str = None,
48               learnable: bool = False, sharing: bool = False):
49    self.p = p
50    self.f = f
51    self.is_prop = is_prop
52    self.learnable = learnable and (not is_prop)
53    self.unify = unify
54    self.sharing = sharing # sharing parameter i.e. parameter tying.
55    self.prop_pf = ProbFact(p, unique_fact() if ufact is None else ufact,
56                            learnable = learnable and (sharing or is_prop))
57    self.prop_f = f"{f}, {self.prop_pf.f}."
58    self.pf_ids = None
59
60  def __str__(self) -> str:
61    return f"{self.prop_pf.p if self.is_prop else self.p}" \
62           f"{('*' if self.sharing else '') + ('?' if self.learnable else '')}" \
63           f"::{self.f}"
64  def __repr__(self) -> str: return self.__str__()

A Probabilistic Rule (PR) is a (Logic Program) rule that (when propositional) may be chosen with some probability p. A non-propositional PR must be grounded first.

ProbRule( p: str, f: str, is_prop: bool = True, unify: str = None, ufact: str = None, learnable: bool = False, sharing: bool = False)
47  def __init__(self, p: str, f: str, is_prop: bool = True, unify: str = None, ufact: str = None,
48               learnable: bool = False, sharing: bool = False):
49    self.p = p
50    self.f = f
51    self.is_prop = is_prop
52    self.learnable = learnable and (not is_prop)
53    self.unify = unify
54    self.sharing = sharing # sharing parameter i.e. parameter tying.
55    self.prop_pf = ProbFact(p, unique_fact() if ufact is None else ufact,
56                            learnable = learnable and (sharing or is_prop))
57    self.prop_f = f"{f}, {self.prop_pf.f}."
58    self.pf_ids = None
p
f
is_prop
learnable
unify
sharing
prop_pf
prop_f
pf_ids
class CredalFact:
66class CredalFact:
67  """
68  A Credal Fact (CF) consists of a fact `f` attached to a probability interval `[l, u]`, where `l ∈
69  [0, 1]` is the lowest probability `f` may attain and `u ≥ l ∈ [0, 1]` is the highest.
70  """
71
72  def __init__(self, l: float, u: float, f: str):
73    self.l, self.u = float(l), float(u)
74    self.f = f
75    self.cl_f = clingo.parse_term(f)
76
77  def __getitem__(self, i: bool) -> float: return self.l if False else self.u
78  def __str__(self) -> str: return f"[{self.l}, {self.u}]::{self.f}"
79  def __repr__(self) -> str: return self.__str__()

A Credal Fact (CF) consists of a fact f attached to a probability interval [l, u], where l ∈ [0, 1] is the lowest probability f may attain and u ≥ l ∈ [0, 1] is the highest.

CredalFact(l: float, u: float, f: str)
72  def __init__(self, l: float, u: float, f: str):
73    self.l, self.u = float(l), float(u)
74    self.f = f
75    self.cl_f = clingo.parse_term(f)
f
cl_f
class AnnotatedDisjunction:
88class AnnotatedDisjunction:
89  def __init__(self, P: list[float], F: list[str], learnable: bool = False):
90    self.P = P
91    self.F = F
92    self.cl_F = [clingo.parse_term(f) for f in F]
93    self.learnable = learnable
94
95  def __getitem__(self, i: int) -> tuple[float, str]:
96    return self.P[i], self.F[i]
97  def __str__(self) -> str:
98    return "; ".join([f"{(p if (p := round(self.P[i], ndigits = 3)) > 0 else '*')}{'?' if self.learnable else ''}::{self.F[i]}" for i in range(len(self.P))])
99  def __repr__(self) -> str: return self.__str__()
AnnotatedDisjunction(P: list[float], F: list[str], learnable: bool = False)
89  def __init__(self, P: list[float], F: list[str], learnable: bool = False):
90    self.P = P
91    self.F = F
92    self.cl_F = [clingo.parse_term(f) for f in F]
93    self.learnable = learnable
P
F
cl_F
learnable
class Data:
107class Data:
108  def __init__(self, name: str, arg: str, test, train = None):
109    self.name = name
110    self.arg = arg
111    import pandas
112    if issubclass(type(test), pandas.DataFrame): self.test = torch.tensor(test.to_numpy())
113    else: self.test = test
114    if issubclass(type(train), pandas.DataFrame): self.train = torch.tensor(train.to_numpy())
115    else: self.train = train
116
117    if self.train is not None:
118      assert self.test.shape[1:] == self.train.shape[1:], \
119        "Train and test sets must have same shape (excluding the first dimension)!"
120
121  def __str__(self):
122    if self.train is None: return f"{self.name}({self.arg}) ~ test({self.test.shape})"
123    else: return f"{self.name}({self.arg}) ~ test({self.test.shape}), train({self.train.shape})"
124  def __repr__(self): return self.__str__()
Data(name: str, arg: str, test, train=None)
108  def __init__(self, name: str, arg: str, test, train = None):
109    self.name = name
110    self.arg = arg
111    import pandas
112    if issubclass(type(test), pandas.DataFrame): self.test = torch.tensor(test.to_numpy())
113    else: self.test = test
114    if issubclass(type(train), pandas.DataFrame): self.train = torch.tensor(train.to_numpy())
115    else: self.train = train
116
117    if self.train is not None:
118      assert self.test.shape[1:] == self.train.shape[1:], \
119        "Train and test sets must have same shape (excluding the first dimension)!"
name
arg
class Neural:
126class Neural:
127  def __init__(self, net, data: Data, learnable: bool, rep: str, nvals: int, opt_params: dict,
128               outcomes: list, heads: list, bodies: list, signs: list, name: str):
129    self.net = net
130    self.learnable = learnable
131    self.rep = rep
132    self.data = data
133    self.outcomes = 1 if outcomes is None else len(outcomes)
134    self.nvals = nvals
135
136    self.H = heads
137    self.B = bodies
138    self.S = signs
139    self.name = name
140
141    # Update default opt_params with given params.
142    _opt_params = {"lr": 1., "maximize": True}
143    _opt_params.update(opt_params)
144    optimizer = _opt_params.pop("optim", "SGD")
145    self.opt = getattr(torch.optim, optimizer)(net.parameters(), **_opt_params)
146
147    self.test = torch.cat(tuple(d.test for d in data))
148    self.out = None
149    self.view = None
150    # Derivatives of the logic program to be passed to backwards.
151    self.dw = None
152    # Initialize dw so we can use inference without learning.
153    if self.data[0].train is not None: self.prepare_train(0)
154    # User specified step function.
155    self.step = None
156
157  def __str__(self): return self.rep
158  def __repr__(self): return self.__str__()
159
160  def set_train(self): self.net.train()
161  def set_eval(self): self.net.eval()
162
163  def prepare_train(self, batch: int):
164    "Prepares the output tensor. Should be called *before* learning."
165    dims = self.data[0].train.shape[1:]
166    if self.view is None:
167      self.view = torch.empty(batch*len(self.data), *dims)
168      if self.learnable: self.dw = torch.zeros(self.out_shape(batch))
169    else:
170      T = self.view
171      if (s := T.untyped_storage().size()//(T.element_size()*dims.numel())) < batch:
172        self.view.resize_(batch*len(self.data), *dims)
173        if self.learnable: self.dw.resize_(self.out_shape(batch))
174
175  def out_shape(self, batch: int) -> tuple:
176    "The output tensor shape."
177    raise NotImplementedError("Neural components must override this method accordingly!")
178
179  def pr(self):
180    "Retrieves the probabilities of the neural rule from the test set."
181    with torch.inference_mode():
182      return self.net(self.test).cpu().numpy()
183
184  def forward(self, start: int = 0, end: int = None):
185    "Retrieves the probabilities of the neural rule from the train set."
186    torch.cat(tuple(data.train[start:end] for data in self.data), out=self.view)
187    if self.learnable:
188      self.out = self.net(self.view)
189      return self.out.data.cpu().numpy()
190    with torch.inference_mode():
191      return self.net(self.view).data.cpu().numpy()
192
193  def backward(self):
194    """ Performs backpropagation and runs the optimizer step.
195    Argument `dl` is the derivative of the program as a `numpy.ndarray`.
196    """
197    self.out.backward(self.dw[:len(self.out)])
198    self.opt.step()
199    self.opt.zero_grad()
200    if self.step is not None: self.step()
201
202  def set_step_callback(self, f): self.step = types.MethodType(f, self)
203
204  def ntest(self): return self.data[0].test.shape[0]
205  def ntrain(self): return self.data[0].train.shape[0] if self.learnable else 0
Neural( net, data: Data, learnable: bool, rep: str, nvals: int, opt_params: dict, outcomes: list, heads: list, bodies: list, signs: list, name: str)
127  def __init__(self, net, data: Data, learnable: bool, rep: str, nvals: int, opt_params: dict,
128               outcomes: list, heads: list, bodies: list, signs: list, name: str):
129    self.net = net
130    self.learnable = learnable
131    self.rep = rep
132    self.data = data
133    self.outcomes = 1 if outcomes is None else len(outcomes)
134    self.nvals = nvals
135
136    self.H = heads
137    self.B = bodies
138    self.S = signs
139    self.name = name
140
141    # Update default opt_params with given params.
142    _opt_params = {"lr": 1., "maximize": True}
143    _opt_params.update(opt_params)
144    optimizer = _opt_params.pop("optim", "SGD")
145    self.opt = getattr(torch.optim, optimizer)(net.parameters(), **_opt_params)
146
147    self.test = torch.cat(tuple(d.test for d in data))
148    self.out = None
149    self.view = None
150    # Derivatives of the logic program to be passed to backwards.
151    self.dw = None
152    # Initialize dw so we can use inference without learning.
153    if self.data[0].train is not None: self.prepare_train(0)
154    # User specified step function.
155    self.step = None
net
learnable
rep
data
outcomes
nvals
H
B
S
name
opt
test
out
view
dw
step
def set_train(self):
160  def set_train(self): self.net.train()
def set_eval(self):
161  def set_eval(self): self.net.eval()
def prepare_train(self, batch: int):
163  def prepare_train(self, batch: int):
164    "Prepares the output tensor. Should be called *before* learning."
165    dims = self.data[0].train.shape[1:]
166    if self.view is None:
167      self.view = torch.empty(batch*len(self.data), *dims)
168      if self.learnable: self.dw = torch.zeros(self.out_shape(batch))
169    else:
170      T = self.view
171      if (s := T.untyped_storage().size()//(T.element_size()*dims.numel())) < batch:
172        self.view.resize_(batch*len(self.data), *dims)
173        if self.learnable: self.dw.resize_(self.out_shape(batch))

Prepares the output tensor. Should be called before learning.

def out_shape(self, batch: int) -> tuple:
175  def out_shape(self, batch: int) -> tuple:
176    "The output tensor shape."
177    raise NotImplementedError("Neural components must override this method accordingly!")

The output tensor shape.

def pr(self):
179  def pr(self):
180    "Retrieves the probabilities of the neural rule from the test set."
181    with torch.inference_mode():
182      return self.net(self.test).cpu().numpy()

Retrieves the probabilities of the neural rule from the test set.

def forward(self, start: int = 0, end: int = None):
184  def forward(self, start: int = 0, end: int = None):
185    "Retrieves the probabilities of the neural rule from the train set."
186    torch.cat(tuple(data.train[start:end] for data in self.data), out=self.view)
187    if self.learnable:
188      self.out = self.net(self.view)
189      return self.out.data.cpu().numpy()
190    with torch.inference_mode():
191      return self.net(self.view).data.cpu().numpy()

Retrieves the probabilities of the neural rule from the train set.

def backward(self):
193  def backward(self):
194    """ Performs backpropagation and runs the optimizer step.
195    Argument `dl` is the derivative of the program as a `numpy.ndarray`.
196    """
197    self.out.backward(self.dw[:len(self.out)])
198    self.opt.step()
199    self.opt.zero_grad()
200    if self.step is not None: self.step()

Performs backpropagation and runs the optimizer step. The method takes no arguments: the derivative of the program is read from self.dw, and the user-specified step callback, if any, is called afterwards.

def set_step_callback(self, f):
202  def set_step_callback(self, f): self.step = types.MethodType(f, self)
def ntest(self):
204  def ntest(self): return self.data[0].test.shape[0]
def ntrain(self):
205  def ntrain(self): return self.data[0].train.shape[0] if self.learnable else 0
class NeuralRule(Neural):
class NeuralRule(Neural):
  "A `Neural` component for neural rules."

  def __init__(self, heads: list, bodies: list, signs: list, name: str, net, rep: str, data: list,
               learnable: bool, params: dict, outcomes: list):
    """
    Forwards the arguments to `Neural.__init__` (with the fifth positional argument fixed to 1) and
    then validates the network by evaluating it once on the test set.
    """
    # Heads and bodies must be numpy.uint64 values representing _rep, not Symbols.
    super().__init__(net, data, learnable, rep, 1, params, outcomes, heads, bodies, signs, name)

    # Validate net during parsing so that it won't blow up in our faces during inference or learning.
    probs = self.pr()
    assert probs.ndim == 2, \
           "Networks embedded onto neural rules must output a single probability!"

  def out_shape(self, batch: int) -> tuple:
    "The output tensor shape: `batch` rows per data entry, `self.outcomes` columns."
    rows = batch*len(self.data)
    return (rows, self.outcomes)
NeuralRule( heads: list, bodies: list, signs: list, name: str, net, rep: str, data: list, learnable: bool, params: dict, outcomes: list)
208  def __init__(self, heads: list, bodies: list, signs: list, name: str, net, rep: str, data: list,
209               learnable: bool, params: dict, outcomes: list):
210    super().__init__(net, data, learnable, rep, 1, params, outcomes, heads, bodies, signs, name)
211    # Heads and bodies must be numpy.uint64 values representing _rep, not Symbols.
212
213    # Validate net during parsing so that it won't blow up in our faces during inference or learning.
214    p = self.pr()
215    assert p.ndim == 2, \
216           "Networks embedded onto neural rules must output a single probability!"
def out_shape(self, batch: int) -> tuple:
218  def out_shape(self, batch: int) -> tuple:
219    return (batch*len(self.data), self.outcomes)

The output tensor shape.

class NeuralAD(Neural):
class NeuralAD(Neural):
  "A `Neural` component for neural annotated disjunctions over the values `vals`."

  def __init__(self, heads: list, bodies: list, signs: list, name: str, vals: list, net, rep: str,
               data: list, learnable: bool, params: dict, outcomes: list, heads_str: list):
    """
    Forwards the arguments to `Neural.__init__` (passing `len(vals)` as the fifth positional
    argument), records `vals` and `heads_str`, and validates the network by evaluating it once on
    the test set.
    """
    n_values = len(vals)
    super().__init__(net, data, learnable, rep, n_values, params, outcomes, heads, bodies, signs,
                     name)
    self.vals, self.heads_str = vals, heads_str

    # Validate net during parsing so that it won't blow up in our faces during inference or learning.
    probs = self.pr()
    assert probs.ndim == 2, \
           "Networks embedded onto neural rules must output a 1D probability tensor!"

  def out_shape(self, batch: int):
    "The output tensor shape: `batch*len(self.data)*self.outcomes` rows, `self.nvals` columns."
    rows = batch*len(self.data)*self.outcomes
    return (rows, self.nvals)
NeuralAD( heads: list, bodies: list, signs: list, name: str, vals: list, net, rep: str, data: list, learnable: bool, params: dict, outcomes: list, heads_str: list)
222  def __init__(self, heads: list, bodies: list, signs: list, name: str, vals: list, net, rep: str, \
223               data: list, learnable: bool, params: dict, outcomes: list, heads_str: list):
224    super().__init__(net, data, learnable, rep, len(vals), params, outcomes, heads, bodies, signs,
225                     name)
226    self.vals  = vals
227    self.heads_str = heads_str
228
229    # Validate net during parsing so that it won't blow up in our faces during inference or learning.
230    p = self.pr()
231    assert p.ndim == 2, \
232           "Networks embedded onto neural rules must output a 1D probability tensor!"
vals
heads_str
def out_shape(self, batch: int):
234  def out_shape(self, batch: int):
235    return (batch*len(self.data)*self.outcomes, self.nvals)

The output tensor shape.

class Semantics(enum.IntEnum):
class Semantics(enum.IntEnum):
  "Integer-valued identifiers for the semantics a program or query can be evaluated under."
  # Members are numbered consecutively from zero, in declaration order.
  STABLE, PARTIAL, LSTABLE, SMPROBLOG = range(4)
STABLE = <Semantics.STABLE: 0>
PARTIAL = <Semantics.PARTIAL: 1>
LSTABLE = <Semantics.LSTABLE: 2>
SMPROBLOG = <Semantics.SMPROBLOG: 3>
class Query:
class Query:
  """
  A query is a meta-command within a PLP to signal the solver to produce and output a probabilistic
  query. A query follows a modified PASOCS [1] syntax, that is, the query (not necessarily in this
  order)

  ```
  #query(q1; ...; qk; not p1; ...; not pm | e1; ...; en; not v1; ...; not vt)
  ```

  of ground atoms `q1, ..., qk`, `p1, ..., pm`, `e1, ..., en`, `v1, ..., vt` is equivalent to asking
  the probability

  ```
  P({q1, ..., qk} = true, {p1, ..., pm} = false | {e1, ..., en} = true, {v1, ..., vt} = false).
  ```

  See concrete examples in the `/examples` folder.

  [1] - PASOCS: A Parallel Approximate Solver for Probabilistic Logic Programs under the Credal
  Semantics. Tuckey et al, 2021. URL: https://arxiv.org/abs/2105.10908.
  """

  # Sign tags stored alongside each parsed term: `not t`, plain `t` and `undef t` respectively.
  TERM_NEG = 0
  TERM_POS = 1
  TERM_UND = 2

  def __init__(self, Q: iter = (), E: iter = (), semantics: Semantics = Semantics.STABLE):
    """
    Constructs a query from query (`Q`) and evidence (`E`) assignments.

    We use the notation `iter` as a type hint to mean `Q` and `E` are iterables (of strings). Both
    default to an empty tuple rather than a list, so no mutable object is shared between calls.
    Every entry is parsed with `Query.parse_term` under `semantics`.
    """
    self.Q = [Query.parse_term(q, semantics) for q in Q]
    self.E = [Query.parse_term(e, semantics) for e in E]

  @staticmethod
  def parse_term(u: str, s: Semantics):
    """
    Parses the textual assignment `u` into a triple `(symbol, sign, aux)`.

    A leading `"not "` gives sign `TERM_NEG`, a leading `"undef "` gives `TERM_UND`, anything else
    `TERM_POS`. `aux` is `None` under `Semantics.STABLE`; under any other semantics it is the term
    parsed from the atom's text prefixed with an underscore.
    """
    if u.startswith("not "): t, n = u[4:], Query.TERM_NEG
    elif u.startswith("undef "): t, n = u[6:], Query.TERM_UND
    else: t, n = u, Query.TERM_POS
    symbol = clingo.parse_term(t)
    aux = None if s == Semantics.STABLE else clingo.parse_term(f"_{t}")
    return symbol, n, aux

  @staticmethod
  def parse_rep(u: int, s: int, sem: Semantics):
    """
    Builds the same kind of triple as `parse_term`, but from a symbol representation `u` and an
    already known sign `s` (one of the `TERM_*` tags) instead of from text.
    """
    t = clingo.Symbol(u)
    aux = None if sem == Semantics.STABLE else clingo.parse_term(f"_{str(t)}")
    return t, s, aux

  def __str__(self) -> str:
    "Renders the query as `ℙ(q1, ..., qk | e1, ..., en)`, omitting the evidence part when empty."
    body = ', '.join(_str_query_assignment(q, t) for q, t, _ in self.Q)
    if len(self.E) != 0:
      body += f" | {', '.join(_str_query_assignment(e, t) for e, t, _ in self.E)}"
    return f"ℙ({body})"
  def __repr__(self) -> str: return self.__str__()

A query is a meta-command within a PLP to signal the solver to produce and output a probabilistic query. A query follows a modified PASOCS [1] syntax, that is, the query (not necessarily in this order)

#query(q1; ...; qk; not p1; ...; not pm | e1; ...; en; not v1; ...; not vt)

of ground atoms q1, ..., qk, p1, ..., pm, e1, ..., en, v1, ..., vt is equivalent to asking the probability

P({q1, ..., qk} = true, {p1, ..., pm} = false | {e1, ..., en} = true, {v1, ..., vt} = false).

See concrete examples in the /examples folder.

[1] - PASOCS: A Parallel Approximate Solver for Probabilistic Logic Programs under the Credal Semantics. Tuckey et al, 2021. URL: https://arxiv.org/abs/2105.10908.

Query( Q: <built-in function iter> = [], E: <built-in function iter> = [], semantics: Semantics = <Semantics.STABLE: 0>)
270  def __init__(self, Q: iter = [], E: iter = [], semantics: Semantics = Semantics.STABLE):
271    """
272    Constructs a query from query (`Q`) and evidence (`E`) assignments.
273
274    We use the notation `iter` as a type hint to mean `Q` and `E` are iterables.
275    """
276    self.Q = [Query.parse_term(q, semantics) for q in Q]
277    self.E = [Query.parse_term(e, semantics) for e in E]

Constructs a query from query (Q) and evidence (E) assignments.

We use the notation iter as a type hint to mean Q and E are iterables.

TERM_NEG = 0
TERM_POS = 1
TERM_UND = 2
Q
E
@staticmethod
def parse_term(u: str, s: Semantics):
279  @staticmethod
280  def parse_term(u: str, s: Semantics):
281    if u.startswith("not "): t, n = u[4:], Query.TERM_NEG
282    elif u.startswith("undef "): t, n = u[6:], Query.TERM_UND
283    else: t, n = u, Query.TERM_POS
284    return clingo.parse_term(t), n, None if s == Semantics.STABLE else clingo.parse_term(f"_{t}")
@staticmethod
def parse_rep(u: int, s: bool, sem: Semantics):
286  @staticmethod
287  def parse_rep(u: int, s: bool, sem: Semantics):
288    t = clingo.Symbol(u)
289    return t, s, None if sem == Semantics.STABLE else clingo.parse_term(f"_{str(t)}")
class VarQuery:
class VarQuery:
  """
  A query template that is expanded into concrete `Query` objects by `to_ground`.

  The textual atoms are kept (with their `not `/`undef ` prefixes stripped) in `Q` and `E`, their
  sign tags in `Q_s` and `E_s`, and `gr_rule` holds the `__gquery` rule text built from them.
  """

  def __init__(self, ground_id: int, Q: iter, E: iter = (), semantics: Semantics = Semantics.STABLE):
    """
    Parses the query (`Q`) and evidence (`E`) assignments and builds the grounding rule.

    `Q` and `E` are iterables of strings; `E` defaults to an empty tuple so that no mutable default
    is shared between calls. `ground_id` is embedded verbatim in the generated rule.
    """
    parsed_q = [VarQuery.parse_term(q) for q in Q]
    parsed_e = [VarQuery.parse_term(e) for e in E]
    self.Q, self.Q_s = [t for t, _ in parsed_q], [s for _, s in parsed_q]
    self.E, self.E_s = [t for t, _ in parsed_e], [s for _, s in parsed_e]
    self.sem = semantics
    # The same comma-separated atom list appears as the @grquery arguments and as the rule body.
    qr = ', '.join(self.Q)
    ev = (', ' + ', '.join(self.E)) if self.E else ''
    self.gr_rule = f"__gquery(@grquery({ground_id}, {qr}{ev})) :- {qr}{ev}."
    self.ground_queries = None

  @staticmethod
  def parse_term(u: str) -> tuple:
    """
    Splits the textual assignment `u` into `(atom_text, sign)`, where `sign` is one of
    `Query.TERM_NEG`, `Query.TERM_UND` or `Query.TERM_POS`.

    Declared static so that it works whether called on the class or on an instance.
    """
    if u.startswith("not "): return u[4:], Query.TERM_NEG
    elif u.startswith("undef "): return u[6:], Query.TERM_UND
    return u, Query.TERM_POS

  def to_ground(self, reps: tuple, P):
    """
    Turns the flat sequence `reps` into ground queries and appends them to `P.Q`.

    `reps` is read in consecutive groups of `len(Q) + len(E)` entries: within each group the first
    `len(Q)` entries become the query terms and the rest the evidence terms, each paired with the
    sign recorded at construction. Trailing entries that do not fill a whole group are ignored.
    """
    n, m = len(self.Q), len(self.E)
    stride = n + m
    queries = []
    for i in range(len(reps)//stride):
      base = i*stride
      query = Query()
      query.Q = [Query.parse_rep(reps[base+j], self.Q_s[j], self.sem) for j in range(n)]
      query.E = [Query.parse_rep(reps[base+n+j], self.E_s[j], self.sem) for j in range(m)]
      queries.append(query)
    P.Q.extend(queries)

  def __str__(self) -> str:
    "Renders the query as `ℙ(q1, ..., qk | e1, ..., en)`, omitting the evidence part when empty."
    qs = f"ℙ({', '.join(self.Q)}"
    if len(self.E) != 0: return qs + f" | {', '.join(self.E)})"
    return qs + ")"
  def __repr__(self) -> str: return self.__str__()
VarQuery( ground_id: int, Q: <built-in function iter>, E: <built-in function iter> = [], semantics: Semantics = <Semantics.STABLE: 0>)
298  def __init__(self, ground_id: int, Q: iter, E: iter = [], semantics: Semantics = Semantics.STABLE):
299    self.Q, self.E = [None for _ in range(len(Q))], [None for _ in range(len(E))]
300    self.Q_s, self.E_s = [None for _ in range(len(Q))], [None for _ in range(len(E))]
301    for i in range(len(Q)): self.Q[i], self.Q_s[i] = VarQuery.parse_term(Q[i])
302    for i in range(len(E)): self.E[i], self.E_s[i] = VarQuery.parse_term(E[i])
303    self.sem = semantics
304    qr, ev = ', '.join(self.Q), (', ' + ', '.join(self.E)) if len(self.E) else ''
305    self.gr_rule = f"__gquery(@grquery({ground_id}, {qr}{ev})) :- {qr}{ev}."
306    self.ground_queries = None
sem
gr_rule
ground_queries
def parse_term(u: <built-in function iter>) -> list:
308  def parse_term(u: iter) -> list:
309    if u.startswith("not "): return u[4:], Query.TERM_NEG
310    elif u.startswith("undef "): return u[6:], Query.TERM_UND
311    return u, Query.TERM_POS
def to_ground(self, reps: tuple, P):
313  def to_ground(self, reps: tuple, P):
314    n, m = len(self.Q), len(self.E)
315    k = len(reps)//(n+m)
316    queries = [Query() for _ in range(k)]
317    for i in range(k):
318      u = i*(n+m)
319      queries[i].Q = [Query.parse_rep(reps[u+j], self.Q_s[j], self.sem) for j in range(n)]
320      queries[i].E = [Query.parse_rep(reps[u+n+j], self.E_s[j], self.sem) for j in range(m)]
321    P.Q.extend(queries)
class Program:
class Program:
  """
  A Probabilistic Logic Program (PLP) is usually a triple `<P,PF,CF>`, where `P` is a logic program,
  `PF` are probabilistic facts and `CF` are credal facts. Here a PLP is extended to a tuple
  `<P,PF,CF,Q>`, where `Q` are the queries to be asked from `P`, `PF` and `CF`.

  ProbLog's syntactic sugar for probabilistic rules is accepted:

  ```
  p::h(X) :- b1(X), b2(X), ..., bn(X).
  ```

  meaning that if `b1(X), b2(X), ..., bn(X)` is true, `h(X)` is added with probability `p`. This is
  equivalent to

  ```
  p::a.
  h(X) :- b1(X), b2(X), ..., bn(X), a.
  ```

  where `a` is a unique probabilistic fact added with probability `p`.
  """

  def __init__(self, P: str, PF: list[ProbFact], PR: list[ProbRule], Q: list[Query],
               VQ: list[VarQuery], CF: list[CredalFact], AD: list[AnnotatedDisjunction],
               NR: list[NeuralRule], NA: list[NeuralAD], semantics: Semantics = Semantics.STABLE,
               stable_p = None, directives: list = None):
    """
    Stores the components of the PLP and derives the dataset sizes from the neural components.

    `m_test` is the test-set size reported by the first neural component (neural rules are looked at
    before neural ADs); `m_train` is the train-set size of the first learnable neural component.
    Both stay at zero when there is no such component.
    """
    self.P, self.PF, self.PR = P, PF, PR
    self.Q, self.VQ = Q, VQ
    self.CF, self.AD = CF, AD
    self.NR, self.NA = NR, NA

    # Number of instances in data.
    self.m_test, self.m_train = 0, 0
    if len(NR) + len(NA) > 0:
      first = NR[0] if len(NR) > 0 else NA[0]
      self.m_test = first.ntest()
      self.m_train = next((N.ntrain() for N in (NR + NA) if N.learnable), 0)

    self.gr_P, self.is_ground = "", False

    self.semantics, self.stable = semantics, stable_p

    self.directives = directives

  def train(self):
    "Calls `set_train` on every learnable neural rule, then on every learnable neural AD."
    for group in (self.NR, self.NA):
      for N in group:
        if N.learnable: N.set_train()

  def eval(self):
    "Calls `set_eval` on every learnable neural rule, then on every learnable neural AD."
    for group in (self.NR, self.NA):
      for N in group:
        if N.learnable: N.set_eval()

  @staticmethod
  def str_if_contains(s: str, L):
    "Returns a titled section for `L` (title `s`), or the empty string when `L` is empty."
    if len(L) > 0:
      return f"\n{s}:\n{L},"
    return ""

  def __str__(self) -> str:
    "Lists the logic program, every non-empty component section, and finally the queries."
    head = f"<Logic Program:\n{self.P},"
    sections = (
      ("Probabilistic Facts", self.PF),
      ("Credal Facts", self.CF),
      ("Annotated Disjunctions", self.AD),
      ("Probabilistic Rules", self.PR),
      ("Neural Rules", self.NR),
      ("Neural Annotated Disjunctions", self.NA),
      ("Variable Queries", self.VQ),
    )
    middle = "".join(self.str_if_contains(title, items) for title, items in sections)
    return head + middle + f"\nQueries:\n{self.Q}>"
  def __repr__(self) -> str: return self.__str__()

  def __call__(self, **kwargs):
    """
    Runs the program: first the `learn` directive (if present), then, if there is any query, the
    inference routine named by `directives["inference"]`, whose result is returned.

    `kwargs` override the default inference options `quiet=False` and `status=True`.
    """
    if (self.directives is not None) and ("learn" in self.directives):
      f, A = self.directives["learn"]
      D = f()
      from .wlearn import learn
      # The data callback may return either a single object or a tuple of positional arguments.
      if isinstance(D, tuple): learn(self, *D, **A)
      else: learn(self, D, **A)
    if len(self.Q) + len(self.VQ) > 0:
      # These local names matter: the routine is looked up by name through `vars()` below.
      from exact import exact
      from approx import aseo
      A = {"quiet": False, "status": True}
      A.update(kwargs)
      # NOTE(review): `self.directives` is not checked against None here, unlike above; with queries
      # and no directives the membership test raises TypeError. Presumably the parser always
      # supplies directives - confirm.
      # TODO: implement additional semantics for ASEO and remove the exact exception below.
      if ("psemantics" in self.directives) and (self.directives["inference"][0] == "exact"):
        A.update(self.directives["psemantics"])
      f = vars()[self.directives["inference"][0]]
      return f(self, *self.directives["inference"][1], **A)

A Probabilistic Logic Program (PLP) usually configures a triple <P,PF,CF>, where P is a logic program, PF are probabilistic facts and CF are credal facts. We extend a PLP into a tuple <P,PF,CF,Q>, where Q are the queries to be asked from P, PF and CF.

We accept ProbLog's syntactic sugar for probabilistic rules,

p::h(X) :- b1(X), b2(X), ..., bn(X).

meaning that if b1(X), b2(X), ..., bn(X) is true, h(X) is added with probability p. This is equivalent to

p::a.
h(X) :- b1(X), b2(X), ..., bn(X), a.

where a is a unique probabilistic fact added with probability p.

Program( P: str, PF: list[ProbFact], PR: list[ProbRule], Q: list[Query], VQ: list[VarQuery], CF: list[CredalFact], AD: list[AnnotatedDisjunction], NR: list[NeuralRule], NA: list[NeuralAD], semantics: Semantics = <Semantics.STABLE: 0>, stable_p=None, directives: list = None)
352  def __init__(self, P: str, PF: list[ProbFact], PR: list[ProbRule], Q: list[Query], \
353               VQ: list[VarQuery], CF: list[CredalFact], AD: list[AnnotatedDisjunction], \
354               NR: list[NeuralRule], NA: list[NeuralAD], semantics: Semantics = Semantics.STABLE, \
355               stable_p = None, directives: list = None):
356    """
357    Constructs a PLP out of a logic program `P`, probabilistic facts `PF`, credal facts `CF` and
358    queries `Q`.
359    """
360    self.P = P
361    self.PF = PF
362    self.PR = PR
363    self.Q = Q
364    self.VQ = VQ
365    self.CF = CF
366    self.AD = AD
367    self.NR = NR
368    self.NA = NA
369
370    # Number of instances in data.
371    self.m_test = 0
372    self.m_train = 0
373    if (len(NR) > 0) or (len(NA) > 0):
374      self.m_test  = NR[0].ntest() if len(NR) > 0 else NA[0].ntest()
375      for nr in (NR + NA):
376        if nr.learnable:
377          self.m_train = nr.ntrain()
378          break
379
380    self.gr_P = ""
381    self.is_ground = False
382
383    self.semantics = semantics
384    self.stable = stable_p
385
386    self.directives = directives

Constructs a PLP out of a logic program P, probabilistic facts PF, credal facts CF and queries Q.

P
PF
PR
Q
VQ
CF
AD
NR
NA
m_test
m_train
gr_P
is_ground
semantics
stable
directives
def train(self):
388  def train(self):
389    for N in self.NR:
390      if N.learnable: N.set_train()
391    for N in self.NA:
392      if N.learnable: N.set_train()
def eval(self):
394  def eval(self):
395    for N in self.NR:
396      if N.learnable: N.set_eval()
397    for N in self.NA:
398      if N.learnable: N.set_eval()
@staticmethod
def str_if_contains(s: str, L):
400  @staticmethod
401  def str_if_contains(s: str, L):
402    return f"\n{s}:\n{L}," if len(L) > 0 else ""