My beloved blog post still sits on the throne as the most effective format for documenting engineering projects. To me, inlining code, photographs, CAD models, and schematics in an interactive way trumps other mediums. That level of interactivity closes the gap between reader and material, letting the reader build an independent relationship with the subject outside of the story being told by the author.
Working on stream in front of an online audience has a similar effect; the unedited, interactive format gives viewers a real understanding of the creator’s process and technique.
For a while there, I’d settled into a nice habit of broadcasting project development live on Twitch. Two moves later, things have settled down enough in my personal life that I feel it’s time to get back into this habit.
Before we get started again, I took some time to improve the ergonomics (both hardware and software) of my stream setup. The following documents a few smaller projects, all in service of these upcoming broadcasts.
Ever wanted to have multiple different sound files playing on different output devices attached to a host computer? Say you’re writing a DJing application where you want one mix for headphones and one for the speakers. Or you’re doing some sort of kiosk or art installation where you have many sets of speakers that need to all be playing their own sound file but the whole thing needs to be synchronized. This would even be cool for something like an escape room.
The latter example is where I needed this bit of code. I’ve been working with interdisciplinary artist Sara Dittrich on a few projects recently, and she asked if I could come up with a way to play 8 different mono sound files on 8 different loudspeakers. Here’s a video of the whole setup in action, and an explanation of the project:
I’ve wrapped up all of the code for the art installation project, and it can be found in a GitHub repo here. It includes the startup functionality and so on. If you’re interested in recreating the video above, that repo would be a good starting place. The following is a list of the parts used to make that build happen:
It’s worth giving a simple example of how to play multiple files on multiple audio devices using Python. I couldn’t find any examples of how to do this online and had to spend some time experimenting to make it all come together. Hopefully this saves you the trouble.
To install sounddevice on my Raspberry Pi, I had to run the following commands:
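(The gist is that sounddevice needs the PortAudio backend and soundfile needs libsndfile, then both libraries come from pip. The exact package names below are from memory, so adjust as needed.)

sudo apt-get install libportaudio2 libsndfile1
python3 -m pip install sounddevice soundfile numpy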
The code is based on the sounddevice library for Python, whose documentation is pretty sparse. This script finds the audio files and then plays them on as many devices as are attached. For example, if you have 3 sound devices it will play 1.wav, 2.wav and 3.wav on devices 1-3. If you have any questions, feel free to ask:
"""
multi.py, uses the sounddevice library to play multiple audio files to multiple output devices at the same time
Written by Devon Bray (dev@esologic.com)
"""
import sounddevice
import soundfile
import threading
import os
DATA_TYPE = "float32"
def load_sound_file_into_memory(path):
"""
Get the in-memory version of a given path to a wav file
:param path: wav file to be loaded
:return: audio_data, a 2D numpy array
"""
audio_data, _ = soundfile.read(path, dtype=DATA_TYPE)
return audio_data
def get_device_number_if_usb_soundcard(index_info):
"""
    Given an (index, info) pair from enumerate(sounddevice.query_devices()), return the device index
    if the device is one of our USB sound cards, or False otherwise
    :param index_info: an (index, device info dict) pair
    :return: the device index if it is a USB sound card, False otherwise
"""
index, info = index_info
if "USB Audio Device" in info["name"]:
return index
return False
def play_wav_on_index(audio_data, stream_object):
"""
Play an audio file given as the result of `load_sound_file_into_memory`
:param audio_data: A two-dimensional NumPy array
:param stream_object: a sounddevice.OutputStream object that will immediately start playing any data written to it.
:return: None, returns when the data has all been consumed
"""
stream_object.write(audio_data)
def create_running_output_stream(index):
"""
    Create a sounddevice.OutputStream that writes to the device specified by index and is ready to be written to.
You can immediately call `write` on this object with data and it will play on the device.
:param index: the device index of the audio device to write to
:return: a started sounddevice.OutputStream object ready to be written to
"""
output = sounddevice.OutputStream(
device=index,
dtype=DATA_TYPE
)
output.start()
return output
if __name__ == "__main__":
def good_filepath(path):
"""
        Helper that returns True only for non-hidden .wav files
        :param path: path to the file
        :return: True if the path is a non-hidden .wav file, False otherwise
"""
return str(path).endswith(".wav") and (not str(path).startswith("."))
cwd = os.getcwd()
sound_file_paths = [
os.path.join(cwd, path) for path in sorted(filter(lambda path: good_filepath(path), os.listdir(cwd)))
]
print("Discovered the following .wav files:", sound_file_paths)
files = [load_sound_file_into_memory(path) for path in sound_file_paths]
print("Files loaded into memory, Looking for USB devices.")
usb_sound_card_indices = list(filter(lambda x: x is not False,
map(get_device_number_if_usb_soundcard,
[index_info for index_info in enumerate(sounddevice.query_devices())])))
print("Discovered the following usb sound devices", usb_sound_card_indices)
streams = [create_running_output_stream(index) for index in usb_sound_card_indices]
running = True
if not len(streams) > 0:
running = False
print("No audio devices found, stopping")
if not len(files) > 0:
running = False
print("No sound files found, stopping")
while running:
print("Playing files")
        threads = [threading.Thread(target=play_wav_on_index, args=[audio_data, stream])
                   for audio_data, stream in zip(files, streams)]
try:
for thread in threads:
thread.start()
for thread, device_index in zip(threads, usb_sound_card_indices):
print("Waiting for device", device_index, "to finish")
thread.join()
except KeyboardInterrupt:
running = False
print("Stopping stream")
for stream in streams:
stream.abort(ignore_errors=True)
stream.close()
print("Streams stopped")
print("Bye.")
After the Pi boots, you can run screen -list to see which screen sessions are available to attach to, then attach to yours with screen -r yourscreen. Here’s an example:
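# list the detached screen sessions on the Pi (the session name below is just an example)
screen -list
# reattach to yours by name or PID
screen -r your_screen_name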
Press enter, and you’ll see your terminal output. For more info on how to use screen, check out this link:
While working on a project for work, I figured out the hard way that Electron has to be started from a terminal session on your target device (i.e. the computer it is to be viewed on). I am developing an embedded system based on the Raspberry Pi that takes no user input but displays information on a screen.
After downloading the electron-quick-start example, everything installs correctly without error, all of which can be done remotely via SSH. Upon running it with npm start, however, the following error is thrown:
> electron-quick-start@1.0.0 start /home/pi/electron-quick-start
> electron .
npm ERR! code ELIFECYCLE
npm ERR! errno 1
npm ERR! electron-quick-start@1.0.0 start: `electron .`
npm ERR! Exit status 1
npm ERR!
npm ERR! Failed at the electron-quick-start@1.0.0 start script.
npm ERR! This is probably not a problem with npm. There is likely additional logging output above.
npm ERR! A complete log of this run can be found in:
npm ERR! /home/pi/.npm/_logs/2017-10-04T15_11_16_398Z-debug.log
I spent most of the evening trying to debug npm ERR! code ELIFECYCLE to no avail. On a lark, I connected a keyboard to the device, ran npm start, and it ran without error. Sigh.
The remote-development alternative is to use Remote Desktop Connection, a client that comes bundled with Windows. The server side can be installed on the remote system (the Raspberry Pi) using apt-get install xrdp. Upon connecting, opening up a shell in the RDP client, and running npm start, the example application works just fine.
I’m constantly losing the remote for my RGB LED strip lights, and with a few days off for spring break, it’s time to get hacking. Here’s a demo and explanation video:
I don’t mention it in the video, but the cool part of this project is how the different processes communicate with each other. Rather than interacting with the different processes through pipes or something like stdin, I’ve decided to use a plain TCP socket server:
Processes on the device send RGB values to the strip server via TCP packets. This is very easy to implement, and almost all of the hard work is taken care of by the socketserver module included in Python 3. It also allows interactions with the main process (the StripPi server process) to take place off of the Raspberry Pi as well. I plan on writing an Alexa integration for this project moving forward, and this should make that a lot easier.
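To give a sense of how little code this takes, here’s a minimal sketch of the idea (this is not the actual StripPi server; the port number and the "R,G,B" line format are just assumptions for illustration):

import socketserver

class RGBHandler(socketserver.StreamRequestHandler):
    def handle(self):
        # read one line like "255,0,64" from the client and parse it into color values
        line = self.rfile.readline().strip().decode("utf-8")
        try:
            red, green, blue = (int(value) for value in line.split(","))
            print("Setting strip to", red, green, blue)  # the real server would drive the LEDs here
        except ValueError:
            print("Ignoring malformed packet:", line)

if __name__ == "__main__":
    # listening on all interfaces means other machines on the network can send colors too
    server = socketserver.TCPServer(("0.0.0.0", 9999), RGBHandler)
    server.serve_forever()

A client is then just as short: open a TCP connection to the port and send the color string followed by a newline.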
The analog to digital conversion is handled by an MCP3008, exactly the same way as I did it here.
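For reference, reading an MCP3008 over SPI only takes a few lines with the spidev module (the bus and chip-select numbers below are the common defaults, not necessarily how I have it wired):

import spidev

spi = spidev.SpiDev()
spi.open(0, 0)  # SPI bus 0, chip select 0
spi.max_speed_hz = 1350000

def read_adc(channel):
    # MCP3008 transaction: start bit, single-ended mode + channel number, then clock out the 10-bit result
    reply = spi.xfer2([1, (8 + channel) << 4, 0])
    return ((reply[1] & 3) << 8) + reply[2]

print(read_adc(0))  # prints a value between 0 and 1023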
This was one of those rare times where I had a hunch, followed it, and had a great result.
For a project I’m working on for school, we have a robot with multiple composite video cameras onboard, the kind seen on DIY drones or in simple security systems. We will be transmitting the video feed via a 5.8GHz video transmitter meant for a drone. We want the operator to be able to switch which feed they’re viewing at a given time, but we don’t want to have to use 3 transmitters and receivers. To get around this, I thought we might just connect the video feeds to a simple analog multiplexer I had lying around from a previous project and see if the feed could be switched that way. Turns out, you totally can. Here’s the eventual block diagram of this part of our project, if you’re interested:
The following is the code running on the Arduino. Remember, it isn’t doing anything special other than driving the mux:
#define NUMSELECTS 4
int s0 = 2;
int s1 = 3;
int s2 = 4;
int s3 = 5;
int selects[NUMSELECTS] = {s0, s1, s2, s3};
int select_state[NUMSELECTS] = {0, 0, 0, 0};
void setup()
{
Serial.begin(9600);
for (int index = 0; index < NUMSELECTS; index++)
{
pinMode(selects[index], OUTPUT);
digitalWrite(selects[index], select_state[index]);
}
}
void loop()
{
if (Serial.available() > 0)
{
    char inchar = Serial.read(); // Serial.read() returns a single byte from the serial buffer at a time
switch(inchar)
{
case '0':
Serial.println("Switching to video signal 0");
select_state[0] = 0;
select_state[1] = 0;
select_state[2] = 0;
select_state[3] = 0;
write_selects();
break;
case '1':
Serial.println("Switching to video signal 1");
select_state[0] = 1;
select_state[1] = 0;
select_state[2] = 0;
select_state[3] = 0;
write_selects();
break;
default:
Serial.println("Bad input");
break;
}
}
}
void write_selects()
{
for (int index = 0; index < NUMSELECTS; index++)
{
digitalWrite(selects[index], select_state[index]);
}
}
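Since the Arduino just listens for single characters over serial, switching feeds from the attached computer takes only a few lines with pyserial (the port name below is an example; use whatever your Arduino shows up as):

import time
import serial  # pyserial

with serial.Serial("/dev/ttyUSB0", 9600, timeout=1) as port:
    time.sleep(2)  # most Arduinos reset when the port opens, so give the sketch a moment
    port.write(b"1")  # select video signal 1
    print(port.readline().decode().strip())  # should echo "Switching to video signal 1"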
Trying to get the most out of a day has been a big theme of my life lately, as I’m sure it is for many people. I’ve found that I always manage my time better when things are urgent; I’m considerably more productive when I have to be.
I want an aesthetically pleasing way to represent how much time is left in the day at a granular scale, like an hourglass. Watching individual seconds disappear will look cool and (hopefully) create the sense of urgency I want to induce.
Technically, this is a really simple thing to accomplish thanks to Python and pygame. Here’s a video of a proof of concept running on my laptop:
At the start of each day, the display is filled with squares at random locations, with a random color. As each second elapses, a square will vanish.
To make it easier to see for the video, I’ve made the squares much bigger than they will actually be for the final build. This is what the display looks like with the squares at their actual size:
The code is really simple, as in less than 100 lines simple. Here’s how it works:
Here’s the version of the code running on my computer in the video:
import pygame
from random import randint
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
class random_square(object):
def __init__(self, max_x_location, max_y_location):
self.x_loc = randint(0, max_x_location)
self.y_loc = randint(0, max_y_location)
max_color_value = 255
red = randint(0, max_color_value)
green = randint(0, max_color_value)
blue = randint(0, max_color_value)
self.color = [red, green, blue]
class clock(object):
def __init__(self, initial_count, max_count, screen_w, screen_h):
self.max_count = max_count
self.screen_w = screen_w
self.screen_h = screen_h
# create the screen object, force pygame fullscreen mode
self.screen = pygame.display.set_mode([screen_w, screen_h], pygame.FULLSCREEN)
        # square size scales with the screen width in pixels
        self.square_size = screen_w / 200
# create the list of squares, initially as empty
self.squares = []
        # fill the squares with the initial seconds until midnight
for second in range(initial_count):
self.squares.append(random_square(screen_w, screen_h))
# starts ticking the clock
def start(self):
scheduler = BlockingScheduler()
scheduler.add_job(self.tick, 'interval', seconds=1)
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
# this occurs once every time a unit of time elapses
def tick(self):
# this will happen once per "day"
if len(self.squares) == 0:
# fill the list of squares to be drawn
for second in range(self.max_count):
self.squares.append(random_square(self.screen_w, self.screen_h))
# draw a blank screen
self.screen.fill([0, 0, 0])
# draw the squares
for square in self.squares:
rect = (square.x_loc, square.y_loc, self.square_size, self.square_size)
pygame.draw.rect(self.screen, square.color, rect, 0)
pygame.display.update()
# remove a single square from the list as one tick has elapsed
self.squares.pop()
# initialize pygame
pygame.init()
# figure out the parameters of the display we're connected to
screen_width = pygame.display.Info().current_w
screen_height = pygame.display.Info().current_h
screen_size = screen_width, screen_height
# determine the number of seconds until midnight
seconds_in_a_day = 86400
now = datetime.datetime.now()
midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
seconds_until_midnight = seconds_in_a_day - (now - midnight).seconds
# create and start the clock!
cl = clock(seconds_until_midnight, seconds_in_a_day, screen_width, screen_height)
cl.start()
Let’s walk through some of the design decisions of this code. The first thing that’s worth talking about is how the data for the squares is handled:
class random_square(object):
def __init__(self, max_x_location, max_y_location):
self.x_loc = randint(0, max_x_location)
self.y_loc = randint(0, max_y_location)
max_color_value = 255
red = randint(0, max_color_value)
green = randint(0, max_color_value)
blue = randint(0, max_color_value)
self.color = [red, green, blue]
It’s just an object with no methods; on initialization, all the parameters of the square (location and color) are generated randomly, as opposed to just floating the raw numbers around in arrays (even though that’s basically what is happening). This lets us fill the squares list very easily later on in the file here:
        # fill the squares with the initial seconds until midnight
for second in range(initial_count):
self.squares.append(random_square(screen_w, screen_h))
and here:
# this will happen once per "day"
if len(self.squares) == 0:
# fill the list of squares to be drawn
for second in range(self.max_count):
self.squares.append(random_square(self.screen_w, self.screen_h))
When it comes time to draw these squares, it also makes that pretty intuitive:
# draw the squares
for square in self.squares:
rect = (square.x_loc, square.y_loc, self.square_size, self.square_size)
pygame.draw.rect(self.screen, square.color, rect, 0)
Again, very simple stuff, but worth talking about.
I’ll be back at my place that has the Raspberry Pi and display I would like to use for this project, so more on this then.
I’ve spent a lot of the past day ironing out part of the night-side loop (loop 3 in this diagram). Basically, it starts recording based on an input from a sensor and continues to record until these inputs stop occurring.
My test code looks like this:
v1 = CameraModuleVideo("/home/pi/CreatureCapture/", "video1")
try:
v1.startRecording()
except ValueError as e:
print(e)
FilmDurationTrigger(5)
try:
v1.stopRecording()
except ValueError as e:
print(e)
The interesting functions at work here are the following:
def FilmDurationTrigger(time):
t = CameraTimer(time)
while True:
continueFlag = False
print "Filming For " + str(time) + " Seconds"
t.run()
while (t.isExpired() != True):
if (GetContinueTrigger() == True):
continueFlag = True
print "Trigger Found, Continuing"
print "Time Has Expired, Continue Flag Is Set To " + str(continueFlag)
if (continueFlag == False):
break
FilmDurationTrigger() takes the period of time that will be filmed; in this example it’s 5 seconds just to conserve time, but in the real application it will be 20 seconds. This code pauses for the input time, and the pause keeps getting extended whenever GetContinueTrigger() fires during that window. This delay allows the code to continue filming until there are no more inputs.
In this example, GetContinueTrigger() returns True when a (rare) random event occurs, but in the real application it will return a Boolean based on the status of a motion detector.
def GetContinueTrigger():
z = randint(0,10000)
k = ((z == 115))
return k
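In the real build, the same function will just read the output pin of the motion detector. Here’s a sketch of what that could look like with RPi.GPIO (the pin number is a placeholder for wherever the sensor actually ends up wired):

import RPi.GPIO as GPIO

MOTION_PIN = 7  # placeholder BOARD pin wired to the PIR sensor's output

GPIO.setmode(GPIO.BOARD)
GPIO.setup(MOTION_PIN, GPIO.IN)

def GetContinueTrigger():
    # True whenever the sensor is currently reporting motion
    return GPIO.input(MOTION_PIN) == GPIO.HIGH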
I ran two tests, and each produced a different result. The first one created a 10 second long video:
pi@raspberrypi ~/CreatureCapture $ python CreatureCaptureTest2.py
Filming For 5 Seconds
Trigger Found, Continuing
Time Has Expired, Continue Flag Is Set To True
Filming For 5 Seconds
Time Has Expired, Continue Flag Is Set To False
Terminated
And the second created a 15 second long video:
pi@raspberrypi ~/CreatureCapture $ python CreatureCaptureTest2.py
Filming For 5 Seconds
Trigger Found, Continuing
Trigger Found, Continuing
Trigger Found, Continuing
Trigger Found, Continuing
Time Has Expired, Continue Flag Is Set To True
Filming For 5 Seconds
Trigger Found, Continuing
Time Has Expired, Continue Flag Is Set To True
Filming For 5 Seconds
Time Has Expired, Continue Flag Is Set To False
Terminated
These two tests show that the variable capture length functionality works! As a note, the actual length of the output video differs from the amount of time it was told to record. Because of the variable frame rate of the video coming out of the camera module, the videos come out a little short, but they still contain all the frames for the desired recording time, just scaled slightly by frame rate error.
One of the biggest problems with the built-in commands for the Raspberry Pi camera module is that you can’t stop a recording after an unknown amount of time; you can record for a given number of seconds and that’s it. I have attempted to solve this problem by backgrounding the initial record process with a duration of 27777.8 hours (99999999 seconds); when it’s time to stop recording, the process is manually killed using pkill.
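The mechanics of that trick are simple enough to sketch out (this is an illustration of the approach, not the actual CameraModulePlus code):

import subprocess

def start_recording(path):
    # launch raspivid in the background with an absurdly long timeout, so it records until killed
    return subprocess.Popen(["raspivid", "-o", path, "-t", "99999999"])

def stop_recording():
    # stopping is just a matter of killing the raspivid process
    subprocess.call(["pkill", "raspivid"])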
Here is a test of my code, which I’ve called CameraModulePlus (written in Python). It takes two videos, one for five seconds and one for ten seconds, with a 10 second delay in between.
from CameraModulePlus import CameraModuleVideo
import subprocess
from time import sleep
import time
v1 = CameraModuleVideo("/home/pi/CreatureCapture/", "video1")
v2 = CameraModuleVideo("/home/pi/CreatureCapture/", "video2")
try:
v1.startRecording()
time.sleep(5)
v1.stopRecording()
time.sleep(10)
v2.startRecording()
time.sleep(10)
v2.stopRecording()
except ValueError as e:
print(e)
Here is a result of the 5 second duration test:
Here is a result of the 10 second duration test:
As you can see, it works pretty well for how barbaric it is. The full class for CameraModuleVideo can be found here. In the future, I’d like to encode a lot more data into the CameraModuleVideo class, timestamps and the like. I would also like to monitor available space on the device to make sure there is enough room to record.
I’ve decided to embark on a video surveillance project! My family lives in a very rural part of the US, and we constantly hear and see evidence of animals going crazy outside of my home at night. The goal of this project is to provide some insight into which animals actually live in my backyard.
Ideally, I want to monitor the yard using some kind of infrared motion detector. Upon detecting motion, an IR camera assisted by some IR spotlights would begin filming until it has been determined that there isn’t any more movement going on in the yard. These clips would then be filed into a directory, and at the end of the night they would be compiled and uploaded to YouTube. The video would then be sent to the user via email.
I’ve created the following flowchart to develop against as I begin implementing this idea.
I’ll be using a Raspberry Pi to implement this idea; a few months back I bought the IR camera module and haven’t used it for anything, so this would be a good project to test it out.
There are a few hurdles I’ll have to cross to make this project a success. Like most groups of problems I deal with, they can be separated into hardware and software components.
Hardware
Minimize false positives by strategically arranging motion detectors
Make sure IR Spotlights are powerful enough to illuminate area
Enclosure must be weatherproof and blend in with the environment; Maine winters are brutal.
Software
The Pi doesn’t have any built-in software to record video of undetermined length.
Must have a lot of error catching and other good OO practices in order to ensure a long runtime.
I’ve actually come up with a routine for solving the first software problem listed above; hopefully I’ll have an example of my solution in action later tonight.
Ideally, this project will have a working implementation completed by May 21, which is 7 days from now.