Communicating with hubs that run official LEGO firmware

Communicating with hubs that run official LEGO firmware

This project shows how you can control the Duplo Hub (or any hub running the official firmware), from a hub that runs Pybricks.


LEGO Wireless Protocol 3.0 (LWP3)

Out of the box, most LEGO hubs support the LEGO Wireless Protocol 3.0 (LWP3). Pybricks has a builtin LWP3Device class that lets a hub running Pybricks connect to another hub that runs the standard firmware.

This is particularly useful if you want to control a hub that is not directly supported by Pybricks, or if you don’t want to install Pybricks on all of your hubs.

In this example, we’ll show you how it works for the Duplo Hub. You can adapt the example to send and receive other commands or control other hubs, by following the LWP3 documentation.

The Duplo train class

The code for this example is split into two parts. First, create a new file called duplo.py with the following contents.

It provides a class for connecting to the Duplo Hub and sending commands to it.

from pybricks.iodevices import LWP3Device
from pybricks.parameters import Color
from pybricks.tools import wait

# Device identifier for the Duplo Hub.
DUPLO_TRAIN_ID = 0x20

# Mapping that converts colors to LEGO color identifiers.
COLORS = {
    Color.NONE: 0,
    Color.MAGENTA: 2,
    Color.BLUE: 3,
    Color.GREEN: 6,
    Color.YELLOW: 7,
    Color.ORANGE: 8,
    Color.RED: 9,
}

class DuploTrain():
    """Class to connect to the Duplo train and send commands to it."""

    def __init__(self):
        """Scans for a train, connect, and prepare it to receive commands."""
        print("Searching for the train. Make sure it is on.")
        self.device = LWP3Device(DUPLO_TRAIN_ID, name=None, timeout=10000)
        self.device.write(bytes([0x0a, 0x00, 0x41, 0x01, 0x01, 0x01, 0x00, 0x00, 0x00, 0x01]))
        print("Connected!")
        wait(500)

    def choo_choo(self):
        """Plays the choo choo sound."""
        self.device.write(bytes([0x08, 0x00, 0x81, 0x01, 0x11, 0x51, 0x01, 0x09]))

    def light(self, color):
        """Turns on the train light at the requested color."""
        if color not in COLORS:
            return
        self.device.write(bytes([0x08, 0x00, 0x81, 0x11, 0x11, 0x51, 0x00, COLORS[color]]))

    def drive(self, power):
        """Drives at a given "power" level between -100 and 100."""
        power = max(-100, min(power, 100))
        if power < 0:
            power += 256
        self.device.write(bytes([0x08, 0x00, 0x81, 0x00, 0x01, 0x51, 0x00, power]))

The main program

Next, create another program called train_driver.py with the following contents.

This program imports the DuploTrain class we just made to interface with the Duplo Hub in a simple way. Instantiating the train object will set up the connection.

You can use the choo_choo method to make a sound, the light method to set the train front light color, and the drive method to drive at a selected speed (-100 to 100).

To try this project, run this program and turn on the train. The train should make a connection sound and you’re ready to go.

from pybricks.hubs import EssentialHub
from pybricks.pupdevices import Motor, ColorSensor
from pybricks.parameters import Color, Port
from pybricks.tools import wait

# This imports the DuploTrain from the duplo.py file.
from duplo import DuploTrain

# Initialize the hub and devices. You can use any other hub too.
hub = EssentialHub()
dial = Motor(Port.A)
sensor = ColorSensor(Port.B)

# Connect to the train.
train = DuploTrain()

# These variables are used to monitor the angle and color state.
last_angle = 0
last_color = Color.BLACK

while True:
    # If the measured color changed, play choo choo
    # and set the hub and train light to match.
    color = sensor.color()
    if last_color != color:
        last_color = color
        if color != Color.NONE:
            train.choo_choo()
        hub.light.on(color)
        train.light(color)
        
    # Read the angle and discard low values.
    angle = dial.angle()
    if abs(angle) < 25:
        angle = 0

    # Skip updating on small changes to reduce traffic.
    if abs(last_angle - angle) < 10:
        wait(10)
        continue

    # Send new speed.
    last_angle = angle
    train.drive(angle)


This project was submitted by The Pybricks Authors.