Skip to content Skip to sidebar Skip to footer

Optimize Reading Orc Files From Spark Sql

Chapter 4. Spark SQL and DataFrames: Introduction to Built-in Data Sources

In the previous affiliate, we explained the evolution of and justification for construction in Spark. In particular, we discussed how the Spark SQL engine provides a unified foundation for the high-level DataFrame and Dataset APIs. Now, we'll continue our discussion of the DataFrame and explore its interoperability with Spark SQL.

This chapter and the next also explore how Spark SQL interfaces with some of the external components shown in Figure 4-i.

In detail, Spark SQL:

  • Provides the engine upon which the high-level Structured APIs we explored in Chapter 3 are built.

  • Can read and write information in a multifariousness of structured formats (east.one thousand., JSON, Hive tables, Parquet, Avro, ORC, CSV).

  • Lets you query data using JDBC/ODBC connectors from external business organization intelligence (BI) data sources such as Tableau, Power BI, Talend, or from RDBMSs such every bit MySQL and PostgreSQL.

  • Provides a programmatic interface to interact with structured information stored as tables or views in a database from a Spark application

  • Offers an interactive shell to consequence SQL queries on your structured data.

  • Supports ANSI SQL:2003-compliant commands and HiveQL.

Spark SQL connectors and data sources

Figure 4-1. Spark SQL connectors and data sources

Let's begin with how you can use Spark SQL in a Spark application.

Using Spark SQL in Spark Applications

The SparkSession, introduced in Spark 2.0, provides a unified entry betoken for programming Spark with the Structured APIs. You can employ a SparkSession to access Spark functionality: simply import the class and create an instance in your code.

To event any SQL query, utilize the sql() method on the SparkSession instance, spark, such as spark.sql("SELECT * FROM myTableName"). All spark.sql queries executed in this fashion return a DataFrame on which you may perform further Spark operations if you desire—the kind we explored in Chapter 3 and the ones you lot will acquire well-nigh in this chapter and the next.

Bones Query Examples

In this department we'll walk through a few examples of queries on the Airline On-Time Functioning and Causes of Flight Delays information prepare, which contains information on US flights including date, filibuster, distance, origin, and destination. Information technology's available as a CSV file with over a million records. Using a schema, we'll read the data into a DataFrame and register the DataFrame equally a temporary view (more on temporary views shortly) and then we can query it with SQL.

Query examples are provided in lawmaking snippets, and Python and Scala notebooks containing all of the lawmaking presented here are available in the book's GitHub repo. These examples will offer you a sense of taste of how to use SQL in your Spark applications via the spark.sql programmatic interface. Similar to the DataFrame API in its declarative flavor, this interface allows you to query structured information in your Spark applications.

Normally, in a standalone Spark application, you volition create a SparkSession instance manually, as shown in the following example. However, in a Spark vanquish (or Databricks notebook), the SparkSession is created for yous and accessible via the accordingly named variable spark.

Let's get started past reading the data gear up into a temporary view:

              // In Scala              import              org.apache.spark.sql.SparkSession              val              spark              =              SparkSession              .              architect              .              appName              (              "SparkSQLExampleApp"              )              .              getOrCreate              ()              // Path to data set                            val              csvFile              =              "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"              // Read and create a temporary view              // Infer schema (note that for larger files you may want to specify the schema)              val              df              =              spark              .              read              .              format              (              "csv"              )              .              pick              (              "inferSchema"              ,              "true"              )              .              pick              (              "header"              ,              "true"              )              .              load              (              csvFile              )              // Create a temporary view              df              .              createOrReplaceTempView              (              "us_delay_flights_tbl"              )            
              # In Python              from              pyspark.sql              import              SparkSession              # Create a SparkSession              spark              =              (              SparkSession              .              builder              .              appName              (              "SparkSQLExampleApp"              )              .              getOrCreate              ())              # Path to information set              csv_file              =              "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"              # Read and create a temporary view              # Infer schema (note that for larger files you                            # may want to specify the schema)              df              =              (              spark              .              read              .              format              (              "csv"              )              .              choice              (              "inferSchema"              ,              "true"              )              .              option              (              "header"              ,              "true"              )              .              load              (              csv_file              ))              df              .              createOrReplaceTempView              (              "us_delay_flights_tbl"              )            
Annotation

If you want to specify a schema, you lot can use a DDL-formatted cord. For instance:

                // In Scala                val                schema                =                "date STRING, delay INT, distance INT,                                                  origin STRING, destination String"              
                # In Python                schema                =                "`appointment` Cord, `delay` INT, `distance` INT,                                `origin`                STRING                ,                `destination`                Cord                "              

At present that we have a temporary view, we can issue SQL queries using Spark SQL. These queries are no different from those you might issue against a SQL table in, say, a MySQL or PostgreSQL database. The betoken here is to testify that Spark SQL offers an ANSI:2003–compliant SQL interface, and to demonstrate the interoperability between SQL and DataFrames.

The US flying delays data set has 5 columns:

  • The date column contains a string like 02190925. When converted, this maps to 02-19 09:25 am.

  • The delay column gives the delay in minutes between the scheduled and actual deviation times. Early departures show negative numbers.

  • The distance column gives the distance in miles from the origin airport to the destination airdrome.

  • The origin column contains the origin IATA airport code.

  • The destination column contains the destination IATA airport code.

With that in heed, let's try some example queries confronting this information set.

Offset, nosotros'll find all flights whose distance is greater than i,000 miles:

spark.sql("""SELECT distance, origin, destination  FROM us_delay_flights_tbl WHERE distance > chiliad  Club BY distance DESC""").show(10)  +--------+------+-----------+ |distance|origin|destination| +--------+------+-----------+ |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | +--------+------+-----------+ only showing summit 10 rows

As the results show, all of the longest flights were between Honolulu (HNL) and New York (JFK). Next, we'll find all flights between San Francisco (SFO) and Chicago (ORD) with at to the lowest degree a two-hour delay:

spark.sql("""SELECT engagement, filibuster, origin, destination  FROM us_delay_flights_tbl  WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD'  Society by delay DESC""").show(10)  +--------+-----+------+-----------+ |date    |delay|origin|destination| +--------+-----+------+-----------+ |02190925|1638 |SFO   |ORD        | |01031755|396  |SFO   |ORD        | |01022330|326  |SFO   |ORD        | |01051205|320  |SFO   |ORD        | |01190925|297  |SFO   |ORD        | |02171115|296  |SFO   |ORD        | |01071040|279  |SFO   |ORD        | |01051550|274  |SFO   |ORD        | |03120730|266  |SFO   |ORD        | |01261104|258  |SFO   |ORD        | +--------+-----+------+-----------+ only showing top ten rows

It seems at that place were many significantly delayed flights between these two cities, on unlike dates. (As an do, convert the date cavalcade into a readable format and find the days or months when these delays were most common. Were the delays related to wintertime months or holidays?)

