Thread Examples

  • Import the necessary python libraries and modules.

from agribot import Agribot, AgribotToken
import threading, sys

  • Define a custom handler, CustomHandler, with methods:

    • add_job: Add the next process.

    • try_next_job: Run the following process.

class MyHandler:
    def __init__(self, bot):
        # Store "W", "A", "S", "D", "L", "T" in a queue
        self.queue = []
        # Maintain a flag that lets us know if the bot is
        # ready for more commands.
        self.busy = True
        self.bot = bot
        self.led_status = False  # Default status of LED is off

    def add_job(self, direction):
        d = direction.capitalize()
        if d in ["W", "A", "S", "D", "L","T"]:
            self.queue.append(d)
            self.bot.read_status()

    def try_next_job(self):
        if (len(self.queue) > 0) and (not self.busy):
            command = self.queue.pop(0)
            print("sending " + command)
            self.busy = True
            if command == "W":
                return self.bot.move_relative(10, 0, 0)
            if command == "A":
                return self.bot.move_relative(0, -10, 0)
            if command == "S":
                return self.bot.move_relative(-10, 0, 0)
            if command == "D":
                return self.bot.move_relative(0, 10, 0)
            if command == "L":
                self.toggle_led()
            if command == "T":
                self.bot.take_and_download_photo()


    # Function to toggle LED
    def toggle_led(self):
        if self.led_status:
            self.bot.write_pin(7, 0)  # If LED is on, switch it off
            self.led_status = False
        else:
            self.bot.write_pin(7, 1)  # If LED is off, switch it on
            self.led_status = True

    def on_connect(self, bot, mqtt_client):
        self.bot.read_status()
        pass

    def on_change(self, bot, state):
        is_busy = state['informational_settings']['busy']
        if is_busy != self.busy:
            if is_busy:
                print("Device is busy")
            else:
                print("Device is idle")

        self.busy = is_busy
        self.try_next_job()

    def on_log(self, _bot, log):
        print("LOG: " + log['message'])

    def on_response(self, _bot, _response):
        pass

    def on_error(self, _bot, response):
        print("ERROR: " + response.id)
        print("Reason(s) for failure: " + str(response.errors))

  • Connect the Agribot instance with the CustomHandler instance, resulting in the execution of the defined methods on respective triggers.