개발/ESP32, ESP8266

ESP8266(ESP-01, ESP-01S) SSE, 서버에서 클라이언트에게 데이터 전송

어중E 2022. 10. 30. 21:53

ESP8266(ESP-01, ESP-01S) SSE, 서버에서 클라이언트에게 데이터 전송

ESP8266에서 서버에서 데이터를 받고 싶다면, 고려해 볼 수 있는 SSE(Server Sent Events)에 관한 예제를 살펴보겠습니다. 파일 - 예제 - ESP8266WebServer - SeverSentEvents를 열어주시면 됩니다.

 

SSE에 대해서 잠시 설명드리면, 클라이언트와 서버가 소통하는 방법중 하나인데, 기존에 양방향 소통이 가능하던 웹소켓과 달리 서버 → 클라이언트만 가능한 방법입니다. 그렇다면 웹소켓 방식에서 클라이언트 → 서버로 데이터를 주는 것만 사용 안 하면, 되지 않냐 하시겠지만 이는 배터리 소모면에서는 SSE를 선택하는 것이 좋습니다.

 

기본적으로 ESP 계열은 IOT나 배터리를 사용하는 제품, 프로젝트에서 많이 사용되기 때문에 SSE를 한번 고려해보시는 것이 좋을 듯 합니다.

 


/* Multi-client Server Sent Event (aka EventSource) demo
  Run demo as follows:
  1. set SSID, password and ports, compile and run program
     you should see (random) updates of sensors A and B

  2. on the client(s), register it for the event bus using a REST API call: curl -sS "http://<your ESP IP>:<your port>/rest/events/subscribe"
     on both server and client, you should now see that your client is registered
     the server sends back the location of the event bus (channel) to the client:
      subscription for client IP <your client's IP address>: event bus location: http://<your ESP IP>:<your port>/rest/events/<channel>

     you will also see that the sensors are ready to broadcast state changes, but the client is not yet listening:
      SSEBroadcastState - client <your client IP>> registered but not listening

  3. on the client(s), start listening for events with: curl -sS "http://<your ESP IP>:<your port>/rest/events/<channel>"
     if all is well, the following is being displayed on the ESP console
      SSEHandler - registered client with IP <your client IP address> is listening...
      broadcast status change to client IP <your client IP>> for sensor[A|B] with new state <some number>>
     every minute you will see on the ESP: SSEKeepAlive - client is still connected

     on the client, you should see the SSE messages coming in:
      event: event
      data: { "TYPE":"KEEP-ALIVE" }
      event: event
      data: { "TYPE":"STATE", "sensorB": {"state" : 12408, "prevState": 13502} }
      event: event
      data: { "TYPE":"STATE", "sensorA": {"state" : 17664, "prevState": 49362} }

  4. on the client, stop listening by hitting control-C
    on the ESP, after maximum one minute, the following message is displayed: SSEKeepAlive - client no longer connected, remove subscription
    if you start listening again after the time expired, the "/rest/events" handle becomes stale and "Handle not found" is returned
    you can also try to start listening again before the KeepAliver timer expires or simply register your client again
*/

extern "C" {
#include "c_types.h"
}
#include <ESP8266WiFi.h>
#include <WiFiClient.h>
#include <ESP8266WebServer.h>
#include <ESP8266mDNS.h>
#include <Ticker.h>

#ifndef STASSID
#define STASSID "your-ssid"
#define STAPSK  "your-password"
#endif

const char* ssid = STASSID;
const char* password = STAPSK;
const unsigned int port = 80;

ESP8266WebServer server(port);

#define SSE_MAX_CHANNELS 8  // in this simplified example, only eight SSE clients subscription allowed
struct SSESubscription {
  IPAddress clientIP;
  WiFiClient client;
  Ticker keepAliveTimer;
} subscription[SSE_MAX_CHANNELS];
uint8_t subscriptionCount = 0;

typedef struct {
  const char *name;
  unsigned short value;
  Ticker update;
} sensorType;
sensorType sensor[2];

void handleNotFound() {
  Serial.println(F("Handle not found"));
  String message = "Handle Not Found\n\n";
  message += "URI: ";
  message += server.uri();
  message += "\nMethod: ";
  message += (server.method() == HTTP_GET) ? "GET" : "POST";
  message += "\nArguments: ";
  message += server.args();
  message += "\n";
  for (uint8_t i = 0; i < server.args(); i++) {
    message += " " + server.argName(i) + ": " + server.arg(i) + "\n";
  }
  server.send(404, "text/plain", message);
}

