// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicsSet);

messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    public Void call(JavaPairRDD<String, String> v1) throws Exception {
        v1.foreach(new VoidFunction<Tuple2<String, String>>() {
            public void call(Tuple2<String, String> tuple2) {
                try {
                    JSONObject a = new JSONObject(tuple2._2);
                    ...
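The snippet above passes kafkaParams and topicsSet to createDirectStream without showing where they come from. Here is a minimal sketch of how they might be built, using only the JDK. The class name KafkaStreamConfig and both method names are made up for illustration and are not from the original post. The "metadata.broker.list" key is the broker-list setting used by the Kafka 0.8 direct stream API, which is assumed here because the snippet uses StringDecoder.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: the class and method names are illustrative only.
class KafkaStreamConfig {

    // Split a comma-separated topic list (e.g. "topicA,topicB") into a set.
    static Set<String> buildTopicsSet(String topics) {
        return new HashSet<>(Arrays.asList(topics.split(",")));
    }

    // Build the Kafka parameter map for the direct stream.
    // Assumption: the Kafka 0.8 direct API, which reads the broker
    // addresses (host:port, comma-separated) from "metadata.broker.list".
    static Map<String, String> buildKafkaParams(String brokers) {
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);
        return kafkaParams;
    }
}
```

With placeholder values, KafkaStreamConfig.buildTopicsSet("topicA,topicB") and KafkaStreamConfig.buildKafkaParams("localhost:9092") would give the two arguments used in the call above. Replace the topic names and broker address with your own.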