欧美一区二区三区,国内熟女精品熟女A片视频小说,日本av网,小鲜肉男男GAY做受XXX网站

flink如何對接mqtt

方一強2年前172瀏覽0評論

flink如何對接mqtt?

flink同自定義數據源eMQTT可以按照以下方式對接:

測試環境 :

單機服務器:8核12G,

設置并行度為2,

測試結果: 執行3分鐘, 大概1秒4萬的并發量, 未延遲, 只是簡單測試, 并未達到極限。

Client11.java (主要用來處理emqtt的配置)

package com.flink.utils.emqtt;

import java.net.URISyntaxException;

import java.util.ArrayList;

import org.fusesource.mqtt.client.Future;

import org.fusesource.mqtt.client.FutureConnection;

import org.fusesource.mqtt.client.MQTT;

import org.fusesource.mqtt.client.Message;

import org.fusesource.mqtt.client.QoS;

import org.fusesource.mqtt.client.Topic;

/**

* 客戶端訂閱消息

*/

public class Client11 {

private final static String CONNECTION_STRING = "tcp://192.168.3.101:61613";

private final static boolean CLEAN_START = true;

private final static short KEEP_ALIVE = 30;// 低耗網絡,但是又需要及時獲取數據,心跳30s

// private final static String CLIENT_ID = "client11";

public static Topic[] topics = {

new Topic("$share/group/0001/#", QoS.AT_LEAST_ONCE), // 2 只有一次

new Topic("mqtt/aaa", QoS.AT_LEAST_ONCE), // 1 至少一次

new Topic("mqtt/ccc", QoS.AT_MOST_ONCE)}; // 0 至多一次

public final static long RECONNECTION_ATTEMPT_MAX = 6;

public final static long RECONNECTION_DELAY = 2000;

public final static int SEND_BUFFER_SIZE = 64;// 發送最大緩沖為2M

public ArrayList<String> list = new ArrayList<String>();

public FutureConnection start() {

String CLIENT_ID = (int)(Math.random()*100) + "";

// 創建MQTT對象

MQTT mqtt = new MQTT();

try {

// 設置mqtt broker的ip和端口

mqtt.setHost(CONNECTION_STRING);

// 連接前清空會話信息

mqtt.setCleanSession(CLEAN_START);

// 設置重新連接的次數

mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);

// 設置重連的間隔時間

mqtt.setReconnectDelay(RECONNECTION_DELAY);

// 設置心跳時間

mqtt.setKeepAlive(KEEP_ALIVE);

// 設置緩沖的大小

mqtt.setSendBufferSize(SEND_BUFFER_SIZE);

//設置客戶端id

mqtt.setClientId(CLIENT_ID);

// 獲取mqtt的連接對象BlockingConnection ,采用Future模式 訂閱主題

// final FutureConnection connection = mqtt.futureConnection();

FutureConnection connection = mqtt.futureConnection();

connection.connect();

connection.subscribe(topics);

return connection;

} catch (URISyntaxException e) {

e.printStackTrace();

} catch (Exception e) {

e.printStackTrace();

} finally {

}

return null;

}

}

SourceTest.java (flink的自定義數據源+ 數據存儲redis)

package com.flink;

import com.google.gson.Gson;

import org.apache.flink.api.common.functions.mapFunction;

import org.apache.flink.api.java.tuple.Tuple;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.tuple.Tuple5;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.*;

import org.fusesource.mqtt.client.Message;

import org.fusesource.mqtt.client.Future;

import com.flink.utils.emqtt.Client11;

import org.fusesource.mqtt.client.FutureConnection;

public class SourceTest {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

DataStream<String> inputStream= env.addSource(new EmqttSource());

// inputStream.print();

DataStream<List<deviceData>> redisData = inputStream.rebalance().map(new MapFunction<String, List<deviceData>>() {

@Override

public List<deviceData> map(String s) throws Exception {

String[] array = s.split("@@");

String topic = (String) array[1];

String message = (String) array[0];

return RulesEngine(message, topic);

}

});

// redisData.addSink(new OpnetsdbWriter());

redisData.addSink(new redisWriter());

env.execute("Intsmaze Custom Source");

}

public static List<deviceData> RulesEngine(String message, String topic){

try {

// String topic = "3333/D4:36:39:1A:0D:D3/Send/Data/FOCAS";

List<deviceData> d = new ArrayList<>();

Gson gson = new Gson();

Map<String, Object> map = new HashMap<String, Object>();

map = gson.fromJson(message, map.getClass());

String dataType = (String) map.get("type");

if(dataType.equals("Data")||dataType.equals("data")) {

ArrayList dataList = (ArrayList) map.get("values");

String[] array = topic.split("/");

for (int i = 0; i < dataList.size(); i++) {

deviceData d1 = new deviceData();

Map<String, String> dataDict = (Map<String, String>) dataList.get(i);

d1.machID = dataDict.get("machID");

d1.compID = array[0];

d1.gateMac = array[1];

d1.Type = dataType;

d1.operationValue = dataDict.get("name");

d1.operationData = dataDict.get("data");

d1.gatherTime = dataDict.get("time");

d.add(d1);

}

return d;

}else{

System.out.println("無法解析該類型數據");

}

} catch (Throwable t) {

t.printStackTrace();

}

return null;

}

// SourceFunction<String>

public static class EmqttSource implements ParallelSourceFunction<String> {

private static final long serialVersionUID = 1L;

private volatile boolean isRunning = true;

@Override

public void run(SourceContext<String> ctx) throws Exception {

Client11 client = new Client11();

FutureConnection connection = client.start();

int Num = 0;

String msg;

while (isRunning) {

Future<Message> futrueMessage = connection.receive();

Message message = futrueMessage.await();

Num++;

// System.out.println("MQTTFutureClient.Receive Message " + "Topic Title :" + message.getTopic() + " context :"

// + String.valueOf(message.getPayloadBuffer()));

// ctx.collect(Num + " context :" + String.valueOf(message.getPayloadBuffer()));

msg = String.valueOf(message.getPayloadBuffer()).substring(6);

ctx.collect(msg + "@@" + message.getTopic());

}

}

@Override

public void cancel() {

isRunning = false;

}

}

}。

uri java,flink如何對接mqtt