void SSEKeepAlive() {
  for (uint8_t i = 0; i < SSE_MAX_CHANNELS; i++) {
    if (!(subscription[i].clientIP)) {
      continue;
    }
    if (subscription[i].client.connected()) {
      Serial.printf_P(PSTR("SSEKeepAlive - client is still listening on channel %d\n"), i);
      subscription[i].client.println(F("event: event\ndata: { \"TYPE\":\"KEEP-ALIVE\" }\n"));   // Extra newline required by SSE standard
    } else {
      Serial.printf_P(PSTR("SSEKeepAlive - client not listening on channel %d, remove subscription\n"), i);
      subscription[i].keepAliveTimer.detach();
      subscription[i].client.flush();
      subscription[i].client.stop();
      subscription[i].clientIP = INADDR_NONE;
      subscriptionCount--;
    }
  }
}

// SSEHandler handles the client connection to the event bus (client event listener)
// every 60 seconds it sends a keep alive event via Ticker
void SSEHandler(uint8_t channel) {
  WiFiClient client = server.client();
  SSESubscription &s = subscription[channel];
  if (s.clientIP != client.remoteIP()) { // IP addresses don't match, reject this client
    Serial.printf_P(PSTR("SSEHandler - unregistered client with IP %s tries to listen\n"), server.client().remoteIP().toString().c_str());
    return handleNotFound();
  }
  client.setNoDelay(true);
  client.setSync(true);
  Serial.printf_P(PSTR("SSEHandler - registered client with IP %s is listening\n"), IPAddress(s.clientIP).toString().c_str());
  s.client = client; // capture SSE server client connection
  server.setContentLength(CONTENT_LENGTH_UNKNOWN); // the payload can go on forever
  server.sendContent_P(PSTR("HTTP/1.1 200 OK\nContent-Type: text/event-stream;\nConnection: keep-alive\nCache-Control: no-cache\nAccess-Control-Allow-Origin: *\n\n"));
  s.keepAliveTimer.attach_scheduled(30.0, SSEKeepAlive);  // Refresh time every 30s for demo
}

void handleAll() {
  const char *uri = server.uri().c_str();
  const char *restEvents = PSTR("/rest/events/");
  if (strncmp_P(uri, restEvents, strlen_P(restEvents))) {
    return handleNotFound();
  }
  uri += strlen_P(restEvents); // Skip the "/rest/events/" and get to the channel number
  unsigned int channel = atoi(uri);
  if (channel < SSE_MAX_CHANNELS) {
    return SSEHandler(channel);
  }
  handleNotFound();
};

void SSEBroadcastState(const char *sensorName, unsigned short prevSensorValue, unsigned short sensorValue) {
  for (uint8_t i = 0; i < SSE_MAX_CHANNELS; i++) {
    if (!(subscription[i].clientIP)) {
      continue;
    }
    String IPaddrstr = IPAddress(subscription[i].clientIP).toString();
    if (subscription[i].client.connected()) {
      Serial.printf_P(PSTR("broadcast status change to client IP %s on channel %d for %s with new state %d\n"),
                      IPaddrstr.c_str(), i, sensorName, sensorValue);
      subscription[i].client.printf_P(PSTR("event: event\ndata: {\"TYPE\":\"STATE\", \"%s\":{\"state\":%d, \"prevState\":%d}}\n\n"),
                                      sensorName, sensorValue, prevSensorValue);
    } else {
      Serial.printf_P(PSTR("SSEBroadcastState - client %s registered on channel %d but not listening\n"), IPaddrstr.c_str(), i);
    }
  }
}

// Simulate sensors
void updateSensor(sensorType &sensor) {
  unsigned short newVal = (unsigned short)RANDOM_REG32; // (not so good) random value for the sensor
  Serial.printf_P(PSTR("update sensor %s - previous state: %d, new state: %d\n"), sensor.name, sensor.value, newVal);
  if (sensor.value != newVal) {
    SSEBroadcastState(sensor.name, sensor.value, newVal);  // only broadcast if state is different
  }
  sensor.value = newVal;
  sensor.update.once(rand() % 20 + 10, std::bind(updateSensor, sensor));  // randomly update sensor
}

