(一个)kafka-jstorm集群实时日志分析 它 ———kafka实时日志处理

(一个)kafka-jstorm集群实时日志分析 它 ———kafka实时日志处理

大家好,又见面了,我是全栈君,今天给大家准备了Idea注册码。

package com.doctor.logbackextend;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.commons.lang.RandomStringUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * zookeeper 和kafka环境准备好。

本地端口号默认设置 * * @author doctor * * @time 2014年10月24日 下午3:14:01 */public class KafkaAppenderTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaAppenderTest.class); /** 先启动此測试方法,模拟log日志输出到kafka */ @Test public void test_log_producer() { while(true){ LOG.info("test_log_producer : " + RandomStringUtils.random(3, "hello doctro,how are you,and you")); } } /** 再启动此測试方法。模拟消费者获取日志,进而分析,此方法不过打印打控制台,不是log。防止模拟log測试方法数据混淆 */ @Test public void test_comsumer(){ Properties props = new Properties(); props.put("zookeeper.connect", "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"); props.put("group.id", "kafkatest-group");// props.put("zookeeper.session.timeout.ms", "400");// props.put("zookeeper.sync.time.ms", "200");// props.put("auto.commit.interval.ms", "1000"); ConsumerConfig paramConsumerConfig = new ConsumerConfig(props ); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(paramConsumerConfig ); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put("kafka-test", new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerStream = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerStream.get("kafka-test"); for (KafkaStream<byte[], byte[]> stream : streams) { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while(it.hasNext()) System.out.println(new String("test_comsumer: " + new String(it.next().message()))); } }}

      为了实时日志处理互联网系统的日志,对于电商来说具有非常重要的意义,比方,淘宝购物时候,你浏览某些商品的时候。系统后台实时日志处理分析后,系统能够向用户实时推荐给用户相关商品。来引导用户的选择等等。

        为了实时日志处理。我们选择kafka集群,日志的处理分析选择jstorm集群,至于jstorm处理的结果,你能够选择保存到数据库里。入hbase、mysql。maridb等。

系统的日志接口选择了slf4j,logback组合,为了让系统的日志可以写入kafka集群,选择扩展logback Appender。在logback里配置一下。就行自己主动输出日志到kafka集群。

kafka的集群安装,在此不介绍了,为了模拟真实性,zookeeper本地集群也安装部署了。


以下是怎样扩展logback Appender

package com.doctor.logbackextend;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;

public class KafkaAppender extends AppenderBase<ILoggingEvent> {

	private String topic;
	private String zookeeperHost;
	

	private String broker;
	private Producer<String, String> producer;
	private Formatter formatter;
	
	public String getBroker() {
		return broker;
	}

	public void setBroker(String broker) {
		this.broker = broker;
	}
	@Override
	protected void append(ILoggingEvent eventObject) {
		String message = this.formatter.formate(eventObject);
		this.producer.send(new KeyedMessage<String, String>(this.topic, message));

	}

	@Override
	public void start() {
		if (this.formatter == null) {
			this.formatter = new MessageFormatter();
		}
		
		super.start();
		Properties props = new Properties();
		props.put("zk.connect", this.zookeeperHost);
		props.put("metadata.broker.list", this.broker);
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		
		ProducerConfig config = new ProducerConfig(props);
		this.producer = new Producer<String, String>(config);
	}

	@Override
	public void stop() {
		super.stop();
		this.producer.close();
	}

	
	
	public String getTopic() {
		return topic;
	}

	public void setTopic(String topic) {
		this.topic = topic;
	}

	public String getZookeeperHost() {
		return zookeeperHost;
	}

	public void setZookeeperHost(String zookeeperHost) {
		this.zookeeperHost = zookeeperHost;
	}

	public Producer<String, String> getProducer() {
		return producer;
	}

	public void setProducer(Producer<String, String> producer) {
		this.producer = producer;
	}


	public Formatter getFormatter() {
		return formatter;
	}

	public void setFormatter(Formatter formatter) {
		this.formatter = formatter;
	}
	
	
	
	/**
	 * 格式化日志格式
	 * @author doctor
	 *
	 * @time   2014年10月24日 上午10:37:17
	 */
	interface Formatter{
		String formate(ILoggingEvent event);
	}
	
	public static class MessageFormatter implements Formatter{

		@Override
		public String formate(ILoggingEvent event) {
			
			return event.getFormattedMessage();
		}
		
	}
}


对于日志的输出格式
MessageFormatter没有特殊处理,由于仅仅是模拟一下,你能够制定你的格式,入json等。

在logback.xml的配置例如以下:

<appender name="kafka" class="com.doctor.logbackextend.KafkaAppender">
 		<topic>kafka-test</topic>
 		<!-- <zookeeperHost>127.0.0.1:2181</zookeeperHost> -->
 		<!-- <broker>127.0.0.1:9092</broker> -->
 		<zookeeperHost>127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183</zookeeperHost>
 		<broker>127.0.0.1:9092,127.0.0.1:9093</broker>
 	</appender>
 	
 	
	<root level="all">
		<appender-ref ref="stdout" />
		<appender-ref ref="defaultAppender" />
		<appender-ref ref="kafka" />
	</root>

  <zookeeperHost>

    我本地启动了三个zookeer。依据配置。你能够知道是怎样配置的吧。

   kafka集群的broker我配置了两个,都是在本地机器。


測试代码:

package com.doctor.logbackextend;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.commons.lang.RandomStringUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * zookeeper 和kafka环境准备好。本地端口号默认设置
 * 
 * @author doctor
 *
 * @time   2014年10月24日 下午3:14:01
 */
public class KafkaAppenderTest {
	private static final Logger LOG = LoggerFactory.getLogger(KafkaAppenderTest.class);
	

	/** 先启动此測试方法,模拟log日志输出到kafka */
	@Test
	public void test_log_producer() {
		while(true){
			LOG.info("test_log_producer : "  + RandomStringUtils.random(3, "hello doctro,how are you,and you"));
		}
	}
	
	
	/** 再启动此測试方法,模拟消费者获取日志,进而分析,此方法不过打印打控制台,不是log。防止模拟log測试方法数据混淆 */
	@Test
	public void test_comsumer(){
		Properties props = new Properties();
		props.put("zookeeper.connect", "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
		props.put("group.id", "kafkatest-group");
//		props.put("zookeeper.session.timeout.ms", "400");
//		props.put("zookeeper.sync.time.ms", "200");
//		props.put("auto.commit.interval.ms", "1000");
		ConsumerConfig paramConsumerConfig = new ConsumerConfig(props );
		ConsumerConnector consumer = Consumer.createJavaConsumerConnector(paramConsumerConfig );
		
		Map<String, Integer> topicCountMap = new HashMap<>();
		topicCountMap.put("kafka-test", new Integer(1));
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerStream = consumer.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerStream.get("kafka-test");
		
		for (KafkaStream<byte[], byte[]> stream : streams) {
			ConsumerIterator<byte[], byte[]> it = stream.iterator();
			while(it.hasNext())
			System.out.println(new String("test_comsumer: " + new String(it.next().message())));
		}
		
		
	}

}


结果,明天再附上截图。

版权声明:本文博客原创文章,博客,未经同意,不得转载。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/117693.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 参数化查询原理

    参数化查询原理机房重构敲组合查询时 会遇到多个操作符 因为之前在使用参数化查询时只要遇到给数据库赋值时就使用参数 光知道这样能防止 SQL 注入 直到如今才知道它为什么能防止 SQL 注入 索性就把操作符也用成参数 但这时就报 语法错误 了 可是解决了很长时间 老以为是 sql 语句写错了 自我认为是那种丢掉一个空格或引号之类的错误 其实是没真正理解之前听到到 防 SQL 注入 的原理 或是说为什么能 防

    2025年11月27日
    5
  • 六大排序算法:插入排序、希尔排序、选择排序、冒泡排序、堆排序、快速排序「建议收藏」

    六大排序算法:插入排序、希尔排序、选择排序、冒泡排序、堆排序、快速排序「建议收藏」文章目录:1.插入排序2.希尔排序1.插入排序步骤:1.从第一个元素开始,该元素可以认为已经被排序2.取下一个元素tem,从已排序的元素序列从后往前扫描3.如果该元素大于tem,则将该元素移到下一位4.重复步骤3,直到找到已排序元素中小于等于tem的元素5.tem插入到该元素的后面,如果已排序所有元素都大于tem,则将tem插入到下标为0的位置6.重复步骤2~5动图演示如下:思路:  在待排序的元素中,假设前n-1个元素已有序,现将第n个元素插入到前面已经排好的序列中,使得前n个

    2022年7月12日
    20
  • Vue.js 父组件向子组件传值和子组件向父组件传值

    Vue.js 父组件向子组件传值和子组件向父组件传值父组件向子组件传值组件实例定义方式,注意:一定要使用props属性来定义父组件传递过来的数据&amp;lt;script&amp;gt;//创建Vue实例,得到ViewModelvarvm=newVue({el:’#app’,data:{msg:’这是父组件中的消息’},components…

    2022年5月3日
    36
  • oidc auth2.0_使用Spring Security 5.0和OIDC轻松构建身份验证「建议收藏」

    oidc auth2.0_使用Spring Security 5.0和OIDC轻松构建身份验证「建议收藏」oidcauth2.0“我喜欢编写身份验证和授权代码。”〜从来没有Java开发人员。厌倦了一次又一次地建立相同的登录屏幕?尝试使用OktaAPI进行托管身份验证,授权和多因素身份验证。SpringSecurity不仅是一个功能强大且可高度自定义的身份验证和访问控制框架,它还是保护基于Spring的应用程序的实际标准。从前,SpringSecurity需要使用大量的XML来…

    2022年8月31日
    6
  • Java过滤器与SpringMVC拦截器之间的关系与区别[通俗易懂]

    今天学习和认识了一下,过滤器和SpringMVC的拦截器的区别,学到了不少的东西,以前一直以为拦截器就是过滤器实现的,现在想想还真是一种错误啊,而且看的比较粗浅,没有一个全局而又细致的认识,由于已至深夜,时间原因,我就把一些网友的观点重点摘录下来,大家仔细看后也一定会有一个比较新的认识(在此非常感谢那些大牛们的无私奉献,分享他们的经验与心得,才能让像我这样的小白有机会站一下你们这些巨人的肩膀,才能

    2022年4月12日
    35
  • 根据中奖概率抽奖算法

    根据中奖概率抽奖算法

    2021年6月16日
    111

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号