Let'southward try a more complicated query where we use the CASE clause in SQL. In the following instance, we desire to label all US flights, regardless of origin and destination, with an indication of the delays they experienced: Very Long Delays (> half dozen hours), Long Delays (2–half-dozen hours), etc. Nosotros'll add these human-readable labels in a new column called Flight_Delays:

spark.sql("""SELECT filibuster, origin, destination,                Example                   WHEN delay > 360 Then 'Very Long Delays'                   WHEN delay >= 120 AND filibuster <= 360 And so 'Long Delays'                   WHEN delay >= 60 AND delay < 120 So 'Short Delays'                   WHEN filibuster > 0 and delay < 60 And so 'Tolerable Delays'                   WHEN delay = 0 So 'No Delays'                   ELSE 'Early'                Terminate AS Flight_Delays                FROM us_delay_flights_tbl                ORDER BY origin, delay DESC""").show(10)  +-----+------+-----------+-------------+ |delay|origin|destination|Flight_Delays| +-----+------+-----------+-------------+ |333  |ABE   |ATL        |Long Delays  | |305  |ABE   |ATL        |Long Delays  | |275  |ABE   |ATL        |Long Delays  | |257  |ABE   |ATL        |Long Delays  | |247  |ABE   |DTW        |Long Delays  | |247  |ABE   |ATL        |Long Delays  | |219  |ABE   |ORD        |Long Delays  | |211  |ABE   |ATL        |Long Delays  | |197  |ABE   |DTW        |Long Delays  | |192  |ABE   |ORD        |Long Delays  | +-----+------+-----------+-------------+ simply showing top 10 rows

As with the DataFrame and Dataset APIs, with the spark.sql interface you tin behave common data analysis operations similar those nosotros explored in the previous chapter. The computations undergo an identical journey in the Spark SQL engine (meet "The Catalyst Optimizer" in Chapter 3 for details), giving you the same results.

All three of the preceding SQL queries can be expressed with an equivalent DataFrame API query. For example, the offset query can be expressed in the Python DataFrame API as:

              # In Python              from              pyspark.sql.functions              import              col              ,              desc              (              df              .              select              (              "altitude"              ,              "origin"              ,              "destination"              )              .              where              (              col              (              "distance"              )              >              yard              )              .              orderBy              (              desc              (              "distance"              )))              .              prove              (              10              )              # Or              (              df              .              select              (              "altitude"              ,              "origin"              ,              "destination"              )              .              where              (              "distance > one thousand"              )              .              orderBy              (              "distance"              ,              ascending              =              False              )              .              bear witness              (              10              ))            

This produces the aforementioned results as the SQL query:

+--------+------+-----------+ |distance|origin|destination| +--------+------+-----------+ |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | +--------+------+-----------+ simply showing top 10 rows

As an exercise, try converting the other two SQL queries to use the DataFrame API.

As these examples bear witness, using the Spark SQL interface to query information is similar to writing a regular SQL query to a relational database table. Although the queries are in SQL, yous tin can feel the similarity in readability and semantics to DataFrame API operations, which you encountered in Affiliate 3 and will explore further in the adjacent chapter.

To enable you to query structured data as shown in the preceding examples, Spark manages all the complexities of creating and managing views and tables, both in memory and on disk. That leads usa to our next topic: how tables and views are created and managed.

SQL Tables and Views

Tables hold data. Associated with each table in Spark is its relevant metadata, which is data virtually the table and its data: the schema, clarification, table name, database proper name, column names, partitions, concrete location where the actual data resides, etc. All of this is stored in a central metastore.

Instead of having a divide metastore for Spark tables, Spark by default uses the Apache Hive metastore, located at /user/hive/warehouse, to persist all the metadata about your tables. However, y'all may change the default location by setting the Spark config variable spark.sql.warehouse.dir to some other location, which can be set to a local or external distributed storage.

Managed Versus UnmanagedTables

Spark allows you to create two types of tables: managed and unmanaged. For a managed table, Spark manages both the metadata and the information in the file store. This could be a local filesystem, HDFS, or an object store such as Amazon S3 or Azure Blob. For an unmanaged table, Spark only manages the metadata, while you manage the data yourself in an external data source such every bit Cassandra.

With a managed table, considering Spark manages everything, a SQL command such as DROP Tabular array table_name deletes both the metadata and the data. With an unmanaged table, the aforementioned control volition delete simply the metadata, not the actual data. We volition await at some examples of how to create managed and unmanaged tables in the next department.

Creating SQL Databases and Tables

Tables reside inside a database. By default, Spark creates tables under the default database. To create your own database name, yous can issue a SQL command from your Spark application or notebook. Using the U.s.a. flight delays data ready, allow'southward create both a managed and an unmanaged table. To begin, we'll create a database called learn_spark_db and tell Spark nosotros want to use that database:

              // In Scala/Python              spark              .              sql              (              "CREATE DATABASE learn_spark_db"              )              spark              .              sql              (              "USE learn_spark_db"              )            

From this point, any commands we issue in our application to create tables will result in the tables beingness created in this database and residing nether the database proper noun learn_spark_db.

Creating a managed table

To create a managed table within the database learn_spark_db, you can result a SQL query like the following:

                // In Scala/Python                spark                .                sql                (                "CREATE Tabular array managed_us_delay_flights_tbl (date STRING, delay INT,                                                  distance INT, origin STRING, destination STRING)"                )              

You can do the same thing using the DataFrame API like this:

                # In Python                # Path to our U.s.a. flight delays CSV file                                csv_file                =                "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"                # Schema as divers in the preceding example                schema                =                "date STRING, delay INT, distance INT, origin STRING, destination STRING"                flights_df                =                spark                .                read                .                csv                (                csv_file                ,                schema                =                schema                )                flights_df                .                write                .                saveAsTable                (                "managed_us_delay_flights_tbl"                )              

Both of these statements will create the managed table us_delay_flights_tbl in the learn_spark_db database.

Creating an unmanaged table

By contrast, you can create unmanaged tables from your own data sources—say, Parquet, CSV, or JSON files stored in a file shop accessible to your Spark application.

To create an unmanaged table from a data source such equally a CSV file, in SQL employ:

spark.sql("""CREATE TABLE us_delay_flights_tbl(date String, filibuster INT,    distance INT, origin String, destination String)    USING csv OPTIONS (PATH    '/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')""")

And inside the DataFrame API use:

(flights_df   .write   .option("path", "/tmp/data/us_flights_delay")   .saveAsTable("us_delay_flights_tbl"))
Note

To enable you lot to explore these examples, we take created Python and Scala example notebooks that yous tin find in the book'southward GitHub repo.

Creating Views

