本篇分享一个案例:用 Python 编写脚本抓取系统 metrics,然后调用 Kafka client library 把 metrics 发送(吐)给 Kafka。对于使用 Kafka 的同学实用性很高。
在运行本示例前,需要先把两个 Python 库下载到本地:six 和 kafka-python(脚本会从自身所在目录下的 lib/six 和 lib/kafka-python 加载它们)。
cat config_system_metrics.json
{
"env": {
"site": "cluster",
"component": "namenode",
"metric_prefix": "system"
},
"output": {
"kafka": {
"topic": "system_metrics_cluster",
"brokerList": ["10.10.10.1:9092", "10.10.10.2:9092", "10.10.10.3:9092"]
}
}
}
cat system_metrics.python
#!/usr/bin/env python
import sys
import os
import json
import socket
import re
import time
import logging
import threading
# load six
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)),'','lib/six'))
import six
# load kafka-python
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)),'','lib/kafka-python'))
from kafka import KafkaClient, SimpleProducer, SimpleConsumer
# Root logger: INFO and above, one line per record with a short
# month-day hour:minute timestamp, logger name and level.
logging.basicConfig(
    datefmt='%m-%d %H:%M',
    format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',
    level=logging.INFO,
)
# Name of the JSON configuration file, resolved against the current
# working directory.
CONFIG_FILE = 'config_system_metrics.json'


class LoadConfig(object):
    """Load the JSON configuration file into ``self.config``.

    The file named by ``CONFIG_FILE`` is looked up relative to the current
    working directory.  If it cannot be read, or its content is not valid
    JSON, an error message is printed and the process exits with status 1
    (``SystemExit``).
    """

    def __init__(self):
        config_file = "./" + CONFIG_FILE
        try:
            # ``with`` guarantees the handle is closed on every path,
            # including the early exits below.
            with open(config_file, 'r') as f:
                raw_config = f.read()
        except (IOError, OSError, UnicodeError):
            print("Load config file %s Error !" % config_file)
            sys.exit(1)
        try:
            # json raises ValueError (or a subclass of it) on malformed input.
            self.config = json.loads(raw_config)
        except ValueError:
            print("Convert config file to Json format Error !")
            sys.exit(1)
class Kafka(LoadConfig):
    """Small wrapper around the kafka-python client/producer used here.

    The broker list is taken from ``output.kafka.brokerList`` in the
    configuration loaded by ``LoadConfig``.
    """

    def __init__(self):
        LoadConfig.__init__(self)
        self.broker = self.config["output"]["kafka"]["brokerList"]

    def kafka_connect(self):
        """Create a client and a producer and return ``(client, producer)``."""
        kc = KafkaClient(self.broker, timeout=30)
        # The explicit ``async=False`` keyword was dropped: ``async`` is a
        # reserved word from Python 3.7 on (a SyntaxError for the whole
        # file), and False is presumably SimpleProducer's default anyway --
        # TODO confirm against the bundled kafka-python copy.
        # NOTE(review): the original comment promised synchronous sends, but
        # batch_send=True may make the producer queue messages in the
        # background -- confirm against the bundled kafka-python version.
        producer = SimpleProducer(kc, batch_send=True)
        return kc, producer

    def kafka_produce(self, producer, topic, kafka_json):
        """Send one already-serialized message to ``topic``.

        ``kafka_json`` is the JSON text to publish.  Text that is not
        already a byte string is encoded as UTF-8 first.
        """
        # ************ Sample of kafka_json ********************
        # {'timestamp': 1463710, 'host': 'xxx', 'metric': 'system.nic.receivedbytes', 'value': '4739', 'component': 'namenode', 'site': 'apolloqa'}
        # ******************************************************
        # The application is responsible for encoding messages: on Python 2
        # ``bytes`` is ``str`` so plain strings pass through unchanged, while
        # unicode / Python 3 text is encoded here.
        if not isinstance(kafka_json, bytes):
            kafka_json = kafka_json.encode('utf-8')
        producer.send_messages(topic, kafka_json)
class Metric(LoadConfig):
    """Base class for collectors: prepares the fields shared by all datapoints.

    ``self.datapoint`` is a template dict holding the timestamp
    (milliseconds, taken when the object is constructed), the host's FQDN
    and the ``component`` / ``site`` values from the ``env`` section of the
    configuration.  ``self.data`` starts as an empty list.
    """

    def __init__(self):
        LoadConfig.__init__(self)
        try:
            self.fqdn = socket.getfqdn()
        except Exception as e:
            # Without a host name the datapoints are useless, so give up.
            print("Could not get hostname ! %s " % e)
            sys.exit(1)
        self.data = []
        self.datapoint = {
            "timestamp": int(round(time.time() * 1000)),
            "host": self.fqdn,
            "component": self.config['env']['component'],
            "site": self.config['env']['site'],
        }
