Please explain Python asyncio

Discussion in 'Programmer's Corner' started by strantor, Mar 30, 2015.

  1. strantor

    Thread Starter AAC Fanatic!

    Oct 3, 2010
    4,302
    1,988
    My goal is to have some IPC (two or more programs sharing information) between a Python script and another Python script (or a web page).

    I want to have a "server" script which runs in a continuous loop and acts as a "database". I'm not sure whether I should call that a server or a database.

    In the manner in which I am using the "client" script, it is executed several times per second. During every single execution of the client script, it should read from and write to the server/database several times (different parameters, different values each time).

    I first tried the ordinary Python socket module. This was problematic because:
    #1. The socket module seems to want a fixed data length, and I cannot comply with that. The values read & written, and the number of values read & written are different each time.
    #2. It does not want to run in a loop; one & done. Attempts to put it into a _while_ loop cause other things to lock up.
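    A note on #1: TCP hands the application a byte stream, so it is up to the program to mark where each message ends. One common approach is to prefix every message with its length. What follows is only a minimal sketch, assuming a 4-byte big-endian length header; the helper names send_msg, recv_msg, and recv_exactly are made up for illustration and are not part of the socket module.

```python
import socket
import struct

# Assumed convention: each message is preceded by a 4-byte unsigned length
# in network byte order.
HEADER = struct.Struct("!I")


def recv_exactly(sock, count):
    """Read exactly `count` bytes, looping because recv may return fewer."""
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("socket closed before the full message arrived")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def send_msg(sock, payload):
    """Send one variable-length message (payload must be bytes)."""
    sock.sendall(HEADER.pack(len(payload)) + payload)


def recv_msg(sock):
    """Receive one message that was sent with send_msg."""
    (length,) = HEADER.unpack(recv_exactly(sock, HEADER.size))
    return recv_exactly(sock, length)
```

    With this, both ends can exchange any number of values of any length on one connection, as long as each side calls recv_msg once per send_msg.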

    So I tried Python websockets, and it is the most promising so far because both of the above issues are null. It runs in a forever loop, listening and replying with any given data length. But it introduces a new problem: any script that makes a call to the "server/database" script never finishes. Because of asyncio, it makes any script that calls it into another forever loop. I cannot have this. My cyclic "client" script must run from top to bottom and end, just like a regular Python script.

    Each attempt I make to read and understand how the asyncio module works makes me feel more and more like an imbecile. Can someone please explain it on the simplest level possible, and hopefully provide some tips on how to call a script running asyncio without making the calling script also asynchronous?
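    For reference, here is one way a plain top-to-bottom script can call asyncio code and still end: wrap the coroutine in an ordinary function that runs the event loop only until that one coroutine finishes. This is a hedged sketch, not the websockets library's API. It assumes Python 3.7+ for asyncio.run (older versions would use loop.run_until_complete), and the names handle_client, ask, demo, and ask_blocking are invented for the example. The toy server is started inside the same process only to keep the example self-contained; in practice it would be a separate, long-running script.

```python
import asyncio


async def handle_client(reader, writer):
    # Toy server side: answer one line with its upper-cased form.
    line = await reader.readline()
    writer.write(line.upper())
    await writer.drain()
    writer.close()


async def ask(host, port, message):
    # Client side: send one line, wait for one line back, then disconnect.
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(message.encode("utf-8") + b"\n")
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    return reply.decode("utf-8").strip()


async def demo(message):
    # Port 0 asks the OS for any free port; the real port is read back below.
    server = await asyncio.start_server(handle_client, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        return await ask("127.0.0.1", port, message)


def ask_blocking(message):
    # Ordinary function: the event loop runs only until demo() completes,
    # then control returns to the caller like any other function call.
    return asyncio.run(demo(message))
```

    Calling ask_blocking("flow=12") from a regular script returns "FLOW=12" and then the script carries on; the caller itself never becomes asynchronous.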
     
  2. tjohnson

    Active Member

    Dec 23, 2014
    614
    121
    I'm not very knowledgeable about the other modules you mentioned, but I have used the Python socket module in the past and I'm fairly positive that there are ways to work around both of those problems. If you'd be interested, I can take a look at the code I wrote that used it to see exactly how they can be avoided.
     
    Last edited: Mar 30, 2015
  3. tjohnson

    Active Member

    Dec 23, 2014
    614
    121
    I think I remember what I did to avoid the problems you mentioned:
    If you have control over generating the values that are read and written, you can end them with a unique character sequence (or anything distinctive like a NUL byte). Then, you can just use a while loop to read a fixed number of bytes from the socket at a time (for example, 1024), and stop once you reach the termination character(s).
    You can put the socket in a thread class, and insert a time.sleep call for a fraction of a second in your while loop.
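    The terminator idea can be sketched roughly like this. It is a minimal example, assuming a NUL byte as the terminator and a 1024-byte read size; recv_until is a made-up helper name, and this simple version discards anything that arrives after the terminator.

```python
import socket


def recv_until(sock, terminator=b"\0", chunk_size=1024):
    """Keep reading fixed-size chunks until the terminator shows up."""
    buffer = bytearray()
    while terminator not in buffer:
        chunk = sock.recv(chunk_size)
        if not chunk:  # the other side closed the connection
            break
        buffer.extend(chunk)
    # Everything before the first terminator is the message.
    message, _sep, _rest = bytes(buffer).partition(terminator)
    return message
```

    The sender just appends the terminator to whatever it sends, for example sock.sendall(b"D321:1.5 D323:2.25" + b"\0").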

    There are a few other caveats that have to be handled, but I can't remember exactly how to deal with them off the top of my head. As I said in my previous post, I can gladly dig up the code I wrote a few years ago and post it here if you'd like. But if you'd rather use asyncio, that's fine, although I don't understand it either.

    Note: I learned a lot about how to use the Python socket module by studying how other programs used it, particularly a text editor written in Python (no longer developed) called Editra.
     
    Last edited: Mar 30, 2015
  4. GetDeviceInfo

    Senior Member

    Jun 7, 2009
    1,571
    230
    Call it a service. Maybe consider using your system resources such as DCOM to pass your values. As mentioned, OPC servers/clients are common with PLCs, dropping in as simple objects into your code.
     
    Last edited: Mar 30, 2015
  5. strantor

    Thread Starter AAC Fanatic!

    Oct 3, 2010
    4,302
    1,988
    Sounds solid, I'll give that a shot.

    Thanks, I would appreciate it.
    Thank you for the tips!
     
    Last edited: Mar 31, 2015
  6. strantor

    Thread Starter AAC Fanatic!

    Oct 3, 2010
    4,302
    1,988
    I didn't see DCOM on the official Python list of IPC methods, but apparently there is a module out there for it. I will check it out. Thanks!
    Every option I've seen for out-of-the-box OPC embedded solutions is designed for Visual Studio. It seems that if you want Python, it's all DIY, which is what I've done. I've got the PLC comms already working fine, using plain ol' sockets: fixed data length always, called cyclically, and the socket module likes it. The problem I'm facing now is what to do with the data once it gets to my Python program. For example, my simulator is running at 24 FPS and I need to average a value over 2 seconds. That's 48 values. My script runs 48 times, and I have no problem fetching that instantaneous value 48 times, but I need to store it in a FIFO for averaging. So right now my cyclic (24 executions/sec) program looks like this in Python (for example):

    1. Fetch memory areas D200-D500 from PLC over TCP
    2. Convert D321 (function #1 hydraulic valve command) to Float, calculate function #1 hydraulic flow based on other system parameters
    3. Convert D323 (function #2 hydraulic valve command) to Float, calculate function #2 hydraulic flow based on other system parameters
    4. Add flow_1 + flow_2, display on screen "total flow" (wild fluctuations)
    5. Send total flow to PLC

    I would like to add an averaging function like this:
    1. Fetch memory areas D200-D500 from PLC over TCP
    2. Convert D321 (function #1 hydraulic valve command) to Float, calculate function #1 hydraulic flow based on other system parameters
    2.A Write to FIFO in local database
    2.B Sum current + 47 previous values and divide by 48 for 2-sec average
    2.C Result is Flow_1
    3. Convert D323 (function #2 hydraulic valve command) to Float, calculate function #2 hydraulic flow based on other system parameters
    3.A Write to FIFO in local database
    3.B Sum current + 47 previous values and divide by 48 for 2-sec average
    3.C Result is Flow_2
    4. Add flow_1 + flow_2, display on screen "total flow" (smoothed value)
    5. Send total flow to PLC
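    Steps 2.A to 2.C boil down to a fixed-size FIFO. As a rough sketch of just the averaging part: the class below assumes it lives in a process that stays alive between frames (which my cyclic script does not, hence the need for a database), and the name MovingAverage and the default window of 48 (24 FPS x 2 s) are only illustrative.

```python
from collections import deque


class MovingAverage:
    """Average of the most recent `window` samples."""

    def __init__(self, window=48):
        # A deque with maxlen drops the oldest sample automatically.
        self.samples = deque(maxlen=window)

    def add(self, value):
        """Store the newest sample and return the current average."""
        self.samples.append(value)
        return sum(self.samples) / len(self.samples)
```

    Until the window has filled up, this averages over however many samples exist so far rather than padding with the first value.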
     
  7. tjohnson

    Active Member

    Dec 23, 2014
    614
    121
    I wrote this code for an IPC server that transmitted command line arguments between program instances, but I would think it can be fairly easily adapted to your project. (I removed the data processing parts from it, since they were specific to what I was doing.)
    Code (Text):
    import socket
    import time

    try:
        import threading
    except ImportError:
        import dummy_threading as threading


    class IPCServer(threading.Thread):
        port = 50000  # 49152

        def __init__(self):
            threading.Thread.__init__(self)

            try:
                self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                while True:
                    try:
                        self.socket.bind(("localhost", IPCServer.port))
                    except socket.error:
                        IPCServer.port += 1  # Port is taken, try the next one
                    else:
                        break
                self.socket.listen(5)
            except socket.error:
                self.socket = None

            self.running = True

            # Force thread to quit if program is aborted
            # (setDaemon is deprecated since Python 3.10; self.daemon = True does the same)
            self.setDaemon(True)

        def run(self):
            if self.socket is None:  # Socket setup failed in __init__
                return
            while self.running:
                client, address = self.socket.accept()
                if not self.running:
                    break
                args = [client.recv(4096)]
                start = time.time()
                # Keep reading while full chunks arrive, for at most 2 seconds
                while len(args[-1]) == 4096 and time.time() < start + 2:
                    args.append(client.recv(4096))
                client.close()
                # Process data received from client here
            try:
                self.socket.shutdown(socket.SHUT_RDWR)
            except socket.error:
                pass
            self.socket.close()

        def Quit(self):
            self.running = False
            transmit([])  # Blank message so that accept() returns promptly


    def transmit(args):
        try:
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client.connect(("localhost", IPCServer.port))
            # Send data to server here (Python 3 sockets send bytes, not str)
            client.sendall("\0".join(args).encode("utf-8"))
            client.shutdown(socket.SHUT_RDWR)
            client.close()
        except socket.error:
            return False
        return True
    The caveats that I mentioned are:
    • The "if not self.running: break" is needed immediately after the call to socket.accept in the while loop within run(), so that when the program is quitting the socket thread can stop as quickly as possible.
    • For the same reason, it's necessary to send a blank message to the server when the program is quitting so that the socket.accept function in the while loop will return promptly.
    One other thing to note: I don't remember for sure anymore whether the call to thread.setDaemon is really necessary or was just something that I added as an experiment. If I remember correctly, I think I may have inserted it to prevent the thread from staying alive if the main program thread aborted due to a fatal error. I don't see how it could hurt anything, so I would recommend trying it.
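    An alternative to the blank wake-up message, sketched under the assumption that a shutdown delay of a fraction of a second is acceptable: give the listening socket a timeout so that accept returns periodically and the loop can re-check a stop flag. This is not the code I actually used; StoppableServer, poll_interval, and the messages list are names invented for the example.

```python
import socket
import threading


class StoppableServer(threading.Thread):
    def __init__(self, host="127.0.0.1", port=0, poll_interval=0.2):
        super().__init__(daemon=True)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.bind((host, port))  # port 0 lets the OS pick a free port
        self.sock.listen(5)
        self.sock.settimeout(poll_interval)  # accept() gives up after this long
        self.port = self.sock.getsockname()[1]
        self.stop_event = threading.Event()
        self.messages = []  # raw data received, kept only for demonstration

    def run(self):
        while not self.stop_event.is_set():
            try:
                client, _address = self.sock.accept()
            except socket.timeout:
                continue  # nobody connected; loop around and re-check the flag
            with client:
                client.settimeout(2.0)
                try:
                    self.messages.append(client.recv(4096))
                except socket.timeout:
                    pass  # client connected but sent nothing in time
        self.sock.close()

    def stop(self):
        self.stop_event.set()
```

    Calling stop() makes the thread exit within roughly one poll_interval, without needing a dummy connection.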

    Disclaimer: I haven't worked with this code for over a year, so my memory is probably a little rusty. You may find that some things work a little differently for you, especially since you're using sockets for a different application than I was.
     
    Last edited: Mar 31, 2015
  8. GetDeviceInfo

    Senior Member

    Jun 7, 2009
    1,571
    230
    I've been trying this out and it has been performing well so far: http://openopc.sourceforge.net/. I can't recommend anything specific, but a quick Google search came up with a few packages that will average a running buffer. Would you not implement a PID or some other 'closed loop' logic? Might this not be a better function for the PLC to handle?
     
  9. strantor

    Thread Starter AAC Fanatic!

    Oct 3, 2010
    4,302
    1,988
    Sorry if I'm repeating myself; I can't keep track of what I've said on what forums. I'm making a 3D video game simulator/trainer of a hydraulically operated subsea tool. I'm using a 3D animation suite called Blender, along with a real-life PLC & HMI. My goal here is to leave the PLC program untouched; the PLC should not know the difference between being interfaced with the real world and being interfaced with my simulator. The reason is that this tool is a prototype and the PLC programming is subject to change. Bringing simulator programming into the PLC world will complicate any future changes. Also, over time I hope to develop the simulator into something more than just an operator trainer: something physically accurate enough to test proposed changes to machine design before investing and taking it offshore, though I'm not sure Blender will get me there.

    I'm using TCP sockets to read/write directly into the PLC's I/O areas as if it were physical I/O - that part works fine as mentioned. But as I try to refine the physics effects of hydraulic flow through the various proportional valves and motors and such, it becomes beneficial to have some averaging among other ΔT functions - functions I want to keep in the PC if at all possible.

    I'm not very up-to-speed on OPC. In my vague understanding, it's designed to play nice in large networks & DCS. I understand its usefulness to be limited to periodic polling for data acquisition and sending test messages to plant managers when a line has been down for over an hour. I don't see it being suited for what I need, which is rapid fire comms between two machines, 24 times/second. But if I'm off base here, let me know; I suspect you could probably teach a class on OPC.
     
  10. strantor

    Thread Starter AAC Fanatic!

    Oct 3, 2010
    4,302
    1,988

    Thank you for sharing that with me. I will give it a shot but I do not think it will work, because of threading. As I mentioned in my reply above to GetDeviceInfo, I am running Python inside of Blender, and Blender runs Python in its own hacked interpreter which wasn't designed to support threading. Only recently (2 weeks ago) did anybody solve this problem, and they did it by using the asyncio module, which is why I was barking up that tree. See this page:
    So you're probably going to ask, "so why don't you just use that Tulip thingy?" Well, as far as I can tell it only works in the regular rendering mode, not the game engine mode. I could be wrong, but I couldn't figure out how to do it, and since it's so new, there aren't many (any) tutorials out there for it.
     
  11. strantor

    Thread Starter AAC Fanatic!

    Oct 3, 2010
    4,302
    1,988
    I think I got this problem solved, without using asyncio. Check out below:

    My cyclic script (called by Blender each frame, every 42 ms):
    Code (Text):
    __author__ = 'chuck'
    import IPC_Database
    from random import randint
    import time

    # Simulate cyclic calling (every 42 ms (24 FPS)) of the script by Blender (50 animation frames):
    for i in range(0, 50):
        t0 = int(round(time.time() * 1000))  # start timer to evaluate comms time

        # Initialize database client (database server should already be running, started manually outside of Blender):
        myIPC = IPC_Database.IPC_DB_Access()
        myIPC.open_DB()
        pv = myIPC.putval
        gv = myIPC.getval
        db = myIPC.iodict
        avg = myIPC.average

        # Simulate sensor input with random number:
        sensor1_input = randint(32000, 32768)
        # Utilize the averaging function in the IPC client:
        sens1AVG = avg("F1_flo", sensor1_input, 100)  # average flow, hydraulic function #1
        print("current sensor 1 input: " + str(sensor1_input) + ", Current average: " + str(round(sens1AVG)))

        sensor2_input = randint(32000, 32768)
        sens2AVG = avg("F2_flo", sensor2_input, 100)  # average flow, hydraulic function #2
        print("current sensor 2 input: " + str(sensor2_input) + ", Current average: " + str(round(sens2AVG)))

        myIPC.write_DB()
        # write_DB automatically closes the connection when finished. Un-comment myIPC.close_DB() if reading only
        #myIPC.close_DB()

        t1 = int(round(time.time() * 1000))
        tel = str(t1 - t0)
        print("interval #: " + str(i) + ", ... time elapsed: " + tel + "mS")

        time.sleep(.03)  # 30 ms sleep to simulate framerate
    returns this:
    Code (Text):
    D:\Python34\python.exe C:/Users/chuck/PycharmProjects/socketplay/socketplay2/IPC_subclient.py

    Connected to IPC database server
    current sensor 1 input: 32376, Current average: 32376
    current sensor 2 input: 32493, Current average: 32493
    disconnected
    interval #: 0, ... time elapsed: 9mS

    Connected to IPC database server
    current sensor 1 input: 32095, Current average: 32373
    current sensor 2 input: 32097, Current average: 32489
    disconnected
    interval #: 1, ... time elapsed: 2mS

    Connected to IPC database server
    current sensor 1 input: 32534, Current average: 32375
    current sensor 2 input: 32333, Current average: 32487
    disconnected
    interval #: 2, ... time elapsed: 2mS

    [...]

    Connected to IPC database server
    current sensor 1 input: 32412, Current average: 32373
    current sensor 2 input: 32285, Current average: 32439
    disconnected
    interval #: 48, ... time elapsed: 3mS

    Connected to IPC database server
    current sensor 1 input: 32187, Current average: 32371
    current sensor 2 input: 32505, Current average: 32439
    disconnected
    interval #: 49, ... time elapsed: 3mS

    Process finished with exit code 0
    The cyclic script calls a "run once" client script that connects to a "run forever" server script.
    client script:
    Code (Text):
    __author__ = 'chuck'
    import socket

    class IPC_DB_Access:

        def __init__(self, HOST="localhost", PORT=48000):
            self.host = HOST
            self.port = PORT
            self.iodict = {}

        def connect(self):
            # Create a socket (SOCK_STREAM means a TCP socket)
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # Connect to server to send/receive data; raises OSError if the server is not running
            self.sock.connect((self.host, self.port))
            print("\nConnected to IPC database server")

        def open_DB(self):
            self.connect()
            # Receive data from the server (assumes everything arrives in a single recv call)
            received = str(self.sock.recv(16384), "utf-8")
            rec_list = received.split()
            for item in rec_list:
                if ":" not in item:
                    continue  # e.g. the "init" marker the server sends while it has no data yet
                name, value = item.split(":", 1)
                self.iodict[name] = value
            return received

        def getval(self, key):
            if key in self.iodict:
                op = self.iodict[key]
            else:
                print("Value requested (" + key + ") was not returned by server")
                op = 0
            return op

        def close_DB(self):
            try:
                self.sock.close()
            finally:
                print("disconnected")

        def write_DB(self):
            try:
                op = ""
                for key in self.iodict.keys():
                    val = self.iodict[key]
                    op += str(key) + ":" + str(val)
                    op += "\n"
                sendstr = bytes(op, "utf-8")
                self.sock.sendall(sendstr)
            finally:
                self.close_DB()

        def putval(self, key, value):
            self.iodict[key] = value

        def average(self, name, current_val, avgpoints):
            firstentry = "$AVG$" + name + "0000"
            avgsum = 0
            # First cycle of averaging, create data slots for FIFO:
            if firstentry not in self.iodict:
                for i in range(0, avgpoints + 1):
                    slotnumber = "0000" + str(i)
                    slotnumber = slotnumber[-4:]
                    slotnumber = "$AVG$" + name + slotnumber
                    self.putval(slotnumber, current_val)
                avg = current_val
            else:
                # Shift every slot up by one, oldest first, summing as we go:
                for i in range(avgpoints - 1, -1, -1):
                    this_slotnumber, next_slotnumber = "0000" + str(i), "0000" + str(i + 1)
                    this_slotnumber, next_slotnumber = this_slotnumber[-4:], next_slotnumber[-4:]
                    this_slotnumber, next_slotnumber = "$AVG$" + name + this_slotnumber, "$AVG$" + name + next_slotnumber
                    #print("moving " + this_slotnumber + " to " + next_slotnumber)
                    this_value = self.getval(this_slotnumber)
                    self.putval(next_slotnumber, this_value)
                    avgsum += int(this_value)  # values come back from the server as strings
                self.putval(firstentry, current_val)
                avgsum += current_val
                avg = avgsum / (avgpoints + 1)
            return avg
    and here's the server script:
    Code (Text):
    __author__ = 'chuck'
    import socketserver

    serv_dict = {}  # tag name -> value, shared by all connections
    print("init")

    class MyTCPHandler(socketserver.BaseRequestHandler):

        #### Server routine, called by serve_forever() for every connection ####
        def handle(self):
            # New connection established:
            print("connection received on port {}... ".format(self.client_address[1]) +
                  " from IP: {}".format(self.client_address[0]))

            # Upon connection from client, first send stored tags & values to client, then listen
            if not serv_dict:
                self.request.sendall(b"init")
            else:
                op = ""
                for key in serv_dict.keys():
                    val = serv_dict[key]
                    op += str(key) + ":" + str(val)
                    op += '\n'
                sendstr = bytes(op, "utf-8")
                self.request.sendall(sendstr)
                #print(b"sending: ... " + sendstr)

            # self.request is the TCP socket connected to the client
            self.data = self.request.recv(16384).strip()
            recvd_whole = self.data.decode()
            recvd_lines = recvd_whole.split()
            for line in recvd_lines:
                if ":" in line:
                    key, val = line.split(":", 1)
                    serv_dict[key] = val
                else:
                    print("bad formatting: ( " + str(line) + " ) ... should contain a colon")
            for item in sorted(serv_dict.items()):
                print(item)

    if __name__ == "__main__":
        HOST, PORT = "localhost", 48000
        # Create the server, binding to localhost on port 48000
        server = socketserver.TCPServer((HOST, PORT), MyTCPHandler)

        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl-C
        server.serve_forever()
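    Both scripts speak the same simple wire format: one name:value pair per line. As a sketch, that format could be pulled out into two helpers shared by client and server. The function names encode_tags and decode_tags are hypothetical, and the sketch assumes that names and values never contain whitespace and that names never contain a colon.

```python
def encode_tags(tags):
    """Turn a dict into b"name:value\\n" lines ready for sendall()."""
    text = "".join("{}:{}\n".format(key, value) for key, value in tags.items())
    return text.encode("utf-8")


def decode_tags(data):
    """Parse received bytes back into a dict; lines without a colon are skipped."""
    tags = {}
    for line in data.decode("utf-8").split():
        key, sep, value = line.partition(":")
        if sep:  # partition returns an empty separator when no colon was found
            tags[key] = value
    return tags
```

    Note that every value comes back as a string, so numeric values need an int() or float() after decoding, which is why the averaging function converts with int().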
     