Scheduling Examples
Import the necessary Python libraries and modules.
from agribot import Agribot, StubHandler
import threading
import schedule
import time
import sys
Define a custom handler, CustomHandler, with methods:
on_connect: Prints a connection confirmation and starts the selected task.
on_response: Stays idle on bot response.
on_change: Stays idle on bot state changes.
on_log: Prints the bot’s log status.
on_error: Prints the bot’s error status.
perform_task: Moves to each target position in turn and waters the plants there, then returns home and disconnects. (LED control is handled separately by turn_led.)
class CustomHandler(StubHandler):
    """Handler that runs one task (a watering round or an LED switch) per connection.

    The task is chosen through two attributes that the caller may set after
    construction and before connecting:

    * ``watering`` -- when truthy, ``start_task`` runs ``perform_task`` in a
      background thread.
    * ``led_on`` -- when ``watering`` is falsy and this is not ``None``,
      ``start_task`` switches the LED to this state via ``turn_led``.
    """

    # Pin numbers used with ``bot.write_pin``.
    WATER_VALVE_PIN = 8
    WATER_PUMP_PIN = 10
    LED_PIN = 7
    # Distance moved down along Z after arriving at a target.
    WATERING_DESCENT = 200

    def __init__(self, bot):
        """Store the bot and set up the list of target positions.

        Args:
            bot: The bot object that the task methods send commands to.
        """
        super().__init__()
        # Coordinates (x, y, z) of the points to visit, in visiting order.
        self.target_positions = [
            (300, 150, 0),
            (300, 300, 0),
            (300, 450, 0),
            (550, 450, 0),
            (550, 300, 0),
            (550, 150, 0),
            (800, 150, 0),
            (800, 300, 0),
            (800, 450, 0),
            (1050, 450, 0),
            (1050, 300, 0),
            (1050, 150, 0),
        ]
        self.remaining_targets = len(self.target_positions)
        # Never set inside this class; only its timed wait() is used, as a pause.
        self.task_event = threading.Event()
        self.bot = bot
        # Latest log entry. Starts empty so wait_for_log_message can poll
        # safely before the first on_log callback has fired.
        self.log = {}
        # Task selection defaults; callers normally overwrite these before
        # connecting. With these defaults start_task does nothing.
        self.watering = False
        self.led_on = None

    def on_connect(self, bot, client):
        """Print a connection confirmation and start the selected task."""
        print("Connected to Agribot")
        self.start_task()

    def on_change(self, bot, state):
        """Ignore state changes."""
        pass

    def on_log(self, bot, log):
        """Remember the latest log entry and print it."""
        self.log = log
        print(self.log, "\n")

    def on_error(self, bot, response):
        """Print the errors carried by an error response."""
        print("Received error response")
        print(response.errors)

    def on_response(self, bot, response):
        """Ignore regular responses."""
        pass

    def perform_task(self):
        """Visit every target position, water there, return home and disconnect.

        The bot is disconnected even if one of the steps raises, so a failed
        round does not leave the connection open.
        """
        distance_threshold = 5  # per-axis position tolerance (e.g. 5 mm)
        # Restart the counter so the progress messages are right on every call.
        self.remaining_targets = len(self.target_positions)

        print("Starting task...")
        try:
            for position in self.target_positions:
                print(f"Moving to location {position}... ({self.remaining_targets} targets remaining)")
                self.move_to_position(position, distance_threshold)
                print("Watering...")
                self.water_plants()
                # Decrement first so the message counts only unvisited targets.
                self.remaining_targets -= 1
                print(f"Watering completed. ({self.remaining_targets} targets remaining)")

            # Go home, then confirm the bot actually arrived at the origin.
            self.bot.find_home()
            time.sleep(2)
            self._wait_until_at((0, 0, 0), distance_threshold)
            print("Moved to home position")

            print("Task completed.")
        finally:
            self.bot.disconnect()

    def _wait_until_at(self, target, tolerance, axes=(0, 1, 2), poll_interval=1):
        """Block until the bot's position is within ``tolerance`` of ``target``.

        Args:
            target: Sequence of target coordinates indexed like ``bot.position()``.
            tolerance: Maximum allowed absolute difference on each checked axis
                (strictly less than).
            axes: Indices of the coordinates to compare.
            poll_interval: Seconds to sleep between position checks.
        """
        while True:
            current = self.bot.position()
            if all(abs(current[axis] - target[axis]) < tolerance for axis in axes):
                return
            time.sleep(poll_interval)

    def move_to_position(self, position, distance_threshold):
        """Move to ``position`` and then lower the Z axis by ``WATERING_DESCENT``.

        Args:
            position: Target ``(x, y, z)`` coordinates.
            distance_threshold: Per-axis tolerance used to decide arrival.
        """
        self.bot.move_absolute(position[0], position[1], position[2])
        # Wait until the target is reached on all three axes.
        self._wait_until_at(position, distance_threshold)
        print(f"Arrived at location {position}")

        # Move down along Z and confirm the descent (only Z is checked).
        self.bot.move_relative(x=0, y=0, z=-self.WATERING_DESCENT)
        lowered = (position[0], position[1], position[2] - self.WATERING_DESCENT)
        self._wait_until_at(lowered, distance_threshold, axes=(2,))
        print(f"Moved down successfully. ({self.remaining_targets} targets remaining)")

    def water_plants(self):
        """Open the valve, run the pump for two seconds, then shut both off.

        Each pin write is followed by a wait for the matching log message.
        Progress reporting is left to the caller.
        """
        self.bot.write_pin(self.WATER_VALVE_PIN, 1)  # activate water_valve
        self.wait_for_log_message("water_valve", "ON")
        self.bot.write_pin(self.WATER_PUMP_PIN, 1)  # activate water_pump
        self.wait_for_log_message("water_pump", "ON")
        self.task_event.wait(timeout=2)  # let the pump run for 2 seconds
        self.bot.write_pin(self.WATER_PUMP_PIN, 0)  # deactivate water_pump
        self.wait_for_log_message("water_pump", "OFF")
        self.bot.write_pin(self.WATER_VALVE_PIN, 0)  # deactivate water_valve
        # Wait for the valve's own confirmation, not the pump's again.
        self.wait_for_log_message("water_valve", "OFF")
        self.task_event.wait(timeout=2)  # pause 2 seconds after closing the valve

    def wait_for_log_message(self, keyword1, keyword2):
        """Block until the latest log message contains both keywords.

        Args:
            keyword1: First substring that must appear in the message.
            keyword2: Second substring that must appear in the message.
        """
        # NOTE(review): only the most recent log entry is inspected, once per
        # second; if another log entry replaces the awaited one between two
        # polls, this loop would keep waiting. Confirm this cannot happen.
        while True:
            message = self.log.get("message") or ""
            if keyword1 in message and keyword2 in message:
                break
            self.task_event.wait(timeout=1)

    def turn_led(self, on):
        """Switch the LED on or off, report it, and disconnect.

        Args:
            on: Truthy to turn the LED on, falsy to turn it off.
        """
        self.bot.write_pin(self.LED_PIN, int(on))
        if on:
            print("LED turned on.")
        else:
            print("LED turned off.")
        self.bot.disconnect()

    def start_task(self):
        """Run the selected task.

        Watering runs in a new thread; the LED switch runs on the calling
        thread. Nothing happens when ``watering`` is falsy and ``led_on`` is
        ``None``.
        """
        if self.watering:
            thread = threading.Thread(target=self.perform_task)
            thread.start()
        elif self.led_on is not None:
            self.turn_led(self.led_on)
