[Big Data] Reading Data Sources such as Hive, HBase, Elasticsearch, Kafka, and MySQL with Spark

Reading data sources with Spark

  • Reading the HiveServer2 data source with Spark
    • Environment
    • Code
  • Reading MySQL data with Spark
    • Environment
    • Code
  • Reading Kafka with Spark
    • Environment
    • Code
  • Reading HDFS data with Spark
    • Environment
    • Code
  • Reading the HBase data source with Spark
    • Environment
    • Code
  • Reading the Elasticsearch data source with Spark
    • Environment
    • Code
  • Configuring Maven dependencies

Reading the HiveServer2 data source with Spark

Environment

  • HiveServer2 address
  • username
  • password

Code

def main(args: Array[String]): Unit = {
  val Array(url, database, table, username, password) = args
  val sparkConf = new SparkConf().setAppName("Spark HiveServer2 Demo (Scala)")
  val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  // Read the table through the HiveServer2 JDBC driver
  val rowDataset: DataFrame = spark.read
    .format("jdbc")
    .option("url", url)
    .option("dbtable", s"$database.$table")
    .option("user", username)
    .option("password", password)
    .option("driver", "org.apache.hive.jdbc.HiveDriver")
    .load()
    .filter("`table_name.day`='20210112'")

  rowDataset.show()
  spark.stop()
}
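As a small follow-up (not part of the original example), the loaded DataFrame can also be registered as a temporary view and queried with Spark SQL; the view name demo_view below is just an illustrative placeholder.

// Continuing from rowDataset / spark above: expose the JDBC result to Spark SQL
rowDataset.createOrReplaceTempView("demo_view") // "demo_view" is a placeholder name
spark.sql("SELECT COUNT(*) AS cnt FROM demo_view").show()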

Reading MySQL data with Spark

Environment

  • MySQL address
  • username
  • password
  • database and table

Code

def main(args: Array[String]): Unit = {
  val Array(url, username, password, table) = args
  val sparkConf = new SparkConf().setAppName("Spark Mysql Demo (Scala)")
  val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  val props = new Properties()
  props.setProperty("user", username)
  props.setProperty("password", password)

  val df: DataFrame = spark.read.jdbc(url, table, props)
  val rowNumbers: Long = df.count()
  println("Total number of rows: " + rowNumbers)

  // The arguments to select are MySQL column names
  df.select("id").where("id >= 3").show()

  // Write data back
  // df.write.mode(SaveMode.Append).jdbc(url, "tb_02", props)

  spark.stop()
}
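The commented-out write in the snippet above can be expanded as follows. This is a sketch built from that comment (the target table tb_02 comes from it), not additional code from the original article.

// Continuing from df / url / props above: write the filtered rows back to MySQL
import org.apache.spark.sql.SaveMode

df.select("id")
  .where("id >= 3")
  .write
  .mode(SaveMode.Append)       // append to the existing table
  .jdbc(url, "tb_02", props)   // "tb_02" is the target table named in the original comment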

Reading Kafka with Spark

Environment

  • broker addresses
  • topics
  • consumer group id

Code

def main(args: Array[String]): Unit = {
  val Array(brokers, topics, interval, groupId) = args
  val sparkConf = new SparkConf().setAppName("Spark Kafka Demo (Scala)")
  val ssc = new StreamingContext(sparkConf, Seconds(interval.toInt))

  // Kafka parameters
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Messages from Kafka
  val messages = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Array(topics), kafkaParams)
  )

  // Word count
  val lines = messages.map(_.value)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
  wordCounts.print()

  // Start the computation
  ssc.start()
  ssc.awaitTermination()
}
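Because enable.auto.commit is set to false, nothing in the example above commits the consumed offsets. A minimal sketch of the manual-commit pattern from spark-streaming-kafka-0-10, which would replace the word-count section:

// Continuing from `messages` above: process each batch, then commit its offsets back to Kafka
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}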

Reading HDFS data with Spark

Environment

  • source path
  • number of partitions to read with
  • destination path

Code

def main(args: Array[String]): Unit = {
  val Array(src, partition, dest) = args
  val sparkConf: SparkConf = new SparkConf().setAppName("Spark HDFS Demo (Scala)")
  // 1. Create the SparkSession
  val session: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
  // 2. Create an RDD from the SparkContext (partition count must be an Int)
  val sc: SparkContext = session.sparkContext
  val file: RDD[String] = sc.textFile(src, partition.toInt)
  file.saveAsTextFile(dest)
  session.stop()
}
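An optional variant, continuing from the `file` RDD above and not part of the original: inspect the data before writing, and use coalesce if fewer output files are wanted. The value 1 and the "_merged" suffix are purely illustrative.

// Count and preview a few records before writing out
println("record count: " + file.count())
file.take(5).foreach(println)

// Optionally reduce the number of output files (illustrative values)
file.coalesce(1).saveAsTextFile(dest + "_merged")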

Reading the HBase data source with Spark

Environment

  • ZooKeeper address
  • the HBase root directory (hbase.rootdir) registered in ZooKeeper
  • HBase master address
  • table

Code

def main(args: Array[String]): Unit = {
  val Array(zookeeper, rootdir, master, table) = args
  val sparkConf: SparkConf = new SparkConf().setAppName("Spark HBase Demo (Scala)")
  val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  val hbaseConfig: Configuration = HBaseConfiguration.create()
  hbaseConfig.set("hbase.zookeeper.quorum", zookeeper)
  hbaseConfig.set("hbase.rootdir", rootdir)
  hbaseConfig.set("hbase.master", master)
  // Set the table to scan
  hbaseConfig.set(TableInputFormat.INPUT_TABLE, table)

  val stuRDD: RDD[(ImmutableBytesWritable, Result)] = spark.sparkContext.newAPIHadoopRDD(
    hbaseConfig,
    classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result]
  )
  val count: Long = stuRDD.count()
  println("Students RDD Count:" + count)
  stuRDD.cache()

  // Iterate and print the row keys
  stuRDD.foreach({ case (_, result) =>
    val key: String = Bytes.toString(result.getRow)
    println("Row key:" + key)
  })
  spark.stop()
}
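The loop above only prints row keys. To read actual cell values, Result.getValue can be used; the column family "info" and qualifier "name" below are assumed placeholders, since the article does not state the table's schema.

// Continuing from stuRDD above: read an assumed column (family "info", qualifier "name") for each row
stuRDD.foreach { case (_, result) =>
  val rowKey = Bytes.toString(result.getRow)
  // placeholder family/qualifier; returns null if the cell does not exist
  val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
  println(s"Row key: $rowKey, info:name = $name")
}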

Reading the Elasticsearch data source with Spark

Environment

  • Elasticsearch username
  • Elasticsearch password
  • Elasticsearch service address
  • Elasticsearch cluster name (cluster-level permissions are disabled, so it has to be specified explicitly)
  • Elasticsearch index

Code

def main(args: Array[String]): Unit = {
  val Array(user, password, esIp, clusterName, index) = args
  val sparkConf: SparkConf = new SparkConf().setAppName("Spark Es Demo (Scala)").setMaster("local[*]")
  sparkConf.set("cluster.name", clusterName)
  sparkConf.set("es.internal.es.cluster.name", clusterName)
  sparkConf.set("es.internal.es.version", "7.12") // avoids security_exception: action [cluster:monitor/main] is unauthorized
  sparkConf.set("es.index.auto.create", "true")
  sparkConf.set("es.nodes", esIp)
  sparkConf.set("es.port", "9200")
  sparkConf.set("es.mapping.date.rich", "false")
  sparkConf.set("es.index.read.missing.as.empty", "true")
  sparkConf.set("es.net.http.auth.user", user)     // Elasticsearch username
  sparkConf.set("es.net.http.auth.pass", password) // Elasticsearch password
  sparkConf.set("es.nodes.wan.only", "true")
  sparkConf.set("es.index.read.allow.red.status", "true") // avoids security_exception: action [cluster:monitor/health] is unauthorized

  val sc = new SparkContext(sparkConf)
  write2Es(sc, index)
  read2Es(sc, index)
  sc.stop()
}

def write2Es(sc: SparkContext, index: String): Unit = {
  val numbers: Map[String, String] = Map(
    "jsIp" -> "11111", "address" -> "11111", "enterprise" -> "北京",
    "xian" -> "11111", "ip" -> "11111", "source" -> "11111",
    "sheng" -> "11111", "phone" -> "11111", "shi" -> "11111",
    "ipLong" -> "333", "time" -> "2022-12-27 09:56:50",
    "qsIp" -> "11111", "contacts" -> "11111", "email" -> "11111@163.com"
  )
  val rdd: RDD[Map[String, Any]] = sc.makeRDD(Seq(numbers))
  EsSpark.saveToEs(rdd, s"${index}/_doc")
  println("--------------------End-----------------")
}

def read2Es(sc: SparkContext, index: String): Unit = {
  val rdd: RDD[(String, collection.Map[String, AnyRef])] = EsSpark.esRDD(sc, s"${index}/_doc")
  println("------------------rdd.count():" + rdd.count())
  rdd.foreach(line => {
    val key: String = line._1
    val value: collection.Map[String, AnyRef] = line._2
    println("------------------key:" + key)
    for (tmp <- value) {
      val key1: String = tmp._1
      val value1: AnyRef = tmp._2
      println("------------------key1:" + key1)
      println("------------------value1:" + value1)
    }
  })
}
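esRDD also accepts a query, so only matching documents are pulled back instead of the whole index. A sketch reusing sc and index from above; the match on the enterprise field (one of the fields written by write2Es) is purely illustrative.

// Read only the documents that match a query instead of scanning the whole index
val queried = EsSpark.esRDD(sc, s"${index}/_doc", """{"query":{"match":{"enterprise":"北京"}}}""")
println("matched docs: " + queried.count())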

Configuring Maven dependencies

Version properties: scala.version = 2.12, spark.version = 3.2.1, hadoop.version = 3.3.1, jackson.version = 2.12.3, s3.version = 1.12.77.

Dependencies (groupId:artifactId:version):

  • org.apache.spark:spark-sql_${scala.version}:${spark.version}
  • org.apache.spark:spark-hive_${scala.version}:${spark.version}
  • org.apache.spark:spark-streaming_${scala.version}:${spark.version}
  • org.apache.spark:spark-streaming-kafka-0-10_${scala.version}:${spark.version}
  • org.apache.spark:spark-sql-kafka-0-10_${scala.version}:${spark.version}
  • com.fasterxml.jackson.core:jackson-core:${jackson.version}
  • com.fasterxml.jackson.core:jackson-databind:${jackson.version}
  • com.fasterxml.jackson.core:jackson-annotations:${jackson.version}
  • com.amazonaws:aws-java-sdk-s3:${s3.version}
  • org.apache.hadoop:hadoop-aws:${hadoop.version}
  • commons-httpclient:commons-httpclient:3.1
  • org.elasticsearch:elasticsearch-hadoop:7.10.2 (excluding org.apache.spark:spark-*)
  • org.apache.hbase.connectors.spark:hbase-spark:1.0.0 (excluding org.apache.hadoop:hadoop-hdfs and org.apache.spark:spark-*)
  • com.thoughtworks.paranamer:paranamer:2.8
  • org.slf4j:slf4j-log4j12:1.7.2

I hope this article is helpful to you. Remember to follow, comment, and bookmark. Thank you!