In addition to creating tables, Spark can create views on peak of existing tables. Views can be global (visible across all SparkSessionsouth on a given cluster) or session-scoped (visible simply to a single SparkSession), and they are temporary: they disappear afterward your Spark application terminates.

Creating views has a like syntax to creating tables inside a database. In one case you create a view, you lot tin query it equally you would a table. The difference between a view and a table is that views don't really concur the information; tables persist after your Spark application terminates, merely views disappear.

You lot can create a view from an existing tabular array using SQL. For example, if you wish to piece of work on only the subset of the The states flight delays information set with origin airports of New York (JFK) and San Francisco (SFO), the following queries will create global temporary and temporary views consisting of just that slice of the table:

              -- In SQL              CREATE              OR              Replace              GLOBAL              TEMP              VIEW              us_origin_airport_SFO_global_tmp_view              Every bit              SELECT              date              ,              delay              ,              origin              ,              destination              from              us_delay_flights_tbl              WHERE              origin              =              'SFO'              ;              CREATE              OR              REPLACE              TEMP              VIEW              us_origin_airport_JFK_tmp_view              AS              SELECT              date              ,              delay              ,              origin              ,              destination              from              us_delay_flights_tbl              WHERE              origin              =              'JFK'            

You can reach the same thing with the DataFrame API every bit follows:

              # In Python              df_sfo              =              spark              .              sql              (              "SELECT date, delay, origin, destination FROM                            us_delay_flights_tbl              WHERE              origin              =              'SFO'              ")              df_jfk              =              spark              .              sql              (              "SELECT date, delay, origin, destination FROM                            us_delay_flights_tbl              WHERE              origin              =              'JFK'              ")              # Create a temporary and global temporary view              df_sfo              .              createOrReplaceGlobalTempView              (              "us_origin_airport_SFO_global_tmp_view"              )              df_jfk              .              createOrReplaceTempView              (              "us_origin_airport_JFK_tmp_view"              )            

Once y'all've created these views, you can effect queries against them only as you lot would against a tabular array. Proceed in mind that when accessing a global temporary view you must employ the prefix global_temp.<view_name> , because Spark creates global temporary views in a global temporary database called global_temp. For example:

              -- In SQL                            SELECT              *              FROM              global_temp              .              us_origin_airport_SFO_global_tmp_view            

By contrast, you can access the normal temporary view without the global_temp prefix:

              -- In SQL                            SELECT              *              FROM              us_origin_airport_JFK_tmp_view            
              // In Scala/Python              spark              .              read              .              table              (              "us_origin_airport_JFK_tmp_view"              )              // Or              spark              .              sql              (              "SELECT * FROM us_origin_airport_JFK_tmp_view"              )            

You tin also drop a view only like you would a tabular array:

              -- In SQL              DROP              VIEW              IF              EXISTS              us_origin_airport_SFO_global_tmp_view              ;              DROP              VIEW              IF              EXISTS              us_origin_airport_JFK_tmp_view            
              // In Scala/Python              spark              .              itemize              .              dropGlobalTempView              (              "us_origin_airport_SFO_global_tmp_view"              )              spark              .              itemize              .              dropTempView              (              "us_origin_airport_JFK_tmp_view"              )            

Temporary views versus global temporary views

The difference between temporary and global temporary views being subtle, it can be a source of mild confusion amid developers new to Spark. A temporary view is tied to a single SparkSession within a Spark application. In dissimilarity, a global temporary view is visible across multiple SparkSessions within a Spark application. Yep, y'all tin create multiple SparkSessions inside a single Spark application—this tin can be handy, for example, in cases where you lot want to access (and combine) information from two different SparkSessiondue south that don't share the same Hive metastore configurations.

Caching SQL Tables

Although we will hash out table caching strategies in the next chapter, it's worth mentioning hither that, similar DataFrames, you can cache and uncache SQL tables and views. In Spark three.0, in addition to other options, you lot can specify a tabular array as LAZY, meaning that it should simply be buried when information technology is first used instead of immediately:

              -- In SQL                            CACHE                                          [              LAZY              ]                                          Table                                                          <                table                -                name                >                                                        UNCACHE                                          Table                                                          <                table                -                name                >                          

Reading Tables into DataFrames

Often, data engineers build information pipelines as part of their regular data ingestion and ETL processes. They populate Spark SQL databases and tables with cleansed data for consumption by applications downstream.

Permit's assume yous have an existing database, learn_spark_db, and tabular array, us_delay_flights_tbl, fix for use. Instead of reading from an external JSON file, you tin can simply use SQL to query the tabular array and assign the returned result to a DataFrame:

              // In Scala              val              usFlightsDF              =              spark              .              sql              (              "SELECT * FROM us_delay_flights_tbl"              )              val              usFlightsDF2              =              spark              .              table              (              "us_delay_flights_tbl"              )            
              # In Python              us_flights_df              =              spark              .              sql              (              "SELECT * FROM us_delay_flights_tbl"              )              us_flights_df2              =              spark              .              table              (              "us_delay_flights_tbl"              )            

Now y'all have a cleansed DataFrame read from an existing Spark SQL table. Yous tin too read data in other formats using Spark's born data sources, giving y'all the flexibility to interact with various common file formats.

Data Sources for DataFrames and SQL Tables

As shown in Figure 4-1, Spark SQL provides an interface to a variety of information sources. It also provides a set of common methods for reading and writing data to and from these data sources using the Information Sources API.

In this section we volition cover some of the built-in data sources, available file formats, and ways to load and write data, along with specific options pertaining to these data sources. But first, let's accept a closer wait at two high-level Information Source API constructs that dictate the manner in which you interact with different data sources: DataFrameReader and DataFrameWriter.

DataFrameReader

DataFrameReader is the core construct for reading data from a data source into a DataFrame. It has a defined format and a recommended blueprint for usage:

DataFrameReader.format(args).option("fundamental", "value").schema(args).load()

This blueprint of stringing methods together is common in Spark, and easy to read. We saw it in Chapter 3 when exploring common data analysis patterns.

Note that y'all can only access a DataFrameReader through a SparkSession instance. That is, you lot cannot create an case of DataFrameReader. To go an instance handle to it, use:

SparkSession.read  // or  SparkSession.readStream

While read returns a handle to DataFrameReader to read into a DataFrame from a static data source, readStream returns an example to read from a streaming source. (We will cover Structured Streaming later on in the book.)

Arguments to each of the public methods to DataFrameReader have different values. Table 4-one enumerates these, with a subset of the supported arguments.

