hi,这是我昨晚尝试着去改的。。
object SparkStreamingDemo {
def main(args: Array[String]) {
//创建spark实例
val sparkConf = new SparkConf().setAppName("QueueStream")
sparkConf.setMaster("local")
// 创建sparkStreamingContext ,
val ssc = new StreamingContext(sparkConf, Seconds(15))
var rddQueue = new Queue[RDD[String]]()
val inputStream = ssc.queueStream(rddQueue)
val iHBaseUtils = new IHBaseUtils()
inputStream.foreachRDD(rdd =>{
def func(records: Seq[String]): Unit ={
while(!records.equals(null)){
records.flatMap(_.split(":")).(x_.1)
val describe = records.splitAt(1)
iHBaseUtils.put("ExamResult",patientId,"JieGuo","describe",describe)
}
}
val repartitionedRDD = rdd.repartition(3)
repartitionedRDD.foreachPartition(func)
})
ssc.start()
val seq = conn()
for (i <- 1 to 3) {
rddQueue.synchronized {
rddQueue += ssc.sparkContext.makeRDD(seq,10)
}
Thread.sleep(3000)
}
ssc.awaitTermination()
}
def conn(): Seq[String] = {
val conn_str = "jdbc:sqlserver://192.168.1.21;username=sa;password=yishidb;database=CDRDB16;useUnicode=true&characterEncoding=UTF-8"
//classOf[com.mysql.jdbc.Driver]
// Class.forName("com.mysql.jdbc.Driver").newInstance();
val conn = DriverManager.getConnection(conn_str)
try {
// Configure to be Read Only
val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
// Execute Query,查询检查表 ,有patientId,describe属性。
val rs = statement.executeQuery("select PATIENT_ID,EXAM_FINDING_DESC from DC_EXAM_RESULT")
// Iterate Over ResultSet
var seq = Seq("")
while(rs.next()){
val patientId = rs.getString("PATIENT_ID")
val describe = rs.getString("EXAM_FINDING_DESC")
seq = seq :+ patientId + ":"+ describe
}
return seq;
}
finally {
conn.close
}
}
我现在有3个疑问:
1. def conn中我把resultset里的两个字段用冒号拼在了一起放在一个SEQ里进行返回,那么在sparkstreaming里我该如何进行分割 取出,这点我卡了很久没写出来。。。
2. for (i <- 1 to 3) {
rddQueue.synchronized {
rddQueue += ssc.sparkContext.makeRDD(seq,10)
}
Thread.sleep(3000)
}
这段代码我到现在都没看到到底是什么意思?是同步?还是线程安全,每句我都有疑问。
3.Seq的数据在inputstream割出来之后插入hbase方法具体改如何写,现在一片茫然啊。。下面附上我的IHBaseUtils类
public class IHBaseUtils implements Serializable,HBaseUtils {
public static Admin admin = null;
public static Connection conn = null;
public static String zkNodes = "master,slave01,slave02";
public IHBaseUtils() {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zkNodes);
try {
conn = ConnectionFactory.createConnection(conf);
admin = conn.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
}
public void createHTable(String tableName, String col, Integer versions) throws Exception {
if (admin.tableExists(TableName.valueOf(tableName))) {
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
System.out.println(tableName + "表已存在,先删除……");
System.out.println("开始建立新表……");
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(col.getBytes());
hColumnDescriptor.setMaxVersions(versions);
tableDesc.addFamily(hColumnDescriptor);
admin.createTable(tableDesc);
System.out.println(tableName + "新表已建立");
} else {
System.out.println(tableName + "表不存在,开始建表…………");
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(col.getBytes());
hColumnDescriptor.setMaxVersions(versions);
tableDesc.addFamily(hColumnDescriptor);
admin.createTable(tableDesc);
System.out.println(tableName + "表已建立");
}
}
public void put(String tableName, String row, String columnFamily,
String column, String data) throws IOException {
Table table = conn.getTable(TableName.valueOf(tableName));
Put p1 = new Put(Bytes.toBytes(row));
p1.addColumn(columnFamily.getBytes(), column.getBytes(), data.getBytes());
table.put(p1);
System.out.println("put'" + row + "'," + columnFamily + ":" + column
+ "','" + data + "'");
}
public List get(String tableName, String row) throws IOException {
Table table = conn.getTable(TableName.valueOf(tableName));
Get get = new Get(row.getBytes());
get.setMaxVersions();
Result result = table.get(get);
List<KeyValue> list = result.list();
List<ExamResult> examResults = new ArrayList<>();
for(final KeyValue kv: list) {
ExamResult examResult = new ExamResult();
examResult.setDescribe(Bytes.toString(kv.getValue()));
examResult.setPatientId(Bytes.toString(kv.getRow()));
examResults.add(examResult);
}
return examResults;
// if (raw.length == 1) {
// for(final KeyValue kv: list){
// examResult = new ExamResult();
// examResult.setPatientId(kv.getRow().toString().getBytes());
// examResult.setDescribe(Bytes.toString(kv.getValue()));
// doc.setTitle(new String(doc.getTitle().getBytes("gbk"),"UTF-8"));
// examResult.setPatientId(Bytes.toString(kv.getRow()));
// System.out.println("PatienId is " + examResult.getPatientId() + "Decribe is " + examResult.getDescribe());
}
|