void handleSubscribe() {
  if (subscriptionCount == SSE_MAX_CHANNELS - 1) {
    return handleNotFound();  // We ran out of channels
  }

  uint8_t channel;
  IPAddress clientIP = server.client().remoteIP();   // get IP address of client
  String SSEurl = F("http://");
  SSEurl += WiFi.localIP().toString();
  SSEurl += F(":");
  SSEurl += port;
  size_t offset = SSEurl.length();
  SSEurl += F("/rest/events/");

  ++subscriptionCount;
  for (channel = 0; channel < SSE_MAX_CHANNELS; channel++) // Find first free slot
    if (!subscription[channel].clientIP) {
      break;
    }
  subscription[channel] = {clientIP, server.client(), Ticker()};
  SSEurl += channel;
  Serial.printf_P(PSTR("Allocated channel %d, on uri %s\n"), channel, SSEurl.substring(offset).c_str());
  //server.on(SSEurl.substring(offset), std::bind(SSEHandler, &(subscription[channel])));
  Serial.printf_P(PSTR("subscription for client IP %s: event bus location: %s\n"), clientIP.toString().c_str(), SSEurl.c_str());
  server.send_P(200, "text/plain", SSEurl.c_str());
}

void startServers() {
  server.on(F("/rest/events/subscribe"), handleSubscribe);
  server.onNotFound(handleAll);
  server.begin();
  Serial.println("HTTP server and  SSE EventSource started");
}

void setup(void) {
  Serial.begin(115200);
  WiFi.mode(WIFI_STA);
  WiFi.begin(ssid, password);
  Serial.println("");
  while (WiFi.status() != WL_CONNECTED) {   // Wait for connection
    delay(500);
    Serial.print(".");
  }
  Serial.printf_P(PSTR("\nConnected to %s with IP address: %s\n"), ssid, WiFi.localIP().toString().c_str());
  if (MDNS.begin("esp8266")) {
    Serial.println("MDNS responder started");
  }

  startServers();   // start web and SSE servers
  sensor[0].name = "sensorA";
  sensor[1].name = "sensorB";
  updateSensor(sensor[0]);
  updateSensor(sensor[1]);
}

void loop(void) {
  server.handleClient();
  MDNS.update();
  yield();
}

※이전 글에 이어서 작성되는 글임으로 동일한 내용과 설명에 대해서는 생략이 될 수 있습니다. 생략된 내용이 궁금하시면 이전 글을 참고해주세요.

 

extern "C" {
#include "c_types.h"
}
#include <ESP8266WiFi.h>
#include <WiFiClient.h>
#include <ESP8266WebServer.h>
#include <ESP8266mDNS.h>
#include <Ticker.h>

extern “C”를 사용하여 c_types.h를 include 했습니다. 이는 c에서 사용되는 자료형 타입들을 선언해둔 파일입니다. 크게 의미는 없습니다.

 

그리고 Ticker.h 헤더 파일도 포함이 되었는데. 이는 타이머 인터럽트입니다. 이는 예제 구현에 있어서 사용됩니다.

 

#define SSE_MAX_CHANNELS 8  // in this simplified example, only eight SSE clients subscription allowed
struct SSESubscription {
  IPAddress clientIP;
  WiFiClient client;
  Ticker keepAliveTimer;
} subscription[SSE_MAX_CHANNELS];
uint8_t subscriptionCount = 0;

 

SSE를 통해서 데이터를 받는 클라이언트들의 최대 채널을 설정하고, 클라이언트의 데이터인 IP주소를 포함하는 구조체를 만들어 줍니다. 그리고 타이머 인터럽트(Ticker)도 포함되어 있습니다.

 

typedef struct {
  const char *name;
  unsigned short value;
  Ticker update;
} sensorType;
sensorType sensor[2];

이 센서 구조체는 임의의 테스트를 위한 것입니다. ESP8266을 사용해서, 특정 센서의 값을 주기적으로 클라이언트가 받는 것이 이번 예제입니다. 따라서 센서를 직접 만들 순 없으니 임의로 만드는 구조체입니다.

 

여기에서도 타이머 인터럽트(Ticker)가 있습니다.

 

