-
Notifications
You must be signed in to change notification settings - Fork 368
/
Copy pathserver-events.py
51 lines (42 loc) · 1.88 KB
/
server-events.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import asyncio
import logging
from asyncua import ua
from asyncua.server import Server
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger("asyncua")
async def main():
server = Server()
await server.init()
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
# setup our own namespace, not really necessary but should as spec
uri = "http://examples.freeopcua.github.io"
idx = await server.register_namespace(uri)
# populating our address space
myobj = await server.nodes.objects.add_object(idx, "MyObject")
# Creating a custom event: Approach 1
# The custom event object automatically will have members from its parent (BaseEventType)
etype = await server.create_custom_event_type(
idx,
"MyFirstEvent",
ua.ObjectIds.BaseEventType,
[("MyNumericProperty", ua.VariantType.Float), ("MyStringProperty", ua.VariantType.String)],
)
myevgen = await server.get_event_generator(etype, myobj)
# Creating a custom event: Approach 2
custom_etype = await server.nodes.base_event_type.add_object_type(2, "MySecondEvent")
await custom_etype.add_property(2, "MyIntProperty", ua.Variant(0, ua.VariantType.Int32))
await custom_etype.add_property(2, "MyBoolProperty", ua.Variant(True, ua.VariantType.Boolean))
mysecondevgen = await server.get_event_generator(custom_etype, myobj)
async with server:
count = 0
while True:
await asyncio.sleep(1)
myevgen.event.Message = ua.LocalizedText("MyFirstEvent %d" % count)
myevgen.event.Severity = count
myevgen.event.MyNumericProperty = count
myevgen.event.MyStringProperty = "Property %d" % count
await myevgen.trigger()
await mysecondevgen.trigger(message="MySecondEvent %d" % count)
count += 1
if __name__ == "__main__":
asyncio.run(main())