src.timed_net package

Summary

This simulations implements a temporal-aware PT net by attaching time information to the PT net tokens. It simulates a supply chain depending on two component producers and an assembler.

_images/net_diagram.png

In the diagram,

  • producer1 builds component1 in a time interval approximated by a normal distribution \(\mathcal{N}(\mu_1, \sigma_1)\) with the mean \(\mu_1\) and standard deviation \(\sigma_1\) seconds.

  • Similarly producer builds component2 in a time interval of \(\mathcal{N}(\mu_2, \sigma_2)\)

  • The components are assembled and served to the consumer.

  • Lastly, the consumer places a new order to the producers.

The picture below is the auto-generated PT net diagram by SoyutNet.

_images/graph3.png
  • p1 and t1 define the producer1.

  • p2 and t2 define the producer2.

  • t31 is the assembler.

  • p3 is the consumer.

  • The component1 and 2 are labeled by {1} and {2}.

System description

SoyutNet tokens are represented by a tuple of integers.

token: tuple[int, int] = (label, id)

Time information is embedded into the second item (id) of token. Simulation starts with an initial token at the producers

  • produer1 has (PRODUCER1_LABEL, T0)

  • produer2 has (PRODUCER2_LABEL, T0)

where T0 = 0 (secs) initially. Each producer adds a random integer distributed by \(\mathcal{N}(\mu_i, \sigma_i)\), \(i \in {1,2}\) at a (TimedTransition) instance then sends to their output arcs.

121
122    class TimedTransition(soyutnet.Transition):
123        def __init__(self, name, rng_params, *args, **kwargs):
124            super().__init__(name=name, net=net, *args, **kwargs)
125            self._rng_params = rng_params
126
127        def _delay(self):
128            return round(random.gauss(*self._rng_params))
129
130        async def _process_tokens(self):
131            for label in self._tokens:
132                ids = self._tokens[label]
133                self._tokens[label] = list(map(lambda x: x + self._delay(), ids))
134                """Add producer delay"""
135
136            return await super()._process_tokens()
137

The assembler receives tokens, redirects to the consumer. The total duration of this process is the maximum of the delay introduced by producers.

141
142    class CombinerTransition(soyutnet.Transition):
143        def __init__(self, *args, **kwargs):
144            super().__init__(net=net, *args, **kwargs)
145            self.arrival_time = []
146
147        async def _process_tokens(self):
148            max_id = 0
149            arrivals = [0, 0]
150            for label in self._tokens:
151                ids = self._tokens[label]
152                arrivals[label - 1] = max(ids)
153                if ids:
154                    max_id = max(max_id, arrivals[label - 1])
155            for label in self._tokens:
156                self._tokens[label] = [max_id] * len(self._tokens[label])
157                """Total delay is the max of two branches"""
158
159            self.arrival_time.append(arrivals)
160
161            return await super()._process_tokens()
162

The consumer places an other order to the producers after receiving the token. At each cycle, the token IDs are incremented.

\[(label, t) \rightarrow (label, t+\Delta t)\]

The tokens are counted by the consumer and the timings are saved.

533
534    production_time = TimeInstants()
535    Ti = lambda k: production_time[k - 1]
536    controller = Controller(production_time=production_time)
537    converged = asyncio.Condition()
538
539    async def stock_counter(tr):
540        nonlocal production_time, converged
541        t = None
542        for label in tr._tokens:
543            ids = tr._tokens[label]
544            if not ids:
545                continue
546            for id in ids:
547                if t is None:
548                    t = id
549                    production_time.append(t)
550                else:
551                    assert t == id
552
553        dT = Ti(0) - Ti(-1)
554        controller.measure(dT)
555        u = controller.advance()
556        """
557        Measured production time and generated the amount of new order delays ``u``.
558        u = (dt1, dt2) means that new order to producer1 and 2 will be placed after
559        dt1 and dt2 seconds.
560        """
561        if controller.is_done():
562            """If simulation is done inform the canceller."""
563            async with converged:
564                converged.notify_all()
565
566        for i in range(2):
567            """Postpone orders."""
568            label = i + 1
569            assert len(tr._tokens[label]) == 1
570            tr._tokens[label][0] += u[i]
571
572        return True
573

