-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsnitch.py
792 lines (705 loc) · 30.4 KB
/
snitch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
"""
snitch.py - Functions for handling philosophers at a dining table and
detecting anomalies.
This file contains various functions related to handling philosophers at a
dining table, including functions for error handling and debugging information.
The functions implemented here are used to process philosopher records, perform
tests, and update the state of philosophers at the dining table.
Date: 2023-04-19
MIT License
Copyright (c) 2023 Pablo López Bergillos
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import sys
# ANSI SGR escape sequences used to colour the terminal output.
ENDC = "\033[0m"  # reset all attributes
INPUT = "\033[0m\033[1m"  # reset, then bold
ERROR = "\033[0m\033[91m\033[1m"  # reset, then bright red and bold
DEBUG = "\033[0m\033[93m\033[1m"  # reset, then bright yellow and bold
class PhilosopherError(Exception):
    """
    Anomaly detected while checking a philosopher record.

    Attributes:
        msgs (list): The caller's messages followed by one line per related
            record, oldest first, each suffixed with how long before the
            checked record it happened.
    """

    def __init__(self, source, msgs, records):
        """
        Build the list of messages carried by the exception.

        Args:
            source (tuple): The record being checked. Only its timestamp
                (index 0) is read, and only when a related record exists.
            msgs (list): Messages describing the anomaly.
            records (list): Related earlier records. Falsy entries (such as
                None) are dropped; index 0 of each remaining record is read
                as its timestamp and index 3 as the text to display.
        """
        super().__init__()
        related = sorted(
            (record for record in records if record),
            key=lambda record: record[0],
        )
        self.msgs = list(msgs)
        for record in related:
            self.msgs.append(
                record[3] + f" ({source[0] - record[0]} ms ago)"
            )
        # Assigning to ``args`` stores a tuple copy, so handlers can read the
        # messages directly from ``exc.args``.
        self.args = self.msgs
class PhilosopherDebugInfo(PhilosopherError):
    """
    Informational (non-error) result of a check.

    It is constructed exactly like PhilosopherError and adds no behaviour of
    its own. Being a distinct subclass lets a handler catch it ahead of
    PhilosopherError and treat it as debug output instead of an anomaly.
    """
def read_command_line_arguments(args):
    """
    Build the table configuration from the command line arguments.

    Leading "debug" / "bonus" flags (case-insensitive, in any order) are
    consumed first. What remains must be 4 or 5 integers: number of
    philosophers, time to die, time to eat, time to sleep and, optionally,
    the number of times each philosopher must eat.

    Args:
        args (list): Command line arguments, without the program name.

    Returns:
        dict: Configuration with the keys "number of philosophers",
            "time to die", "time to eat" and "time to sleep", plus
            "number of times each philosopher must eat" when a fifth integer
            is given and "debug" when the debug flag is present.

    Raises:
        NotImplementedError: If the "bonus" flag is given.
        ValueError: If the number of remaining arguments is not 4 or 5, or if
            any of them is not a valid integer.
    """
    config = {}
    while args and args[0].lower() in ("debug", "bonus"):
        config[args[0].lower()] = True
        args = args[1:]
    if "bonus" in config:
        raise NotImplementedError("bonus mode not implemented.")
    # Validate the count before converting anything: with fewer than four
    # values, indexing the arguments would fail with an IndexError that
    # callers are not told to expect.
    if len(args) not in (4, 5):
        raise ValueError("Error: 4 or 5 integers must be specified.")
    keys = (
        "number of philosophers",
        "time to die",
        "time to eat",
        "time to sleep",
        "number of times each philosopher must eat",
    )
    try:
        # zip stops at the shorter input, so the optional fifth key is only
        # set when a fifth argument exists.
        for key, value in zip(keys, args):
            config[key] = int(value)
    except ValueError as exc:
        raise ValueError("Error: Arguments must be integers.") from exc
    return config
def read_record(string):
    """
    Parse one line of simulation output into a record tuple.

    The line is split on whitespace. The first two fields must be integers;
    everything after them is re-joined with single spaces.

    Args:
        string (str): The line to parse.

    Returns:
        tuple: ``(first_int, second_int, remaining_text, normalised_line)``
            where ``normalised_line`` is the whole input with every run of
            whitespace collapsed to a single space.

    Raises:
        ValueError: If the line has fewer than two fields or either of the
            first two fields is not an integer.
    """
    parts = string.split()
    try:
        num1 = int(parts[0])
        num2 = int(parts[1])
    except (ValueError, IndexError) as exc:
        # IndexError: fewer than two fields; ValueError: a field is not an
        # integer. Nothing else can go wrong here, so nothing else is caught.
        raise ValueError(
            "The first two elements of the string must be integers."
        ) from exc
    remaining_part = " ".join(parts[2:])
    return (num1, num2, remaining_part, " ".join(parts))
def update_state(philosophers_dict, config_dict, action_record):
    """
    Fold one record into the per-philosopher and table-wide state.

    For a recognised action the record is stored on the philosopher as
    "last <action>" and "last action", and on the table as "last record";
    the table's "last update" becomes the largest timestamp seen so far.
    Fork and meal counters are then adjusted:

    - "has taken a fork": "forks" is incremented.
    - "is sleeping": "forks" is reset and "times eat" is incremented.
    - "is eating": "forks" is reset.
    - "died": "forks" is reset and the table's "last died" is set.

    The philosopher's entry is written back to ``philosophers_dict`` even if
    nothing changed, so an unseen philosopher gets an (empty) entry.

    Args:
        philosophers_dict (dict): State of each philosopher, keyed by the
            philosopher number (index 1 of the record). Modified in place.
        config_dict (dict): Table-wide configuration and state. Modified in
            place.
        action_record (tuple): Record as ``(timestamp, philosopher, action,
            text)``.

    Returns:
        None
    """
    who = action_record[1]
    action = action_record[2]
    state = philosophers_dict.get(who, {})
    # NOTE(review): the table-level bookkeeping is assumed to apply to
    # recognised actions only - confirm against the upstream file.
    if action in (
        "is thinking",
        "is eating",
        "is sleeping",
        "has taken a fork",
        "died",
    ):
        state[f"last {action}"] = action_record
        state["last action"] = action_record
        config_dict["last record"] = action_record
        newest = config_dict.get("last update", 0)
        config_dict["last update"] = max(newest, action_record[0])
    if action == "has taken a fork":
        state["forks"] = state.get("forks", 0) + 1
    elif action == "is sleeping":
        state["forks"] = 0
        state["times eat"] = state.get("times eat", 0) + 1
    elif action == "is eating":
        state["forks"] = 0
    elif action == "died":
        config_dict["last died"] = action_record
        state["forks"] = 0
    philosophers_dict[who] = state
def process_philosopher_record(
    philosophers_dict, config_dict, line, tests, log_file=None
):
    """
    Check one line of simulation output, report it and update the state.

    Args:
        philosophers_dict (dict): State of each philosopher.
        config_dict (dict): Table-wide configuration and state. Debug
            messages are only collected when it contains the key "debug".
        line (str): The raw line to process.
        tests (iterable): Check functions. Each one is called with the
            keyword arguments ``philosophers_dict``, ``config_dict`` and
            ``action_record``.
        log_file (Optional[file]): Passed through to ``print_line_info``.
            Defaults to None.

    Returns:
        None

    Notes:
        - A line that cannot be parsed is reported with the error
          "Unexpected input"; no check is run for it and the state is left
          untouched.
        - A PhilosopherDebugInfo raised by a check contributes debug
          messages; any other PhilosopherError contributes error messages.
        - Exceptions of any other type raised by a check are not handled
          here and propagate to the caller.
        - The state is updated after the checks have run, so every check
          sees the state as it was before this record.
    """
    errors = []
    debug = []
    try:
        record = read_record(line)
    except ValueError:
        record = None
        errors.append("Unexpected input")
    if record is not None:
        for test in tests:
            try:
                test(
                    philosophers_dict=philosophers_dict,
                    config_dict=config_dict,
                    action_record=record,
                )
            # PhilosopherDebugInfo subclasses PhilosopherError, so it has to
            # be caught first.
            except PhilosopherDebugInfo as exc:
                if "debug" in config_dict:
                    debug.extend(exc.args)
            except PhilosopherError as exc:
                errors.extend(exc.args)
    # Always echo the line, so unparseable input is shown with its error.
    print_line_info(line, errors, debug, log_file)
    # Without a parsed record there is nothing to index into; skipping the
    # update keeps malformed input from crashing the run.
    if record is not None:
        update_state(philosophers_dict, config_dict, record)
def print_line_info(line, errors, debug, log_file=None):
    """
    Print a record followed by its error and debug messages.

    Each message goes on its own output line. The first output line starts
    with the record (whitespace-normalised and left-aligned in 36 columns);
    the following ones start with 36 blanks. A record without messages is
    printed alone. Output lines are preceded, not followed, by a newline.

    The record is coloured with ERROR when there is at least one error and
    with INPUT otherwise; each message is coloured with ERROR or DEBUG
    according to its kind.

    When there are errors and ``log_file`` is truthy, every output line of
    this record (debug messages included) is also written, without colour
    codes, to ``log_file``.

    Args:
        line (str): The record to print.
        errors (list): Error messages.
        debug (list): Debug messages.
        log_file (file, optional): Destination for the uncoloured copy.
    """
    line_color = ERROR if errors else INPUT
    annotations = []
    if errors:
        annotations += [(ERROR, ". " + text) for text in errors]
    if debug:
        annotations += [(DEBUG, ". " + text) for text in debug]
    if not annotations:
        annotations.append(("", ""))
    prefix = " ".join(line.split())
    for color, text in annotations:
        print(f"\n{line_color}{prefix : <36}{color}{text}", end="")
        if line_color == ERROR and log_file:
            log_file.write(f"\n{prefix : <36}{text}")
        # Only the first output line repeats the record itself.
        prefix = ""
def check_time_travel(**kwargs):
    """
    Detect a record that is older than one already processed.

    The record's timestamp is compared with the table's "last update"
    (0 when absent). Equal timestamps are accepted.

    Keyword Args:
        config_dict (dict): Table-wide configuration and state; the keys
            "last update" and "last record" are read.
        action_record (tuple): Record as ``(timestamp, philosopher, action,
            text)``.

    Raises:
        PhilosopherError: If the record's timestamp is smaller than
            "last update". The table's "last record" is attached as a
            reference.
    """
    table = kwargs["config_dict"]
    record = kwargs["action_record"]
    timestamp, philosopher_id = record[0], record[1]
    backwards = table.get("last update", 0) - timestamp
    if backwards <= 0:
        return
    raise PhilosopherError(
        record,
        [
            "ERROR: TIME TRAVEL",
            f"{philosopher_id} traveled at least {backwards} miliseconds backwards in time.",
        ],
        [table.get("last record", None)],
    )
def check_philosopher_death(**kwargs):
    """
    Flag any record that arrives after a death has been registered.

    Keyword Args:
        config_dict (dict): Table-wide configuration and state; the key
            "last died" is read.
        action_record (tuple): The record being checked.

    Raises:
        PhilosopherError: If "last died" holds a record whose philosopher
            number (index 1) is truthy. The death record is attached as a
            reference.
    """
    table = kwargs["config_dict"]
    record = kwargs["action_record"]
    death = table.get("last died", (0, 0, 0, ""))
    victim = death[1]
    if not victim:
        return
    raise PhilosopherError(
        record,
        [
            "ERROR: DEAD PHILOSOPHER.",
            f"Let's take a minute of silence for {victim}.",
        ],
        [death],
    )
def check_strange_smell(**kwargs):
    """
    Detect a philosopher who should already have been reported dead.

    For every philosopher from 1 to "number of philosophers" (inclusive) the
    time since the start of their last meal (or since time 0 if they have
    never eaten) is compared with "time to die". An excess of up to 10 ms is
    tolerated.

    Keyword Args:
        philosophers_dict (dict): State of each philosopher; the key
            "last is eating" of each entry is read.
        config_dict (dict): Table configuration; the keys "time to die" and
            "number of philosophers" are required.
        action_record (tuple): Record as ``(timestamp, philosopher, action,
            text)``.

    Raises:
        PhilosopherError: If some philosopher is more than 10 ms past their
            time to die at the record's timestamp.
        PhilosopherDebugInfo: Otherwise, with the time the record's own
            philosopher has left to live. This function always raises.
    """
    philosophers_dict = kwargs["philosophers_dict"]
    config_dict = kwargs["config_dict"]
    action_record = kwargs["action_record"]
    now = action_record[0]
    time_to_die = config_dict["time to die"]
    init_smell = now - time_to_die
    # Philosophers are numbered 1..N, and range() excludes its stop value,
    # so the stop must be N + 1 or the last philosopher is never checked.
    for philosopher_id in range(1, config_dict["number of philosophers"] + 1):
        if philosopher_id not in philosophers_dict:
            # Never seen: their clock has been running since time 0.
            if init_smell > 10:
                raise PhilosopherError(
                    action_record,
                    [
                        "Has a strange smell",
                        f"{philosopher_id} should have died {init_smell} ms ago.",
                    ],
                    [],
                )
            continue
        last_meal = philosophers_dict[philosopher_id].get(
            "last is eating", None
        )
        last_meal_time = 0 if last_meal is None else last_meal[0]
        smell = now - last_meal_time - time_to_die
        if smell > 10:
            raise PhilosopherError(
                action_record,
                [
                    "ERROR: STRANGE SMELL.",
                    f"{philosopher_id} should have died {smell} ms ago.",
                    f"Time to die: {time_to_die}",
                ],
                [last_meal],
            )
    own_state = philosophers_dict.get(action_record[1], {})
    own_last_meal = own_state.get("last is eating", (0,))
    time_remaining = time_to_die - (now - own_last_meal[0])
    raise PhilosopherDebugInfo(
        action_record, [f"time to die: {time_remaining} ms"], []
    )
def check_eating_habits(**kwargs):
    """
    Verify that a philosopher who goes to sleep ate for long enough.

    Only "is sleeping" records are examined. The meal duration is the time
    between the philosopher's last "is eating" record (time 0 if there is
    none) and this record.

    Keyword Args:
        philosophers_dict (dict): State of each philosopher; the keys
            "last is eating" and "times eat" of the entry are read.
        config_dict (dict): Table configuration; "time to eat" is required
            for "is sleeping" records.
        action_record (tuple): Record as ``(timestamp, philosopher, action,
            text)``.

    Raises:
        PhilosopherError: If the meal was shorter than "time to eat".
        PhilosopherDebugInfo: Otherwise, for an "is sleeping" record, with
            the meal duration and the philosopher's "times eat" counter as
            stored before this record is applied.
    """
    philosophers_dict = kwargs["philosophers_dict"]
    config_dict = kwargs["config_dict"]
    action_record = kwargs["action_record"]
    if action_record[2] != "is sleeping":
        return
    required = config_dict["time to eat"]
    philosopher_id = action_record[1]
    state = philosophers_dict.get(philosopher_id, {})
    meal = state.get("last is eating", None)
    meal_start = 0 if meal is None else meal[0]
    time_eating = action_record[0] - meal_start
    if time_eating < required:
        raise PhilosopherError(
            action_record,
            [
                "ERROR: ATE TOO LITTLE.",
                f"{philosopher_id} has been eating only {time_eating} ms.",
                f"Time to eat: {required}",
            ],
            [meal],
        )
    meals_so_far = state.get("times eat", 0)
    raise PhilosopherDebugInfo(
        action_record,
        [
            f"Has been eating {time_eating} ms.",
            f"{philosopher_id} ate {meals_so_far} times",
        ],
        [meal],
    )
def check_wakeup_time(**kwargs):
    """
    Verify that a philosopher who starts thinking slept for long enough.

    Only "is thinking" records are examined. The sleep duration is the time
    between the philosopher's last "is sleeping" record (time 0 if there is
    none) and this record.

    Keyword Args:
        philosophers_dict (dict): State of each philosopher; the key
            "last is sleeping" of the entry is read.
        config_dict (dict): Table configuration; "time to sleep" is required
            for "is thinking" records.
        action_record (tuple): Record as ``(timestamp, philosopher, action,
            text)``.

    Raises:
        PhilosopherError: If the sleep was shorter than "time to sleep".
        PhilosopherDebugInfo: Otherwise, for an "is thinking" record, with
            the sleep duration.
    """
    philosophers_dict = kwargs["philosophers_dict"]
    config_dict = kwargs["config_dict"]
    action_record = kwargs["action_record"]
    if action_record[2] != "is thinking":
        return
    required = config_dict["time to sleep"]
    state = philosophers_dict.get(action_record[1], {})
    nap = state.get("last is sleeping", None)
    nap_start = 0 if nap is None else nap[0]
    time_sleeping = action_record[0] - nap_start
    if time_sleeping < required:
        raise PhilosopherError(
            action_record,
            [
                "ERROR: WOKE UP EARLY.",
                f"{action_record[1]} has been sleeping only {time_sleeping} ms.",
                f"Time to sleep: {required}",
            ],
            [nap],
        )
    raise PhilosopherDebugInfo(
        action_record,
        [f"Has been sleeping {time_sleeping} ms."],
        [nap],
    )
def check_fork_duplication(**kwargs):
    """
    Check that a fork was actually available to the philosopher taking it.

    Only "has taken a fork" records from philosophers numbered 1..N (N being
    "number of philosophers") are examined; other numbers are left to
    check_invitation.

    For each direction around the table the neighbours are visited in turn
    until one of them settles whether the fork on that side is free:

    - last action "is sleeping" or "is thinking" (also assumed for a
      philosopher with no recorded action): the fork is free;
    - last action "is eating", or holding 2 forks: the fork is taken;
    - anything else (for example holding a single fork): undecided, move on
      to the next philosopher in the same direction;
    - the walk came back to the acting philosopher: the count of free forks
      is reset to exactly 1.

    The forks the philosopher already holds are then subtracted from the
    count of free forks.

    Keyword Args:
        philosophers_dict (dict): State of each philosopher; the keys
            "last action" and "forks" are read.
        config_dict (dict): Table configuration; "number of philosophers" is
            required for "has taken a fork" records.
        action_record (tuple): Record as ``(timestamp, philosopher, action,
            text)``.

    Raises:
        PhilosopherError: If no free fork is left for the philosopher.
        PhilosopherDebugInfo: Otherwise, with the number of forks available.
            Both carry the visited neighbours' last actions as references.
    """
    philosophers_dict = kwargs["philosophers_dict"]
    config_dict = kwargs["config_dict"]
    action_record = kwargs["action_record"]
    if action_record[2] != "has taken a fork":
        return
    seats = config_dict["number of philosophers"]
    philosopher_id = action_record[1]
    # The walk below is only guaranteed to stop when it gets back to the
    # acting philosopher's seat. A number outside 1..seats is never reached
    # by the modular step, so the loop could spin forever (and an empty
    # table would divide by zero).
    if not 1 <= philosopher_id <= seats:
        return
    references = []
    phil = philosophers_dict.get(philosopher_id, {})
    free_forks = 0
    for direction in [1, -1]:
        pos = philosopher_id
        available = -1  # -1: undecided, 0: fork taken, 1: fork free
        while available == -1:
            # Step to the next seat, wrapping within 1..seats.
            pos = (pos - 1 + direction) % seats + 1
            phil_pos = philosophers_dict.get(pos, {})
            phil_last_action = phil_pos.get(
                "last action", (0, 0, "is sleeping", "")
            )
            references.append(phil_pos.get("last action", None))
            if pos == philosopher_id:
                available = 0
                free_forks = 1
            elif phil_last_action[2] in ["is sleeping", "is thinking"]:
                available = 1
            elif (
                phil_last_action[2] == "is eating"
                or phil_pos.get("forks", 0) == 2
            ):
                available = 0
        free_forks += available
    forks = phil.get("forks", 0)
    free_forks -= forks
    if free_forks <= 0:
        raise PhilosopherError(
            action_record,
            [
                "ERROR: MAGIC FORK.",
                (
                    f"{philosopher_id} picked up a fork "
                    f"while already having {forks} forks"
                ),
            ],
            list(set(references)),
        )
    raise PhilosopherDebugInfo(
        action_record,
        [f"{free_forks} forks available"],
        list(set(references)),
    )
def check_valid_transition(**kwargs):
    """
    Check that the record's action may follow the philosopher's last action.

    A philosopher with no recorded action is treated as if they had been
    thinking. Accepted successors:

    - after "is sleeping": "is thinking" or "died";
    - after "is thinking": "has taken a fork" or "died";
    - after "has taken a fork": "has taken a fork" while the philosopher
      holds fewer than 2 forks, "is eating" once they hold 2; or "died";
    - after "is eating": "is sleeping" or "died";
    - after anything else: nothing.

    Keyword Args:
        philosophers_dict (dict): State of each philosopher; the keys
            "last action" and "forks" of the entry are read.
        action_record (tuple): Record as ``(timestamp, philosopher, action,
            text)``.

    Raises:
        PhilosopherError: If the action is not an accepted successor. The
            philosopher's last action is attached as a reference.
        PhilosopherDebugInfo: Otherwise, listing the accepted successors.
            This function always raises.
    """
    philosophers_dict = kwargs["philosophers_dict"]
    action_record = kwargs["action_record"]
    state = philosophers_dict.get(action_record[1], {})
    previous = state.get("last action", (0, 0, "is thinking", ""))[2]
    after_fork = (
        "has taken a fork" if state.get("forks", 0) < 2 else "is eating"
    )
    if previous == "is sleeping":
        expected = ["is thinking", "died"]
    elif previous == "is thinking":
        expected = ["has taken a fork", "died"]
    elif previous == "has taken a fork":
        expected = [after_fork, "died"]
    elif previous == "is eating":
        expected = ["is sleeping", "died"]
    else:
        expected = []
    t_msg = " or ".join('"' + name + '"' for name in expected)
    if action_record[2] in expected:
        raise PhilosopherDebugInfo(
            action_record, [f"Expected {t_msg}."], [None]
        )
    raise PhilosopherError(
        action_record,
        ["ERROR: INVALID TRANSITION", f"Expected {t_msg}."],
        [state.get("last action", None)],
    )
def check_finish_eating(**kwargs):
    """
    Report records that arrive after every philosopher has eaten enough.

    Starting from "number of philosophers" (0 when absent), one is
    subtracted for every entry of ``philosophers_dict`` whose "times eat"
    counter has reached "number of times each philosopher must eat". Nothing
    is subtracted when that limit is absent or not positive.

    Keyword Args:
        philosophers_dict (dict): State of each philosopher; the key
            "times eat" of each entry is read.
        config_dict (dict): Table configuration.
        action_record (tuple): The record being checked.

    Raises:
        PhilosopherError: If the remaining count is exactly 0.
        PhilosopherDebugInfo: Otherwise, with the remaining count (shown as
            0 if it went negative). This function always raises.
    """
    philosophers_dict = kwargs["philosophers_dict"]
    config_dict = kwargs["config_dict"]
    action_record = kwargs["action_record"]
    pending = config_dict.get("number of philosophers", 0)
    goal = config_dict.get("number of times each philosopher must eat", -1)
    if goal > 0:
        pending -= sum(
            1
            for state in philosophers_dict.values()
            if state.get("times eat", 0) >= goal
        )
    if pending == 0:
        raise PhilosopherError(
            action_record,
            ["ERROR: ALL PHILOSOPHERS ATE."],
            [],
        )
    raise PhilosopherDebugInfo(
        action_record,
        [f"There are {max(pending, 0)} philosophers left to finish."],
        [],
    )
def check_invitation(**kwargs):
    """
    Check that the record's philosopher number belongs to the table.

    Keyword Args:
        config_dict (dict): Table configuration; "number of philosophers" is
            read (0 when absent).
        action_record (tuple): Record as ``(timestamp, philosopher, action,
            text)``.

    Raises:
        PhilosopherError: If the philosopher number is smaller than 1 or
            larger than the number of philosophers.

    Returns:
        None
    """
    table = kwargs["config_dict"]
    record = kwargs["action_record"]
    number_of_philosophers = table.get("number of philosophers", 0)
    guest = record[1]
    if 1 <= guest <= number_of_philosophers:
        return
    raise PhilosopherError(
        record,
        [
            "ERROR: NOT INVITED",
            f"philosopher number must be between 1 and {number_of_philosophers}",
        ],
        [],
    )
def check_premature_death(**kwargs):
    """
    Check that a philosopher reported dead had really run out of time.

    Only "died" records are examined. The time left is "time to die" minus
    the time elapsed since the philosopher's last "is eating" record (time 0
    if there is none).

    Keyword Args:
        philosophers_dict (dict): State of each philosopher; the key
            "last is eating" of the entry is read.
        config_dict (dict): Table configuration; "time to die" is required
            for "died" records.
        action_record (tuple): Record as ``(timestamp, philosopher, action,
            text)``.

    Raises:
        PhilosopherError: If the philosopher still had time left. The
            message includes the philosopher number and the remaining time;
            the last meal record is attached as a reference.
    """
    philosophers_dict = kwargs["philosophers_dict"]
    config_dict = kwargs["config_dict"]
    action_record = kwargs["action_record"]
    if action_record[2] != "died":
        return
    state = philosophers_dict.get(action_record[1], {})
    time_to_die = config_dict["time to die"]
    last_meal = state.get("last is eating", None)
    last_meal_time = 0 if last_meal is None else last_meal[0]
    time_remaining = time_to_die - (action_record[0] - last_meal_time)
    if time_remaining <= 0:
        return
    raise PhilosopherError(
        action_record,
        [
            "ERROR: DIED PREMATURELY.",
            (
                f"{action_record[1]} had his whole "
                f"{time_remaining} ms ahead of him."
            ),
        ],
        [last_meal],
    )
if __name__ == "__main__":
    # Checks applied to every record, in this order; their messages are
    # reported in the same order.
    test_suite = [
        check_time_travel,
        check_philosopher_death,
        check_strange_smell,
        check_eating_habits,
        check_wakeup_time,
        check_fork_duplication,
        check_valid_transition,
        check_finish_eating,
        check_premature_death,
        check_invitation,
    ]
    # Per-philosopher state, filled in as records are processed.
    philosophers = {}
    table_config = read_command_line_arguments(sys.argv[1:])
    # The log file is opened in "w" mode, so it is truncated on every run.
    with open("./log.snitch", "w", encoding="utf8") as log:
        # One record per line of standard input.
        for newline in sys.stdin:
            process_philosopher_record(
                philosophers, table_config, newline, test_suite, log
            )
    # Reset the terminal attributes once the input is exhausted.
    print(ENDC)