## Checkpoint 1

In this project, you'll implement an unoptimized SQL runtime. It is worth 10 points.

### Requirements

• All .scala files in /src/main/scala and its subdirectories will be compiled and the main function of the object microbase.Microbase will be run.
• The grader will wait until the code prints $>\n. If this takes more than 2 seconds, you will receive a 0.
• The grader will write a series of CREATE TABLE and SELECT commands to your code's System.in, with one command per \n-delimited line. After processing each statement, your code must print $>\n on a new line to indicate that it is done. If your code exceeds a per-operation time-out, you will receive a 0 for that query and all subsequent parts of the assignment.
• When your code is provided with a CREATE TABLE statement, this indicates that there is a file called data/[tableName].data, where [tableName] is the name of the table. This file contains UTF-8-encoded records, one per \n-delimited line, with fields in human-readable string representation (i.e., as in a CSV file) delimited by the pipe character (|). Note that there will not be any INSERT statements.
• When your code is provided with a SELECT statement, it must evaluate the SELECT statement and print the results to System.out, one per \n-delimited line, with fields in human-readable string representations delimited by the pipe character (|). You will be expected to support the following features of SQL:
  • * and table.* targets
  • Attribute and relation aliasing (i.e., SELECT bar.foo AS baz FROM table AS bar)
  • Project, Filter, Table, and Cartesian Products
  • FROM-Nested Subqueries
• Your response to SELECT queries will be checked against Sqlite3.
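
The prompt protocol described above can be sketched as a simple read-evaluate-print loop. This is a minimal sketch, not the required structure: the object name `PromptLoop` and the `handle` callback (which stands in for parsing and evaluating one statement) are hypothetical.

```scala
import java.io.{BufferedReader, InputStreamReader, PrintStream}

object PromptLoop {
  /** Reads one statement per line from `in` and prints the prompt after each one.
    * `handle` is a hypothetical callback returning the output lines for a statement. */
  def run(in: BufferedReader, out: PrintStream)(handle: String => Seq[String]): Unit = {
    out.print("$>\n")              // initial prompt: the grader waits for this
    out.flush()
    var line = in.readLine()
    while (line != null) {         // readLine returns null at end of input
      handle(line).foreach(row => out.print(row + "\n"))
      out.print("$>\n")            // signals that this statement is finished
      out.flush()
      line = in.readLine()
    }
  }

  def main(args: Array[String]): Unit =
    run(new BufferedReader(new InputStreamReader(System.in, "UTF-8")), System.out) { sql =>
      Seq.empty                    // placeholder: parse and evaluate `sql` here
    }
}
```

Flushing after every prompt matters because the grader reads the prompt before sending the next statement; a buffered prompt that is never flushed would look like a time-out.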

### Parsing SQL

You may use Spark's SQL parser:
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

...
def parseSql(sql: String): LogicalPlan =
  new SparkSqlParser().parsePlan(sql)
...

Note that parsePlan will parse both queries (SELECT) and DDL/DML (e.g., CREATE TABLE).

### CREATE TABLE

case class CreateTableStatement(
    tableName: Seq[String],
    tableSchema: StructType,
    partitioning: Seq[Transform],
    bucketSpec: Option[Object],
    properties: Map[String, String],
    provider: Option[String],
    options: Map[String, String],
    location: Option[String],
    comment: Option[String],
    serde: Option[SerdeInfo],
    external: Boolean,
    ifNotExists: Boolean) extends ParsedStatement


This is a lot to take in, but for our purposes here, the only three parts of the CreateTableStatement class that we care about are the tableName, tableSchema, and location. The table name may seem a bit weird, as it's a sequence and not a string. This is to manage multi-part table names used to reference tables in a specific schema (e.g., dataSource.tableName). For our purposes, you can always expect this sequence to contain exactly one element.

The second part is the tableSchema. Recall that tables are collections of tuples. Spark refers to tuple types as StructTypes. A StructType's elements are StructFields, which include the field's name, its data type, and some other metadata like whether the field is allowed to be null.

Finally, we have the location. Unlike a CREATE TABLE command in a normal relational database, here we're defining what's called an "external table". We're defining a schema over a remote file so that the database can access its contents. The location field will point at a (relative) path to a data file with the contents of the table. For example:

  CREATE TABLE R(id int, fruit string) USING csv OPTIONS(path 'data/R.data', delimiter '|')

The corresponding file located at data/R.data might contain:
1|apple
2|banana
3|clementine
4|duran


You'll note that the provider field will be set to Some("csv") and the options field will be Map("delimiter" -> "|"). Feel free to implement these as you see fit, but they will not change for this project. As in the example above, the data file itself will be CSV-like, with one record per line (\n-delimited), and fields in human-readable strings (|-delimited).
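
As a rough illustration of reading such a file, the sketch below splits each record on the pipe character and converts fields according to a schema. The `ColType` hierarchy is a hypothetical stand-in used to keep the example free of Spark dependencies; with Spark's schema you would instead match on the data type of each StructField.

```scala
import scala.io.Source

object DataFile {
  // Hypothetical stand-ins for the column types found in a table schema
  sealed trait ColType
  case object IntCol extends ColType
  case object DoubleCol extends ColType
  case object StringCol extends ColType

  /** Splits one record on '|' and converts each field according to the schema. */
  def parseLine(line: String, schema: Seq[ColType]): Seq[Any] = {
    val fields = line.split("\\|", -1)   // -1 keeps trailing empty fields
    require(fields.length == schema.length, s"expected ${schema.length} fields: $line")
    fields.toSeq.zip(schema).map {
      case (raw, IntCol)    => raw.toInt
      case (raw, DoubleCol) => raw.toDouble
      case (raw, StringCol) => raw
    }
  }

  /** Lazily reads every record of a UTF-8 data file (the file is not closed here). */
  def readTable(path: String, schema: Seq[ColType]): Iterator[Seq[Any]] =
    Source.fromFile(path, "UTF-8").getLines().map(parseLine(_, schema))
}
```

For the example file above, `DataFile.parseLine("1|apple", Seq(DataFile.IntCol, DataFile.StringCol))` yields a two-element row holding the integer 1 and the string apple.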

### SELECT

Any other LogicalPlan returned by parsePlan will be a query. You'll need to evaluate this query and return results in a format identical to the data files above. If you need some help debugging, your output should be identical to (modulo row order) the output produced by Sqlite3 for the same query on the same data.

Although you will not be graded on the specific implementation strategy you pick, the following is one relatively straightforward approach to getting an A that closely mirrors how Spark itself is implemented. First though, let's talk a little about how Spark executes queries. Broadly, Spark queries go through five phases:

1. Analysis: Placeholders left after parsing are replaced and variables are "wired up"
2. Optimization: The logical plan is rewritten into a more efficient plan.
3. Physical Planning: The logical plan is transformed into a physical plan, as specific algorithms are chosen. Some optimization happens here as well.
4. Code Generation: Spark produces native code for its plans.
5. Execution: The query is distributed to executor nodes (if needed) and run in parallel.

For this class, we'll be taking a slightly simpler approach to the latter three steps, as they are intended for a large-scale distributed system. For checkpoint 1, you can also still get an A without completing the optimization step (we'll come back to that in checkpoint 2). That leaves us with basically three steps:

1. Analysis
2. Iterator Construction
3. Execution

Let's take each of these in turn.

### Analysis

Shortly after parsing, Spark applies a two-part process called analysis:

• Resolution: Placeholder AST nodes are replaced by the real elements
  • Human-facing string-typed identifiers are "wired together" using Spark-internal identifiers (exprId) and/or replaced by type-aware nodes.
  • Nodes with unknown types in the Expression and LogicalPlan ASTs are replaced by the corresponding typed values
• Validation: Spark checks to see if all of the types "line up" (e.g., no arithmetic between integers and strings)

Spark's validation logic is mostly intact, but you'll need to implement the Resolution step yourself. In particular, Spark's SQL Parser leaves behind "placeholder" nodes in both the Expression and LogicalPlan ASTs, whenever the user references something by a string. Normally, the analysis step replaces these placeholders with something that can actually be used. Placeholders that you can expect to encounter are listed below.

Before we discuss placeholders, we need to take a brief digression to explain the exprId field in many Expression AST nodes. For example:

case class AttributeReference(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    val exprId: ExprId = NamedExpression.newExprId,
    val qualifier: Seq[String] = Seq.empty[String])
  extends Attribute with Unevaluable { ... }


In most node types, the exprId field is allocated automatically with a fresh identifier (i.e., using NamedExpression.newExprId) when the node is created. Spark uses exprIds internally to keep track of which expressions line up with which other expressions. Two Attributes are the same if and only if they have the same exprId (whether their names are the same or different does not matter).

Both LogicalPlan and Expression provide a transform (and transformUp and transformDown) method to aid with rewriting ASTs. These methods make it very easy to replace parts of the tree. For example, to compute a new tree with UnresolvedRelation nodes replaced, you might write

plan.transform {
  // UnresolvedRelation has three fields (see below); only the name is needed here
  case UnresolvedRelation(Seq(tableName), _, _) => /* write your replacement here */
}


The other thing to discuss before we move on is resolution. Expression provides a resolved method that checks to see whether the expression has been fully resolved. The dataType method will not work until resolved returns true. In particular, resolved checks for three things:

1. All descendants of the node must be resolved
2. The node itself must not be an Unresolved___
3. The node's children must have a dataType compatible with the node itself.

The last condition is especially tricky, but you can call e.checkInputDataTypes() on each node of the tree to check for errors (see below).

That all being said, here are unresolved nodes you can expect to encounter:

#### LogicalPlan

case class UnresolvedRelation(
    nameElements: Seq[String],
    options: CaseInsensitiveStringMap,
    isStreaming: Boolean)


This class is used when a relation is referenced in a LogicalPlan in SQL (typically the FROM clause of a SELECT). The nameElements field encodes the '.'-separated elements of the table name (e.g., foo.bar would be encoded as Seq("foo", "bar")). Under typical use, this sequence will always have only one element. Name elements are case-insensitive.

Occurrences of this class will need to be replaced during analysis with an AST node that knows what attributes the corresponding table has. Spark has several built-in LogicalPlan nodes that can be used to encode tables, but you might find it easier to just create your own subclass of LeafNode to represent a table node. A subclass of LeafNode only needs to implement one field:

case class ____(____)
  extends LeafNode
{
  val output: Seq[Attribute] = ???
}

Note that Spark declares output as a Seq[Attribute] (AttributeSet is a separate, set-like collection that plans expose through outputSet). In general, the output field should be given as a sequence of AttributeReferences (see above).
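
A minimal sketch of such a table node follows, declaring output as a Seq[Attribute], which is how Spark's QueryPlan declares it. The class name `TableScan`, its `path` field, and the `fromSchema` helper are hypothetical; only the Spark classes in the imports are real. The attributes are built once from the schema so that each one receives a single fresh exprId, and the table name is stored as the qualifier.

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.types.StructType

// Hypothetical table node: remembers where the data lives and what it outputs
case class TableScan(name: String, path: String, output: Seq[Attribute]) extends LeafNode

object TableScan {
  /** Builds the node from a CREATE TABLE schema, one AttributeReference per field. */
  def fromSchema(name: String, path: String, schema: StructType): TableScan = {
    val attrs = schema.fields.toSeq.map { f =>
      // exprId is left at its default (a fresh id); the qualifier records the table name
      AttributeReference(f.name, f.dataType, f.nullable)(qualifier = Seq(name))
    }
    TableScan(name, path, attrs)
  }
}
```

Keeping the path on the node is one possible design: the iterator-construction step can then open the file without consulting a separate catalog.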

#### Expression

case class UnresolvedStar(target: Option[Seq[String]])


Any asterisk * appearing in a SQL query is translated into this class. Generally, this happens in three places:

• SELECT * FROM foo → Project(Seq(UnresolvedStar(None)), ...)
• SELECT foo.* FROM foo, bar → Project(Seq(UnresolvedStar(Some(Seq("foo")))), ...)
• SELECT COUNT(*) FROM FOO → Aggregate(..., Seq(Count(Seq(UnresolvedStar(None)))), ...)

The first two cases (the only two we care about in this checkpoint) are special, as they both represent multiple fields. You'll need to expand these out during the analysis phase. Note that like UnresolvedRelation, table names are Sequences of .-separated strings.

case class UnresolvedAttribute(nameParts: Seq[String])


This class represents an attribute name that hasn't yet been "wired up" with an exprId. In general, there are two cases that need to be handled during Analysis:

• attribute → UnresolvedAttribute(Seq("attribute"))
• relation.attribute → UnresolvedAttribute(Seq("relation", "attribute"))

For resolving attributes, keep in mind that each operator (that has been resolved already) knows its output schema (typically computing it from the input schema):

val attributes: Seq[Attribute] = source.output

Note that output is a Seq[Attribute]. In general, you can expect the contents of this sequence to consist of:

• AttributeReference
• UnresolvedAttribute

Assuming that you've done your analysis job right, you should only see AttributeReferences.

One additional thing that may be helpful in resolving UnresolvedAttributes is that AttributeReference has a qualifier field that Spark uses to store the table name. This field is automatically managed in nested subqueries, but keep in mind that if you're using a custom table class (as suggested above), you will need to set this field yourself when declaring the table's output there.
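
The lookup logic for both UnresolvedAttribute and UnresolvedStar can be sketched independently of Spark. In the sketch below, `Attr` is a hypothetical stand-in for AttributeReference that carries only the name, qualifier, and an id; the case-insensitive matching of attribute names is an assumption, as is the choice to fail on unknown or ambiguous names.

```scala
object NameResolution {
  // Hypothetical stand-in for AttributeReference: just the parts needed for lookup
  final case class Attr(name: String, qualifier: Seq[String], exprId: Long)

  private def qualifiedBy(a: Attr, rel: String): Boolean =
    a.qualifier.lastOption.exists(_.equalsIgnoreCase(rel))

  /** Finds the input attribute for Seq(attribute) or Seq(relation, attribute). */
  def resolve(nameParts: Seq[String], input: Seq[Attr]): Attr = {
    val matches = nameParts match {
      case Seq(attr)      => input.filter(_.name.equalsIgnoreCase(attr))
      case Seq(rel, attr) => input.filter(a => a.name.equalsIgnoreCase(attr) && qualifiedBy(a, rel))
      case _              => Seq.empty
    }
    matches match {
      case Seq(only) => only
      case Seq()     => sys.error(s"unknown attribute: ${nameParts.mkString(".")}")
      case _         => sys.error(s"ambiguous attribute: ${nameParts.mkString(".")}")
    }
  }

  /** Expands `*` (target = None) or `rel.*` (target = Some(Seq(rel))) into attributes. */
  def expandStar(target: Option[Seq[String]], input: Seq[Attr]): Seq[Attr] =
    target match {
      case None        => input
      case Some(parts) => input.filter(a => parts.lastOption.exists(qualifiedBy(a, _)))
    }
}
```

In the real analysis pass, `input` would be the output of the operator's child, and the result would replace the unresolved node inside a transform over the operator's expressions.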

### Iterator Construction

Although Expression provides an eval method, LogicalPlan does not. To evaluate LogicalPlans, you need to compile them. The most straightforward way to implement a relational algebra plan is the so-called "pull"-based model, which you might already be familiar with: iterators. This is the starting point for the runtimes of Spark and many other relational database engines. Implementing an iterator typically involves two methods:
class MyIterator extends Iterator[MyFoo]
{
  def hasNext: Boolean = /* return true if there are more elements */
  def next(): MyFoo = /* assemble and return the MyFoo instance */
}

For example, here's a simple one that iterates over a range of values.
class Range(low: Int, high: Int) extends Iterator[Int]
{
  var current = low
  def hasNext: Boolean = current < high
  // Return the current value, then advance
  def next(): Int = { val ret = current; current += 1; ret }
}

In addition to the base table class you created above, you'll be expected to support the following LogicalPlan node types:

For each operator, look at the operator in isolation. Imagine that you have an iterator over its input(s). Forget about the full stack of plan nodes and focus on implementing each relational operator in terms of its inputs. Also keep in mind that some operators are just there as decorations (e.g., SubqueryAlias).
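
To make the "operator in isolation" idea concrete, here is a sketch of two operators written purely in terms of their input iterators. The class names and the generic row types are hypothetical; a real implementation would likely fix the row type (for example, to InternalRow) and evaluate a Spark Expression as the predicate.

```scala
object Operators {
  /** Filter: yields only the input rows that satisfy `pred`. */
  class FilterIterator[T](input: Iterator[T], pred: T => Boolean) extends Iterator[T] {
    private var buffered: Option[T] = None

    // Advance the input until a matching row is buffered or the input runs out
    def hasNext: Boolean = {
      while (buffered.isEmpty && input.hasNext) {
        val row = input.next()
        if (pred(row)) buffered = Some(row)
      }
      buffered.isDefined
    }

    def next(): T = {
      if (!hasNext) throw new NoSuchElementException("no more rows")
      val row = buffered.get
      buffered = None
      row
    }
  }

  /** Cartesian product: pairs every left row with every right row.
    * `right` is a function so the right input can be restarted for each left row. */
  class CrossIterator[L, R](left: Iterator[L], right: () => Iterator[R])
      extends Iterator[(L, R)] {
    private var currentLeft: Option[L] = None
    private var rightIter: Iterator[R] = Iterator.empty

    // Move to the next left row whenever the current pass over the right side is done
    def hasNext: Boolean = {
      while (!rightIter.hasNext && left.hasNext) {
        currentLeft = Some(left.next())
        rightIter = right()
      }
      rightIter.hasNext
    }

    def next(): (L, R) = {
      if (!hasNext) throw new NoSuchElementException("no more rows")
      (currentLeft.get, rightIter.next())
    }
  }
}
```

Note that the cross product needs to rescan its right input, which is why it takes a factory rather than a single iterator; materializing the right side into a buffer is another option.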

### Expression Evaluation

It will be helpful (particularly for Project and Filter) to have a way to evaluate primitive-valued expressions. Fortunately, Expression already has a method for this: eval

def eval(input: InternalRow): Any = ???

For example:
val row = InternalRow.fromSeq( Seq(1, 2, "bob") )
val literal = expression.eval(row)


Specifically, eval takes an org.apache.spark.sql.catalyst.InternalRow as input and produces an Any as output (analogous to Java's Object). InternalRow is a very thin wrapper around an Array[Any]. The InternalRow argument will be passed down through the tree and evaluated.

Observe that some node types are marked as Unevaluable. These will need to be replaced before you call eval. In particular, AttributeReference is unevaluable because it references the attribute symbolically (by exprId), while InternalRow doesn't let you look up attributes this way. An easy way to fix this is to create your own subclass of Expression that instead references the attribute by its position in the InternalRow.

Since InternalRow is already used by Spark, you may wish to use it as the content type for your Iterators as well.
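
As one possible alternative to writing your own positional expression, Spark's catalyst library includes a BoundReference expression that reads a field from the row by ordinal. The sketch below rewrites each AttributeReference into a BoundReference by looking up its exprId in the input schema; the `bind` helper and the `PositionalEval` object are hypothetical names, and whether this fits the assignment's intent is an assumption.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Expression, Literal}
import org.apache.spark.sql.types.IntegerType

object PositionalEval {
  /** Replaces symbolic attribute references with positional ones.
    * `input` is the output schema of the operator that produces the rows. */
  def bind(expr: Expression, input: Seq[Attribute]): Expression =
    expr.transform {
      case a: AttributeReference =>
        val ordinal = input.indexWhere(_.exprId == a.exprId)
        require(ordinal >= 0, s"attribute ${a.name} not found in input")
        BoundReference(ordinal, a.dataType, a.nullable)
    }

  def main(args: Array[String]): Unit = {
    val a = AttributeReference("a", IntegerType, nullable = false)()
    val b = AttributeReference("b", IntegerType, nullable = false)()
    // b sits at position 1 of the input schema Seq(a, b)
    val bound = bind(Add(b, Literal(10)), Seq(a, b))
    println(bound.eval(InternalRow.fromSeq(Seq(1, 2))))
  }
}
```

Binding once per operator, when the iterator is constructed, avoids repeating the exprId lookup for every row.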

### Evaluation

Once you have an iterator implemented, you just need to print out the contents, one per line, with fields separated by a | character.

### Example Queries

You can find a collection of example queries here. Your code should support all of the queries in the TABLEXX.SQL files, at a minimum for this checkpoint.