class Metric_Uptime(Metric):
    """Collect the first two fields of ``/proc/uptime``, expressed in days."""

    def __init__(self):
        Metric.__init__(self)
        self.demensions = ["uptime.day", "idletime.day"]
        # Read the file directly instead of spawning ``cat`` through a shell;
        # this also closes the handle deterministically.  An unreadable file
        # yields no lines, as the failed ``cat`` did.
        try:
            with open('/proc/uptime') as proc_file:
                self.result = proc_file.readlines()
        except (IOError, OSError):
            self.result = []
        self.data = []

    def metric_collect(self):
        """Return one datapoint dict per dimension.

        Each value is the field divided by 86400, rounded to two decimals
        and rendered as a string.  ``self.data`` is rebuilt on every call,
        so calling this twice does not duplicate datapoints.
        """
        prefix = self.config['env']['metric_prefix'] + "." + 'uptime' + '.'
        collected = []
        for line in self.result:
            # zip() stops at the shorter sequence, so a short or empty line
            # produces fewer datapoints instead of an IndexError.
            for dimension, seconds in zip(self.demensions, line.split()):
                point = self.datapoint.copy()
                point["metric"] = prefix + dimension
                point["value"] = str(round(float(seconds) / 86400, 2))
                collected.append(point)
        self.data = collected
        return self.data
class Metric_Loadavg(Metric):
    """Collect the first three fields of ``/proc/loadavg``."""

    def __init__(self):
        Metric.__init__(self)
        self.demensions = ['cpu.loadavg.1min', 'cpu.loadavg.5min', 'cpu.loadavg.15min']
        # Read the file directly instead of spawning ``cat`` through a shell;
        # an unreadable file yields no lines, as the failed ``cat`` did.
        try:
            with open('/proc/loadavg') as proc_file:
                self.result = proc_file.readlines()
        except (IOError, OSError):
            self.result = []
        self.data = []

    def metric_collect(self):
        """Return one datapoint dict per dimension, values kept as strings.

        ``self.data`` is rebuilt on every call, so calling this twice does
        not duplicate datapoints.
        """
        prefix = self.config['env']['metric_prefix'] + "." + 'loadavg' + '.'
        collected = []
        for line in self.result:
            # Only as many leading fields as there are dimensions are used;
            # a short line yields fewer datapoints instead of an IndexError.
            for dimension, value in zip(self.demensions, line.split()):
                point = self.datapoint.copy()
                point["metric"] = prefix + dimension
                point["value"] = value
                collected.append(point)
        self.data = collected
        return self.data
class Metric_Memory(Metric):
    """Collect one datapoint per line of ``/proc/meminfo``."""

    def __init__(self):
        Metric.__init__(self)
        # Read the file directly instead of spawning ``cat`` through a shell;
        # an unreadable file yields no lines, as the failed ``cat`` did.
        try:
            with open('/proc/meminfo') as proc_file:
                self.result = proc_file.readlines()
        except (IOError, OSError):
            self.result = []
        self.data = []

    def metric_collect(self):
        """Return a datapoint for every ``Name: value`` line.

        The metric name is ``<prefix>.memory.<Name>.kB`` and the value is
        the second field, kept as a string.  ``self.data`` is rebuilt on
        every call.
        """
        prefix = self.config['env']['metric_prefix'] + "." + 'memory' + '.'
        collected = []
        for line in self.result:
            fields = re.split(r":?\s+", line.rstrip())
            if len(fields) < 2:
                # Blank or malformed line: nothing to report.
                continue
            point = self.datapoint.copy()
            # NOTE(review): '.kB' is appended to every entry, including any
            # lines that carry no unit -- confirm this is intended.
            point["metric"] = prefix + fields[0] + '.kB'
            point["value"] = fields[1]
            collected.append(point)
        self.data = collected
        return self.data
class Metric_CpuTemp(Metric):
    """Collect CPU temperature rows reported by ``ipmitool sdr``."""

    def __init__(self):
        Metric.__init__(self)
        pipe = os.popen('sudo ipmitool sdr | grep Temp | grep CPU')
        try:
            self.result = pipe.readlines()
        finally:
            # Close the pipe explicitly rather than leaving it to the GC.
            pipe.close()
        self.data = []

    def metric_collect(self):
        """Return one datapoint per parsable row and print the result.

        Rows are split on ``|``; the first word of the first column names
        the sensor and the second space-separated token of the second
        column is the value (presumably rows look like
        ``CPU1 Temp | 45 degrees C | ok`` -- verify against real output).
        Rows that do not have that shape are skipped.  ``self.data`` is
        rebuilt on every call.
        """
        prefix = self.config['env']['metric_prefix'] + "."
        collected = []
        for line in self.result:
            columns = line.strip().split("|")
            if len(columns) < 2:
                continue
            reading_tokens = columns[1].split(" ")
            if len(reading_tokens) < 2:
                continue
            point = self.datapoint.copy()
            point["metric"] = prefix + columns[0].split(" ")[0] + '.Temp'
            point["value"] = reading_tokens[1]
            collected.append(point)
        self.data = collected
        # Kept from the original script: echo the collected datapoints.
        print(self.data)
        return self.data
