Replies: 7 comments 1 reply
-
You should be able to instead start the web server with |
Beta Was this translation helpful? Give feedback.
-
I would use LVGL's timer to schedule calls to your webserver stuff. remove the while loops and the asynchio stuff. |
Beta Was this translation helpful? Give feedback.
-
Hey Kevin, Regards Sebastian |
Beta Was this translation helpful? Give feedback.
-
def timer_cb(t):
print('timer called')
# make sure the timer reference doesn't go out of scope.
# so make sure to create it at the module level and not
# inside of a function.
timer = lv.timer_create(
timer_cb, # callback function
20, # period in milliseconds
None # user data
)
timer.set_repeat_count(-1) # -1 means the timer auto restarts after the callback is called.
When starting Here is a full blown example including a driver initialization... In this example the timer will run once every 5 seconds and when it does it will reset the slider value to 0 import lcd_bus
from micropython import const
# display settings
_WIDTH = const(320)
_HEIGHT = const(480)
_BL = const(45)
_RST = const(4)
_DC = const(0)
_WR = const(47)
_FREQ = const(20000000)
_DATA0 = const(9)
_DATA1 = const(46)
_DATA2 = const(3)
_DATA3 = const(8)
_DATA4 = const(18)
_DATA5 = const(17)
_DATA6 = const(16)
_DATA7 = const(15)
_SCL = const(5)
_SDA = const(6)
_TP_FREQ = const(100000)
display_bus = lcd_bus.I80Bus(
dc=_DC,
wr=_WR,
freq=_FREQ,
data0=_DATA0,
data1=_DATA1,
data2=_DATA2,
data3=_DATA3,
data4=_DATA4,
data5=_DATA5,
data6=_DATA6,
data7=_DATA7
)
import st7796 # NOQA
import lvgl as lv # NOQA
lv.init()
display = st7796.ST7796(
data_bus=display_bus,
display_width=_WIDTH,
display_height=_HEIGHT,
backlight_pin=_BL,
color_space=lv.COLOR_FORMAT.RGB565,
color_byte_order=st7796.BYTE_ORDER_BGR,
rgb565_byte_swap=True,
)
import i2c # NOQA
import task_handler # NOQA
import ft6x36 # NOQA
display.init()
i2c_bus = i2c.I2C.Bus(host=0, scl=_SCL, sda=_SDA, freq=_TP_FREQ, use_locks=False)
touch_dev = i2c.I2C.Device(bus=i2c_bus, dev_id=ft6x36.I2C_ADDR, reg_bits=ft6x36.BITS)
indev = ft6x36.FT6x36(touch_dev)
display.invert_colors()
if not indev.is_calibrated:
display.set_backlight(100)
indev.calibrate()
display.set_rotation(lv.DISPLAY_ROTATION._90)
display.set_backlight(100)
th = task_handler.TaskHandler(20)
scrn = lv.screen_active()
scrn.set_style_bg_color(lv.color_hex(0x000000), 0)
slider = lv.slider(scrn)
slider.set_size(300, 50)
slider.center()
def slider_cb(e):
print('slider_value:', slider.get_value())
def timer_cb(t):
print('timer called')
slider.set_value(0, lv.ANIM_ON)
timer = lv.timer_create(timer_cb, 5000, None)
timer.set_repeat_count(-1) # -1 means the timer auto restarts after the callback is called. |
Beta Was this translation helpful? Give feedback.
-
Hey, Kevin thanks a lot for this detailed example! |
Beta Was this translation helpful? Give feedback.
-
Hy, example works after updating it for LVGL 9, THX But wont't help the webserver blocks the timer, i think because he is wait for client connection? any Ideas? boot.py
lv.init()
th = task_handler.TaskHandler()
sys.path.append('/sd/web_srv')
from mini_web_srv import dienste
def timer_cb(t):
print('timer called')
dienste.webserver()
dienste.ntp_sync()
timer = lv.timer_create(timer_cb, 20, None)
timer.set_repeat_count(-1) # -1 means the timer auto restarts after the callback is called.
#mini_web_srv
import ujson
import usocket as socket
import network
import urandom
import ntptime
import utime
import ure
import ubinascii
sta = network.WLAN(network.STA_IF)
ip = sta.ifconfig()[0] if sta.isconnected() else '0.0.0.0'
PASSWORT = "biene123"
sessions = set()
class dienste:
@staticmethod
def lade_daten():
try:
with open('/sd/data/produkte.json', "r") as f:
return ujson.load(f)
except Exception as e:
print("Fehler beim Laden:", e)
return []
@staticmethod
def speichere_daten(liste):
try:
with open('/sd/data/produkte.json', "w") as f:
ujson.dump(liste, f)
except Exception as e:
print("Fehler beim Speichern:", e)
@staticmethod
def lade_html(datei):
try:
with open(f'/sd/web_srv/html/{datei}', 'r') as f:
return f.read()
except Exception as e:
return f"Fehler: {e}"
@staticmethod
def urldecode(s):
s = s.replace('+', ' ')
parts = s.split('%')
raw = bytearray(parts[0], 'utf-8')
for part in parts[1:]:
try:
hexcode = part[:2]
rest = part[2:]
raw.append(int(hexcode, 16))
raw.extend(bytes(rest, 'utf-8'))
except:
raw.extend(bytes('%' + part, 'utf-8'))
try:
return raw.decode('utf-8')
except:
return raw.decode()
@staticmethod
def generiere_session_id():
return ''.join('{:02x}'.format(urandom.getrandbits(8)) for _ in range(16))
@staticmethod
def ist_angemeldet(req):
return any(f"session={sid}" in req for sid in sessions)
@staticmethod
def webserver():
global ip
s = socket.socket()
s.bind((ip, 80))
s.listen(1)
print("Server läuft auf:", ip)
while True:
cl, addr = s.accept()
req = b""
while True:
chunk = cl.recv(1024)
if not chunk:
break
req += chunk
if b"\r\n\r\n" in req:
break
try:
req_str = req.decode('utf-8')
header, _, body = req_str.partition("\r\n\r\n")
if "GET / " in header or "GET /index.html" in header:
html = dienste.lade_html("index.html")
cl.send("HTTP/1.0 200 OK\r\nContent-Type: text/html; charset=utf-8\r\nCache-Control: no-store\r\nX-Content-Type-Options: nosniff\r\n\r\n".encode())
cl.send(html.encode('utf-8'))
elif "POST /login" in header:
if f"pw={PASSWORT}" in body:
session_id = dienste.generiere_session_id()
sessions.add(session_id)
html = dienste.lade_html("status.html")
cl.send(f"HTTP/1.0 200 OK\r\nSet-Cookie: session={session_id}\r\nContent-Type: text/html; charset=utf-8\r\n\r\n".encode())
cl.send(html.encode('utf-8'))
else:
cl.send("HTTP/1.0 401 Unauthorized\r\n\r\nFalsches Passwort".encode('utf-8'))
elif "GET /produkte" in header:
if not dienste.ist_angemeldet(header):
cl.send("HTTP/1.0 302 Found\r\nLocation: /index.html\r\n\r\n".encode())
else:
produkte = dienste.lade_daten()
html = dienste.lade_html("produkte.html")
if not produkte:
feldnamen = ["id", "groesse", "sorte", "mhd", "konsistenz", "artikelnummer",
"spalte6", "spalte7", "spalte8", "spalte9", "spalte10"]
tabellenzeilen = "<tr><td colspan='{}'>Keine Daten vorhanden</td></tr>".format(len(feldnamen))
else:
feldnamen = list(produkte[0].keys())
tabellenzeilen = ""
headerzeile = ''.join(f"<th>{feld}</th>" for feld in feldnamen)
html = html.replace("{{table_header}}", headerzeile)
for i, p in enumerate(produkte):
tabellenzeilen += "<tr>"
for feld in feldnamen:
wert = p.get(feld, "")
tabellenzeilen += (
f"<td><input name='{i}-{feld}' "
f"value='{wert}' "
f"title='{feld}' "
f"placeholder='{feld}'></td>"
)
tabellenzeilen += "</tr>"
html = html.replace("{{tabellenzeilen}}", tabellenzeilen)
cl.send("HTTP/1.0 200 OK\r\nContent-Type: text/html; charset=utf-8\r\nCache-Control: no-store\r\nX-Content-Type-Options: nosniff\r\n\r\n".encode())
cl.send(html.encode('utf-8'))
elif "POST /produkte" in header:
if not dienste.ist_angemeldet(header):
cl.send("HTTP/1.0 302 Found\r\nLocation: /index.html\r\n\r\n".encode())
else:
try:
content_length = 0
for line in header.split("\r\n"):
if line.lower().startswith("content-length"):
content_length = int(line.split(":")[1].strip())
break
while len(body.encode()) < content_length:
body += cl.recv(1024).decode('utf-8')
felder = body.split("&")
decoded = [tuple(map(dienste.urldecode, f.split("=", 1))) for f in felder if "=" in f]
zeilen = {}
for key, value in decoded:
if "-" in key:
index, feldname = key.split("-", 1)
if index not in zeilen:
zeilen[index] = {}
zeilen[index][feldname] = value
neue_daten = []
for z in sorted(zeilen.keys(), key=int):
eintrag = zeilen[z]
if any(v.strip() for v in eintrag.values()):
neue_daten.append(eintrag)
print("Neue Daten:", neue_daten)
dienste.speichere_daten(neue_daten)
cl.send("HTTP/1.0 303 See Other\r\nLocation: /produkte\r\n\r\n".encode())
except Exception as e:
cl.send("HTTP/1.0 500 Internal Server Error\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n")
cl.send(f"Fehler beim Speichern:\n{e}".encode('utf-8'))
finally:
cl.close()
@staticmethod
def ntp_sync():
for versuch in range(3):
try:
ntptime.settime()
print("Zeit erfolgreich gesetzt:", utime.localtime())
return
except Exception as e:
print(f"Versuch {versuch + 1} fehlgeschlagen:", e)
raise RuntimeError("NTP-Zeit konnte nicht abgerufen werden!")
|
Beta Was this translation helpful? Give feedback.
-
i will give it a try, thx a lot :-) |
Beta Was this translation helpful? Give feedback.
-
Hy,
how do i run an webserver in background of lvgl frontend?
Regards
Sebastian
Beta Was this translation helpful? Give feedback.
All reactions