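MySQL, Hadoop, and Sqoop are three of the most widely used open-source projects in today's internet industry. MySQL is one of the most popular relational database management systems and backs a wide range of websites. Hadoop is a leading example of a distributed computing system: its distributed file system, HDFS, and its distributed computing framework, MapReduce, are widely used in big-data workloads. Sqoop, which began as a Hadoop subproject and later became a separate Apache project, transfers data between Hadoop and relational databases such as MySQL.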
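// Sqoop 1.x and Hadoop jars must be on the classpath. The MySQL JDBC driver jar
// must be there too; it is loaded through the JDBC URL, so no import is needed.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.sqoop.SqoopOptions
import org.apache.sqoop.tool.ImportTool

// Step 1: describe the import from MySQL into HDFS.
val sqoopOptions = new SqoopOptions()
sqoopOptions.setConnectString("jdbc:mysql://localhost/mydatabase")
sqoopOptions.setUsername("user")
sqoopOptions.setPassword("password")
sqoopOptions.setTableName("mytable")                    // source table
sqoopOptions.setColumns(Array("id", "name", "age"))     // columns to import
sqoopOptions.setFieldsTerminatedBy(',')                 // write comma-delimited text
sqoopOptions.setTargetDir("/input/mytable")             // HDFS output directory
sqoopOptions.setNumMappers(4)                           // number of parallel import tasks

// Step 2: run the import; a non-zero status means it failed.
val importTool = new ImportTool()
val importStatus = importTool.run(sqoopOptions)
if (importStatus != 0) sys.exit(importStatus)

// Step 3: process the imported files with MapReduce.
// MyJob, MyMapper, and MyReducer are assumed to be defined elsewhere.
val conf = new Configuration()
val job = Job.getInstance(conf, "MyJob")
job.setJarByClass(MyJob.getClass)
// Read the directory Sqoop wrote to; FileInputFormat does not recurse into
// subdirectories by default, so "/input/" alone would not work.
FileInputFormat.addInputPath(job, new Path("/input/mytable"))
FileOutputFormat.setOutputPath(job, new Path("/output/"))
job.setMapperClass(classOf[MyMapper])
job.setReducerClass(classOf[MyReducer])
job.setMapOutputKeyClass(classOf[Text])
job.setMapOutputValueClass(classOf[LongWritable])
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[LongWritable])
val succeeded = job.waitForCompletion(true)
sys.exit(if (succeeded) 0 else 1)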
The code above is a Scala example that uses Sqoop and Hadoop together. It first sets the data-transfer parameters on a SqoopOptions object, then calls Sqoop's ImportTool to import the MySQL table into Hadoop (HDFS). Finally, a Hadoop MapReduce job processes the imported data and writes its results to the specified output directory.
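The example does not show what MyMapper and MyReducer do with the imported rows. As a rough sketch only, the following plain-Scala code simulates one plausible choice: counting rows per age value, which fits the Text key and LongWritable value types configured on the job. The object name AgeCountSketch, the functions mapLine, reduceCounts, and run, and the sample rows are all hypothetical and not part of the original example. It also assumes that no field contains an embedded comma.

```scala
// Hypothetical stand-in for the MyMapper / MyReducer logic, written with plain
// Scala collections so it runs locally without Hadoop.
object AgeCountSketch {
  // Map step: one Sqoop output line "id,name,age" becomes Some((age, 1)).
  // Lines that do not have exactly three fields are skipped (assumed policy).
  def mapLine(line: String): Option[(String, Long)] = {
    val fields = line.split(",", -1)
    if (fields.length == 3 && fields(2).trim.nonEmpty) Some((fields(2).trim, 1L))
    else None
  }

  // Reduce step: sum all counts emitted for one key.
  def reduceCounts(key: String, values: Iterable[Long]): (String, Long) =
    (key, values.sum)

  // Local driver: map every line, group by key (the "shuffle"), then reduce.
  def run(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(line => mapLine(line).toList)
      .groupBy(_._1)
      .map { case (age, pairs) => reduceCounts(age, pairs.map(_._2)) }

  def main(args: Array[String]): Unit = {
    // Made-up sample rows in the comma-delimited format Sqoop was configured to write.
    val sample = Seq("1,alice,30", "2,bob,25", "3,carol,30", "malformed")
    run(sample).toSeq.sortBy(_._1).foreach { case (age, n) => println(s"$age\t$n") }
  }
}
```

In a real job, the body of mapLine would sit inside a Mapper's map method and write a Text key with a LongWritable value to the context, and reduceCounts would sit inside a Reducer's reduce method. The grouping done here by groupBy is handled by Hadoop's shuffle phase.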