Jive ATK monitor causes the communication with a GreenMode.Asyncio DS to crash

Dear all,

I have a simple PyTango DS in asyncio mode that provides only two scalar double attributes, which a LabVIEW client is supposed to write. The communication crashes after some writes. At first I suspected that the LabVIEW bindings have a problem with the async mode, but it turned out that a PyTango client shows the same behavior.
But, and this is my question: it only happens (with either the LabVIEW or the Python client) while I watch the DS in the Jive ATK monitor. Why?


import time

from tango import AttrQuality, DevState, GreenMode
from tango.server import Device, attribute


class LVProxyAsync(Device):
    green_mode = GreenMode.Asyncio

    async def init_device(self):
        # Asyncio green mode: await the base-class init_device
        await super().init_device()
        self._sinus_signal = float('nan')
        self._saw_signal = float('nan')
        # Change events are pushed manually from the write methods
        self.set_change_event('sinus_signal', True, False)
        self.set_change_event('saw_signal', True, False)
        self.set_state(DevState.RUNNING)

    @attribute(dtype=float)
    async def sinus_signal(self):
        return self._sinus_signal

    @sinus_signal.write
    async def set_sinus_signal(self, nValue):
        self.debug_stream(f"Write Sinus: {nValue}")
        self._sinus_signal = nValue
        self.push_change_event("sinus_signal", nValue, time.time(), AttrQuality.ATTR_VALID)

    @attribute(dtype=float)
    async def saw_signal(self):
        return self._saw_signal

    @saw_signal.write
    async def set_saw_signal(self, nValue):
        self._saw_signal = nValue
        self.push_change_event("saw_signal", nValue, time.time(), AttrQuality.ATTR_VALID)


Here is the test client (it runs in a Jupyter notebook in VS Code):

import asyncio

import numpy
import tango
from tango.asyncio import DeviceProxy as aDS

# Top-level await works inside a Jupyter notebook cell
lvproxy_async = await aDS("lvproxyasync/lvproxyasync/1")
for i in range(10000):
    try:
        await lvproxy_async.write_attribute("sinus_signal", numpy.sin(i))
        await lvproxy_async.write_attribute("sinus_signal", numpy.sin(i))
        await asyncio.sleep(0.1)
    except tango.DevFailed:
        # CommunicationFailed (the timeout below) is a subclass of DevFailed
        print(f"Error after {i} tries")
        break


Update:
The error message:

CommunicationFailed: DevFailed[
DevError[
desc = TRANSIENT CORBA system exception: TRANSIENT_CallTimedout
origin = DeviceProxy:write_attribute()

desc = Timeout (3000 mS) exceeded on device lvproxyasync/lvproxyasync/1
origin = DeviceProxy:write_attribute()
reason = API_DeviceTimedOut
severity = ERR]
]
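
To tell this timeout apart from other failures in the client loop, one option is to look at the reason fields of the error stack rather than catching everything. The following is only a minimal sketch: it assumes the exception's args hold objects with a reason attribute (as the DevError entries of a PyTango DevFailed do), the helper names failure_reasons and is_timeout are hypothetical, and a stand-in exception is used so the snippet runs without a Tango installation.

```python
from types import SimpleNamespace


def failure_reasons(exc):
    """Collect the 'reason' field of every entry in an exception's error stack.

    Entries in exc.args that have no 'reason' attribute are skipped.
    """
    return [err.reason for err in exc.args if hasattr(err, "reason")]


def is_timeout(exc):
    """True if any entry in the error stack reports API_DeviceTimedOut."""
    return "API_DeviceTimedOut" in failure_reasons(exc)


# Stand-in mirroring the two entries pasted above (not a real tango.DevFailed).
# The first entry is given no reason because none is shown in the message.
fake = Exception(
    SimpleNamespace(desc="TRANSIENT CORBA system exception: TRANSIENT_CallTimedout"),
    SimpleNamespace(reason="API_DeviceTimedOut"),
)

print(failure_reasons(fake))
print(is_timeout(fake))
```

In the test client, the same check could be applied to the exception caught around write_attribute, for example to retry only on timeouts.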
After some discussions in Slack, it turned out that a proper fix is not easy, because it would require a redesign of the cppTango threading model.

A workaround is to hide the "Asyncio" part behind a synchronous interface and run it in a separate thread. Here is a recipe:

  1. Create an event loop in your sync DS.
  2. Create and start a thread that runs the event loop.
  3. To call an async coroutine from sync code, use asyncio.run_coroutine_threadsafe.
  4. To call a sync method from async code, use loop.call_soon_threadsafe.
  5. Optional: activate debug mode for asyncio.


import asyncio
import tango
from tango.server import Device, attribute, command
from threading import Thread

def side_thread(loop):
    # Run the event loop forever in the side thread
    asyncio.set_event_loop(loop)
    loop.run_forever()

class SyncDS(Device):
    def init_device(self):
        super().init_device()
        self.loop = asyncio.new_event_loop()
        # --- ACTIVATE DEBUG MODE ---
        self.loop.set_debug(True)
        # ---------------------------
        self.thread = Thread(target=side_thread, args=(self.loop,), daemon=True, name="AsyncThread")
        self.thread.start()
        self.myAsyncLookForTheAnswer = AsyncLookForTheAnswer("async_part", self)  # pass a reference of this object for feedback
        self.answer = 0

    def sync_method_for_answer(self, value):
        print(f"A sync method was called from async code with answer = {value}")
        self.answer = value

    @command
    def calculate_the_answer(self):
        print("Start time-consuming task asynchronously")
        # pass the event loop:
        asyncio.run_coroutine_threadsafe(self.myAsyncLookForTheAnswer.calculate_the_answer(), self.loop)  # Don't call directly!
        print("continue with my synchronous jobs")

    @attribute(dtype=int)
    def the_answer(self):
        return self.answer

# ------ THE ASYNC PART ------
class AsyncLookForTheAnswer:
    def __init__(self, name: str, device) -> None:
        self.name = name
        self.loop = device.loop
        self.device = device

    async def calculate_the_answer(self):
        await asyncio.sleep(5)  # it is a very difficult answer...
        # Call the sync method in a thread-safe manner from the event loop
        self.loop.call_soon_threadsafe(self.device.sync_method_for_answer, 42)  # Don't call directly!

if __name__ == "__main__":
    # Register the device in the Tango database before starting the server
    dev_info = tango.DbDevInfo()
    dev_info.server = "SyncDS/test"
    dev_info._class = "SyncDS"
    dev_info.name = "test/syncDS/1"
    db = tango.Database()
    db.add_device(dev_info)

    SyncDS.run_server()


To test, start the DS and send the calculate_the_answer command. Observe the messages in the console; about 5 seconds later the the_answer attribute should read 42.
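
If the synchronous side needs the result itself rather than a callback, the future returned by asyncio.run_coroutine_threadsafe can be waited on with a timeout. Below is a minimal sketch of that variant using plain asyncio only (no Tango involved); the names start_loop_thread and slow_answer are hypothetical and exist only for this illustration.

```python
import asyncio
from threading import Thread


def start_loop_thread():
    """Create an event loop and run it forever in a daemon thread."""
    loop = asyncio.new_event_loop()
    thread = Thread(target=loop.run_forever, daemon=True, name="AsyncThread")
    thread.start()
    return loop, thread


async def slow_answer(delay):
    """Stand-in for a time-consuming coroutine."""
    await asyncio.sleep(delay)
    return 42


loop, thread = start_loop_thread()

# Schedule the coroutine on the side loop; this returns a
# concurrent.futures.Future immediately without blocking.
future = asyncio.run_coroutine_threadsafe(slow_answer(0.1), loop)

# Block the calling (sync) thread until the result arrives or 2 s pass.
answer = future.result(timeout=2)
print(answer)

# Stop the loop from outside its thread in a thread-safe way, then clean up.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=2)
loop.close()
```

Inside a Tango command, any such blocking wait should stay well below the client timeout (3000 ms in the error above); for longer tasks the fire-and-forget pattern from the recipe is the safer choice.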