In this project, you'll extend your SQL runtime to support aggregate queries. It is worth 8 points.
When the Spark SQL Parser encounters something that looks like a function, it doesn't try to interpret it directly. Instead, it produces an UnresolvedFunction expression node. You'll need to replace these nodes with concrete function expressions.
Like most databases, Spark maintains a "Function Registry", a catalog of all functions and their implementations. All of the "built-in" functions are provided in FunctionRegistry.builtin. Here's a little snippet you can use to replace functions. It doesn't support everything, but will be sufficient for this project.
case UnresolvedFunction(name, arguments, isDistinct, filter, ignoreNulls) => {
  val builder =
    FunctionRegistry.builtin
      .lookupFunctionBuilder(name)
      .getOrElse {
        throw new RuntimeException(
          s"Unable to resolve function `${name}`"
        )
      }
  builder(arguments) // returns the replacement expression node.
}
FunctionRegistry.lookupFunctionBuilder returns a 'builder' function. When called on the arguments of the UnresolvedFunction, the builder function returns an expression that implements the function. For example, looking up "regexp_extract" in the registry returns a function that, when called on two string-typed expressions and a literal integer, will return a RegExpExtract object.
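The regexp_extract example can be tried in isolation. The following is a minimal sketch, assuming a Spark 3.x Catalyst on the classpath in which lookupFunctionBuilder accepts a FunctionIdentifier; the object RegistryLookupSketch and its resolve helper are hypothetical names introduced only for illustration.

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, RegExpExtract}

// Hypothetical helper object, not part of Spark or of the project skeleton.
object RegistryLookupSketch {
  // Look up a built-in function by name and apply its builder to the arguments.
  // Assumption: lookupFunctionBuilder takes a FunctionIdentifier (Spark 3.x).
  def resolve(name: String, arguments: Seq[Expression]): Expression = {
    val builder = FunctionRegistry.builtin
      .lookupFunctionBuilder(FunctionIdentifier(name))
      .getOrElse {
        throw new RuntimeException(s"Unable to resolve function `$name`")
      }
    builder(arguments)
  }

  def main(args: Array[String]): Unit = {
    // Two string-typed expressions and a literal integer, as described above.
    val expr = resolve(
      "regexp_extract",
      Seq(Literal("xabbay"), Literal("a(b+)a"), Literal(1))
    )
    // Report which expression class the builder produced.
    println(expr.isInstanceOf[RegExpExtract])
  }
}
```

Passing the wrong number of arguments makes the builder itself fail, so it may be worth catching that case separately from an unknown function name.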
Because the Spark SQL Parser doesn't try to resolve functions, it is incapable of distinguishing between normal functions:
SELECT regexp_extract(A, "a(b+)a", 1) FROM R
and aggregate functions:
SELECT sum(A) FROM R
Both of these will parse into a LogicalPlan topped with a Project node.
While not required, you might find the resulting plans easier to work with if you replace such Project nodes with actual Aggregate plan nodes. Look for Project nodes whose projectList contains any expression that is an instance of a subclass of AggregateFunction.
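One way to perform this rewrite is sketched below. It assumes a Spark 3.x Catalyst on the classpath, that UnresolvedFunction nodes have already been replaced, and that a Project containing an aggregate has no GROUP BY (so the grouping list is empty). The object ProjectToAggregate and its methods are hypothetical names, not part of Spark.

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, Sum}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, OneRowRelation, Project}

// Hypothetical helper object for illustration only.
object ProjectToAggregate {
  // True if the expression tree contains an AggregateFunction anywhere,
  // including underneath an Alias or an arithmetic expression.
  def containsAggregate(expr: NamedExpression): Boolean =
    expr.find(_.isInstanceOf[AggregateFunction]).isDefined

  // Replace every Project that computes an aggregate with an Aggregate node
  // that has no grouping expressions. Other nodes are left unchanged.
  def rewrite(plan: LogicalPlan): LogicalPlan =
    plan.transform {
      case Project(projectList, child) if projectList.exists(containsAggregate) =>
        Aggregate(Seq.empty, projectList, child)
    }

  def main(args: Array[String]): Unit = {
    // A stand-in for the plan of "SELECT sum(1) AS s" after function replacement.
    val plan = Project(Seq(Alias(Sum(Literal(1)), "s")()), OneRowRelation())
    println(rewrite(plan).isInstanceOf[Aggregate])
  }
}
```

Searching the whole expression tree, rather than only the top of each projectList entry, matters because the queries below wrap their aggregates in aliases.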
AggregateFunctions are unevaluable, because they don't get evaluated on a single row. Instead, there are several methods on an AggregateFunction that describe how to initialize an accumulator (what Spark calls an AggregationBuffer), how to incorporate input rows into it, and how to extract a final result value from the buffer.
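The initialize / update / extract life cycle can be illustrated without any Spark classes. The sketch below is a simplified model under stated assumptions: SimpleAggregate, SumAgg, AvgAgg, and AccumulatorSketch are hypothetical names, inputs are plain Doubles, and nothing here is Spark's actual AggregateFunction interface.

```scala
// Hypothetical mini-API that mirrors the three steps described above.
// B is the accumulator (buffer) type, R is the final result type.
trait SimpleAggregate[B, R] {
  def initialize: B                        // create an empty accumulator
  def update(buffer: B, input: Double): B  // fold one input value into it
  def eval(buffer: B): R                   // extract the final result
}

// SUM keeps a single running total.
// Simplification: SQL's SUM over zero rows is NULL; this sketch returns 0.0.
object SumAgg extends SimpleAggregate[Double, Double] {
  def initialize: Double = 0.0
  def update(buffer: Double, input: Double): Double = buffer + input
  def eval(buffer: Double): Double = buffer
}

// AVG needs two fields in its accumulator: a running total and a row count.
object AvgAgg extends SimpleAggregate[(Double, Long), Option[Double]] {
  def initialize: (Double, Long) = (0.0, 0L)
  def update(buffer: (Double, Long), input: Double): (Double, Long) =
    (buffer._1 + input, buffer._2 + 1)
  def eval(buffer: (Double, Long)): Option[Double] =
    if (buffer._2 == 0) None else Some(buffer._1 / buffer._2)
}

object AccumulatorSketch {
  // Drive any aggregate over a sequence of input values.
  def run[B, R](agg: SimpleAggregate[B, R], rows: Seq[Double]): R =
    agg.eval(rows.foldLeft(agg.initialize)(agg.update))

  def main(args: Array[String]): Unit = {
    println(run(SumAgg, Seq(1.0, 2.0, 3.0)))
    println(run(AvgAgg, Seq(1.0, 2.0, 3.0)))
  }
}
```

A GROUP BY query would keep one such accumulator per distinct group key, for example in a mutable map from key to buffer.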
The AggregateFunction can be an instance of either:
The following methods are relevant:
TPC-H is a standard database benchmark. The benchmark consists of a dataset generator and 22 standard query templates. This checkpoint uses three queries based on TPC-H Queries 1, 3, 5, 6, 10, 11, 12, and 14. The dataset generator and template values can be found at the TPC-H website; the generator is run at scaling factor (SF) 0.1. Minor variations in the queries may be made. The queries have been rewritten slightly to make them easier to analyze.
SELECT
  LINEITEM.RETURNFLAG,
  LINEITEM.LINESTATUS,
  SUM(LINEITEM.QUANTITY) AS SUM_QTY,
  SUM(LINEITEM.EXTENDEDPRICE) AS SUM_BASE_PRICE,
  SUM(LINEITEM.EXTENDEDPRICE*(CAST(1.0 as float)-LINEITEM.DISCOUNT)) AS SUM_DISC_PRICE,
  SUM(LINEITEM.EXTENDEDPRICE*(CAST(1.0 as float)-LINEITEM.DISCOUNT)*(CAST(1.0 as float)+LINEITEM.TAX)) AS SUM_CHARGE,
  AVG(LINEITEM.QUANTITY) AS AVG_QTY,
  AVG(LINEITEM.EXTENDEDPRICE) AS AVG_PRICE,
  AVG(LINEITEM.DISCOUNT) AS AVG_DISC,
  COUNT(*) AS COUNT_ORDER
FROM LINEITEM
WHERE LINEITEM.SHIPDATE <= DATE '1998-10-01'
GROUP BY LINEITEM.RETURNFLAG, LINEITEM.LINESTATUS
ORDER BY LINEITEM.RETURNFLAG, LINEITEM.LINESTATUS

SELECT
  LINEITEM.ORDERKEY,
  SUM(LINEITEM.EXTENDEDPRICE*(CAST(1.0 as float)-LINEITEM.DISCOUNT)) AS REVENUE,
  ORDERS.ORDERDATE,
  ORDERS.SHIPPRIORITY
FROM CUSTOMER, ORDERS, LINEITEM
WHERE CUSTOMER.MKTSEGMENT = 'BUILDING'
  AND CUSTOMER.CUSTKEY = ORDERS.CUSTKEY
  AND LINEITEM.ORDERKEY = ORDERS.ORDERKEY
  AND ORDERS.ORDERDATE < DATE '1995-03-15'
  AND LINEITEM.SHIPDATE > DATE '1995-03-15'
GROUP BY LINEITEM.ORDERKEY, ORDERS.ORDERDATE, ORDERS.SHIPPRIORITY
ORDER BY REVENUE DESC, ORDERDATE
LIMIT 10

SELECT
  NATION.NAME,
  SUM(LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)) AS REVENUE
FROM REGION, NATION, CUSTOMER, ORDERS, LINEITEM, SUPPLIER
WHERE CUSTOMER.CUSTKEY = ORDERS.CUSTKEY
  AND LINEITEM.ORDERKEY = ORDERS.ORDERKEY
  AND LINEITEM.SUPPKEY = SUPPLIER.SUPPKEY
  AND CUSTOMER.NATIONKEY = NATION.NATIONKEY
  AND SUPPLIER.NATIONKEY = NATION.NATIONKEY
  AND NATION.REGIONKEY = REGION.REGIONKEY
  AND REGION.NAME = 'ASIA'
  AND ORDERS.ORDERDATE >= DATE '1994-01-01'
  AND ORDERS.ORDERDATE < DATE '1995-01-01'
GROUP BY NATION.NAME
ORDER BY REVENUE DESC

SELECT
  SUM(LINEITEM.EXTENDEDPRICE*LINEITEM.DISCOUNT) AS REVENUE
FROM LINEITEM
WHERE LINEITEM.SHIPDATE >= DATE '1994-01-01'
  AND LINEITEM.SHIPDATE < DATE '1995-01-01'
  AND LINEITEM.DISCOUNT > CAST(0.05 AS float)
  AND LINEITEM.DISCOUNT < CAST(0.07 as float)
  AND LINEITEM.QUANTITY < CAST(24 AS float)

SELECT
  CUSTOMER.CUSTKEY,
  SUM(LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)) AS REVENUE,
  CUSTOMER.ACCTBAL,
  NATION.NAME,
  CUSTOMER.ADDRESS,
  CUSTOMER.PHONE,
  CUSTOMER.COMMENT
FROM CUSTOMER, ORDERS, LINEITEM, NATION
WHERE CUSTOMER.CUSTKEY = ORDERS.CUSTKEY
  AND LINEITEM.ORDERKEY = ORDERS.ORDERKEY
  AND ORDERS.ORDERDATE >= DATE '1993-10-01'
  AND ORDERS.ORDERDATE < DATE '1994-01-01'
  AND LINEITEM.RETURNFLAG = 'R'
  AND CUSTOMER.NATIONKEY = NATION.NATIONKEY
GROUP BY CUSTOMER.CUSTKEY, CUSTOMER.ACCTBAL, CUSTOMER.PHONE, NATION.NAME, CUSTOMER.ADDRESS, CUSTOMER.COMMENT
ORDER BY REVENUE ASC
LIMIT 20

SELECT PK_V.PARTKEY, PK_V.VALUE
FROM (
  SELECT
    PS.PARTKEY,
    SUM(PS.SUPPLYCOST * CAST(PS.AVAILQTY AS float)) AS VALUE
  FROM PARTSUPP PS, SUPPLIER S, NATION N
  WHERE PS.SUPPKEY = S.SUPPKEY
    AND S.NATIONKEY = N.NATIONKEY
    AND N.NAME = 'GERMANY'
  GROUP BY PS.PARTKEY
) PK_V, (
  SELECT
    SUM(PS.SUPPLYCOST * CAST(PS.AVAILQTY AS float)) AS VALUE
  FROM PARTSUPP PS, SUPPLIER S, NATION N
  WHERE PS.SUPPKEY = S.SUPPKEY
    AND S.NATIONKEY = N.NATIONKEY
    AND N.NAME = 'GERMANY'
) CUTOFF_V
WHERE PK_V.VALUE > (CUTOFF_V.VALUE * CAST(0.0001 AS double) / CAST(100.0 AS double))
ORDER BY PK_V.VALUE DESC

SELECT
  LINEITEM.SHIPMODE,
  SUM(CASE WHEN ORDERS.ORDERPRIORITY = '1-URGENT' OR ORDERS.ORDERPRIORITY = '2-HIGH' THEN 1 ELSE 0 END) AS HIGH_LINE_COUNT,
  SUM(CASE WHEN ORDERS.ORDERPRIORITY <> '1-URGENT' AND ORDERS.ORDERPRIORITY <> '2-HIGH' THEN 1 ELSE 0 END) AS LOW_LINE_COUNT
FROM LINEITEM, ORDERS
WHERE ORDERS.ORDERKEY = LINEITEM.ORDERKEY
  AND (LINEITEM.SHIPMODE='MAIL' OR LINEITEM.SHIPMODE='SHIP')
  AND LINEITEM.COMMITDATE < LINEITEM.RECEIPTDATE
  AND LINEITEM.SHIPDATE < LINEITEM.COMMITDATE
  AND LINEITEM.RECEIPTDATE >= DATE '1994-01-01'
  AND LINEITEM.RECEIPTDATE < DATE '1995-01-01'
GROUP BY LINEITEM.SHIPMODE
ORDER BY LINEITEM.SHIPMODE

SELECT
  CAST(100.00 AS double) * PROMO_ONLY / ALL_REVENUE AS PROMO_REVENUE
FROM (
  SELECT
    SUM(
      CASE WHEN PART.TYPE LIKE 'PROMO%'
        THEN LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)
        ELSE cast(0 as float)
      END
    ) AS PROMO_ONLY,
    SUM(
      LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)
    ) AS ALL_REVENUE
  FROM LINEITEM, PART
  WHERE LINEITEM.PARTKEY = PART.PARTKEY
    AND LINEITEM.SHIPDATE >= DATE '1995-09-01'
    AND LINEITEM.SHIPDATE < DATE '1995-10-01'
) AGGREGATE

This page last updated 2024-12-03 16:56:15 -0500