How to track WAN events as they pass through a GatewaySender queue
search cancel

How to track WAN events as they pass through a GatewaySender queue

book

Article ID: 294366

calendar_today

Updated On:

Products

VMware Tanzu Gemfire

Issue/Introduction

How can we track WAN events as they pass through a GatewaySender queue?

Environment

Product Version: 9.9

Resolution

VMware GemFire does not track WAN events by default. However, with the help of a filter (TimingGatewayEventFilter), we can track the following WAN Events:
 
  • The number of events received by gateway sender - receivedEvents
  • The number of events transmitted by gateway sender - transmittedEvents
  • The number of events acknowledged by gateway sender - acknowledgedEvents
  • The minimum time an event spent in the gateway sender queue - minimumQueueTime
  • The maximum time an event spent in the gateway sender queue - maximumQueueTime
  • The total time events spent in the gateway sender queue - totalQueueTime
  • The minimum time an event spent in transmission, including processing time on the remote site - minimumTransmitTime
  • The maximum time an event spent in transmission, including processing time on the remote site - maximumTransmitTime
Below is the listing of the implementation of a filter (TimingGatewayEventFilter) that tracks such WAN events through a GatewaySenderQueueStatistics class (GemFire Statistics):
 
  • GatewaySenderQueueStatistics:
import java.util.concurrent.atomic.AtomicLong;
​
public class GatewaySenderQueueStatistics {
	
  private AtomicLong receivedEvents;
​
  private AtomicLong transmittedEvents;
​
  private AtomicLong acknowledgedEvents;
​
  private AtomicLong totalQueueTime;
​
  private AtomicLong totalTransmitTime;
​
  private AtomicLong minimumQueueTime;
​
  private AtomicLong minimumTransmitTime;
​
  private AtomicLong maximumQueueTime;
​
  private AtomicLong maximumTransmitTime;
	
  public GatewaySenderQueueStatistics() {
    this.receivedEvents = new AtomicLong();
    this.transmittedEvents = new AtomicLong();
    this.acknowledgedEvents = new AtomicLong();
    this.totalQueueTime = new AtomicLong();
    this.totalTransmitTime = new AtomicLong();
    this.minimumQueueTime = new AtomicLong(Long.MAX_VALUE);
    this.minimumTransmitTime = new AtomicLong(Long.MAX_VALUE);
    this.maximumQueueTime = new AtomicLong();
    this.maximumTransmitTime = new AtomicLong();
  }
	
  protected void incrementReceivedEvents() {
    this.receivedEvents.incrementAndGet();
  }
​
  protected void incrementTransmittedEvents() {
    this.transmittedEvents.incrementAndGet();
  }
​
  protected void incrementAcknowledgedEvents() {
    this.acknowledgedEvents.incrementAndGet();
  }
​
  protected void addQueueTime(long time) {
    this.totalQueueTime.addAndGet(time);
    setMinimumQueueTime(time);
    setMaximumQueueTime(time);
  }
​
  protected void addTransmitTime(long time) {
    this.totalTransmitTime.addAndGet(time);
    setMinimumTransmitTime(time);
    setMaximumTransmitTime(time);
  }
​
  public long getReceivedEvents() {
    return this.receivedEvents.get();
  }
​
  public long getTransmittedEvents() {
    return this.transmittedEvents.get();
  }
​
  public long getAcknowledgedEvents() {
    return this.acknowledgedEvents.get();
  }
​
  public long getTotalQueueTime() {
    return this.totalQueueTime.get();
  }
​
  public long getTotalTransmitTime() {
    return this.totalTransmitTime.get();
  }
​
  public long getQueueTimePerEvent() {
    return getTransmittedEvents() == 0 ? 0 : getTotalQueueTime() / getTransmittedEvents();
  }
​
  public long getTransmitTimePerEvent() {
    return getAcknowledgedEvents() == 0 ? 0 : getTotalTransmitTime() / getAcknowledgedEvents();
  }
    
