Nginx + Gunicorn, Guaranteed Web App in an Afternoon

Nginx, Gunicorn and Flask
Funky, right? Read to the end to find out how this image was generated!

Have you ever wondered what it would take to develop a simple web app? Maybe you wanted to create one that would serve requests to merely call a command-line version of an app you made. This is what I wanted to do after I created a Word Maze Generator to help my son study for his 3rd-grade spelling tests. With Gunicorn and Nginx, I easily accomplished this task.

Creating web front-ends for simple applications is an excellent way to reach a larger audience and get feedback on your work. In my case, I wanted to make my application available to my son’s teacher and classmates.

Here, I am focusing on the ‘why’ more than the ‘how.’ For a practical guide, feel free to navigate to this Digital Ocean post that I used to implement these tools. If you are like me though, the ‘why’ is very important, so let’s dive in!

Web Request, What?

Below is the ideal model for what happens when you hit Enter on your keyboard with a URL in your web browser.

Web request

There are steps like Domain Name Resolution that enable this ideal model, but for our purposes, let’s stick to this simple construction.

When you enter in www.periaptapis.net and hit Enter, a Hyper-Text Transfer Protocol (HTTP) GET request is assembled by your browser and shipped over the network to the server (host) responsible for that domain. Once the host receives the request, it packages the content and ships it to the client who requested it.

Naturally, this leads to the question: “So all I need is a web host that is connected to the Internet and running my Flask app, right?” The answer to this question is technically “yes.” However, give it time plus a small amount of traffic, and your site is bound to run into some issues.

WSGI Time!

Let’s say there are three people requesting content from your website. If it’s a small application, you might be able to service that traffic in a semi-timely manner. However, some users are going to ‘hang’ until your website can catch up. Some might possibly be left with a page informing them that their request has timed out and to try again (the problem will only get worse if they do!).

Web Server Gateway Interface, or ‘WSGI,’ was created to solve exactly this problem.

Gunicorn added to web request

In our new model, we show how our three requests go to a single Gunicorn application. Gunicorn translates the web request to the WSGI format and creates threads/processes to distribute the requests in queue. The data that your application returns is then translated to an HTTP response by Gunicorn and sent back to the client.

It helps to be able to increase the number of programs working on the requests, but there is a ceiling to the number of threads/processes that can be created on a given web host.

Let’s consider 1000 requests. Gunicorn, while mighty, cannot service this kind of demand. It is meant for WSGI translation and process/thread management to call your application. Gunicorn is better than just a single-threaded Flask application, but it is still overwhelmed by a large set of requests.

Enter Nginx

Nginx is a program that handles the interface between the world-wide Internet and Gunicorn. It can juggle many connections at once, routing them as configured. Our final setup looks like this:


It is a bit excessive to say we could handle a bajillion requests from our gaggle of Internet users, but we are much better off this way. With Nginx installed, our CPU-dependent application can be spun up and managed by Gunicorn while Nginx keeps the users happy, reassuring the client that we are definitely still working on their request and will get back to them.

An added benefit is that with minimal extra work, we get the nice little lock symbol in the browser, which indicates an HTTPS connection with our service. To do this, we run a tool called Certbot after we have Nginx configured. Certbot asks for the server that needs certificates and the owner’s email address. Finally, Certbot requests your certificate from a certificate authority, which is then auto-configured in your Nginx setup for HTTPS traffic. Awesome!

Final Remarks

Check out the current iteration of our spelling maze app front-end here. It employs the exact setup described in this article to create custom spelling mazes requested by users.

The practical implementation I followed to get my server up and running can be found in a Digital Ocean post here.

If you are curious about the monstrous lead photo, it came from an AI model called Stable Diffusion 2, run with the prompt: “Nginx, Gunicorn and Flask.”

Thank you for reading this post. I hope it puts the ‘why’ into the methodology I chose to serve a web application to multiple users. That being said, I am not an authority on web dev! I’m always looking for new information, so if you want to add to or correct anything in this post, please leave a comment!

Kivy, One of the Best and Powerful GUI Creators

Simulation-building is something I seem to return to over and over again. I really enjoy learning something new each time I witness how small behaviors produce emergent behavior. Graphics are an accessible way to conceptualize the complex nature of multi-variable simulations. In this article, I lay the foundation for a project I hope will serve as the first chapter for a much larger vision.

Let’s explore Kivy, a powerful Python app development library.

Spoiler alert, this has, by far, been my best experience with a Graphical User Interface (GUI) library in Python. For me, it provides the best of two worlds for building a small application for a very basic simulation:

  • Layout management and simple widgets for boiler-plate application objects, like buttons and text inputs.
  • Custom drawing to the widgets’ underlying canvas, allowing for custom shapes and graphics.

A Very Basic Simulation

To get a better feel for this GUI framework, I decided to implement a primitive simulation to display on my application. The rules are simple: You have people and you have food! A person (red square) has a stomach and energy. The person’s stomach is filled with food, and energy is created from the food in their stomach. Food (green square) is a stationary source for a person to fill their stomach.

People move around in search for stationary food supplies, which expends energy and in turn uses the food in their stomach. If a person’s energy and stomach level reach 0, they are no longer considered alive (blue square).

Kivy

All right, with the simulation explained, it’s time to dig into Kivy! Kivy gives you so much to play with, yet has a refreshingly minimal barrier to entry. You can add buttons, text boxes, labels, and so much more! The best part is, through the Kv Language, your GUI can function on its own without Python code driving it. There is a helpful rundown on the Kv language on Kivy’s website here.

Kv Design Language

An example of this can be found in my ‘SimulationController’ widget, defined in SimulationGUI.kv below:

<SimulationController>
    BoxLayout:
        width: root.width
        height: root.height
        orientation: "horizontal"
        BoxLayout:
            orientation: "vertical"

            Slider:
                id: grid_height
                min: 25
                max: int(root.height)
                step: 1
                orientation: "horizontal"
            Label:
                text: str(grid_height.value) + " Map Height"
            
            Slider:
                id: grid_width
                min: 25
                max: int(root.width)
                step: 1
                orientation: "horizontal"
            Label:
                text: str(grid_width.value) + " Map Width"

You can see the ‘SimulationController,’ with two nested ‘BoxLayouts.’

<SimulationController>
    BoxLayout:
        width: root.width
        height: root.height
        orientation: "horizontal"
        BoxLayout:
            orientation: "vertical"

On the outermost ‘BoxLayout,’ I define its width and height to fill the space allotted to it by the element into which it is inserted. It took me a while to wrap my head around this as I attempted bigger layouts. Also note the orientation, which describes the direction things will be stacked.

Inside the innermost ‘BoxLayout,’ you can see Slider and Label pairs:

            Slider:
                id: grid_height
                min: 25
                max: int(root.height)
                step: 1
                orientation: "horizontal"
            Label:
                text: str(grid_height.value) + " Map Height"
            
            Slider:
                id: grid_width
                min: 25
                max: int(root.width)
                step: 1
                orientation: "horizontal"
            Label:
                text: str(grid_width.value) + " Map Width"

The Slider and Label are GUI Widgets built into the Kivy library. For the Slider, I set an ID value (basically a name referenced elsewhere in the code), along with a minimum and maximum value. In the Label Widget, you can see the GUI driving itself a bit – the text of the Label is set by the value currently selected by the Slider. This might be a commonplace functionality for a GUI framework that I just haven’t discovered before, but I really like it. Everything looks a lot cleaner when simple functionalities like this are independent from the Python code.

Invoking Python code directly from this language is pretty easy, too.

        Button:
            id: start_but
            on_press: app.run_sim(root.ids.num_people.value, root.ids.num_food.value, root.ids.grid_height.value, root.ids.grid_width.value)
            text: "Start Simulation"

In the above example, you can see a direct reference to ‘app’ and a call to ‘run_sim.’ This is defined on the Python side in my ‘SimulationGUIApp’ object which inherits from the ‘App’ object from the Kivy library.

The reference to ‘run_sim’ uses information from our widget and calls the function with the values collected as parameters. Awesome!

Kivy and Python

For me, it is often difficult to understand the interface between a design language and a program language (I’m looking at you, Javascript and CSS!). Let’s cover some basics.

Make Kv Definitions Python reference-able

First thing: How do you let Kivy know you are using a Kv file? There are a few ways to do this, but the one I chose was to name the .Kv file the same thing as my app’s object name (e.g., Kv filename is ‘SimulationGUI.kv’ and my Python object is ‘SimulationGUIApp’). An important detail here is to note that my Python object name ends with ‘App,’ while the filename doesn’t.

As for the Widgets defined in the .Kv file, all we have to do to use them on the Python side is create an empty class with the same name, as shown below:

.Kv Definition
<SimulationController>
    BoxLayout:
        width: root.width
        height: root.height
        orientation: "horizontal"
Python Definition
class SimulationController(Widget):
    pass

Even though the class is empty, the layout and widgets are defined in the .Kv file and will be shown when the app loads.

Once you do that, you can add Widgets to an App, creating your interface. In the example below, class ‘MainScreen’ inherits from ‘BoxLayout’ and then adds two Widgets to itself. One of the widgets is our familiar ‘SimulationController.’ Next, a ‘MainScreen’ object is instantiated and added to the app class (‘SimulationGUIApp’) in its build method.

class SimulationViewport(Widget):
    pass

class SimulationController(Widget):
    pass
        
class MainScreen(BoxLayout):
    def __init__(self, **kwargs):
        super(MainScreen, self).__init__(**kwargs)
        self.orientation = "vertical"
        self.size_hint = (1.,1.)

        self.sim_view = SimulationViewport()
        self.sim_cont = SimulationController()

        print(self.sim_cont.ids["grid_height"].value)
        
        self.add_widget(self.sim_view)
        self.add_widget(self.sim_cont)

class SimulationGUIApp(App):
    def build(self):
        main_screen = MainScreen()

        self.sim_view = main_screen.sim_view

        return main_screen

I decided to do it this way so I could access the canvas of our ‘SimulationViewport’ for drawing people and food. Also, in the above snippet, I left in a line of code to show how you can reference the values of the Widgets contained in our Kv-defined widgets using the ‘ids’ dictionary.

A Slightly More Advanced Kivy Concept

Until now, we have focused on the basic uses of Kivy. Now, let’s dive into how to draw on Widgets so that we can see our people as they wander around our world looking for food sources.

Assumptions Kill Me

One frustrating mistake I made was when I assumed that my drawing would be relative to the corner of my Widget. In a multi-widget layout, this would mean that the position of each drawing would be local to its own widget. In reality, the drawings are positioned within the global window’s coordinate grid, regardless of widget. With this mistaken assumption, I thought that the Widgets were overlapping one another. I likely developed this intuition from HTML and CSS, where the flow layout can cause elements to exist over one another.

Finally, I realized that drawing occurs from the window’s origin. This is really simple to do, but figuring it out was difficult. Below is the code snippet that saved me, particularly the self.sim_view.pos[0] and [1].

pos_screen_x = x * block_width + self.sim_view.pos[0]
pos_screen_y = y * block_height + self.sim_view.pos[1]

Rectangle(pos=(pos_screen_x, pos_screen_y), size=(block_width, block_height))

A ‘Rectangle’ is drawn at the x, y coordinates of our Person or Food with an offset of ‘self.sim_view.pos[0]’ and ‘[1],’ which gets it to our desired spot. The challenging part, however, is that you can draw outside the widget and it will still be rendered if it is within the window’s region.

So now we are drawing in the right place, but how do we select a color?

It’s like painting

Right above the lines of code where we select the x and y coordinates, there is an if-else block where the color palette is chosen for the upcoming ‘Rectangle’ draw. This color choice selects a value between 0 and 1 for each color (Red/Green/Blue). Here, we check if our object is ‘Food’ or ‘Person.’ Then, we select red or green of varying intensity, based on energy or food levels:

if isinstance(thing, Food):
    color_strength = thing.Amount / thing.MaxAmount
    color_strength = max([color_floor, color_strength])

    Color(0.,color_strength,0.)
elif isinstance(thing, Person):
    color_strength = thing.Energy / thing.EnergyMax
    color_strength = max(color_floor, color_strength)
    if thing.alive:
        Color(color_strength,0.,0.)
    else:
        Color(0.,0.,1.)

Conclusion

This has been a wonderful learning experience for me, and I hope by posting it here it can be useful to you, too! I look forward to possibly building out the simulation and controls with more options and depth.

If you are interested in further reading on my work in the simulation building realm, check out my wandering development post here, where I attempt to build a balanced simulation of a simple model for populations and reproduction.

There is also an excellent website with many models much better than mine on a website called NetLogo. I found this gem while doing research for my simulation (I also bought a book, Complex Adaptive Systems: An Introduction to Computational Models of Social Life, to add to my collection of unread books!).

Thank you so much for sticking with me, I hope the coming week receives you well 🙂

Code

Get the code for my simulation at my GitHub.

Spelling made exciting in a labyrinthine maze!

Last week, my son Isaac was crushed by his 3rd-grade spelling test. For an 8-year-old, it can be difficult to study something that might seem opaque in its usefulness. So, with a bit of Python magic, let us engage the children in solving a spelling maze!

The idea is to accept the input of a word, then generate a maze where the path to success is made easier if you know the next letter of the word.

Just Passing Through (TL;DR)

I figured I would need the ability to 1) generate a maze, 2) find all the junction points of that maze, and 3) draw letters at the junctions which lead the player down the correct route.

I built infrastructure and foundational code to perform functions like determining block positions, drawing the maze, and cleaning up relationships between blocks. Separately, I created a Maze class, which handled the maze-generation algorithm. I treated this Maze class as a parent to the WordMaze class which handles many of the functions related to letter placement. This helped de-clutter the Maze class and extend its functionality.

At the end of my project journey, I developed the code to apply words in the maze. This part became pretty complex, as I needed many parameters to be met so that the letters would group properly and the possible “incorrect” letter junctions along the solution route would be challenging enough. There were bugs to iron out, but eventually the code worked. Even better, soon a kid will be learning his spelling list and having fun at the same time!

Staying for a While (Technical Peeps)

The code for this project can be found at my GitHub.

Infrastructure and Foundational Code

After writing a toy maze generator, it was clear that I would need to build some foundations to help reduce the top-level complexity of the maze generation code. The three main foundational classes were:

  • A Map Class
  • A Draw Class
  • A Path Class

Foundational Code: Map Class

The map class handles the grid of blocks to represent a player’s position at any point in the maze. It performs the following functions:

  • Block relationships, including entry and exit directions
  • Block positions, including x-y coordinate grid

Foundational Code: Draw Class

The draw class (‘Drawable2D’ in the code) is responsible for drawing to a grid of pixels, as defined by the object itself. This is an inheritable class allowing an object to set its pixel width and height and call functions like ‘fill,’ ‘draw_portion,’ or ‘draw_edge.’

The object inheriting from this class must implement a ‘draw’ method instructing how to draw itself.

Foundational Code: Path Class

This class is responsible primarily for generating random paths using the ‘step_path’ function. By moving this logic to its own class, we radically reduce the complexity of generating the maze.

Now that we have a high level view of the classes involved, let’s get into the actual maze generation!

Maze Generation

The code is available at my GitHub, but I will include the chunks of code I am referencing as we go along. For this section we will look at the ‘generate_maze’ function in the Maze class, as shown below:

def generate_maze(self):
    # Setup our start and end blocks
    self.map_start = self.map.get_start_block()
    self.map_end = None

    # We always go from North to South
    self.map_start.entry_direction = GridDirection.North

    # Setup our temporary algorithm variable
    new_start = self.map_start 

    # Start our Algorithm
    while not self.map_end:
        # Generate random paths from our starting block
        self.generate_paths(new_start)
        
        # Get the lowest block and set an exit direction (South)
        y, lowest_block = self.get_lowest_block()
        lowest_block.exit_directions.add(GridDirection.South)

        # If we have hit the bottom of the play area make this our end block
        if y == self.map.grid_height - 1:
            self.map_end = lowest_block
        # If we haven't hit the bottom then make this our next start block for random paths
        else:
            new_start = self.map.get_block_in_direction(lowest_block, GridDirection.South)
            new_start.entry_direction = GridDirection.North

    # Draw the final maze
    self.map.draw()

Firstly, we only make Mazes that start at the top and end at the bottom. With this basic tenet, we can say that we aren’t done building the maze until we have at least one block touching the southern edge of the grid. This is why our main while loop is checking for a ‘map_end’ block. We will be iterating the algorithm steps until we see such a block.

So first we grab the map’s start block, set that as our ‘new_start’ and generate paths from it. Once we have generated the paths for ‘new_start,’ we get the lowest block explored on the grid. If this block is touching the southern edge of the grid, we are done! If it isn’t touching the southern edge, then we set the southern edge as an exit. Next, we get the block south of this block and set it as ‘new_start’ and let the algorithm run again.

Generate Paths

The ‘generate_paths’ function (code below) is a random depth-first search algorithm that will generate random exits as it goes on its current search.

def generate_paths(self, start_block: Block):
    # Setup our Algorithm temporary variables
    possible_path_starts = [start_block]

    # While we have possible path starts continue building
    while possible_path_starts:
        # Randomly choose our next path start and remove it from the list
        current_start = random.choice(possible_path_starts)
        possible_path_starts.remove(current_start)

        # If this block is already explored move on
        if current_start.explored:
            continue
        
        # Otherwise create a new path and step it until it is finished
        path = Path(self.map, current_start)
        while not path.complete:
            possible_path_starts.extend(path.step_path())

Below is a GIF of how this algorithm looks when it is running.

This is what the path generation algorithm looks like while it runs.

It will eventually produce a maze like this:

Randomly generated maze paths

All right, so we have a maze generated! Now how do we get our word in there to guide our kid to the spelling solution?

Putting Spelling Words in the Maze