void SSEKeepAlive() {
  for (uint8_t i = 0; i < SSE_MAX_CHANNELS; i++) {
    if (!(subscription[i].clientIP)) {
      continue;
    }
    if (subscription[i].client.connected()) {
      Serial.printf_P(PSTR("SSEKeepAlive - client is still listening on channel %d\n"), i);
      subscription[i].client.println(F("event: event\ndata: { \"TYPE\":\"KEEP-ALIVE\" }\n"));   // Extra newline required by SSE standard
    } else {
      Serial.printf_P(PSTR("SSEKeepAlive - client not listening on channel %d, remove subscription\n"), i);
      subscription[i].keepAliveTimer.detach();
      subscription[i].client.flush();
      subscription[i].client.stop();
      subscription[i].clientIP = INADDR_NONE;
      subscriptionCount--;
    }
  }
}

이 함수는 ESP8266이 생성한 서버에 접속한 클라이언트가 있는지 IP로 확인하고, 연결되어 있다면, 현재 상태를 나타내 주고, IP가 확인되지만 연결이 안 된다면, 해당 client 제외시킵니다.

 

※ PSTR()는 해당 문자열 데이터를 SRAM에 저장하는 것이 아닌 FLASH 부분에 저장하도록 하는 것입니다. 주소 값으로 반환되기 때문에, print대신, printf_P와 함께 사용됩니다. 입문자 분들은 단순하게 해당 문자열이 출력된다고 알고 계시면 됩니다. (참조할만한 글)

 

void SSEHandler(uint8_t channel) {
  WiFiClient client = server.client();
  SSESubscription &s = subscription[channel];
  if (s.clientIP != client.remoteIP()) { // IP addresses don't match, reject this client
    Serial.printf_P(PSTR("SSEHandler - unregistered client with IP %s tries to listen\n"), server.client().remoteIP().toString().c_str());
    return handleNotFound();
  }
  client.setNoDelay(true);
  client.setSync(true);
  Serial.printf_P(PSTR("SSEHandler - registered client with IP %s is listening\n"), IPAddress(s.clientIP).toString().c_str());
  s.client = client; // capture SSE server client connection
  server.setContentLength(CONTENT_LENGTH_UNKNOWN); // the payload can go on forever
  server.sendContent_P(PSTR("HTTP/1.1 200 OK\nContent-Type: text/event-stream;\nConnection: keep-alive\nCache-Control: no-cache\nAccess-Control-Allow-Origin: *\n\n"));
  s.keepAliveTimer.attach_scheduled(30.0, SSEKeepAlive);  // Refresh time every 30s for demo
}

현 클라이언트가 특정 채널의 subscription 구조체에 저장된 것과 동일한지 확인합니다. 동일하지 않다면, handleNotFound() 함수로 리턴합니다.

 

동일하다면, 타이머 인터럽트를 통해서 주기적으로 SSEKeepAlive 함수가 반복되도록 합니다.

 

void handleAll() {
  const char *uri = server.uri().c_str();
  const char *restEvents = PSTR("/rest/events/");
  if (strncmp_P(uri, restEvents, strlen_P(restEvents))) {
    return handleNotFound();
  }
  uri += strlen_P(restEvents); // Skip the "/rest/events/" and get to the channel number
  unsigned int channel = atoi(uri);
  if (channel < SSE_MAX_CHANNELS) {
    return SSEHandler(channel);
  }
  handleNotFound();
};

서버의 uri를 char 타입 문자열로 저장하고, restEvents와 비교해서 다르다면 handleNotFound()를 반환합니다.

 

채널의 번호 값을 정수로 저장하고, 이를 매개변수로 바로 이전에 살펴본 SSEHandler를 반환합니다.

 

void SSEBroadcastState(const char *sensorName, unsigned short prevSensorValue, unsigned short sensorValue) {
  for (uint8_t i = 0; i < SSE_MAX_CHANNELS; i++) {
    if (!(subscription[i].clientIP)) {
      continue;
    }
    String IPaddrstr = IPAddress(subscription[i].clientIP).toString();
    if (subscription[i].client.connected()) {
      Serial.printf_P(PSTR("broadcast status change to client IP %s on channel %d for %s with new state %d\n"),
                      IPaddrstr.c_str(), i, sensorName, sensorValue);
      subscription[i].client.printf_P(PSTR("event: event\ndata: {\"TYPE\":\"STATE\", \"%s\":{\"state\":%d, \"prevState\":%d}}\n\n"),
                                      sensorName, sensorValue, prevSensorValue);
    } else {
      Serial.printf_P(PSTR("SSEBroadcastState - client %s registered on channel %d but not listening\n"), IPaddrstr.c_str(), i);
    }
  }
}

