본문 바로가기
Programming/Capstone Design

[Capstone Design]4. 시리얼 통신(Serial Communication) - 2

by NoiB 2022. 4. 17.
반응형

오늘은 말씀드렸던 대로 조향 파라미터를 아두이노로 보내서 처리하는 것을 진행해보겠습니다. 사실 오늘 포스팅은 반쪽짜리 포스팅이라고 할 수 밖에 없는게, 벌써 해당 프로젝트를 끝을 내고 이미 다 분해해서 처리를 했기 때문에 제가 가지고 있는 것도 카메라와 아두이노 뿐이고 일체 모터나 전선은 가지고 있지 않기 때문에 에코백 정도 까지만 구현을 하고 넘어가게 될 것 같습니다. 그래도 최대한 자세히 적어보도록 노력하겠습니다.

 

일단 지난 포스팅에서 시리얼 통신에 대해, 파이썬과 아두이노로 시리얼 통신을 하는 법에 대해서 간략하게 알아봤었죠. 하지만 뭐 아무 데이터나 보낸다고 우리가 원하는 동작을 해주지는 않겠죠. 그래서 우리가 이렇게 말하면 이렇게 해주고 저렇게 말하면 저렇게 해줘 라고 하는 약속을 해야합니다. 그리고 이것을 프로토콜(통신 규약)이라고 부릅니다.

 

프로토콜(Protocol)이란 원활한 통신을 위해서 서로간에 합의한 규칙이라고 이해하시면 되겠습니다. 우리가 너무나도 많이 접해본 HTTP, HTTPS 같은 것들도 다 프로토콜의 일종이죠. 그렇다고 해서 지금 하고자 하는 것이 저렇게 거창한 것은 아니구요. 우리는 파이썬과 아두이노 1대1로 통신할 것이기 때문에 아두이노와 파이썬과 나와 내 팀원만 알도록 정해주면 되겠죠. 간단하게 한 번 짜볼까요?

 

지금부터는 하드웨어적인 요소까지는 제가 변경을 할 수 없기 때문에 제가 진행했던 프로젝트의 하드웨어 구성을 따라서만 설명을 드린다는 점 양해 부탁드리겠습니다.

 

저는 차량의 양 바퀴를 스텝모터를 이용해서 조향 및 주행을 제어했습니다. 양 바퀴가 다른 방향으로 움직인다면 전진 또는 후진, 같은 방향이나 하나만 움직인다면 좌회전 또는 우회전이 되겠죠(스텝모터가 서로 다른 방향을 바라보고 설치가 되기 때문). 물론 후진은 사실상 사용할 일이 없었습니다. 따라서 데이터를 보낼 때 '바퀴의 회전 방향을 컨트롤할 데이터 + 얼마나 회전시킬지에 대한 데이터 + 데이터 종료' 의 구조로 구성을 했습니다.

전진 우회전 좌회전
f (forward) r (right) l (left)
숫자
` (grave)

예를 들어서 써보면 r12`,... 이런식이었습니다. 반복문 안에 넣었으니 차량이 움직이고 차선인식을 또 진행하고 또 움직이고 차선인식을 진행하고 하는 방식으로 진행이 되었죠. 다만 시스템적으로 해결할 수 없어 아쉬웠던 건 차량이 진행하는 동안에는 카메라 화면이 바뀌지 않기 때문에 보는 입장에서 영상이 툭툭 끊기는 것처럼 보인다는 점이네요. 아무리 짧게 움직이도록 해도 해당 부분은 어쩔 수 없었습니다. 카메라를 하나 더 사용해서 영상을 틀어놨었다면 보는 입장에선 조금 덜 답답했을지도 모르겠네요.

def communicate(ser,steer_value):
    if steer_value == 0:
        data = deque(['f']+['5']+['`'])
    elif steer_value < 0:
        data =deque(['r']+[str(abs(steer_value))]+['`'])
    else:
        data = deque(['l']+[str(steer_value)]+['`'])
    for i in data:
        interact_ser(i,ser)
    print(ser.readline().decode())
    return 

def interact_ser(_str, _ard):
    _ard.write(_str.encode())
    if _str[-1] == '`':
        tmp = ""
        while tmp == "":
            tmp = _ard.readline()
        print(tmp.decode())
        return tmp

추가한 코드는 이렇습니다. deque를 쓴 이유는 회전수 데이터의 경우는 한글자씩 넘기면 원하는 대로 동작하지 않기 때문에 요소별로 분리하기 위해 사용했구요. 아두이노는 에코백만 확인하기 위해서 어제와 동일한 코드를 사용했습니다.

char data;
void setup(){
    Serial.begin(9600);


}

void loop(){
    if(Serial.available()){
       data =  Serial.read();
       Serial.println(data);
    }
}