  public void setMinimumQueueTime(long time) {
    while (true) {
      long currentMinimumQueueTime = this.minimumQueueTime.get();
      if (time < currentMinimumQueueTime) {
        if (this.minimumQueueTime.compareAndSet(currentMinimumQueueTime, time)) {
          return;
        }
      } else {
        return;
      }
    }
  }
​
  public long getMinimumQueueTime() {
    return this.minimumQueueTime.get();
  }
​
  public void setMaximumQueueTime(long time) {
    while (true) {
      long currentMaximumQueueTime = this.maximumQueueTime.get();
      if (time > currentMaximumQueueTime) {
        if (this.maximumQueueTime.compareAndSet(currentMaximumQueueTime, time)) {
          return;
        }
      } else {
        return;
      }
    }
  }
​
  public long getMaximumQueueTime() {
    return this.maximumQueueTime.get();
  }
  
  public void setMinimumTransmitTime(long time) {
    while (true) {
      long currentMinimumTransmitTime = this.minimumTransmitTime.get();
      if (time < currentMinimumTransmitTime) {
        if (this.minimumTransmitTime.compareAndSet(currentMinimumTransmitTime, time)) {
          return;
        }
      } else {
        return;
      }
    }
  }
​
  public long getMinimumTransmitTime() {
    return this.minimumTransmitTime.get();
  }
  
  public void setMaximumTransmitTime(long time) {
    while (true) {
      long currentMaximumTransmitTime = this.maximumTransmitTime.get();
      if (time > currentMaximumTransmitTime) {
        if (this.maximumTransmitTime.compareAndSet(currentMaximumTransmitTime, time)) {
          return;
        }
      } else {
        return;
      }
    }
  }
​
  public long getMaximumTransmitTime() {
    return this.maximumTransmitTime.get();
  }
​
  public String toString() {
    return new StringBuilder()
      .append(getClass().getSimpleName())
      .append("[")
      .append("receivedEvents=")
      .append(getReceivedEvents())
      .append("; transmittedEvents=")
      .append(getTransmittedEvents())
      .append("; acknowledgedEvents=")
      .append(getAcknowledgedEvents())
      .append("; totalQueueTime=")
      .append(getTotalQueueTime())
      .append("; minimumQueueTime=")
      .append(getMinimumQueueTime())
      .append("; maximumQueueTime=")
      .append(getMaximumQueueTime())
      .append("; queueTimePerEvent=")
      .append(getQueueTimePerEvent())
      .append("; totalTransmitTime=")
      .append(getTotalTransmitTime())
      .append("; minimumTransmitTime=")
      .append(getMinimumTransmitTime())
      .append("; maximumTransmitTime=")
      .append(getMaximumTransmitTime())
      .append("; transmitTimePerEvent=")
      .append(getTransmitTimePerEvent())
      .append("]")
      .toString();
  }
}
  •  TimingGatewayEventFilter
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Declarable;
​
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayQueueEvent;
​
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
​
import java.util.Map;
import java.util.Properties;
​
import java.util.concurrent.ConcurrentHashMap;
​
public class TimingGatewayEventFilter implements GatewayEventFilter, Declarable {
​
  private GatewaySenderQueueStatistics queueStatistics;
  
  private Map<Long,Long> transmitStartTimes;
  