매개변수로 센서의 이름과 값을 받고, 클라이언트가 있는지 확인하고, 연결되어 있는지 확인되면, 해당 클라이언트로 값을 보냅니다.

 

void updateSensor(sensorType &sensor) {
  unsigned short newVal = (unsigned short)RANDOM_REG32; // (not so good) random value for the sensor
  Serial.printf_P(PSTR("update sensor %s - previous state: %d, new state: %d\n"), sensor.name, sensor.value, newVal);
  if (sensor.value != newVal) {
    SSEBroadcastState(sensor.name, sensor.value, newVal);  // only broadcast if state is different
  }
  sensor.value = newVal;
  sensor.update.once(rand() % 20 + 10, std::bind(updateSensor, sensor));  // randomly update sensor
}

임의의 센서 값을 sensor 구조체의 value 변수에 저장합니다. 그리고 sensor 구조체의 ticker 인 타이머 인터럽트를 이용해서 특정 시간마다 updateSensor가 반복되도록 합니다.

 

void handleSubscribe() {
  if (subscriptionCount == SSE_MAX_CHANNELS - 1) {
    return handleNotFound();  // We ran out of channels
  }

  uint8_t channel;
  IPAddress clientIP = server.client().remoteIP();   // get IP address of client
  String SSEurl = F("http://");
  SSEurl += WiFi.localIP().toString();
  SSEurl += F(":");
  SSEurl += port;
  size_t offset = SSEurl.length();
  SSEurl += F("/rest/events/");

  ++subscriptionCount;
  for (channel = 0; channel < SSE_MAX_CHANNELS; channel++) // Find first free slot
    if (!subscription[channel].clientIP) {
      break;
    }
  subscription[channel] = {clientIP, server.client(), Ticker()};
  SSEurl += channel;
  Serial.printf_P(PSTR("Allocated channel %d, on uri %s\n"), channel, SSEurl.substring(offset).c_str());
  //server.on(SSEurl.substring(offset), std::bind(SSEHandler, &(subscription[channel])));
  Serial.printf_P(PSTR("subscription for client IP %s: event bus location: %s\n"), clientIP.toString().c_str(), SSEurl.c_str());
  server.send_P(200, "text/plain", SSEurl.c_str());
}

클라이언트가 서버에 접속하였을 때, 현재 등록된 개수가 최대를 초과하면 handleNotFound() 함수를 반환합니다.

 

그리고 갯수가 여유로우면, 각종 정보를 String 변수에 저장합니다. 그리고 빈 공간을 찾습니다. 각 정보들을 subscription 구조체에 저장하고 String 변수에 저장된 것을 서버로 text/plain 형태로 보내줍니다.

 

그렇게 되면, 브라우저에서는 String의 값을 볼 수 있습니다.

 

void setup(void) {
	...

	startServers();   // start web and SSE servers
  sensor[0].name = "sensorA";
  sensor[1].name = "sensorB";
  updateSensor(sensor[0]);
  updateSensor(sensor[1]);
}

setup 함수에서는 서버를 실행시킵니다. startServers()는 서버를 실행하는데 필요한 코드를 모아둔 함수입니다. (설명이 필요하시면 이전 글을 확인해주세요)

 

그리고 임의의 센서를 설정하고 실행시킵니다. updateSensor 함수를 사용하여 Sensor의 값이 지속적으로 변경되도록 합니다.

 

이제 업로드를 하면 다음과 같이 나타납니다.

 

여기에서 표시해주는 IP로 접속을 하면, Handle Not Found로 나타날 겁니다. 당연하게도 StartServers함수를 보면, /rest/events/subscribe에서 server.on 하였기 때문에 <<ip>>/rest/events/subscribe로 접속해야 합니다.

 

접속해보면 특정 주소가 표시됩니다. 여러분의 IP에 80번 포트까지 포함해서 주소가 설정되어 있는데, 이 주소로 이동을 합니다.

 

※마지막 숫자 2는 현재 채널의 번호입니다.

 

주소에 접속하면, 이와 같이 서버에서 클라이언트에게 보내는 메시지들을 확인할 수 있습니다.