admin管理员组

文章数量:1794759

Kafka

Kafka

1、将日志数据收集并发送到Kafka topic中(循环周期CEL\Time)

//创建producer对象Properties props = new Properties();props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");//属性设置props.put("serializer.class", "kafka.serializer.StringEncoder"); //属性设置props.put("request.required.acks", "1");//属性设置ProducerConfig config = new ProducerConfig(props);Producer<String, String> producer = new Producer<String, String>(config);//创建reader对象  reader 对象中自己封装了1、读取最后一行 2、将数据存入一维数组 3、日期转换 等方法Reader reader = new Reader();double[] rtData;String rtmsg;while (true) {long time = System.currentTimeMillis();for (String tagName : tags) {rtData = reader.getrtdata(tagName, DataPath);rtmsg = tagName + " " + rtData[0] + " " + rtData[1] + " " + rtData[2] + " " + rtData[3] + " " + rtData[4] + " " + rtData[5];String ip = "";try {ip = InetAddress.getLocalHost().getHostAddress();} catch (UnknownHostException e) {// TODO Auto-generated catch blocke.printStackTrace();}KeyedMessage<String, String> data = new KeyedMessage<String, String>("test2", ip, rtmsg);//数据格式producer.send(data);发送到topic}long _long = System.currentTimeMillis()-time;System.out.println(_long);Utils.sleep(CEL\Time-_long);}

本文标签: kafka