Skip to content

Commit a6d936f

Browse files
author
Jean-François Nguyen
committed
event: add EventSource and InterruptSource.
1 parent 967a65f commit a6d936f

File tree

2 files changed

+265
-0
lines changed

2 files changed

+265
-0
lines changed

nmigen_soc/event.py

+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
from nmigen import *
2+
from nmigen import tracer
3+
4+
from nmigen_soc import csr
5+
6+
7+
__all__ = ["EventSource", "InterruptSource"]
8+
9+
10+
class EventSource:
11+
"""Event source.
12+
13+
Parameters
14+
----------
15+
mode : ``"level"``, ``"rise"``, ``"fall"``
16+
Trigger mode. If ``"level"``, a notification is raised when the ``stb`` signal is high.
17+
If ``"rise"`` (or ``"fall"``) a notification is raised on a rising (or falling) edge
18+
of ``stb``.
19+
name : str
20+
Name of the event. If ``None`` (default) the name is inferred from the variable
21+
name this event source is assigned to.
22+
23+
Attributes
24+
----------
25+
name : str
26+
Name of the event source.
27+
mode : ``"level"``, ``"rise"``, ``"fall"``
28+
Trigger mode.
29+
stb : Signal, in
30+
Event strobe.
31+
"""
32+
def __init__(self, *, mode="level", name=None, src_loc_at=0):
33+
if name is not None and not isinstance(name, str):
34+
raise TypeError("Name must be a string, not {!r}".format(name))
35+
self.name = name or tracer.get_var_name(depth=2 + src_loc_at)
36+
37+
choices = ("level", "rise", "fall")
38+
if mode not in choices:
39+
raise ValueError("Invalid trigger mode {!r}; must be one of {}"
40+
.format(mode, ", ".join(choices)))
41+
self.mode = mode
42+
43+
self.stb = Signal(name="{}_stb".format(self.name))
44+
45+
46+
class InterruptSource(Elaboratable):
47+
"""Interrupt source.
48+
49+
A mean of gathering multiple event sources into a single interrupt request line.
50+
51+
Parameters
52+
----------
53+
events : iter(:class:`EventSource`)
54+
Event sources.
55+
name : str
56+
Name of the interrupt source. If ``None`` (default) the name is inferred from the
57+
variable name this interrupt source is assigned to.
58+
59+
Attributes
60+
----------
61+
name : str
62+
Name of the interrupt source.
63+
status : :class:`csr.Element`, read-only
64+
Event status register. Each bit displays the level of the strobe of an event source.
65+
Events are ordered by position in the `events` parameter.
66+
pending : :class:`csr.Element`, read/write
67+
Event pending register. If a bit is 1, the associated event source has a pending
68+
notification. Writing 1 to a bit clears it.
69+
Events are ordered by position in the `events` parameter.
70+
enable : :class:`csr.Element`, read/write
71+
Event enable register. Writing 1 to a bit enables its associated event source.
72+
Writing 0 disables it.
73+
Events are ordered by position in the `events` parameter.
74+
irq : Signal, out
75+
Interrupt request. It is raised if any event source is enabled and has a pending
76+
notification.
77+
"""
78+
def __init__(self, events, *, name=None, src_loc_at=0):
79+
if name is not None and not isinstance(name, str):
80+
raise TypeError("Name must be a string, not {!r}".format(name))
81+
self.name = name or tracer.get_var_name(depth=2 + src_loc_at)
82+
83+
for event in events:
84+
if not isinstance(event, EventSource):
85+
raise TypeError("Event source must be an instance of EventSource, not {!r}"
86+
.format(event))
87+
self._events = list(events)
88+
89+
width = len(events)
90+
self.status = csr.Element(width, "r", name="{}_status".format(self.name))
91+
self.pending = csr.Element(width, "rw", name="{}_pending".format(self.name))
92+
self.enable = csr.Element(width, "rw", name="{}_enable".format(self.name))
93+
94+
self.irq = Signal(name="{}_irq".format(self.name))
95+
96+
def elaborate(self, platform):
97+
m = Module()
98+
99+
with m.If(self.pending.w_stb):
100+
m.d.sync += self.pending.r_data.eq(self.pending.r_data & ~self.pending.w_data)
101+
102+
with m.If(self.enable.w_stb):
103+
m.d.sync += self.enable.r_data.eq(self.enable.w_data)
104+
105+
for i, event in enumerate(self._events):
106+
m.d.sync += self.status.r_data[i].eq(event.stb)
107+
108+
if event.mode in ("rise", "fall"):
109+
event_stb_r = Signal.like(event.stb, name_suffix="_r")
110+
m.d.sync += event_stb_r.eq(event.stb)
111+
112+
event_trigger = Signal(name="{}_trigger".format(event.name))
113+
if event.mode == "level":
114+
m.d.comb += event_trigger.eq(event.stb)
115+
elif event.mode == "rise":
116+
m.d.comb += event_trigger.eq(~event_stb_r & event.stb)
117+
elif event.mode == "fall":
118+
m.d.comb += event_trigger.eq(event_stb_r & ~event.stb)
119+
else:
120+
assert False # :nocov:
121+
122+
with m.If(event_trigger):
123+
m.d.sync += self.pending.r_data[i].eq(1)
124+
125+
m.d.comb += self.irq.eq((self.pending.r_data & self.enable.r_data).any())
126+
127+
return m

