Chapter 4. Spark SQL and DataFrames: Introduction to Built-in Data Sources
In the previous chapter, we explained the evolution of and justification for structure in Spark. In particular, we discussed how the Spark SQL engine provides a unified foundation for the high-level DataFrame and Dataset APIs. Now, we'll continue our discussion of the DataFrame and explore its interoperability with Spark SQL.
This chapter and the next also explore how Spark SQL interfaces with some of the external components shown in Figure 4-1.
In particular, Spark SQL:
- Provides the engine upon which the high-level Structured APIs we explored in Chapter 3 are built.
- Can read and write data in a variety of structured formats (e.g., JSON, Hive tables, Parquet, Avro, ORC, CSV).
- Lets you query data using JDBC/ODBC connectors from external business intelligence (BI) data sources such as Tableau, Power BI, and Talend, or from RDBMSs such as MySQL and PostgreSQL.
- Provides a programmatic interface to interact with structured data stored as tables or views in a database from a Spark application.
- Offers an interactive shell to issue SQL queries on your structured data.
- Supports ANSI SQL:2003-compliant commands and HiveQL.
Figure 4-1. Spark SQL connectors and data sources
Let's begin with how you can use Spark SQL in a Spark application.
Using Spark SQL in Spark Applications
The SparkSession, introduced in Spark 2.0, provides a unified entry point for programming Spark with the Structured APIs. You can use a SparkSession to access Spark functionality: simply import the class and create an instance in your code.
To issue any SQL query, use the sql() method on the SparkSession instance, spark, such as spark.sql("SELECT * FROM myTableName"). All spark.sql queries executed in this fashion return a DataFrame on which you may perform further Spark operations if you desire (the kind we explored in Chapter 3 and the ones you will learn about in this chapter and the next).
Basic Query Examples
In this section we'll walk through a few examples of queries on the Airline On-Time Performance and Causes of Flight Delays data set, which contains data on US flights including date, delay, distance, origin, and destination. It's available as a CSV file with over a million records. Using a schema, we'll read the data into a DataFrame and register the DataFrame as a temporary view (more on temporary views shortly) so we can query it with SQL.
Query examples are provided in code snippets, and Python and Scala notebooks containing all of the code presented here are available in the book's GitHub repo. These examples will give you a sense of how to use SQL in your Spark applications via the spark.sql programmatic interface. Similar to the DataFrame API in its declarative flavor, this interface allows you to query structured data in your Spark applications.
Normally, in a standalone Spark application, you will create a SparkSession instance manually, as shown in the following example. However, in a Spark shell (or Databricks notebook), the SparkSession is created for you and accessible via the appropriately named variable spark.
Let's get started by reading the data set into a temporary view:
// In Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("SparkSQLExampleApp")
  .getOrCreate()

// Path to data set
val csvFile = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

// Read and create a temporary view
// Infer schema (note that for larger files you may want to specify the schema)
val df = spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(csvFile)

// Create a temporary view
df.createOrReplaceTempView("us_delay_flights_tbl")
# In Python
from pyspark.sql import SparkSession

# Create a SparkSession
spark = (SparkSession
  .builder
  .appName("SparkSQLExampleApp")
  .getOrCreate())

# Path to data set
csv_file = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

# Read and create a temporary view
# Infer schema (note that for larger files you
# may want to specify the schema)
df = (spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(csv_file))

df.createOrReplaceTempView("us_delay_flights_tbl")
Note
If you want to specify a schema, you can use a DDL-formatted string. For example:
// In Scala
val schema = "date STRING, delay INT, distance INT, origin STRING, destination STRING"

# In Python
schema = "`date` STRING, `delay` INT, `distance` INT, `origin` STRING, `destination` STRING"
Now that we have a temporary view, we can issue SQL queries using Spark SQL. These queries are no different from those you might issue against a SQL table in, say, a MySQL or PostgreSQL database. The point here is to show that Spark SQL offers an ANSI:2003-compliant SQL interface, and to demonstrate the interoperability between SQL and DataFrames.
The US flight delays data set has five columns:
- The date column contains a string like 02190925. When converted, this maps to 02-19 09:25 am.
- The delay column gives the delay in minutes between the scheduled and actual departure times. Early departures show negative numbers.
- The distance column gives the distance in miles from the origin airport to the destination airport.
- The origin column contains the origin IATA airport code.
- The destination column contains the destination IATA airport code.
With that in mind, let's try some example queries against this data set.
First, we'll find all flights whose distance is greater than 1,000 miles:
spark.sql("""SELECT distance, origin, destination
FROM us_delay_flights_tbl WHERE distance > 1000
ORDER BY distance DESC""").show(10)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows
As the results show, all of the longest flights were between Honolulu (HNL) and New York (JFK). Next, we'll find all flights between San Francisco (SFO) and Chicago (ORD) with at least a two-hour delay:
spark.sql("""SELECT date, delay, origin, destination
FROM us_delay_flights_tbl
WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD'
ORDER BY delay DESC""").show(10)

+--------+-----+------+-----------+
|date    |delay|origin|destination|
+--------+-----+------+-----------+
|02190925|1638 |SFO   |ORD        |
|01031755|396  |SFO   |ORD        |
|01022330|326  |SFO   |ORD        |
|01051205|320  |SFO   |ORD        |
|01190925|297  |SFO   |ORD        |
|02171115|296  |SFO   |ORD        |
|01071040|279  |SFO   |ORD        |
|01051550|274  |SFO   |ORD        |
|03120730|266  |SFO   |ORD        |
|01261104|258  |SFO   |ORD        |
+--------+-----+------+-----------+
only showing top 10 rows
It seems there were many significantly delayed flights between these two cities, on different dates. (As an exercise, convert the date column into a readable format and find the days or months when these delays were most common. Were the delays related to winter months or holidays?)
Let's try a more complicated query where we use the CASE clause in SQL. In the following example, we want to label all US flights, regardless of origin and destination, with an indication of the delays they experienced: Very Long Delays (> 6 hours), Long Delays (2-6 hours), etc. We'll add these human-readable labels in a new column called Flight_Delays:
spark.sql("""SELECT delay, origin, destination,
    CASE
        WHEN delay > 360 THEN 'Very Long Delays'
        WHEN delay >= 120 AND delay <= 360 THEN 'Long Delays'
        WHEN delay >= 60 AND delay < 120 THEN 'Short Delays'
        WHEN delay > 0 AND delay < 60 THEN 'Tolerable Delays'
        WHEN delay = 0 THEN 'No Delays'
        ELSE 'Early'
    END AS Flight_Delays
    FROM us_delay_flights_tbl
    ORDER BY origin, delay DESC""").show(10)

+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|333  |ABE   |ATL        |Long Delays  |
|305  |ABE   |ATL        |Long Delays  |
|275  |ABE   |ATL        |Long Delays  |
|257  |ABE   |ATL        |Long Delays  |
|247  |ABE   |DTW        |Long Delays  |
|247  |ABE   |ATL        |Long Delays  |
|219  |ABE   |ORD        |Long Delays  |
|211  |ABE   |ATL        |Long Delays  |
|197  |ABE   |DTW        |Long Delays  |
|192  |ABE   |ORD        |Long Delays  |
+-----+------+-----------+-------------+
only showing top 10 rows
As with the DataFrame and Dataset APIs, with the spark.sql interface you can conduct common data analysis operations like those we explored in the previous chapter. The computations undergo an identical journey in the Spark SQL engine (see "The Catalyst Optimizer" in Chapter 3 for details), giving you the same results.
All three of the preceding SQL queries can be expressed with an equivalent DataFrame API query. For example, the first query can be expressed in the Python DataFrame API as:
# In Python
from pyspark.sql.functions import col, desc
(df.select("distance", "origin", "destination")
  .where(col("distance") > 1000)
  .orderBy(desc("distance"))).show(10)

# Or
(df.select("distance", "origin", "destination")
  .where("distance > 1000")
  .orderBy("distance", ascending=False)
  .show(10))
This produces the same results as the SQL query:
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows
As an exercise, try converting the other two SQL queries to use the DataFrame API.
As these examples show, using the Spark SQL interface to query data is similar to writing a regular SQL query to a relational database table. Although the queries are in SQL, you can feel the similarity in readability and semantics to DataFrame API operations, which you encountered in Chapter 3 and will explore further in the next chapter.
To enable you to query structured data as shown in the preceding examples, Spark manages all the complexities of creating and managing views and tables, both in memory and on disk. That leads us to our next topic: how tables and views are created and managed.
SQL Tables and Views
Tables hold data. Associated with each table in Spark is its relevant metadata, which is data about the table and its data: the schema, description, table name, database name, column names, partitions, physical location where the actual data resides, etc. All of this is stored in a central metastore.
Instead of having a separate metastore for Spark tables, Spark by default uses the Apache Hive metastore, located at /user/hive/warehouse, to persist all the metadata about your tables. However, you may change the default location by setting the Spark config variable spark.sql.warehouse.dir to another location, which can be set to a local or external distributed storage.
Managed Versus Unmanaged Tables
Spark allows you to create two types of tables: managed and unmanaged. For a managed table, Spark manages both the metadata and the data in the file store. This could be a local filesystem, HDFS, or an object store such as Amazon S3 or Azure Blob storage. For an unmanaged table, Spark only manages the metadata, while you manage the data yourself in an external data source such as Cassandra.
With a managed table, because Spark manages everything, a SQL command such as DROP TABLE table_name deletes both the metadata and the data. With an unmanaged table, the same command will delete only the metadata, not the actual data. We will look at some examples of how to create managed and unmanaged tables in the next section.
Creating SQL Databases and Tables
Tables reside within a database. By default, Spark creates tables under the default database. To create your own database name, you can issue a SQL command from your Spark application or notebook. Using the US flight delays data set, let's create both a managed and an unmanaged table. To begin, we'll create a database called learn_spark_db and tell Spark we want to use that database:
// In Scala/Python
spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")
From this point, any commands we issue in our application to create tables will result in the tables being created in this database and residing under the database name learn_spark_db.
Creating a managed table
To create a managed table within the database learn_spark_db, you can issue a SQL query like the following:
// In Scala/Python
spark.sql("""CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT,
  distance INT, origin STRING, destination STRING)""")
You can do the same thing using the DataFrame API like this:
# In Python
# Path to our US flight delays CSV file
csv_file = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
# Schema as defined in the preceding example
schema = "date STRING, delay INT, distance INT, origin STRING, destination STRING"
flights_df = spark.read.csv(csv_file, schema=schema)
flights_df.write.saveAsTable("managed_us_delay_flights_tbl")
Both of these statements will create the managed table managed_us_delay_flights_tbl in the learn_spark_db database.
Creating an unmanaged table
By contrast, you can create unmanaged tables from your own data sources, such as Parquet, CSV, or JSON files stored in a file store accessible to your Spark application.
To create an unmanaged table from a data source such as a CSV file, in SQL use:
spark.sql("""CREATE TABLE us_delay_flights_tbl(date STRING, delay INT,
  distance INT, origin STRING, destination STRING)
  USING csv
  OPTIONS (PATH
  '/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')""")
And within the DataFrame API use:
(flights_df
  .write
  .option("path", "/tmp/data/us_flights_delay")
  .saveAsTable("us_delay_flights_tbl"))
Note
To enable you to explore these examples, we have created Python and Scala example notebooks that you can find in the book's GitHub repo.
Creating Views
In addition to creating tables, Spark can create views on top of existing tables. Views can be global (visible across all SparkSessions on a given cluster) or session-scoped (visible only to a single SparkSession), and they are temporary: they disappear after your Spark application terminates.
Creating views has a similar syntax to creating tables within a database. Once you create a view, you can query it as you would a table. The difference between a view and a table is that views don't actually hold the data; tables persist after your Spark application terminates, but views disappear.
You can create a view from an existing table using SQL. For example, if you wish to work on only the subset of the US flight delays data set with origin airports of New York (JFK) and San Francisco (SFO), the following queries will create global temporary and temporary views consisting of just that slice of the table:
-- In SQL
CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_SFO_global_tmp_view AS
  SELECT date, delay, origin, destination FROM us_delay_flights_tbl
  WHERE origin = 'SFO';

CREATE OR REPLACE TEMP VIEW us_origin_airport_JFK_tmp_view AS
  SELECT date, delay, origin, destination FROM us_delay_flights_tbl
  WHERE origin = 'JFK'
You can accomplish the same thing with the DataFrame API as follows:
# In Python
df_sfo = spark.sql("SELECT date, delay, origin, destination FROM "
                   "us_delay_flights_tbl WHERE origin = 'SFO'")
df_jfk = spark.sql("SELECT date, delay, origin, destination FROM "
                   "us_delay_flights_tbl WHERE origin = 'JFK'")

# Create a temporary and global temporary view
df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")
df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")
Once you've created these views, you can issue queries against them just as you would against a table. Keep in mind that when accessing a global temporary view you must use the prefix global_temp.<view_name>, because Spark creates global temporary views in a global temporary database called global_temp. For example:
-- In SQL
SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view
By contrast, you can access the normal temporary view without the global_temp prefix:
-- In SQL
SELECT * FROM us_origin_airport_JFK_tmp_view

// In Scala/Python
spark.read.table("us_origin_airport_JFK_tmp_view")
// Or
spark.sql("SELECT * FROM us_origin_airport_JFK_tmp_view")
You can also drop a view just like you would a table:
-- In SQL
DROP VIEW IF EXISTS us_origin_airport_SFO_global_tmp_view;
DROP VIEW IF EXISTS us_origin_airport_JFK_tmp_view

// In Scala/Python
spark.catalog.dropGlobalTempView("us_origin_airport_SFO_global_tmp_view")
spark.catalog.dropTempView("us_origin_airport_JFK_tmp_view")
Temporary views versus global temporary views
The difference between temporary and global temporary views being subtle, it can be a source of mild confusion among developers new to Spark. A temporary view is tied to a single SparkSession within a Spark application. In contrast, a global temporary view is visible across multiple SparkSessions within a Spark application. Yes, you can create multiple SparkSessions within a single Spark application. This can be handy, for example, in cases where you want to access (and combine) data from two different SparkSessions that don't share the same Hive metastore configurations.
Caching SQL Tables
Although we will hash out table caching strategies in the next chapter, it's worth mentioning hither that, similar DataFrames, you can cache and uncache SQL tables and views. In Spark three.0, in addition to other options, you lot can specify a tabular array as LAZY
, meaning that it should simply be buried when information technology is first used instead of immediately:
-- In SQL
CACHE [LAZY] TABLE <table-name>
UNCACHE TABLE <table-name>
Reading Tables into DataFrames
Often, data engineers build data pipelines as part of their regular data ingestion and ETL processes. They populate Spark SQL databases and tables with cleansed data for consumption by applications downstream.
Let's assume you have an existing database, learn_spark_db, and table, us_delay_flights_tbl, ready for use. Instead of reading from an external JSON file, you can simply use SQL to query the table and assign the returned result to a DataFrame:
// In Scala
val usFlightsDF = spark.sql("SELECT * FROM us_delay_flights_tbl")
val usFlightsDF2 = spark.table("us_delay_flights_tbl")

# In Python
us_flights_df = spark.sql("SELECT * FROM us_delay_flights_tbl")
us_flights_df2 = spark.table("us_delay_flights_tbl")
Now you have a cleansed DataFrame read from an existing Spark SQL table. You can also read data in other formats using Spark's built-in data sources, giving you the flexibility to interact with various common file formats.
Data Sources for DataFrames and SQL Tables
As shown in Figure 4-1, Spark SQL provides an interface to a variety of data sources. It also provides a set of common methods for reading and writing data to and from these data sources using the Data Sources API.
In this section we will cover some of the built-in data sources, available file formats, and ways to load and write data, along with specific options pertaining to these data sources. But first, let's take a closer look at two high-level Data Source API constructs that dictate the manner in which you interact with different data sources: DataFrameReader and DataFrameWriter.
DataFrameReader
DataFrameReader is the core construct for reading data from a data source into a DataFrame. It has a defined format and a recommended pattern for usage:
DataFrameReader.format(args).option("key", "value").schema(args).load()
This pattern of stringing methods together is common in Spark, and easy to read. We saw it in Chapter 3 when exploring common data analysis patterns.
Note that you can only access a DataFrameReader through a SparkSession instance. That is, you cannot create an instance of DataFrameReader directly. To get an instance handle to it, use:
SparkSession.read // or SparkSession.readStream
While read returns a handle to a DataFrameReader for reading into a DataFrame from a static data source, readStream returns an instance for reading from a streaming source. (We will cover Structured Streaming later in the book.)
Arguments to each of the public methods of DataFrameReader take different values. Table 4-1 enumerates these, with a subset of the supported arguments.
Method | Arguments | Description |
---|---|---|
format() | "parquet", "csv", "txt", "json", "jdbc", "orc", "avro", etc. | If you don't specify this method, then the default is Parquet or whatever is set in spark.sql.sources.default. |
option() | ("mode", {PERMISSIVE | FAILFAST | DROPMALFORMED}) ("inferSchema", {true | false}) ("path", "path_file_data_source") | A series of key/value pairs and options. The Spark documentation shows some examples and explains the different modes and their actions. The default mode is PERMISSIVE. The "inferSchema" and "mode" options are specific to the JSON and CSV file formats. |
schema() | DDL String or StructType, e.g., 'A INT, B STRING' or StructType(...) | For JSON or CSV format, you can specify to infer the schema in the option() method. Generally, providing a schema for any format makes loading faster and ensures your data conforms to the expected schema. |
load() | "/path/to/data/source" | The path to the data source. This can be empty if specified in option("path", "..."). |
While we won't comprehensively enumerate all the different combinations of arguments and options, the documentation for Python, Scala, R, and Java offers suggestions and guidance. It's worthwhile to show a couple of examples, though:
// In Scala
// Use Parquet
val file = """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet"""
val df = spark.read.format("parquet").load(file)

// Use Parquet; you can omit format("parquet") if you wish as it's the default
val df2 = spark.read.load(file)

// Use CSV
val df3 = spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*")

// Use JSON
val df4 = spark.read.format("json")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/json/*")
Note
In general, no schema is needed when reading from a static Parquet data source; the Parquet metadata usually contains the schema, so it's inferred. However, for streaming data sources you will have to provide a schema. (We will cover reading from streaming data sources in Chapter 8.)
Parquet is the default and preferred data source for Spark because it's efficient, uses columnar storage, and employs a fast compression algorithm. You will see additional benefits later (such as columnar pushdown), when we cover the Catalyst optimizer in greater depth.
DataFrameWriter
DataFrameWriter does the opposite of its counterpart: it saves or writes data to a specified built-in data source. Unlike with DataFrameReader, you access its instance not from a SparkSession but from the DataFrame you wish to save. It has a few recommended usage patterns:
DataFrameWriter.format(args)
  .option(args)
  .bucketBy(args)
  .partitionBy(args)
  .save(path)

DataFrameWriter.format(args).option(args).sortBy(args).saveAsTable(table)
To get an instance handle, use:
DataFrame.write // or DataFrame.writeStream
Arguments to each of the methods of DataFrameWriter also take different values. We list these in Table 4-2, with a subset of the supported arguments.
Method | Arguments | Description |
---|---|---|
format() | "parquet", "csv", "txt", "json", "jdbc", "orc", "avro", etc. | If you don't specify this method, then the default is Parquet or whatever is set in spark.sql.sources.default. |
option() | ("mode", {append | overwrite | ignore | error or errorifexists}) ("mode", {SaveMode.Overwrite | SaveMode.Append, SaveMode.Ignore, SaveMode.ErrorIfExists}) ("path", "path_to_write_to") | A series of key/value pairs and options. The Spark documentation shows some examples. This is an overloaded method. The default mode options are error or errorifexists and SaveMode.ErrorIfExists; they throw an exception at runtime if the data already exists. |
bucketBy() | (numBuckets, col, col..., coln) | The number of buckets and names of columns to bucket by. Uses Hive's bucketing scheme on a filesystem. |
save() | "/path/to/data/source" | The path to save to. This can be empty if specified in option("path", "..."). |
saveAsTable() | "table_name" | The table to save to. |
Here's a short example snippet to illustrate the use of methods and arguments:
// In Scala
// Use JSON
val location = ...
df.write.format("json").mode("overwrite").save(location)
Parquet
We'll start our exploration of data sources with Parquet, because it's the default data source in Spark. Supported and widely used by many big data processing frameworks and platforms, Parquet is an open source columnar file format that offers many I/O optimizations (such as compression, which saves storage space and allows for quick access to data columns).
Because of its efficiency and these optimizations, we recommend that after you have transformed and cleansed your data, you save your DataFrames in the Parquet format for downstream consumption. (Parquet is also the default table open format for Delta Lake, which we will cover in Chapter 9.)
Reading Parquet files into a DataFrame
Parquet files are stored in a directory structure that contains the data files, metadata, a number of compressed files, and some status files. Metadata in the footer contains the version of the file format, the schema, and column data such as the path, etc.
For example, a directory in a Parquet file might contain a set of files like this:

_SUCCESS
_committed_1799640464332036264
_started_1799640464332036264
part-00000-tid-1799640464332036264-91273258-d7ef-4dc7-<...>-c000.snappy.parquet

There may be a number of part-XXXX compressed files in a directory (the names shown here have been shortened to fit on the page).
To read Parquet files into a DataFrame, you simply specify the format and path:
// In Scala
val file = """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"""
val df = spark.read.format("parquet").load(file)

# In Python
file = """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"""
df = spark.read.format("parquet").load(file)
Unless you are reading from a streaming data source there's no need to supply the schema, because Parquet saves it as part of its metadata.
Reading Parquet files into a Spark SQL table
As well as reading Parquet files into a Spark DataFrame, you can also create a Spark SQL unmanaged table or view directly using SQL:
-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING parquet
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"
    )
Once you've created the table or view, you can read data into a DataFrame using SQL, as we saw in some earlier examples:
// In Scala
spark.sql("SELECT * FROM us_delay_flights_tbl").show()

# In Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()
Both of these operations return the same results:
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows
Writing DataFrames to Parquet files
Writing or saving a DataFrame as a table or file is a common operation in Spark. To write a DataFrame you simply use the methods and arguments of the DataFrameWriter outlined earlier in this chapter, supplying the location to save the Parquet files to. For example:
// In Scala
df.write.format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/parquet/df_parquet")

# In Python
(df.write.format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/parquet/df_parquet"))
Note
Recall that Parquet is the default file format. If you don't include the format() method, the DataFrame will still be saved as a Parquet file.
This will create a set of compact and compressed Parquet files at the specified path. Since we used snappy as our compression choice here, we'll have snappy compressed files. For brevity, this example generated only one file; usually, there may be a dozen or so files created:

-rw-r--r--  1 jules  wheel    0 May 19 10:58 _SUCCESS
-rw-r--r--  1 jules  wheel  966 May 19 10:58 part-00000-<...>-c000.snappy.parquet
Writing DataFrames to Spark SQL tables
Writing a DataFrame to a SQL table is as easy as writing to a file: just use saveAsTable() instead of save(). This will create a managed table called us_delay_flights_tbl:
// In Scala
df.write
  .mode("overwrite")
  .saveAsTable("us_delay_flights_tbl")

# In Python
(df.write
  .mode("overwrite")
  .saveAsTable("us_delay_flights_tbl"))
To sum up, Parquet is the preferred and default built-in data source file format in Spark, and it has been adopted by many other frameworks. We recommend that you use this format in your ETL and data ingestion processes.
JSON
JavaScript Object Notation (JSON) is also a popular data format. It came to prominence as an easy-to-read and easy-to-parse format compared to XML. It has two representational formats: single-line mode and multiline mode. Both modes are supported in Spark.
In single-line mode each line denotes a single JSON object, whereas in multiline mode the entire multiline object constitutes a single JSON object. To read in this mode, set multiLine to true in the option() method.
Reading a JSON file into a DataFrame
You can read a JSON file into a DataFrame the same way you did with Parquet: just specify "json" in the format() method:
// In Scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
val df = spark.read.format("json").load(file)

# In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
df = spark.read.format("json").load(file)
Reading a JSON file into a Spark SQL table
You can also create a SQL table from a JSON file just like you did with Parquet:
-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING json
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
    )
Once the table is created, you can read data into a DataFrame using SQL:
// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 10 rows
Writing DataFrames to JSON files
Saving a DataFrame as a JSON file is simple. Specify the appropriate DataFrameWriter methods and arguments, and supply the location to save the JSON files to:
// In Scala
df.write.format("json")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/json/df_json")

# In Python
(df.write.format("json")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/json/df_json"))
This creates a directory at the specified path populated with a set of compact JSON files:
-rw-r--r-- 1 jules wheel  0 May 16 14:44 _SUCCESS
-rw-r--r-- 1 jules wheel 71 May 16 14:44 part-00000-<...>-c000.json
JSON data source options
Table 4-3 describes common JSON options for DataFrameReader and DataFrameWriter. For a comprehensive list, we refer you to the documentation.
Property name | Values | Meaning | Scope |
---|---|---|---|
compression | none, uncompressed, bzip2, deflate, gzip, lz4, or snappy | Use this compression codec for writing. Note that read will only detect the compression or codec from the file extension. | Write |
dateFormat | yyyy-MM-dd or DateTimeFormatter | Use this format or any format from Java's DateTimeFormatter. | Read/write |
multiLine | true, false | Use multiline mode. Default is false (single-line mode). | Read |
allowUnquotedFieldNames | true, false | Allow unquoted JSON field names. Default is false. | Read |
CSV
As widely used as plain text files, this common text file format captures each datum or field delimited by a comma; each line with comma-separated fields represents a record. Even though a comma is the default separator, you may use other delimiters to separate fields in cases where commas are part of your data. Popular spreadsheets can generate CSV files, so it's a popular format among data and business analysts.
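To see why a non-comma delimiter matters, here is a small pure-Python illustration using the standard csv module rather than Spark (the pipe-delimited sample data is made up):

```python
import csv
import io

# A pipe-delimited record: the comma inside "Romania, EU" is plain data,
# not a field separator, so it survives parsing intact
data = "DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count\nUnited States|Romania, EU|15\n"
reader = csv.DictReader(io.StringIO(data), delimiter="|")
rows = list(reader)
print(rows[0]["ORIGIN_COUNTRY_NAME"])  # Romania, EU
```

In Spark the analogous knob is the sep option covered in Table 4-4, e.g. .option("sep", "|").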
Reading a CSV file into a DataFrame
As with the other built-in data sources, you can use the DataFrameReader methods and arguments to read a CSV file into a DataFrame:
// In Scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
val schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
val df = spark.read.format("csv")
  .schema(schema)
  .option("header", "true")
  .option("mode", "FAILFAST")   // Exit if any errors
  .option("nullValue", "")      // Replace any null data with quotes
  .load(file)

# In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
df = (spark.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .option("mode", "FAILFAST")   # Exit if any errors
  .option("nullValue", "")      # Replace any null data field with quotes
  .load(file))
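The FAILFAST mode used above aborts the read on the first malformed record, whereas Spark's default PERMISSIVE mode nulls out fields it cannot parse. A rough, Spark-independent sketch of the difference — the parse_row helper is hypothetical, for illustration only:

```python
def parse_row(line, mode="PERMISSIVE"):
    """Parse a 'dest,origin,count' line where count must be an int.
    Hypothetical helper mimicking Spark's CSV parse modes."""
    dest, origin, count = line.split(",")
    try:
        return (dest, origin, int(count))
    except ValueError:
        if mode == "FAILFAST":
            raise  # abort the whole read on the first bad record
        return (dest, origin, None)  # PERMISSIVE: null out the bad field

print(parse_row("United States,Ireland,264"))   # clean row parses in any mode
print(parse_row("United States,Ireland,oops"))  # PERMISSIVE -> count is None
```

Spark also offers a DROPMALFORMED mode that silently skips bad records; FAILFAST is the safest choice when you would rather notice dirty data than ingest it.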
Reading a CSV file into a Spark SQL table
Creating a SQL table from a CSV data source is no different from using Parquet or JSON:
-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING csv
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*",
      header "true",
      inferSchema "true",
      mode "FAILFAST"
    )
Once you've created the table, you can read data into a DataFrame using SQL as before:
// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show(10)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
|    United States|            Grenada|   54|
|       Costa Rica|      United States|  477|
|          Senegal|      United States|   29|
|    United States|   Marshall Islands|   44|
+-----------------+-------------------+-----+
only showing top 10 rows
Writing DataFrames to CSV files
Saving a DataFrame as a CSV file is simple. Specify the appropriate DataFrameWriter methods and arguments, and supply the location to save the CSV files to:
// In Scala
df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")

# In Python
df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")
This generates a folder at the specified location, populated with a bunch of compressed and compact files:
-rw-r--r-- 1 jules wheel  0 May 16 12:17 _SUCCESS
-rw-r--r-- 1 jules wheel 36 May 16 12:17 part-00000-251690eb-<...>-c000.csv
CSV data source options
Table 4-4 describes some of the common CSV options for DataFrameReader and DataFrameWriter. Because CSV files can be complex, many options are available; for a comprehensive list we refer you to the documentation.
Property name | Values | Meaning | Scope |
---|---|---|---|
compression | none, bzip2, deflate, gzip, lz4, or snappy | Use this compression codec for writing. | Write |
dateFormat | yyyy-MM-dd or DateTimeFormatter | Use this format or any format from Java's DateTimeFormatter. | Read/write |
multiLine | true, false | Use multiline mode. Default is false (single-line mode). | Read |
inferSchema | true, false | If true, Spark will determine the column data types. Default is false. | Read |
sep | Any character | Use this character to separate column values in a row. Default delimiter is a comma (,). | Read/write |
escape | Any character | Use this character to escape quotes. Default is \. | Read/write |
header | true, false | Indicates whether the first line is a header denoting each column name. Default is false. | Read/write |
Avro
Introduced in Spark 2.4 as a built-in data source, the Avro format is used, for example, by Apache Kafka for message serializing and deserializing. It offers many benefits, including direct mapping to JSON, speed and efficiency, and bindings available for many programming languages.
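That direct mapping to JSON is visible in Avro's schema language, which is itself plain JSON. As an illustrative sketch (the record name is ours; the fields mirror the flight-delay columns used throughout this chapter), a minimal Avro record schema looks like this:

```json
{
  "type": "record",
  "name": "Flight",
  "fields": [
    {"name": "DEST_COUNTRY_NAME",   "type": "string"},
    {"name": "ORIGIN_COUNTRY_NAME", "type": "string"},
    {"name": "count",               "type": "long"}
  ]
}
```

A schema in exactly this JSON form is what the avroSchema option in Table 4-5 accepts.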
Reading an Avro file into a DataFrame
Reading an Avro file into a DataFrame using DataFrameReader is consistent in usage with the other data sources we have discussed in this section:
// In Scala
val df = spark.read.format("avro")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*")
df.show(false)

# In Python
df = (spark.read.format("avro")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"))
df.show(truncate=False)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
|    United States|            Grenada|   54|
|       Costa Rica|      United States|  477|
|          Senegal|      United States|   29|
|    United States|   Marshall Islands|   44|
+-----------------+-------------------+-----+
only showing top 10 rows
Reading an Avro file into a Spark SQL table
Again, creating SQL tables using an Avro data source is no different from using Parquet, JSON, or CSV:
-- In SQL
CREATE OR REPLACE TEMPORARY VIEW episode_tbl
    USING avro
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
    )
Once you've created a table, you can read data into a DataFrame using SQL:
// In Scala
spark.sql("SELECT * FROM episode_tbl").show(false)

# In Python
spark.sql("SELECT * FROM episode_tbl").show(truncate=False)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
|    United States|            Grenada|   54|
|       Costa Rica|      United States|  477|
|          Senegal|      United States|   29|
|    United States|   Marshall Islands|   44|
+-----------------+-------------------+-----+
only showing top 10 rows
Writing DataFrames to Avro files
Writing a DataFrame as an Avro file is simple. As usual, specify the appropriate DataFrameWriter methods and arguments, and supply the location to save the Avro files to:
// In Scala
df.write
  .format("avro")
  .mode("overwrite")
  .save("/tmp/data/avro/df_avro")

# In Python
(df.write
  .format("avro")
  .mode("overwrite")
  .save("/tmp/data/avro/df_avro"))
This generates a folder at the specified location, populated with a bunch of compressed and compact files:
-rw-r--r-- 1 jules wheel   0 May 17 11:54 _SUCCESS
-rw-r--r-- 1 jules wheel 526 May 17 11:54 part-00000-ffdf70f4-<...>-c000.avro
Avro data source options
Table 4-5 describes common options for DataFrameReader and DataFrameWriter. A comprehensive list of options is in the documentation.
Property name | Default value | Meaning | Scope |
---|---|---|---|
avroSchema | None | Optional Avro schema provided by a user in JSON format. The data type and naming of record fields should match the input Avro data or Catalyst data (Spark internal data type), otherwise the read/write action will fail. | Read/write |
recordName | topLevelRecord | Top-level record name in write result, which is required in the Avro spec. | Write |
recordNamespace | "" | Record namespace in write result. | Write |
ignoreExtension | true | If this option is enabled, all files (with and without the .avro extension) are loaded. Otherwise, files without the .avro extension are ignored. | Read |
compression | snappy | Allows you to specify the compression codec to use in writing. Currently supported codecs are uncompressed, snappy, deflate, bzip2, and xz. If this option is not set, the value in spark.sql.avro.compression.codec is taken into account. | Write |
ORC
As an additional optimized columnar file format, Spark 2.x supports a vectorized ORC reader. Two Spark configurations dictate which ORC implementation to use. When spark.sql.orc.impl is set to native and spark.sql.orc.enableVectorizedReader is set to true, Spark uses the vectorized ORC reader. A vectorized reader reads blocks of rows (often 1,024 per block) instead of one row at a time, streamlining operations and reducing CPU usage for intensive operations like scans, filters, aggregations, and joins.
For Hive ORC SerDe (serialization and deserialization) tables created with the SQL command USING HIVE OPTIONS (fileFormat 'ORC'), the vectorized reader is used when the Spark configuration parameter spark.sql.hive.convertMetastoreOrc is set to true.
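The benefit of block-at-a-time reading can be sketched in plain Python. This is a conceptual illustration only, not Spark code: the row-at-a-time loop pays a per-row function-call cost that the blocked version amortizes over up to 1,024 rows per call.

```python
def process_one(v):
    # Stand-in for per-row work; in a real reader each call carries
    # interpretation and dispatch overhead
    return v

def read_rows_one_at_a_time(values):
    # One call per row: overhead scales with the number of rows
    return sum(process_one(v) for v in values)

def read_rows_vectorized(values, block_size=1024):
    # One operation per block: the same total work, far fewer calls
    total = 0
    for i in range(0, len(values), block_size):
        total += sum(values[i:i + block_size])
    return total

values = list(range(5000))
assert read_rows_one_at_a_time(values) == read_rows_vectorized(values)
```

Both functions compute the same result; the vectorized one simply does it in ~5 blocked operations instead of 5,000 per-row calls, which is the same trade Spark's vectorized ORC reader makes at the CPU level.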
Reading an ORC file into a DataFrame
To read in a DataFrame using the ORC vectorized reader, you can just use the normal DataFrameReader methods and options:
// In Scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
val df = spark.read.format("orc").load(file)
df.show(10, false)

# In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
df = spark.read.format("orc").option("path", file).load()
df.show(10, False)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
|    United States|            Grenada|   54|
|       Costa Rica|      United States|  477|
|          Senegal|      United States|   29|
|    United States|   Marshall Islands|   44|
+-----------------+-------------------+-----+
only showing top 10 rows
Reading an ORC file into a Spark SQL table
There is no difference from Parquet, JSON, CSV, or Avro when creating a SQL view using an ORC data source:
-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING orc
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
    )
Once a table is created, you can read data into a DataFrame using SQL as usual:
// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
|    United States|            Grenada|   54|
|       Costa Rica|      United States|  477|
|          Senegal|      United States|   29|
|    United States|   Marshall Islands|   44|
+-----------------+-------------------+-----+
only showing top 10 rows
Writing DataFrames to ORC files
Writing back a transformed DataFrame after reading is equally simple using the DataFrameWriter methods:
// In Scala
df.write.format("orc")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/orc/df_orc")

# In Python
(df.write.format("orc")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/orc/flights_orc"))
The result will be a folder at the specified location containing some compressed ORC files:
-rw-r--r-- 1 jules wheel   0 May 16 17:23 _SUCCESS
-rw-r--r-- 1 jules wheel 547 May 16 17:23 part-00000-<...>-c000.snappy.orc
Images
In Spark 2.4 the community introduced a new data source, image files, to support deep learning and machine learning frameworks such as TensorFlow and PyTorch. For computer vision–based machine learning applications, loading and processing image datasets is important.
Reading an prototype file into a DataFrame
As with all of the previous file formats, you can use the DataFrameReader methods and options to read in an image file as shown here:
// In Scala
import org.apache.spark.ml.source.image

val imageDir = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
val imagesDF = spark.read.format("image").load(imageDir)
imagesDF.printSchema
imagesDF.select("image.height", "image.width", "image.nChannels", "image.mode",
  "label").show(5, false)

# In Python
from pyspark.ml import image

image_dir = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
images_df = spark.read.format("image").load(image_dir)
images_df.printSchema()
root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)
 |-- label: integer (nullable = true)
images_df.select("image.height", "image.width", "image.nChannels", "image.mode",
  "label").show(5, truncate=False)
+------+-----+---------+----+-----+
|height|width|nChannels|mode|label|
+------+-----+---------+----+-----+
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |1    |
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |0    |
+------+-----+---------+----+-----+
only showing top 5 rows
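The height, width, nChannels, and mode columns come straight from image header metadata. Outside Spark you can pull the same kind of information with the standard library; for instance, a PNG stores its dimensions in the IHDR chunk, readable with the struct module (a minimal sketch for PNG only; the 384x288 header bytes below are constructed for illustration):

```python
import struct

def png_dimensions(data: bytes):
    """Read (width, height) from a PNG byte stream's IHDR chunk."""
    assert data[:8] == b"\x89PNG\r\n\x1a\n", "not a PNG file"
    # IHDR is always the first chunk: 4-byte length, b"IHDR", then
    # big-endian width and height at offsets 16 and 20
    return struct.unpack(">II", data[16:24])

# A minimal header for a hypothetical 384x288 image (only the bytes we parse)
header = b"\x89PNG\r\n\x1a\n" + b"\x00\x00\x00\rIHDR" + struct.pack(">II", 384, 288)
print(png_dimensions(header))  # (384, 288)
```

Spark's image source does this kind of header decoding for you across JPG, PNG, and other formats, surfacing the results as typed columns.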
Binary Files
Spark 3.0 adds support for binary files as a data source. The DataFrameReader converts each binary file into a single DataFrame row (record) that contains the raw content and metadata of the file. The binary file data source produces a DataFrame with the following columns:
- path: StringType
- modificationTime: TimestampType
- length: LongType
- content: BinaryType
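Conceptually, each row the binary file source produces is just this per-file metadata plus the raw bytes. A standard-library sketch of assembling one such record for a single file (illustrative only, not Spark code):

```python
from datetime import datetime
from pathlib import Path

def binary_file_record(p):
    """Build a dict mirroring the four columns of Spark's binaryFile source
    for one file: path, modificationTime, length, content."""
    p = Path(p)
    stat = p.stat()
    return {
        "path": str(p.resolve()),
        "modificationTime": datetime.fromtimestamp(stat.st_mtime),
        "length": stat.st_size,
        "content": p.read_bytes(),
    }
```

Spark does this in parallel over every file matched under the input path, which is why the format is handy for feeding raw images or other blobs into ML pipelines.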
Reading a binary file into a DataFrame
To read binary files, specify the data source format as binaryFile. You can load files with paths matching a given glob pattern while preserving the behavior of partition discovery with the data source option pathGlobFilter. For example, the following code reads all JPG files from the input directory along with any partitioned directories:
// In Scala
val path = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
val binaryFilesDF = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .load(path)
binaryFilesDF.show(5)

# In Python
path = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
binary_files_df = (spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .load(path))
binary_files_df.show(5)
+--------------------+-------------------+------+--------------------+-----+
|                path|   modificationTime|length|             content|label|
+--------------------+-------------------+------+--------------------+-----+
|file:/Users/jules...|2020-02-12 12:04:24| 55037|[FF D8 FF E0 00 1...|    0|
|file:/Users/jules...|2020-02-12 12:04:24| 54634|[FF D8 FF E0 00 1...|    1|
|file:/Users/jules...|2020-02-12 12:04:24| 54624|[FF D8 FF E0 00 1...|    0|
|file:/Users/jules...|2020-02-12 12:04:24| 54505|[FF D8 FF E0 00 1...|    0|
|file:/Users/jules...|2020-02-12 12:04:24| 54475|[FF D8 FF E0 00 1...|    0|
+--------------------+-------------------+------+--------------------+-----+
only showing top 5 rows
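The pathGlobFilter option above matches file names against a glob pattern. You can preview the pattern semantics with the standard fnmatch module (this sketch only illustrates the glob syntax; Spark applies the filter to file names during directory listing):

```python
from fnmatch import fnmatch
from os.path import basename

paths = [
    "train_images/label=0/frame001.jpg",
    "train_images/label=0/frame001.png",
    "train_images/label=1/frame002.jpg",
]
# Keep only files whose name matches *.jpg, whatever partition directory they sit in
jpgs = [p for p in paths if fnmatch(basename(p), "*.jpg")]
print(len(jpgs))  # 2
```

Note that the partition directories (label=0, label=1) are untouched by the filter, which is what "preserving the behavior of partition discovery" means: the glob selects files, and Spark still derives the label column from the directory names.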
To ignore partition data discovery in a directory, you can set recursiveFileLookup to "true":
// In Scala
val binaryFilesDF = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .option("recursiveFileLookup", "true")
  .load(path)
binaryFilesDF.show(5)

# In Python
binary_files_df = (spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .option("recursiveFileLookup", "true")
  .load(path))
binary_files_df.show(5)
+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|file:/Users/jules...|2020-02-12 12:04:24| 55037|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54634|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54624|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54505|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54475|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows
Note that the label column is absent when the recursiveFileLookup option is set to "true".
Currently, the binary file data source does not support writing a DataFrame back to the original file format.
In this section, you got a tour of how to read data into a DataFrame from a range of supported file formats. We also showed you how to create temporary views and tables from the existing built-in data sources. Whether you're using the DataFrame API or SQL, the queries produce identical outcomes. You can examine some of these queries in the notebook available in the GitHub repo for this book.
Summary
To recap, this chapter explored the interoperability between the DataFrame API and Spark SQL. In particular, you got a flavor of how to use Spark SQL to:
- Create managed and unmanaged tables using Spark SQL and the DataFrame API.
- Read from and write to various built-in data sources and file formats.
- Use the spark.sql programmatic interface to issue SQL queries on structured data stored as Spark SQL tables or views.
- Peruse the Spark Catalog to inspect metadata associated with tables and views.
- Use the DataFrameWriter and DataFrameReader APIs.
Through the code snippets in the chapter and the notebooks available in the book's GitHub repo, you got a feel for how to use DataFrames and Spark SQL. Continuing in this vein, the next chapter further explores how Spark interacts with the external data sources shown in Figure 4-1. You'll see some more in-depth examples of transformations and the interoperability between the DataFrame API and Spark SQL.
Source: https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch04.html