The whole implementation can be found at https://github.com/dmrokan/soyutnet-simulations/blob/main/src/timed_net/main.py

Joint probability distribution

The total duration of the described process is distributed according to

\[\begin{split}X &= \max{(X_1, X_2)} \\ X_i &\sim \mathcal{N}(\mu_i, \sigma_i) \Rightarrow E[X_i] = \mu_i, ~Var[X_i] = \sigma_i^2\end{split}\]

The mean and variance of \(X\) is given as

\[\begin{split}E[X] & = \mu_1 \Phi(\alpha) + \mu_2 \Phi(-\alpha) + \theta \phi(\alpha) \\ E[X^2] &= (\sigma_1^2+\mu_1^2) \Phi(\alpha) + (\sigma_2^2+\mu_2^2) \Phi(-\alpha) + (\mu_1 + \mu_2) \theta \phi(\alpha) \\ Var[X] &= E[X^2] - E[X]^2 \\ \theta &= \sqrt{\sigma_1^2+\sigma_2^2} \\ \alpha &= \frac{\mu_1-\mu_2}{\theta}\end{split}\]

where \(\Phi\) and \(\phi\) are the CDF and PDF of normal random distribution [nadarajah2008].

20
21
22def pdf(x, mean=0, std=1):
23    """phi"""
24    var = std**2
25    a = 2 * math.pi * var
26    a = 1 / (a**0.5)
27    return a * math.exp(-((x - mean) ** 2) / (2 * var))
28
29
30def cdf(x, mean=0, std=1):
31    """Phi"""
32    a = 2**0.5
33    return 0.5 * (1 + math.erf((x - mean) / (std * a)))
34
35

Real-time moment estimation

The moments \(E[X], E[X^2]\) and variance \(Var[X]\) can be estimated by dynamic averaging of observed random variable \(x_n\).

\[\begin{split}E[x_n] &= \mu \\ Var[x_n] &= E[x_n^2] - \mu^2 = \sigma^2\end{split}\]

For this purpose, a special list class NormalSamples is implemented.

244
245    relative_error = lambda a, b, c=1 / (2**BIT_WIDTH - 1): abs((a - b) / (b + c))
246
247    class NormalSamples(UserList):
248        def __init__(
249            self,
250            *args,
251            eps=1e-2,
252            convergence_condition=10,
253            rng_params=None,
254            validate_conv=False,
255        ):
256            super().__init__(*args)
257            self._eps = eps
258            self._moments = None
259            self._variance = Qp.tuple((0, eps, 0.0))
260            self._cc = convergence_condition
261            self._rng_params = None
262            if rng_params is not None:
263                self.set_rng_params(rng_params)
264            self._max_size = 600 * 1024 * 1024
265            self._initialize_moments()
266            self._iter = 0
267            self._validate_conv = validate_conv
268            if validate_conv:
269                self._last_n_dmu = Qp.list([0] * self._cc)
270                self._last_n_dvar = Qp.list([0] * self._cc)
271

As products arrive, the difference between the ordering time and arrival time is calculated. When this delta time is appended to the NormalSamples, it automatically updates moments and variance.

287
288        def _update_moment(self, moment, val):
289            moment = Qp.tuple(moment)
290            if abs(val) < 1e-2 * self._eps:
291                """Ignore very small numbers in statistics"""
292                return moment
293            l = len(self)
294            mu, eps, eps0 = moment
295            if l % self._cc == 0:
296                eps0 = 0.0
297            mu_prev = mu
298            mu = (mu * l + val) / (l + 1)
299            dmu = relative_error(mu_prev, mu)
300            eps = (eps * l + dmu) / (l + 1)
301            eps0 = max(eps0, eps)
302            assert isinstance(mu, Qp)
303            assert isinstance(eps, Qp)
304            assert isinstance(eps0, Qp)
305            """
306            Save the max of last self._cc samples which
307            will be used for deciding convergence later.
308            """
309
310            return (mu, eps, eps0)
311

If last N samples are very close to each other distrbution, the NormalSamples instance informs about convergence.

  • mu is the estimate of moment calculated by dynamic averaging.

  • eps the average change in current and previous mu.

  • eps0 is the max of last self._cc number of eps values which is used to determine convergence.

