[Build] Fix Antlr Shadowing and add Integration Tests (#2428)
* Reverts Antlr Shadowing and adds Integration Test to Check Proper Shading

Previously we had no test suite that checked whether the Spark3 runtime jar was actually
usable with Spark3 builds. To check that the shadowJar task is doing the right thing, we
add a new integration test suite that runs with only Spark3 and the shadow jar on the
classpath. Only a few tests are included to prove that the jar is stable; we still rely
on our unit tests for the majority of test coverage.
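A minimal sketch of what such an integration test exercises (illustrative only, not the test code in this commit; the master URL, catalog name, and procedure name are placeholders): start a Spark 3 session with the Iceberg SQL extensions enabled while the only Iceberg artifact on the classpath is the shaded runtime jar, then parse a statement that only the extensions grammar understands.

// Sketch, assuming only Spark 3 and the shaded iceberg-spark3-runtime jar on the classpath.
import org.apache.spark.sql.SparkSession

object ShadedJarSmokeCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .getOrCreate()

    // Parsing a CALL statement goes through the Iceberg extensions parser and
    // therefore through the ANTLR runtime bundled (and relocated) in the jar.
    spark.sessionState.sqlParser.parsePlan("CALL cat.system.some_procedure('arg')")

    spark.stop()
  }
}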

* Shades Antlr Runtime

This reintroduces the shading of Antlr, along with a set of integration tests to verify that it is correctly
shaded and working as expected. To accomplish this we copy several utility classes from Apache Spark so that
we can break our dependency on Spark internals that access Antlr classes directly.
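To make the effect of the relocation below concrete (this snippet is illustrative and not part of the commit): with the shadowJar relocate rule in place, the extensions parser resolves its ANTLR classes under the shaded package, while Spark keeps using its own, unrelocated ANTLR runtime, so the two versions can no longer conflict.

// Illustrative check only; both classes load when Spark 3 and the shaded
// iceberg-spark3-runtime jar are on the classpath. The shaded name comes from
// the relocate rule added to the shadowJar block further down in this diff.
Class.forName("org.apache.iceberg.shaded.org.antlr.v4.runtime.CommonTokenStream")
Class.forName("org.antlr.v4.runtime.CommonTokenStream")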
RussellSpitzer committed Apr 15, 2021
1 parent 6f0ecad commit 1884beeec2e4d48c8627c320a4612320f716af7f
@@ -64,7 +64,7 @@ jobs:
- uses: actions/setup-java@v1
with:
java-version: 8
- run: ./gradlew build -x test -x javadoc
- run: ./gradlew build -x test -x javadoc -x integrationTest

build-javadoc:
runs-on: ubuntu-latest
@@ -985,6 +985,7 @@ project(":iceberg-spark3-extensions") {
testCompile project(path: ':iceberg-spark3', configuration: 'testArtifacts')

// Required because we remove antlr plugin dependencies from the compile configuration, see note above
// We shade this in Spark3 Runtime to avoid issues with Spark's Antlr Runtime
runtime "org.antlr:antlr4-runtime:4.7.1"
antlr "org.antlr:antlr4:4.7.1"
}
@@ -1000,6 +1001,13 @@ project(':iceberg-spark3-runtime') {

tasks.jar.dependsOn tasks.shadowJar

sourceSets {
integration {
java.srcDir "$projectDir/src/integration/java"
resources.srcDir "$projectDir/src/integration/resources"
}
}

configurations {
compile {
exclude group: 'org.apache.spark'
@@ -1021,6 +1029,18 @@ project(':iceberg-spark3-runtime') {
compile(project(':iceberg-nessie')) {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}

integrationImplementation 'org.apache.spark:spark-hive_2.12'
integrationImplementation 'junit:junit'
integrationImplementation 'org.slf4j:slf4j-simple'
integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
integrationImplementation project(path: ':iceberg-spark', configuration: 'testArtifacts')
integrationImplementation project(path: ':iceberg-spark3', configuration: 'testArtifacts')
integrationImplementation project(path: ':iceberg-spark3-extensions', configuration: 'testArtifacts')
// Not allowed on our classpath, only the runtime jar is allowed
integrationCompileOnly project(':iceberg-spark3-extensions')
integrationCompileOnly project(':iceberg-spark3')
}

shadowJar {
@@ -1038,7 +1058,6 @@ project(':iceberg-spark3-runtime') {
relocate 'com.google', 'org.apache.iceberg.shaded.com.google'
relocate 'com.fasterxml', 'org.apache.iceberg.shaded.com.fasterxml'
relocate 'com.github.benmanes', 'org.apache.iceberg.shaded.com.github.benmanes'
relocate 'org.antlr.v4.runtime', 'org.apache.iceberg.shaded.org.antlr.v4.runtime'
relocate 'org.checkerframework', 'org.apache.iceberg.shaded.org.checkerframework'
relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
relocate 'avro.shaded', 'org.apache.iceberg.shaded.org.apache.avro.shaded'
@@ -1054,10 +1073,22 @@ project(':iceberg-spark3-runtime') {
relocate 'org.apache.arrow', 'org.apache.iceberg.shaded.org.apache.arrow'
relocate 'com.carrotsearch', 'org.apache.iceberg.shaded.com.carrotsearch'
relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
// relocate Antlr runtime and related deps to shade Iceberg specific version
relocate 'org.antlr.v4.runtime', 'org.apache.iceberg.shaded.org.antlr.v4.runtime'

classifier null
}

task integrationTest(type: Test) {
description = "Test Spark3 Runtime Jar"
group = "verification"
testClassesDirs = sourceSets.integration.output.classesDirs
classpath = sourceSets.integration.runtimeClasspath + files(shadowJar.archiveFile.get().asFile.path)
inputs.file(shadowJar.archiveFile.get().asFile.path)
}
integrationTest.dependsOn shadowJar
check.dependsOn integrationTest

jar {
enabled = false
}
@@ -22,16 +22,14 @@ package org.apache.spark.sql.catalyst.parser.extensions
import java.util.Locale
import org.antlr.v4.runtime._
import org.antlr.v4.runtime.atn.PredictionMode
import org.antlr.v4.runtime.misc.Interval
import org.antlr.v4.runtime.misc.ParseCancellationException
import org.antlr.v4.runtime.tree.TerminalNodeImpl
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParseErrorListener
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.parser.UpperCaseCharStream
import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.NonReservedContext
import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.QuotedIdentifierContext
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -124,13 +122,13 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = {
val lexer = new IcebergSqlExtensionsLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
lexer.removeErrorListeners()
lexer.addErrorListener(ParseErrorListener)
lexer.addErrorListener(IcebergParseErrorListener)

val tokenStream = new CommonTokenStream(lexer)
val parser = new IcebergSqlExtensionsParser(tokenStream)
parser.addParseListener(IcebergSqlExtensionsPostProcessor)
parser.removeErrorListeners()
parser.addErrorListener(ParseErrorListener)
parser.addErrorListener(IcebergParseErrorListener)

try {
try {
@@ -150,22 +148,52 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
}
}
catch {
case e: ParseException if e.command.isDefined =>
case e: IcebergParseException if e.command.isDefined =>
throw e
case e: ParseException =>
case e: IcebergParseException =>
throw e.withCommand(command)
case e: AnalysisException =>
val position = Origin(e.line, e.startPosition)
throw new ParseException(Option(command), e.message, position, position)
throw new IcebergParseException(Option(command), e.message, position, position)
}
}
}

/* Copied from Apache Spark's UpperCaseCharStream to avoid a dependency on Spark internals */
class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
override def consume(): Unit = wrapped.consume
override def getSourceName(): String = wrapped.getSourceName
override def index(): Int = wrapped.index
override def mark(): Int = wrapped.mark
override def release(marker: Int): Unit = wrapped.release(marker)
override def seek(where: Int): Unit = wrapped.seek(where)
override def size(): Int = wrapped.size

override def getText(interval: Interval): String = {
// ANTLR 4.7's CodePointCharStream implementations have bugs when
// getText() is called with an empty stream, or intervals where
// the start > end. See
// https://github.com/antlr/antlr4/commit/ac9f7530 for one fix
// that is not yet in a released ANTLR artifact.
if (size() > 0 && (interval.b - interval.a >= 0)) {
wrapped.getText(interval)
} else {
""
}
}

// scalastyle:off
override def LA(i: Int): Int = {
val la = wrapped.LA(i)
if (la == 0 || la == IntStream.EOF) la
else Character.toUpperCase(la)
}
// scalastyle:on
}

/**
* The post-processor validates & cleans-up the parse tree during the parse process.
*/
// while we reuse ParseErrorListener and ParseException, we have to copy and modify PostProcessor
// as it directly depends on classes generated from the extensions grammar file
case object IcebergSqlExtensionsPostProcessor extends IcebergSqlExtensionsBaseListener {

/** Remove the back ticks from an Identifier. */
@@ -198,3 +226,70 @@ case object IcebergSqlExtensionsPostProcessor extends IcebergSqlExtensionsBaseLi
parent.addChild(new TerminalNodeImpl(f(newToken)))
}
}

/* Partially copied from Apache Spark's Parser to avoid dependency on Spark Internals */
case object IcebergParseErrorListener extends BaseErrorListener {
override def syntaxError(
recognizer: Recognizer[_, _],
offendingSymbol: scala.Any,
line: Int,
charPositionInLine: Int,
msg: String,
e: RecognitionException): Unit = {
val (start, stop) = offendingSymbol match {
case token: CommonToken =>
val start = Origin(Some(line), Some(token.getCharPositionInLine))
val length = token.getStopIndex - token.getStartIndex + 1
val stop = Origin(Some(line), Some(token.getCharPositionInLine + length))
(start, stop)
case _ =>
val start = Origin(Some(line), Some(charPositionInLine))
(start, start)
}
throw new IcebergParseException(None, msg, start, stop)
}
}

/**
* Copied from Apache Spark
* A [[ParseException]] is an [[AnalysisException]] that is thrown during the parse process. It
* contains fields and an extended error message that make reporting and diagnosing errors easier.
*/
class IcebergParseException(
val command: Option[String],
message: String,
val start: Origin,
val stop: Origin) extends AnalysisException(message, start.line, start.startPosition) {

def this(message: String, ctx: ParserRuleContext) = {
this(Option(IcebergParserUtils.command(ctx)),
message,
IcebergParserUtils.position(ctx.getStart),
IcebergParserUtils.position(ctx.getStop))
}

override def getMessage: String = {
val builder = new StringBuilder
builder ++= "\n" ++= message
start match {
case Origin(Some(l), Some(p)) =>
builder ++= s"(line $l, pos $p)\n"
command.foreach { cmd =>
val (above, below) = cmd.split("\n").splitAt(l)
builder ++= "\n== SQL ==\n"
above.foreach(builder ++= _ += '\n')
builder ++= (0 until p).map(_ => "-").mkString("") ++= "^^^\n"
below.foreach(builder ++= _ += '\n')
}
case _ =>
command.foreach { cmd =>
builder ++= "\n== SQL ==\n" ++= cmd
}
}
builder.toString
}

def withCommand(cmd: String): IcebergParseException = {
new IcebergParseException(Option(cmd), message, start, stop)
}
}
@@ -20,6 +20,7 @@
package org.apache.spark.sql.catalyst.parser.extensions

import org.antlr.v4.runtime._
import org.antlr.v4.runtime.misc.Interval
import org.antlr.v4.runtime.tree.ParseTree
import org.antlr.v4.runtime.tree.TerminalNode
import org.apache.iceberg.DistributionMode
@@ -30,9 +31,8 @@ import org.apache.iceberg.spark.Spark3Util
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.parser.ParserUtils._
import org.apache.spark.sql.catalyst.parser.extensions.IcebergParserUtils.withOrigin
import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser._
import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
import org.apache.spark.sql.catalyst.plans.logical.CallArgument
@@ -43,6 +43,8 @@ import org.apache.spark.sql.catalyst.plans.logical.NamedArgument
import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.connector.expressions
import org.apache.spark.sql.connector.expressions.ApplyTransform
import org.apache.spark.sql.connector.expressions.FieldReference
@@ -182,7 +184,7 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
.map(visitConstant)
.map(lit => LiteralValue(lit.value, lit.dataType))
reference.orElse(literal)
.getOrElse(throw new ParseException(s"Invalid transform argument", ctx))
.getOrElse(throw new IcebergParseException(s"Invalid transform argument", ctx))
}

/**
@@ -237,3 +239,28 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
ctx.accept(this).asInstanceOf[T]
}
}

/* Partially copied from Apache Spark's Parser to avoid dependency on Spark Internals */
object IcebergParserUtils {

private[sql] def withOrigin[T](ctx: ParserRuleContext)(f: => T): T = {
val current = CurrentOrigin.get
CurrentOrigin.set(position(ctx.getStart))
try {
f
} finally {
CurrentOrigin.set(current)
}
}

private[sql] def position(token: Token): Origin = {
val opt = Option(token)
Origin(opt.map(_.getLine), opt.map(_.getCharPositionInLine))
}

/** Get the command which created the token. */
private[sql] def command(ctx: ParserRuleContext): String = {
val stream = ctx.getStart.getInputStream
stream.getText(Interval.of(0, stream.size() - 1))
}
}
@@ -30,6 +30,7 @@
import org.apache.spark.sql.catalyst.expressions.Literal$;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.catalyst.parser.ParserInterface;
import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException;
import org.apache.spark.sql.catalyst.plans.logical.CallArgument;
import org.apache.spark.sql.catalyst.plans.logical.CallStatement;
import org.apache.spark.sql.catalyst.plans.logical.NamedArgument;
@@ -131,7 +132,7 @@ public void testCallWithVarSubstitution() throws ParseException {

@Test
public void testCallParseError() {
AssertHelpers.assertThrows("Should fail with a sensible parse error", ParseException.class,
AssertHelpers.assertThrows("Should fail with a sensible parse error", IcebergParseException.class,
"missing '(' at 'radish'",
() -> parser.parsePlan("CALL cat.system radish kebab"));
}