With perfect 20/20 hindsight, I probably would have included exit count in path creation when I was working on the maze generator. I hadn’t considered how to control the number of exits on the solution path until I tried applying a word to it. The problem is our solution path must have exactly the same number of junctions as there are letters in our target word. This means if we have too many junctions on our solution path, we must do something to remove them.

I had a few ideas on how to do this with simple algorithms that I thought for sure would do exactly what I wanted them to do… None of them worked for various reasons, largely due to bugs in the foundational code.

I created multiple band-aids to mitigate these problems. If the band-aids stacked too high, I would eventually sift the code down to where it belonged. However, I will warn the reader, none of the code that I submit here is pretty. This was one of those “quickly written/get the thing working!” kind of projects. Should it be deemed for long term use, I think a rewrite would be warranted and easily done. Anyway, on to the method that I chose!

Apply Word

The ‘apply_word’ method orchestrates all the steps in adding letters to our maze and is included in a child class of the Maze class called… WordMaze.

def apply_word(self):
    exit_count = len(self.word)
    solution_junctions = self.get_solution_path_junctions()
    
    if len(solution_junctions) < exit_count - 1:
        raise IndexError("Not enough exit points to write word!")
    
    # Randomly select the junctions to be used for our word path
    selected_junctions = self.select_solution_path_junctions(solution_junctions)

    # Close off and unexplore all other junctions
    self.close_all_solution_path_junctions(solution_junctions, selected_junctions)

    # Reopen paths for unexplored areas on valid routes
    self.fill_out_unexplored_areas()
    
    # Apply letters to junctions
    self.apply_letters_to_junctions()

Put simply, after we have generated a maze using our parent class, we find all junctions in the map and select ones that are located along the solution path. From this smaller set, we randomly choose the junctions we want to use for the letters of our word (this shortens the list further).

With our solution letter junctions selected, we need to get rid of all other junctions along the solution path. So, we close off their entry and “un-explore” them, meaning we reset them to the original state they held before we applied any path algorithm to them.

At this point we have a pretty boring maze, and it likely is pretty easy to distinguish which path is the right one because we “un-explored” all but the ones chosen for our letters. To increase complexity, we open the dead-ends of our selected paths, exploring them until the dead space is accessible again from the selected junction.

If this is still confusing, stick with me, we have some visual elements below that will hopefully fill any gaps of understanding. First, let’s break down the important methods included in the code block above that follow the steps I just described.

get solution path junctions

This function gets all of the junctions in the map and randomly selects ones that exist on the solution path.

select solution path junctions

Of the set of junctions that exist on the solution path, select only as many as there are letters in our target word. Do this selection at random.

close all solution path junctions

This closes all of our non-selected solution path junctions and marks the blocks in the path closed off from the solution path as not explored.

Fill out unexplored areas

Now that we have a bunch of unexplored territory, let’s re-explore it from the dead ends of the selected paths. This should make the puzzle a bit more…puzzling!

Apply Letters to junctions

Finally, let’s get some letters at the junction exits. This method will iterate all junctions, giving them each a random letter. Then, it will iterate all the solution path blocks in order, applying the correct letter of the word to each. To make things a bit more tricky in the orthogonal direction of the solution path, we put the letter that comes after the correct letter.

Below is a GIF showing the re-exploration of unexplored map area:

Re-explore after culling unused solution path junctions

With re-exploration completed, we can perform letter application. Below is the result of running the following command:

python3 ./main.py --word Discover
A maze with "discover" as the solution spelling word
Can you solve for “discover”? Start at the entrance and find out!

Conclusion

Spelling is difficult for many youngsters my son’s age. I hope that doing puzzles will push it just far enough over the edge of engagement to keep him moving forward. I want to thank you for sticking with me through this post. I hope the ideas and implementation I shared were thought-provoking. If you have any thoughts inspired by this post, please feel free to leave a comment below.

Also, if there is interest, I can push a web version of this application so that you can generate spelling mazes from a webpage instead of having to run it on your local machine.

Thanks for reading!

Spelling Maze Code

The code for this project can be found at my GitHub.

My Related Work

If you find algorithms interesting and would like to read more about another project of mine, check out another interesting algorithm problem from my PaperBoy VR Game.

Exploring and Modifying the Oxford 3 Rower

Recently I got the itch to take something apart and modify it in an externally noticeable way. First I started looking at my sons toys as a source of equipment to modify, one specifically caught my eye. It was a leapfrog globe that was intended to teach kids about different continents, countries and oceans on planet Earth. I am still looking at modifying this toys firmware but it is going to require some equipment and skills that I have yet to acquire, so I put it back together and switched gears, aiming my sight at my wife’s Oxford 3 Rower. It is expensive, and not mine so consent is always paramount in these situations, and after having obtained it I began by unscrewing the four screws on the back of the control panel.

Screw Locations

This exposed the rear of the front face where two connectors needed to be removed in order to relocate to my desk for further investigation.

Process

I started simply by searching for information regarding the chips that I could see on the board, mainly looking for datasheets to help me better understand what to target for firmware modification.

Chip Information

Having noticed the ARM chip I really wanted to know where the firmware for it was stored. With MCU’s like this it seemed to me it could be on a SPI part or stored on device, and after looking around a bit for anything that looked like a memory without luck so I figured the likely option was it would be stored on the MCU itself. Lucky for me it appeared that a debug header (likely used for flashing the firmware in production) was left on the device. If you look closely at the traces for that header you can see that two of the pins go to the upper right of the chip, and in the datasheet sure enough those are debug ports!

PG98 of Datasheet
PG 100 of Datasheet

A bit of multi-meter probing later and I had a basic pinout for this debug header!

After some searching I learned I would need a debugger for this, which is available here. Looking at that page now I see someone has had an issue with the device, but it worked seamlessly for me on this project, so buy at your own risk.

Now, it is important to mention, if you are following along, that the flash on this chip is read only when you receive it from the factory and unlocking the flash region of the chip will erase it. So it is of utmost importance that you first read the flash region to a file before attempting a write back or unlocking it. I nearly erased this part without having a binary file — luckily I had read the memory to a file to disassemble it before and had saved it to my file system…

Some other things that you will need:

  • pyOCD
    • Python based tool and API for debugging, programming, and exploring Arm Cortex microcontrollers.
  • EFM32 DFP
    • Simply install this with pyOCD on the command line
pyocd pack install EFM32G232F128
  • A Hex Editor Software
    • For this I used Visual Studio Code with the Hex Editor extension

Once you have that all setup and your debugger wired to the target board and plugged into your PC we get to start exploring things. Lets start by building our safety and reading the flash memory to a file. To enter our live shell run:

pyocd cmd -t EFM32G232F128

Once in the shell run:

show map

This will tell you the layout of your chips memory (because mine is unlocked it looks a bit different than the one you likely see)

What you want to note is the ‘Flash’ type, this is where the main program usually resides on any given MCU, from this you will need the start and size values. So lets go ahead and backup this information by simply running the following from our shell with the values we have:

savemem <Start> <Size> oxford3rowerfirmware.bin

Inspecting the Binary

Binary inspection is something I have done in the past when I was a kid using other peoples tools, after having attended university for CS I have a better understanding of how these binaries are produced — but I still use other peoples tools :). I started by using Ghidra to look at the disassembled assembly code, but this is unnecessary for the small modification we will be making.

Speaking of, in this article we will be focused on simply changing strings in the firmware. One thing I wish we had the ability to change is the default usernames — in the firmware they are simply ‘User 1’ through ‘User 4’, this makes it difficult to remember who you are when you are sharing the rower with your family. So lets just do that, change the users strings to something more meaningful.

If you are using Visual Studio Code with the hex editor extension simply open the binary you have stored in Visual Studio Code and search for the string ‘User 1’ this should take you to the location of the string we want to replace:

It is a good idea to make a copy of this file as backup before making modifications. When ready double click the ‘U’ and type out a name that fits in the same number of characters as ‘User 1’ with a space after. Save it and now we have something we can flash onto the chip.

This is where we get to the irreversible space, after the next required step the MCU will have non-bootable firmware. In fact the next command will completely erase what is on there in order to make it writable. From the shell run:

THIS IS IRREVERSIBLE, BE SURE YOU FOLLOWED THE STEPS TO BACKUP YOUR FIRMWARE AND PROCEED AT YOUR OWN RISK

unlock

At this point your chip should be erased, you can confirm this by reading a 32 bits of memory from the base of memory:

rd 0x0

This should result in a read of all F’s for this MCU that signifies the flash memory is erased.

As the final step we need to run the command that will flash our new desired firmware onto the device:

loadmem 0x0 <Filename>.bin

If all went as it did for me you now have modified firmware on your device! Simply unplug the power and plug the power back in hit the power button and cycle through the users to see your changes!

Thank you for reading, please feel free to leave a comment below with any other modifications you dare try!

Making a Balanced Simulation #2

OK so if you haven’t read through the first balanced simulation post, I highly recommend you do, this one is going to be aimed at how we manage to expand the population size and still have a ‘run-able’ solution. That is, something that can possibly finish in our lifetimes :D.

To start with I finally did a Google search for large scale simulation software designs and algorithms that are bound to exist and found this white paper. Now, I won’t make you go through it, but I will break it down a little here before we dive into how I think we should move forward with our simulations.

The Breakdown

Alright so this paper doesn’t actually appear to be a ‘white paper’ in the proper sense. It is displayed on a white background though so… All kidding aside, I find its breakdown on different ways of handling the problem we face very useful. It all starts with agents and ‘macros’ or objects that represent the behavior of many agents. This isn’t exactly the problem we are having but at the very least defining a means by which to represent a population greater than say 1 million people is definitely useful to our end goal.

The article outlines 4 ways of handling simulations like these, each of which are capable of being viewed at both the micro(agent level) and macro(population level). They are as follows:

Zoom

In this architecture a macro object is composed of each of the micro agents upon which the micro agents are destroyed. When the micro level is chosen to be viewed we have a macro to micro function that creates a bunch of micro agents that describe the macro state after which the macro object is destroyed.

The good thing about this method is you can save memory by moving to a macro object as the population gets too large. Bad part is you lose all fidelity in the agents states because they are destroyed.

Puppeteer

This architecture differs from the Zoom method in a few ways

  1. The agents are never destroyed
    • Only their update method is called (for example age might increase) no decisions are made by the agent (for example whether they ‘choose’ to get pregnant)
  2. The macro object is created and granted control over the agents decisions
    • Hence the name Puppeteer

Memory could easily become an issue here as both the macro representation, and all of the agents state representation are present at any given time.

View

This architecture is directed at identifying emergent properties of a set of agents that might be behaving similarly, so in other words grouping by behavior. The grouping macro object(the ‘view’) never effects the decisions of the agents and only represents a group of them by their similar behaviors.

No memory savings here either — but definitely worth ear marking for future use if we expand this to a 2D or 3D simulation.

Cohabitation

Here is the most complex of all four. This one has macro and micro objects that bidirectionally act on each other — this one is used to represent complex simulations such as those in biological systems or social networks. Its hard to explain how two things could work to define each other when they are in themselves a part of the other, but if you think about it, that’s very representative of our experience of the world.

Where to next?

OK so we have some models of how we might handle many small agents, and macro-tize them to larger scales allowing us to handle large scale simulations. But, which do we choose, and where do we begin? It’s a good question, and I think it starts with finally exploring database usage. Kind of out of left field right? Yeah I agree, but I think we need to know if we get better performance by utilizing queries in a DB. Lets find out!

I agree, we shouldn’t rebuild everything to include database usage, lets just build a toy problem with a single object that can be represented by an object relational mapping(ORM), like SQLAlchemy, and see which one goes slower — object lists, or databases.

…the results are laughable…

OK, first off, as I started implementing this I knew the winner was going to be clear, but even then I just had to know.

To explain, each cycle(x-axis) adds 1000 people to our population and measures the time to update the entire population in seconds(y-axis). If you are curious here is their respective code blerbs.

# -*- coding: utf-8 -*-
"""
Created on Tue Jan 18 17:13:13 2022

This script is intended to time lists for a simple simulation use case.

@author: Travis Adsitt
"""
import matplotlib.pyplot as plt
import time

# Person to handle the most simple update method
class Person:
    def __init__(self, age, alive):
        self.age = age
        self.alive = alive
    
    def update(self):
        if self.alive:
            self.age += 1
        

if __name__ == "__main__":
    # Variables to keep track of things
    total_time = 0
    populations = []
    cycle_times = []
    total_cycles = [cycle for cycle in range(0,100)]
    curr_population = []
    
    for cycle in total_cycles:
        # Start timeing the cycle
        start_time = time.time()
        
        # Add 1000 people to our population
        curr_population.extend([Person(0, True) for i in range(0,1000)])
        
        # Select all the alive ones
        for i in curr_population:
            i.update()
        
        # Calculate our measurements
        cycle_time = time.time() - start_time
        total_time += cycle_time
        
        # Not the way you would want to do it if people ever died, but 
        # works for our testing here.
        total_pop = len(curr_population)
        populations.append(total_pop)
        cycle_times.append(cycle_time)
        # Print something so we know it's not dead
        print(f"Cycle Time = {cycle_time}, Population = {total_pop}")
    
    print(f"Total Time = {total_time}")
    
    # plt.plot(total_cycles, populations, label="Population")
    plt.plot(total_cycles, cycle_times, label="Cycle Time")
    plt.legend()
# -*- coding: utf-8 -*-
"""
Created on Tue Jan 18 17:13:13 2022

This script is intended to time sql ORM for a simple simulation use case.

@author: Travis Adsitt
"""
from sqlalchemy import create_engine, Boolean, Column, Integer, String, ForeignKey, select, func
from sqlalchemy.orm import declarative_base, sessionmaker
import matplotlib.pyplot as plt
import time
import logging



# declarative base class
Base = declarative_base()

# Person to handle the most simple update method
class Person(Base):
    __tablename__ = 'person'

    id = Column(Integer, primary_key=True)
    age = Column(Integer)
    alive = Column(Boolean)
    
    def update(self):
        if self.alive:
            self.age += 1
        

if __name__ == "__main__":
    # Create the db engine
    engine = create_engine("sqlite+pysqlite:///:memory:", echo=False, future=True)
    Base.metadata.create_all(engine)
    # Create a session
    session = sessionmaker(engine)()
    # Disable all logging on sqlalchemy
    logging.basicConfig()
    logging.getLogger('sqlalchemy').setLevel(logging.ERROR)
    
    # Variables to keep track of things
    total_time = 0
    populations = []
    cycle_times = []
    total_cycles = [cycle for cycle in range(0,100)]
    
    for cycle in total_cycles:
        # Start timeing the cycle
        start_time = time.time()
        
        # Add 1000 people to our population
        session.bulk_save_objects(
            [
                Person(id=None, age=0, alive=True) for i in range(0,1000) 
            ]
        )
        
        # Select all the alive ones
        for i in session.execute(select(Person).filter_by(alive=True)).scalars():
            i.update()
        
        # Commit our changes
        session.commit()
        
        # Calculate our measurements
        cycle_time = time.time() - start_time
        total_time += cycle_time
        
        # Not the way you would want to do it if people ever died, but 
        # works for our testing here.
        total_pop = session.execute(session.query(func.count(Person.alive))).scalar_one()
        populations.append(total_pop)
        cycle_times.append(cycle_time)
        # Print something so we know it's not dead
        print(f"Cycle Time = {cycle_time}, Population = {total_pop}")
    
    print(f"Total Time = {total_time}")
    
    # plt.plot(total_cycles, populations, label="Population")
    plt.plot(total_cycles, cycle_times, label="Cycle Time")
    plt.legend()
    

I guess the bright side is, now I remember how to implement basic ORM with SQLAlchemy, which will be useful for data storage :). I think there is a way that you might be able to speed this up — but it is inescapable that SQL look ups have just too much overhead to even come close to competing. Maybe at a certain scale the query selects could be a bit more efficient, and paired with multi-thread/process you could get through the list quicker.

Anyways, that’s it for tonight — hope to make more progress in the region of macro/micro(agent) creation tomorrow, was fun exploring this tonight.

END DAY 1

OK so it has been a few days since I worked on this, I am moving to a new company in my day job so it’s been a bit difficult to context switch to personal projects such as this. Buuut I think I have a little time to work this up further, let me just quickly remind myself where we left off. Ah that’s right, Database management doesn’t make sense for this — and we need to refocus our attention to the multi-agent system model we want to implement.

I think that it would be best to use the ‘View’ model for a couple of reasons:

  1. Basically everything we are doing here can be broken into probabilities
  2. There are ‘easy’ methods we could write to translate between Macro and Micro forms

Alright then, lets start looking at each of the life-time of our agents and converting them to probabilities that can be managed by simple population integers.

Approaching it from top down changes the design greatly, first of all we can look at each persons lifetime and split it into three time spans basically:

  1. Youth
    • This is when the person is unable to have children
  2. Adulthood
    • This is the span of time (192-420) where a person can have children
  3. Elderly
    • This is when the person is too old to have children but is still alive(420-876)

Splitting it into these sections is useful because as far as I can tell we should be able to use lists of different events to calculate when and how many people are pregnant or having children. Same goes for alive and dead, it will be easier to explain possibly after I have written the code (also, it should prove if this is plausible). Toooo the code!

END DAY 2

OK, OK, I know, I didn’t post the code yesterday — things came up. Lets get it together, today I post the code for a ‘macro’ view of our population 🙂

import random
from threading import Thread
from enum import Enum
import matplotlib.pyplot as plt

# Simulation Variables
START_POPULATION = 1000
RUN_TIME = 10000

MALE_LIFE_SPAN = 876
FEMALE_LIFE_SPAN = 876

MALE_ADULT_AGE = 192
MALE_END_REPRODUCTION_AGE = MALE_LIFE_SPAN

FEMALE_ADULT_AGE = 192
FEMALE_END_REPRODUCTION_AGE = 420

GESTATION_LENGTH = 9

NUM_SAMPLES = 1000
RENDERED_SAMPLES = 100

# Host variables
MAX_THREADS = 15