Estimate producer delays

The problem is developing an algorithm to estimate the producer delays separately by using the difference between the time a new order is placed and the time when the product is received.

System model

A simple model of the producer/consumer flow can be

\[\begin{split}x_1[n+1] &= w_1[n] + u_1[n] \\ x_2[n+1] &= w_2[n] + u_2[n] \\ x[n+1] &= \max\left\{x_1[n], x_2[n]\right\} \\ y[n] &= x[n]\end{split}\]
  • \(n\) is the product counter starting from zero.

  • \(x_i\) is the production time of component_i.

  • \(w_i\) is a Gaussian random variable, \(E[w_i] = \mu_i, Var[w_i] = \sigma_i^2\).

  • \(y = x\) is the total production observed by the consumer, \(E[y] = \mu, Var[y] = \sigma^2\).

  • \(u_i (sec)\) denotes postponing the orders by the consumer. It is assumed that the consumer can schedule orders for later instead of placing them immediately.

  • The units are seconds.

State machine observer/controller scheme

The observer/controller scheme runs a few tests on the producer/consumer flow. The order of tests are implemented as a state machine with the states below

409
410    class State(Enum):
411        OBSERVE_JOINT_DIST = auto()
412        TEST_PRODUCER1 = auto()
413        TEST_PRODUCER2 = auto()
414        ESTIMATE_DELAYS = auto()
415        DONE = auto()
416
  • OBSERVE_JOINT_DIST: Estimate joint mean (\(\mu\)) and variance (\(\sigma\)).

  • TEST_PRODUCER1: Find out producer1’s response by postponing orders.

  • TEST_PRODUCER2: Find out producer2’s response by postponing orders.

  • ESTIMATE_DELAYS: Calculate the difference between \(\mu_1\) and \(\mu_2\) approximately.

  • DONE: Exit simulation.

The state machine is implemented as below.

486
487        def advance(self):
488            """Iterate the controller"""
489            match self._state[0]:  # Check current state
490                case State.OBSERVE_JOINT_DIST:  # Initial state
491                    """1. Validate that total production time is as expected."""
492                    mu, std, conv = self.found()
493                    assert isinstance(mu, int)
494                    assert isinstance(std, int)
495                    if conv:  # Check convergence
496                        self._observed.append(self._u + (mu, std))
497                        match self._state[-1]:  # Check previous state
498                            case State.OBSERVE_JOINT_DIST:
499                                self._change_state(State.TEST_PRODUCER1)
500                            case State.TEST_PRODUCER1:
501                                self._change_state(State.TEST_PRODUCER2)
502                            case State.TEST_PRODUCER2:
503                                self._change_state(State.ESTIMATE_DELAYS)
504                            case State.ESTIMATE_DELAYS:
505                                self._change_state(State.DONE)
506                case State.TEST_PRODUCER1:
507                    """2. Postpone ordering from producer1"""
508                    self._u = (self._mu0, 0)
509                    self._update_rng_params(tuple(self._u))
510                    self._change_state(State.OBSERVE_JOINT_DIST)
511                case State.TEST_PRODUCER2:
512                    """3. Postpone ordering from producer2"""
513                    self._u = (0, self._mu0)
514                    self._update_rng_params(tuple(self._u))
515                    self._change_state(State.OBSERVE_JOINT_DIST)
516                case State.ESTIMATE_DELAYS:
517                    """4. Estimate the slow producer"""
518                    test1 = self._observed[1]
519                    test2 = self._observed[2]
520                    dt = test1[2] - test2[2]
521                    index = int(dt > 0)
522                    dt = abs(dt)
523                    self._slow_producer = (index + 1, dt)
524                    self._u = (0, 0)
525                    self._update_rng_params(tuple(self._u))
526                    self._change_state(State.OBSERVE_JOINT_DIST)
527
528            return tuple(map(int, self._u))
529
  1. Set \(u_1[n] = u_2[n] = 0\) for all n,

  2. OBSERVE_JOINT_DIST: Estimate the reference \((\mu_0, \sigma_0)\) and record.

  3. TEST_PRODUCER1: Set \(u_1[n] = \mu_0, u_2[n] = 0\) for all n.

  4. OBSERVE_JOINT_DIST: Estimate \((\mu_1, \sigma_1)\) and record.

  5. TEST_PRODUCER2: Set \(u_1[n] = 0, u_2[n] = \mu_0\) for all n.

  6. OBSERVE_JOINT_DIST: Estimate \((\mu_2, \sigma_2)\) and record.

  7. ESTIMATE_DELAYS: The difference between the production time of producers is \(dt = |\mu_1 - \mu_2|\) and the index of slow producer is int(dt > 0) + 1.

  8. DONE: Exit.