nmigen_soc/test/test_event.py

+138
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
# nmigen: UnusedElaboratable=no
2+
3+
import unittest
4+
from nmigen import *
5+
from nmigen.back.pysim import *
6+
7+
from ..event import *
8+
9+
10+
def simulation_test(dut, process):
11+
with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
12+
sim.add_clock(1e-6)
13+
sim.add_sync_process(process)
14+
sim.run()
15+
16+
17+
class EventSourceTestCase(unittest.TestCase):
18+
def test_simple(self):
19+
ev = EventSource()
20+
self.assertEqual(ev.name, "ev")
21+
self.assertEqual(ev.mode, "level")
22+
23+
def test_name_wrong(self):
24+
with self.assertRaisesRegex(TypeError,
25+
r"Name must be a string, not 2"):
26+
EventSource(name=2)
27+
28+
def test_mode_wrong(self):
29+
with self.assertRaisesRegex(ValueError,
30+
r"Invalid trigger mode 'foo'; must be one of level, rise, fall"):
31+
ev = EventSource(mode="foo")
32+
33+
34+
class InterruptSourceTestCase(unittest.TestCase):
35+
def test_simple(self):
36+
ev_0 = EventSource()
37+
ev_1 = EventSource()
38+
dut = InterruptSource((ev_0, ev_1))
39+
self.assertEqual(dut.name, "dut")
40+
self.assertEqual(dut.status.width, 2)
41+
self.assertEqual(dut.pending.width, 2)
42+
self.assertEqual(dut.enable.width, 2)
43+
44+
def test_name_wrong(self):
45+
with self.assertRaisesRegex(TypeError,
46+
r"Name must be a string, not 2"):
47+
InterruptSource((), name=2)
48+
49+
def test_event_wrong(self):
50+
with self.assertRaisesRegex(TypeError,
51+
r"Event source must be an instance of EventSource, not 'foo'"):
52+
dut = InterruptSource(("foo",))
53+
54+
def test_events(self):
55+
ev_0 = EventSource(mode="level")
56+
ev_1 = EventSource(mode="rise")
57+
ev_2 = EventSource(mode="fall")
58+
dut = InterruptSource((ev_0, ev_1, ev_2))
59+
60+
def process():
61+
yield ev_0.stb.eq(1)
62+
yield ev_1.stb.eq(0)
63+
yield ev_2.stb.eq(1)
64+
yield
65+
self.assertEqual((yield dut.irq), 0)
66+
67+
yield dut.status.r_stb.eq(1)
68+
yield
69+
yield dut.status.r_stb.eq(0)
70+
yield
71+
self.assertEqual((yield dut.status.r_data), 0b101)
72+
yield
73+
74+
yield dut.enable.w_stb.eq(1)
75+
yield dut.enable.w_data.eq(0b111)
76+
yield
77+
yield dut.enable.w_stb.eq(0)
78+
yield
79+
yield
80+
self.assertEqual((yield dut.irq), 1)
81+
82+
yield dut.pending.w_stb.eq(1)
83+
yield dut.pending.w_data.eq(0b001)
84+
yield
85+
yield dut.pending.w_stb.eq(0)
86+
yield
87+
88+
yield dut.pending.r_stb.eq(1)
89+
yield
90+
yield dut.pending.r_stb.eq(0)
91+
yield
92+
self.assertEqual((yield dut.pending.r_data), 0b001)
93+
self.assertEqual((yield dut.irq), 1)
94+
yield
95+
96+
yield ev_0.stb.eq(0)
97+
yield dut.pending.w_stb.eq(1)
98+
yield dut.pending.w_data.eq(0b001)
99+
yield
100+
yield dut.pending.w_stb.eq(0)
101+
yield
102+
yield
103+
self.assertEqual((yield dut.irq), 0)
104+
105+
yield ev_1.stb.eq(1)
106+
yield dut.pending.r_stb.eq(1)
107+
yield
108+
yield dut.pending.r_stb.eq(0)
109+
yield
110+
self.assertEqual((yield dut.pending.r_data), 0b010)
111+
self.assertEqual((yield dut.irq), 1)
112+
113+
yield dut.pending.w_stb.eq(1)
114+
yield dut.pending.w_data.eq(0b010)
115+
yield
116+
yield dut.pending.w_stb.eq(0)
117+
yield
118+
yield
119+
self.assertEqual((yield dut.irq), 0)
120+
121+
yield ev_2.stb.eq(0)
122+
yield
123+
yield dut.pending.r_stb.eq(1)
124+
yield
125+
yield dut.pending.r_stb.eq(0)
126+
yield
127+
self.assertEqual((yield dut.pending.r_data), 0b100)
128+
self.assertEqual((yield dut.irq), 1)
129+
130+
yield dut.pending.w_stb.eq(1)
131+
yield dut.pending.w_data.eq(0b100)
132+
yield
133+
yield dut.pending.w_stb.eq(0)
134+
yield
135+
yield
136+
self.assertEqual((yield dut.irq), 0)
137+
138+
simulation_test(dut, process)

0 commit comments

Comments
 (0)