Background: the company runs a Mycat cluster in its on-premises ETC server room that backs the order system. We now need heterogeneous replication: MySQL data has to be written in (near) real time into another database to serve read traffic and data archiving.
Technology choices: binlog parsing: Alibaba's open-source Canal; message middleware: Kafka; streaming framework: Spark Streaming.
On to the code.
Canal parses the MySQL binlog and writes the changes to Kafka in real time:
[mw_shl_code=java,true]package kafka;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.lang.SystemUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutionException;
public class AbstractCanalClient {
protected final static Logger logger = LoggerFactory
.getLogger(AbstractCanalClient.class);
protected static final String SEP = SystemUtils.LINE_SEPARATOR;
protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
protected volatile boolean running = false;
protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
logger.error("parse events has an error", e);
}
};
protected Thread thread = null;
protected CanalConnector connector;
protected static String context_format = null;
protected static String row_format = null;
protected static String transaction_format = null;
protected String destination;
protected Producer<String, String> kafkaProducer = null;
protected String topic;
protected String table;
static {
context_format = SEP
+ "****************************************************" + SEP;
context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}"
+ SEP;
context_format += "* Start : [{}] " + SEP;
context_format += "* End : [{}] " + SEP;
context_format += "****************************************************"
+ SEP;
row_format = SEP
+ "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms"
+ SEP;
transaction_format = SEP
+ "================> binlog[{}:{}] , executeTime : {} , delay : {}ms"
+ SEP;
}
public AbstractCanalClient(String destination) {
this(destination, null);
}
public AbstractCanalClient(String destination, CanalConnector connector) {
this(destination, connector, null);
}
public AbstractCanalClient(String destination, CanalConnector connector,
Producer<String, String> kafkaProducer) {
this.connector = connector;
this.destination = destination;
this.kafkaProducer = kafkaProducer;
}
protected void start() {
Assert.notNull(connector, "connector is null");
Assert.notNull(kafkaProducer, "Kafka producer configuration is null");
Assert.notNull(topic, "kafaka topic is null");
Assert.notNull(table,"table is null");
thread = new Thread(new Runnable() {
public void run() {
process();
}
});
thread.setUncaughtExceptionHandler(handler);
running = true;
thread.start();
}
protected void stop() {
if (!running) {
return;
}
running = false;
if (thread != null) {
try {
thread.join();
} catch (InterruptedException e) {
// ignore
}
}
kafkaProducer.close();
MDC.remove("destination");
}
protected void process() {
int batchSize = 1024;
while (running) {
try {
MDC.put("destination", destination);
connector.connect();
connector.subscribe("databaseName\\.tableName");
while (running) {
Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries without auto-ack
long batchId = message.getId();
try {
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
} else {
kafkaEntry(message.getEntries());
}
connector.ack(batchId); // acknowledge the batch
} catch (Exception e) {
logger.error("handle batch [" + batchId + "] failed, rolling back", e);
connector.rollback(batchId); // processing failed, roll back so the batch is re-delivered
}
}
} catch (Exception e) {
logger.error("process error!", e);
} finally {
connector.disconnect();
MDC.remove("destination");
}
}
}
private void printSummary(Message message, long batchId, int size) {
long memsize = 0;
for (Entry entry : message.getEntries()) {
memsize += entry.getHeader().getEventLength();
}
String startPosition = null;
String endPosition = null;
if (!CollectionUtils.isEmpty(message.getEntries())) {
startPosition = buildPositionForDump(message.getEntries().get(0));
endPosition = buildPositionForDump(message.getEntries().get(
message.getEntries().size() - 1));
}
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
logger.info(context_format, new Object[]{batchId, size, memsize,
format.format(new Date()), startPosition, endPosition});
}
protected String buildPositionForDump(Entry entry) {
long time = entry.getHeader().getExecuteTime();
Date date = new Date(time);
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
return entry.getHeader().getLogfileName() + ":"
+ entry.getHeader().getLogfileOffset() + ":"
+ entry.getHeader().getExecuteTime() + "("
+ format.format(date) + ")";
}
private void kafkaEntry(List<Entry> entrys) throws InterruptedException, ExecutionException {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException(
"ERROR ## parser of eromanga-event has an error , data:"
+ entry.toString(), e);
}
String logfileName = entry.getHeader().getLogfileName();
Long logfileOffset = entry.getHeader().getLogfileOffset();
String dbName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
EventType eventType = rowChage.getEventType();
if (eventType == EventType.DELETE || eventType == EventType.UPDATE
|| eventType == EventType.INSERT) {
for (RowData rowData : rowChage.getRowDatasList()) {
String tmpstr = "";
if (eventType == EventType.DELETE) {
tmpstr = getDeleteJson(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
tmpstr = getInsertJson(rowData.getAfterColumnsList());
} else if (eventType == EventType.UPDATE) {
tmpstr = getUpdateJson(rowData.getBeforeColumnsList(),
rowData.getAfterColumnsList());
} else {
continue;
}
logger.info(this.topic + " " + tmpstr);
// synchronous send (.get()) preserves per-row ordering at the cost of throughput
kafkaProducer.send(
new ProducerRecord<String, String>(this.topic,
tmpstr)).get();
}
}
}
}
private JSONObject columnToJson(List<Column> columns) {
JSONObject json = new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
return json;
}
private String getInsertJson(List<Column> columns) {
JSONObject json = new JSONObject();
json.put("type", "insert");
json.put("data", this.columnToJson(columns));
return json.toJSONString();
}
private String getUpdateJson(List<Column> befcolumns, List<Column> columns) {
// note: only the after-image is serialized; befcolumns (the before-image) is not written to Kafka
JSONObject json = new JSONObject();
json.put("type", "update");
json.put("data", this.columnToJson(columns));
return json.toJSONString();
}
private String getDeleteJson(List<Column> columns) {
JSONObject json = new JSONObject();
json.put("type", "delete");
json.put("data", this.columnToJson(columns));
return json.toJSONString();
}
protected void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
long executeTime = entry.getHeader().getExecuteTime();
long delayTime = new Date().getTime() - executeTime;
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == EntryType.TRANSACTIONEND) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
TransactionBegin begin = null;
try {
begin = TransactionBegin.parseFrom(entry
.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(
"parse event has an error , data:"
+ entry.toString(), e);
}
// print the transaction begin info: binlog position, delay, and the executing thread id
logger.info(
transaction_format,
new Object[]{
entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader()
.getLogfileOffset()),
String.valueOf(entry.getHeader()
.getExecuteTime()),
String.valueOf(delayTime)});
logger.info(" BEGIN ----> Thread id: {}",
begin.getThreadId());
} else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
TransactionEnd end = null;
try {
end = TransactionEnd.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(
"parse event has an error , data:"
+ entry.toString(), e);
}
// print the transaction commit info: transaction id
logger.info("----------------\n");
logger.info(" END ----> transaction id: {}",
end.getTransactionId());
logger.info(
transaction_format,
new Object[]{
entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader()
.getLogfileOffset()),
String.valueOf(entry.getHeader()
.getExecuteTime()),
String.valueOf(delayTime)});
}
continue;
}
if (entry.getEntryType() == EntryType.ROWDATA) {
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException(
"parse event has an error , data:"
+ entry.toString(), e);
}
EventType eventType = rowChage.getEventType();
logger.info(
row_format,
new Object[]{
entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader()
.getLogfileOffset()),
entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(),
eventType,
String.valueOf(entry.getHeader()
.getExecuteTime()),
String.valueOf(delayTime)});
if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
logger.info(" sql ----> " + rowChage.getSql() + SEP);
continue;
}
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
printColumn(rowData.getAfterColumnsList());
}
}
}
}
}
protected void printColumn(List<Column> columns) {
for (Column column : columns) {
StringBuilder builder = new StringBuilder();
builder.append(column.getName() + " : " + column.getValue());
builder.append(" type=" + column.getMysqlType());
if (column.getUpdated()) {
builder.append(" update=" + column.getUpdated());
}
builder.append(SEP);
logger.info(builder.toString());
}
}
public void setConnector(CanalConnector connector) {
this.connector = connector;
}
public void setKafkaProducer(Producer<String, String> kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
public void setKafkaTopic(String topic) {
this.topic = topic;
}
public void setFilterTable(String table) {
this.table = table;
}
}
[/mw_shl_code]
[mw_shl_code=java,true]package kafka;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.net.InetSocketAddress;
import java.util.Properties;
public class ClusterCanalClient extends AbstractCanalClient {
public ClusterCanalClient(String destination) {
super(destination);
}
public static void main(String args[]) {
String destination = null;//"example";
String topic = null;
// String canalhazk = null;
String kafka = null;
String hostname = null;
String table = null;
if (args.length != 5) {
logger.error("input param must : hostname destination topic kafka table" +
"for example: localhost example topic 192.168.0.163:9092 tablname");
System.exit(1);
} else {
hostname = args[0];
destination = args[1];
topic = args[2];
// canalhazk = args[2];
kafka = args[3];
table = args[4];
}
// connect to a single canal server here; the commented-out cluster connector below
// discovers canal servers via zookeeper and supports failover if one server crashes
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(
hostname, 11111),
destination,
"canal",
"canal");
// CanalConnector connector = CanalConnectors.newClusterConnector(
// canalhazk, destination, "userName", "passwd");
Properties props = new Properties();
props.put("bootstrap.servers", kafka);
props.put("request.required.acks",1);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);//32m
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
final ClusterCanalClient clientTest = new ClusterCanalClient(destination);
clientTest.setConnector(connector);
clientTest.setKafkaProducer(producer);
clientTest.setKafkaTopic(topic);
clientTest.setFilterTable(table);
clientTest.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
logger.info("## stop the canal client");
clientTest.stop();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal:\n{}", ExceptionUtils.getFullStackTrace(e));
} finally {
logger.info("## canal client is down.");
}
}
});
}
}[/mw_shl_code]
Spark Streaming writes the Kafka data into HBase:
[mw_shl_code=scala,true]package bcw.etl.syncdata
import java.util
import com.alibaba.fastjson.{JSON, JSONObject}
import example.utils.KafkaOffset_ZKManager
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.HasOffsetRanges
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author zhuqitian
* 2018-5-13
* Kafka to HBase
* create 'wms_schedule_main','info'
* ./kafka-topics.sh --create --topic wms_schedule_main --zookeeper ip:2181/kafka0.9 --partitions 3 --replication-factor 1
*
* note:
*
*/
object Kafka_to_HBase extends App {
var logger: Logger = Logger.getLogger(Kafka_to_HBase.getClass)
val conf = new SparkConf()
.setAppName("Kafka_to_HBASE")
.setMaster("local")
.set("spark.streaming.kafka.maxRatePerPartition", "100000")
val kafkaParams = Map[String, String](
"metadata.broker.list" -> "ip:9092",
"auto.offset.reset" -> "smallest"
)
val topicSet = "wms_schedule_main".split(",").toSet
val groupName = "wms_mysql_test"
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))
val config = HBaseConfiguration.create
config.set("hbase.zookeeper.quorum", "ip")
config.set("hbase.zookeeper.property.clientPort", "2181")
val conn = ConnectionFactory.createConnection(config)
val table: Table = conn.getTable(TableName.valueOf("wms_schedule_main"))
// NOTE: this mutable list lives on the driver and is filled from inside an RDD closure,
// which only works because the master is "local"; see the cluster-safe sketch after this block.
val puts = new util.ArrayList[Put]
val stream: InputDStream[(String, String)] = KafkaOffset_ZKManager.createMyDirectKafkaStream(
ssc, kafkaParams, topicSet, groupName)
stream.foreachRDD((rdd, btime) => {
if (!rdd.isEmpty()) {
val startTime = System.currentTimeMillis
println(s">>>>>>>>>>>>>>>>>>>>>>start : $startTime")
val message: RDD[JSONObject] = rdd.map(line => JSON.parseObject(line._2))
.map(json => json.getJSONObject("data"))
.filter(x => x.getString("biid") != null)
// side effects on the driver-side puts list; count() forces evaluation of the lazy map
val recordCount = message.map(json => {
// rowkey: reversed biid + chdt (an MD5 prefix could also be used to spread the keys)
val rowKey = json.getString("biid").reverse + json.getString("chdt")
val put = new Put(Bytes.toBytes(rowKey))
for (key <- json.keySet().toArray()) {
val value = json.getString(key.toString)
if (value != null) {
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(key.toString), Bytes.toBytes(value))
}
}
puts.add(put)
}).count()
println(" puts size : " + puts.size())
table.put(puts)
val endTime = System.currentTimeMillis
println(">>>>>>>>>>>>>>>>>>>>>>this batch took " + (endTime - startTime) + " milliseconds. record count is " + recordCount)
println("##################################" + btime)
puts.clear()
println("puts clear after size : " + puts.size())
}
KafkaOffset_ZKManager.storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, groupName)
})
ssc.start()
ssc.awaitTermination()
}
[/mw_shl_code]
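One caveat about the job above: the Puts are accumulated in a driver-side ArrayList from inside an RDD closure, which only works because the master is local, so the closure runs in the driver JVM; on a real cluster the list would stay empty. Below is a minimal, hedged sketch of a cluster-safe variant using foreachPartition. It reuses the imports and the stream/groupName definitions from the job above; the ZooKeeper quorum and table name are the same placeholders.
[mw_shl_code=scala,true]stream.foreachRDD((rdd, btime) => {
  if (!rdd.isEmpty()) {
    rdd.foreachPartition { iter =>
      // open one HBase connection per partition, on the executor that owns the data
      val conf = HBaseConfiguration.create
      conf.set("hbase.zookeeper.quorum", "ip")
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      val conn = ConnectionFactory.createConnection(conf)
      val tbl = conn.getTable(TableName.valueOf("wms_schedule_main"))
      val puts = new util.ArrayList[Put]()
      iter.foreach { case (_, value) =>
        val data = JSON.parseObject(value).getJSONObject("data")
        if (data != null && data.getString("biid") != null) {
          val rowKey = data.getString("biid").reverse + data.getString("chdt")
          val put = new Put(Bytes.toBytes(rowKey))
          for (key <- data.keySet().toArray()) {
            val v = data.getString(key.toString)
            if (v != null) put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(key.toString), Bytes.toBytes(v))
          }
          puts.add(put)
        }
      }
      if (!puts.isEmpty) tbl.put(puts)
      tbl.close()
      conn.close()
    }
  }
  // commit offsets only after the whole batch has been written
  KafkaOffset_ZKManager.storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, groupName)
})[/mw_shl_code]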
Summary: 1. Parse the MySQL binlog, wrap each row change as JSON, and write it to Kafka in real time. 2. Spark Streaming consumes the Kafka data and parses the JSON. 3. Walk the JSON keys as column names and write each value into HBase (pay attention to the HBase rowkey design; a sketch follows below).
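On rowkey design: the job reverses biid and appends chdt so that monotonically increasing ids do not all land in the last region. Another common option, hinted at by the md5 note in the original code, is to prefix the natural key with a short hash. A small sketch, using nothing beyond the field names already shown above:
[mw_shl_code=scala,true]import java.security.MessageDigest

// Prefix the natural key with a few hex characters of its MD5 so that
// sequential ids scatter across HBase regions instead of hotspotting one.
def buildRowKey(biid: String, chdt: String): String = {
  val md5Hex = MessageDigest.getInstance("MD5")
    .digest(biid.getBytes("UTF-8"))
    .map("%02x".format(_))
    .mkString
  md5Hex.take(4) + "_" + biid + "_" + chdt
}[/mw_shl_code]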
Open questions: how to implement exactly-once consumption semantics? Idempotent writes? SQL on HBase? How to deal with request latency caused by heavyweight HBase region splits?
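On the exactly-once question, one practical answer (not implemented here, just a sketch) is at-least-once delivery combined with idempotent writes: the rowkey is a pure function of the record, so replaying a batch overwrites the same HBase cells, and offsets are persisted to ZooKeeper only after the write succeeds. The ordering is the whole contract; writeBatchToHBase below is a hypothetical helper wrapping the Put logic shown earlier:
[mw_shl_code=scala,true]stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // 1) idempotent write: the rowkey is derived deterministically from the record,
    //    so re-processing the same offsets rewrites identical cells in HBase
    writeBatchToHBase(rdd) // hypothetical helper wrapping the Put logic shown earlier
  }
  // 2) persist offsets only after the write succeeds; a crash before this line means
  //    the batch is re-read and re-written, which the idempotent rowkey makes harmless
  KafkaOffset_ZKManager.storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, groupName)
}[/mw_shl_code]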
It has been a while since my last post, so I'm sharing this here.