Integer arithmetic implementation

In this simulation, all calculations are done on Python’s Fraction instances instead of float s. This simulation implements a new class called Qp to ensure all basic math operations returns a Fraction.

A Fraction can be instantiated by an int, float, str and it converts the input to a tuple of (numerator, denominator) of type tuple[int, uint]. Also, a limit to the denominator can be set. For example:

>>> Fraction(-0.3).limit_denominator(10)
Fraction(-3, 10)
>>> Fraction(0.3).limit_denominator(2)
Fraction(1, 2)
>>> Fraction(0.3).limit_denominator(1)
Fraction(0, 1)

In the example, Fraction(0, 1) and Fraction(-3, 10) are equivalent to \(0/1\) and \(-3/10\). As the denominator limit increases, the precision of calculations increases. Because, the numbers less than one can still be represented in the calculations.

166
167    class Qp:
168        def __init__(self, num=0, max_den=(2**BIT_WIDTH - 1)):
169            self._max_den = max_den
170            if isinstance(num, Qp):
171                num = num.num
172            self.num = self._new_num(num)
173
174        def _new_num(self, num):
175            return Fraction(num).limit_denominator(self._max_den)
176
177        @staticmethod
178        def tuple(iterable):
179            return tuple(map(Qp, iterable))
180
181        @staticmethod
182        def list(iterable):
183            return list(map(Qp, iterable))
184
185        def int_op(op, swap=False):
186            def inner(func):
187                def wrapped(self, *args):
188                    a, b = self, Qp(args[0])
189                    if swap:
190                        a, b = b, a
191                    return Qp(op(a.num, b.num))
192
193                return wrapped
194
195            return inner
196
197        # fmt: off
198        @int_op(operator.mul)
199        def __mul__(self, other): ...
200        @int_op(operator.truediv)
201        def __truediv__(self, other): ...
202        @int_op(operator.add)
203        def __add__(self, other): ...
204        @int_op(operator.sub)
205        def __sub__(self, other): ...
206        @int_op(operator.pow)
207        def __pow__(self, other): ...
208        @int_op(operator.gt)
209        def __gt__(self, other): ...
210        @int_op(operator.lt)
211        def __lt__(self, other): ...
212
213        @int_op(operator.mul, True)
214        def __rmul__(self, other): ...
215        @int_op(operator.truediv, True)
216        def __rtruediv__(self, other): ...
217        @int_op(operator.add, True)
218        def __radd__(self, other): ...
219        @int_op(operator.sub, True)
220        def __rsub__(self, other): ...
221        @int_op(operator.pow, True)
222        def __rpow__(self, other): ...
223        # fmt: on
224
225        def __str__(self):
226            return str(self.num)
227
228        def __float__(self):
229            return float(self.num)
230
231        def __int__(self):
232            return int(self.num)
233
234        def __abs__(self):
235            return Qp(abs(self.num))
236
237        def is_zero(self, eps=1e-2):
238            eps = max(eps, 1 / self._max_den)
239            return abs(self) < Qp(eps)
240

By using this reference implementation, it can be translated to work on an MCU platform. However, it will require limiting the magnitude of numerator together with the denominator. Because, the integer size is unlimited in Python.

Results

The simulation is run for several different \(\mu_i\) and \(\sigma_i\) as defined below in __main__.py

28
29RNG_PARAMS = [
30    #mu_1,   sigma_1,   mu_2,    sigma_2 (minutes)
31    (100,    5,         100,     5),
32    (100,    25,        100,     25),
33    (100,    5,         50,      5),
34    (50,     5,         100,     25),
35    (100,    5,         20,      5),
36    (20,     5,         100,     25),
37]
38