Log in to obtain an Agribot instance and connect it to a CustomHandler instance, so that the handler's methods run whenever their corresponding events are triggered.
def select_task(watering=False, led=False):
    """Log in, configure a CustomHandler for one task, and connect the bot.

    Args:
        watering: When truthy, the handler runs the watering round.
        led: LED state the handler applies when ``watering`` is falsy.
    """
    import os  # local import so this block does not depend on the file's import list

    # NOTE(review): account credentials are embedded in the source. The
    # AGRIBOT_EMAIL / AGRIBOT_PASSWORD environment variables take precedence;
    # the literals remain only as a backward-compatible fallback and should be
    # rotated and removed.
    email = os.environ.get("AGRIBOT_EMAIL", "zaxrok@gmail.com")
    password = os.environ.get("AGRIBOT_PASSWORD", "zeta@1234!")

    bot = Agribot.login(email, password)
    handler = CustomHandler(bot)
    handler.watering = watering
    handler.led_on = led
    bot.connect(handler)
def schedule_task():
    """Register the daily watering and LED jobs, then run the scheduler loop.

    The loop polls for pending jobs once per second and exits the process with
    status 0 on Ctrl+C.
    """
    print("schedule start")

    # One row per daily job, in registration order: (time, watering, led).
    daily_jobs = (
        ("05:00", True, False),   # 5 AM: watering
        ("08:00", True, False),   # 8 AM: watering
        ("17:00", True, False),   # 5 PM: watering
        ("20:00", False, True),   # 8 PM: LED control
        ("21:00", False, False),  # 9 PM: LED control
        ("23:00", False, True),   # 11 PM: LED control
        ("00:00", False, False),  # 12 AM: LED control
        ("02:00", False, True),   # 2 AM: LED control
        ("03:00", False, False),  # 3 AM: LED control
    )
    for clock_time, do_watering, led_state in daily_jobs:
        schedule.every().day.at(clock_time).do(
            select_task, watering=do_watering, led=led_state
        )

    # Run the scheduler until interrupted from the keyboard.
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        sys.exit(0)
# Start the scheduler. There is no __main__ guard, so this also runs on import.
schedule_task()