  public TimingGatewayEventFilter() {
    this.queueStatistics = new GatewaySenderQueueStatistics();
    this.transmitStartTimes = new ConcurrentHashMap<>();
    launchDumpQueueStatisticsThread();
  }
​
  public boolean beforeEnqueue(GatewayQueueEvent event) {
    // Increment received events
    this.queueStatistics.incrementReceivedEvents();
​
    return true;
  }
​
  public boolean beforeTransmit(GatewayQueueEvent event) {
    // This method can be called multiple times for the same batch if the remote site is
    // not connected.
    GatewaySenderEventImpl gsei = (GatewaySenderEventImpl) event;
    if (this.transmitStartTimes.containsKey(gsei.getShadowKey())) {
      // This case means the batch is being re-attempted.
      // @TODO Decrement the previous time from the stats and add the new time.
      //System.out.println(Thread.currentThread().getName() + ": Reattempting transmission event=" + event.getKey());
    } else {
      // Increment transmitted events
      this.queueStatistics.incrementTransmittedEvents();
​
      // Calculate and save queue time for this event
      long currentTime = System.currentTimeMillis();
      long queueTime = currentTime - gsei.getCreationTime();
      this.queueStatistics.addQueueTime(queueTime);
​
      // Set the transmit start time for this event
      this.transmitStartTimes.put(gsei.getShadowKey(), currentTime);
​
      // Log the current event
      //logTime(gsei, "queueTime", currentTime, gsei.getCreationTime(), queueTime);
    }
    return true;
  }
​
  public void afterAcknowledgement(GatewayQueueEvent event) {
    // Get transmit start time for this event
    GatewaySenderEventImpl gsei = (GatewaySenderEventImpl) event;
    Long transmitStartTime = this.transmitStartTimes.remove(gsei.getShadowKey());
    
    // If the event was not transmitted by this member, ignore it.
    // Only handle primary events.
    if (transmitStartTime != null) {
      // Increment acknowledged events
      this.queueStatistics.incrementAcknowledgedEvents();
      
      // Calculate and save transmit time for this event
      long currentTime = System.currentTimeMillis();
      long transmitTime = currentTime - transmitStartTime;
      this.queueStatistics.addTransmitTime(transmitTime);
​
      // Log the current event
      //logTime(gsei, "transmitTime", currentTime, transmitStartTime, transmitTime);
    }
  }
​
  private GatewaySenderQueueStatistics getQueueStatistics() {
    return this.queueStatistics;
  }
  
  private void logTime(GatewaySenderEventImpl event, String activity, long currentTime, long startTime, long time) {
    StringBuilder builder = new StringBuilder();
    builder
      .append(getClass().getSimpleName())
      .append(" ")
      .append(activity)
      .append(" for event ")
      .append("shadowKey=")
      .append(event.getShadowKey())
      .append("; key=")
      .append(event.getKey())
      .append("; currentTime=")
      .append(currentTime)
      .append("; startTime=")
      .append(startTime)
      .append("; time=")
      .append(time);
    System.out.println(builder.toString());
  }
  
  private void launchDumpQueueStatisticsThread() {
    final Cache cache = CacheFactory.getAnyInstance();
    Thread thread = new Thread(
      new Runnable() {
        public void run() {
          long previousReceivedEvents=0, previousTransmittedEvents=0, previousAcknowledgedEvents=0;
          GatewaySenderQueueStatistics queueStatistics = getQueueStatistics();
          while (true) {
            long currentReceivedEvents = queueStatistics.getReceivedEvents();
            long currentTransmittedEvents = queueStatistics.getTransmittedEvents();
            long currentAcknowledgedEvents = queueStatistics.getAcknowledgedEvents();
            if (currentReceivedEvents != previousReceivedEvents
              || currentTransmittedEvents != previousTransmittedEvents
            	|| currentAcknowledgedEvents != previousAcknowledgedEvents) {
            	cache.getLogger().info(queueStatistics.toString());
              //System.out.println(queueStatistics);
              previousReceivedEvents = currentReceivedEvents;
              previousTransmittedEvents = currentTransmittedEvents;
              previousAcknowledgedEvents = currentAcknowledgedEvents;
            }
            try {Thread.sleep(2000);} catch (InterruptedException e) {}
          }
        }
      });
    thread.setDaemon(true);
    thread.start();
  }
  
  public void close() {
  }
  
  public void init(Properties properties) {
  }
}