Table 4-1. DataFrameReader methods, arguments, and options
Method Arguments Clarification
format() "parquet", "csv", "txt", "json", "jdbc", "orc", "avro", etc. If you don't specify this method, and so the default is Parquet or whatever is set in spark.sql.sources.default.
choice() ("mode", {PERMISSIVE | FAILFAST | DROPMALFORMED } )
("inferSchema", {true | fake})
("path", "path_file_data_source")
A series of cardinal/value pairs and options.
The Spark documentation shows some examples and explains the unlike modes and their actions. The default style is PERMISSIVE. The "inferSchema" and "mode" options are specific to the JSON and CSV file formats.
schema() DDL String or StructType, e.m., 'A INT, B String' or
StructType(...)
For JSON or CSV format, you can specify to infer the schema in the option() method. Mostly, providing a schema for whatsoever format makes loading faster and ensures your data conforms to the expected schema.
load() "/path/to/data/source" The path to the data source. This can be empty if specified in option("path", "...").

While nosotros won't comprehensively enumerate all the unlike combinations of arguments and options, the documentation for Python, Scala, R, and Java offers suggestions and guidance. It's worthwhile to bear witness a couple of examples, though:

              // In Scala              // Use Parquet                            val              file              =              """/databricks-datasets/learning-spark-v2/flights/summary-                              data/parquet/2010-summary.parquet"""              val              df              =              spark              .              read              .              format              (              "parquet"              ).              load              (              file              )              // Utilise Parquet; you can omit format("parquet") if you lot wish as it'southward the default              val              df2              =              spark              .              read              .              load              (              file              )              // Employ CSV              val              df3              =              spark              .              read              .              format              (              "csv"              )              .              option              (              "inferSchema"              ,              "true"              )              .              selection              (              "header"              ,              "true"              )              .              option              (              "manner"              ,              "PERMISSIVE"              )              .              load              (              "/databricks-datasets/learning-spark-v2/flights/summary-information/csv/*"              )              // Utilise JSON              val              df4              =              spark              .              read              .              format              (              "json"              )              .              load              (              "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"              )            
Annotation

In full general, no schema is needed when reading from a static Parquet data source—the Parquet metadata usually contains the schema, so it's inferred. However, for streaming data sources yous will have to provide a schema. (We will comprehend reading from streaming data sources in Affiliate 8.)

Parquet is the default and preferred data source for Spark because it'south efficient, uses columnar storage, and employs a fast pinch algorithm. You will come across additional benefits afterwards (such equally columnar pushdown), when nosotros comprehend the Catalyst optimizer in greater depth.

DataFrameWriter

DataFrameWriter does the contrary of its counterpart: it saves or writes data to a specified built-in information source. Different with DataFrameReader, y'all access its instance not from a SparkSession but from the DataFrame you lot wish to save. It has a few recommended usage patterns:

DataFrameWriter.format(args)   .option(args)   .bucketBy(args)   .partitionBy(args)   .save(path)  DataFrameWriter.format(args).selection(args).sortBy(args).saveAsTable(table)

To get an instance handle, use:

DataFrame.write // or  DataFrame.writeStream

Arguments to each of the methods to DataFrameWriter as well accept different values. We list these in Table 4-2, with a subset of the supported arguments.

Table 4-2. DataFrameWriter methods, arguments, and options
Method Arguments Clarification
format() "parquet", "csv", "txt", "json", "jdbc", "orc", "avro", etc. If you don't specify this method, then the default is Parquet or whatever is set up in spark.sql.sources.default.
pick() ("mode", {suspend | overwrite | ignore | error or errorifexists} )
("mode", {SaveMode.Overwrite | SaveMode.Append, SaveMode.Ignore, SaveMode.ErrorIfExists})
("path", "path_to_write_to")
A series of cardinal/value pairs and options. The Spark documentation shows some examples. This is an overloaded method. The default way options are error or errorifexists and SaveMode.ErrorIfExists; they throw an exception at runtime if the data already exists.
bucketBy() (numBuckets, col, col..., coln) The number of buckets and names of columns to bucket by. Uses Hive's bucketing scheme on a filesystem.
save() "/path/to/data/source" The path to save to. This can exist empty if specified in option("path", "...").
saveAsTable() "table_name" The table to save to.

Here's a curt case snippet to illustrate the use of methods and arguments:

              // In Scala              // Use JSON              val              location              =              ...              df              .              write              .              format              (              "json"              ).              mode              (              "overwrite"              ).              save              (              location              )            

Parquet

We'll start our exploration of data sources with Parquet, because information technology's the default information source in Spark. Supported and widely used by many big information processing frameworks and platforms, Parquet is an open source columnar file format that offers many I/O optimizations (such equally compression, which saves storage space and allows for quick access to data columns).

Because of its efficiency and these optimizations, we recommend that later yous accept transformed and cleansed your information, you save your DataFrames in the Parquet format for downstream consumption. (Parquet is also the default table open format for Delta Lake, which nosotros will cover in Chapter nine.)

Reading Parquet files into a DataFrame

Parquet files are stored in a directory structure that contains the data files, metadata, a number of compressed files, and some status files. Metadata in the footer contains the version of the file format, the schema, and column data such as the path, etc.

For example, a directory in a Parquet file might contain a set of files similar this:

_SUCCESS _committed_1799640464332036264 _started_1799640464332036264 part-00000-tid-1799640464332036264-91273258-d7ef-4dc7-<...>-c000.snappy.parquet

There may be a number of part-XXXX compressed files in a directory (the names shown here accept been shortened to fit on the page).

To read Parquet files into a DataFrame, you lot simply specify the format and path:

                // In Scala                val                file                =                """/databricks-datasets/learning-spark-v2/flights/summary-data/                                  parquet/2010-summary.parquet/"""                val                df                =                spark                .                read                .                format                (                "parquet"                ).                load                (                file                )              
                # In Python                file                =                """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/                                  2010-summary.parquet/"""                df                =                spark                .                read                .                format                (                "parquet"                )                .                load                (                file                )              

Unless you are reading from a streaming data source there's no need to supply the schema, because Parquet saves it as part of its metadata.

Reading Parquet files into a Spark SQL table

Equally well as reading Parquet files into a Spark DataFrame, you tin also create a Spark SQL unmanaged table or view directly using SQL:

                -- In SQL                CREATE                OR                REPLACE                TEMPORARY                VIEW                us_delay_flights_tbl                USING                parquet                OPTIONS                (                path                "/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/                                  2010-summary.parquet/"                )              

Once y'all've created the table or view, you can read data into a DataFrame using SQL, as we saw in some before examples:

                // In Scala                spark                .                sql                (                "SELECT * FROM us_delay_flights_tbl"                ).                bear witness                ()              
                # In Python                spark                .                sql                (                "SELECT * FROM us_delay_flights_tbl"                )                .                evidence                ()              

Both of these operations return the same results:

+-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ |United States    |Romania            |1    | |United States    |Ireland            |264  | |U.s.    |Republic of india              |69   | |Egypt            |United states      |24   | |Republic of equatorial guinea|United States      |1    | |United States    |Singapore          |25   | |United States    |Grenada            |54   | |Costa Rica       |United States      |477  | |Senegal          |Us      |29   | |United States    |Marshall islands   |44   | +-----------------+-------------------+-----+ simply showing superlative 10 rows

