VMware GemFire does not track WAN events by default. However, with the help of a gateway event
filter (TimingGatewayEventFilter), the following WAN event statistics can be tracked:
- The number of events received by gateway sender - receivedEvents
- The number of events transmitted by gateway sender - transmittedEvents
- The number of events acknowledged by gateway sender - acknowledgedEvents
- The minimum time an event spent in the gateway sender queue - minimumQueueTime
- The maximum time an event spent in the gateway sender queue - maximumQueueTime
- The total time events spent in the gateway sender queue - totalQueueTime
- The minimum time an event spent in transmission, including processing time on the remote site - minimumTransmitTime
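- The maximum time an event spent in transmission, including processing time on the remote site - maximumTransmitTime
- The total time events spent in transmission, including processing time on the remote site - totalTransmitTime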
Below is the implementation of the filter (TimingGatewayEventFilter), which tracks these
WAN events through a GatewaySenderQueueStatistics class (GemFire Statistics):
- GatewaySenderQueueStatistics:
import java.util.concurrent.atomic.AtomicLong;
/**
 * Thread-safe holder for gateway sender queue statistics.
 *
 * <p>Three event counters (received, transmitted, acknowledged) are kept together with
 * the total, minimum and maximum of two kinds of durations: time spent in the queue and
 * time spent in transmission. Every value lives in its own {@link AtomicLong}, so each
 * individual update is atomic, but a reader may see the different values at slightly
 * different points in time.
 *
 * <p>Durations are stored exactly as supplied by the caller; this class does not assume
 * any particular time unit. Until the first sample is recorded the minimums read as
 * {@link Long#MAX_VALUE} and the maximums as {@code 0}.
 */
public class GatewaySenderQueueStatistics {

  // Event counters.
  private final AtomicLong receivedEvents = new AtomicLong();
  private final AtomicLong transmittedEvents = new AtomicLong();
  private final AtomicLong acknowledgedEvents = new AtomicLong();

  // Accumulated durations.
  private final AtomicLong totalQueueTime = new AtomicLong();
  private final AtomicLong totalTransmitTime = new AtomicLong();

  // Smallest durations seen so far; start at the largest long so any sample replaces them.
  private final AtomicLong minimumQueueTime = new AtomicLong(Long.MAX_VALUE);
  private final AtomicLong minimumTransmitTime = new AtomicLong(Long.MAX_VALUE);

  // Largest durations seen so far; start at zero.
  private final AtomicLong maximumQueueTime = new AtomicLong();
  private final AtomicLong maximumTransmitTime = new AtomicLong();

  /** Creates a statistics object with all counters and totals at zero. */
  public GatewaySenderQueueStatistics() {
  }

  /** Adds one to the number of received events. */
  protected void incrementReceivedEvents() {
    this.receivedEvents.incrementAndGet();
  }

  /** Adds one to the number of transmitted events. */
  protected void incrementTransmittedEvents() {
    this.transmittedEvents.incrementAndGet();
  }

  /** Adds one to the number of acknowledged events. */
  protected void incrementAcknowledgedEvents() {
    this.acknowledgedEvents.incrementAndGet();
  }

  /**
   * Records one queue-time sample: adds it to the total and updates the minimum and
   * maximum queue times if the sample is a new extreme.
   *
   * @param time the queue time of a single event
   */
  protected void addQueueTime(long time) {
    this.totalQueueTime.addAndGet(time);
    setMinimumQueueTime(time);
    setMaximumQueueTime(time);
  }

  /**
   * Records one transmit-time sample: adds it to the total and updates the minimum and
   * maximum transmit times if the sample is a new extreme.
   *
   * @param time the transmit time of a single event
   */
  protected void addTransmitTime(long time) {
    this.totalTransmitTime.addAndGet(time);
    setMinimumTransmitTime(time);
    setMaximumTransmitTime(time);
  }

  /** @return the number of received events */
  public long getReceivedEvents() {
    return this.receivedEvents.get();
  }

  /** @return the number of transmitted events */
  public long getTransmittedEvents() {
    return this.transmittedEvents.get();
  }

  /** @return the number of acknowledged events */
  public long getAcknowledgedEvents() {
    return this.acknowledgedEvents.get();
  }

  /** @return the sum of all recorded queue times */
  public long getTotalQueueTime() {
    return this.totalQueueTime.get();
  }

  /** @return the sum of all recorded transmit times */
  public long getTotalTransmitTime() {
    return this.totalTransmitTime.get();
  }

  /**
   * @return the total queue time divided by the number of transmitted events (integer
   *         division), or {@code 0} when no event has been transmitted
   */
  public long getQueueTimePerEvent() {
    if (getTransmittedEvents() == 0) {
      return 0;
    }
    return getTotalQueueTime() / getTransmittedEvents();
  }

  /**
   * @return the total transmit time divided by the number of acknowledged events (integer
   *         division), or {@code 0} when no event has been acknowledged
   */
  public long getTransmitTimePerEvent() {
    if (getAcknowledgedEvents() == 0) {
      return 0;
    }
    return getTotalTransmitTime() / getAcknowledgedEvents();
  }

  /**
   * Lowers the minimum queue time to {@code time} if {@code time} is smaller than the
   * current minimum; otherwise does nothing.
   *
   * @param time the candidate minimum
   */
  public void setMinimumQueueTime(long time) {
    lowerTo(this.minimumQueueTime, time);
  }

  /** @return the smallest queue time recorded, or {@link Long#MAX_VALUE} if none */
  public long getMinimumQueueTime() {
    return this.minimumQueueTime.get();
  }

  /**
   * Raises the maximum queue time to {@code time} if {@code time} is larger than the
   * current maximum; otherwise does nothing.
   *
   * @param time the candidate maximum
   */
  public void setMaximumQueueTime(long time) {
    raiseTo(this.maximumQueueTime, time);
  }

  /** @return the largest queue time recorded, or {@code 0} if none */
  public long getMaximumQueueTime() {
    return this.maximumQueueTime.get();
  }

  /**
   * Lowers the minimum transmit time to {@code time} if {@code time} is smaller than the
   * current minimum; otherwise does nothing.
   *
   * @param time the candidate minimum
   */
  public void setMinimumTransmitTime(long time) {
    lowerTo(this.minimumTransmitTime, time);
  }

  /** @return the smallest transmit time recorded, or {@link Long#MAX_VALUE} if none */
  public long getMinimumTransmitTime() {
    return this.minimumTransmitTime.get();
  }

  /**
   * Raises the maximum transmit time to {@code time} if {@code time} is larger than the
   * current maximum; otherwise does nothing.
   *
   * @param time the candidate maximum
   */
  public void setMaximumTransmitTime(long time) {
    raiseTo(this.maximumTransmitTime, time);
  }

  /** @return the largest transmit time recorded, or {@code 0} if none */
  public long getMaximumTransmitTime() {
    return this.maximumTransmitTime.get();
  }

  /**
   * Renders every counter, total, extreme and per-event average as
   * {@code SimpleClassName[name=value; name=value; ...]}.
   */
  @Override
  public String toString() {
    return getClass().getSimpleName()
        + "["
        + "receivedEvents=" + getReceivedEvents()
        + "; transmittedEvents=" + getTransmittedEvents()
        + "; acknowledgedEvents=" + getAcknowledgedEvents()
        + "; totalQueueTime=" + getTotalQueueTime()
        + "; minimumQueueTime=" + getMinimumQueueTime()
        + "; maximumQueueTime=" + getMaximumQueueTime()
        + "; queueTimePerEvent=" + getQueueTimePerEvent()
        + "; totalTransmitTime=" + getTotalTransmitTime()
        + "; minimumTransmitTime=" + getMinimumTransmitTime()
        + "; maximumTransmitTime=" + getMaximumTransmitTime()
        + "; transmitTimePerEvent=" + getTransmitTimePerEvent()
        + "]";
  }

  /**
   * Stores {@code candidate} in {@code holder} if it is strictly smaller than the held
   * value, retrying when another thread changes the value between the read and the
   * compare-and-set.
   */
  private static void lowerTo(AtomicLong holder, long candidate) {
    long observed = holder.get();
    while (candidate < observed && !holder.compareAndSet(observed, candidate)) {
      observed = holder.get();
    }
  }

  /**
   * Stores {@code candidate} in {@code holder} if it is strictly larger than the held
   * value, retrying when another thread changes the value between the read and the
   * compare-and-set.
   */
  private static void raiseTo(AtomicLong holder, long candidate) {
    long observed = holder.get();
    while (candidate > observed && !holder.compareAndSet(observed, candidate)) {
      observed = holder.get();
    }
  }
}
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Declarable;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Gateway event filter that measures WAN event timings instead of filtering.
 *
 * <p>Every callback lets the event through unchanged. Along the way the filter counts
 * received, transmitted and acknowledged events and records, per event, the time spent
 * in the queue (creation to first transmission attempt) and the time spent in
 * transmission (first transmission attempt to acknowledgement) in a
 * {@link GatewaySenderQueueStatistics}. Timestamps are taken with
 * {@link System#currentTimeMillis()}; the event creation time is presumably in the same
 * unit (TODO confirm against GatewaySenderEventImpl).
 *
 * <p>A daemon thread started by the constructor writes the statistics to the cache
 * logger whenever any of the three counters has changed since the previous check. The
 * thread is stopped by {@link #close()}.
 */
public class TimingGatewayEventFilter implements GatewayEventFilter, Declarable {

  /** Pause between two counter checks of the dump thread, in milliseconds. */
  private static final long DUMP_INTERVAL_MILLIS = 2000;

  /** Counters and timing aggregates maintained by this filter. */
  private final GatewaySenderQueueStatistics queueStatistics;

  /**
   * Transmit start time of every event that has been handed to transmission but not yet
   * acknowledged, keyed by the event's shadow key.
   */
  private final Map<Long,Long> transmitStartTimes;

  /** Background thread that periodically logs the statistics; interrupted on close. */
  private final Thread dumpThread;

  /**
   * Creates the filter and starts the statistics dump thread. The cache is looked up via
   * {@code CacheFactory.getAnyInstance()} while the thread is being set up.
   */
  public TimingGatewayEventFilter() {
    this.queueStatistics = new GatewaySenderQueueStatistics();
    this.transmitStartTimes = new ConcurrentHashMap<>();
    this.dumpThread = launchDumpQueueStatisticsThread();
  }

  /**
   * Counts the event as received.
   *
   * @param event the event about to be enqueued
   * @return always {@code true}; the event is never filtered out
   */
  public boolean beforeEnqueue(GatewayQueueEvent event) {
    this.queueStatistics.incrementReceivedEvents();
    return true;
  }

  /**
   * Counts the event as transmitted and records its queue time the first time it is
   * offered for transmission.
   *
   * <p>This method can be called multiple times for the same batch if the remote site is
   * not connected. Only the first call for a given shadow key updates the statistics.
   *
   * @param event the event about to be transmitted; must be a GatewaySenderEventImpl
   * @return always {@code true}; the event is never filtered out
   */
  public boolean beforeTransmit(GatewayQueueEvent event) {
    GatewaySenderEventImpl gsei = (GatewaySenderEventImpl) event;
    long currentTime = System.currentTimeMillis();
    // A single putIfAbsent both detects a re-attempt and stores the start time, so there
    // is no window between the lookup and the insert (the map is a ConcurrentHashMap).
    if (this.transmitStartTimes.putIfAbsent(gsei.getShadowKey(), currentTime) == null) {
      // First attempt for this event: count it and record how long it was queued.
      this.queueStatistics.incrementTransmittedEvents();
      long queueTime = currentTime - gsei.getCreationTime();
      this.queueStatistics.addQueueTime(queueTime);
    }
    // Otherwise the batch is being re-attempted and the original start time is kept.
    // TODO: decrement the previous time from the stats and add the new time.
    return true;
  }

  /**
   * Counts the event as acknowledged and records its transmit time.
   *
   * <p>Events without a recorded transmit start time are ignored: they were not
   * transmitted by this member, so only primary events are handled.
   *
   * @param event the acknowledged event; must be a GatewaySenderEventImpl
   */
  public void afterAcknowledgement(GatewayQueueEvent event) {
    GatewaySenderEventImpl gsei = (GatewaySenderEventImpl) event;
    // Removing the entry also keeps the map from growing with acknowledged events.
    Long transmitStartTime = this.transmitStartTimes.remove(gsei.getShadowKey());
    if (transmitStartTime != null) {
      this.queueStatistics.incrementAcknowledgedEvents();
      long currentTime = System.currentTimeMillis();
      long transmitTime = currentTime - transmitStartTime;
      this.queueStatistics.addTransmitTime(transmitTime);
    }
  }

  /** @return the statistics object maintained by this filter */
  private GatewaySenderQueueStatistics getQueueStatistics() {
    return this.queueStatistics;
  }

  /**
   * Debugging aid that prints one timing sample to standard output. It is not called by
   * default; invoke it from {@link #beforeTransmit} or {@link #afterAcknowledgement} to
   * trace individual events.
   *
   * @param event the event the sample belongs to
   * @param activity label of the measured activity, e.g. "queueTime" or "transmitTime"
   * @param currentTime timestamp at which the activity ended
   * @param startTime timestamp at which the activity started
   * @param time measured duration ({@code currentTime - startTime} at the call sites)
   */
  private void logTime(GatewaySenderEventImpl event, String activity, long currentTime, long startTime, long time) {
    StringBuilder builder = new StringBuilder();
    builder
        .append(getClass().getSimpleName())
        .append(" ")
        .append(activity)
        .append(" for event ")
        .append("shadowKey=")
        .append(event.getShadowKey())
        .append("; key=")
        .append(event.getKey())
        .append("; currentTime=")
        .append(currentTime)
        .append("; startTime=")
        .append(startTime)
        .append("; time=")
        .append(time);
    System.out.println(builder.toString());
  }

  /**
   * Starts the daemon thread that logs the statistics at info level whenever the
   * received, transmitted or acknowledged counter has changed since the previous check.
   * The thread checks every {@link #DUMP_INTERVAL_MILLIS} milliseconds and ends as soon
   * as it is interrupted.
   *
   * @return the started thread, so that {@link #close()} can stop it
   */
  private Thread launchDumpQueueStatisticsThread() {
    final Cache cache = CacheFactory.getAnyInstance();
    Thread thread = new Thread(
        new Runnable() {
          public void run() {
            long previousReceivedEvents = 0;
            long previousTransmittedEvents = 0;
            long previousAcknowledgedEvents = 0;
            GatewaySenderQueueStatistics queueStatistics = getQueueStatistics();
            while (!Thread.currentThread().isInterrupted()) {
              long currentReceivedEvents = queueStatistics.getReceivedEvents();
              long currentTransmittedEvents = queueStatistics.getTransmittedEvents();
              long currentAcknowledgedEvents = queueStatistics.getAcknowledgedEvents();
              if (currentReceivedEvents != previousReceivedEvents
                  || currentTransmittedEvents != previousTransmittedEvents
                  || currentAcknowledgedEvents != previousAcknowledgedEvents) {
                cache.getLogger().info(queueStatistics.toString());
                previousReceivedEvents = currentReceivedEvents;
                previousTransmittedEvents = currentTransmittedEvents;
                previousAcknowledgedEvents = currentAcknowledgedEvents;
              }
              try {
                Thread.sleep(DUMP_INTERVAL_MILLIS);
              } catch (InterruptedException e) {
                // Interruption is the stop signal (see close()): restore the flag and
                // leave the loop instead of swallowing it and spinning forever.
                Thread.currentThread().interrupt();
                return;
              }
            }
          }
        });
    thread.setDaemon(true);
    thread.start();
    return thread;
  }

  /**
   * Stops the statistics dump thread. Safe to call more than once. The callbacks keep
   * updating the statistics after this call; only the periodic logging ends.
   */
  public void close() {
    this.dumpThread.interrupt();
  }

  /**
   * No configuration is needed; the supplied properties are ignored.
   *
   * @param properties ignored
   */
  public void init(Properties properties) {
  }
}