解决方法:修改flume-ng-log4jappender中的append方法。修改为:
- @Override
- public synchronized void append(LoggingEvent event) throws FlumeException{
- //If rpcClient is null, it means either this appender object was never
- //setup by setting hostname and port and then calling activateOptions
- //or this appender object was closed by calling close(), so we throw an
- //exception to show the appender is no longer accessible.
- boolean ready = true;
- String errorMsg = "Cannot Append to Appender! Appender either closed or" +
- " not setup correctly!";
- if (rpcClient == null) {
- LogLog.error(errorMsg);
- if (unsafeMode) {
- activateOptions();
- if(rpcClient == null || !rpcClient.isActive()) {
- ready = false;
- }
- }else {
- throw new FlumeException(errorMsg);
- }
- }
- if(unsafeMode && !ready) {
- return;
- }
-
- if(!rpcClient.isActive()){
- reconnect();
- }
- if(rpcClient == null || !rpcClient.isActive()) {
- if(unsafeMode) {
- return;
- } else {
- LogLog.error(errorMsg);
- throw new FlumeException(errorMsg);
- }
- }
- //Client created first time append is called.
- Map<String, String> hdrs = new HashMap<String, String>();
- hdrs.put(Log4jAvroHeaders.LOGGER_NAME.toString(), event.getLoggerName());
- hdrs.put(Log4jAvroHeaders.TIMESTAMP.toString(),
- String.valueOf(event.getTimeStamp()));
-
- //To get the level back simply use
- //LoggerEvent.toLevel(hdrs.get(Integer.parseInt(
- //Log4jAvroHeaders.LOG_LEVEL.toString()))
- hdrs.put(Log4jAvroHeaders.LOG_LEVEL.toString(),
- String.valueOf(event.getLevel().toInt()));
-
- Event flumeEvent;
- Object message = event.getMessage();
- if (message instanceof GenericRecord) {
- GenericRecord record = (GenericRecord) message;
- populateAvroHeaders(hdrs, record.getSchema(), message);
- flumeEvent = EventBuilder.withBody(serialize(record, record.getSchema()), hdrs);
- } else if (message instanceof SpecificRecord || avroReflectionEnabled) {
- Schema schema = ReflectData.get().getSchema(message.getClass());
- populateAvroHeaders(hdrs, schema, message);
- flumeEvent = EventBuilder.withBody(serialize(message, schema), hdrs);
- } else {
- hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");
- String msg = layout != null ? layout.format(event) : message.toString();
- flumeEvent = EventBuilder.withBody(msg, Charset.forName("UTF8"), hdrs);
- }
-
- try {
- rpcClient.append(flumeEvent);
- } catch (EventDeliveryException e) {
- String msg = "Flume append() failed.";
- LogLog.error(msg);
- if (unsafeMode) {
- return;
- }
- throw new FlumeException(msg + " Exception follows.", e);
- }
- }
复制代码
|