admin管理员组文章数量:1794759
Kafka
1、将日志数据收集并发送到Kafka topic中(循环周期CEL\Time)
//创建producer对象Properties props = new Properties();props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");//属性设置props.put("serializer.class", "kafka.serializer.StringEncoder"); //属性设置props.put("request.required.acks", "1");//属性设置ProducerConfig config = new ProducerConfig(props);Producer<String, String> producer = new Producer<String, String>(config);//创建reader对象 reader 对象中自己封装了1、读取最后一行 2、将数据存入一维数组 3、日期转换 等方法Reader reader = new Reader();double[] rtData;String rtmsg;while (true) {long time = System.currentTimeMillis();for (String tagName : tags) {rtData = reader.getrtdata(tagName, DataPath);rtmsg = tagName + " " + rtData[0] + " " + rtData[1] + " " + rtData[2] + " " + rtData[3] + " " + rtData[4] + " " + rtData[5];String ip = "";try {ip = InetAddress.getLocalHost().getHostAddress();} catch (UnknownHostException e) {// TODO Auto-generated catch blocke.printStackTrace();}KeyedMessage<String, String> data = new KeyedMessage<String, String>("test2", ip, rtmsg);//数据格式producer.send(data);发送到topic}long _long = System.currentTimeMillis()-time;System.out.println(_long);Utils.sleep(CEL\Time-_long);}
本文标签: kafka
版权声明:本文标题:Kafka 内容由林淑君副主任自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.xiehuijuan.com/baike/1692515711a141616.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论