Writing DataFrames to Parquet files

Writing or saving a DataFrame every bit a table or file is a common operation in Spark. To write a DataFrame y'all simply use the methods and arguments to the DataFrameWriter outlined earlier in this chapter, supplying the location to save the Parquet files to. For instance:

                // In Scala                df                .                write                .                format                (                "parquet"                )                .                mode                (                "overwrite"                )                .                choice                (                "compression"                ,                "snappy"                )                .                save                (                "/tmp/information/parquet/df_parquet"                )              
                # In Python                (                df                .                write                .                format                (                "parquet"                )                .                way                (                "overwrite"                )                .                option                (                "compression"                ,                "snappy"                )                .                save                (                "/tmp/data/parquet/df_parquet"                ))              
Note

Call back that Parquet is the default file format. If you don't include the format() method, the DataFrame will nevertheless be saved every bit a Parquet file.

This volition create a gear up of compact and compressed Parquet files at the specified path. Since we used snappy equally our pinch choice hither, nosotros'll accept snappy compressed files. For brevity, this example generated but one file; usually, there may be a dozen or so files created:

-rw-r--r--  1 jules  wheel    0 May 19 10:58 _SUCCESS -rw-r--r--  1 jules  wheel  966 May 19 ten:58 part-00000-<...>-c000.snappy.parquet

Writing DataFrames to Spark SQL tables

Writing a DataFrame to a SQL tabular array is equally piece of cake every bit writing to a file—but use saveAsTable() instead of save(). This volition create a managed tabular array chosen us_delay_flights_tbl:

                // In Scala                df                .                write                .                way                (                "overwrite"                )                .                saveAsTable                (                "us_delay_flights_tbl"                )              
                # In Python                (                df                .                write                .                manner                (                "overwrite"                )                .                saveAsTable                (                "us_delay_flights_tbl"                ))              

To sum upward, Parquet is the preferred and default built-in data source file format in Spark, and it has been adopted by many other frameworks. Nosotros recommend that you use this format in your ETL and data ingestion processes.

JSON

JavaScript Object Notation (JSON) is likewise a popular data format. It came to prominence as an piece of cake-to-read and easy-to-parse format compared to XML. It has two representational formats: single-line mode and multiline manner. Both modes are supported in Spark.

In unmarried-line mode each line denotes a single JSON object, whereas in multiline mode the entire multiline object constitutes a unmarried JSON object. To read in this mode, prepare multiLine to true in the selection() method.

Reading a JSON file into a DataFrame

Y'all tin can read a JSON file into a DataFrame the same way you did with Parquet—merely specify "json" in the format() method:

                // In Scala                val                file                =                "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"                val                df                =                spark                .                read                .                format                (                "json"                ).                load                (                file                )              
                # In Python                file                =                "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"                df                =                spark                .                read                .                format                (                "json"                )                .                load                (                file                )              

Reading a JSON file into a Spark SQL table

Yous tin can also create a SQL table from a JSON file just similar you did with Parquet:

                -- In SQL                                CREATE                                                OR                                                Supplant                                                TEMPORARY                                                VIEW                                                                  us_delay_flights_tbl                                                                USING                                                json                                                OPTIONS                                                (                                                path                                                "/databricks-datasets/learning-spark-v2/flights/summary-information/json/*"                                                )              

Once the table is created, you can read information into a DataFrame using SQL:

                //                In                Scala                /                Python                spark                .                sql                (                "SELECT * FROM us_delay_flights_tbl"                )                .                show                ()                +-----------------+-------------------+-----+                |                DEST_COUNTRY_NAME                |                ORIGIN_COUNTRY_NAME                |                count                |                +-----------------+-------------------+-----+                |                United                States                |                Romania                |                15                |                |                United                States                |                Republic of croatia                |                1                |                |                United                States                |                Republic of ireland                |                344                |                |                Egypt                |                United                States                |                15                |                |                United                States                |                India                |                62                |                |                United                States                |                Singapore                |                ane                |                |                United                States                |                Grenada                |                62                |                |                Costa                Rica                |                United                States                |                588                |                |                Senegal                |                United                States                |                40                |                |                Moldova                |                United                States                |                i                |                +-----------------+-------------------+-----+                only                showing                pinnacle                x                rows              

Writing DataFrames to JSON files

Saving a DataFrame as a JSON file is simple. Specify the appropriate DataFrameWriter methods and arguments, and supply the location to salvage the JSON files to:

                // In Scala                df                .                write                .                format                (                "json"                )                .                style                (                "overwrite"                )                .                option                (                "pinch"                ,                "snappy"                )                .                save                (                "/tmp/information/json/df_json"                )              
                # In Python                (                df                .                write                .                format                (                "json"                )                .                style                (                "overwrite"                )                .                option                (                "compression"                ,                "snappy"                )                .                save                (                "/tmp/data/json/df_json"                ))              

This creates a directory at the specified path populated with a set up of meaty JSON files:

-rw-r--r--  1 jules  bike   0 May 16 14:44 _SUCCESS -rw-r--r--  1 jules  wheel  71 May 16 14:44 part-00000-<...>-c000.json

JSON data source options

Table 4-three describes common JSON options for DataFrameReader and DataFrameWriter. For a comprehensive list, we refer you to the documentation.

Table 4-three. JSON options for DataFrameReader and DataFrameWriter
Property name Values Pregnant Telescopic
compression none, uncompressed, bzip2, deflate, gzip, lz4, or snappy Use this compression codec for writing. Annotation that read volition just detect the compression or codec from the file extension. Write
dateFormat yyyy-MM-dd or DateTimeFormatter Use this format or whatsoever format from Java's DateTimeFormatter. Read/write
multiLine true, false Use multiline style. Default is false (single-line style). Read
allowUnquotedFieldNames true, false Allow unquoted JSON field names. Default is false. Read

CSV

Equally widely used as plain text files, this common text file format captures each datum or field delimited by a comma; each line with comma-separated fields represents a record. Even though a comma is the default separator, you may use other delimiters to separate fields in cases where commas are function of your data. Popular spreadsheets can generate CSV files, so it'south a pop format among data and business analysts.

Reading a CSV file into a DataFrame

As with the other congenital-in data sources, you tin can use the DataFrameReader methods and arguments to read a CSV file into a DataFrame:

                // In Scala                val                file                =                "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"                val                schema                =                "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME String, count INT"                val                df                =                spark                .                read                .                format                (                "csv"                )                .                schema                (                schema                )                .                choice                (                "header"                ,                "truthful"                )                .                pick                (                "fashion"                ,                "FAILFAST"                )                // Exit if any errors                .                selection                (                "nullValue"                ,                ""                )                // Replace any null data with quotes                .                load                (                file                )              
                # In Python                file                =                "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"                schema                =                "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME String, count INT"                df                =                (                spark                .                read                .                format                (                "csv"                )                .                option                (                "header"                ,                "true"                )                .                schema                (                schema                )                .                pick                (                "mode"                ,                "FAILFAST"                )                # Leave if any errors                .                option                (                "nullValue"                ,                ""                )                # Replace any null data field with quotes                .                load                (                file                ))              