class Metric_Net(Metric):
    """Collect per-interface counters from ``/proc/net/dev``."""

    def __init__(self):
        Metric.__init__(self)
        self.demensions = ['receivedbytes', 'receivedpackets', 'receivederrs', 'receiveddrop', 'transmitbytes', 'transmitpackets',
                           'transmiterrs', 'transmitdrop']
        # Read the file directly instead of spawning ``cat`` through a shell;
        # an unreadable file yields no lines, as the failed ``cat`` did.
        try:
            with open('/proc/net/dev') as proc_file:
                self.result = proc_file.readlines()
        except (IOError, OSError):
            self.result = []
        self.data = []

    def metric_collect(self):
        """Return eight datapoints per interface and print the result.

        The two header lines (which contain no ``:``) and the ``lo``
        interface are skipped.  For every other line the first four receive
        counters and the first four transmit counters are reported, in the
        order given by ``self.demensions``.  ``self.data`` is rebuilt on
        every call.
        """
        prefix = self.config['env']['metric_prefix'] + "."
        collected = []
        for line in self.result:
            # Split on the colon rather than on whitespace: the interface
            # name may or may not be preceded by padding, and a counter may
            # follow the colon without a space.
            name, colon, counters = line.partition(':')
            if not colon:
                continue
            interface = name.strip()
            if interface == 'lo':
                continue
            fields = counters.split()
            if len(fields) < 12:
                continue
            # Each direction has 8 columns; transmit therefore starts at
            # index 8.  (The original slice started one column early and
            # reported receive-multicast as transmit bytes.)
            values = fields[0:4] + fields[8:12]
            for dimension, value in zip(self.demensions, values):
                point = self.datapoint.copy()
                point["metric"] = prefix + interface + "." + dimension
                point["value"] = value
                collected.append(point)
        self.data = collected
        # Kept from the original script: echo the collected datapoints.
        print(self.data)
        return self.data
class Collect(LoadConfig):
    """Run metric collectors and publish their output to the Kafka topic.

    The topic is read from ``output.kafka.topic`` in the configuration.
    """

    def __init__(self):
        LoadConfig.__init__(self)
        self.topic = self.config['output']['kafka']['topic']
        # Several threads publish through one shared producer (see the
        # driver code below), so sends are serialized.
        self._send_lock = threading.Lock()

    def connect(self):
        """Open the Kafka connection and return ``(client, producer)``."""
        self.kafkaclient = Kafka()
        self.kc, self.producer = self.kafkaclient.kafka_connect()
        return self.kc, self.producer

    def send(self, kafka_producer, msg):
        """Serialize ``msg`` to JSON and publish it; needs ``connect()`` first."""
        with self._send_lock:
            self.kafkaclient.kafka_produce(kafka_producer, self.topic, json.dumps(msg))

    def close(self, kafka_producer, kafka_client):
        """Stop the producer and close the client; ``None`` arguments are skipped."""
        if kafka_producer is not None:
            kafka_producer.stop()
        if kafka_client is not None:
            kafka_client.close()

    def run(self, kafka_client, kafka_producer, metric, close_connection=True):
        """Collect one metric and publish it.

        ``close_connection`` defaults to True, which keeps the original
        behavior of closing the producer and client after the send (now
        also when collecting or sending fails).  Pass False when the
        connection is shared with other callers and will be closed by its
        owner.
        """
        try:
            data = metric.metric_collect()
            self.send(kafka_producer, data)
        finally:
            if close_connection:
                self.close(kafka_producer, kafka_client)


collector = Collect()
metrics = [Metric_Uptime(), Metric_Loadavg(), Metric_Memory(), Metric_CpuTemp(), Metric_Net()]
# Establish the TCP connection once and share it between all worker threads.
kafka_client, kafka_producer = collector.connect()
workers = []
for metric in metrics:
    # close_connection=False: a worker must not close the shared connection
    # while the other workers may still be sending.
    t = threading.Thread(target=collector.run, args=(kafka_client, kafka_producer, metric, False))
    workers.append(t)
try:
    for t in workers:
        t.start()
    for t in workers:
        t.join()
finally:
    # Close exactly once, after every worker has finished.
    collector.close(kafka_producer, kafka_client)