wissel.net

Usability - Productivity - Business - The web - Singapore & Twins

Watching the EventBus


I'm quite fond of Event-driven architecture, so to no surprise, I like vert.x's EventBus and its ability to enable polyglot programming. So it is time to have a closer look

Dem Volk aufs Maul geschaut

(That's a word play on Martin Luther loosly translated as "Watch them how they talk")

I wanted to know, what exactly is happening "on the wire", without disrupting the regular flow. Turns out, there is an easy way to do this. The vert.x EventBus provides the methods addOutboundInterceptor and addInboundInterceptor that provide you with access to a Handler with a DeliveryContext.

From there you can get to the Message or directly the message's body. So I took it for a spin in conjunction with a Websocket. This allows me to watch as the messages flow through:

final HttpServer server = this.vertx.createHttpServer();
server.websocketHandler(this::handlerWebsockets);

The handler looks like this:

private void handlerWebsockets(final ServerWebSocket ws) {
    if (this.passSecurityCheck(ws)) {
      ws.reject();
      return;
    }

    final EventBusTracker ebt = new EventBusTracker(ws);
    final EventBus eb = this.getVertx().eventBus();

    ws.closeHandler(ch -> {
      eb.removeOutboundInterceptor(ebt::interceptorOutbound);
    });

    eb.addOutboundInterceptor(ebt::interceptorOutbound);

  }

The final server side piece is the EventBusTracker:

public class EventBusTracker {
  private final ServerWebSocket ws;

  public EventBusTracker(final ServerWebSocket ws) {
    this.ws = ws;
  }

  public void interceptorOutbound(final DeliveryContext<JsonObject> dcx) {
    
    JsonObject headers = new JsonObject();    
    headers.put("address", dcx.message().address());
    
    dcx.message().headers().forEach(he -> headers.put(he.getKey(), he.getValue()));
    // CleanupJson returns a JsonObject with sensitive information masked
    JsonObject message = this.cleanupJson(dcx.message().body());
    JsonObject result = new JsonObject()
        .put("Headers", headers)
        .put("Message", message);
    try {
      this.ws.writeTextMessage(result.encode());
    } catch (Exception e) {
      // We ignore this
    }
    dcx.next();
  }

}

The final piece is a simple website talking to that socket (security code stripped out)

<!DOCTYPE html>
<html>
<head>
<title>EventBus monitor</title>
<style type="text/css">

table {width : 100%}
td.label {width: 30%; background-color: #DDDDFF}
td.label2 {width: 30%; background-color: #FFDDDD}
td {border-bottom : 1px solid gray; padding 2px;}
.data {border-bottom : 2px solid black;}
</style>
</head>
<body>
 <h1>Evenbus monitor</h1>
 <table>
    <tr><td class="label">EvenBus Header</td><td>That's in the header</td></tr>
    <tr><td class="label2">EvenBus Message</td><td>The message might contain headers from the API call</td></tr>
 </table>
 <div id="content"></div>
 <hr />
 <button onClick="closeSocket();" value="End this">End this</button>
 [EOF]
 <script type="text/javascript">
  // Connect to Websocket
  var socket;
  var addr = (location.protocol == 'https' ? 'wss' : 'ws') +'://'+location.hostname+(location.port ? ':'+location.port: '')+"/eventbus";
  if (window.WebSocket) {
   socket = new WebSocket(addr);
   socket.onmessage = function(event) {
    var content = document.getElementById("content");
                content.appendChild(renderIncoming(JSON.parse(event.data)));
                window.scrollTo(0,document.body.scrollHeight);
   }
   socket.onopen = function(event) {
    var content = document.getElementById("content");
    var div = document.createElement('div');
                div.innerHTML = "Web Socket opened!";
                content.appendChild(div);
   };
   
   socket.onclose = function(event) {
    var content = document.getElementById("content");
                var div = document.createElement('div');
                div.innerHTML = "<hr />Web Socket closed.";
                content.appendChild(div);
   };
  } else {
   alert("Your browser does not support Websockets!");
   
  }
  
  function renderIncoming(jsonSource) {
            var headers = jsonSource.Headers;
            var html = "<table class=\"header\">";
            for (hName in headers) {
              html += "<tr><td class=\"label\">"+hName+"</td><td>"+headers[hName]+"</td></tr>"; 
            };
            html += "</table><table class=\"data\">";
            var bodyData = jsonSource.Message;
            for (bName in bodyData) {
             html += "<tr><td class=\"label2\">"+bName+"</td><td>"+JSON.stringify(bodyData[bName],null,2)+"</td></tr>";
            }
            html += "</table>";
            var div = document.createElement('div');
            div.innerHTML = html;
            return div;
  }
  
  function closeSocket() {
   socket.close();
  }
 </script>
</body>
</html>

Important: make sure not to enable something like this in a production system without proper security!

As usual YMMV!


Posted by on 28 April 2020 | Comments (0) | categories: Java vert.x

Comments

  1. No comments yet, be the first to comment