Reading a CSV file into a Spark SQL tabular array

Creating a SQL table from a CSV data source is no different from using Parquet or JSON:

                -- In SQL                CREATE                OR                Supplant                TEMPORARY                VIEW                us_delay_flights_tbl                USING                csv                OPTIONS                (                path                "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"                ,                header                "true"                ,                inferSchema                "true"                ,                style                "FAILFAST"                )              

Once you've created the table, y'all can read data into a DataFrame using SQL as before:

                //                In                Scala                /                Python                spark                .                sql                (                "SELECT * FROM us_delay_flights_tbl"                )                .                show                (                10                )                +-----------------+-------------------+-----+                |                DEST_COUNTRY_NAME                |                ORIGIN_COUNTRY_NAME                |                count                |                +-----------------+-------------------+-----+                |                United                States                |                Romania                |                1                |                |                United                States                |                Republic of ireland                |                264                |                |                United                States                |                India                |                69                |                |                Egypt                |                United                States                |                24                |                |                Equatorial                Republic of guinea                |                United                States                |                ane                |                |                United                States                |                Singapore                |                25                |                |                United                States                |                Grenada                |                54                |                |                Costa                Rica                |                United                States                |                477                |                |                Senegal                |                United                States                |                29                |                |                United                States                |                Marshall                Islands                |                44                |                +-----------------+-------------------+-----+                simply                showing                top                10                rows              

Writing DataFrames to CSV files

Saving a DataFrame as a CSV file is elementary. Specify the appropriate DataFrameWriter methods and arguments, and supply the location to save the CSV files to:

                // In Scala                df                .                write                .                format                (                "csv"                ).                fashion                (                "overwrite"                ).                save                (                "/tmp/data/csv/df_csv"                )              
                # In Python                df                .                write                .                format                (                "csv"                )                .                manner                (                "overwrite"                )                .                save                (                "/tmp/data/csv/df_csv"                )              

This generates a folder at the specified location, populated with a agglomeration of compressed and compact files:

-rw-r--r--  1 jules  wheel   0 May sixteen 12:17 _SUCCESS -rw-r--r--  1 jules  wheel  36 May 16 12:17 part-00000-251690eb-<...>-c000.csv

CSV data source options

Tabular array iv-4 describes some of the common CSV options for DataFrameReader and DataFrameWriter. Because CSV files can be complex, many options are bachelor; for a comprehensive list nosotros refer you to the documentation.

Tabular array 4-4. CSV options for DataFrameReader and DataFrameWriter
Belongings name Values Pregnant Scope
compression none, bzip2, deflate, gzip, lz4, or snappy Use this compression codec for writing. Write
dateFormat yyyy-MM-dd or DateTimeFormatter Employ this format or any format from Java's DateTimeFormatter. Read/write
multiLine true, simulated Utilize multiline mode. Default is false (single-line mode). Read
inferSchema true, simulated If true, Spark volition determine the column data types. Default is simulated. Read
sep Whatsoever character Employ this character to separate column values in a row. Default delimiter is a comma (,). Read/write
escape Any character Use this character to escape quotes. Default is \. Read/write
header true, false Indicates whether the beginning line is a header denoting each column name. Default is false. Read/write

Avro

Introduced in Spark 2.4 as a born data source, the Avro format is used, for example, past Apache Kafka for message serializing and deserializing. Information technology offers many benefits, including straight mapping to JSON, speed and efficiency, and bindings available for many programming languages.

Reading an Avro file into a DataFrame

Reading an Avro file into a DataFrame using DataFrameReader is consequent in usage with the other data sources we have discussed in this section:

                // In Scala                val                df                =                spark                .                read                .                format                (                "avro"                )                .                load                (                "/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"                )                df                .                show                (                false                )              
                # In Python                df                =                (                spark                .                read                .                format                (                "avro"                )                .                load                (                "/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"                ))                df                .                show                (                truncate                =                Imitation                )                +-----------------+-------------------+-----+                |                DEST_COUNTRY_NAME                |                ORIGIN_COUNTRY_NAME                |                count                |                +-----------------+-------------------+-----+                |                United                States                |                Romania                |                1                |                |                United                States                |                Ireland                |                264                |                |                United                States                |                India                |                69                |                |                Arab republic of egypt                |                United                States                |                24                |                |                Equatorial                Guinea                |                United                States                |                ane                |                |                United                States                |                Singapore                |                25                |                |                United                States                |                Grenada                |                54                |                |                Costa                Rica                |                United                States                |                477                |                |                Senegal                |                United                States                |                29                |                |                United                States                |                Marshall                Islands                |                44                |                +-----------------+-------------------+-----+                only                showing                top                10                rows              

Reading an Avro file into a Spark SQL table

Again, creating SQL tables using an Avro data source is no dissimilar from using Parquet, JSON, or CSV:

                -- In SQL                                CREATE                OR                Supercede                TEMPORARY                VIEW                episode_tbl                USING                avro                OPTIONS                (                path                "/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"                )              

Once you've created a table, you lot can read data into a DataFrame using SQL:

                // In Scala                spark                .                sql                (                "SELECT * FROM episode_tbl"                ).                show                (                false                )              
                # In Python                spark                .                sql                (                "SELECT * FROM episode_tbl"                )                .                testify                (                truncate                =                Faux                )                +-----------------+-------------------+-----+                |                DEST_COUNTRY_NAME                |                ORIGIN_COUNTRY_NAME                |                count                |                +-----------------+-------------------+-----+                |                United                States                |                Romania                |                ane                |                |                United                States                |                Republic of ireland                |                264                |                |                United                States                |                Republic of india                |                69                |                |                Egypt                |                United                States                |                24                |                |                Equatorial                Guinea                |                United                States                |                1                |                |                United                States                |                Singapore                |                25                |                |                United                States                |                Grenada                |                54                |                |                Costa                Rica                |                United                States                |                477                |                |                Senegal                |                United                States                |                29                |                |                United                States                |                Marshall                Islands                |                44                |                +-----------------+-------------------+-----+                simply                showing                acme                x                rows              

Writing DataFrames to Avro files

Writing a DataFrame every bit an Avro file is elementary. As usual, specify the advisable DataFrameWriter methods and arguments, and supply the location to save the Avro files to:

                // In Scala                df                .                write                .                format                (                "avro"                )                .                mode                (                "overwrite"                )                .                salvage                (                "/tmp/data/avro/df_avro"                )              
                # In Python                (                df                .                write                .                format                (                "avro"                )                .                mode                (                "overwrite"                )                .                save                (                "/tmp/data/avro/df_avro"                ))              