class Gender(Enum):
    Male=0
    Female=1
    

def chunked_list_generator(l, chunks=1):
    """
    Helper function to chunk a list into 'chunks' pieces, using the Knuth
    algorithm already present in the random.shuffle method of python

    Parameters
    ----------
    l : list
        The list to chunk up.
    chunks : int, optional
        The number of chunks to split the list into. The default is 1.

    Yields
    ------
    y_list : list
        Each chunk as they are produced.

    """
    # Get our list length and the number of items to place in each
    len_list = len(l)
    number_per_chunk = int(len_list / chunks)
    # First iterator to go chunk by chunk
    for curr_start in range(0, len_list, number_per_chunk):
        # y_list = []
        chunk_end = curr_start + number_per_chunk
        yield (curr_start, chunk_end)

def get_new_population(num):
    """
    Easy way to yield a random set of people

    Parameters
    ----------
    num : int
        The number of people to create.

    Yields
    -------
    lists : tuple(int,int)
        The male and female population
    """
    new_males = 0
    new_females = 0
    
    for p in range(0, num):
        if bool(random.getrandbits(1)):
            new_males += 1
        else:
            new_females += 1
    
    return (new_males, new_females)

def get_viable_repo_population(pop_list, start, end):
    if(len(pop_list) <= start):
        return 0
    
    gross_viable_pop = pop_list[-start]
    
    if(len(pop_list) <= end):
        return gross_viable_pop
    else:
        return gross_viable_pop - pop_list[-end]

def get_num_newly_pregnant(male_pop_list, female_pop_list, pregnant_pop_list):
    """
    Get the number of newly pregnant females

    Parameters
    ----------
    male_pop : int
        The number of 'old enough to have children' males.
    female_pop : int
        The number of 'old enough to have children' females.
    pregnant_pop : int
        The currently pregnant population.

    Returns
    -------
    int
        The number to 'impregnate'.

    """
    # Take the current population of females, and subtract it from the total
    # number of pregnant females
    viable_females = get_viable_repo_population(
        female_pop_list, 
        FEMALE_ADULT_AGE, 
        FEMALE_END_REPRODUCTION_AGE
        ) - sum(pregnant_pop_list[-GESTATION_LENGTH:])
    
    viable_males = get_viable_repo_population(
        male_pop_list, 
        MALE_ADULT_AGE, 
        MALE_END_REPRODUCTION_AGE
        )
    
    total_pop = viable_females + viable_males
    new_preg_pop = 0
    
    if viable_females < 1 or viable_males < 1:
        return 0
    
    people = []
    
    while(viable_females > 0 and total_pop > 0):
        is_female_prob = (viable_females / total_pop) * 100
        rand_int = random.randint(0, 100)
        
        # Get the gender of our new random person
        if(rand_int < is_female_prob):
            # Female
            people.append(Gender.Female.value)
            viable_females -= 1
        else:
            # Male
            people.append(Gender.Male.value)
            viable_males -= 1
        
        # If we have two people check if there is a new pregnancy
        if(len(people) == 2):
            # 0 + 1 == pregnancy
            if sum(people) == 1:
                new_preg_pop += 1
            
            people = []
            
        total_pop = viable_females + viable_males
    
    return new_preg_pop
        
def get_num_dead(male_pop_list, female_pop_list):
    dead_males = 0
    dead_females = 0
    
    if(len(male_pop_list) > MALE_LIFE_SPAN and male_pop_list[-1] > 0):
        dead_males = male_pop_list[-MALE_LIFE_SPAN]
    
    if(len(female_pop_list) > FEMALE_LIFE_SPAN and female_pop_list[-1] > 0):
        dead_females = female_pop_list[-FEMALE_LIFE_SPAN]
    
    return (dead_males, dead_females)

def get_num_born(pregnant_pop_list):
    if(len(pregnant_pop_list) < GESTATION_LENGTH):
        return (0, 0)
    
    birthing = pregnant_pop_list[-GESTATION_LENGTH]
    
    males_birthed = 0
    females_birthed = 0
    
    for p in range(0, birthing):
        male_female_rand = random.getrandbits(1)
        
        if bool(male_female_rand):
            males_birthed += 1
        else:
            females_birthed += 1
    
    return (males_birthed, females_birthed)

def cull_list(l, max_len):
    return l[-max_len:]    
   
if __name__ == "__main__":
    # These lists should never be longer than 876
    male_population = []
    female_population = []
    
    time_list = [0]
    
    # This list could be as short as 9
    pregnant_population = [0]
    # This list can be any length
    dead_population = [0]
    
    male, female = get_new_population(START_POPULATION)
    
    male_population.append(male)
    female_population.append(female)
    
    for month in range(1, RUN_TIME):
        new_males, new_females = get_num_born(pregnant_population)
        dead_males, dead_females = get_num_dead(male_population, female_population)
        
        newly_pregnant = get_num_newly_pregnant(male_population, female_population, pregnant_population)
        curr_male_pop = (male_population[-1] + new_males) - dead_males
        curr_female_pop = (female_population[-1] + new_females) - dead_females
        
        male_population.append(curr_male_pop)
        female_population.append(curr_female_pop)
        pregnant_population.append(newly_pregnant)
        time_list.append(month)
        
        
        male_population = cull_list(male_population, NUM_SAMPLES)
        female_population = cull_list(female_population, NUM_SAMPLES)
        pregnant_population = cull_list(pregnant_population, NUM_SAMPLES)
        time_list = cull_list(time_list, NUM_SAMPLES)
        
        plt.plot(time_list[-RENDERED_SAMPLES:], male_population[-RENDERED_SAMPLES:], label="Male Population")
        plt.plot(time_list[-RENDERED_SAMPLES:], female_population[-RENDERED_SAMPLES:], label="Female Population")
        plt.plot(time_list[-RENDERED_SAMPLES:], pregnant_population[-RENDERED_SAMPLES:], label="Pregnant Population")
        
        plt.legend()
        plt.draw()
        plt.pause(0.05)

OK, so this is just a snapshot of the code before I complete the multi-threaded version… This thing flies and can handle far larger populations (it was ticking slowly with more than 25 million people WITHOUT multi-threading).

So, multi-threading still hasn’t saved us, I still cant get a long enough run for us to see how the whole thing balances out eventually — more thinking to be done. Either way, it was a very fun thought exercise to run through. Below is the final code for this week, along with a moving graph (you’ll get to see my fancy new ‘render only 100 months’ graphing technique). Thank you for reading this week!

import random
from threading import Thread
from enum import Enum
import matplotlib.pyplot as plt
from queue import Queue

# Simulation Variables
START_POPULATION = 1000
RUN_TIME = 10000

MALE_LIFE_SPAN = 876
FEMALE_LIFE_SPAN = 876

MALE_ADULT_AGE = 192
MALE_END_REPRODUCTION_AGE = MALE_LIFE_SPAN

FEMALE_ADULT_AGE = 192
FEMALE_END_REPRODUCTION_AGE = 420

GESTATION_LENGTH = 9

NUM_SAMPLES = 1000
RENDERED_SAMPLES = 100

# Host variables
MAX_THREADS = 15
POPULATION_PER_THREAD = 10000

class Gender(Enum):
    Male=0
    Female=1
    

def chunked_list_generator(l, chunks=1):
    """
    Helper function to chunk a list into 'chunks' pieces, using the Knuth
    algorithm already present in the random.shuffle method of python

    Parameters
    ----------
    l : list
        The list to chunk up.
    chunks : int, optional
        The number of chunks to split the list into. The default is 1.

    Yields
    ------
    y_list : list
        Each chunk as they are produced.

    """
    # Get our list length and the number of items to place in each
    len_list = len(l)
    number_per_chunk = int(len_list / chunks)
    # First iterator to go chunk by chunk
    for curr_start in range(0, len_list, number_per_chunk):
        # y_list = []
        chunk_end = curr_start + number_per_chunk
        yield (curr_start, chunk_end)

def get_new_population(num):
    """
    Easy way to yield a random set of people

    Parameters
    ----------
    num : int
        The number of people to create.

    Yields
    -------
    lists : tuple(int,int)
        The male and female population
    """
    new_males = 0
    new_females = 0
    
    for p in range(0, num):
        if bool(random.getrandbits(1)):
            new_males += 1
        else:
            new_females += 1
    
    return (new_males, new_females)

def get_viable_repo_population(pop_list, start, end):
    """
    A method to get the number of repoductively viable population given a start 
    age and an end age. This is mostly used for the male and female populations for 
    reproductions.

    Parameters
    ----------
    pop_list : list(int)
        A population list of integers.
    start : int
        Start age.
    end : int
        End age.

    Returns
    -------
    int
        Viable population.

    """
    if(len(pop_list) <= start):
        return 0
    
    gross_viable_pop = pop_list[-start]
    
    if(len(pop_list) <= end):
        return gross_viable_pop
    else:
        return gross_viable_pop - pop_list[-end]

def threaded_pregnancy(male_pop, female_pop, pregnant_pop, queue=None):
    """
    A method to handle one threads worth of work given the male population
    female population and pregnant population

    Parameters
    ----------
    male_pop : int
        Number of males in the population.
    female_pop : int
        Number of females in the population.
    pregnant_pop : int
        Number of pregnancies currently.
    queue : Queue, optional
        A queue to place our results, if none will just return results assuming
        single threaded. The default is None.

    Returns
    -------
    new_preg_pop : int
        Number new pregnancies.

    """
    viable_females = female_pop
    viable_males = male_pop
    total_pop = male_pop + viable_females
    new_preg_pop = 0
    
    people = []
    while(viable_females > 0 and total_pop > 0):
        is_female_prob = (viable_females / total_pop) * 100
        rand_int = random.randint(0, 100)
        
        # Get the gender of our new random person
        if(rand_int < is_female_prob):
            # Female
            people.append(Gender.Female.value)
            viable_females -= 1
        else:
            # Male
            people.append(Gender.Male.value)
            viable_males -= 1
        
        # If we have two people check if there is a new pregnancy
        if(len(people) == 2):
            # 0 + 1 == pregnancy
            if sum(people) == 1:
                new_preg_pop += 1
            
            people = []
            
        total_pop = viable_females + viable_males
    if not queue:
        return new_preg_pop
    else:
        queue.put(new_preg_pop)
        
def get_viable_population_thread_count(population_count):
    """
    A helper function to get the thread count given a population size

    Parameters
    ----------
    population_count : int
        The population size.

    Returns
    -------
    int
        Total number of threads that should be used.

    """
    calc_threads = int(population_count / POPULATION_PER_THREAD) + 1
    return calc_threads if calc_threads < MAX_THREADS else MAX_THREADS

def get_num_newly_pregnant(male_pop_list, female_pop_list, pregnant_pop_list):
    """
    Get the number of newly pregnant females

    Parameters
    ----------
    male_pop : int
        The number of 'old enough to have children' males.
    female_pop : int
        The number of 'old enough to have children' females.
    pregnant_pop : int
        The currently pregnant population.

    Returns
    -------
    int
        The number to 'impregnate'.

    """
    # Take the current population of females, and subtract it from the total
    # number of pregnant females
    prev_preg_count = sum(pregnant_pop_list[-GESTATION_LENGTH:])
    viable_females = get_viable_repo_population(
        female_pop_list, 
        FEMALE_ADULT_AGE, 
        FEMALE_END_REPRODUCTION_AGE
        ) - prev_preg_count
    
    viable_males = get_viable_repo_population(
        male_pop_list, 
        MALE_ADULT_AGE, 
        MALE_END_REPRODUCTION_AGE
        )
    
    total_pop = viable_females + viable_males
    new_preg_pop = 0
    
    if viable_females < 1 or viable_males < 1:
        return 0
    
    threads = get_viable_population_thread_count(total_pop)
    # print(threads)
    if threads == 1:
        new_preg_pop = threaded_pregnancy(
            viable_males, 
            viable_females, 
            pregnant_pop_list
            )
    else:
        single_threaded_females = int(viable_females % threads)
        single_threaded_males = int(viable_males % threads)
        single_threaded_preg = int(prev_preg_count % threads)
        
        single_threaded_preg = threaded_pregnancy(
            single_threaded_males, 
            single_threaded_females, 
            single_threaded_preg
            )
        
        males_per_thread = int(viable_males / threads)
        females_per_thread = int((viable_females - single_threaded_preg) / threads)
        preg_per_thread = int((single_threaded_preg + prev_preg_count) / threads)
        
        thread_objs = []
        thread_queue = Queue()
        for t in range(0, threads):
            thread_objs.append(Thread(
                target=threaded_pregnancy,
                args=(males_per_thread, females_per_thread, preg_per_thread, thread_queue, )
                ))
            thread_objs[-1].start()
        
        for t in thread_objs:
            t.join()
        
        new_preg = sum([thread_queue.get_nowait() for i in range(0, thread_queue.qsize())])
        new_preg_pop = single_threaded_preg + new_preg
        
    return new_preg_pop
        
def get_num_dead(male_pop_list, female_pop_list):
    """
    A method to get the total dead this month

    Parameters
    ----------
    male_pop_list : list(int)
        Population list for men.
    female_pop_list : list(int)
        Population list for females.

    Returns
    -------
    dead_males : int
        Number of dead males.
    dead_females : int
        Number of dead females.

    """
    dead_males = 0
    dead_females = 0
    
    if(len(male_pop_list) > MALE_LIFE_SPAN and male_pop_list[-1] > 0):
        dead_males = male_pop_list[-MALE_LIFE_SPAN]
    
    if(len(female_pop_list) > FEMALE_LIFE_SPAN and female_pop_list[-1] > 0):
        dead_females = female_pop_list[-FEMALE_LIFE_SPAN]
    
    return (dead_males, dead_females)

def threaded_birth(num_to_birth, male_queue=None, female_queue=None):
    """
    A method for one threads work for birth.

    Parameters
    ----------
    num_to_birth : int
        Number of new people.
    male_queue : Queue, optional
        A queue to dump our number of new boys. The default is None.
    female_queue : Queue, optional
        A queue to dump our number of new girls. The default is None.

    Returns
    -------
    males_birthed : int
        Number of males birthed.
    females_birthed : int
        Number of females birthed.

    """
    males_birthed = 0
    females_birthed = 0
    
    for p in range(0, num_to_birth):
        male_female_rand = random.getrandbits(1)
        
        if bool(male_female_rand):
            males_birthed += 1
        else:
            females_birthed += 1
    
    if not male_queue:
        return (males_birthed, females_birthed)
    else:
        male_queue.put(males_birthed)
        female_queue.put(females_birthed)

def get_num_born(pregnant_pop_list):
    """
    A method attempt to auto-multi-thread the birthing process. Multi-thread 
    currently commented out due to strange behavior at high numbers.

    Parameters
    ----------
    pregnant_pop_list : list(int)
        A population list of pregnancies.

    Returns
    -------
    int
        Number of males born.
    int
        Number of females born.

    """
    if(len(pregnant_pop_list) < GESTATION_LENGTH):
        return (0, 0)
    
    birthing = pregnant_pop_list[-GESTATION_LENGTH]
    
    # if birthing < POPULATION_PER_THREAD:
    return threaded_birth(birthing)
    # else:
    #    threads = get_viable_population_thread_count(birthing)
    #    thread_objs = []
    #    
    #    male_queue = Queue()
    #    female_queue = Queue()
    #    
    #    single_thread_num = int(birthing % threads)
    #    males, females = threaded_birth(single_thread_num)
    #    
    #    thread_num = int(birthing / threads)
    #    for t in range(0, threads - 1):
    #        thread_objs.append(Thread(
    #            target=threaded_birth, args=(
    #                thread_num, 
    #                male_queue, 
    #                female_queue
    #                )
    #            ))
    #        thread_objs[-1].start()
    #    
    #    for t in thread_objs:
    #        t.join()
    #    
    #    new_males = sum([male_queue.get_nowait() for i in range(0, male_queue.qsize())])
    #    new_females = sum([female_queue.get_nowait() for i in range(0, female_queue.qsize())])
        
        
    
    #    return (males + new_males, females + new_females)

def cull_list(l, max_len):
    """
    A method to shorten a list from the front

    Parameters
    ----------
    l : TYPE
        DESCRIPTION.
    max_len : TYPE
        DESCRIPTION.

    Returns
    -------
    TYPE
        DESCRIPTION.

    """
    return l[-max_len:]    
   
if __name__ == "__main__":
    # These lists should never be longer than 876
    male_population = []
    female_population = []
    
    time_list = [0]
    
    # This list could be as short as 9
    pregnant_population = [0]
    # This list can be any length
    dead_population = [0]
    
    male, female = get_new_population(START_POPULATION)
    
    male_population.append(male)
    female_population.append(female)
    
    for month in range(1, RUN_TIME):
        # Get number born and dead
        new_males, new_females = get_num_born(pregnant_population)
        dead_males, dead_females = get_num_dead(male_population, female_population)
        
        # Get newly pregnant and current male and female populations
        newly_pregnant = get_num_newly_pregnant(male_population, female_population, pregnant_population)
        curr_male_pop = (male_population[-1] + new_males) - dead_males
        curr_female_pop = (female_population[-1] + new_females) - dead_females
        
        # Append new data to appropriate lists
        male_population.append(curr_male_pop)
        female_population.append(curr_female_pop)
        pregnant_population.append(newly_pregnant)
        dead_population.append(dead_males + dead_females)
        time_list.append(month)
        
        # Cull our lists
        male_population = cull_list(male_population, NUM_SAMPLES)
        female_population = cull_list(female_population, NUM_SAMPLES)
        pregnant_population = cull_list(pregnant_population, NUM_SAMPLES)
        dead_population = cull_list(dead_population, NUM_SAMPLES)
        time_list = cull_list(time_list, NUM_SAMPLES)
        
        # Plot things
        plt.plot(time_list[-RENDERED_SAMPLES:], male_population[-RENDERED_SAMPLES:], label="Male Population")
        plt.plot(time_list[-RENDERED_SAMPLES:], female_population[-RENDERED_SAMPLES:], label="Female Population")
        plt.plot(time_list[-RENDERED_SAMPLES:], pregnant_population[-RENDERED_SAMPLES:], label="Pregnant Population")
        # plt.plot(time_list[-RENDERED_SAMPLES:], dead_population[-RENDERED_SAMPLES:], label="Dead Population")
        
        plt.legend()
        plt.draw()
        plt.pause(0.05)
        
        

