Guiding questions:
1. What improvements does this setup still need in load balancing and logging?
2. How does Flume implement load balancing?
Host DNS configuration:
192.168.177.167 machine-1
192.168.177.168 machine-2
192.168.177.158 machine-0
192.168.177.174 hadoop-master hbase-master
hadoop-master and machine-2 act as the first-tier agent hosts; the other machines (machine-0 and machine-1) act as collectors, which store the data in HDFS.
Flume configuration on hadoop-master and machine-2:
agent.sources=s1
agent.channels=c1
agent.sinks=k1 k2

# Group both sinks and balance events across them round-robin;
# backoff temporarily blacklists a sink that fails.
agent.sinkgroups=g1
agent.sinkgroups.g1.sinks=k1 k2
agent.sinkgroups.g1.processor.type=load_balance
agent.sinkgroups.g1.processor.selector=round_robin
agent.sinkgroups.g1.processor.backoff=true

# Avro source receiving events from the log4j appender; the timestamp
# interceptor stamps each event so the collectors can resolve %Y/%m.
agent.sources.s1.type=avro
agent.sources.s1.channels=c1
agent.sources.s1.bind=0.0.0.0
agent.sources.s1.port=51515
agent.sources.s1.interceptors=i1
agent.sources.s1.interceptors.i1.type=timestamp

# Durable JDBC channel (embedded Derby)
agent.channels.c1.type=jdbc

# Two Avro sinks, one per collector
agent.sinks.k1.channel=c1
agent.sinks.k1.type=avro
agent.sinks.k1.hostname=machine-0
agent.sinks.k1.port=51515
agent.sinks.k2.channel=c1
agent.sinks.k2.type=avro
agent.sinks.k2.hostname=machine-1
agent.sinks.k2.port=51515
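Assuming Flume is installed under $FLUME_HOME on each host and the properties above are saved as agent.properties (both names are assumptions, not part of the original setup), the first tier could be started roughly like this:

# Sketch: start the first-tier agent on hadoop-master and machine-2
$FLUME_HOME/bin/flume-ng agent \
  --conf $FLUME_HOME/conf \
  --conf-file agent.properties \
  --name agent \
  -Dflume.root.logger=INFO,console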
Flume configuration on machine-1 and machine-0:
agent.sources=s1
agent.channels=c1
agent.sinks=k1

# Avro source receiving events forwarded by the first tier
agent.sources.s1.type=avro
agent.sources.s1.channels=c1
agent.sources.s1.bind=0.0.0.0
agent.sources.s1.port=51515

agent.channels.c1.type=jdbc

# HDFS sink: roll one file per hour; %Y/%m is resolved from the event's
# timestamp header (set by the upstream interceptor), since
# useLocalTimeStamp is false here.
agent.sinks.k1.type=hdfs
agent.sinks.k1.channel=c1
agent.sinks.k1.hdfs.path=/flume/%Y/%m
agent.sinks.k1.hdfs.filePrefix=flume
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.rollSize=0
agent.sinks.k1.hdfs.fileType=DataStream
agent.sinks.k1.hdfs.writeFormat=Text
agent.sinks.k1.hdfs.useLocalTimeStamp=false
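The collectors are launched the same way (collector.properties is again an assumed file name), and once events are flowing, the output can be checked on HDFS:

# Sketch: start the collector agent on machine-0 and machine-1
$FLUME_HOME/bin/flume-ng agent \
  --conf $FLUME_HOME/conf \
  --conf-file collector.properties \
  --name agent

# List the files written for the current month
hdfs dfs -ls /flume/$(date +%Y)/$(date +%m)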
log4j configuration on the application side:
# File Appender rootLog
log4j.rootLogger=DEBUG,stdout,rootLog

# Console appender for the DEV environment
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n

log4j.appender.rootLog=org.apache.log4j.RollingFileAppender
log4j.appender.rootLog.File=rootLog.log
log4j.appender.rootLog.MaxFileSize=5000KB
log4j.appender.rootLog.MaxBackupIndex=20
log4j.appender.rootLog.layout=org.apache.log4j.PatternLayout
log4j.appender.rootLog.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n

# File Appender boentel
#log4j.logger.com.boentel=DEBUG,boentel
#log4j.additivity.com.boentel=true
#log4j.appender.boentel=org.apache.log4j.RollingFileAppender
#log4j.appender.boentel.File=boentel.log
#log4j.appender.boentel.MaxFileSize=2000KB
#log4j.appender.boentel.MaxBackupIndex=20
#log4j.appender.boentel.layout=org.apache.log4j.PatternLayout
#log4j.appender.boentel.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n

# Route com.loadbalance loggers to the Flume load-balancing appender
log4j.logger.com.loadbalance=DEBUG,loadbalance
log4j.additivity.com.loadbalance=true

log4j.appender.loadbalance=org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.loadbalance.Hosts=machine-2:51515 hadoop-master:51515
#log4j.appender.loadbalance.UnsafeMode=true
# Cap the backoff applied to a failed host at 30 s
log4j.appender.loadbalance.MaxBackoff=30000
# Selector: ROUND_ROBIN (default) or RANDOM
log4j.appender.loadbalance.Selector=RANDOM
log4j.appender.loadbalance.layout=org.apache.log4j.PatternLayout
log4j.appender.loadbalance.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
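The LoadBalancingLog4jAppender ships in Flume's flume-ng-log4jappender client artifact, so the application needs the Flume client jars on its classpath. A minimal sketch; the exact version numbers are assumptions:

# Sketch: classpath for the application side (jar versions are assumptions)
# flume-ng-log4jappender also needs flume-ng-sdk plus its Avro/Netty dependencies
CLASSPATH=.:log4j-1.2.17.jar
CLASSPATH=$CLASSPATH:flume-ng-log4jappender-1.5.0.jar
CLASSPATH=$CLASSPATH:flume-ng-sdk-1.5.0.jar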
Test code:
// Must live under com.loadbalance so its logger matches log4j.logger.com.loadbalance
package com.loadbalance;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

/**
 * Emits batches of log events on a schedule so the load-balancing
 * appender can be observed distributing them across the two agents.
 */
public class Worker implements Runnable {

    private static final Logger LOG = Logger.getLogger(Worker.class);
    private String command;

    public static void main(String[] args) {
        new Worker("0").init();
    }

    public Worker(String command) {
        this.command = command;
    }

    public void init() {
        int numWorkers = 1;
        int threadPoolSize = 3;

        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(threadPoolSize);

        // schedule tasks to run after some time
        System.out.println("Current Time = " + new Date());
        Worker worker = null;
        for (int i = 0; i < numWorkers; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            worker = new Worker("do heavy processing");
            // scheduledThreadPool.schedule(worker, 10, TimeUnit.SECONDS);
            // scheduledThreadPool.scheduleAtFixedRate(worker, 0, 1, TimeUnit.SECONDS);
            scheduledThreadPool.scheduleWithFixedDelay(worker, 5, 10, TimeUnit.SECONDS);
        }

        // add some delay to let the scheduler spawn some threads
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        scheduledThreadPool.shutdown();
        while (!scheduledThreadPool.isTerminated()) {
            // wait for all tasks to finish
        }
        LOG.info("Finished all threads");
    }

    @Override
    public void run() {
        LOG.info(Thread.currentThread().getName() + " Start. Command = " + command);
        processCommand();
        LOG.info(Thread.currentThread().getName() + " End.");
    }

    private void processCommand() {
        try {
            for (int i = 1000; i < 1200; i++) {
                LOG.info("sequence:" + i);
            }
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return this.command;
    }
}
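Compile and run the test with the log4j.properties above in the working directory (classpath as sketched earlier):

# Sketch: compile and run the test driver
javac -d . -cp "$CLASSPATH" Worker.java
java -cp ".:$CLASSPATH" com.loadbalance.Worker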
Summary:
Load balancing does work in the end, but performance still leaves something to be desired.
When one agent machine dies, the client keeps retrying the connection, which slows delivery of data to the surviving machine. Once the dead machine comes back, the events buffered on the client side are resent to its Flume agent, so data correctness is preserved. But if the application itself crashes at that point, aren't those buffered log events lost with it? That remains an open problem and needs further improvement. (The UnsafeMode option commented out in the log4j configuration above makes the opposite trade-off: with it enabled, the appender drops events instead of blocking or failing the application when no agent is reachable.)