Kafka is open source software, used by Smarts 10.x Server Manager and VeloCloud SDWAN to pass data between components. This article will provide following details:
vEdge devices are removed from Smarts Server Manager Topology.
Discovery and monitoring issues of VeloCloud Orchestrator VCO in Smarts Server manager.
10.X
Kafka is generally download and extracted from tgz file to a directory on Linux machine.
In this example the Kafka install directory(KAFKA_HOME) is : /opt/kafka
If the KAFKA_HOME location is not known, follow the following instructions:
The Unix ‘locate’ command can be used to find the server.properties file. This indicates the install location of kafka, and the server.properties file has the ports used by kafka and zookeeper.
Locate server.properties
/opt/kafka/config/server.properties
How to check the status of Kafka and Zookeeper?
View the file server.properties located at KAFKA_HOME/config
less /opt/kafka/config/server.properties
listeners=SASL_PLAINTEXT://10.10.77.21:9092
zookeeper.connect=10.10.77.21:2181
From the file, ports in use by kafka and zookeeper services can be identified. There should be java processes listening at these ports.
a) Check the kafka port listed in the server.properties file with the netstat command, port 9092.
sudo netstat -anp | grep 9092
The below output shows that there is a java process listening indicating good results.
tcp6 0 0 :::9092 :::* LISTEN 52922/java
b) Check to see if zookeeper process is listening to port 2181
sudo netstat -anp | grep 2181
The below output shows that there is a java process listening indicating good results.
tcp6 0 0 10.10.77.21:2181 :::* LISTEN 52926/java
Best practice on Linux OS is to systemctl to create services for Kafka and Zookeeper, and use these to start and stop the processes.
Refer KB article: VMware Smart Assurance Smarts 10.x: How to set up services for Kafka and Zookeeper components using systemctl (Linux) so they will restart after system restart?
Use ‘systemctl status kafka’ to check the status. In the example below, Kafka failed to restart.
[localhost config]$ sudo systemctl status kafka
? kafka.service
Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: disabled)
Active: failed (Result: exit-code) since Thu 2019-12-19 17:33:48 GMT; 8s ago
Process: 17648 ExecStop=/opt/kafka/bin/kafka-server-stop.sh (code=exited, status=1/FAILURE)
Process: 17336 ExecStart=/bin/sh -c /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/kafka.log 2>&1 (code=exited, status=1/FAILURE)
Main PID: 17336 (code=exited, status=1/FAILURE)
Systemctl messages output to /var/log/messages
/var/log/messages
Dec 19 17:33:43 rpc11845 systemd: Stopping kafka.service...
Dec 19 17:33:43 rpc11845 zookeeper-server-start.sh: [2019-12-19 17:33:43,586] INFO Processed session termination for sessionid: 0x100b5e538cf0003 (org.apache.zookeeper.server.PrepRequestProcessor)
Dec 19 17:33:43 rpc11845 zookeeper-server-start.sh: [2019-12-19 17:33:43,589] INFO Closed socket connection for client /10.10.77.21:49764 which had sessionid 0x100b5e538cf0003 (org.apache.zookeeper.server.NIOServerCnxn)
Dec 19 17:33:44 rpc11845 zookeeper-server-start.sh: [2019-12-19 17:33:44,179] INFO Processed session termination for sessionid: 0x100b5e538cf0002 (org.apache.zookeeper.server.PrepRequestProcessor)
Dec 19 17:33:44 rpc11845 zookeeper-server-start.sh: [2019-12-19 17:33:44,181] INFO Closed socket connection for client /10.10.77.21:49662 which had sessionid 0x100b5e538cf0002 (org.apache.zookeeper.server.NIOServerCnxn)
Dec 19 17:33:46 rpc11845 systemd: Started kafka.service.
Dec 19 17:33:46 rpc11845 systemd: Starting kafka.service...
Dec 19 17:33:48 rpc11845 systemd: kafka.service: main process exited, code=exited, status=1/FAILURE
Dec 19 17:33:48 rpc11845 kafka-server-stop.sh: No kafka server to stop
Dec 19 17:33:48 rpc11845 systemd: kafka.service: control process exited, code=exited status=1
Dec 19 17:33:48 rpc11845 systemd: Unit kafka.service entered failed state.
Dec 19 17:33:48 rpc11845 systemd: kafka.service failed.
Commands to list, delete and create topics in Kafka:
List Topics: (all on one line, there is no carriage return after the OPTS=-)
export KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf/opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
Example results:
[localhost ~]$ ./kafka-list-topics.sh
__consumer_offsets
discoveryTopic
monitoringTopic
Delete topics (All in one line):
export KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf
/opt/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic discoveryTopic
/opt/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic monitoringTopic
Create topics (All in one line):
export KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf
/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic monitoringTopic
/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic discoveryTopic
How to view messages of a topic in Kafka?
Command:
export KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_client_jaas.conf
cd /opt/kafka/bin/
./kafka-console-consumer.sh --topic discoveryTopic -from-beginning --consumer.config=consumer.properties --bootstrap-server=<server_IP/name>:9092
./kafka-view-discoveryTopic.sh | head
Output:
[2019-12-19 17:39:25,464] WARN The configuration 'zookeeper.connect' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2019-12-19 17:39:25,465] WARN The configuration 'zookeeper.connection.timeout.ms' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
{"groupName":"group","discoveryID":null,"jobID":"9999","type":"START","timestamp":1576758431,"value":0.0,"action":"r","properties":{"context":"#.#.#.#","source":"velocloud-sdwane########-####-####-####-############","type":"START","collector":"VCODiscover"},"metrics":{},"relations":[],"initialized":true,"forceRefresh":true,"name":"velocloud-sdwan########-####-####-####-############START"}
{"groupName":"group","discoveryID":null,"jobID":"9999","type":"Tenant","timestamp":1576758433,"value":0.0,"action":"r","properties":{"TenantId":"29","VelocloudOrchestrator":"#.#.#.#","TenantName":"FlexWare_SD-WAN-OTT_PVT","source":"velocloud-sdwan########-####-####-####-############","type":"Tenant","EnterpriseLogicalId":"########-####-####-####-############"},"metrics":{"VCO-Req":{"properties":{"unit":"code","name":"VCO-Req"},"value":8080.0}},"relations":[{"type":"Orchestrator","element":"VCO-54.241.216.200","relationName":"SupervisedBy"}],"initialized":true,"forceRefresh":true,"name":"FlexWare_SD-WAN-OTT_PVT"}
How to Check the VCO Collector instances within ESM:
dmctl -s ESM-10 invoke ICF_PersistentDataSet::VCOTopologyCollectorInstanceIds get
Server ESM-10 User: admin
admin's Password: #####
{
{
DISCOVERY:ORCHESTRATOR-<IP>
velocloud-sdwan########-####-####-####-############
}
{
MONITORING:ORCHESTRATOR-<IP>
velocloud-sdwan########-####-####-####-############
}
}
Example of kafka error in collector log file (/opt/DCF/Collecting/Collector-Manager/velocloud-sdwan########-####-####-####-############/logs/collecting-0-0.log)
SEVERE -- [2019-12-19 17:35:50 GMT] -- StreamCollector::newRawValue(): Error while pushing value VEdge-Dell_(Migrated_to_xxxx)-Dell_-_xxxxxx
com.watch4net.apg.v2.collector.PipeError: One of the previous raw values could not be sent !
at com.watch4net.kafka.connector.KafkaConnector.pushData(KafkaConnector.java:240)