本帖最后由 hyj 于 2019-6-25 20:10 编辑
问题导读
1.jdbc connector插件的作用是什么?
2.如何配置kafka?
3.如何实现oracle中插入数据,同步到kafka?
1.登陆Oracle:
[mw_shl_code=bash,true][oracle@localhost ~]$ lsnrctl status
[oracle@localhost ~]$ lsnrctl start[/mw_shl_code]
[oracle@localhost ~]$ sqlplus /nolog
SQL> conn / as sysdba
SQL> startup
这样oracle就启动了。为了使在kafka的服务器上可以访问到oracle这台服务器的ip,需要在oracel的安装目录下(/data/oracle/product/11.2.0/db_1/network/admin)的listener.ora中添加ip监听:
然后新建一个会话登陆自己的账号:
[mw_shl_code=bash,true][oracle@localhost ~]$ sqlplus /nolog
SQL> conn wyh/wyhpwd;[/mw_shl_code]
创建我们要用的数据库:
[mw_shl_code=sql,true]SQL> create table test_user(id number(19) not null primary key, username varchar2(100),password varchar2(100),modified timestamp(0) default SYSTIMESTAMP not null);[/mw_shl_code]
创建自增序列,使主键自增:
[mw_shl_code=sql,true]SQL> create sequence test_user_seq start with 1 increment by 1;
[/mw_shl_code]
创建触发器:
[mw_shl_code=sql,true]
SQL> create or replace trigger test_user_seq_tr
2 before insert or update on test_user for each row
3 begin
4 if inserting then
5 select test_user_seq.NEXTVAL into :new.id from dual;
6 end if;
7 END;
8 /[/mw_shl_code]
为时间列创建一个索引:
[mw_shl_code=sql,true]SQL> create index test_modified_index on test_user (modified);
[/mw_shl_code]
插入数据:
[mw_shl_code=sql,true]SQL> insert into test_user(username,password) values('tom','111');
1 row created.
SQL> insert into test_user(username,password) values('bob','222');
1 row created.
SQL> insert into test_user(username,password) values('jhon','333');
1 row created.
SQL> insert into test_user(username,password) values('rose','444');
1 row created.
SQL> insert into test_user (username,password) values('amy','555');
1 row created.
SQL> commit;
​[/mw_shl_code]
这里要注意insert之后要commit一下,否则topic中读不到这条记录。
2.启动kafka。
3.从confluent官网下载oracle的jdbc connector插件:
https://www.confluent.io/connector/kafka-connect-jdbc/#download
还要下载一个oracle的jdbc驱动jar,放在kafka的安装包下的lib目录下。
创建一个plugin.path目录:/usr/local/wyh/kafka/kafka_2.12-2.1.0/share/java,将刚才下载的插件压缩包解压后的Lib下的jar包都放在这个目录下。注意ojdbc.jar的版本要和jdk以及oracle的版本对应。我这里使用的是jdk1.8+oracle11g。
4.在kafka安装包下的config中的connect-standalone.properties文件中修改plugin.path:
5.在config下创建一个connector的配置文件:
[mw_shl_code=bash,true][root@localhost config]# vi wyh-oracle-connector.properties
[/mw_shl_code]
添加如下内容:
[mw_shl_code=bash,true]name=test-oracle-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.password=wyhpwd
connection.url=jdbc:oracle:thin:@192.168.184.129:1522:orcl
connection.user=wyh
table.whitelist=TEST_USER
mode=incrementing
incrementing.column.name=ID
topic.prefix=test-oracle-[/mw_shl_code]
这里一定要注意在oracle内部表名和列名都是大写,所以配置中table.whitelist和incrementing.column.name都要大写,否则报错。此处的name是连接器名称,是唯一的,不能重复。connector.class是连接器的类名。在连接oracel时,username和password最好和url分开写,因为分开写会进行隐藏密码,否则直接写在url中会明文显示密码。table.whitelist是表示允许复制的表。mode表示增量查询的模式,也就是根据哪种模式来跟踪数据的更新。incrementing.column.name是具体以哪个列名来作为mode 下的跟踪。topic.prefix是在表很多的情况下可以根据topic的前缀对每一个表都有一个不同的topic,这里我们只有一个表。输出数据的topic就是topic.prefix加上表名。这个topic会自动创建。topic.prefix是必须指定的。
6.启动connector:
[mw_shl_code=bash,true][root@localhost kafka_2.12-2.1.0]# bin/connect-standalone.sh config/connect-standalone.properties config/wyh-oracle-connector.properties
[/mw_shl_code]
7.启动consumer:
[mw_shl_code=bash,true][root@localhost kafka_2.12-2.1.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092 --from-beginning --topic test-oracle-TEST_USER
[/mw_shl_code]
得到的消息如图:
图中payload中的数据就是数据库中的每条数据。
这里先暂存一个疑问:为什么Oracle中的id是1,2,3,4,5,但是topic中读取消息的id是字母(如:AQ==)。
针对于mode为incrementing的connector,只适用于insert类型的数据变化,是通过检测新增的ID大于之前读取的最大的ID来确定是否是要更新的数据。对于update和delete的数据在这种模式下无法检测更新。
这里我们再插入一条数据:
[mw_shl_code=sql,true]SQL> insert into test_user(username,password) values('bill','666');
[/mw_shl_code]
再看topic:
这样也就实现了oracle中的数据在Insert时与kafka同步。
最新经典文章,欢迎关注公众号
---------------------
作者:QYHuiiQ
原文:https://blog.csdn.net/QYHuiiQ/article/details/89002795
|
|