flink如何對接mqtt?
flink同自定義數據源eMQTT可以按照以下方式對接:
測試環境:單機服務器(8核12G),并行度設置為2。測試結果:執行3分鐘,吞吐量大約為每秒4萬條消息,未出現延遲;此處只是簡單測試,并未達到極限。

Client11.java(主要用來處理emqtt的配置)

package com.flink.utils.emqtt;

import java.net.URISyntaxException;
import java.util.ArrayList;
import org.fusesource.mqtt.client.Future;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
/**
 * MQTT subscriber client.
 *
 * <p>{@link #start()} configures an {@code MQTT} instance from the constants below, opens a
 * {@code FutureConnection}, subscribes it to {@link #topics} and hands the connection back to
 * the caller, who is then responsible for receiving messages from it.
 */
public class Client11 {
    /** Broker URI passed to {@code MQTT.setHost}. */
    private final static String CONNECTION_STRING = "tcp://192.168.3.101:61613";
    /** Passed to {@code MQTT.setCleanSession}: session state is cleared before connecting. */
    private final static boolean CLEAN_START = true;
    /** Keep-alive (heartbeat) of 30s: low network overhead while still noticing dead links promptly. */
    private final static short KEEP_ALIVE = 30;
    // private final static String CLIENT_ID = "client11";

    /** Topic filters subscribed by {@link #start()}, each with the QoS requested for it. */
    public static Topic[] topics = {
        new Topic("$share/group/0001/#", QoS.AT_LEAST_ONCE), // QoS 1: at least once
        new Topic("mqtt/aaa", QoS.AT_LEAST_ONCE),            // QoS 1: at least once
        new Topic("mqtt/ccc", QoS.AT_MOST_ONCE)};            // QoS 0: at most once

    /** Maximum number of reconnect attempts, passed to {@code MQTT.setReconnectAttemptsMax}. */
    public final static long RECONNECTION_ATTEMPT_MAX = 6;
    /** Delay between reconnect attempts, passed to {@code MQTT.setReconnectDelay}. */
    public final static long RECONNECTION_DELAY = 2000;
    /**
     * Value passed to {@code MQTT.setSendBufferSize}.
     * NOTE(review): the original comment described this as a 2M send buffer, but the value is 64;
     * confirm the unit expected by the client library and the intended size.
     */
    public final static int SEND_BUFFER_SIZE = 64;

    public ArrayList<String> list = new ArrayList<String>();

