Showing results for tags 'hive'.
-
In today’s data-driven world, efficient workflow management and secure storage are essential for the success of any project or organization. If you have large datasets in a cloud-based project management platform like Hive, you can smoothly migrate them to a relational database management system (RDBMS), like MySQL. Now, you must be wondering why you should […] View the full article
-
Are you looking to migrate a large number of Hive ACID tables to BigQuery? ACID-enabled Hive tables support transactions that accept UPDATE and DELETE DML operations. In this blog, we explore migrating Hive ACID tables to BigQuery. The approach works for both compacted (major/minor) and non-compacted Hive tables.

Let's first understand the term ACID and how it works in Hive. ACID stands for four traits of database transactions:
- Atomicity: an operation either succeeds completely or fails; it does not leave partial data.
- Consistency: once an application performs an operation, the results of that operation are visible to it in every subsequent operation.
- Isolation: an incomplete operation by one user does not cause unexpected side effects for other users.
- Durability: once an operation is complete, it is preserved even in the face of machine or system failure.

Starting with version 0.14, Hive supports all ACID properties. This lets it use transactions, create transactional tables, and run INSERT, UPDATE, and DELETE queries on tables. The files underlying a Hive ACID table use the ORC ACID format. To support ACID features, Hive stores table data in a set of base files and stores the data from all insert, update, and delete operations in delta files. At read time, the reader merges the base files and delta files to present the latest data.

As operations modify the table, many delta files are created. These files need to be compacted to maintain adequate performance. There are two types of compaction, minor and major:
- Minor compaction takes a set of existing delta files and rewrites them into a single delta file per bucket.
- Major compaction takes one or more delta files and the base file for the bucket and rewrites them into a new base file per bucket. Major compaction is more expensive but more effective.

Organizations configure automatic compaction, but they also need to run manual compaction when automatic compaction fails.
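To make the base/delta bookkeeping concrete, here is a minimal Python sketch that works only on directory names like those in the employee_trans listing further down. It parses base_N, delta_min_max_stmt and delete_delta_min_max_stmt names. It also shows which directories a reader has to merge and roughly what minor and major compaction produce. The naming rules are simplifying assumptions: seven-digit write ids, compacted deltas without a statement id, and a new base named after the highest write id. Newer Hive releases add further suffixes. All helper names are hypothetical, and this is not Hive's own implementation.

```python
import re
from dataclasses import dataclass
from typing import List

# Assumed, simplified naming: base_<maxWriteId>, or
# delta_/delete_delta_<minWriteId>_<maxWriteId>[_<statementId>].
DIR_RE = re.compile(r"^(base|delta|delete_delta)_(\d+)(?:_(\d+))?(?:_(\d+))?$")


@dataclass(frozen=True)
class AcidDir:
    kind: str           # "base", "delta" or "delete_delta"
    min_write_id: int   # 0 for a base, which covers everything up to max_write_id
    max_write_id: int


def parse_dir(name: str) -> AcidDir:
    m = DIR_RE.match(name)
    if m is None:
        raise ValueError(f"not an ACID directory name: {name!r}")
    kind, first, second, _statement_id = m.groups()
    if kind == "base":
        return AcidDir(kind, 0, int(first))
    return AcidDir(kind, int(first), int(second or first))


def dir_name(kind: str, lo: int, hi: int) -> str:
    # Seven-digit zero padding; compacted names carry no statement id here.
    return f"base_{hi:07d}" if kind == "base" else f"{kind}_{lo:07d}_{hi:07d}"


def dirs_to_read(dirs: List[AcidDir]) -> List[AcidDir]:
    """Newest base plus every delta written after it: the set a reader merges."""
    base_hi = max((d.max_write_id for d in dirs if d.kind == "base"), default=0)
    newest_base = [d for d in dirs if d.kind == "base" and d.max_write_id == base_hi][:1]
    later = [d for d in dirs if d.kind != "base" and d.min_write_id > base_hi]
    return newest_base + sorted(later, key=lambda d: (d.min_write_id, d.kind))


def minor_compact(dirs: List[AcidDir]) -> List[str]:
    """Rewrite the deltas into one delta (and one delete_delta) spanning their range."""
    deltas = [d for d in dirs if d.kind != "base"]
    if not deltas:
        return []
    lo = min(d.min_write_id for d in deltas)
    hi = max(d.max_write_id for d in deltas)
    return [dir_name(k, lo, hi) for k in ("delta", "delete_delta")
            if any(d.kind == k for d in deltas)]


def major_compact(dirs: List[AcidDir]) -> str:
    """Fold the base and all deltas into a single new base."""
    return dir_name("base", 0, max(d.max_write_id for d in dirs))


listing = [
    "delete_delta_0000005_0000005_0000", "delete_delta_0000006_0000006_0000",
    "delta_0000001_0000001_0000", "delta_0000002_0000002_0000",
    "delta_0000003_0000003_0000", "delta_0000004_0000004_0000",
    "delta_0000005_0000005_0000",
]
dirs = [parse_dir(n) for n in listing]
print(len(dirs_to_read(dirs)))  # 7: no base yet, so every delta is merged at read time
print(minor_compact(dirs))      # ['delta_0000001_0000006', 'delete_delta_0000001_0000006']
print(major_compact(dirs))      # base_0000006
```

After a major compaction, a reader only needs the new base plus any deltas written later. This is why compaction bounds the number of files read on each query.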
If compaction is not performed for a long time after a failure, a large number of small delta files accumulate. Compacting that many small delta files becomes very resource-intensive and can itself fail. Some of the issues with Hive ACID tables are:
- NameNode capacity problems due to small delta files.
- Table locks during compaction.
- Major compaction on Hive ACID tables is a resource-intensive operation.
- Data replication to DR takes longer because of the small files.

Benefits of migrating Hive ACID tables to BigQuery

Some of the benefits of migrating Hive ACID tables to BigQuery are:
- Once data is loaded into managed BigQuery tables, BigQuery manages and optimizes the data in its internal storage and handles compaction. As a result, there is no small-file problem like the one in Hive ACID tables.
- The locking issue goes away, as the BigQuery Storage Read API is gRPC-based and highly parallelized.
- Because ORC files are completely self-describing, there is no dependency on Hive Metastore DDL. BigQuery has a built-in schema inference feature that can infer the schema from an ORC file. It also supports schema evolution without needing tools like Apache Spark to perform schema inference.
Hive ACID table structure and sample data

Here is the schema of the sample Hive ACID table "employee_trans":

```
hive> show create table employee_trans;
OK
CREATE TABLE `employee_trans`(
  `id` int,
  `name` string,
  `age` int,
  `gender` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'hdfs://hive-cluster-m/user/hive/warehouse/aciddb.db/employee_trans'
TBLPROPERTIES (
  'bucketing_version'='2',
  'transactional'='true',
  'transactional_properties'='default',
  'transient_lastDdlTime'='1657906607')
```

This sample ACID table "employee_trans" has 3 records:

```
hive> select * from employee_trans;
OK
1 James 30 M
3 Jeff 45 M
2 Ann 40 F
Time taken: 0.1 seconds, Fetched: 3 row(s)
```

Every insert, update, and delete operation creates small delta files. This is the underlying directory structure of the Hive ACID-enabled table:
```
hdfs://hive-cluster-m/user/hive/warehouse/aciddb.db/employee_trans/delete_delta_0000005_0000005_0000
hdfs://hive-cluster-m/user/hive/warehouse/aciddb.db/employee_trans/delete_delta_0000006_0000006_0000
hdfs://hive-cluster-m/user/hive/warehouse/aciddb.db/employee_trans/delta_0000001_0000001_0000
hdfs://hive-cluster-m/user/hive/warehouse/aciddb.db/employee_trans/delta_0000002_0000002_0000
hdfs://hive-cluster-m/user/hive/warehouse/aciddb.db/employee_trans/delta_0000003_0000003_0000
hdfs://hive-cluster-m/user/hive/warehouse/aciddb.db/employee_trans/delta_0000004_0000004_0000
hdfs://hive-cluster-m/user/hive/warehouse/aciddb.db/employee_trans/delta_0000005_0000005_0000
```

The ORC files in an ACID table are extended with several columns:

```
struct<
  operation: int,
  originalTransaction: bigInt,
  bucket: int,
  rowId: bigInt,
  currentTransaction: bigInt,
  row: struct<...>
>
```

Steps to migrate Hive ACID tables to BigQuery

Migrate the underlying Hive table HDFS data

Copy the files under the employee_trans HDFS directory and stage them in GCS. You can use either the HDFS2GCS solution or DistCp. The HDFS2GCS solution uses open source technologies to transfer data. It provides several benefits, such as status reporting, error handling, fault tolerance, incremental/delta loading, rate throttling, start/stop, checksum validation, and byte-to-byte comparison. Here is the high-level architecture of the HDFS2GCS solution. Refer to the public HDFS2GCS GitHub repository to learn more about this tool.

The source location may contain extra files that we don't necessarily want to copy.
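As a rough illustration of regular-expression filtering at this step, the Python sketch below selects the files to copy from a hypothetical listing under the employee_trans directory. It is not HDFS2GCS's own filter configuration, whose syntax is not reproduced here. The pattern, the helper name files_to_copy and the listing are assumptions. In typical Hive ACID layouts, the data files inside base_/delta_/delete_delta_ directories are named bucket_NNNNN with no file extension. For that reason, the sketch matches on directory and file names rather than on an .orc suffix.

```python
import re
from typing import Iterable, List

# Hypothetical rule: keep bucket_NNNNN data files inside base_/delta_/delete_delta_
# directories; skip side files (names starting with "_") and anything else.
ACID_DATA_FILE = re.compile(r"/(?:base|delta|delete_delta)_\d+(?:_\d+){0,2}/bucket_\d+$")


def files_to_copy(paths: Iterable[str], pattern: re.Pattern = ACID_DATA_FILE) -> List[str]:
    """Return the paths that match the copy filter, preserving input order."""
    return [p for p in paths if pattern.search(p)]


root = "hdfs://hive-cluster-m/user/hive/warehouse/aciddb.db/employee_trans"
listing = [
    f"{root}/delta_0000001_0000001_0000/bucket_00000",
    f"{root}/delta_0000001_0000001_0000/_orc_acid_version",  # metadata side file
    f"{root}/delete_delta_0000005_0000005_0000/bucket_00000",
    f"{root}/notes.txt",  # hypothetical stray file
]
for path in files_to_copy(listing):
    print(path)
```

Only the two bucket files are selected. In practice, the pattern would be adjusted to whatever the actual listing contains.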
When copying, we can use filters based on regular expressions, for example to copy only the ORC data files.

Load ACID tables as-is to BigQuery

Once the underlying Hive ACID table files are copied to GCS, use the bq load tool to load the data into a BigQuery base table. This base table will contain all the change events.

Data verification

Run "select *" on the base table to verify that all the changes are captured.

Note: "select * …" is used here for demonstration purposes only and is not a recommended best practice.

Loading to the target BigQuery table

The following query selects only the latest version of each record from the base table and discards the intermediate delete and update operations. Each record is identified by its (originalTransaction, bucket, rowId) triple, so the window is partitioned by all three columns. Operation 2 marks a delete event. You have two options for the results:
- Load them into a target table with an on-demand scheduled query, using the overwrite option.
- Create a view on the base table with this query to read the latest records directly.

```
WITH
  latest_records_desc AS (
  SELECT
    Row.*,
    operation,
    ROW_NUMBER() OVER (PARTITION BY originalTransaction, bucket, rowId ORDER BY currentTransaction DESC) AS rownum
  FROM
    `hiveacid-sandbox.hivetobq.basetable` )
SELECT id, name, age, gender
FROM
  latest_records_desc
WHERE
  rownum = 1
  AND operation != 2
```

Once the data is loaded into the target BigQuery table, you can validate it as follows:

a. Use the Data Validation Tool (DVT) to validate the Hive ACID table against the target BigQuery table. DVT provides an automated and repeatable solution for schema and validation tasks. It supports the following validations:
- Column validation (count, sum, avg, min, max, group by)
- Row validation (BQ, Hive, and Teradata only)
- Schema validation
- Custom query validation
- Ad hoc SQL exploration

b. If you have analytical HiveQL queries running on this ACID table, translate them with the BigQuery SQL translation service and point them at the target BigQuery table.

Hive DDL migration (optional)

Since ORC is self-contained, you can rely on BigQuery's schema inference feature when loading, and there is no need to extract Hive DDLs from the Metastore. However, your organization may have a policy of pre-creating datasets and tables before migration. In that case, this step is useful and a good starting point.

a. Extract the Hive ACID DDL dumps and translate them with the BigQuery translation service to create equivalent BigQuery DDLs. The batch SQL translation service bulk-translates exported HQL (Hive Query Language) scripts from a source metadata bucket in Google Cloud Storage into equivalent BigQuery SQL in a target GCS bucket. You can also use the BigQuery interactive SQL translator. This is a live, real-time SQL translation tool across multiple SQL dialects that translates a query in a dialect such as HQL into a BigQuery Standard SQL query. These tools can reduce the time and effort needed to migrate SQL workloads to BigQuery.

b. Create managed BigQuery tables using the translated DDLs. Here is the screenshot of the translation service in the BigQuery console. Click "Translate" to translate the HiveQL and "Run" to execute the query. To create tables from bulk batch-translated SQL queries, you can use the Airflow BigQuery operator (BigQueryInsertJobOperator) to run multiple queries.

After the DDLs are converted, copy the ORC files to GCS and perform ELT in BigQuery.

Migrating to BigQuery resolves the pain points of Hive ACID tables. Once the ACID tables are in BigQuery, you can leverage BigQuery ML and GeoViz capabilities for real-time analytics. If you are interested in exploring more, please check out the additional resources section.
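Returning to the step that loads the target BigQuery table: the latest-record rule can be checked outside BigQuery. The Python sketch below is a hedged illustration, not the blog's code. It treats (originalTransaction, bucket, rowId) as a row's identity, keeps the event with the highest currentTransaction, and drops rows whose newest event is a delete (operation 2). The change history is hypothetical. The bucket is simplified to 0, and updates are written as a delete plus an insert, as Hive 3 does. With this history, the sketch reproduces the three sample records. The test with two rows inserted in one transaction shows why the identity must include bucket and rowId, not originalTransaction alone.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Tuple

INSERT, UPDATE, DELETE = 0, 1, 2  # operation codes in ORC ACID files


@dataclass(frozen=True)
class AcidEvent:
    operation: int
    original_transaction: int
    bucket: int
    row_id: int
    current_transaction: int
    row: Optional[Tuple]  # None for delete events, which carry no row data


def latest_rows(events: Iterable[AcidEvent]) -> List[Tuple]:
    """Keep the newest event per (originalTransaction, bucket, rowId); drop deletes."""
    latest: Dict[Tuple[int, int, int], AcidEvent] = {}
    for ev in events:
        key = (ev.original_transaction, ev.bucket, ev.row_id)
        seen = latest.get(key)
        if seen is None or ev.current_transaction > seen.current_transaction:
            latest[key] = ev
    # Emit in row-identity order; "rownum = 1 AND operation != 2" in SQL terms.
    return [ev.row for _key, ev in sorted(latest.items()) if ev.operation != DELETE]


# Hypothetical change history for employee_trans (one event per delta directory).
events = [
    AcidEvent(INSERT, 1, 0, 0, 1, (1, "James", 30, "M")),
    AcidEvent(INSERT, 2, 0, 0, 2, (2, "Ann", 35, "F")),
    AcidEvent(INSERT, 3, 0, 0, 3, (3, "Jeff", 45, "M")),
    AcidEvent(INSERT, 4, 0, 0, 4, (4, "Bob", 50, "M")),
    # Transaction 5 updates Ann's age: delete the old row, insert the new one.
    AcidEvent(DELETE, 2, 0, 0, 5, None),
    AcidEvent(INSERT, 5, 0, 0, 5, (2, "Ann", 40, "F")),
    # Transaction 6 deletes Bob.
    AcidEvent(DELETE, 4, 0, 0, 6, None),
]
for row in latest_rows(events):
    print(row)
# (1, 'James', 30, 'M')
# (3, 'Jeff', 45, 'M')
# (2, 'Ann', 40, 'F')
```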
Additional Resources
- Hive ACID
- ACID ORC Format
- HDFS2GCS Solution
- DistCp
- Data Validation Tool
- BigQuery Translation Service

Related article: Scheduling a command in GCP using Cloud Run and Cloud Scheduler. How to efficiently and quickly schedule commands like gsutil using Cloud Run and Cloud Scheduler.
Tagged with: best practices, acid (and 2 more)
-
We are excited to launch two new features that help enforce access controls with Amazon EMR on EC2 clusters (EMR Clusters). These features are supported for jobs submitted to the cluster through the EMR Steps API. The first is runtime roles for EMR Steps. A runtime role is an AWS Identity and Access Management (IAM) role that you associate with an EMR Step, and the step uses this role to access AWS resources. The second is integration with AWS Lake Formation to apply table- and column-level access controls for Apache Spark and Apache Hive jobs submitted with EMR Steps. View the full article
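With runtime roles, the IAM role is attached per step at submission time. Here is a minimal Python sketch, assuming boto3's EMR client and its add_job_flow_steps call, which accepts an ExecutionRoleArn parameter for the step's runtime role. The cluster id, role ARN, script path and helper name are hypothetical. The cluster must already be configured to accept runtime roles, which this sketch does not cover. The sketch only builds the request, so it runs without AWS access.

```python
import json
from typing import Any, Dict, List


def build_add_steps_request(cluster_id: str, runtime_role_arn: str,
                            spark_args: List[str]) -> Dict[str, Any]:
    """Keyword arguments for boto3's EMR add_job_flow_steps call (not sent here)."""
    return {
        "JobFlowId": cluster_id,
        # The runtime role this step assumes when it accesses AWS resources.
        "ExecutionRoleArn": runtime_role_arn,
        "Steps": [
            {
                "Name": "spark-job-with-runtime-role",
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": ["spark-submit", *spark_args],
                },
            }
        ],
    }


request = build_add_steps_request(
    "j-EXAMPLE12345",                                           # hypothetical cluster id
    "arn:aws:iam::111122223333:role/example-emr-runtime-role",  # hypothetical role
    ["--deploy-mode", "cluster", "s3://example-bucket/jobs/report.py"],
)
print(json.dumps(request, indent=2))
# With credentials and a cluster set up for runtime roles, this could be sent as:
#   boto3.client("emr").add_job_flow_steps(**request)
```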
Tagged with: iam, lake formation (and 7 more)
-
Amazon EMR release 6.2 now supports improved Apache HBase performance on Amazon S3 with persistent HFile tracking, as well as Apache Hive ACID transactions on HDFS and Amazon S3. EMR 6.2 also includes performance improvements to the EMR Runtime for Apache Spark and to PrestoDB. View the full article
- 1 reply
Tagged with: apache hbase, amazon s3 (and 5 more)
-
Apache Hive supports transactional tables that provide ACID guarantees, and a significant amount of work has gone into making these transactional tables highly performant. Apache Spark can access Hive external tables, but it cannot access Hive managed tables. To access Hive managed tables from Spark, you must use the Hive Warehouse Connector (HWC). We are happy to announce Spark Direct Reader mode in the Hive Warehouse Connector, which can read Hive transactional tables directly from the filesystem. This feature is available from the CDP-Public-Cloud-2.0 (7.2.0.0) and CDP-DC-7.1 (7.1.1.0) releases onwards.

HWC already provided access to Hive managed tables from Spark. However, because this involved communication with LLAP, there was an additional hop to fetch the data and process it in Spark. For external tables, by contrast, Spark reads the data directly from the filesystem. As a result, accessing managed tables was slower than accessing external tables. Additionally, many HWC use cases were ETL jobs in which a superuser updated data in multiple tables, so authorization was not a strong business need. HWC Spark Direct Reader is an additional mode in HWC that tries to address these concerns.

This article describes how to use Spark Direct Reader to consume Hive transactional table data in a Spark application. It also introduces the methods and APIs to read Hive transactional tables into Spark DataFrames. Finally, it demonstrates transaction handling and semantics while using this reader. HWC Spark Direct Reader is derived from the Qubole Spark Acid Connector.

Prerequisites

The following prerequisites must be met to query Hive managed tables with Spark Direct Reader:
- Connectivity to HMS (Hive Metastore): the Spark application must be able to access the Hive metastore using its thrift URI. This URI is determined by the Hive config hive.metastore.uris.
- The user launching the Spark application must have read and execute permissions on the Hive warehouse location on the filesystem. The location is determined by the Hive config hive.metastore.warehouse.dir.

Use cases

This section focuses on using Spark Direct Reader to read transactional tables. Consider a Hive transactional table emp_acid that contains information about employees.

With auto-translate extension enabled

The connector has an auto-translate rule. This Spark extension rule automatically instructs Spark to use Spark Direct Reader for managed tables, so the user does not need to specify it explicitly. Here is the employee data in table emp_acid:

```
scala> spark.sql("select * from emp_acid").show
+------+----------+--------------------+-------------+--------------+-----+-----+-------+
|emp_id|first_name|              e_mail|date_of_birth|          city|state|  zip|dept_id|
+------+----------+--------------------+-------------+--------------+-----+-----+-------+
|677509|      Lois|lois.walker@hotma...|    3/29/1981|        Denver|   CO|80224|      4|
|940761|    Brenda|brenda.robinson@g...|    7/31/1970|     Stonewall|   LA|71078|      5|
|428945|       Joe|joe.robinson@gmai...|    6/16/1963|  Michigantown|   IN|46057|      3|
...
```

Using transactional tables in conjunction with other data sources

Spark Direct Reader also works seamlessly with other data sources. In the example below, we join the emp_acid table with an external table dept_ext to find each employee's department.
```
scala> sql("select e.emp_id, e.first_name, d.name department from emp_acid e join dept_ext d on e.dept_id = d.id").show
+------+----------+-----------+
|emp_id|first_name| department|
+------+----------+-----------+
|677509|      Lois|         HR|
|940761|    Brenda|    FINANCE|
|428945|       Joe|      ADMIN|
```

Here, the direct reader fetches the data of the emp_acid table because it is a transactional table. Spark's native reader fetches the data of the dept_ext table:

```
scala> sql("select e.emp_id, e.first_name, d.name department from emp_acid e join dept_ext d on e.dept_id = d.id").explain
== Physical Plan ==
*(2) Project [emp_id#288, first_name#289, name#287 AS department#255]
+- *(2) BroadcastHashJoin [dept_id#295], [id#286], Inner, BuildRight
   :- *(2) Filter isnotnull(dept_id#295)
   :  +- *(2) Scan HiveAcidRelation(org.apache.spark.sql.SparkSession@1444fa42,default.emp_acid,Map(transactional -> true, numFilesErasureCoded -> 0, bucketing_version -> 2, transient_lastDdlTime -> 1594830632, transactional_properties -> default, table -> default.emp_acid)) [emp_id#288,first_name#289,dept_id#295] PushedFilters: [IsNotNull(dept_id)], ReadSchema: struct<emp_id:int,first_name:string,dept_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#169]
      +- *(1) Project [id#286, name#287]
         +- *(1) Filter isnotnull(id#286)
            +- *(1) FileScan orc default.dept_ext[id#286,name#287] Batched: true, Format: ORC, Location: InMemoryFileIndex[hdfs://anurag-hwc-1.anurag-hwc.root.hwx.site:8020/warehouse/tablespace/external..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int,name:string>
```

Configurations to enable auto-translate

To turn on the auto-translate feature, specify the Spark SQL extension:

```
spark.sql.extensions=com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension
```

Using the Hive Warehouse Connector executeQuery() API

If you already use the Hive Warehouse Connector in your Spark application, you can keep using the executeQuery() API. To switch to Spark Direct Reader, you only need to add some configurations. The code is the same as regular Hive Warehouse Connector code. The queries from the two use cases above can be run as follows:

```
scala> val hive = com.hortonworks.hwc.HiveWarehouseSession.session(spark).build()
scala> hive.executeQuery("select * from emp_acid").show
scala> hive.executeQuery("select e.emp_id, e.first_name, d.name department from emp_acid e join dept_ext d on e.dept_id = d.id").show
```

Configurations to use Spark Direct Reader via the Hive Warehouse Connector API:

```
spark.datasource.hive.warehouse.read.via.llap=false
spark.sql.hive.hwc.execution.mode=spark
spark.sql.extensions=com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension
```

Behind the scenes – Read Architecture

Spark Direct Reader is built on top of the DataSource V1 APIs exposed by Spark, which allow custom data sources to be plugged into Spark. Here, the custom data source is the layer that enables reading Hive ACID tables. The following diagram depicts the high-level read process and transaction management in the connector:
1. The Spark driver parses the query and starts a read transaction for each ACID table.
2. The transaction snapshot for each table is stored separately and is used to generate the splits.
3. The driver serializes the partition info and transaction snapshot and sends them to the executors.
4. Executors read their specific splits using the transaction snapshot.
5. Processed and transformed data is sent back to the driver.
6. The driver commits the read transactions it started.

Note on transactions

Currently, the connector supports only single-table transaction consistency, i.e., it opens a new transaction for each table involved in a query. This means that if a query involves multiple tables, they may not all use the same snapshot of data.
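Each read transaction in the logs that follow is logged with a validTxns snapshot string. The Python sketch below decodes those strings to show what "different snapshots" means for a multi-table query. The field layout is an assumption based on Hive's ValidReadTxnList string form: high watermark, minimum open transaction id, comma-separated open ids, and comma-separated aborted ids. The layout is consistent with the logged values but is not taken from the connector's code, and TxnSnapshot and parse_valid_txns are hypothetical names.

```python
from dataclasses import dataclass
from typing import Tuple

# Java Long.MAX_VALUE; assumed to mean "no transaction is open".
NO_OPEN_TXN = 9223372036854775807


@dataclass(frozen=True)
class TxnSnapshot:
    high_watermark: int
    min_open_txn: int
    open_txns: Tuple[int, ...]
    aborted_txns: Tuple[int, ...]

    def is_visible(self, txn_id: int) -> bool:
        """Is data written by txn_id treated as committed in this snapshot?"""
        return (txn_id <= self.high_watermark
                and txn_id not in self.open_txns
                and txn_id not in self.aborted_txns)


def parse_valid_txns(text: str) -> TxnSnapshot:
    """Decode 'highWatermark:minOpenTxn:open,ids:aborted,ids' (assumed layout)."""
    parts = text.split(":")
    if len(parts) != 4:
        raise ValueError(f"unexpected validTxns string: {text!r}")
    hwm, min_open, open_ids, aborted_ids = parts

    def ids(field: str) -> Tuple[int, ...]:
        return tuple(int(x) for x in field.split(",") if x)

    return TxnSnapshot(int(hwm), int(min_open), ids(open_ids), ids(aborted_ids))


# Snapshots logged for the t3/t4 join below: one read transaction per table.
t3 = parse_valid_txns("175:9223372036854775807::")
t4 = parse_valid_txns("176:175:175:")
print(t3)
print(t4)  # transaction 176 still sees 175 (the t3 read) as open
```

Because each table's snapshot is taken separately, a writer that committed between the two reads could be visible to one table and not the other. This is the single-table consistency limit described in the note above.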
Transactions when a single table t1 is involved:

```
scala> spark.sql("select * from t1").show
20/07/08 05:41:39 INFO transaction.HiveAcidTxn: Begin transaction {"id":"174","validTxns":"174:9223372036854775807::"}
20/07/08 05:41:39 INFO transaction.HiveAcidTxn: Lock taken for lockInfo com.qubole.spark.hiveacid.transaction.LockInfo@37f5c5fd in transaction with id 174
....
....
20/07/08 05:41:47 INFO transaction.HiveAcidTxn: End transaction {"id":"174","validTxns":"174:9223372036854775807::"} abort = false
```

Transactions when multiple tables t3 and t4 are involved:

```
scala> spark.sql("select * from default.t3 join default.t4 on default.t3.a = default.t4.a").show
20/07/08 05:43:36 INFO transaction.HiveAcidTxn: Begin transaction {"id":"175","validTxns":"175:9223372036854775807::"}
20/07/08 05:43:36 INFO transaction.HiveAcidTxn: Lock taken for lockInfo com.qubole.spark.hiveacid.transaction.LockInfo@67cc7aee in transaction with id 175
....
....
20/07/08 05:43:36 INFO transaction.HiveAcidTxn: Begin transaction {"id":"176","validTxns":"176:175:175:"}
20/07/08 05:43:36 INFO transaction.HiveAcidTxn: Lock taken for lockInfo com.qubole.spark.hiveacid.transaction.LockInfo@a8c0c6d in transaction with id 176
....
....
20/07/08 05:43:53 INFO transaction.HiveAcidTxn: End transaction {"id":"175","validTxns":"175:9223372036854775807::"} abort = false
20/07/08 05:43:53 INFO transaction.HiveAcidTxn: End transaction {"id":"176","validTxns":"176:175:175:"} abort = false
```

Notice the two different transactions, 175 and 176, opened for the two tables t3 and t4 in the query.

API to close transactions explicitly

The connector commits or aborts transactions through a SQL listener that fires whenever a DataFrame operation or Spark SQL query finishes. However, invoking .explain(), .rdd(), or .cache() on a DataFrame opens a transaction that is never closed. Technically these are not Spark SQL queries, so the SQL listener does not kick in.
To handle this scenario and close the transactions manually, the connector exposes an explicit API, which can be invoked as follows:

```
scala> com.qubole.spark.hiveacid.transaction.HiveAcidTxnManagerObject.commitTxn(spark)
```

Or, if you are using the Hive Warehouse Connector's session (say 'hive' is the instance):

```
scala> hive.commitTxn
```

Configuration Summary

To use Spark Direct Reader, we need the following configurations (property = value, description):
- spark.hadoop.hive.metastore.uris = thrift://<host>:<port> (Hive metastore URI)
- spark.sql.extensions = com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension (extension needed for auto-translate to work)
- spark.kryo.registrator = com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator (for using Kryo serialization)
- spark.sql.hive.hwc.execution.mode = spark
- spark.datasource.hive.warehouse.read.via.llap = false

Supply the Hive Warehouse Connector jar to spark-shell or spark-submit with the --jars option when launching the application. For instance, spark-shell can be launched as follows:

```
spark-shell --jars /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/hive-warehouse-connector-assembly-<version>.jar \
  --conf "spark.sql.extensions=com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension" \
  --conf "spark.datasource.hive.warehouse.read.via.llap=false" \
  --conf "spark.sql.hive.hwc.execution.mode=spark" \
  --conf "spark.kryo.registrator=com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator" \
  --conf "spark.hadoop.hive.metastore.uris=<metastore_uri>"
```

Further Information and Resources
- Cloudera Data Warehouse
- HWC Spark Direct Reader for accessing Hive data
- Integrating Apache Hive with Apache Spark

The post Enabling high-speed Spark direct reader for Apache Hive ACID tables appeared first on Cloudera Blog. View the full article
-
Forum Statistics
63.6k Total Topics
61.7k Total Posts