// RH_TCP.cpp // // Copyright (C) 2014 Mike McCauley // $Id: RH_TCP.cpp,v 1.5 2015/08/13 02:45:47 mikem Exp $ #include // This can only build on Linux and compatible systems #if (RH_PLATFORM == RH_PLATFORM_UNIX) #include #include #include #include #include #include #include #include #include #include RH_TCP::RH_TCP(const char* server) : _server(server), _rxBufLen(0), _rxBufValid(false), _socket(-1) { } bool RH_TCP::init() { if (!connectToServer()) return false; return sendThisAddress(_thisAddress); } bool RH_TCP::connectToServer() { struct addrinfo hints; struct addrinfo *result, *rp; int sfd, s; struct sockaddr_storage peer_addr; socklen_t peer_addr_len; memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = AF_UNSPEC; // Allow IPv4 or IPv6 hints.ai_socktype = SOCK_STREAM; // Stream socket hints.ai_flags = AI_PASSIVE; // For wildcard IP address hints.ai_protocol = 0; // Any protocol hints.ai_canonname = NULL; hints.ai_addr = NULL; hints.ai_next = NULL; std::string server(_server); std::string port("4000"); size_t indexOfSeparator = server.find_first_of(':'); if (indexOfSeparator != std::string::npos) { port = server.substr(indexOfSeparator+1); server.erase(indexOfSeparator); } s = getaddrinfo(server.c_str(), port.c_str(), &hints, &result); if (s != 0) { fprintf(stderr, "RH_TCP::connect getaddrinfo failed: %s\n", gai_strerror(s)); return false; } // getaddrinfo() returns a list of address structures. // Try each address until we successfully connect(2). // If socket(2) (or connect(2)) fails, we (close the socket // and) try the next address. */ for (rp = result; rp != NULL; rp = rp->ai_next) { _socket = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); if (_socket == -1) continue; if (connect(_socket, rp->ai_addr, rp->ai_addrlen) == 0) break; /* Success */ close(_socket); } if (rp == NULL) { /* No address succeeded */ fprintf(stderr, "RH_TCP::connect could not connect to %s\n", _server); return false; } freeaddrinfo(result); /* No longer needed */ // Now make the socket non-blocking int on = 1; int rc = ioctl(_socket, FIONBIO, (char *)&on); if (rc < 0) { fprintf(stderr,"RH_TCP::init failed to set socket non-blocking: %s\n", strerror(errno)); close(_socket); _socket = -1; return false; } return true; } void RH_TCP::clearRxBuf() { _rxBufValid = false; _rxBufLen = 0; } void RH_TCP::checkForEvents() { #define RH_TCP_SOCKETBUF_LEN 500 static uint8_t socketBuf[RH_TCP_SOCKETBUF_LEN]; // Room for several messages static uint16_t socketBufLen = 0; // Read at most the amount of space we have left in the buffer ssize_t count = read(_socket, socketBuf + socketBufLen, sizeof(socketBuf) - socketBufLen); if (count < 0) { if (errno != EAGAIN) { fprintf(stderr,"RH_TCP::checkForEvents read error: %s\n", strerror(errno)); exit(1); } } else if (count == 0) { // End of file fprintf(stderr,"RH_TCP::checkForEvents unexpected end of file on read\n"); exit(1); } else { socketBufLen += count; while (socketBufLen >= 5) { RHTcpTypeMessage* message = ((RHTcpTypeMessage*)socketBuf); uint32_t len = ntohl(message->length); uint32_t messageLen = len + sizeof(message->length); if (len > sizeof(socketBuf) - sizeof(message->length)) { // Bogus length fprintf(stderr, "RH_TCP::checkForEvents read ridiculous length: %d. Corrupt message stream? Aborting\n", len); exit(1); } if (socketBufLen >= len + sizeof(message->length)) { // Got at least all of this message if (message->type == RH_TCP_MESSAGE_TYPE_PACKET && len >= 5) { // REVISIT: need to check if we are actually receiving? // Its a new packet, extract the headers and payload RHTcpPacket* packet = ((RHTcpPacket*)socketBuf); _rxHeaderTo = packet->to; _rxHeaderFrom = packet->from; _rxHeaderId = packet->id; _rxHeaderFlags = packet->flags; uint32_t payloadLen = len - 5; if (payloadLen <= sizeof(_rxBuf)) { // Enough room in our receiver buffer memcpy(_rxBuf, packet->payload, payloadLen); _rxBufLen = payloadLen; _rxBufFull = true; } } // check for other message types here // Now remove the used message by copying the trailing bytes (maybe start of a new message?) // to the top of the buffer memcpy(socketBuf, socketBuf + messageLen, sizeof(socketBuf) - messageLen); socketBufLen -= messageLen; } } } } void RH_TCP::validateRxBuf() { // The headers have already been extracted if (_promiscuous || _rxHeaderTo == _thisAddress || _rxHeaderTo == RH_BROADCAST_ADDRESS) { _rxGood++; _rxBufValid = true; } } bool RH_TCP::available() { if (_socket < 0) return false; checkForEvents(); if (_rxBufFull) { validateRxBuf(); _rxBufFull= false; } return _rxBufValid; } // Block until something is available void RH_TCP::waitAvailable() { waitAvailableTimeout(0); // 0 = Wait forever } // Block until something is available or timeout expires bool RH_TCP::waitAvailableTimeout(uint16_t timeout) { int max_fd; fd_set input; int result; FD_ZERO(&input); FD_SET(_socket, &input); max_fd = _socket + 1; if (timeout) { struct timeval timer; // Timeout is in milliseconds timer.tv_sec = timeout / 1000; timer.tv_usec = (timeout % 1000) * 1000; result = select(max_fd, &input, NULL, NULL, &timer); } else { result = select(max_fd, &input, NULL, NULL, NULL); } if (result < 0) fprintf(stderr, "RH_TCP::waitAvailableTimeout: select failed %s\n", strerror(errno)); return result > 0; } bool RH_TCP::recv(uint8_t* buf, uint8_t* len) { if (!available()) return false; if (buf && len) { if (*len > _rxBufLen) *len = _rxBufLen; memcpy(buf, _rxBuf, *len); } clearRxBuf(); return true; } bool RH_TCP::send(const uint8_t* data, uint8_t len) { bool ret = sendPacket(data, len); delay(10); // Wait for transmit to succeed. REVISIT: depends on length and speed return ret; } uint8_t RH_TCP::maxMessageLength() { return RH_TCP_MAX_MESSAGE_LEN; } void RH_TCP::setThisAddress(uint8_t address) { RHGenericDriver::setThisAddress(address); sendThisAddress(_thisAddress); } bool RH_TCP::sendThisAddress(uint8_t thisAddress) { if (_socket < 0) return false; RHTcpThisAddress m; m.length = htonl(2); m.type = RH_TCP_MESSAGE_TYPE_THISADDRESS; m.thisAddress = thisAddress; ssize_t sent = write(_socket, &m, sizeof(m)); return sent > 0; } bool RH_TCP::sendPacket(const uint8_t* data, uint8_t len) { if (_socket < 0) return false; RHTcpPacket m; m.length = htonl(len + 4); m.type = RH_TCP_MESSAGE_TYPE_PACKET; m.to = _txHeaderTo; m.from = _txHeaderFrom; m.id = _txHeaderId; m.flags = _txHeaderFlags; memcpy(m.payload, data, len); ssize_t sent = write(_socket, &m, len + 8); return sent > 0; } #endif