This generates a folder at the specified location, populated with a bunch of compressed and meaty files:

-rw-r--r--  one jules  bike    0 May 17 xi:54 _SUCCESS -rw-r--r--  one jules  wheel  526 May 17 xi:54 part-00000-ffdf70f4-<...>-c000.avro

Avro data source options

Tabular array four-5 describes common options for DataFrameReader and DataFrameWriter. A comprehensive listing of options is in the documentation.

Table 4-five. Avro options for DataFrameReader and DataFrameWriter
Belongings name Default value Meaning Scope
avroSchema None Optional Avro schema provided by a user in JSON format. The data blazon and naming of record fields should friction match the input Avro data or Goad information (Spark internal data type), otherwise the read/write activity will fail. Read/write
recordName topLevelRecord Top-level record proper name in write upshot, which is required in the Avro spec. Write
recordNamespace "" Tape namespace in write result. Write
ignoreExtension true If this selection is enabled, all files (with and without the .avro extension) are loaded. Otherwise, files without the .avro extension are ignored. Read
compression snappy Allows you to specify the compression codec to use in writing. Currently supported codecs are uncompressed, snappy, debunk, bzip2, and xz.
If this option is non ready, the value in spark.sql.avro.pinch.codec is taken into account.
Write

ORC

As an boosted optimized columnar file format, Spark 2.x supports a vectorized ORC reader. Two Spark configurations dictate which ORC implementation to use. When spark.sql.orc.impl is set to native and spark.sql.orc.enableVectorizedReader is set up to true, Spark uses the vectorized ORC reader. A vectorized reader reads blocks of rows (often ane,024 per block) instead of ane row at a fourth dimension, streamlining operations and reducing CPU usage for intensive operations like scans, filters, aggregations, and joins.

For Hive ORC SerDe (serialization and deserialization) tables created with the SQL command USING HIVE OPTIONS (fileFormat 'ORC'), the vectorized reader is used when the Spark configuration parameter spark.sql.hive.convertMetastoreOrc is fix to true.

Reading an ORC file into a DataFrame

To read in a DataFrame using the ORC vectorized reader, you lot can merely employ the normal DataFrameReader methods and options:

                // In Scala                                val                file                =                "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"                val                df                =                spark                .                read                .                format                (                "orc"                ).                load                (                file                )                df                .                testify                (                10                ,                false                )              
                # In Python                file                =                "/databricks-datasets/learning-spark-v2/flights/summary-information/orc/*"                df                =                spark                .                read                .                format                (                "orc"                )                .                option                (                "path"                ,                file                )                .                load                ()                df                .                testify                (                10                ,                False                )                +-----------------+-------------------+-----+                |                DEST_COUNTRY_NAME                |                ORIGIN_COUNTRY_NAME                |                count                |                +-----------------+-------------------+-----+                |                United                States                |                Romania                |                1                |                |                United                States                |                Republic of ireland                |                264                |                |                United                States                |                India                |                69                |                |                Egypt                |                United                States                |                24                |                |                Equatorial                Guinea                |                United                States                |                1                |                |                United                States                |                Singapore                |                25                |                |                United                States                |                Grenada                |                54                |                |                Costa                Rica                |                United                States                |                477                |                |                Senegal                |                United                States                |                29                |                |                United                States                |                Marshall                Islands                |                44                |                +-----------------+-------------------+-----+                simply                showing                acme                10                rows              

Reading an ORC file into a Spark SQL tabular array

In that location is no deviation from Parquet, JSON, CSV, or Avro when creating a SQL view using an ORC information source:

                -- In SQL                CREATE                OR                Supplant                TEMPORARY                VIEW                us_delay_flights_tbl                USING                orc                OPTIONS                (                path                "/databricks-datasets/learning-spark-v2/flights/summary-information/orc/*"                )              

Once a table is created, you can read data into a DataFrame using SQL equally usual:

                //                In                Scala                /                Python                spark                .                sql                (                "SELECT * FROM us_delay_flights_tbl"                )                .                testify                ()                +-----------------+-------------------+-----+                |                DEST_COUNTRY_NAME                |                ORIGIN_COUNTRY_NAME                |                count                |                +-----------------+-------------------+-----+                |                United                States                |                Romania                |                1                |                |                United                States                |                Republic of ireland                |                264                |                |                United                States                |                India                |                69                |                |                Arab republic of egypt                |                United                States                |                24                |                |                Equatorial                Republic of guinea                |                United                States                |                1                |                |                United                States                |                Singapore                |                25                |                |                United                States                |                Grenada                |                54                |                |                Costa                Rica                |                United                States                |                477                |                |                Senegal                |                United                States                |                29                |                |                United                States                |                Marshall                Islands                |                44                |                +-----------------+-------------------+-----+                only                showing                superlative                10                rows              

Writing DataFrames to ORC files

Writing dorsum a transformed DataFrame subsequently reading is equally uncomplicated using the DataFrameWriter methods:

                // In Scala                df                .                write                .                format                (                "orc"                )                .                mode                (                "overwrite"                )                .                option                (                "compression"                ,                "snappy"                )                .                save                (                "/tmp/data/orc/df_orc"                )              
                # In Python                (                df                .                write                .                format                (                "orc"                )                .                fashion                (                "overwrite"                )                .                option                (                "compression"                ,                "snappy"                )                .                salve                (                "/tmp/data/orc/flights_orc"                ))              

The issue volition be a folder at the specified location containing some compressed ORC files:

-rw-r--r--  ane jules  wheel    0 May 16 17:23 _SUCCESS -rw-r--r--  1 jules  wheel  547 May sixteen 17:23 part-00000-<...>-c000.snappy.orc

Images

In Spark 2.4 the community introduced a new data source, image files, to support deep learning and machine learning frameworks such every bit TensorFlow and PyTorch. For computer vision–based machine learning applications, loading and processing prototype data sets is of import.

Reading an prototype file into a DataFrame

