本篇介绍用python写脚本,抓取系统metrics,然后调用kafka client library把metrics吐给kafka的案例分享。对于用kafka的同学实用性很高。

在运行本实例前需要先下载两个python库到本地 : six和kafka-python

cat config_system_metrics.json 

{

   "env": {

    "site": "cluster",

    "component": "namenode",

    "metric_prefix": "system"

   },

   "output": {

     "kafka": {

       "topic": "system_metrics_cluster",

       "brokerList": ["10.10.10.1:9092", "10.10.10.2:9092", "10.10.10.3:9092"]

     }

   }

}

cat system_metrics.python

#!/usr/bin/env python

import sys

import os

import json

import socket

import re

import time

import logging

import threading

# load kafka-python

sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)),'','lib/six'))

import six

# load kafka-python

sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)),'','lib/kafka-python'))

from kafka import KafkaClient, SimpleProducer, SimpleConsumer

logging.basicConfig(level=logging.INFO,

                    format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',

                    datefmt='%m-%d %H:%M')

CONFIG_FILE = 'config_system_metrics.json'

class LoadConfig(object):

        def __init__(self):

                config_file = "./" + CONFIG_FILE

                try:

                        f = open(config_file, 'r')

                except Exception, e:

                        print "Load config file %s Error !" % config_file

                        sys.exit(1)

                try:

                        config_json = json.loads(f.read())

                except Exception, e:

                        print "Convert config file to Json format Error !"

                        sys.exit(1)

                if f:

                        f.close()

                self.config = config_json

class Kafka(LoadConfig):

        def __init__(self):

                LoadConfig.__init__(self)

                self.broker = self.config["output"]["kafka"]["brokerList"]

        def kafka_connect(self):

                #print "Connecting to kafka "+str(self.broker)

                # To send messages synchronously

                kc = KafkaClient(self.broker,timeout = 30)

                producer = SimpleProducer(kc,async=False,batch_send=True)

                return kc, producer

        def kafka_produce(self, producer, topic, kafka_json):

                # ************  Sample of kafka_json ********************

                # {'timestamp': 1463710, 'host': 'xxx', 'metric': 'system.nic.receivedbytes', 'value': '4739', 'component': 'namenode', 'site': 'apolloqa'}

                # ******************************************************

                # Note that the application is responsible for encoding messages to type str

                producer.send_messages(topic, kafka_json)

class Metric(LoadConfig):

        def __init__(self):

                LoadConfig.__init__(self)

                try :

                        self.fqdn = socket.getfqdn()

                except Exception , e:

                        print "Could not get hostname ! %s " %e

                        sys.exit(1)

                self.data = []

                self.datapoint = {}

                self.datapoint["timestamp"] = int(round(time.time() * 1000))

                self.datapoint["host"] = self.fqdn

                self.datapoint["component"] = self.config['env']['component']

                self.datapoint["site"] = self.config['env']['site']

class Metric_Uptime(Metric):

        def __init__(self):

                Metric.__init__(self)

                self.demensions = ["uptime.day", "idletime.day"]

                self.result = os.popen('cat /proc/uptime').readlines()

                self.data = []

        def metric_collect(self):

                for line in self.result:

                        values = re.split("\s+", line.rstrip())

                        for i in range(len(self.demensions)):

                                self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + 'uptime' + '.' + self.demensions[i]

                                self.datapoint["value"] = str(round(float(values[i]) / 86400 , 2))

                                self.data.append(self.datapoint.copy())

                return self.data

class Metric_Loadavg(Metric):

        def __init__(self):

                Metric.__init__(self)

                self.demensions = ['cpu.loadavg.1min', 'cpu.loadavg.5min', 'cpu.loadavg.15min']

                self.result = os.popen('cat /proc/loadavg').readlines()

                self.data = []

        def metric_collect(self):

                for line in self.result:

                        values = re.split("\s+", line.strip())

                        for i in range(len(self.demensions)):

                                self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + 'loadavg' + '.' + self.demensions[i] 

                                self.datapoint["value"] = values[i]

                                self.data.append(self.datapoint.copy())

                return self.data

class Metric_Memory(Metric):

        def __init__(self):

                Metric.__init__(self)

                self.result = os.popen('cat /proc/meminfo').readlines()

                self.data = []

        def metric_collect(self):

                for line in self.result:

                        demensions = re.split(":?\s+", line.rstrip())

                        self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + 'memory' + '.' + demensions[0] + '.kB'

                        self.datapoint["value"] = demensions[1]

                        self.data.append(self.datapoint.copy())

                return self.data

class Metric_CpuTemp(Metric):

        def __init__(self):

                Metric.__init__(self)

                self.result = os.popen('sudo ipmitool sdr | grep Temp | grep CPU').readlines()

                self.data = []

        def metric_collect(self):

                for line in self.result:

                        demensions = re.split("\|", line.strip())

                        self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + re.split(" ", demensions[0])[0] + '.Temp' 

                        self.datapoint["value"] = re.split(" ", demensions[1])[1]

                        self.data.append(self.datapoint.copy())

                print self.data

                return self.data

class Metric_Net(Metric):

        def __init__(self):

                Metric.__init__(self)

                self.demensions = ['receivedbytes', 'receivedpackets', 'receivederrs', 'receiveddrop', 'transmitbytes', 'transmitpackets',

                 'transmiterrs', 'transmitdrop']

                self.result = os.popen("cat /proc/net/dev").readlines()

                self.data = []

        def metric_collect(self):

                for line in self.result:

                        if re.match('^(Inter|\s+face|\s+lo)', line) :

                                continue

                        interface = re.split(':?\s+', line)[1]

                        values = re.split(':?\s+', line)[2:6] + re.split(':?\s+', line)[9:13]

                        for i in range(len(self.demensions)):

                                self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + interface + "." + self.demensions[i]

                                self.datapoint["value"] = values[i]

                                self.data.append(self.datapoint.copy())

                print self.data

                return self.data

class Collect(LoadConfig):

        def __init__(self):

                LoadConfig.__init__(self)

                self.topic = self.config['output']['kafka']['topic']

        def connect(self):

                self.kafkaclient = Kafka()

                self.kc, self.producer = self.kafkaclient.kafka_connect()

                return self.kc, self.producer

        def send(self, kafka_producer, msg):

                self.kafkaclient.kafka_produce(kafka_producer, self.topic, json.dumps(msg))

        def close(self, kafka_producer, kafka_client):

                if kafka_producer is not None:

                        kafka_producer.stop()

                if kafka_client is not None:

                        kafka_client.close()

        def run(self, kafka_client, kafka_producer, metric):

                data = metric.metric_collect()

                #print data

                self.send(kafka_producer, data)

                self.close(kafka_producer, kafka_client)

collector = Collect()

metrics = [Metric_Uptime(), Metric_Loadavg(), Metric_Memory(), Metric_CpuTemp(), Metric_Net()]

# Establish Tcp connection once forever, share the same Tcp conncetion

kafka_client, kafka_producer = collector.connect()

for metric in metrics:

        t = threading.Thread(target = collector.run , args = (kafka_client, kafka_producer, metric) )

        t.start()