- public class DbSink extends AbstractSink
- implements Configurable
- {
- private static final Logger log = LoggerFactory.getLogger(DbSink.class);
-
- public Sink.Status process() throws EventDeliveryException {
- Channel channel = getChannel();
- Transaction tx = channel.getTransaction();
- Sink.Status localStatus;
- try {
- tx.begin();
- Event e = channel.take();
- if (e == null) {
- tx.rollback();
- localStatus = Sink.Status.BACKOFF;
- return localStatus;
- }
- String msg = new String(e.getBody());
- log.debug(msg); //丢失部分数据点
- JSONObject json = parse(msg); //格式化
- tx.commit();
- return Sink.Status.READY;
- } catch (Exception e) {
- e.printStackTrace();
- log.error(e.toString());
- tx.rollback();
- return Sink.Status.BACKOFF;
- } finally {
- tx.close();
- } }
-
- public void configure(Context context)
- {
- String path = JafkaUtil.getJafkaConfigParameter(context, "proxool.path");
- try {
- log.info("proxool.dbpool appDir:" + path);
- JAXPConfigurator.configure(path, false);
- log.info("proxool.dbpool init succ!");
- } catch (ProxoolException e) {
- e.printStackTrace();
- }
- }
复制代码
|