The table below shows the joint probability distribution characteristics.

  • mu0 is \(E[X]\) in the equation above.

  • std0 is \(\sqrt{Var[X]}\) in the equation above.

  • mu is obtained by calculating the mean of the difference of saved timings.

  • std is obtained by calculating the standard deviation of the difference of saved timings.

  • The values are in seconds.

Table 1: Average and actual values.

mu

mu0

std

std0

6232

6169

181

247

6942

6846

1241

1238

5943

6000

229

299

5806

6014

1269

1467

5817

6000

248

300

5977

6000

1582

1498

5987

6169

199

247

6504

6846

950

1238

5933

6000

309

299

6199

6014

1651

1467

6045

6000

378

300

6093

6000

1355

1498

The next table shows estimation results.

  • weak = 1 means that only mean values are used to determine convergence.

  • eps is the relative error tolerance to determine convergence.

  • iter is the final value of the product counter.

  • mu, mu0 and Dmu are estimated, actual mean value and the relative error.

  • std, std0 and Dstd are estimated, actual standard deviation and the relative error.

  • slow is the estimated slow producer indices (‘*’ indicates wrong estimation).

  • Dt, Dt0 and err are estimated, actual production delay differences and their relative error.

    e.g. slow = 1 and Dt = 1000 mean the producer1 is 1000 seconds slower than the producer2.

  • The values are in seconds except weak, iter and slow, former are unitless.

Table 2: Observer estimations (when max rational number denominator is 1. Meaning, the mean and variance are calculated by using integer arithmetic).

weak

eps

iter

mu

mu0

Dmu

std

std0

Dstd

slow

Dt

Dt0

err

0

0.01

47

6231

6169

0.01

1

247

0.992

2*

1

0

0.99

0

0.01

47

6942

6846

0.014

1

1238

0.998

2*

293

0

1.0

0

0.01

47

5943

6000

0.009

1

299

0.993

2

3014

3000

0.0

0

0.01

47

5806

6014

0.035

1

1467

0.999

1

2910

3000

0.03

0

0.01

47

5819

6000

0.03

1

300

0.993

2

4744

4800

0.01

0

0.01

47

5978

6000

0.004

1

1498

0.999

1

4097

4800

0.17

1

0.01

47

6141

6169

0.005

1

247

0.992

2*

107

0

1.0

1

0.01

47

6182

6846

0.097

1

1238

0.998

2*

758

0

1.0

1

0.01

47

5895

6000

0.017

1

299

0.993

2

3122

3000

0.04

1

0.01

47

5440

6014

0.095

1

1467

0.999

1

3325

3000

0.1

1

0.01

47

5947

6000

0.009

1

300

0.993

2

4936

4800

0.03

1

0.01

47

6357

6000

0.059

1

1498

0.999

1

5175

4800

0.07

0

0.1

47

6151

6169

0.0

1

247

0.99

2*

126

0

1.0

0

0.1

47

7193

6846

0.05

1

1238

1.0

1

705

0

1.0

0

0.1

47

5902

6000

0.02

1

299

0.99

2

2968

3000

0.01

0

0.1

47

6136

6014

0.02

1

1467

1.0

1

2621

3000

0.14

0

0.1

47

5859

6000

0.02

1

300

0.99

2

4975

4800

0.04

0

0.1

47

5899

6000

0.02

1

1498

1.0

1

4414

4800

0.09

1

0.1

47

6179

6169

0.0

1

247

0.99

1

70

0

1.0

1

0.1

47

6982

6846

0.02

1

1238

1.0

2*

807

0

1.0

1

0.1

47

5871

6000

0.02

1

299

0.99

2

3115

3000

0.04

1

0.1

47

6381

6014

0.06

1

1467

1.0

1

3467

3000

0.13

1

0.1

47

5972

6000

0.0

1

300

0.99

2

4910

4800

0.02

1

0.1

47

6155

6000

0.03

1

1498

1.0

1

4761

4800

0.01

Table 3: Observer estimations (when max rational number denominator is 255).

weak

eps

iter

mu

mu0

Dmu

std

std0

Dstd

slow

Dt

Dt0

err

0

0.01

47

5987

6169

0.03

205

247

0.17

1

28

0

1.0

0

0.01

47

6504