As with all of the previous file formats, you can use the DataFrameReader methods and options to read in an image file as shown here:

                // In Scala                import                org.apache.spark.ml.source.paradigm                val                imageDir                =                "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"                val                imagesDF                =                spark                .                read                .                format                (                "image"                ).                load                (                imageDir                )                imagesDF                .                printSchema                imagesDF                .                select                (                "image.height"                ,                "image.width"                ,                "prototype.nChannels"                ,                "prototype.fashion"                ,                "label"                ).                show                (                5                ,                imitation                )              
                # In Python                from                pyspark.ml                import                image                image_dir                =                "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"                images_df                =                spark                .                read                .                format                (                "image"                )                .                load                (                image_dir                )                images_df                .                printSchema                ()                root                |--                prototype                :                struct                (                nullable                =                true                )                |                |--                origin                :                string                (                nullable                =                true                )                |                |--                height                :                integer                (                nullable                =                truthful                )                |                |--                width                :                integer                (                nullable                =                true                )                |                |--                nChannels                :                integer                (                nullable                =                truthful                )                |                |--                mode                :                integer                (                nullable                =                truthful                )                |                |--                data                :                binary                (                nullable                =                true                )                |--                label                :                integer                (                nullable                =                true                )                images_df                .                select                (                "image.height"                ,                "image.width"                ,                "image.nChannels"                ,                "image.style"                ,                "label"                )                .                show                (                5                ,                truncate                =                Faux                )                +------+-----+---------+----+-----+                |                peak                |                width                |                nChannels                |                mode                |                label                |                +------+-----+---------+----+-----+                |                288                |                384                |                3                |                16                |                0                |                |                288                |                384                |                three                |                16                |                1                |                |                288                |                384                |                3                |                16                |                0                |                |                288                |                384                |                3                |                xvi                |                0                |                |                288                |                384                |                three                |                16                |                0                |                +------+-----+---------+----+-----+                merely                showing                top                5                rows              

Binary Files

Spark 3.0 adds support for binary files as a data source. The DataFrameReader converts each binary file into a unmarried DataFrame row (record) that contains the raw content and metadata of the file. The binary file information source produces a DataFrame with the post-obit columns:

  • path: StringType

  • modificationTime: TimestampType

  • length: LongType

  • content: BinaryType

Reading a binary file into a DataFrame

To read binary files, specify the data source format as a binaryFile. You can load files with paths matching a given global design while preserving the behavior of partition discovery with the data source option pathGlobFilter. For example, the following lawmaking reads all JPG files from the input directory with any partitioned directories:

                // In Scala                val                path                =                "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"                val                binaryFilesDF                =                spark                .                read                .                format                (                "binaryFile"                )                .                option                (                "pathGlobFilter"                ,                "*.jpg"                )                .                load                (                path                )                binaryFilesDF                .                show                (                5                )              
                # In Python                path                =                "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"                binary_files_df                =                (                spark                .                read                .                format                (                "binaryFile"                )                .                option                (                "pathGlobFilter"                ,                "*.jpg"                )                .                load                (                path                ))                binary_files_df                .                prove                (                5                )                +--------------------+-------------------+------+--------------------+-----+                |                path                |                modificationTime                |                length                |                content                |                characterization                |                +--------------------+-------------------+------+--------------------+-----+                |                file                :                /                Users                /                jules                ...|                2020                -                02                -                12                12                :                04                :                24                |                55037                |                [                FF                D8                FF                E0                00                1.                ..|                0                |                |                file                :                /                Users                /                jules                ...|                2020                -                02                -                12                12                :                04                :                24                |                54634                |                [                FF                D8                FF                E0                00                i.                ..|                1                |                |                file                :                /                Users                /                jules                ...|                2020                -                02                -                12                12                :                04                :                24                |                54624                |                [                FF                D8                FF                E0                00                1.                ..|                0                |                |                file                :                /                Users                /                jules                ...|                2020                -                02                -                12                12                :                04                :                24                |                54505                |                [                FF                D8                FF                E0                00                1.                ..|                0                |                |                file                :                /                Users                /                jules                ...|                2020                -                02                -                12                12                :                04                :                24                |                54475                |                [                FF                D8                FF                E0                00                ane.                ..|                0                |                +--------------------+-------------------+------+--------------------+-----+                only                showing                top                five                rows              

To ignore partitioning information discovery in a directory, you can set recursiveFileLookup to "true":

                // In Scala                val                binaryFilesDF                =                spark                .                read                .                format                (                "binaryFile"                )                .                pick                (                "pathGlobFilter"                ,                "*.jpg"                )                .                option                (                "recursiveFileLookup"                ,                "true"                )                .                load                (                path                )                binaryFilesDF                .                show                (                5                )              
                # In Python                binary_files_df                =                (                spark                .                read                .                format                (                "binaryFile"                )                .                selection                (                "pathGlobFilter"                ,                "*.jpg"                )                .                option                (                "recursiveFileLookup"                ,                "truthful"                )                .                load                (                path                ))                binary_files_df                .                evidence                (                5                )                +--------------------+-------------------+------+--------------------+                |                path                |                modificationTime                |                length                |                content                |                +--------------------+-------------------+------+--------------------+                |                file                :                /                Users                /                jules                ...|                2020                -                02                -                12                12                :                04                :                24                |                55037                |                [                FF                D8                FF                E0                00                one.                ..|                |                file                :                /                Users                /                jules                ...|                2020                -                02                -                12                12                :                04                :                24                |                54634                |                [                FF                D8                FF                E0                00                one.                ..|                |                file                :                /                Users                /                jules                ...|                2020                -                02                -                12                12                :                04                :                24                |                54624                |                [                FF                D8                FF                E0                00                1.                ..|                |                file                :                /                Users                /                jules                ...|                2020                -                02                -                12                12                :                04                :                24                |                54505                |                [                FF                D8                FF                E0                00                ane.                ..|                |                file                :                /                Users                /                jules                ...|                2020                -                02                -                12                12                :                04                :                24                |                54475                |                [                FF                D8                FF                E0                00                i.                ..|                +--------------------+-------------------+------+--------------------+                only                showing                top                5                rows              

Note that the characterization column is absent when the recursiveFileLookup selection is set to "true".

Currently, the binary file information source does not support writing a DataFrame back to the original file format.

In this section, yous got a tour of how to read data into a DataFrame from a range of supported file formats. We as well showed you how to create temporary views and tables from the existing congenital-in data sources. Whether you lot're using the DataFrame API or SQL, the queries produce identical outcomes. You can examine some of these queries in the notebook available in the GitHub repo for this book.

Summary

To epitomize, this chapter explored the interoperability between the DataFrame API and Spark SQL. In particular, you lot got a flavor of how to use Spark SQL to:

  • Create managed and unmanaged tables using Spark SQL and the DataFrame API.

  • Read from and write to various built-in information sources and file formats.

  • Use the spark.sql programmatic interface to issue SQL queries on structured data stored as Spark SQL tables or views.

  • Peruse the Spark Catalog to inspect metadata associated with tables and views.

  • Employ the DataFrameWriter and DataFrameReader APIs.

Through the code snippets in the chapter and the notebooks available in the book'southward GitHub repo, y'all got a feel for how to use DataFrames and Spark SQL. Continuing in this vein, the next chapter farther explores how Spark interacts with the external data sources shown in Figure 4-1. Yous'll see some more in-depth examples of transformations and the interoperability between the DataFrame API and Spark SQL.

couchgonly1954.blogspot.com

Source: https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch04.html

Postar um comentário for "Optimize Reading Orc Files From Spark Sql"