欧美一区二区三区,国内熟女精品熟女A片视频小说,日本av网,小鲜肉男男GAY做受XXX网站

flink讀mysql寫mysql

錢琪琛2年前12瀏覽0評論

在Flink中,我們可以很方便地讀取MySQL數(shù)據(jù)庫的數(shù)據(jù),并將其寫回到MySQL中。下面就是一個簡單的例子,演示如何讀取MySQL數(shù)據(jù)并寫回到MySQL中。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import java.sql.Types;
public class MySQLReaderWriter {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 設置參數(shù)
final String[] fieldNames = {"id", "name", "age"};
final TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.INT};
final JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/flink")
.setUsername("root")
.setPassword("password")
.setQuery("SELECT * FROM users WHERE age >25")
.setRowTypeInfo(new RowTypeInfo(fieldTypes, fieldNames))
.finish();
DataSetresultSet = env.createInput(inputFormat);
// 映射結果
DataSet>result = resultSet.map(new MapFunction>() {
@Override
public Tuple2map(Row row) throws Exception {
int id = (int) row.getField(0);
String name = (String) row.getField(1);
return new Tuple2<>(id, name);
}
});
// 寫回到MySQL
final JDBCOutputFormat outputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/flink")
.setUsername("root")
.setPassword("password")
.setQuery("INSERT INTO result(id, name) VALUES (?, ?)")
.setSqlTypes(new int[] {Types.INTEGER, Types.VARCHAR})
.finish();
result.flatMap(new FlatMapFunction, Row>() {
@Override
public void flatMap(Tuple2input, Collectoroutput) {
Row row = new Row(2);
row.setField(0, input.f0);
row.setField(1, input.f1);
output.collect(row);
}
}).output(outputFormat);
env.execute();
}
}

以上代碼中,我們使用了JDBCInputFormat和JDBCOutputFormat來讀取和寫回MySQL數(shù)據(jù),同時還涉及了數(shù)據(jù)類型的轉換。通過這個例子,我們可以很容易地將Flink與MySQL集成,并進行數(shù)據(jù)處理和寫回操作。