Please explain Python asyncio

Thread Starter

strantor

Joined Oct 3, 2010
6,782
My goal is to have some IPC (two or more programs sharing information) between a Python script and another Python script (or a web page).

I want to have a "server" script which runs in a continuous loop and acts as a "database". I'm not sure whether I should call that a server or a database.

In the manner in which I am using the "client" script, it is executed several times per second. During every single execution of the client script, it should read from and write to the server/database several times (different parameters, different values each time).

I first tried the ordinary Python socket module. This was problematic because:
#1. The socket module seems to want a fixed data length, and I cannot comply with that. The values read & written, and the number of values read & written are different each time.
#2. It does not want to run in a loop; one & done. Attempts to put it into a _while_ loop cause other things to lock up.

So I tried Python websockets, and it is the most promising so far because both of the above issues are null. It runs in a forever loop, listening and replying with any given data length. But it introduces a new problem: any script that makes a call to the "server/database" script never finishes. Because of asyncio, it makes any script that calls it into another forever loop. I cannot have this. My cyclic "client" script must run from top to bottom and end, just like a regular Python script.

Each attempt I make to read and understand how the asyncio module works makes me feel more and more like an imbecile. Can someone please explain it on the simplest level possible, and hopefully provide some tips on how to call a script running asyncio without making the calling script also asynchronous?
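For reference, here is a minimal sketch of one way to keep the calling script synchronous: put the async work in one coroutine and run it to completion with asyncio.run(), which returns as soon as that coroutine finishes. This assumes Python 3.7 or newer (asyncio.run does not exist in 3.4). The names fetch_value and read_once are hypothetical, and asyncio.sleep only stands in for a real network round trip.

```python
import asyncio

async def fetch_value(key):
    # Hypothetical stand-in for an awaitable network request
    # (for example one websocket send/receive round trip).
    await asyncio.sleep(0.01)
    return {"key": key, "value": 42}

def read_once(key):
    # asyncio.run() creates an event loop, runs the coroutine until it
    # completes, closes the loop and returns the result, so the caller
    # stays an ordinary top-to-bottom function.
    return asyncio.run(fetch_value(key))

if __name__ == "__main__":
    print(read_once("D321"))
```

Only the code inside fetch_value is asynchronous; read_once can be called from a normal script that then runs to its end.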
 

tjohnson

Joined Dec 23, 2014
611
I first tried the ordinary Python socket module. This was problematic because:
#1. The socket module seems to want a fixed data length, and I cannot comply with that. The values read & written, and the number of values read & written are different each time.
#2. It does not want to run in a loop; one & done. Attempts to put it into a _while_ loop cause other things to lock up.
I'm not very knowledgeable about the other modules you mentioned, but I have used the Python socket module in the past and I'm fairly positive that there are ways to work around both of those problems. If you'd be interested, I can take a look at the code I wrote that used it to see exactly how they can be avoided.
 

tjohnson

Joined Dec 23, 2014
611
I think I remember what I did to avoid the problems you mentioned:
#1. The socket module seems to want a fixed data length, and I cannot comply with that. The values read & written, and the number of values read & written are different each time.
If you have control over generating the values that are read and written, you can end them with a unique character sequence (or anything distinctive like a NUL byte). Then, you can just use a while loop to read a fixed number of bytes from the socket at a time (for example, 1024), and stop once you reach the termination character(s).
#2. It does not want to run in a loop; one & done. Attempts to put it into a _while_ loop cause other things to lock up.
You can put the socket in a thread class, and insert a time.sleep call for a fraction of a second in your while loop.

There are a few other caveats that have to be handled, but I can't remember exactly how to deal with them off the top of my head. As I said in my previous post, I can gladly dig up the code I wrote a few years ago and post it here if you'd like. But if you'd rather use asyncio, that's fine, although I don't understand it either.

Note: I learned a lot about how to use the Python socket module by studying how other programs used it, particularly a text editor written in Python (no longer developed) called Editra.
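The terminator idea from point #1 above could look roughly like the sketch below. It assumes a single NUL byte as the terminator and one message at a time per connection (anything that arrives after the terminator in the same chunk is dropped). The names recv_message and send_message are made up for illustration, and socket.socketpair() only stands in for a real client/server connection.

```python
import socket

TERMINATOR = b"\0"  # assumed delimiter; it must never appear inside a message

def recv_message(sock, bufsize=1024):
    """Read fixed-size chunks until the terminator arrives; return the bytes before it."""
    chunks = []
    while True:
        chunk = sock.recv(bufsize)
        if not chunk:
            # The peer closed the connection before sending the terminator.
            raise ConnectionError("connection closed mid-message")
        if TERMINATOR in chunk:
            chunks.append(chunk.split(TERMINATOR, 1)[0])
            return b"".join(chunks)
        chunks.append(chunk)

def send_message(sock, payload):
    # sendall() keeps sending until every byte has been handed to the OS.
    sock.sendall(payload + TERMINATOR)

if __name__ == "__main__":
    a, b = socket.socketpair()  # connected pair, stands in for client and server
    send_message(a, b"D321:0.75 D323:0.10")
    print(recv_message(b))
    a.close()
    b.close()
```

Because the receiver loops until it sees the terminator, the message length and the number of values can change on every call.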
 

GetDeviceInfo

Joined Jun 7, 2009
2,192
I want to have a "server" script which runs in a continuous loop and acts as a "database". I'm not sure whether I should call that a server or a database.
Call it a service. Maybe consider using your system resources such as DCOM to pass your values. As mentioned, OPC servers/clients are common with PLCs, dropping in as simple objects into your code.
 

Thread Starter

strantor

Joined Oct 3, 2010
6,782
I think I remember what I did to avoid the problems you mentioned:

If you have control over generating the values that are read and written, you can end them with a unique character sequence (or anything distinctive like a NUL byte). Then, you can just use a while loop to read a fixed number of bytes from the socket at a time (for example, 1024), and stop once you reach the termination character(s).

You can put the socket in a thread class, and insert a time.sleep call for a fraction of a second in your while loop.
Sounds solid, I'll give that a shot.

There are a few other caveats that have to be handled, but I can't remember exactly how to deal with them off the top of my head. As I said in my previous post, I can gladly dig up the code I wrote a few years ago and post it here if you'd like. But if you'd rather use asyncio, that's fine, although I don't understand it either.
Thanks, I would appreciate it.
Thank you for the tips!
 

Thread Starter

strantor

Joined Oct 3, 2010
6,782
Call it a service. Maybe consider using your system resources such as DCOM to pass your values.
I didn't see DCOM on the official Python list of IPC methods, but apparently there is a module out there for it. I will check it out. Thanks!
As mentioned, OPC servers/clients are common with PLCs, dropping in as simple objects into your code.
Every option I've seen for out-of-the-box embedded OPC solutions is designed for Visual Studio. It seems that if you want Python, it's all DIY, which is what I've done. I've got the PLC comms already working fine using plain ol' sockets: the data length is always fixed and the call is cyclic, so the socket module is happy. The problem I'm facing now is what to do with the data once it gets to my Python program. For example, my simulator runs at 24 FPS and I need to average a value over 2 seconds. That's 48 values. My script runs 48 times in those 2 seconds, and I have no problem fetching the instantaneous value each time, but I need to store the values in a FIFO for averaging. Right now my cyclic program (24 executions/sec) looks like this in Python, for example:

1. Fetch memory areas D200-D500 from PLC over TCP
2. Convert D321 (function #1 hydraulic valve command) to Float, calculate function #1 hydraulic flow based on other system parameters
3. Convert D323 (function #2 hydraulic valve command) to Float, calculate function #2 hydraulic flow based on other system parameters
4. Add flow_1 + flow_2, display on screen "total flow" (wild fluctuations)
5. Send total flow to PLC

I would like to add an averaging function like this:
1. Fetch memory areas D200-D500 from PLC over TCP
2. Convert D321 (function #1 hydraulic valve command) to Float, calculate function #1 hydraulic flow based on other system parameters
2.A Write to FIFO in local database
2.B Sum current + 47 previous values and divide by 48 for the 2-sec average
2.C Result is Flow_1
3. Convert D323 (function #2 hydraulic valve command) to Float, calculate function #2 hydraulic flow based on other system parameters
3.A Write to FIFO in local database
3.B Sum current + 47 previous values and divide by 48 for the 2-sec average
3.C Result is Flow_2
4. Add flow_1 + flow_2, display on screen "total flow" (smoothed value)
5. Send total flow to PLC
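Steps 2.A to 2.C (and 3.A to 3.C) amount to a fixed-length FIFO plus a mean. A minimal sketch using collections.deque from the standard library follows. The class name MovingAverage is hypothetical, and the sketch assumes the object lives in a process that keeps running between frames; if the script starts fresh on every frame, the window would have to be stored somewhere external instead.

```python
from collections import deque

class MovingAverage:
    """Fixed-length FIFO that returns the mean of the samples it currently holds."""

    def __init__(self, size=48):
        # With maxlen set, appending to a full deque drops the oldest sample.
        self.window = deque(maxlen=size)

    def update(self, value):
        self.window.append(value)
        # Until the window fills up, average over the samples seen so far.
        return sum(self.window) / len(self.window)

if __name__ == "__main__":
    flow_1 = MovingAverage(size=48)  # 48 samples = 2 seconds at 24 FPS
    for sample in (10.0, 20.0, 30.0):
        smoothed = flow_1.update(sample)
    print(smoothed)
```

One instance per signal (flow_1, flow_2) keeps the two FIFOs independent.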
 

tjohnson

Joined Dec 23, 2014
611
Sounds solid, I'll give that a shot.


Thanks, I would appreciate it.
Thank you for the tips!
I wrote this code for an IPC server that transmitted command line arguments between program instances, but I would think it can be fairly easily adapted to your project. (I removed the data processing parts from it, since they were specific to what I was doing.)
Code:
import socket
import time

try:
    import threading
except ImportError:
    import dummy_threading as threading

class IPCServer(threading.Thread):
    port = 50000##49152
    def __init__(self):
        threading.Thread.__init__(self)

        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            while True:
                try:
                    self.socket.bind(("localhost", IPCServer.port))
                except socket.error:  # port already in use, try the next one
                    IPCServer.port += 1
                else:
                    break
            self.socket.listen(5)
        except socket.error:
            self.socket = None

        self.running = True

        self.setDaemon(True)  # Force thread to quit if program is aborted

    def run(self):
        while self.running:
            client, address = self.socket.accept()
            if not self.running:
                break
            args = [client.recv(4096)]
            start = time.time()
            while len(args[-1]) == 4096 and time.time() < start + 2:
                args.append(client.recv(4096))
            # Process data received from client here
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass
        self.socket.close()

    def Quit(self):
        self.running = False
        transmit([])

def transmit(args):
    try:
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect(("localhost", IPCServer.port))
        client.sendall("\0".join(args).encode("utf-8"))  # Send data to server here (Python 3 sockets need bytes)
        client.shutdown(socket.SHUT_RDWR)
        client.close()
    except socket.error:
        return False
    return True
The caveats that I mentioned are:
  • The "if not self.running: break" is needed immediately after the call to socket.accept in the while loop within run(), so that when the program is quitting the socket thread can stop as quickly as possible.
  • For the same reason, it's necessary to send a blank message to the server when the program is quitting so that the socket.accept function in the while loop will return promptly.
One other thing to note: I don't remember for sure anymore whether the call to thread.setDaemon is really necessary or was just something that I added as an experiment. If I remember correctly, I think I may have inserted it to prevent the thread from staying alive if the main program thread aborted due to a fatal error. I don't see how it could hurt anything, so I would recommend trying it.

Disclaimer: I haven't worked with this code for over a year, so my memory is probably a little rusty. You may find that some things work a little differently for you, especially since you're using sockets for a different application than I was.
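As an alternative to sending a blank message to unblock socket.accept, one could put a timeout on the listening socket so that the loop wakes up on its own and re-checks a stop flag. The sketch below is only an illustration of that idea, not the code I actually used. StoppableServer is a made-up name, and it binds to port 0 so that the OS picks a free port.

```python
import socket
import threading

class StoppableServer(threading.Thread):
    def __init__(self, host="127.0.0.1", port=0):
        super().__init__(daemon=True)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.bind((host, port))   # port 0 lets the OS choose a free port
        self.sock.listen(5)
        self.sock.settimeout(0.2)      # accept() gives up after 0.2 s
        self.port = self.sock.getsockname()[1]
        self.stop_event = threading.Event()

    def run(self):
        while not self.stop_event.is_set():
            try:
                client, _address = self.sock.accept()
            except socket.timeout:
                continue               # nobody connected; re-check the stop flag
            with client:
                client.recv(4096)      # process data received from the client here
        self.sock.close()

    def quit(self):
        self.stop_event.set()

if __name__ == "__main__":
    server = StoppableServer()
    server.start()
    server.quit()
    server.join(timeout=2)
    print("stopped:", not server.is_alive())
```

The trade-off is that shutdown can take up to one timeout period, whereas the blank-message trick returns immediately.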
 

GetDeviceInfo

Joined Jun 7, 2009
2,192
I've been trying this out and it has been performing well so far: http://openopc.sourceforge.net/. I can't recommend anything, but a quick Google search came up with a few packages that will average a running buffer. Would you not implement a PID or some other 'closed loop' logic? Might this not be a better function for the PLC to handle?
 

Thread Starter

strantor

Joined Oct 3, 2010
6,782
I've been trying this out and it has been performing well so far: http://openopc.sourceforge.net/. I can't recommend anything, but a quick Google search came up with a few packages that will average a running buffer. Would you not implement a PID or some other 'closed loop' logic? Might this not be a better function for the PLC to handle?
Sorry if I'm repeating myself; I can't keep track of what I've said on what forums. I'm making a 3D video game simulator/trainer of a hydraulically operated subsea tool. I'm using a 3D animation suite called Blender, along with a real-life PLC & HMI. My goal here is to leave the PLC program untouched; the PLC should not know the difference between being interfaced with the real world and being interfaced with my simulator. The reason is that this tool is a prototype and the PLC programming is subject to change. Bringing simulator programming into the PLC world will complicate any future changes. Also, over time I hope to develop the simulator into something more than just an operator trainer: something physically accurate enough to test proposed changes to machine design before investing and taking it offshore. Though I'm not sure Blender will get me there.

I'm using TCP sockets to read/write directly into the PLC's I/O areas as if it were physical I/O - that part works fine as mentioned. But as I try to refine the physics effects of hydraulic flow through the various proportional valves and motors and such, it becomes beneficial to have some averaging among other ΔT functions - functions I want to keep in the PC if at all possible.

I'm not very up-to-speed on OPC. In my vague understanding, it's designed to play nice in large networks & DCS. I understand its usefulness to be limited to periodic polling for data acquisition and sending test messages to plant managers when a line has been down for over an hour. I don't see it being suited for what I need, which is rapid fire comms between two machines, 24 times/second. But if I'm off base here, let me know; I suspect you could probably teach a class on OPC.
 

Thread Starter

strantor

Joined Oct 3, 2010
6,782
Thank you for sharing that with me. I will give it a shot but I do not think it will work, because of threading. As I mentioned in my reply above to GetDeviceInfo, I am running Python inside of Blender, and Blender runs Python in its own hacked interpreter which wasn't designed to support threading. Only recently (2 weeks ago) did anybody solve this problem, and they did it by using the asyncio module, which is why I was barking up that tree. See this page:
Up to now, network programming in blender has been a nightmare and virtually absent. Python threading did not work reliably inside blender, and so any script running for extended periods of time would block the user interface. Any code running outside the main process would not have full or reliable access to the API.

The larger Python community solved this problem using asyncio. In fact, Guido van Rossum himself designed a standard event loop library for concurrency and asynchronous input/output. Codenamed Tulip, this library is already included in the standard library and current distributions of Blender
So you're probably going to ask - "so why don't you just use that Tulip thingy?" well, as far as I can tell it only works in the regular rendering mode, not the game engine mode. I could be wrong, but I couldn't figure out how to do it, and being so new, there aren't a lot (any) tutorials out there for it.
 

Thread Starter

strantor

Joined Oct 3, 2010
6,782
I think I got this problem solved, without using asyncio. Check out below:

My cyclic script (called by Blender each frame, every 42 ms):
Code:
__author__ = 'chuck'
import IPC_Database
from random import randint
import time

#Simulate cyclic calling (every 42mS (24FPS)) of the script by blender. (50 animation frames):
for i in range (0,50):
    t0 = int(round(time.time() * 1000)) #start timer to evaluate comms time

    #initialize database client (database server should already be running, started manually outside of Blender):
    myIPC = IPC_Database.IPC_DB_Access()
    myIPC.open_DB()
    pv = myIPC.putval
    gv = myIPC.getval
    db = myIPC.iodict
    avg = myIPC.average

    #Simulate sensor input with random number:
    sensor1_input = randint(32000,32768)
    #utilize the averaging function in the IPC client:
    sens1AVG = avg("F1_flo",sensor1_input,100) #average flow, hydraulic function #1
    print("current sensor 1 input: " + str(sensor1_input) + ", Current average: " + str(round(sens1AVG)))

    sensor2_input = randint(32000,32768)
    sens2AVG = avg("F2_flo",sensor2_input,100) #average flow, hydraulic function #2
    print("current sensor 2 input: " + str(sensor2_input) + ", Current average: " + str(round(sens2AVG)))

    myIPC.write_DB()
    # write_DB automatically closes the connection when finished. Un-comment myIPC.close_DB() if reading only
    #myIPC.close_DB()

    t1 = int(round(time.time() * 1000))
    tel = str(t1-t0)
    print("interval #: " + str(i) + ", ... time elapsed: " + tel + "mS")

    time.sleep(.03)#30mS sleep to simulate framerate
returns this:
Code:
D:\Python34\python.exe C:/Users/chuck/PycharmProjects/socketplay/socketplay2/IPC_subclient.py

Connected to IPC database server
current sensor 1 input: 32376, Current average: 32376
current sensor 2 input: 32493, Current average: 32493
disconnected
interval #: 0, ... time elapsed: 9mS

Connected to IPC database server
current sensor 1 input: 32095, Current average: 32373
current sensor 2 input: 32097, Current average: 32489
disconnected
interval #: 1, ... time elapsed: 2mS

Connected to IPC database server
current sensor 1 input: 32534, Current average: 32375
current sensor 2 input: 32333, Current average: 32487
disconnected
interval #: 2, ... time elapsed: 2mS

[...]

Connected to IPC database server
current sensor 1 input: 32412, Current average: 32373
current sensor 2 input: 32285, Current average: 32439
disconnected
interval #: 48, ... time elapsed: 3mS

Connected to IPC database server
current sensor 1 input: 32187, Current average: 32371
current sensor 2 input: 32505, Current average: 32439
disconnected
interval #: 49, ... time elapsed: 3mS

Process finished with exit code 0
The cyclic script calls a "run once" client script that connects to a "run forever" server script.
client script:
Code:
__author__ = 'chuck'
import socket

class IPC_DB_Access:

    def __init__(self, HOST="localhost", PORT=48000):
        self.host = HOST
        self.port = PORT
        self.iodict = {}

    def connect(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# Create a socket (SOCK_STREAM means a TCP socket)
        self.sock.connect((self.host, self.port))# Connect to server to send/receive data
        # Only reached if connect() succeeded; a failed connection raises instead
        print("\nConnected to IPC database server")

    def open_DB(self):
        self.connect()
        received = str(self.sock.recv(16384), "utf-8")# Receive data from the server
        rec_list = received.split()
        for item in rec_list:
            # Skip tokens without a colon, e.g. the bare "init" the server
            # sends while its dictionary is still empty
            if ":" in item:
                name, value = item.split(":", 1)
                self.iodict[name] = value
        return (received)

    def getval(self,key):
        if key in self.iodict.keys():
            op = self.iodict[key]
        else:
            print("Value requested (" + key + ") was not returned by server")
            op = 0
        return (op)


    def close_DB(self):
        try:
            self.sock.close()
        finally:
            print("disconnected")

    def write_DB(self):
        try:
            op=""
            for key in self.iodict.keys():
                val = self.iodict[key]
                op += str(key) + ":" + str(val)
                op += "\n"
            sendstr = bytes(op, "utf-8")
            self.sock.sendall(sendstr)
        finally:
            self.close_DB()

    def putval(self,key,value):
        self.iodict[key]=value

    def average(self, name,current_val, avgpoints):
        firstentry = "$AVG$"+name+"0000"
        avgsum = 0
        #first cycle of averaging, create data slots for FIFO:
        if firstentry not in self.iodict.keys():
            for i in range (0,avgpoints+1):
                slotnumber = "0000" + str(i)
                slotnumber = slotnumber[-4:]
                slotnumber = "$AVG$" + name + slotnumber
                self.putval(slotnumber,current_val)
                avg = current_val
        else:
            for i in range (avgpoints-1,-1,-1):
                this_slotnumber, next_slotnumber = "0000" + str(i), "0000" + str(i+1)
                this_slotnumber, next_slotnumber = this_slotnumber[-4:], next_slotnumber[-4:]
                this_slotnumber, next_slotnumber = "$AVG$" + name + this_slotnumber, "$AVG$" + name + next_slotnumber
                #print( "moving " + this_slotnumber + " to " + next_slotnumber)
                this_value = self.getval(this_slotnumber)
                self.putval(next_slotnumber,this_value)
                avgsum +=int(this_value)
            self.putval(firstentry,current_val)
            avgsum += current_val
            avg = avgsum/(avgpoints+1)
        return (avg)
and here's the server script:
Code:
__author__ = 'chuck'
import socketserver
recvd_items = []
serv_dict = {}
print("init")

class MyTCPHandler(socketserver.BaseRequestHandler):

    ####Server routine, runs in continuous loop####
    def handle(self):
        #New connection established:
        print("connection received on port {}... ".format(self.client_address[1]) +
              " from IP: {}".format(self.client_address[0]))

        #upon connection from client, first send stored tags & values to client, then listen
        if serv_dict == {}:
            self.request.sendall(b"init")
        else:
            op = ""
            for key in serv_dict.keys():
                val = serv_dict[key]
                op += str(key)+ ":" + str(val)
                op += '\n'
            sendstr = bytes(op, "utf-8")
            self.request.sendall(sendstr)
            #print(b"sending: ... " + sendstr)

        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(16384).strip()
        recvd_whole = self.data.decode()
        recvd_lines = recvd_whole.split()
        for line in recvd_lines:
            if ":" in line:
                key,val = line.split(":")
                serv_dict[key]=val
            else:
                print("fucked formatting: ( " + str(line) + " ) ... should contain a colon")
        for item in sorted(serv_dict.items()):
            print(item)

if __name__ == "__main__":
    HOST, PORT = "localhost", 48000
    # Create the server, binding to localhost on port 48000
    server = socketserver.TCPServer((HOST, PORT), MyTCPHandler)

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    server.serve_forever()
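Both scripts build and parse the same "key:value" lines by hand, so the wire format could be pulled into a pair of shared helpers. A rough sketch follows. encode_tags and decode_tags are hypothetical names, the sketch assumes keys and values contain no whitespace, and every value comes back as a string, just as it does in the scripts above.

```python
def encode_tags(tags):
    """Serialize a dict as newline-separated 'key:value' lines (UTF-8 bytes)."""
    return "".join("{}:{}\n".format(key, value) for key, value in tags.items()).encode("utf-8")

def decode_tags(payload):
    """Parse 'key:value' lines back into a dict of strings; skip tokens without a colon."""
    tags = {}
    for line in payload.decode("utf-8").split():
        if ":" in line:
            key, value = line.split(":", 1)
            tags[key] = value
    return tags

if __name__ == "__main__":
    wire = encode_tags({"F1_flo": 32376, "F2_flo": 32493})
    print(wire)
    print(decode_tags(wire))
    print(decode_tags(b"init"))  # the server's "empty" reply decodes to an empty dict
```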
 