지난 포스팅 때 값이 아마 기억이 안나실테니 말씀을 드리자면 조향값은 -1이 나왔습니다. 프로토콜 대로라면 -1은 r1`로 바뀌겠죠. 에코백을 한 번 볼까요?

반복문안에 들어있기 때문에 같은 데이터가 여러 개가 나왔습니다. 실제로 기계를 돌리면 모터가 움직이면서 카메라에 비춰지는 영상이 달라지기 때문에 계속해서 다른값이 전달됩니다. 약간 순서가 틀어졌는데 다음 포스팅은 웹캠을 이용해서 실시간으로 차선 인식을 진행해볼 예정이니 다음 시간에 한 번 보도록 하자구요.

import cv2
import numpy as np
import serial
from collections import deque

def grayscale(img):
    return cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)

def gaussian_blur(img, kernel_size):
    return cv2.GaussianBlur(img, (kernel_size, kernel_size), 0)

def canny(img, low_threshold, high_threshold):
    return cv2.Canny(img, low_threshold, high_threshold)

def roi(img,h,w):
    mask = np.zeros_like(img)
    vertices = np.array([[(0,h), (0,h*2/3), (w,h*2/3), (w,h)]], dtype=np.int32)
    cv2.fillPoly(mask, vertices, 255)
    roi_img = cv2.bitwise_and(img, mask)
    return roi_img

def restrict_deg(lines,min_slope,max_slope):
    slope_deg = np.rad2deg(np.arctan2(lines[:,1]-lines[:,3],lines[:,0]-lines[:,2]))
    lines = lines[np.abs(slope_deg)<max_slope]#cannot use and & index true catch
    slope_deg = slope_deg[np.abs(slope_deg)<max_slope]
    lines = lines[np.abs(slope_deg)>min_slope]
    slope_deg = slope_deg[np.abs(slope_deg)>min_slope]#where can i use slope
    return lines, slope_deg

def separate_line(lines,slope_deg):
    l_lines, r_lines = lines[(slope_deg>0),:], lines[(slope_deg<0),:]
    l_slopes, r_slopes = slope_deg[(slope_deg>0)], slope_deg[(slope_deg<0)]
    l_line = [sum(l_lines[:,0])/len(l_lines),sum(l_lines[:,1])/len(l_lines),sum(l_lines[:,2])/len(l_lines),sum(l_lines[:,3])/len(l_lines)]
    r_line = [sum(r_lines[:,0])/len(r_lines),sum(r_lines[:,1])/len(r_lines),sum(r_lines[:,2])/len(r_lines),sum(r_lines[:,3])/len(r_lines)]
    l_slope = int(sum(l_slopes)/len(l_slopes))
    r_slope = int(sum(r_slopes)/len(r_slopes))
    return l_line, r_line, l_slope, r_slope

def hough(img,h,w,min_line_len,min_slope,max_slope):
    lines = cv2.HoughLinesP(img, rho=1, theta=np.pi/180, threshold=30, minLineLength=min_line_len, maxLineGap=30)#return = [[x1,y1,x2,y2],[...],...]
    lines = np.squeeze(lines)#one time ok
    lanes, slopes = restrict_deg(lines,min_slope,max_slope)
    l_lane, r_lane, l_slope, r_slope = separate_line(lanes,slopes)
    #lane_img = np.zeros((h, w, 3), dtype=np.uint8)
    #for x1,y1,x2,y2 in l_lanes:
    #cv2.line(lane_img, (int(l_lane[0]), int(l_lane[1])), (int(l_lane[2]), int(l_lane[3])), color=[0,0,255], thickness=2)
    #for x1,y1,x2,y2 in r_lanes:
    #cv2.line(lane_img, (int(r_lane[0]), int(r_lane[1])), (int(r_lane[2]), int(r_lane[3])), color=[255,0,0], thickness=2)
    return l_lane, r_lane, l_slope, r_slope

def communicate(ser,steer_value):
    if steer_value == 0:
        data = deque(['f']+['5']+['`'])
    elif steer_value < 0:
        data =deque(['r']+[str(abs(steer_value))]+['`'])
    else:
        data = deque(['l']+[str(steer_value)]+['`'])
    for i in data:
        interact_ser(i,ser)
    print(ser.readline().decode())
    return 

def interact_ser(_str, _ard):
    _ard.write(_str.encode())
    if _str[-1] == '`':
        tmp = ""
        while tmp == "":
            tmp = _ard.readline()
        print(tmp.decode())
        return tmp

def lane_detection(min_line_len,min_slope,max_slope):
    origin_img = cv2.imread('./left_right.jpg')
    h,w = origin_img.shape[:2]
    gray_img = grayscale(origin_img)
    blur_img = gaussian_blur(gray_img, 5)
    canny_img = canny(blur_img, 50, 200)
    roi_img = roi(canny_img,h,w)
    l_lane,r_lane,l_slope,r_slope = hough(roi_img,h,w,min_line_len,min_slope,max_slope)
    steer_value = l_slope+r_slope#maybe 0 deg is on 12
    cv2.line(origin_img, (int(l_lane[0]), int(l_lane[1])), (int(l_lane[2]), int(l_lane[3])), color=[0,0,255], thickness=5)
    cv2.line(origin_img, (int(r_lane[0]), int(r_lane[1])), (int(r_lane[2]), int(r_lane[3])), color=[255,0,0], thickness=5)
    return origin_img, steer_value

def nothing(pos):
    pass

if __name__ == '__main__':
    ser = serial.Serial('COM4', 9600)
    cv2.namedWindow(winname='Lane Detection')
    cv2.createTrackbar('houghMinLine', 'Lane Detection', 20, 200, nothing)#don't write keyword
    cv2.createTrackbar('slopeMinDeg', 'Lane Detection', 100, 180, nothing)
    cv2.createTrackbar('slopeMaxDeg', 'Lane Detection', 160, 180, nothing)
    while cv2.waitKey(1) != ord('q'):
        min_line_len = cv2.getTrackbarPos(trackbarname='houghMinLine', winname='Lane Detection')
        min_slope = cv2.getTrackbarPos('slopeMinDeg','Lane Detection')
        max_slope = cv2.getTrackbarPos('slopeMaxDeg','Lane Detection')
        result_img, steer_value = lane_detection(min_line_len,min_slope,max_slope)
        communicate(ser,steer_value)
        cv2.imshow('Lane Detection',result_img)
    
    ser.close()
    cv2.imwrite('./hough_img3.jpg',result_img)
    cv2.destroyAllWindows()
    
#It will be great that we can select the instant roi region using click when we run the code.

 

반응형