بازوی رباتیک: نرم افزار کنترل کننده پیام

دستکاری بازوی رباتی از اتصالات و موتورهای سروو تشکیل شده است. به یک پایه ثابت متصل است و با دریافت فرمان، اقدامات حرکتی را انجام می دهد. حرکت یک فرآیند دو مرحله ای است: اول، بازو باید یک فرمان را دریافت کند، و دوم باید این دستور را به اقدامات هماهنگی تبدیل کند که مفاصل را بر این اساس حرکت دهد. این مقاله بر روی سوال اول تمرکز دارد: چگونه دستورات را به طور موثر به بازو ارسال کنیم؟
در آخرین مقاله، من یک کیت بازوی رباتیک 4 درجه آزادی را برای Raspberry Pi مونتاژ کردم. این کیت کلاه Raspberry Pi Motor را ارائه میکند که UART، I2C، ورودیهای آنالوگ و پینهای کافی برای کنترل 3 استپر و 15 موتور سروو را در معرض دید قرار میدهد. کیت یک مجموعه کنترل کامل را در مخزن رسمی ارائه می دهد، اما من می خواستم نرم افزار خود را برای کنترل بازو ایجاد کنم.
این مقاله پیاده سازی گام به گام پروتکل اتصال سرویس گیرنده-سرور بر اساس سوکت های پایتون و TCP را ارائه می دهد. به طور خاص، چندین مشتری میتوانند به سرور متصل شوند، پیامهایی را بهعنوان یک هدر و بسته داده ارسال کنند و تا زمانی که لازم است اتصال را باز بگذارند. سرور مسئول دریافت اتصالات و پیام های کلاینت، تجزیه بایت ها به اشیاء پایتون و پردازش آنها به عنوان عمل حرکتی خواهد بود.
این مقاله در ابتدا در وبلاگ من admantium.com ظاهر شد.
بررسی کیت بازو
مخزن رسمی شامل راه اندازی، کد نمونه و برنامه هایی برای کنترل از راه دور بازو است. این دو رابط را برای کاربر نهایی فراهم می کند: یک برنامه رابط کاربری گرافیکی نوشته شده به زبان TK که روی Raspberry PI خود اجرا می شود، یا روی هر رایانه ای که می تواند یک سوکت اتصال به Pi برقرار کند. یا وب سروری که روی Pi شروع می شود و هر کلاینت با آدرس IP و پورت مناسب می تواند به آن متصل شود. با نگاهی به فایل های کد منبع، می توانیم این را یاد بگیریم:
- مکانیسم انتقال پیام های حرکتی یک سوکت TCP است
- سوکت TCP روی رزبری پای با یک پورت ثابت ایجاد می شود
- کلاینت TK GUI و WebServer تعاملات کاربر را می پذیرند و دستورات را به سرور سوکت TCP ارسال می کنند.
- سرور پیام را می پذیرد، محتوای پیام مشخص را تعیین می کند و به بازو دستور حرکت می دهد
- پیام ها یا دستورات ساده ای هستند
X_minus
،X_add
، یا موقعیت های مرکب تمام محورها که منجر به حرکت پیوسته می شود
ملاحظات طراحی برای کنترل از راه دور
با این دانش و تجربه قبلی هنگام طراحی نرم افزار کنترلی برای ربات متحرک RADU من، می توانیم الزامات زیر را تدوین کنیم:
- بازو باید از طریق یک رابط شبکه قابل دسترسی باشد
- این رابط را می توان در پایتون نوشت و به طور مستقیم بازو را دستکاری کرد
- دستورات حرکت باید دستورات متنی ساده باشند
- دستورات باید فورا اجرا شوند
- هنگام ادغام ROS، یک کلاس دروازه پیام را می توان اضافه کرد تا پیام های ROS را به فرمت سفارشی تبدیل کند
بنابراین، پیام ها باید به Pi ارسال شوند. در واقع، این دادههای سریال در شبکه است که جداسازی کد را فراهم میکند و ابزاری را برای پیادهسازی مکانیسمهای انتقال مختلف فراهم میکند. من از سادگی سوکت های TCP استفاده شده در مثال رسمی خوشم آمد. و بنابراین من انتخاب می کنم که همان مکانیسم را تکرار کنم.
پیاده سازی سرور
برای باز کردن سوکت TCP، پایتون داخلی socket
کتابخانه استفاده می شود. این کتابخانه همه کاره است و اجازه می دهد تا چندین نوع سوکت ایجاد شود. نوع خاص سوکت انتخابی ما است AF_INET
، که به معنی TCP با IPv4 و نوع فرعی است SOCK_STREAM
، یک جریان داده دو طرفه و پیوسته. ما همچنین پیش فرض هایی مانند IP و آدرس پورت را تعریف می کنیم.
اولین تکرار کد سرور یک شی سوکت را باز می کند، خود را به IP و پورت مشخص شده متصل می کند و سپس به ترافیک ورودی گوش می دهد. در یک حلقه پیوسته، سرور منتظر هر اتصال دریافتی است. هنگامی که یک کلاینت متصل می شود، پیامی را به ترمینال چاپ می کند، سپس پیامی را که مشتری ارسال می کند رمزگشایی و چاپ می کند.
from socket import socket, AF_INET, SOCK_STREAM
SOCKET_HOST = 'localhost'
SOCKET_PORT = 11314
BUFFER_SIZE = 2048
with socket(AF_INET, SOCK_STREAM) as server:
server.bind((SOCKET_HOST, SOCKET_PORT))
server.listen()
print(f'{repr(server)}\n >> Server started, waiting for connections')
while True:
client, addr = server.accept()
with client:
print(f'Connected from {addr}')
msg= client.recv(BUFFER_SIZE).decode()
print(f'Message: << {msg} >>')
این کد به عنوان اثبات مفهوم عمل می کند. اما مدیریت اتصال چندان مؤثر نیست: دقیقاً برای یک پیام مشتری منتظر میماند و سپس دوباره بلافاصله اتصال را قطع میکند. همچنین، پیام ها با اندازه بافر ثابت خوانده می شوند. هنگامی که پیامها بسیار کوچکتر هستند، این حافظه را هدر میدهد، و زمانی که پیامهای بیش از این حد وارد میشوند، معیوب خواهد بود. بیایید این را بهبود دهیم.
تکرار سرور 2: ارسال پیام انعطاف پذیر
خواندن داده ها از مشتری مجموعه ای از تصمیمات بسیار متفاوت را در اختیار ما قرار می دهد. اول اینکه چقدر داده می خواهیم بخوانیم؟ دوم، چند داده در یک زمان؟ با فرض اینکه پیامها فقط رشتهها هستند، میتوانیم یک کاراکتر را در یک زمان بخوانیم، همه کاراکترها را تا یک سیگنال توقف خاص (مثلاً یک نماد خط جدید) یا تمام دادهها را با اندازه بافر ثابت بخوانیم، همانطور که قبلا استفاده میکردیم.
یک گزینه بهتر این است که به مشتری اجازه دهید تصمیم بگیرد و به سرور اطلاع دهد که چه چیزی قرار است بیاید. پروتکل به این صورت است: ابتدا یک سرصفحه پیام ارسال کنید که حاوی اطلاعاتی درباره طول پیام است. و سپس، پیام را ارسال کنید. من این ایده را از این آموزش سوکت عالی دریافت کردم.
در نسخه اصلاح شده زیر ابتدا طول ثابت پیام هدر را تعریف می کنیم. سپس، پس از اتصال موفقیت آمیز، اولین پیام رمزگشایی و ذخیره می شود و سپس به مقدار صحیح تبدیل می شود. expected_length
. و به دنبال این، پیام بعدی یک کاراکتر در یک زمان خوانده می شود تا زمانی که این مقدار طول مورد انتظار برآورده شود.
HEADER_MSG_PADDING = 20
while True:
client, addr = server.accept()
with client:
print(f'Connected from {addr}')
header, msg = '', ''
expected_length, current_length = 0,0
header = client.recv(HEADER_MSG_PADDING).decode()
expected_length = int(header)
print(f'Header: <{header}>\nExpected length: {expected_length}')
while not expected_length == current_length:
char = client.recv(1).decode()
msg += char
current_length += 1
print(f'Full Messages Received\n<< {msg} >>')
print('Terminating connection..\n')
تکرار سرور 3: مدیریت اتصال مشتری چند رشته ای
تکرار نهایی دو الزام دیگر را اضافه می کند. ابتدا، سرور اتصال چند مشتری را می پذیرد و ردیابی می کند. دوم، مشتریان تصمیم می گیرند که چه زمانی می خواهند اتصال را با ارسال یک پیام اختصاصی ببندند. و سوم، سرور پیامهای دریافتی مکرر (مشت هردر، سپس بارگذاری) مشتریان متصل را مدیریت میکند.
بیایید با قسمت اول شروع کنیم. لیست مشتریان متصل یک فرهنگ لغت است. هنگامی که یک کلاینت جدید متصل می شود، اما همچنان متصل در نظر گرفته می شود، یک استثنا مطرح می شود. در غیر این صورت یک تاپیک جدید ایجاد می شود. سپس اتصال جدید و رشته در لیست اتصال ذخیره می شوند.
while True:
try:
client, addr = server.accept()
if client_list.get(addr):
raise ValueError
connection_thread = Thread(target=handle_socket_connection, args=(client,))
client_list[addr] = {'client': client, 'handler': connection_thread}
print(f'Connected from {addr}')
print('LIST', client_list)
connection_thread.start()
except Exception as e:
print(e)
متد handler یک حلقه پیوسته را شروع می کند که پیام دریافتی را بررسی می کند. اولین پیام را به عنوان هدر تجزیه می کند و طول مورد انتظار را استخراج می کند. سپس تمام داده های دریافتی را تا طول پیام مورد انتظار می خواند. اگر پیام فقط شامل رشته باشد TERMINATE
، حلقه بسته می شود که نخ را نیز متوقف می کند. در غیر این صورت، محتوای پیام مدیریت می شود، برای مثال همانطور که در اینجا برای دستور حرکت نشان داده شده است. در انتهای این حلقه، نخ در زمان خواب کوتاهی قرار می گیرد. این مهم است، در غیر این صورت تمام آن رشته ها به طور مداوم تمام CPU های موجود را مصرف می کنند.
def handle_socket_connection(client, addr, client_list):
header, msg = '', ''
expected_length = 0
is_alive = True
while is_alive:
header = client.recv(HEADER_MSG_PADDING).decode()
if header:
expected_length = int(header)
print(f'Header: <{header}>\nExpected length: {expected_length}')
msg = deserialize(client.recv(expected_length))
if match('move_', msg):
handle_move_command(msg)
elif msg == 'TERMINATE':
is_alive = False
else:
print(f'Message: \n<< {msg} >>')
sleep(DELAY_MS)
client_list.pop(addr)
کد منبع سرور کامل
سورس کد کامل سرور این است:
#!/usr/bin/python3
#
# ---------------------------------------
# Copyright (c) Sebastian Günther 2021 |
# |
# devcon@admantium.com |
# |
# Created: 2021-08-09 20:41:12 |
# ---------------------------------------
#
from time import sleep
from pickle import loads as deserialize
from threading import Thread
from socket import socket, AF_INET, SOCK_STREAM
from re import match
SOCKET_HOST = 'localhost'
SOCKET_PORT = 11314
HEADER_MSG_PADDING = 10
DELAY_MS = 0.100
def handle_socket_connection(client, addr, client_list):
header, msg = '', ''
expected_length = 0
is_alive = True
while is_alive:
header = client.recv(HEADER_MSG_PADDING).decode()
if header:
expected_length = int(header)
print(f'Header: <{header}>\nExpected length: {expected_length}')
msg = deserialize(client.recv(expected_length))
if match('move_', msg):
handle_move_command(msg)
elif msg == 'TERMINATE':
is_alive = False
else:
print(f'Message: \n<< {msg} >>')
sleep(DELAY_MS)
client_list.pop(addr)
def handle_move_command(msg):
move_cmd = msg[5:len(msg)]
print('MOVE', move_cmd)
with socket(AF_INET, SOCK_STREAM) as server:
server.bind((SOCKET_HOST, SOCKET_PORT))
server.listen()
client_list = {}
print(f'{repr(server)}\n >> Server started, waiting for connections')
while True:
try:
client, addr = server.accept()
if client_list.get(addr):
raise ValueError
connection_thread = Thread(target=handle_socket_connection, args=(client,addr, client_list))
client_list[addr] = {'client': client, 'handler': connection_thread}
print(f'Connected from {addr}')
print('LIST', client_list)
connection_thread.start()
except Exception as e:
print(e)
پیاده سازی مشتری
کد سرور تمام شد، حالا بیایید کد مشتری را تعریف کنیم. مانند قبل، کد مشتری نیز ساده شروع می شود: اتصال به سرور، ارسال پیام و بسته شدن خود.
نسخه اول کد کلاینت بسیار شبیه به سرور است: از همان کتابخانه استفاده می کند، ثابت های یکسانی را تنظیم می کند و یک اتصال را باز می کند.
from socket import socket, AF_INET, SOCK_STREAM
SOCKET_HOST = 'localhost'
SOCKET_PORT = 11314
HEADER_MSG_PADDING = 10
def send_socket_msg(client, msg):
msg = serialize(msg)
msg_length = len(msg)
client.send(f'{msg_length:<{HEADER_MSG_PADDING}}'.encode())
client.send(msg)
with socket(AF_INET, SOCK_STREAM) as client:
client.connect((SOCKET_HOST, SOCKET_PORT))
for i in range(10):
if (i % 2 == 0):
send_socket_msg(client, 'move_x+')
else:
send_socket_msg(client, 'move_x-')
sleep(1.342)
کد مشتری: سریال کردن اشیا با Pickle
تکرار دوم نوع پیام را بهینه کرد. این هم بهینه سازی سرعت و هم بهینه سازی انعطاف پذیری پیام است. در پایتون، هر شی را می توان با کتابخانه Pickle سریال کرد. مرحله سریالسازی، شی را به بایت تبدیل میکند، که به هر حال، نوع داده اصلی برای دادههای سوکت است. بنابراین، به جای رمزگذاری صریح یک رشته، هر شی را سریال می کنیم و آن را ارسال می کنیم.
from pickle import dumps as serialize
def send_socket_msg(client, msg):
msg = serialize(msg)
msg_length = len(msg)
client.send(f'{msg_length:<{HEADER_MSG_PADDING}}'.encode())
client.send(msg)
with socket(AF_INET, SOCK_STREAM) as client:
client.connect((SOCKET_HOST, SOCKET_PORT))
send_socket_msg(client, {'MOVE_X': 10, 'MOVE_Y': 30})
یک پسوند ممکن – و برای استفاده تولیدی مورد نیاز – ارسال اطلاعات در مورد نوع پیام همراه با سربرگ است. این کار مدیریت پیام در سمت سرور را بسیار آسان تر می کند.
کد مشتری: ارسال پیام تعاملی
تکرار نهایی کد مشتری فقط اتصال را باز نمی کند، پیامی ارسال می کند و آن را می بندد. در عوض، یک خط فرمان را در ترمینال باز می کند و به کاربر اجازه می دهد هر پیامی را که به سرور ارسال می شود تایپ کند. این را می توان با input
دستور در ترمینال،
with socket(AF_INET, SOCK_STREAM) as client:
client.connect((SOCKET_HOST, SOCKET_PORT))
while True:
cmd = input(">> ")
if cmd.lower() == "stop":
break;
else:
send_socket_msg(client, cmd)
sleep(0.01)
آزمایش کردن
در Raspberry Pi، server.py
آغاز شده است.
> python3 server.py
<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 11314)>
>> Server started, waiting for connections
سپس از طریق آن به آن متصل می شویم client.py
از هر کامپیوتری در همان شبکه
> python3 client_interactive_input.py
cmd: center
پس از اتصال، کافی است هر دستوری را به زبان مشخص شده تایپ کنید، بازو مطابق آن حرکت می کند.
LIST {('127.0.0.1', 36148): {'client': <socket.socket fd=4, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 11314), raddr=('127.0.0.1', 36148)>, 'handler': <Thread(Thread-1, initial)>}}
Header: <16 >
Expected length: 16
Message:
<< center >>
نتیجه
با کیت Raspberry Pi Robot ARM می توانید یک دستکاری رباتیک 4 درجه آزادی بسازید. در این مقاله آموزش گام به گام پیاده سازی نرم افزار کنترل کننده پیام ارائه شده است. در اصل، یک سرور socker در Raspberry Pi راه اندازی می شود و سپس یک کلاینت متصل می شود. برای آزمایش رویکردهای مختلف، مقاله چندین تکرار از کد سرور را نشان داد. ابتدا، پیام ها را می توان به عنوان رشته ها تجزیه کرد یا به عنوان ساختارهای داده پایتون ارزیابی کرد. دوم، سرور فقط می تواند یک اتصال را باز کند، یا چندین اتصال کلاینت را به طور همزمان مدیریت کند که در رشته ها ذخیره شده اند. سوم، پروتکل پیام را می توان با جدا کردن سرصفحه پیام و متن پیام که در آن هدر میزان بایت هایی را که بار ارسال می کند تعیین می کند، بهبود بخشید. اجرای حاصل همه کاره و قوی است. مقاله بعدی این مجموعه را ادامه می دهد و نحوه اجرای حرکات ساده و پیچیده بازو را نشان می دهد.