Making a Balanced Simulation

I don’t know what I am doing, I have never made a balanced simulation in my life. This is purely an exploration of the idea.

Travis

All right, lets get down to it, I am genuinely curious how many variables I can add to a simulation… Coded myself… Using Python, Sqlite, Matplotlib, and have it continue for a lengthy period of time… How much? you might ask. The answer is, of course, AS MUCH AS WE CAN GET! ONWARD!

To start with, I think I will use the tutorial here to get my feet wet on animated graphs with Matplotlib, because who wants to watch the statistics of your simulation in snapshots? (Not this guy)

We also need to think about data storage, I think (getting the objects that will be my actors/variables to behave in a semi ‘normal’ way). In the short term, we can stick with just local variables like lists and dictionaries. However, reaching any further than even just a couple of variables we, I think, should implement a Sqlite back end to store the information. This will make retrieval and multi-threaded things easier I think.

Let’s also define when the simulation is ‘broken’ and should be ‘stopped’… Let’s say:

  1. When any one variable climbs way beyond any other.
  2. When all variables are ‘dead.’
  3. When a ‘steady state’ is achieved.
    • Meaning we lose interest in the animated graph because it no longer seems interesting… Though a steady state is kind of fun to think about.
      • Like, how? You know? How is everything 1 to 1 like that? Idk…

Time to code!

# -*- coding: utf-8 -*-
"""
Created on Tue Jan 11 18:13:50 2022

@author: Travis Adsitt
"""

from dataclasses import dataclass
from enum import Enum
import matplotlib.pyplot as plt
import random


GESTATION_TIME = 9
LIFE_TIME = 876

class Gender(Enum):
    Male = "Male"
    Female = "Female"

@dataclass(init=True)
class WorldTraits:
    population: list
    time: int
    
    
class World:
    def __init__(self, population):
        assert isinstance(population, int), "population must be of type 'int'"
        
        self.traits = WorldTraits(
            population=[],
            time=0
            )
        
        self.traits.population = [Person(self, 0.0) for p in range(0, population)]
    
    def birth(self):
        """
        Helper function for the Person object to inform the World that they have
        had a child.

        Returns
        -------
        None.

        """
        # Create a new person
        new_peep = Person(self, 0.0)
        
        # Add them to the population
        self.traits.population.append(new_peep)
    
    def attempt_mate(self, person_one, person_two):
        """
        Helper function to handle the resolution as to whether to individuals
        can mate, and attempt to impregnate them if they can

        Parameters
        ----------
        person_one : Person
            Any individual person
        person_two : Person
            Any other individual person

        Returns
        -------
        None.

        """
        # Check if they are female
        one_kids = person_one.traits.gender == Gender.Female
        two_kids = person_two.traits.gender == Gender.Female
        
        # Check if one female and one male
        if one_kids and not two_kids or not one_kids and two_kids:
            # Attempt impregnation on the correct individual
            if one_kids:
                person_one.attempt_to_impregnate(self.traits.time)
            else:
                person_two.attempt_to_impregnate(self.traits.time)
            
    
    def reproduction_cycle(self):
        """
        Helper function to handle the reproduction behavior of the population.

        Returns
        -------
        None.

        """
        # Shuffle everyone so we can pick peeps at 'random'
        random.shuffle(self.traits.population)
        
        # Make a copy of the population to ensure we don't change under the 
        # iterator
        pop_copy = self.traits.population.copy()
        people_iterator = iter(pop_copy)
        
        # attempt mating in twos
        for person in people_iterator:
            try:
                self.attempt_mate(person, next(people_iterator))
            except StopIteration:
                break
        
            
    
    def tick_time(self):
        """
        Helper function to push the world forward one month at a time.

        Returns
        -------
        None.

        """
        # Tick time on all people
        for person in self.traits.population:
            person.tick_time(self.traits.time)
        
        # Try and mate
        self.reproduction_cycle()
        # Tick our time forward
        self.traits.time += 1
    

@dataclass(init=True)
class PersonTraits:
    alive: bool
    ispregnant: bool
    pregnancystart: int
    age: int
    gender: Gender
    money: float

class Person:
    def __init__(self, world, money):
        assert isinstance(world, World), "world, must be of type 'World'"
        assert isinstance(money, float), "money must be of type 'float'"
        
        # Set our world object
        self.world = world
        # Set our initial 'last_time'
        self.last_time = self.world.traits.time
        # Set our initial traits
        self.traits = PersonTraits(
            alive=True,
            ispregnant=False, 
            pregnancystart=None, 
            age=0, 
            gender=Gender[random.choice([g.name for g in Gender])],
            money=money
        )
    
    def spend_money(self, amount):
        """
        Used to check if this person can spend money, and if so, does spend
        the amount passed in.

        Parameters
        ----------
        amount : float
            How much money should I spend?

        Returns
        -------
        bool
            If I spent the money

        """
        spent = False
        
        if(self.money >= amount):
            self.money -= amount
            spent = True
        
        return spent
    
    def advance_automatic_tickers(self, time):
        """
        Helper function to advance the automatic tickers for a person, such as
        age or checking for a pregnancy... Those sorts of things.

        Parameters
        ----------
        time : int
            The current time in the world.

        Returns
        -------
        None.

        """
        # We get older
        self.traits.age += time - self.last_time
        
        # We sometimes die
        if(self.traits.age > 876):
            self.traits.alive = False
        
        # Sometimes people give birth
        if self.traits.ispregnant and ((time - self.traits.pregnancystart) > GESTATION_TIME):
            self.world.birth()
            self.traits.ispregnant = False
            self.traits.pregnancystart = None
    
    def wants_childeren(self):
        """
        For the world to call when determining if a pregnancy should happen.

        Returns
        -------
        bool
            If I want childeren

        """
        # ~16 to ~35 years old
        wants_childeren = self.traits.age > 192 and self.traits.age < 420
        # Can't have childeren if we already have them
        wants_childeren = not self.traits.ispregnant and wants_childeren
        
        return wants_childeren
    
    def attempt_to_impregnate(self, time):
        """
        Impregnate this person(female)

        Returns
        -------
        None.

        """
        if self.traits.gender == Gender.Female and self.wants_childeren() and self.traits.alive:
            self.traits.ispregnant = True
            self.traits.pregnancystart = time
    
    def tick_time(self, time):
        """
        A 'behavior' function to encapsulate a decision point for this person.

        Parameters
        ----------
        time : int
            The current time tick of the world.

        Returns
        -------
        None.

        """
        self.advance_automatic_tickers(time)
        self.last_time = time
        
def get_plot_vars(world):
    men = 0
    women = 0
    alive = 0
    dead = 0
    for person in world.traits.population:
        if person.traits.alive:
            alive += 1
            
            if person.traits.gender == Gender.Female:
                women += 1
            else:
                men += 1
                
        else:
            dead += 1
    
    return (women, men, alive, dead)


if __name__ == "__main__":
    new_world = World(1000)
    
    women_list = []
    men_list = []
    alive_list = []
    dead_list = []
    
    time_list = []
    
    for time in range(0,100000):
        new_world.tick_time()
        women, men, alive, dead = get_plot_vars(new_world)
        
        women_list.append(women)
        men_list.append(men)
        alive_list.append(alive)
        dead_list.append(dead)
        
        time_list.append(time)
        
        plt.plot(time_list, women_list, label="Women")
        plt.plot(time_list, men_list, label="Men")
        plt.plot(time_list, alive_list, label="Alive")
        plt.plot(time_list, dead_list, label="Dead")
        plt.draw()
        plt.pause(0.05)
    
    plt.show()
    
    

Ok, so this code represents the base system for a world and people to ‘exist.’ In this world people can reproduce and die. After a few minutes of thinking, maybe even less, you’ll likely ask me — but Travis, won’t this just be an exponential increase in population? Didn’t we agree that that was a ‘broken’ simulation?

Yes. This is just the base, and unfortunately in order to balance this out, we will need to introduce something that kills people… That we will have to find out tomorrow, because I need sleep now. To keep you company while I am away, here is an animated image of our exponential growth!

Label your graph, Travis! the teacher screams. Yeah, yeah, I will next time. The x-axis represents time in months, the blue/orange lines are one of the two genders, the green line is the total alive, and the red line is dead (which might be broken).

END DAY 1

Thoughts

Ok, I have been gone a day and had a chance to think about this a bit, so to start I am simply going to clean up any magic numbers I have lingering and place them at the top as constants.

Along with this I am adding a ‘DIVIDER’ variable to cut down things to a reasonable scale for short-term experiments. The new variables can be seen below:

DIVIDER = 50

REPRODUCTIVE_AGE_START = int(192 / DIVIDER)
REPRODUCTIVE_AGE_END = int(420 / DIVIDER)
GESTATION_TIME_MONTHS = int(9 / DIVIDER)
LIFE_TIME_MONTHS = int(876 / DIVIDER)

With these variables installed throughout the code, it becomes much easier to adjust things and see how they change the graph. For instance, using the settings above we get the following graph:

Which is quite exciting, considering we are trying to “balance” our simulation. I am thinking it would be a good idea to change the way we count deaths, that is, instead of counting total deaths, we count deaths in the last cycle. To do this, we simply need to keep a running value of total deaths and subtract it each time from the previous month.

So it seems that starting conditions are a huge factor in this. The first one we ran (above) ended in 3000 months. Below is an example of one that ran until I had to shut it down because I didn’t put a stop condition in and I needed to go to bed.

In the next couple days I want to port this to SQLite so we can have some memory savings and maybe data access speedup (doubt it though). I find it overwhelmingly fascinating that even at these seemingly small variable counts, we can have such large differences between runs. I look forward to the data this will generate 🙂 Goodnight!

END DAY 2

Another day later, I realize I didn’t talk much about one of the things we noticed about yesterday’s graph: There is almost always a spike in population when the blue line (I think this is the women population) crosses to be above the orange line (the men population). This is interesting for a number of reasons, and our speculation is that when there are more women in the world, clearly there is a higher probability to have children. Also, when those women get too old to have children, if they didn’t have girls when they were having children, then we are likely to see a downturn in population as there aren’t enough women to bear children.

These findings are hardly revolutionary, but it is still cool to uncover, and feel like we are discovering something. Probably shouldn’t make it a habit though 🙂

Anyway, today I want to improve the efficiency of my simulation, and maybe even get started with the move to a Model View Controller(MVC) architecture.

To start, let’s take some timings. For this, I am thinking of 4 places:

  • Render
  • Stats Collection
  • World Resolver
  • Person Resolver

In order to do it, I will install time-collection points at the beginning and end of each of these, subtracting the last from the first. I’ll let it run for a while, storing all these measurements and averaging them at the end.

On the left you can see different timing measurements (in seconds) over the current world time. On the right is the graph that represents the world run. From the timing measurements, we can see our World Resolver is going to quickly become unmanageable — I would imagine the memory usage is horrendous on this as well.

For the World Resolver, I think we can make it a shallower linear increase by simply removing the dead Persons and placing them in their own list each time we resolve. This will reduce the number of Persons we need to resolve way down the line (dead people don’t change much). We could also have a win by multi-threading this to parallelize the Person resolutions. This is almost trivial to do, but we have the pesky births that might cause race conditions — so let’s mutex the population list somehow and we should be clear to multi-thread.

Let’s see how those two changes affect our timings:

Those changes were right on point, you can clearly see we have cut down any latency that we might have had to almost nothing! Now we can run some truly long simulations without as much worry over memory and execution time. Speculating on this, I think that even the transfer of dead to their own list has memory implications, namely, we no longer need to keep them in cache or nearby because of their less frequent use. Let’s take off the gloves and run 500 months without any scaling, that is, no division of 50…

Oooo, look at that! We are starting to see the signs of stress as our population comes to 20,000. Wonder how far we can take this, let’s run for 1.5x a lifetime 🙂

All right, that caps the day 🙂 See you tomorrow on POST DAY!

# -*- coding: utf-8 -*-
"""
Created on Tue Jan 11 18:13:50 2022

@author: Travis Adsitt
"""

from dataclasses import dataclass
from enum import Enum
from threading import Thread, Lock
import matplotlib.pyplot as plt
import random
import time as real_time

# Timing variables for benchmarking different portions of code
timing_vars = {
    "Render": [],
    "StatCollection": [],
    "PersonResolver": [],
    "WorldResolver": [],
    "Population": []
    }

DIVIDER = 1
THREADS = 16

START_POP = 1000
RUN_IN_MONTHS = 1000


REPRODUCTIVE_AGE_START = int(192 / DIVIDER)
REPRODUCTIVE_AGE_END = int(420 / DIVIDER)
GESTATION_TIME_MONTHS = int(9 / DIVIDER)
LIFE_TIME_MONTHS = int(876 / DIVIDER)

class Gender(Enum):
    Male = "Male"
    Female = "Female"

@dataclass(init=True)
class WorldTraits:
    population: list
    pop_mutex: Lock
    time: int
    dead: list
    dead_mutex: Lock
    
def tick_people(people, time):
    for person in people:
        person.tick_time(time)
    
class World:
    def __init__(self, population):
        assert isinstance(population, int), "population must be of type 'int'"
        
        self.traits = WorldTraits(
            population=[],
            pop_mutex=Lock(),
            time=0,
            dead=[],
            dead_mutex=Lock()
            )
        
        self.traits.population = [Person(self, 0.0) for p in range(0, population)]
    
    def birth(self):
        """
        Helper function for the Person object to inform the World that they have
        had a child.

        Returns
        -------
        None.

        """
        # Create a new person
        new_peep = Person(self, 0.0)
        
        #Get our mutex
        self.traits.pop_mutex.acquire()
        
        # Add them to the population
        self.traits.population.append(new_peep)
        
        self.traits.pop_mutex.release()
    
    def death(self, new_dead):
        if new_dead not in self.traits.population:
            return
        
        self.traits.dead_mutex.acquire()
        self.traits.dead.append(new_dead)
        self.traits.dead_mutex.release()
        
        self.traits.pop_mutex.acquire()
        self.traits.population.remove(new_dead)
        self.traits.pop_mutex.release()
        
    
    def attempt_mate(self, person_one, person_two):
        """
        Helper function to handle the resolution as to whether to individuals
        can mate, and attempt to impregnate them if they can

        Parameters
        ----------
        person_one : Person
            Any individual person
        person_two : Person
            Any other individual person

        Returns
        -------
        None.

        """
        # Check if they are female
        one_kids = person_one.traits.gender == Gender.Female
        two_kids = person_two.traits.gender == Gender.Female
        
        # Check if one female and one male
        if one_kids and not two_kids or not one_kids and two_kids:
            # Attempt impregnation on the correct individual
            if one_kids:
                person_one.attempt_to_impregnate(self.traits.time)
            else:
                person_two.attempt_to_impregnate(self.traits.time)
            
    
    def reproduction_cycle(self):
        """
        Helper function to handle the reproduction behavior of the population.
    
        Returns
        -------
        None.
    
        """
        # Shuffle everyone so we can pick peeps at 'random'
        random.shuffle(self.traits.population)
        
        # Make a copy of the population to ensure we don't change under the 
        # iterator
        pop_copy = self.traits.population.copy()
        people_iterator = iter(pop_copy)
        
        # attempt mating in twos
        for person in people_iterator:
            try:
                self.attempt_mate(person, next(people_iterator))
            except StopIteration:
                break
    
    def tick_time(self):
        """
        Helper function to push the world forward one month at a time.

        Returns
        -------
        None.

        """
        world_resolution_time = 0
        person_resolution_time = 0
        
        threads = []
        person_time = real_time.time()
        people = self.traits.population
        num_per_thread = int(len(people) / THREADS) + 1
        thread_data = [people[i:i + num_per_thread] for i in range(0, len(people), num_per_thread)]
            
        
        for t in range(0,THREADS):
            threads.append(Thread(target=tick_people, args=(thread_data[t],self.traits.time, )))
            threads[-1].start()
            
        for t in threads:
            t.join()
        
        person_time = (real_time.time() - person_time) / len(people)
        person_resolution_time = person_time if person_resolution_time == 0 else (person_time + person_resolution_time) / 2
        world_resolution_time += person_resolution_time
                
        world_time = real_time.time()
        # Try and mate
        self.reproduction_cycle()
        # Tick our time forward
        self.traits.time += 1
        world_time = real_time.time() - world_time
        
        timing_vars["WorldResolver"].append(
            world_resolution_time + world_time
            )
        timing_vars["PersonResolver"].append(
            person_resolution_time
            )
    

@dataclass(init=True)
class PersonTraits:
    alive: bool
    ispregnant: bool
    pregnancystart: int
    age: int
    gender: Gender
    money: float

