Module¶
Mjukvarumoduler skapas som Python-klasser. Modulen laddas därefter genom att lägga till den i konfigurationsfilen.
Definiera en modul¶
Filen om filen <code>/lrpy/lrpy/modules/test_mod.py
from lrpy import DecoratedModule
class TestModule(DecoratedModule):
pass
Ladda modulen i konfigurationen
commod:
type: "lrpy.com_module:StdComModule" # Ladda kom-modulen (bör alltid vara samma)
config:
modules: # Lista av moduler som kom-modulen ska ladda
- type: "lrpy.modules.test_mod:TestModule" # Vår modul!
Type strängen är på formatet fil:klassnamn där fil skrivs som om man skulle importera den
(på samma sätt som from fil import klassnamn).
Ta emot meddelanden¶
Om DecoratedClass används så används @on_message för att markera att en funktion lyssnar efter ett meddelande.
Funktionen ska ta argumenten self (som vanligt) och message som skriver typen av meddelande som lyssnar på som typ
(dvs message: TestMessage lyssnar på TestMessage)
Nedan visas ett exempel på hur en model kan lyssna på ett meddelande.
from lrpy import DecoratedModule, on_message
from lrpy.message import TestMessage
class TestModule(DecoratedModule):
@on_message
async def on_test(self, message: TestMessage):
print(message.value)
Flera meddelanden kan lyssnas på på en funktion enligt nedan
@on_message
async def on_test(self, message: TestMessage | TestMessage2):
print(message)
Skicka ett meddelande¶
import asyncio
from lrpy import DecoratedModule
from lrpy.message import TestMessage
class TestModule(DecoratedModule):
async def run(self, stop: asyncio.Event):
while not stop.is_set():
self.emit_message(
TestMessage(value=1) # Man måste använda kwargs (d.v.s. value=5 istället för 5) när man skapar meddelanden
)
Alla meddelandetyper är definierade i filen <code>/lrpy/lrpy/message.py
Run & huvudloop för modulen¶
En modul kan köra kod kontinuerligt genom att definiera run-metoden. Metoden tar emot ett stop-event som signalerar
när programmet ska avslutas (t.ex. när användaren trycker Ctrl+C).
Modulen måste respektera stop-eventet och avsluta inom ett par sekunder efter att det satts. Det innebär:
Använd
while not stop.is_set()som loop-villkor.Använd
await asyncio.sleep()med ett rimligt intervall (högst ett par sekunder) så att loopen hinner kontrollerastopregelbundet.Om modulen gör blockerande arbete (t.ex. väntar på I/O) måste den ändå avbryta inom rimlig tid.
import asyncio
from lrpy import DecoratedModule
from lrpy.message import TestMessage
class TestModule(DecoratedModule):
async def run(self, stop: asyncio.Event):
while not stop.is_set():
self.emit_message(
TestMessage(value=1)
)
await asyncio.sleep(1) # Vänta 1 sekund — loopen avslutas inom ~1s efter stop
await asyncio.sleep() fyller två syften:
Den ger andra moduler och event-loopen tid att köra.
Den begränsar hur lång tid det tar innan loopen märker att
stophar satts.
Om en modul inte avslutas inom 10 sekunder efter att stop satts kommer dess async-task att termineras
(via asyncio.Task.cancel) och ett felmeddelande skrivs ut. Detta är en säkerhetsmekanism — moduler ska inte
förlita sig på den utan istället avsluta ordentligt på egen hand. En sleep på t.ex. 60 sekunder skulle alltså
leda till att modulen tvingas att avslutas varje gång programmet stoppas.
Om modulen inte behöver en loop (t.ex. om den bara lyssnar på meddelanden) behöver run inte definieras.
Telemetri¶
Moduler kan exponera telemetridata som visas i frontenden. Telemetri definieras som en lista av TelemetryField i
klassattributet TELEMETRY.
import asyncio
from lrpy import DecoratedModule
from lrpy.telemetry import TelemetryField, TelemetryState
class TestModule(DecoratedModule):
TELEMETRY = [
TelemetryField("temperature", "Temperatur", "Aktuell temperatur", unit="°C"),
TelemetryField("status", "Status", "Modulens status"),
]
async def run(self, stop: asyncio.Event):
while not stop.is_set():
self.telemetry.temperature.set(42, state=TelemetryState.OK)
self.telemetry.status.set("Kör", state=TelemetryState.OK, text="Allt fungerar")
await asyncio.sleep(1)
TelemetryField tar följande argument:
id— unikt id som används för att komma åt värdet viaself.telemetry.<id>name— visningsnamndescription(valfri) — beskrivningunit(valfri) — enhet, t.ex."ms","°C"
Värden uppdateras med self.telemetry.<id>.set(value, state, text) där state anger färg/status:
TelemetryState.NEUTRAL— standardTelemetryState.OK— grönt/okTelemetryState.WARN— varningTelemetryState.ERROR— fel
text är ett valfritt meddelande som ger mer kontext om värdet.
Konfiguration¶
En modul kan ta emot konfiguration från YAML-filen genom att definiera en inre Config-klass som ärver från
pydantic.BaseModel.
from pydantic import BaseModel, Field
from lrpy import DecoratedModule
class MyModule(DecoratedModule):
class Config(BaseModel):
interval: float = 1.0
device_path: str = Field(alias="device-path")
def __init__(self, config: Config):
self.config: MyModule.Config = config
super().__init__(config)
Konfigurationen skickas in via YAML-filen:
modules:
- type: "lrpy.modules.my_module:MyModule"
config:
interval: 0.5
device-path: "/dev/i2c-1"
Config-klassen valideras automatiskt av Pydantic vid laddning. Om ett fält i YAML-filen använder bindestreck
(t.ex. device-path) kan Field(alias="device-path") användas för att mappa det till ett Python-attribut
(t.ex. device_path).
Om modulen inte definierar en Config-klass behöver inget config-block anges i YAML-filen.