如何将本地目录下txt文件内容发送给Kafka(求代码范例)
本帖最后由 harley 于 2017-1-8 21:07 编辑本人刚接触Kafka, 是个小白,求大神帮忙解答,谢谢!
应用场景:
我需要从Oracle中不断把业务数据导入到Kafka, 并写入HBase或Hive
目前想使用的方法:
1. 每15分钟从Oracle中导出数据生成txt文件放于本地目录(非Hadoop节点)
2. 同时将文件内容取出并发送给Kafka
3. 再用Spark Streaming来订阅Kafka中的数据进行处理写入Hive
目前有以下二点问题,求大神解答:
1.以上方法是否可行? 有没有更实时简单的方法(除Oracle golden gate外)
2.如何将文本文件传送给Kafka (求代码范例)?
3.除Spark Streaming外,有无其它实时处理方案?
本帖最后由 J20_果农 于 2017-1-8 21:41 编辑
目前有以下二点问题,求大神解答:
1.以上方法是否可行? 有没有更实时简单的方法(除Oracle golden gate外)
答:为什么这么折腾呢?直接用sqoop 从oracle导入hive就可以了。
2.如何将文本文件传送给Kafka (求代码范例)?
答:写个生产者线程类,读取文件内容,然后发送给kafka,具体代码可以看kafka官方例子
核心代码如下:producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
读写文件的方法:
/**
* 以行为单位读取文件,常用于读面向行的格式化文件
* @param fileName 文件名
*/
public static void readFileByLines(String fileName){
File file = new File(fileName);
BufferedReader reader = null;
try {
System.out.println("以行为单位读取文件内容,一次读一整行:");
reader = new BufferedReader(new FileReader(file));
String tempString = null;
int line = 1;
//一次读入一行,直到读入null为文件结束
while ((tempString = reader.readLine()) != null){
//显示行号
System.out.println("line " + line + ": " + tempString);
line++;
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (reader != null){
try {
reader.close();
} catch (IOException e1) {
}
}
}
}
3.除Spark Streaming外,有无其它实时处理方案?
答:也可以使用storm来处理
@J20_果农 非常感谢你的答复 基本上解决了我的问题
因为数据量较大,一个小时上千万的数据,之前测试sqoop有卡死的情况,目前是用sql loader导出效率会更高一些.
我需要的最终方案是数据能从Oracle尽量实时的写入HBase或Hive,因刚研究实时数据处理,在网上查了好久,比较乱,还没理清楚
能否再帮忙解答一下,往Kafka写数据只能一行一行的发送吗,不能一个文件一个文件的发送吗
harley 发表于 2017-1-8 22:40
@J20_果农 非常感谢你的答复 基本上解决了我的问题
因为数据量较大,一个小时上千万的数据,之前测试sqoop ...
这个不是的
创建topic参数可以设置一个或多个--config "Property(属性)",下面是创建一个topic名称为"my-topic"例子,它设置了2个参数max message size 和 flush rate.
bin/kafka-topics.sh --zookeeper 192.168.2.225:2183/config/mobile/mq/mafka02 --create --topic my-topic --partitions 1 --replication-factor 1
--config max.message.bytes=64000 --config flush.messages=1
可以设置topic的大小和传输率
下面是一个topic模型,更多可以在找找资料
http://www.aboutyun.com/data/attachment/forum/201409/28/143553t3nhnsbri6s6nfh5.png?_=3999538
本帖最后由 harley 于 2017-1-10 22:40 编辑
@einhep 你说的是topic设置,我问的是怎么样多条记录一起发送,而不是一条一条的Send, 例如我将txt中所有记录抓出,一起发送给Kafka,避免数据太多,单条单条发送中间可能丢失数据
在网上有看到这个的说明,但是没有找到MessageSet怎么用的例子:
Kafka系统默认支持MessageSet,把多条Message自动地打成一个Group后发送出去,均摊后拉低了每次通信的RTT。而且在组织MessageSet的同时,还可以把数据重新排序,从爆发流式的随机写入优化成较为平稳的线性写入。
能帮忙说明一下吗,谢谢啦! @einhep @J20_果农 J20_果农 发表于 2017-1-8 21:39
目前有以下二点问题,求大神解答:
1.以上方法是否可行? 有没有更实时简单的方法(除Oracle golden gate外 ...
你好,这个读取文件的代码没有返回值,如何向send中传数据啊?
lihao 发表于 2018-1-10 09:32
你好,这个读取文件的代码没有返回值,如何向send中传数据啊?
不好意思,已经搞定了,谢谢代码提供者了。。。
页:
[1]