class Person:
    def __init__(self, world, money):
        assert isinstance(world, World), "world, must be of type 'World'"
        assert isinstance(money, float), "money must be of type 'float'"
        
        # Set our world object
        self.world = world
        # Set our initial 'last_time'
        self.last_time = self.world.traits.time
        # Set our initial traits
        self.traits = PersonTraits(
            alive=True,
            ispregnant=False, 
            pregnancystart=None, 
            age=0, 
            gender=Gender[random.choice([g.name for g in Gender])],
            money=money
        )
    
    def spend_money(self, amount):
        """
        Used to check if this person can spend money, and if so, does spend
        the amount passed in.

        Parameters
        ----------
        amount : float
            How much money should I spend?

        Returns
        -------
        bool
            If I spent the money

        """
        spent = False
        
        if(self.money >= amount):
            self.money -= amount
            spent = True
        
        return spent
    
    def advance_automatic_tickers(self, time):
        """
        Helper function to advance the automatic tickers for a person, such as
        age or checking for a pregnancy... Those sorts of things.

        Parameters
        ----------
        time : int
            The current time in the world.

        Returns
        -------
        None.

        """
        # We get older
        self.traits.age += time - self.last_time
        
        # We sometimes die
        if(self.traits.age > LIFE_TIME_MONTHS):
            self.traits.alive = False
            self.world.death(self)
        
        # Sometimes people give birth
        if self.traits.ispregnant and ((time - self.traits.pregnancystart) > GESTATION_TIME_MONTHS):
            self.world.birth()
            self.traits.ispregnant = False
            self.traits.pregnancystart = None
    
    def wants_childeren(self):
        """
        For the world to call when determining if a pregnancy should happen.

        Returns
        -------
        bool
            If I want childeren

        """
        # ~16 to ~35 years old
        wants_childeren = self.traits.age > REPRODUCTIVE_AGE_START
        
        if(self.traits.gender == Gender.Female):
            wants_childeren = wants_childeren and self.traits.age < REPRODUCTIVE_AGE_END
        
        # Can't have childeren if we already have them
        wants_childeren = not self.traits.ispregnant and wants_childeren
        
        return wants_childeren
    
    def attempt_to_impregnate(self, time):
        """
        Impregnate this person(female)

        Returns
        -------
        None.

        """
        if self.traits.gender == Gender.Female and self.wants_childeren() and self.traits.alive:
            self.traits.ispregnant = True
            self.traits.pregnancystart = time
    
    def tick_time(self, time):
        """
        A 'behavior' function to encapsulate a decision point for this person.

        Parameters
        ----------
        time : int
            The current time tick of the world.

        Returns
        -------
        None.

        """
        self.advance_automatic_tickers(time)
        self.last_time = time
        
def get_plot_vars(world):
    men = 0
    women = 0
    alive = 0
    dead = len(world.traits.dead)
    for person in world.traits.population:
        if person.traits.alive:
            alive += 1
            
            if person.traits.gender == Gender.Female:
                women += 1
            else:
                men += 1
    
    return (women, men, alive, dead)


if __name__ == "__main__":
    new_world = World(START_POP)
    
    women_list = []
    men_list = []
    alive_list = []
    dead_list = []
    
    time_list = []
    prev_dead = 0
    for time in range(0,RUN_IN_MONTHS):
        new_world.tick_time()
        stat_time = real_time.time()
        women, men, alive, dead = get_plot_vars(new_world)
        dead_this_month = dead - prev_dead
        prev_dead = dead
        
        women_list.append(women)
        men_list.append(men)
        alive_list.append(alive)
        dead_list.append(dead_this_month)
        
        time_list.append(time)
        stat_time = real_time.time() - stat_time
        
        plot_time = real_time.time()
        plt.plot(time_list, women_list, label="Women")
        plt.plot(time_list, men_list, label="Men")
        plt.plot(time_list, alive_list, label="Alive")
        plt.plot(time_list, dead_list, label="Dead")
        plt.legend()
        plt.draw()
        plt.pause(0.05)
        plot_time = real_time.time() - plot_time
        
        timing_vars["Render"].append(plot_time)
        timing_vars["StatCollection"].append(stat_time)
    
    total_stats = [i for i in range(0,len(timing_vars["Render"]))]
    
    plt.plot(total_stats, timing_vars["Render"], label="Render")
    plt.plot(total_stats, timing_vars["PersonResolver"], label="PersonResolver")
    plt.plot(total_stats, timing_vars["WorldResolver"], label="WorldResolver")
    plt.plot(total_stats, timing_vars["StatCollection"], label="StatCollection")
    plt.legend()
    
    
    

END DAY 3

Hello again! The final day for this week’s development!

First, I need to specify yesterday’s final couple of graphs. Brielle and I went to dinner and came back to my laptop still trying to crunch the numbers with four threads. So I pushed the code to my local git, pulled it on my desktop (which has a bit more compute) and spun it up on 16 threads, which as you can see towards the end took nearly 25 seconds per person to compute. Also notable is the number of people we managed to get to: ~14 million!

I am thinking the last thing to add this week is just more efficiencies in the World Resolver, in which I think we can reduce a significant amount of time by multi-threading the shuffle and mating cycles. The reason this is what I am targeting is we can infer from the stat collection time that any single-threaded operation will take ~1/4 to ~1/3 of the total time of the world resolution. So, splitting all single-threaded operations will reduce our time(duh).

Currently my reproduction cycle code looks like this:

def reproduction_cycle(self):
    """
    Helper function to handle the reproduction behavior of the population.

    Returns
    -------
    None.

    """
    # Shuffle everyone so we can pick peeps at 'random'
    random.shuffle(self.traits.population)
    
    # Make a copy of the population to ensure we don't change under the 
    # iterator
    pop_copy = self.traits.population.copy()
    people_iterator = iter(pop_copy)
    
    # attempt mating in twos
    for person in people_iterator:
        try:
            self.attempt_mate(person, next(people_iterator))
        except StopIteration:
            break

First thing that pops up in my mind is that shuffle operation. I am willing to bet that is very expensive with larger sizes. Our iteration, though single-threaded, is iterating 2 at a time. This will only save us so long and I believe it would be a constant in a Big-O notation break down.

Let’s start by getting a higher resolution timing on each of the parts of the reproduction cycle function. With timers this code looks like:

def reproduction_cycle(self):
    """
    Helper function to handle the reproduction behavior of the population.

    Returns
    -------
    None.

    """
    shuffle_time = real_time.time()
    # Shuffle everyone so we can pick peeps at 'random'
    random.shuffle(self.traits.population)
    timing_vars["PeopleShuffle"].append(real_time.time() - shuffle_time)
    
    pop_copy_time = real_time.time()
    # Make a copy of the population to ensure we don't change under the 
    # iterator
    pop_copy = self.traits.population.copy()
    people_iterator = iter(pop_copy)
    timing_vars["PeopleCopy"].append(real_time.time() - pop_copy_time)
    
    repro_time = real_time.time()
    # attempt mating in twos
    for person in people_iterator:
        try:
            self.attempt_mate(person, next(people_iterator))
        except StopIteration:
            break
    timing_vars["PeopleMate"].append(real_time.time() - repro_time)

Results over 1000 months on 16 threads:

Ok, shuffling people is expensive, but not as expensive as our mating algorithm, so let’s mix the two. We can “shuffle” by simply selecting two random people in the list and chunking at the same time. Below is my rendering of the “chunked” list generator:

def chunked_list_generator(l, chunks=1):
    """
    Helper function to chunk a list into 'chunks' pieces, using the Knuth
    algorithm already present in the random.shuffle method of python

    Parameters
    ----------
    l : list
        The list to chunk up.
    chunks : int, optional
        The number of chunks to split the list into. The default is 1.

    Yields
    ------
    y_list : list
        Each chunk as they are produced.

    """
    # Get our list length and the number of items to place in each
    len_list = len(l)
    number_per_chunk = int(len_list / chunks)
    # First iterator to go chunk by chunk
    for curr_start in range(0, len_list, number_per_chunk):
        y_list = []
        chunk_end = curr_start + number_per_chunk
        # Second iterator to go individual by individual
        for c in range(curr_start,chunk_end):
            # If we have reached the list length break
            if c >= len_list: break
            # Get a random list index above the current point
            j = random.randint(c, len_list - 1)
            # Swap the current with the random item
            l[c], l[j] = l[j], l[c]
            # Append our Yield list
            y_list.append(l[c])
        # Yield the list
        yield y_list

This method should yield each of the split lists from the main population list that we can then start a thread from and use in the global function:

def mate_list(l, time):
    """
    Helper function to handle the resolution as to whether to individuals
    can mate, and attempt to impregnate them if they can

    Parameters
    ----------
    person_one : Person
        Any individual person
    person_two : Person
        Any other individual person

    Returns
    -------
    None.

    """
    for i in range(0, len(l), 2):
        if (i + 1) >= len(l): break
        one = l[i]
        two = l[i + 1]
    
        # Check if they are female
        one_kids = one.traits.gender == Gender.Female
        two_kids = two.traits.gender == Gender.Female
        
        # Check if one female and one male
        if one_kids and not two_kids or not one_kids and two_kids:
            # Attempt impregnation on the correct individual
            if one_kids:
                one.attempt_to_impregnate(time)
            else:
                two.attempt_to_impregnate(time)

In case it isn’t totally clear, this is intended to run inside a thread so the list should be handed to it along with the current time step of the world. Let’s see what that does:

Surprisingly, this did not yield the expected speedup I wanted. I am thinking this has to do with list copying more than shuffling, specifically when I yield back the list, so let’s attempt yielding a couple list indexes that we can use to slice and see what happens…

Ok, so interestingly enough, we see speedup when separating the shuffle from the chunking method, quite a bit of speedup actually. I should mention that I implemented a scaling multi-thread here, where as the population grows we add threads. This reduces the overhead for smaller populations. Splitting 1000 people into 16 groups and starting threads for those groups just doesn’t really make all that much sense. So, every 100,000 people we add a thread to compute them until we get to the max thread count, at which point we just accept the time impact.

Conclusion

Ok, so at the start of this post I set out to create a “balanced” simulation — and in the middle of the post we had a pretty small-scale version that was quite balanced, except when there were too few women in the world to birth children. Towards the end here, we tried to get the full-scale problem working. Though we managed to get considerable speedup, we couldn’t quite get a manageable full-scale simulation going.

Brielle and I have some ideas on how to get speed improvements for a large model. I look forward to exploring those and balancing the simulation further 🙂

Final code for this week 🙂

# -*- coding: utf-8 -*-
"""
Created on Tue Jan 11 18:13:50 2022

@author: Travis Adsitt
"""

from dataclasses import dataclass
from enum import Enum
from threading import Thread, Lock
import matplotlib.pyplot as plt
import random
import time as real_time

# Timing variables for benchmarking different portions of code
timing_vars = {
    "Render": [],
    "StatCollection": [],
    "PersonResolver": [],
    "WorldResolver": [],
    "Population": [],
    "PeopleShuffle":[],
    "PeopleCopy":[],
    "PeopleMate":[]
    }

DIVIDER = 1
THREADS = 15
POPULATION_PER_THREAD = 100000

START_POP = 1000
RUN_IN_MONTHS = 1000


REPRODUCTIVE_AGE_START = int(192 / DIVIDER)
REPRODUCTIVE_AGE_END = int(420 / DIVIDER)
GESTATION_TIME_MONTHS = int(9 / DIVIDER)
LIFE_TIME_MONTHS = int(876 / DIVIDER)

class Gender(Enum):
    Male = "Male"
    Female = "Female"

@dataclass(init=True)
class WorldTraits:
    population: list
    pop_mutex: Lock
    time: int
    dead: list
    dead_mutex: Lock
    
def tick_people(people, time):
    for person in people:
        person.tick_time(time)

def chunked_list_generator(l, chunks=1):
    """
    Helper function to chunk a list into 'chunks' pieces, using the Knuth
    algorithm already present in the random.shuffle method of python

    Parameters
    ----------
    l : list
        The list to chunk up.
    chunks : int, optional
        The number of chunks to split the list into. The default is 1.

    Yields
    ------
    y_list : list
        Each chunk as they are produced.

    """
    # Get our list length and the number of items to place in each
    len_list = len(l)
    number_per_chunk = int(len_list / chunks)
    # First iterator to go chunk by chunk
    for curr_start in range(0, len_list, number_per_chunk):
        # y_list = []
        chunk_end = curr_start + number_per_chunk
        yield (curr_start, chunk_end)
        """
        # Second iterator to go individual by individual
        for c in range(curr_start,chunk_end):
            # If we have reached the list length break
            if c >= len_list: break
            # Get a random list index above the current point
            j = random.randint(c, len_list - 1)
            # Swap the current with the random item
            l[c], l[j] = l[j], l[c]
            # Append our Yield list
            y_list.append(l[c])
        # Yield the list
        yield y_list
        """


def mate_list(l, time):
    """
    Helper function to handle the resolution as to whether to individuals
    can mate, and attempt to impregnate them if they can

    Parameters
    ----------
    person_one : Person
        Any individual person
    person_two : Person
        Any other individual person

    Returns
    -------
    None.

    """
    for i in range(0, len(l), 2):
        if (i + 1) >= len(l): break
        one = l[i]
        two = l[i + 1]
    
        # Check if they are female
        one_kids = one.traits.gender == Gender.Female
        two_kids = two.traits.gender == Gender.Female
        
        # Check if one female and one male
        if one_kids and not two_kids or not one_kids and two_kids:
            # Attempt impregnation on the correct individual
            if one_kids:
                one.attempt_to_impregnate(time)
            else:
                two.attempt_to_impregnate(time)
    
class World:
    def __init__(self, population):
        assert isinstance(population, int), "population must be of type 'int'"
        
        self.traits = WorldTraits(
            population=[],
            pop_mutex=Lock(),
            time=0,
            dead=[],
            dead_mutex=Lock()
            )
        self.threads = 1
        self.traits.population = [Person(self, 0.0) for p in range(0, population)]
    
    def birth(self):
        """
        Helper function for the Person object to inform the World that they have
        had a child.

        Returns
        -------
        None.

        """
        # Create a new person
        new_peep = Person(self, 0.0)
        
        #Get our mutex
        self.traits.pop_mutex.acquire()
        
        # Add them to the population
        self.traits.population.append(new_peep)
        
        self.traits.pop_mutex.release()
    
    def death(self, new_dead):
        if new_dead not in self.traits.population:
            return
        
        self.traits.dead_mutex.acquire()
        self.traits.dead.append(new_dead)
        self.traits.dead_mutex.release()
        
        self.traits.pop_mutex.acquire()
        self.traits.population.remove(new_dead)
        self.traits.pop_mutex.release()
        
    
    def attempt_mate(self, person_one, person_two):
        """
        Helper function to handle the resolution as to whether to individuals
        can mate, and attempt to impregnate them if they can

        Parameters
        ----------
        person_one : Person
            Any individual person
        person_two : Person
            Any other individual person

        Returns
        -------
        None.

        """
        # Check if they are female
        one_kids = person_one.traits.gender == Gender.Female
        two_kids = person_two.traits.gender == Gender.Female
        
        # Check if one female and one male
        if one_kids and not two_kids or not one_kids and two_kids:
            # Attempt impregnation on the correct individual
            if one_kids:
                person_one.attempt_to_impregnate(self.traits.time)
            else:
                person_two.attempt_to_impregnate(self.traits.time)
    
    
    
    def reproduction_cycle(self):
        """
        Helper function to handle the reproduction behavior of the population.
    
        Returns
        -------
        None.
    
        """
        shuffle_time = real_time.time()
        # Shuffle everyone so we can pick peeps at 'random'
        random.shuffle(self.traits.population)
        timing_vars["PeopleShuffle"].append(real_time.time() - shuffle_time)
        
        pop_copy_time = real_time.time()
        # Make a copy of the population to ensure we don't change under the 
        # iterator
        pop_copy = self.traits.population.copy()
        timing_vars["PeopleCopy"].append(real_time.time() - pop_copy_time)
        
        repro_time = real_time.time()
        num_threads = self.get_num_threads()
        threads = []
        # attempt mating in twos
        for s, e in chunked_list_generator(pop_copy, num_threads):
            threads.append(Thread(target=mate_list, args=(pop_copy[s:e],self.traits.time, )))
            threads[-1].start()
        
        for t in threads:
            t.join()

        timing_vars["PeopleMate"].append(real_time.time() - repro_time)
    
    def tick_time(self):
        """
        Helper function to push the world forward one month at a time.

        Returns
        -------
        None.

        """
        world_resolution_time = 0
        person_resolution_time = 0
        
        threads = []
        person_time = real_time.time()
        people = self.traits.population
        num_threads = self.get_num_threads()
        num_per_thread = int(len(people) / num_threads) + 1
        thread_data = [people[i:i + num_per_thread] for i in range(0, len(people), num_per_thread)]
            
        
        for t in thread_data:
            threads.append(Thread(target=tick_people, args=(t,self.traits.time, )))
            threads[-1].start()
            
        for t in threads:
            t.join()
        
        person_time = (real_time.time() - person_time) / len(people)
        person_resolution_time = person_time if person_resolution_time == 0 else (person_time + person_resolution_time) / 2
        world_resolution_time += person_resolution_time
                
        world_time = real_time.time()
        # Try and mate
        self.reproduction_cycle()
        # Tick our time forward
        self.traits.time += 1
        world_time = real_time.time() - world_time
        
        timing_vars["WorldResolver"].append(
            world_resolution_time + world_time
            )
        timing_vars["PersonResolver"].append(
            person_resolution_time
            )
    
    def get_num_threads(self):
        num_threads = int(len(self.traits.population) / POPULATION_PER_THREAD)
        
        num_threads = num_threads if num_threads < THREADS else THREADS
        
        num_threads = num_threads or 1
        
        if(num_threads != self.threads):
            print(f"Threads changing from {self.threads} to {num_threads}", flush=True)
            self.threads = num_threads
        return num_threads

@dataclass(init=True)
class PersonTraits:
    alive: bool
    ispregnant: bool
    pregnancystart: int
    age: int
    gender: Gender
    money: float

