Create a Python 3 Streaming Socket Server with SQLAlchemy and MySQL

A company that I contract for operates a product that provides a complete back-office business solution for Propane Dealers across the United States and Mexico. This solutions includes wireless usage monitoring over both Fixed RF and Cellular, as well as monthly invoice billing, customer, service address and tank management.

This solution is multi-faceted, as there are many working parts, including the over 5000 Radios deployed in the field and the fixed receivers atop Radio Towers from Florida to Nantucket.

One of the primary network components is the Radio Receiver. Currently, this implementation is a Node.js/Express application. It is quite complex as the callback routines pass the raw Radio data through a series of decoding/encoding functions and finally, the database functions to write the data to the local Receiver database and then forward it to the Gateway.

While this works very well, we have found that, over the past 2 years of upgrading and building out new project features, the technical debt contained within this project repository has been a serious limitation.

The Radio Gateway is also a Node app, but the back-office is a full-featured Django app that includes the Dealer Portal as well as the Invoicing and automation system.

Discussing these issues with the Principals in regards to the future development roadmap for this project, I have recommended replacing the front-end Node apps with a pure Python Socket Server solution.

While I was researching the most efficient Python Socket Server implementations, I created the following practice app as a test-bed for developing new Radio Receiver software in Python.

The first place I always start is the Data Model. How are our radio objects going to be represented and the radio data stored? How is the incoming data stream encoded? How do we decode the stream? How does it scale to tens of thousands of connections per second? Concurrency is not automatic in Python due to thread locking, so each new socket will need to start a new Thread.

Ok, let's get started. As usual, I am only including the relevant parts of the source code for the sake of the post. For the full source code, please see this projects GitHub repository.

The Data Model:

Here is the basic data model for this sample Socket Server project. The RadioType, Radio and the Radio TX data tables.

Now that I have the Data Model looking good, I need to create the interface to connect the Data Model to the database. Wire up and modify your MySQL username, password and database name.

The DB Interface

Next, we need to install the project dependencies so we can create our Data Model automatically from our shell environment.

$ virtualenv .env --python=python3
$ source .env/bin/activate
(.env)$ pip3 install -r requirements.txt
... dependencies install successfully ...
(.env)$ python

We are now in our Python shell, import and initialize the database.