    /**
     * Creates a connection to the broker, waits until it is established and subscribed to
     * {@link #topics}, and returns it.
     *
     * <p>Each call uses a fresh UUID-derived client id. The previous id was a random integer in
     * 0..99, so two parallel subscribers could pick the same id.
     *
     * <p>The connect and subscribe futures are awaited so that a failure is reported here rather
     * than being silently dropped; this means the call blocks until the broker has answered.
     *
     * @return the connected and subscribed connection, or {@code null} if configuration,
     *         connecting or subscribing failed (the cause is printed to stderr)
     */
    public FutureConnection start() {
        // "flink-" + 16 hex characters = 22 characters, short enough for brokers that cap
        // client ids at 23 characters.
        String clientId = "flink-"
                + java.util.UUID.randomUUID().toString().replace("-", "").substring(0, 16);

        MQTT mqtt = new MQTT();
        try {
            // Broker address (ip and port).
            mqtt.setHost(CONNECTION_STRING);
            // Clear session state before connecting.
            mqtt.setCleanSession(CLEAN_START);
            // Number of reconnect attempts and the pause between them.
            mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
            mqtt.setReconnectDelay(RECONNECTION_DELAY);
            // Heartbeat interval.
            mqtt.setKeepAlive(KEEP_ALIVE);
            // Send buffer size.
            mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
            // Client id.
            mqtt.setClientId(clientId);

            // Future-style connection: wait for the connect and subscribe acknowledgements
            // before handing the connection to the caller.
            FutureConnection connection = mqtt.futureConnection();
            connection.connect().await();
            connection.subscribe(topics).await();
            return connection;
        } catch (URISyntaxException e) {
            // CONNECTION_STRING is not a valid URI.
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt for the calling thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
SourceTest.java(flink的自定義數據源 + 數據存儲到redis)

package com.flink;
import com.flink.utils.emqtt.Client11;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.fusesource.mqtt.client.Future;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.Message;
import java.util.*;
public class SourceTest {
/**
 * Job entry point: reads records from {@code EmqttSource}, converts each one into a list of
 * {@code deviceData} via {@code RulesEngine}, and writes the lists to the {@code redisWriter} sink.
 *
 * <p>Each source record has the form {@code <payload>@@<topic>}. The topic is appended last,
 * so the record is split on the LAST {@code "@@"}; splitting on every occurrence would
 * mis-assign the topic whenever the payload itself contains {@code "@@"}.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the Flink job fails
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<String> inputStream = env.addSource(new EmqttSource());
    // inputStream.print() can be enabled here to inspect the raw records.

    DataStream<List<deviceData>> redisData = inputStream.rebalance().map(new MapFunction<String, List<deviceData>>() {
        @Override
        public List<deviceData> map(String s) throws Exception {
            int separator = s.lastIndexOf("@@");
            if (separator < 0) {
                throw new IllegalArgumentException(
                        "Record is missing the \"@@\" payload/topic separator: " + s);
            }
            String message = s.substring(0, separator);
            String topic = s.substring(separator + 2);

            // Never emit a null record downstream; an unparseable message becomes an empty list.
            List<deviceData> parsed = RulesEngine(message, topic);
            return parsed != null ? parsed : new ArrayList<deviceData>();
        }
    });

    // An alternative sink (OpnetsdbWriter) was used here in an earlier version.
    redisData.addSink(new redisWriter());
    env.execute("Intsmaze Custom Source");
}
/** Shared JSON parser, reused across calls instead of allocating a new Gson for every message. */
private static final Gson GSON = new Gson();

/**
 * Converts one JSON payload into {@code deviceData} records.
 *
 * <p>The payload must be a JSON object whose {@code "type"} is {@code "Data"} or
 * {@code "data"} and whose {@code "values"} is an array of objects. Each array element yields
 * one record, filled from its {@code machID}, {@code name}, {@code data} and {@code time}
 * entries. The first two {@code '/'}-separated topic segments become {@code compID} and
 * {@code gateMac} (e.g. a topic shaped like {@code 3333/D4:36:39:1A:0D:D3/Send/Data/FOCAS}).
 *
 * <p>Conversion is all-or-nothing: if anything in the payload cannot be processed, the cause
 * is printed and an empty list is returned. This method never returns {@code null}.
 *
 * @param message JSON payload text
 * @param topic   topic the payload was received on
 * @return the parsed records; empty when the message is not a data message or cannot be parsed
 */
public static List<deviceData> RulesEngine(String message, String topic) {
    List<deviceData> records = new ArrayList<>();
    if (message == null || topic == null) {
        return records;
    }
    try {
        Map<?, ?> payload = GSON.fromJson(message, HashMap.class);
        Object typeValue = payload == null ? null : payload.get("type");
        // Constant-first equals: a missing or non-string "type" is simply "not a data message".
        if (!"Data".equals(typeValue) && !"data".equals(typeValue)) {
            System.out.println("無法解析該類型數據");
            return records;
        }
        String dataType = (String) typeValue;

        Object values = payload.get("values");
        if (!(values instanceof List)) {
            System.err.println("Data message has no \"values\" array: " + message);
            return records;
        }
        String[] topicParts = topic.split("/");
        if (topicParts.length < 2) {
            System.err.println("Topic has fewer than two segments: " + topic);
            return records;
        }

        for (Object element : (List<?>) values) {
            // NOTE(review): entries are read as strings, as in the original code; a non-string
            // JSON value assigned to a String field fails the cast and rejects the whole message.
            @SuppressWarnings("unchecked")
            Map<String, String> entry = (Map<String, String>) element;
            deviceData record = new deviceData();
            record.machID = entry.get("machID");
            record.compID = topicParts[0];
            record.gateMac = topicParts[1];
            record.Type = dataType;
            record.operationValue = entry.get("name");
            record.operationData = entry.get("data");
            record.gatherTime = entry.get("time");
            records.add(record);
        }
        return records;
    } catch (RuntimeException e) {
        // Malformed JSON or an unexpected value type: report it and drop the message.
        // JVM Errors are deliberately not caught here.
        e.printStackTrace();
        return new ArrayList<>();
    }
}
/**
 * Flink source that receives MQTT messages through {@code Client11} and emits each one as
 * {@code <payload>@@<topic>}.
 *
 * <p>NOTE(review): as a {@code ParallelSourceFunction}, every parallel instance opens its own
 * subscription. Presumably only the {@code $share/...} filter is load-balanced by the broker,
 * and the plain topic filters would be delivered to every instance — confirm before raising
 * the source parallelism.
 */
public static class EmqttSource implements ParallelSourceFunction<String> {
    private static final long serialVersionUID = 1L;

    /** Cleared by {@link #cancel()} to end the receive loop. */
    private volatile boolean isRunning = true;

    /**
     * Receives messages until cancelled and forwards them to the source context.
     *
     * @param ctx context used to emit records
     * @throws IllegalStateException if the MQTT connection could not be established
     * @throws Exception if receiving a message fails or the waiting thread is interrupted
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        FutureConnection connection = new Client11().start();
        if (connection == null) {
            // Client11.start() reports failure by returning null; fail with a clear message
            // instead of a NullPointerException on first use.
            throw new IllegalStateException(
                    "MQTT connection could not be established; see Client11.start() output");
        }
        try {
            while (isRunning) {
                Message message = connection.receive().await();
                // Decode the raw bytes explicitly. The Buffer's toString() is a debug rendering
                // ("ascii: ..." / "hex: ...") and must not be used to recover the payload.
                String payload = new String(message.getPayload(),
                        java.nio.charset.StandardCharsets.UTF_8);
                ctx.collect(payload + "@@" + message.getTopic());
                // Acknowledge only after the record has been handed to Flink.
                message.ack();
            }
        } finally {
            // Release the broker connection when the source stops or fails.
            connection.disconnect();
        }
    }

    /** Requests the receive loop in {@link #run} to stop. */
    @Override
    public void cancel() {
        isRunning = false;
    }
}
}