class Person:
    def __init__(self, world, money):
        assert isinstance(world, World), "world, must be of type 'World'"
        assert isinstance(money, float), "money must be of type 'float'"
        
        # Set our world object
        self.world = world
        # Set our initial 'last_time'
        self.last_time = self.world.traits.time
        # Set our initial traits
        self.traits = PersonTraits(
            alive=True,
            ispregnant=False, 
            pregnancystart=None, 
            age=0, 
            gender=Gender[random.choice([g.name for g in Gender])],
            money=money
        )
    
    def spend_money(self, amount):
        """
        Used to check if this person can spend money, and if so, does spend
        the amount passed in.

        Parameters
        ----------
        amount : float
            How much money should I spend?

        Returns
        -------
        bool
            If I spent the money

        """
        spent = False
        
        if(self.money >= amount):
            self.money -= amount
            spent = True
        
        return spent
    
    def advance_automatic_tickers(self, time):
        """
        Helper function to advance the automatic tickers for a person, such as
        age or checking for a pregnancy... Those sorts of things.

        Parameters
        ----------
        time : int
            The current time in the world.

        Returns
        -------
        None.

        """
        # We get older
        self.traits.age += time - self.last_time
        
        # We sometimes die
        if(self.traits.age > LIFE_TIME_MONTHS):
            self.traits.alive = False
            self.world.death(self)
        
        # Sometimes people give birth
        if self.traits.ispregnant and ((time - self.traits.pregnancystart) > GESTATION_TIME_MONTHS):
            self.world.birth()
            self.traits.ispregnant = False
            self.traits.pregnancystart = None
    
    def wants_childeren(self):
        """
        For the world to call when determining if a pregnancy should happen.

        Returns
        -------
        bool
            If I want childeren

        """
        # ~16 to ~35 years old
        wants_childeren = self.traits.age > REPRODUCTIVE_AGE_START
        
        if(self.traits.gender == Gender.Female):
            wants_childeren = wants_childeren and self.traits.age < REPRODUCTIVE_AGE_END
        
        # Can't have childeren if we already have them
        wants_childeren = not self.traits.ispregnant and wants_childeren
        
        return wants_childeren
    
    def attempt_to_impregnate(self, time):
        """
        Impregnate this person(female)

        Returns
        -------
        None.

        """
        if self.traits.gender == Gender.Female and self.wants_childeren() and self.traits.alive:
            self.traits.ispregnant = True
            self.traits.pregnancystart = time
    
    def tick_time(self, time):
        """
        A 'behavior' function to encapsulate a decision point for this person.

        Parameters
        ----------
        time : int
            The current time tick of the world.

        Returns
        -------
        None.

        """
        self.advance_automatic_tickers(time)
        self.last_time = time
        
def get_plot_vars(world):
    men = 0
    women = 0
    alive = 0
    dead = len(world.traits.dead)
    for person in world.traits.population:
        if person.traits.alive:
            alive += 1
            
            if person.traits.gender == Gender.Female:
                women += 1
            else:
                men += 1
    
    return (women, men, alive, dead)


if __name__ == "__main__":
    new_world = World(START_POP)
    
    women_list = []
    men_list = []
    alive_list = []
    dead_list = []
    
    time_list = []
    prev_dead = 0
    for time in range(0,RUN_IN_MONTHS):
        new_world.tick_time()
        stat_time = real_time.time()
        women, men, alive, dead = get_plot_vars(new_world)
        dead_this_month = dead - prev_dead
        prev_dead = dead
        
        women_list.append(women)
        men_list.append(men)
        alive_list.append(alive)
        dead_list.append(dead_this_month)
        
        time_list.append(time)
        stat_time = real_time.time() - stat_time
        
        plot_time = real_time.time()
        plt.plot(time_list, women_list, label="Women")
        plt.plot(time_list, men_list, label="Men")
        plt.plot(time_list, alive_list, label="Alive")
        plt.plot(time_list, dead_list, label="Dead")
        plt.legend()
        plt.draw()
        plt.pause(0.05)
        plot_time = real_time.time() - plot_time
        
        timing_vars["Render"].append(plot_time)
        timing_vars["StatCollection"].append(stat_time)
    
    total_stats = [i for i in range(0,len(timing_vars["Render"]))]
    
    ignore_timings = [
    # "Render",
    # "StatCollection",
    "PersonResolver",
    # "WorldResolver",
    "Population",
    "PeopleShuffle",
    "PeopleCopy",
    "PeopleMate"
    ]
    for key in timing_vars:
        if key in ignore_timings:
            continue
        
        plt.plot(total_stats, timing_vars[key], label=key)
        plt.legend()

Paper Boy Progress Report Final (For Now)

This week I managed to get a black screen to show up between different scenes so you can no longer see each scene being setup. Nothing else has changed…

This post is meant to cap the progress on this game for now, I think it is at a point where the core mechanic is clear and polished enough to be fun on occasion. There are bigger visions that I have for this project, but to do weekly progress reports at this point just seems not useful as the progress would be much slower. With a full time job, limited knowledge on the Unity Game Engine and the XR utilities of the engine being fairly young still I don’t think progress will be quick enough to be interesting to any audience.

However with all of that said, I am pleased to inform you that below is a download link to the most recent build for you to download and side load to your headset. A couple things to note:

  • You do not need a bike to ‘ride’ simply use the right joystick to move forward
  • If you do have a bike capable of Bluetooth connectivity it must be a Echelon Connect to be compatible with this game.
    • If there is a bike that you would like me to make compatible let me know, I can try and implement a solution so you can use your bike 🙂
  • This is a VERY rough copy, it will be enjoyable, but don’t expect a polished game.

Thank you all for joining me in the progress reports for this early game, I have learned a TON and honestly feel pretty proud of how far I came starting with zero knowledge about game development. Of course the University Degree in Computer Science helped but…. you know. Thank you 🙂

Paper Boy Progress Report #6

Change Log

  • Added procedural pathway generation
    • This allowed the addition of a bunch of new houses
  • Added automatic target placement on front door pathway
  • Added Coroutines to prevent lag spikes in level loading

This week I implemented the back end for procedural generation of pathways for houses in the game. The ability to simply place ’empty’ game objects to mark the front door, garage door and back two corners of the house and have it generate the paths and the grass is so useful. Now regardless of the meshes we can generate the base environment for the front of the house.

Below you can see an animation of the paths and grass that are placed in the generation.

To explain a bit, the grass is filled in on the front door side of the driveway with each path block as it is placed. Basically we place a block, then do a little logic to tell which sides of the brick need grass, and shrink it to the width of the block give it a length and place that object on the midpoint of that side of the brick. On the opposite side of the driveway we simply place grass the same length as the driveway.

This method works really well, or at least well enough, I am pleased with the results.

Demo

I implemented Coroutines, which means there is no longer a lag spike when loading into the level, however it also means you see the level being setup which is sort of funny, but shouldn’t be included — one of the next things I would like to implement is a loading screen so all the level switches are a bit more kind to the user.

With that qualifier enjoy the demo and thank you for stopping in and reading! Please feel free to leave a comment 🙂 I would like to hear your thoughts.

Arduino RP2040 Nano Connect SPI Bus

This post is to document the setup for a simple SPI Bus test using a RP2040 Nano Connect as a controller and the Arduino Uno as a peripheral device. The board connections look as follows:

I couldn’t find a good example of how to use the RPi4 as a peripheral device, and didn’t want to spend the time of coding it myself. So the Arduino Uno’s ATMega328p chip seemed like a reasonable fit for this — plenty of examples and simple enough for me to wrap my head around it.

With all that said I will be using the Arduino IDE to program both devices and the SPI library to handle byte transfers.

Useful Links

Peripheral Device Code

I like to start with the full picture and drill down into it, so below is the full code for the peripheral device.