Python 3.6.7 (default, Oct 22 2018, 11:32:17)  
[GCC 8.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.  
>>> from db import init_db
>>> init_db()

Next, open MySQL in your terminal and check to ensure the database model changes were written successfully to your database.

mysql> use RadioData;  
Reading table information for completion of table and column names  
You can turn off this feature to get a quicker startup with -A

Database changed  
mysql> show tables;  
| Tables_in_RadioData |
| radio_tx_headers    |
| radio_tx_readings   |
| radio_txs           |
| radio_types         |
| radios              |
5 rows in set (0.00 sec)  

Great. Let's review what we have completed so far.

  1. We have created the Data Model.
  2. Created the DB interface and initialized the database.
  3. Set up our environment and installed dependencies.

The next step is to create the Socket Server.

Socket Server (

This is a very straight-forward implementation and is quite simple compared to the Node app that is essentially doing to exact same thing, just in a completely different way.

The Socket Server begins by importing our libraries and creating a main() function. This is where the Socket Server app accepts incoming connections. The first loop, while True keeps the listener running indefinitely unless there is a socket error, in which case an Exception is called and sys.exit(1) dumps the error out to the console.

The server is listening on port 8060, so any data sent to this server IP and port will be chunked into 1024 bytes. If the data object returns True, we set the tx variable to the data encoded in hexadecimal.

tx = data.hex()  

Then slice the header from tx and convert to binary.

# set the header values to determine the message type
header = inspect_header(tx[:34], 2)  

I now have a binary representation of my hexadecimal encoded data.

Now comes the fun part... Decoding our Radio Data object.

The data is encoded using bit registers and each bit must be offset and read individually, then reassembled to create our OrderedDict of decoded Radio data.

Radio Data Decoder

There are quite a few things happening in this module, First, we have to pad the data reading and return the prefix in the case the binary is of insufficient length.

Next, we have a function to calculate hexadecimal to decimal conversions as well as binary to decimal. Also within the module, we implement functions to inspect the header and decode the header to ensure we have a valid Radio data object and Radio Message Type. Finally, we complete this module with a function to decode the individual radio data readings.

Back to our server now, we call inspect_header() and then decode_header(). If the Radio data object is properly decoded, then we can call our first SQLAlchemy function to create a new tx_id and return the ID of the newly created TX data object.

Next, I create a list of decoded radio data...

items = list(decoded.items())  

For development and debugging while building this app, I want to see the list members, so I output them to the console in the script.

Since I have a full set of decoded list members; I can insert the data into the radio_tx_headers table and return the TX_HEADER_ID.

Moving along, I have a TX and a TX HEADER, next step is to start decoding the individual readings from the remaining slice of the encoded data from the data stream.

reading_data = data[52:]  

Loop over the payload length and set

readings = inspect_header(readings_data, 8)  

which will convert the data into an 8 bit signed integer. With a valid readings iterable, I can now loop over readings and call the decode_readings function on our reading object.

for reading in reading:  
    decode_reading = decode_readings(reading)

Next, insert the rest of the Radio data to the radio_tx_readings table and close the socket.

OK, nice. We have done quite a bit of work. The final step is to call the main() function and start up our Socket Server.

if __name__ == '__main__':  
    Start the Python Socket Server and Listen for Connections.
    Network Receiver Listens on Port 9060
    # create the socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # bind the socket to the port
    server_address = ''
    # should be socket.gethostname()
    port = 8060

    # bind the server address
    s.bind((server_address, port))

    # flash a server message to console
    print('Socket server starting up... listening on {}, Port: {}'.format(server_address, port))

    # listen for incoming connections

    # call main program

Awesome. Back in your terminal, call the script and start up the server.

(.env)~/workspaces/python-socket-server$ python
Socket server starting up... listening on, Port: 8060  
Hi.  Please send me some data.  

The server is running... Ok, now what?

The Client App

The console output for debugging.


(.env)~/workspaces/python-socket-server$ python 
Socket Created: <socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('', 0)>  
Socket Connected to on IP:  
Message: 05018408001a7a0861075027891761087b00018c00000084023609702825097028240970282409702824097028240970282409702824097028240970282408702824000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e5af was sent successfully...  

I am using a sample hex encoded data stream to pass to the server. This was provided by the radio manufacturer's test radios and docs.


(.env)~/workspaces/python-socket-server$ python
Socket server starting up... listening on, Port: 8060  
Hi.  Please send me some data.  
<socket.socket fd=4, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('', 8060), raddr=('', 40070)> from client ('', 40070)  
Product Type: 5  
Hardware Rev: 1  
Firmware Rev: 4.4  
Contact Reason: 0b1000  
Alarm Status: 0b0  
RSSI: 26  
Battery Status: 122  
IMEI: 0861075027891761  
Message Type: 8  
Payload Length: 123  
Decoded Header: OrderedDict([('product_type', 5), ('hardware_rev', 1), ('firmware_rev', '4.4'), ('contact_reason', '0b1000'), ('alarm_status', '0b0'), ('imei', '0861075027891761'), ('gsm_rssi', 26), ('battery_status', 122), ('message_type', 8), ('payload_len', 123)])  
Items from List: [('product_type', 5), ('hardware_rev', 1), ('firmware_rev', '4.4'), ('contact_reason', '0b1000'), ('alarm_status', '0b0'), ('imei', '0861075027891761'), ('gsm_rssi', 26), ('battery_status', 122), ('message_type', 8), ('payload_len', 123)]  
Product Type: 5  
Message Type: 8  
IMEI: 0861075027891761  
Payload Length: 123  
Wrapping Up

This is a great starting point for creating a purely Python Network Radio Receiver and Socket Server. We are already decoding Radio data transmissions from new test radios on our test network.

In the next part of this implementation, we are going to introduce Threading into our Socket Server in order to provide the necessary concurrency that is required when developing applications in a high-performance and complex networking environment.

Stay tuned for Part II...

Craig Derington

Secular Humanist, Libertarian, Open Source Software Advocate building Cloud Apps on RHEL and Ubuntu Server. My toolset includes Python, Celery, Flask, Django, Graph, EdgeDB, MySQL, MongoDB and Git.

comments powered by Disqus