Integrating Flume with logback to Write Logs to HDFS
Flume configuration file (Kafka in place of the Flume Channel)
- flume-test-conf.properties
```properties
# Name the components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source: avro source that receives events from the logback appender
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# HDFS sink
a1.sinks.k1.type = hdfs
# HDFS directory, with time information in the path
a1.sinks.k1.hdfs.path = /flume/tailout/%Y-%m-%d/%H/
# Prefix for the generated HDFS file names
a1.sinks.k1.hdfs.filePrefix = events-
# Roll interval in seconds (default 30; 0 disables this policy)
a1.sinks.k1.hdfs.rollInterval = 3600
# Roll size in bytes (0 disables this policy)
a1.sinks.k1.hdfs.rollSize = 0
# Roll after this many events (0 disables this policy)
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Replication setting
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Generated file type: default is SequenceFile; DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use Kafka in place of a regular channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = kafka:9092
a1.channels.c1.kafka.topic = flume_channel_test
a1.channels.c1.kafka.consumer.group.id = flume-consumer-against_cheating_01
a1.channels.c1.kafka.consumer.timeout.ms = 70000
a1.channels.c1.kafka.consumer.request.timeout.ms = 80000
a1.channels.c1.kafka.consumer.fetch.max.wait.ms = 7000
a1.channels.c1.kafka.consumer.offset.flush.interval.ms = 50000
a1.channels.c1.kafka.consumer.session.timeout.ms = 70000
a1.channels.c1.kafka.consumer.heartbeat.interval.ms = 60000
a1.channels.c1.kafka.consumer.enable.auto.commit = false
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
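Because the channel is Kafka-backed, every event accepted by the avro source is staged in the flume_channel_test topic until the HDFS sink drains it. To sanity-check that events are really flowing, you can peek at that topic with the plain kafka-clients API. This is a sketch of mine, not part of the original setup: the group id is a throwaway one (deliberately different from the agent's group id so its offsets are untouched), and since the channel stores events Avro-serialized, the record bodies are not plain text, so we only count records:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ChannelTopicPeek {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");      // same broker as the channel config
        props.put("group.id", "channel-peek");             // hypothetical throwaway group, NOT the agent's group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("flume_channel_test"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
            // Bodies are Avro-serialized FlumeEvents, so just report how many arrived
            System.out.println("records staged in channel topic: " + records.count());
        }
    }
}
```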
Spring Boot + logback Integration with Flume
As a log collection tool, Flume can tail log files on its own, but it can also be paired with logback so that application logs are written directly into Flume.
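Under the hood, the logback appender used below does the same thing as Flume's own RPC client: it serializes each log event and appends it to the agent's avro source. The following is a minimal hand-rolled equivalent, a sketch assuming flume-ng-sdk is on the classpath; the host and port match the avro source configured above:

```java
import java.nio.charset.StandardCharsets;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class AvroSourceSmokeTest {
    public static void main(String[] args) throws EventDeliveryException {
        // Connect to the avro source declared in flume-test-conf.properties
        RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 44444);
        try {
            Event event = EventBuilder.withBody("hello flume", StandardCharsets.UTF_8);
            client.append(event); // blocks until the source acknowledges the event
        } finally {
            client.close();
        }
    }
}
```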
- Add the Flume appender dependency in Maven

```xml
<dependency>
    <groupId>com.teambytes.logback</groupId>
    <artifactId>logback-flume-appender_2.10</artifactId>
    <version>0.0.9</version>
</dependency>
```
- logback configuration
In the Spring Boot project, add a logback-spring.xml configuration file under the resources folder with the following content:
```xml
<configuration>
    <appender name="consoleAppender" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy MMM dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}:%L - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="fileAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>log/logFile.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>log/logFile.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <maxHistory>7</maxHistory>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>5MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy MMM dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}:%L - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="flumeTest" class="com.teambytes.logback.flume.FlumeLogstashV1Appender">
        <flumeAgents>localhost:44444</flumeAgents>
        <flumeProperties>connect-timeout=4000;request-timeout=8000</flumeProperties>
        <batchSize>100</batchSize>
        <reportingWindow>1000</reportingWindow>
        <additionalAvroHeaders>myHeader=myValue</additionalAvroHeaders>
        <application>JustryDeng's Application</application>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - \(%file:%line\) - %message%n%ex</pattern>
        </layout>
    </appender>

    <logger name="com" level="info">
        <appender-ref ref="flumeTest"/>
    </logger>
    <root level="info">
        <appender-ref ref="consoleAppender"/>
    </root>
</configuration>
```

With this configuration, loggers under the com package send their output to the flumeTest appender, while the root logger keeps writing to the console.
- Test code
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Controller;

@SpringBootApplication
@Controller
@EnableScheduling
public class AppRun {

    private static final Logger LOGGER = LoggerFactory.getLogger(AppRun.class);

    public static void main(String[] args) {
        SpringApplication.run(AppRun.class, args);
    }

    /** Emit a log line every 5 seconds so the Flume appender has data to ship. */
    @Scheduled(cron = "0/5 * * * * ?")
    public void printLog() {
        LOGGER.info("This is a log line");
    }
}
```
Testing
- Start Flume first

```bash
# Run from the Flume home directory
bin/flume-ng agent --conf-file conf/flume-test-conf.properties --name a1 -Dflume.root.logger=INFO,console
```
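Before starting the application, it is worth confirming that the avro source is actually listening on port 44444. A throwaway reachability check (a hypothetical helper, not part of the original walkthrough):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    public static void main(String[] args) {
        try (Socket socket = new Socket()) {
            // 44444 is the avro source port from flume-test-conf.properties
            socket.connect(new InetSocketAddress("localhost", 44444), 3000);
            System.out.println("Flume avro source is reachable");
        } catch (IOException e) {
            System.out.println("Flume avro source is NOT reachable: " + e.getMessage());
        }
    }
}
```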
- Start the Spring Boot application

- Check the corresponding files on HDFS
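The quickest check is hdfs dfs -ls -R /flume/tailout on any node with the Hadoop CLI. To verify from code instead, here is a sketch using the Hadoop FileSystem API; the hdfs://namenode:8020 URI is a placeholder that must be replaced with your cluster's fs.defaultFS:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListFlumeOutput {
    public static void main(String[] args) throws Exception {
        // hdfs://namenode:8020 is a placeholder; use your cluster's fs.defaultFS
        try (FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), new Configuration())) {
            // The hdfs sink writes under /flume/tailout/%Y-%m-%d/%H/, so the top level holds date directories
            for (FileStatus day : fs.listStatus(new Path("/flume/tailout"))) {
                System.out.println(day.getPath());
            }
        }
    }
}
```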