/*
    SPI Bus Test
    Copyright (C) 2021  Travis Adsitt

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include<SPI.h>

#define SPI_MODE 3
#define MAX_BUFFER_SIZE 50

volatile boolean received;
volatile byte peripheral_received,peripheral_send;

char print_buffer [50]; // Used for print formatting

byte *command_buffer;
byte *command_buffer_end;
byte *command_buffer_start;
int command_buffer_count;

byte *send_buffer;
byte *send_buffer_end;
byte *send_buffer_start;
int send_buffer_count;

int counter; // Counter to control
byte counter_low;
byte counter_high;

void setup()

{
  Serial.begin(115200);
  
  pinMode(MISO,OUTPUT);                   

  SPCR |= _BV(SPE);   
  SPCR |= _BV(SPIE);
  // SPCR |= _BV(DORD);
  SPCR |= (SPI_MODE << 2);
  
  command_buffer_count = 0;
  
  command_buffer = calloc(0, MAX_BUFFER_SIZE);
  command_buffer_start = command_buffer;
  command_buffer_end = command_buffer;

  
  send_buffer_count = 0;
  
  send_buffer = calloc(0, MAX_BUFFER_SIZE);
  send_buffer_start = send_buffer;
  send_buffer_end = send_buffer;

  counter = 0;
  
  SPI.attachInterrupt();            
}

void add_command_to_buffer(byte command){
  if(command_buffer_count == MAX_BUFFER_SIZE || command == 0x00) return;
  
  *command_buffer_end = command;

  if(command_buffer_end == (command_buffer + MAX_BUFFER_SIZE)){
    command_buffer_end = command_buffer;
  }else{
    command_buffer_end++;
  }
  
  command_buffer_count++;
}

byte get_command(){
  if(!command_buffer_count) return 0x00;
  
  byte command = *command_buffer_start;
  if(command_buffer_start == (command_buffer + MAX_BUFFER_SIZE)){
    command_buffer_start = command_buffer;
  }else{
    command_buffer_start++;
  }
  
  command_buffer_count--;

  return command;
}

byte get_send_buffer_byte(){
  if(!send_buffer_count) return 0x00;

  byte ret_send = *send_buffer_start;
  
  if(send_buffer_start == (send_buffer + MAX_BUFFER_SIZE)){
    send_buffer_start = send_buffer; 
  }else{
    send_buffer_start++;
  }

  send_buffer_count--;

  return ret_send;
}

void add_byte_to_send_buffer(byte data_to_send){
  if(send_buffer_count == MAX_BUFFER_SIZE){
    return;
  }

  *send_buffer_end = data_to_send; 
  
  if(send_buffer_end == (send_buffer + MAX_BUFFER_SIZE)){
    send_buffer_end = send_buffer; 
  }else{
    send_buffer_end++;
  }

  send_buffer_count++;
}

ISR (SPI_STC_vect)                         
{
  byte command = SPDR;
  
  SPDR = get_send_buffer_byte();
  
  add_command_to_buffer(command);
}


void loop()
{ 
  if (command_buffer_count > 0) //Check for more commands
  {
    // Grab the next command to process
    peripheral_received = get_command();
  
    // Print command recieved
    sprintf(print_buffer,"Processing Command: %x Commands Left: %d",peripheral_received, command_buffer_count);
    Serial.println(print_buffer);
    switch(peripheral_received){
      case 0x01: // Add to the counter
        Serial.println("Adding to counter");
        counter++;
        break;
      case 0x02: // Get the counter
        add_byte_to_send_buffer(counter & 0xff);
        add_byte_to_send_buffer((counter >> 8) & 0xff);
        break;
      case 0x03: // Reset the counter
        Serial.println("Resetting the Counter");
        counter = 0;
        break;
      case 0x04: // Are you still there?
        Serial.println("Sending heartbeat");
        add_byte_to_send_buffer(0xff); // Yes :)
        break;
    }
    received = false;
  }
}

Ok to start lets look at the main loop…

void loop()
{ 
  if (command_buffer_count > 0) //Check for more commands
  {
    // Grab the next command to process
    peripheral_received = get_command();
  
    // Print command recieved
    sprintf(print_buffer,"Processing Command: %x Commands Left: %d",peripheral_received, command_buffer_count);
    Serial.println(print_buffer);
    switch(peripheral_received){
      case 0x01: // Add to the counter
        Serial.println("Adding to counter");
        counter++;
        break;
      case 0x02: // Get the counter
        Serial.println("Sending the Counter");
        add_byte_to_send_buffer(counter & 0xff);
        add_byte_to_send_buffer((counter >> 8) & 0xff);
        break;
      case 0x03: // Reset the counter
        Serial.println("Resetting the Counter");
        counter = 0;
        break;
      case 0x04: // Are you still there?
        Serial.println("Sending heartbeat");
        add_byte_to_send_buffer(0xff); // Yes :)
        break;
    }
    received = false;
  }
}

Our SPI bus is configured manually using the SPI Control Register(SPCR), I will leave it up to you to review the ATMega328P datasheet for more information on the bit values I set in there.

In the loop first we check to see if any new commands have come across the bus by checking the global variable ‘command_buffer_count’. If there are commands waiting then grab the next one in the queue with a call to ‘get_command()’. With the command stored in our ‘peripheral_received’ variable we can enter the switch case statement that identifies the commands we care about, namely:

  • 0x01
    • Add to our internal counter
  • 0x02
    • Send the current state of the counter
  • 0x03
    • Reset our counter
  • 0x04
    • Send heartbeat, this is used to verify there is indeed a device online

You might notice in each of the case blocks there are calls similar to our get_command() call to helper routines that manage a queue for us. Lets dig into this concept a bit.

A Queue for Command Management

Before we look at how the queue is setup and managed lets look at the Interrupt Service Routine(ISR) that is in our program:

ISR (SPI_STC_vect)                         
{
  byte command = SPDR;
  
  SPDR = get_send_buffer_byte();
  
  add_command_to_buffer(command);
}

This little block of code is run anytime there is a byte of data fully received and available in the SPI Data Register(SPDR), it is an interrupt because anything executing on the Microcontroller(MCU) at the time is immediately interrupted to handle the data in the buffer. You want your interrupt routines to be extremely simple as to not miss any data that might be being shifted in while you are executing the handler code.

So the first thing we do is grab the current byte from the register, and replace it with the next byte we want to send (if there isn’t a byte to send we replace it with 0x00). Then we add it to our command buffer queue to be processed in our main loop, we don’t process it here because that would be too compute expensive for the rate at which commands might be coming across the bus.

Finally we come to the concept of this queue… The command and send_data buffers are setup as such:

  command_buffer_count = 0;
  
  command_buffer = calloc(0, MAX_BUFFER_SIZE);
  command_buffer_start = command_buffer;
  command_buffer_end = command_buffer;

  
  send_buffer_count = 0;
  
  send_buffer = calloc(0, MAX_BUFFER_SIZE);
  send_buffer_start = send_buffer;
  send_buffer_end = send_buffer;

Basically we have an arbitrary ‘MAX_BUFFER_SIZE’ that we use with calloc(clear allocation, meaning we set all the memory to 0 in this case after we allocate it — this is more expensive but at least you know the start state of your memory) to get a set amount of memory for our buffer.

Then two pointers and a count variable to manage the queue are established, having all three of these can be seen as a bit of overkill considering you should be able to infer everything from a counter and list start pointer, or even start and end pointers to infer count, but I want to keep it simple and just track a bit more.

Using those buffers we have the command add and get routines below:

void add_command_to_buffer(byte command){
  if(command_buffer_count == MAX_BUFFER_SIZE || command == 0x00) return;
  
  *command_buffer_end = command;

  if(command_buffer_end == (command_buffer + MAX_BUFFER_SIZE)){
    command_buffer_end = command_buffer;
  }else{
    command_buffer_end++;
  }
  
  command_buffer_count++;
}

byte get_command(){
  if(!command_buffer_count) return 0x00;
  
  byte command = *command_buffer_start;
  if(command_buffer_start == (command_buffer + MAX_BUFFER_SIZE)){
    command_buffer_start = command_buffer;
  }else{
    command_buffer_start++;
  }
  
  command_buffer_count--;

  return command;
}

When adding a command we check if the buffer is full or the command is 0x00 and just return if either is the case. If not though, we set the end pointers data equal to the command handed in, check if we are at the end of the buffer and if so then wrap it to the buffers location if not we just add one to the buffer end pointer. Finally add one to the buffers count.

On the get command side we do much the same except we pull from the start of the buffer and set the start to +1 from its current location and subtract from the buffer_count.

For the send_data buffer you will see exactly the same layout:

byte get_send_buffer_byte(){
  if(!send_buffer_count) return 0x00;

  byte ret_send = *send_buffer_start;
  
  if(send_buffer_start == (send_buffer + MAX_BUFFER_SIZE)){
    send_buffer_start = send_buffer; 
  }else{
    send_buffer_start++;
  }

  send_buffer_count--;

  return ret_send;
}

void add_byte_to_send_buffer(byte data_to_send){
  if(send_buffer_count == MAX_BUFFER_SIZE){
    return;
  }

  *send_buffer_end = data_to_send; 
  
  if(send_buffer_end == (send_buffer + MAX_BUFFER_SIZE)){
    send_buffer_end = send_buffer; 
  }else{
    send_buffer_end++;
  }

  send_buffer_count++;
}

In fact, it is so similar it might make sense to generalize the functions and let the caller specify the buffer using a struct or something… For now that is all for the peripheral device.

Controller Device Code

If you are still with me the controller code is much simpler and should be a breeze to review. Again we will start with the whole picture:

/*
    SPI Bus Test
    Copyright (C) 2021  Travis Adsitt

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include<SPI.h>

// Define our Chip Select pin
#define CS 10

// SPI settings
SPISettings settings(9600, LSBFIRST, SPI_MODE3);

char print_buffer [50]; // Used for print formatting

void setup()
{
  // Begin a serial session for debug printing
  Serial.begin(9600);
  // Begin our SPI bus library functionality
  SPI.begin();

  // Setup all pin podes
  pinMode(CS, OUTPUT);
  pinMode(MOSI, OUTPUT);
  pinMode(MISO, INPUT);
  pinMode(SCK, OUTPUT);
  pinMode(9, OUTPUT);

  // Set our Chip Select high, this turns it 'off'
  digitalWrite(CS, HIGH);
}

void loop(void)
{
  // A place for us to store the counter when we request it
  int counter = 0;

  // Drop Chip Select and begin a transaction
  digitalWrite(CS, LOW);
  SPI.beginTransaction(settings);

  // We are simply going to count up and send as a command
  int i = 1;
  while(i < 5){
    // If we are sending a two expect the return to 
    // be the current counters value.
    if(i == 2){
      counter = SPI.transfer(i);
    }else{
      SPI.transfer(i);
    }
    i++;
  }
  // Send 0x00 to clear SPDR preventing bad data on next transaction
  SPI.transfer(0x00);
  // Bring chip select back up and end the transaction
  digitalWrite(CS, HIGH);
  SPI.endTransaction();

  // Print our counter and delay for easier reading on serial
  // terminal.
  sprintf(print_buffer,"Counter Lower Value: %x",counter);
  Serial.println(print_buffer);
  delay(500);
}

The setup section is just specifying the pin modes for the SPI library and ourselves. We control the Chip Select(CS) line but the rest is handled by the SPI library. Despite the library handling things I highly recommend reading up on what that library is doing, it is really impressive and very cool.

Anyway, in the main loop you will see we are simply counting from 0-4 and sending the current index as a command to the peripheral. We capture the return value when sending 2 as that should return the value of the counter. And because we are in Least Significant Bit First(LSBFIRST) mode we expect the first bit to be a one as we have only added 1 to the peripherals counter. If we cared to look, when we send the 4 we would expect also to see the 0xFF of the heartbeat, that can be seen in the O-Scope capture below:

O-Scope capture of CIPO(MISO) line over Clock line

In this picture you can actually see the decoded values below the wave form, this includes the heartbeat at the end.

Output

Peripheral Serial Output

Processing Command: 1 Commands Left: 0
Adding to counter
Processing Command: 2 Commands Left: 0
Sending the Counter
Processing Command: 3 Commands Left: 1
Resetting the Counter
Processing Command: 4 Commands Left: 0
Sending heartbeat

Here we see the command that is being processed along with the queue remaining. Because it is such a short burst we see it only start to stack commands when we are processing command ‘0x03’.

Controller Serial Output

Counter Lower Value: 1

As expected we receive a value of one from the peripheral.

Conclusion

This dive into SPI communication was very informative for me, I hope this article is helpful in guiding someone else by showing a working example. Please comment if you find any inaccuracies that should be corrected or would like to discuss the content. Thank you for reading!

Paper Boy Progress Report #5

Preface

This week features no direct in-game demo, instead this is the theory behind procedural generation of pathways for each houses driveway and front door paths. I totally understand if the theory is something you skip — but there are some pretty pictures along the way that you can scroll through and check out 🙂

Either way thank you for stopping in — and with this theory will come (I hope) a wicked awesome post next week. Thank you!

Procedural Generation of House Paths

I want to start implementing procedural generation of the environment. This will allow me to expand the objects used in a scalable way as well as increase the variability of the environment with random generation. So to start I built a python program to generate paths from two points, the garage door and the front door, to the road.

For a little added flair the script creates a GIF to show step by step how the paths are constructed, this also helps with debug.

# -*- coding: utf-8 -*-
"""
Created on Mon Dec 20 06:34:29 2021

@author: Travis Adsitt
"""

from PIL import Image, ImageDraw, ImageColor
import random, math

GLOBAL_EDGE_COUNT = 260
CUBE_EDGE_SIZE = 20
PATH_BUFFER = 5
ROAD_Y = 200

CUBES_PERLINE = GLOBAL_EDGE_COUNT / CUBE_EDGE_SIZE

def GetNewBufferedLocation(not_x=None, not_y=None):
    """
    
    
    Parameters
    ----------
    not_x : int, optional
        Avoid this x value. The default is None.
    not_y : int, optional
        Avoid this y value. The default is None.

    Returns
    -------
    x : int
        A new random x-point within the buffered area.
    y : int
        A new random y-point within the buffered area.

    """
    buffered_house_area = GLOBAL_EDGE_COUNT - CUBE_EDGE_SIZE
    x = random.randint(CUBE_EDGE_SIZE, buffered_house_area)
    if not_x:
        while x == not_x:
            x = random.randint(CUBE_EDGE_SIZE, buffered_house_area)
            
    y = random.randint(CUBE_EDGE_SIZE, ROAD_Y)
    if not_y:
        while y == not_y:
            y = random.randint(CUBE_EDGE_SIZE, buffered_house_area)
            
    return (x,y)


def GetRoadBoundaryLine():
    """
    Helper function to get the two points describing the roads boundary    
    
    Returns
    -------
    List[Tuple(int x, int y), Tuple(int x, int y)]
        Left and right most point representing the road boundary line.

    """
    P1 = (0,ROAD_Y)
    P2 = (GLOBAL_EDGE_COUNT, ROAD_Y)
    
    return [P1,P2]

def GetGaragePathPoints(prev_point):
    """
    Helper function to yield points until the driveway meets the road

    Parameters
    ----------
    prev_point : Tuple(int x, int y)
        The most recent garage path point.

    Yields
    ------
    Tuple(int x, int y)
        The next point for the driveway.

    """
    curr_point = prev_point
    
    while curr_point[1] <= ROAD_Y:
        curr_point = (curr_point[0], curr_point[1] + 1)
        yield curr_point
        
def GetDistance(P1, P2):
    """
    Helper function to get the distance between two points in non-diagonal way

    Parameters
    ----------
    P1 : Tuple(int x, int y)
        Point one.
    P2 : Tuple(int x, int y)
        Point two.

    Returns
    -------
    int
        Distance between the two input points on a non-dagonal basis.

    """
    x1 = P1[0]
    y1 = P1[1]
    
    x2 = P2[0]
    y2 = P2[1]
    
    xd = x1 - x2 if x1 > x2 else x2 - x1
    yd = y1 - y2 if y1 > y2 else y2 - y1
    
    return xd + yd# math.sqrt(xd ** 2 + yd ** 2)

def GetFrontDoorPathTarget(fd_point, garage_points):
    """
    Helper function to get the target point for the front door path

    Parameters
    ----------
    fd_point : Tuple(int x, int y)
        The point of the front doors location.
    garage_points : List[Tuple(int x, int y)]
        All of the points describing the driveway.

    Returns
    -------
    Tuple(int x, int y)
        The point to target for front pathway.

    """
    # Get the closest road point and its distance
    road_point = (fd_point[0], ROAD_Y)
    road_point_distance = GetDistance(fd_point, road_point)
    # Get the closest drive way point and set a start distance
    garage_point = garage_points[0]
    garage_point_distance = GetDistance(fd_point, garage_point)
    
    # Iterate all driveway points to ensure there isn't one closer
    for point in garage_points:
        curr_point_dist = GetDistance(fd_point, point)
        
        if curr_point_dist < garage_point_distance:
            garage_point = point
            garage_point_distance = curr_point_dist
            
        if curr_point_dist > garage_point_distance:
            break
    
    # Return whichever point (driveway or road) is closer to be used as a target
    return garage_point if garage_point_distance < road_point_distance else road_point

def GetFrontDoorPathPoints(prev_point, garage_points):
    """
    Helper function to properly draw a path from the front door to the 
    drive way or the road whichever is closer

    Parameters
    ----------
    prev_point : Tuple(int x, int y)
        Front door location.
    garage_points : List[Tuple(int x, int y)]
        List of points describing the driveways path.

    Yields
    ------
    curr_point : Tuple(int x, int y)
        The next point in the front path.

    """
    # Get our target, either the nearest driveway point or the road
    target = GetFrontDoorPathTarget(prev_point, garage_points)
    # Local variable for current point tracking
    curr_point = prev_point
    
    # Iterate the y direction before the x, no diagonals are allowed
    while curr_point[1] != target[1]:
        yield curr_point
        curr_point = (curr_point[0], curr_point[1] + 1)
    
    # Iterate the x direction to complete the path
    while curr_point[0] != target[0]:
        yield curr_point
        delta = 1 if curr_point[0] < target[0] else -1
        curr_point = (curr_point[0] + delta, curr_point[1])
    
    

def GetPaths(garage_door_point, front_door_point):
    """
    Helper function to get the garage door and front door paths to the road.

    Parameters
    ----------
    garage_door_point : TYPE
        DESCRIPTION.
    front_door_point : TYPE
        DESCRIPTION.

    Yields
    ------
    Tuple(int x, int y) || None
        Next Driveway point.
    Tuple(int x, int y) || None
        Next Front Pathway point.

    """
    curr_g_door_point = garage_door_point
    curr_f_door_point = front_door_point
    
    garage_points = []
    front_points = []
    
    # Initial path buffer of PATH_BUFFER length to ensure distance from doors
    for i in range(PATH_BUFFER):
        garage_points.append(curr_g_door_point)
        front_points.append(curr_f_door_point)
        
        curr_g_door_point = (curr_g_door_point[0], curr_g_door_point[1] + 1)
        curr_f_door_point = (curr_f_door_point[0], curr_f_door_point[1] + 1)
        
        yield (curr_g_door_point, curr_f_door_point)
    
    # Finish writing points for the garage door to go to the road
    for g in GetGaragePathPoints(curr_g_door_point):
        garage_points.append(g)
        yield (g, None)
    
    # Finish writing points for the front door to go to the road or garage path
    for f in GetFrontDoorPathPoints(curr_f_door_point, garage_points[5:]):
        yield (None, f)
    

if __name__ == "__main__":
    # Get random front door and garage locations
    front_door = GetNewBufferedLocation()
    garage_door = GetNewBufferedLocation(not_x=front_door[0])
    
    # A place to store all our images
    images = []
    
    # A place to keep track of all points to be drawn on each frame
    r_points = []
    g_points = []
    
    for point in GetPaths(garage_door,front_door):
        # Create a new image for current frame
        out = Image.new("RGBA", (GLOBAL_EDGE_COUNT,GLOBAL_EDGE_COUNT))
        d = ImageDraw.Draw(out)
        
        # Draw Road Boundary
        d.line(GetRoadBoundaryLine(), fill="blue")
        
        # Append garage and front path points to respective lists
        if point[0]:
            r_points.append(point[0])
        if point[1]:
            g_points.append(point[1])
            
        d.point(r_points,fill="red")
        d.point(g_points,fill="green")
        
        images.append(out)
    
    # Save our GIF
    images[0].save(
        'house_paths.gif', 
        save_all=True, 
        append_images=images[1:], 
        optimize=True, 
        duration=100,
        loop=0
    )
    
    

Some examples of the output (Green == Front Door Path :: Red == Driveway):

So it is clear this is generating unrealistic front door and garage locations — but luckily at this stage I don’t really care about realism, I just want to know the algorithm is working. This demonstrates that it is, now we need to expand this to represent points a bit differently. In a 2D space we can have objects that cover multiple pixels, and so the first solution I propose is to represent each game object by its upper left point and its lower right point. From these two points we should be able to infer a whole lot of information about the object itself, and it should give us everything we need to properly position it in the world.

2-Point Object Representation

Assumptions

To start lets get some assumptions out there.

  1. All objects we care to place are rectangular
  2. All 3D rectangular objects can be measured by two corners
    • one at the lowest z position in any corner
    • the other at the highest z position in the opposite diagonal corner from the first

High Level

At a high level these two points can tell us everything we need to know about the path or driveway block we are trying to place. Specifically its rotation in the world, how it aligns or doesn’t align with other objects in the world, also should the block be to large in any one direction we should be able to work out the amount it needs to be scaled by to fit the spot. For the Unity translation this will mean attaching two empties to the parent object we can search for when generating the world. If anyone has a better way of doing this please please comment, I have been looking for an alternative including bounding boxes of the Renderer but those weren’t as reliable as the empty placement is.

So with that lets get to coding a 2D representation of what we want using Python, and hopefully after this we can move the logic to Unity and begin adding more houses without having to manually place paths!

Code

# -*- coding: utf-8 -*-
"""
Created on Tue Dec 21 09:14:20 2021

@author: Travis Adsitt
"""

from enum import Enum
from PIL import Image, ImageDraw, ImageColor
import random


WORLD_EDGE_SIZE = 250
CUBE_EDGE_SIZE = 10
ROAD_Y = 200

class CompassDirection(Enum):
    North="North"
    East="East"
    South="South"
    West="West"

class Point:
    def __init__(self, x, y):
        """
        Object initializer for a 2D Point

        Parameters
        ----------
        x : int
            The x value of the point.
        y : int
            The y value of the point.

        Returns
        -------
        None.

        """
        self.x = x
        self.y = y
    
    @classmethod
    def is_point(cls, P1):
        """
        Used to check if an object is of type point

        Parameters
        ----------
        P1 : Object
            Object to test if it is of type Point.

        Returns
        -------
        Boolean

        """
        return isinstance(P1, Point)
    
    @classmethod
    def distance(cls, P1, P2, keep_orientation=False):
        """
        Get the distance between two points

        Parameters
        ----------
        P1 : Point
            First point.
        P2 : Point
            Second point.

        Returns
        -------
        xd : int
            Distance between the points in the x direction.
        yd : int
            Distance between the points in the y direction.

        """
        assert Point.is_point(P1), "P1 must be a point for distance"
        assert Point.is_point(P2), "P2 must be a point for distance"
        
        x1 = P1.x
        y1 = P1.y
        
        x2 = P2.x
        y2 = P2.y
        
        xd = x1 - x2 if x1 > x2 and not keep_orientation else x2 - x1
        yd = y1 - y2 if y1 > y2 and not keep_orientation else y2 - y1
        
        return (xd,yd)
    
    @classmethod
    def midpoint(cls, P1, P2):
        """
        Get the midpoint between two points

        Parameters
        ----------
        P1 : Point
            First point.
        P2 : Point
            Second point.

        Returns
        -------
        Point
            The midpoint located at the center of the two points.

        """
        dist_x, dist_y = cls.distance(P1, P2)
        
        x_offset = P1.x if P1.x < P2.x else P2.x
        y_offset = P1.y if P1.y < P2.y else P2.y
        
        return Point((dist_x / 2) + x_offset, (dist_y / 2) + y_offset)
    
    def as_list(self):
        """
        Helper function to put this point in a list

        Returns
        -------
        list
            [x, y] values.

        """
        return [self.x, self.y]
    
    def __add__(self, other):
        """
        Dunder method to add two points together

        Parameters
        ----------
        other : Point
            The point to add to this one.

        Returns
        -------
        ret_point : Point
            The point that results from the addition.

        """
        assert Point.is_point(other), "Cannot add a Point to a non-Point"
        
        ret_point = Point(self.x + other.x, self.y + other.y)
        
        return ret_point
    
    def __str__(self):
        return f"({self.x},{self.y})"

ORIGIN = Point(0,0)

class Object2D:
    def __init__(self, P1, P2, fill_color="gray", outline_color="red"):
        """
        Object initializer for a 2D game object

        Parameters
        ----------
        P1 : Point
            Corner one of the 2D object.
        P2 : Point
            Corner two of the 2D object.

        Returns
        -------
        None.

        """
        assert Point.is_point(P1), "P1 must be a point for 2D Object"
        assert Point.is_point(P2), "P2 must be a point for 2D Object"
        
        self.points = {"p1": P1, "p2": P2}
        self.fill = fill_color
        self.outline = outline_color
        
    def move_xy(self, move_to, reference_point=None):
        """
        Function to move this object using a reference point

        Parameters
        ----------
        move_to : Point
            Where to move this object.
        reference_point : Point, optional
            The point to move in reference to. The default is None.

        Returns
        -------
        None.

        """
        ref_point = reference_point or self.points["p1"]
        
        assert Point.is_point(move_to), "move_to must be of type Point"
        assert Point.is_point(ref_point), "reference_point must be of type Point"
        
        # Get our delta for the reference point to the move point
        ref_move_delta_x, ref_move_delta_y = Point.distance(ref_point, move_to, keep_orientation=True)
        
        self.points["p1"] += Point(ref_move_delta_x, ref_move_delta_y)
        self.points["p2"] += Point(ref_move_delta_x, ref_move_delta_y)
    
    def move_y(self, move_to, self_point="p1"):
        """
        Move to a point only on the x direction

        Parameters
        ----------
        move_to : Point
            Where to move to.
        self_point : str, optional
            The internal point to use as reference. The default is "p1".

        Returns
        -------
        None.

        """
        assert Point.is_point(move_to), "move_to must be of type Point"
        assert self_point in self.points, "self_point must be either 'p1' or 'p2'"
        
        ref_point = Point(self.points["p1"].x, move_to.y)
        
        self.move_xy(move_to, ref_point)
    
    def scale_x(self, percentage_scale):
        """
        Helper function to scale this object by a percentage

        Parameters
        ----------
        percentage_scale : float
            The percentage to scale the object by.

        Returns
        -------
        None.

        """
        # This is the amount each point should be moved
        point_offset = (self.get_width() * percentage_scale) / 2
        
        P1 = self.points["p1"]
        P2 = self.points["p2"]
        
        P1.x = P1.x + point_offset if P1.x < P2.x else P1.x - point_offset
        P2.x = P2.x + point_offset if P2.x < P1.x else P2.x - point_offset
        
        self.points["p1"] = P1
        self.points["p2"] = P2
        
    def scale_y(self, percentage_scale):
        """
        Helper function to scale this object by a percentage

        Parameters
        ----------
        percentage_scale : float
            The percentage to scale the object by.

        Returns
        -------
        None.

        """
        # This is the amount each point should be moved
        point_offset = (self.get_height() * percentage_scale) / 2
        
        P1 = self.points["p1"]
        P2 = self.points["p2"]
        
        P1.y = P1.y + point_offset if P1.y < P2.y else P1.y - point_offset
        P2.y = P2.y + point_offset if P2.y < P1.y else P2.y - point_offset
        
        self.points["p1"].y = P1.y
        self.points["p2"].y = P2.y
    
    def move_x(self, move_to, self_point="p1"):
        """
        Move to a point only on the y direction

        Parameters
        ----------
        move_to : Point
            Where to move to.
        self_point : str, optional
            The internal point to use as reference. The default is "p1".

        Returns
        -------
        None.

        """
        assert Point.is_point(move_to), "move_to must be of type Point"
        assert self_point in self.points, "self_point must be either 'p1' or 'p2'"
        
        ref_point = Point(move_to.x, self.points["p1"].y)
        
        self.move_xy(move_to, ref_point)
    
    def get_width(self):
        """
        Helper function to get this objects width

        Returns
        -------
        float
            The width of this object.

        """
        p1X = self.points["p1"].x
        p2X = self.points["p2"].x
        
        p1X = p1X * -1 if p1X < 0 else p1X
        p2X = p2X * -1 if p2X < 0 else p2X
        
        return p1X - p2X if p1X > p2X else p2X - p1X
    
    def get_height(self):
        """
        Helper function to get this objects height

        Returns
        -------
        float
            The height of this object.

        """
        p1Y = self.points["p1"].y
        p2Y = self.points["p2"].y
        
        p1Y = p1Y * -1 if p1Y < 0 else p1Y
        p2Y = p2Y * -1 if p2Y < 0 else p2Y
        
        return p1Y - p2Y if p1Y > p2Y else p2Y - p1Y
    
    def get_midpoint(self):
        """
        Helper function to get the midpoint of this Object2D

        Returns
        -------
        Point
            The midpoint of the object.

        """
        p1 = self.points["p1"]
        p2 = self.points["p2"]
        
        return Point.midpoint(p1, p2)
    
    def get_compass_direction_edge_midpoint(self, direction=CompassDirection.South):
        """
        Helper function to get the world coordinate for one of our edges
        in the direction of the input compass direction.

        Parameters
        ----------
        direction : CompassDirection, optional
            Which edges midpoint to get world location for. 
            The default is CompassDirection.South.

        Returns
        -------
        Point
            The world location for the midpoint of an edge.

        """
        p1X = self.points["p1"].x
        p1Y = self.points["p1"].y
        
        p2X = self.points["p2"].x
        p2Y = self.points["p2"].y
        
        midpoint = Point.midpoint(self.points["p1"], self.points["p2"])
        
        westX = p1X if p1X < p2X else p2X
        eastX = p1X if p1X > p2X else p2X
        northY = p1Y if p1Y < p2Y else p2Y
        southY = p1Y if p1Y > p2Y else p2Y
        
        if(direction == CompassDirection.North):
            return Point(midpoint.x, northY)
        elif(direction == CompassDirection.East):
            return Point(eastX, midpoint.y)
        elif(direction == CompassDirection.South):
            return Point(midpoint.x, southY)
        elif(direction == CompassDirection.West):
            return Point(westX, midpoint.y)
        
    def draw_self(self, draw_object):
        
        rect_points = []
        rect_points.extend(self.points["p1"].as_list())
        rect_points.extend(self.points["p2"].as_list())
        
        try:
            draw_object.rectangle(rect_points, fill=self.fill, outline=self.outline)
        except:
            print("Couldn't draw myself :(")
        
class SidewalkSquare(Object2D):
    def __init__(self, spawn_point=ORIGIN, side_length=CUBE_EDGE_SIZE):
        P1 = spawn_point
        P2 = Point(P1.x + side_length, P1.y + side_length)
        
        super().__init__(P1, P2)

class DrivewayRectangle(Object2D):
    def __init__(
            self, 
            spawn_point=ORIGIN, 
            long_edge_len=3*CUBE_EDGE_SIZE, 
            short_edge_len=2*CUBE_EDGE_SIZE
            ):
        
        P1 = spawn_point
        P2 = Point(P1.x + short_edge_len, P1.y + long_edge_len)
        
        super().__init__(P1, P2)

def GetRoadBoundaryLine():
    """
    Helper function to get the two points describing the roads boundary    
    
    Returns
    -------
    List[Tuple(int x, int y), Tuple(int x, int y)]
        Left and right most point representing the road boundary line.

    """
    P1 = (0,ROAD_Y)
    P2 = (WORLD_EDGE_SIZE, ROAD_Y)
    
    return [P1,P2]

def GetNewPaddedLocation(not_x=None, not_y=None, boundary=0):
    """
    Parameters
    ----------
    not_x : int, optional
        Avoid this x value. The default is None.
    not_y : int, optional
        Avoid this y value. The default is None.
    boundary : int, optional
        Avoid +- this on either side of not_x or not_y

    Returns
    -------
    x : int
        A new random x-point within the buffered area.
    y : int
        A new random y-point within the buffered area.

    """
    buffered_house_area = WORLD_EDGE_SIZE - CUBE_EDGE_SIZE
    x = random.randint(CUBE_EDGE_SIZE, buffered_house_area)
    
    if not_x:
        while x < not_x + boundary and x > not_x - boundary:
            x = random.randint(CUBE_EDGE_SIZE, buffered_house_area)
            
    y = random.randint(CUBE_EDGE_SIZE, ROAD_Y)
    if not_y:
        while y < not_y + boundary and y > not_y - boundary:
            y = random.randint(CUBE_EDGE_SIZE, buffered_house_area)
            
    return Point(x,y)

def GetDoorandGarageLocations():
    """
    Helper function to get two points for the Garage and Front Door locations

    Returns
    -------
    dict
        "Garage" -- the garage point :: "Door" -- the frontdoor point.

    """
    # Start with the garage point
    GaragePoint = GetNewPaddedLocation()
    # Get the front door point, with the boundary of half a GarageRect and 
    DoorPoint = GetNewPaddedLocation(
        not_x=GaragePoint.x, 
        boundary=DrivewayRectangle().get_width() / 2
    )
    
    return {"Garage":GaragePoint,"Door":DoorPoint}

def place_north_to_south(base_rect, move_rect):
    """
    Helper function to place one objects north to the other objects south

    Parameters
    ----------
    base_rect : Object2D
        This object stays in its original position, its southern midpoint will
        be where we place the other objects northern midpoint.
    move_rect : Object2D
        This object is moved, we move it in reference to its northern midpoint
        and place it at the base_rect southern midpoint.

    Returns
    -------
    None.

    """
    # Get the two points we care about
    root = base_rect.get_compass_direction_edge_midpoint(CompassDirection.South)
    place = move_rect.get_compass_direction_edge_midpoint(CompassDirection.North)
    # Move the object as expected
    move_rect.move_xy(root, place)

def GetDrivewayPoints(garage_point, road_boundary_line):
    """
    Helper function to yield each driveway block

    Parameters
    ----------
    garage_point : Point
        Where the driveway is to start
    road_boundary_line : List[Point]
        The points that represent the road boundary

    Yields
    ------
    Object2D
        The blocks to be drawn for the driveway

    """
    road_y = road_boundary_line[0][1]
    driveway_blocks = []
    
    def scale_and_move(garage_point, road_y):
        """
        Helper function to scale a driveway block to fit the space between
        the last block and the road if it is over reaching.

        Parameters
        ----------
        garage_point : Point
            The start of the driveway.
        road_y : int
            The y position of the road.

        Returns
        -------
        None.

        """
        
        move_block = driveway_blocks[-1]
        
        # We need to set the root block and place it at the last blocks
        # southern point before proceeding
        if len(driveway_blocks) > 1:
            print("Moving North to South 1")
            root_block = driveway_blocks[-2]
            place_north_to_south(root_block, move_block)
        
        # Check if we are over the road
        over_boundary = move_block.get_compass_direction_edge_midpoint(CompassDirection.South).y > road_y
        
        # Shrink to fit if we are over the road
        if over_boundary:
            print("Over boundary")
            drive_y = move_block.get_compass_direction_edge_midpoint(CompassDirection.South).y
            shrink_perc = (drive_y - road_y) / move_block.get_height()
            move_block.scale_y(shrink_perc)
            
        # If we are over the boundary and have more than one block move north to south
        if len(driveway_blocks) > 1 and over_boundary:
            print("Moving North to South 2")
            place_north_to_south(root_block, move_block)
        # If we are over boundary on the first block move to the garage point
        elif over_boundary:
            driveway_blocks[-1].move_xy(
                garage_point,
                driveway_blocks[-1].get_compass_direction_edge_midpoint(
                    CompassDirection.North
                    )
                )
        
        return over_boundary
        
    # Create and move our first block to the start position
    driveway_blocks.append(DrivewayRectangle())
    
    driveway_blocks[-1].move_xy(
        garage_point,
        driveway_blocks[-1].get_compass_direction_edge_midpoint(
            CompassDirection.North
            )
        )
    
    # Now continue creating and placing blocks
    while not scale_and_move(garage_point, road_y):
        yield(driveway_blocks[-1])
        driveway_blocks.append(DrivewayRectangle())
        
    # Finally get the last block in the list that wouldn't yield in the
    # While loop
    yield(driveway_blocks[-1]) 

def GetFrontDoorPathwayBlocks(front_door_point, target_point):
    """
    Helper function to generate new sidewalk squares for a pathway that leads
    either to the driveway or the road depending on distance.

    Parameters
    ----------
    front_door_point : Point
        Where the center of the front door is.
    target_point : Point
        The location we want the pathway to go to.

    Yields
    ------
    Object2D
        The next sidewalk square to be rendered.

    """
    sidewalk_blocks = []
    
    def place_on_south_edge():
        """
        Helper function to spawn and place a new block on the southern edge of
        the previous block.

        Returns
        -------
        None.

        """
        sidewalk_blocks.append(SidewalkSquare())
        
        root_block = sidewalk_blocks[-2]
        move_block = sidewalk_blocks[-1]
        
        root_midpoint = root_block.get_midpoint()
        target_midpoint = Point(root_midpoint.x, root_midpoint.y + SidewalkSquare().get_height())
        move_block.move_xy(target_midpoint, move_block.get_midpoint())
        
        
    
    def place_on_lateral_edge():
        """
        Helper function to spawn and place a new sidewalk square on either the
        east or west edges of the last block based on target location.

        Returns
        -------
        None.

        """
        sidewalk_blocks.append(SidewalkSquare())
        
        root_block = sidewalk_blocks[-2]
        move_block = sidewalk_blocks[-1]
        
        root_midpoint = root_block.get_midpoint()
        
        to_right = root_block.get_midpoint().x < target_point.x
        
        offset = SidewalkSquare().get_width() if to_right else -SidewalkSquare().get_width()
        
        target_midpoint = Point(root_midpoint.x + offset, root_midpoint.y)
        move_block.move_xy(target_midpoint, move_block.get_midpoint())
        
    
    def in_edge(block, target):
        """
        Helper function to check if our target is lined up with either the 
        vertical or horizontal edges of our current block.

        Parameters
        ----------
        block : Object2D
            The object we are checking is lined up with the target.
        target : Point
            What we want the object to be lined up with.

        Returns
        -------
        vertical_in_edge : bool
            If we are lined up in the vertical direction.
        horizontal_in_edge : bool
            If we are lined up in the horizontal direction.

        """
        pm_width = block.get_width() / 2
        pm_height = block.get_height() / 2
        
        midpoint = block.get_midpoint()
        
        top = midpoint.y + pm_height
        bottom = midpoint.y - pm_height
        left = midpoint.x - pm_width
        right = midpoint.x + pm_width
        
        to_right = block.get_midpoint().x < target_point.x
        
        # Check for the in edge
        vertical_in_edge = target.y > bottom and target.y < top or target.y < top
        horizontal_in_edge = target.x < right and target.x > left
        
        # Check if we accidentally passed the edge in the horizontal direction
        if to_right:
            horizontal_in_edge = horizontal_in_edge or target.x < left
        else:
            horizontal_in_edge = horizontal_in_edge or target.x > right
        
        return (vertical_in_edge, horizontal_in_edge)
    
    def scale_final_block(horizontal):
        """
        Helper function to scale the final block in our front door pathway

        Parameters
        ----------
        horizontal : bool
            Tells the algorithm if this block is intended for a horizontal 
            scaling or a vertical one.

        Returns
        -------
        None.

        """
        root_block = sidewalk_blocks[-2] if len(sidewalk_blocks) > 1 else -1
        move_block = sidewalk_blocks[-1]
        
        # Shrink vertical aka 'not horisontal'
        if not horizontal:
            block_y = move_block.get_compass_direction_edge_midpoint(CompassDirection.South).y
            shrink_perc = (block_y - target_point.y) / move_block.get_height()
            move_block.scale_y(shrink_perc)
            if len(sidewalk_blocks) > 1:
                move_block.move_xy(
                    root_block.get_compass_direction_edge_midpoint(CompassDirection.South),
                    move_block.get_compass_direction_edge_midpoint(CompassDirection.North)
                    )
        # Scale the horizontal
        else:
            # get the direction we will need to measure and move in
            if root_block.get_midpoint().x < move_block.get_midpoint().x:
                direction = CompassDirection.East
            else:
                direction = CompassDirection.West
            
            # Get the x values to measure from
            root_x = root_block.get_compass_direction_edge_midpoint(direction).x
            target_x = target_point.x
            # Measure the gap we need to fill
            gap_width = root_x - target_x if root_x > target_x else target_x - root_x
            # Get the srink percentage
            shrink_perc = (move_block.get_width() - gap_width) / move_block.get_width()
            # Scale the block
            move_block.scale_x(shrink_perc)
            
            # Move the block into place dependent on direction
            if direction == CompassDirection.East:
                move_block.move_xy(
                    root_block.get_compass_direction_edge_midpoint(direction), 
                    move_block.get_compass_direction_edge_midpoint(CompassDirection.West)
                    )
            else:
                move_block.move_xy(
                    root_block.get_compass_direction_edge_midpoint(direction), 
                    move_block.get_compass_direction_edge_midpoint(CompassDirection.East)
                    )
            
    
    # Add the first block
    sidewalk_blocks.append(SidewalkSquare())
    
    sidewalk_blocks[-1].move_xy(
        front_door_point, 
        sidewalk_blocks[-1].get_compass_direction_edge_midpoint(
            CompassDirection.North
        )
    )
    
    horizontal = False
    # Add in the vertical direction first
    while not in_edge(sidewalk_blocks[-1], target_point)[0]:
        yield sidewalk_blocks[-1]
        place_on_south_edge()
    
    # Add in the horizontal direction second
    while not in_edge(sidewalk_blocks[-1], target_point)[1]:
        horizontal = True
        yield sidewalk_blocks[-1]
        place_on_lateral_edge()
        
    scale_final_block(horizontal)
    yield sidewalk_blocks[-1]
        
    
        
if __name__ == "__main__":
    # Generate the road boundary
    roadBoundaryLine = GetRoadBoundaryLine()
    
    # Generate some random points for the garage and door locations
    GandDPoints = GetDoorandGarageLocations()
    
    # specific variables for easier use
    garage_point = GandDPoints["Garage"]
    frontdoor_point = GandDPoints["Door"]
    
    # Which direction is the driveway from the front door
    frontdoor_left_of_driveway = frontdoor_point.x < garage_point.x
    
    dw_center_offset = DrivewayRectangle().get_width() / 2
    
    # Get a x value for where to target the driveway
    if frontdoor_left_of_driveway:
        driveway_point_x = garage_point.x - dw_center_offset
    else:
        driveway_point_x = garage_point.x + dw_center_offset
    
    # This is to ensure the front walk comes out a certain amount before bending
    # to lead to the driveway
    block_lead = frontdoor_point.y + (3 * SidewalkSquare().get_height())
    
    # Get a y value for where to target the driveway
    if garage_point.y  > frontdoor_point.y:
        driveway_point_y = garage_point.y + SidewalkSquare().get_height() / 2
    else:
        driveway_point_y = block_lead
    
    # Create the points
    fpt_driveway_point = Point(driveway_point_x, driveway_point_y)
    fpt_road_point = Point(frontdoor_point.x, roadBoundaryLine[0][1])
    
    # Measure the distance to both
    dist_fp_dw = sum(Point.distance(frontdoor_point, fpt_driveway_point))
    dist_fp_road = sum(Point.distance(frontdoor_point, fpt_road_point))
    
    # Choose which to target
    if dist_fp_dw < dist_fp_road:
        print("Targetting Driveway")
        fpt = fpt_driveway_point
    else:
        print("Targetting Road")
        fpt = fpt_road_point
    
    images = []
    dws = []
    sss = []
    
    def draw_items(d):
        """
        Helper function to draw items in each frame

        Parameters
        ----------
        d : draw object
            What to use to draw

        Returns
        -------
        None.

        """
        d.line(roadBoundaryLine)
        
        d.point(garage_point.as_list())
        d.point(frontdoor_point.as_list())
        
        for dw in dws:
            dw.draw_self(d)
        
        for ss in sss:
            ss.draw_self(d)
    
    
    
    # Get all the driveway blocks and draw them
    for dw in GetDrivewayPoints(garage_point, roadBoundaryLine):
        out = Image.new("RGBA", (WORLD_EDGE_SIZE,WORLD_EDGE_SIZE))
        d = ImageDraw.Draw(out)
        dws.append(dw)
        draw_items(d)
        images.append(out)
    
    # Get all the front pathway blocks and draw them
    for fp in GetFrontDoorPathwayBlocks(frontdoor_point, fpt):
        out = Image.new("RGBA", (WORLD_EDGE_SIZE,WORLD_EDGE_SIZE))
        d = ImageDraw.Draw(out)
        sss.append(fp)
        draw_items(d)
        images.append(out)
    
    # Save our GIF
    images[0].save(
        'house_paths.gif', 
        save_all=True, 
        append_images=images[1:], 
        optimize=True, 
        duration=100,
        loop=0
        )
    
    

Some examples of the output (small squares are the front pathway)

The theoretical code got away from me in time, I didn’t intend for this to take as long as it did, which is why there isn’t any in-game demo. However it did give me interesting problems that I might not have figured out so quickly in Unity.

First we need to be aware of which direction to scale the blocks at the end, for the driveway this is easy, it is only going to be in the vertical direction and determined by the roads y position. For the front door pathway it is a bit more tricky because you have to know which direction the path is headed in order to scale and adjust its location properly.

Second, determination of what to target for the front door path is difficult in its own ways, such as measuring the distance to an arbitrary point on the drive way and which side of the path the driveway is on.

Conclusion

With the logic worked out(mostly there are still some hidden bugs I will have to work out), I am ready to begin porting this to Unity this next week which will bring with it many more houses. I look forward to the next post and the NEW YEAR happy New Year everyone!