6846

0.05

855

1238

0.309

2*

853

0

1.0

0

0.01

47

5933

6000

0.011

286

299

0.043

2

2838

3000

0.06

0

0.01

47

6199

6014

0.031

1506

1467

0.027

1

2963

3000

0.01

0

0.01

47

6045

6000

0.007

339

300

0.13

2

4722

4800

0.02

0

0.01

47

6093

6000

0.015

1227

1498

0.181

1

4565

4800

0.05

1

0.01

47

6271

6169

0.017

249

247

0.008

2*

89

0

1.0

1

0.01

47

7733

6846

0.13

1020

1238

0.176

1

319

0

1.0

1

0.01

47

5864

6000

0.023

359

299

0.201

2

3238

3000

0.07

1

0.01

47

6277

6014

0.044

1958

1467

0.335

1

3258

3000

0.08

1

0.01

47

5812

6000

0.031

315

300

0.05

2

4829

4800

0.01

1

0.01

47

5241

6000

0.126

1214

1498

0.19

1

4532

4800

0.06

0

0.1

47

6150

6169

0.0

171

247

0.31

1

114

0

1.0

0

0.1

47

6355

6846

0.07

1022

1238

0.17

1

730

0

1.0

0

0.1

47

5926

6000

0.01

147

299

0.51

2

2957

3000

0.01

0

0.1

47

5909

6014

0.02

1417

1467

0.03

1

3326

3000

0.1

0

0.1

47

5925

6000

0.01

277

300

0.08

2

4764

4800

0.01

0

0.1

47

6497

6000

0.08

1324

1498

0.12

1

5304

4800

0.1

1

0.1

47

6143

6169

0.0

202

247

0.18

2*

60

0

1.0

1

0.1

47

6682

6846

0.02

1019

1238

0.18

2*

1127

0

1.0

1

0.1

47

6146

6000

0.02

256

299

0.14

2

3006

3000

0.0

1

0.1

47

6199

6014

0.03

890

1467

0.39

1

2470

3000

0.21

1

0.1

47

5967

6000

0.01

276

300

0.08

2

4773

4800

0.01

1

0.1

47

5496

6000

0.08

1454

1498

0.03

1

4791

4800

0.0

Comments

  • Table 1 shows that mean and variance is very close to the values obtained from the results of equations.

  • Table 2 shows that estimator works correctly even if it operates on integers. But, it can not estimate the variance by using integer precision.

  • Table 3 shows that estimator works correctly when fractional numbers are used. And, it can more or less estimate the variance.

    It is hard to estimate small differences between \(\mu_1\) and \(\mu_2\).

References

[nadarajah2008]

S. Nadarajah and S. Kotz, “Exact Distribution of the Max/Min of Two Gaussian Random Variables,” 2008

Reproduce

sudo apt install python3-venv graphviz
python3 -m venv venv
source venv/bin/activate

make build
make build=timed_net
make clean=timed_net
make run=timed_net
make results=timed_net
make graph=timed_net
make docs

Usage

Submodules

src.timed_net.main module

src.timed_net.main.USAGE()
Arguments:
-r <rng params>

mu1,std1,mu2,std2 (units are seconds)

Default: 300,60,600,180

-T <time (sec)>

total simulation time in seconds

Default: 2

-o <filename>

output file name to write results. If empty, prints to stdout.

-G

if provided, the script generates PT net graph and exits

-W

if provided, uses a weak comparion metric for convergence which highly increase simulation speed with the cost of degraded accuracy.

-C <strict|weak>

Controller type. ‘weak’ is equivalent to setting -W argument.

Default: strict

-e eps

Relative tolerance to determine convergence.

Default: 1e-2

-b denominator bit width (bw)

The denominator of Fraction used in calculations are limited to 2^(bw)-1

Example

python src/timed_net/main.py -r 100,10,200,25 -T 2

src.timed_net.results module

src.timed_net.results.cdf(x, mean=0, std=1)

Phi

src.timed_net.results.joint_mean(dist1, dist2)
src.timed_net.results.joint_variance(dist1, dist2)
src.timed_net.results.load_results()
src.timed_net.results.main(argv)
src.timed_net.results.pdf(x, mean=0, std=1)

phi

Module contents